# Source code for aioscraper.types.pipeline

from dataclasses import dataclass, field
from typing import Any, Callable, Literal, Protocol, TypeVar, runtime_checkable

# Type variable for the item flowing through a pipeline; it ties the type a
# pipeline or middleware accepts to the type it returns.
PipelineItemType = TypeVar("PipelineItemType")

# Allowed stage names for a pipeline middleware: "pre" or "post".
# NOTE(review): presumably these select PipelineContainer.pre_middlewares /
# post_middlewares — confirm against the code that consumes this alias.
PipelineMiddlewareStage = Literal["pre", "post"]


@runtime_checkable
class BasePipeline(Protocol[PipelineItemType]):
    """Interface for classes that process scraped items of a specific type.

    The protocol is ``runtime_checkable``, so ``isinstance(obj, BasePipeline)``
    can be used. That check only verifies that ``put_item`` and ``close``
    attributes exist. It does not validate their signatures or that they are
    coroutine functions.
    """

    async def put_item(self, item: PipelineItemType) -> PipelineItemType:
        """Process an item and return it (mutated or replaced).

        This method must be implemented by all concrete pipeline classes.

        Args:
            item: The scraped item to process.

        Returns:
            The processed item: either the same object (possibly mutated) or
            a replacement object.
        """
        ...

    async def close(self) -> None:
        """Close the pipeline.

        This method is called when the pipeline is no longer needed. It can be
        overridden to perform any necessary cleanup operations.
        """
        ...
class PipelineMiddleware(Protocol[PipelineItemType]):
    """Async hook used before or after pipeline execution; must return the item.

    This is a structural protocol. Any object whose async ``__call__`` takes a
    single item and returns an item of the same type conforms; no subclassing
    is needed.
    """

    async def __call__(self, item: PipelineItemType) -> PipelineItemType:
        """Inspect or transform *item* and hand back the resulting item."""
        ...
class Pipeline(Protocol[PipelineItemType]):
    """Protocol for callables that accept an item and return the processed item."""

    async def __call__(self, item: PipelineItemType) -> PipelineItemType:
        """Run the processing step on *item* and return the result."""
        ...


# Alternate name for ``Pipeline``. It is used where a pipeline callable is
# passed along as the next handler in a chain, e.g. the ``handler`` parameter
# of ``GlobalPipelineMiddleware.__call__``.
ItemHandler = Pipeline
class GlobalPipelineMiddleware(Protocol[PipelineItemType]):
    """Wrapper invoked around the entire pipeline chain for every item type.

    An implementation receives the next ``handler`` in the chain together with
    the ``item`` and must return the resulting item. As a wrapper, it is
    expected to await ``handler(item)`` itself; the protocol does not enforce
    this.
    """

    async def __call__(
        self,
        # Parameterized so the handler's item type matches ``item``. A bare
        # ``ItemHandler`` is implicitly ``Pipeline[Any]``, which would drop
        # that link for type checkers.
        handler: ItemHandler[PipelineItemType],
        item: PipelineItemType,
    ) -> PipelineItemType:
        ...
# Any callable (arbitrary arguments) that builds a global pipeline middleware;
# generic over the item type via PipelineItemType.
GlobalPipelineMiddlewareFactory = Callable[..., GlobalPipelineMiddleware[PipelineItemType]]


@dataclass(slots=True, kw_only=True)
class PipelineContainer:
    """Groups pipelines with their pre- and post-stage middleware lists.

    Every field is keyword-only and defaults to a new empty list per instance.
    """

    # Pipeline objects registered in this container.
    pipelines: list[BasePipeline[Any]] = field(default_factory=list)
    # Middlewares for the "pre" stage (presumably run before the pipelines).
    pre_middlewares: list[PipelineMiddleware[Any]] = field(default_factory=list)
    # Middlewares for the "post" stage (presumably run after the pipelines).
    post_middlewares: list[PipelineMiddleware[Any]] = field(default_factory=list)