Source code for aioscraper.holders.pipeline

import logging
from typing import Any, Callable

from aioscraper._helpers.log import get_log_name
from aioscraper.exceptions import AIOScraperException
from aioscraper.types.pipeline import (
    BasePipeline,
    GlobalPipelineMiddlewareFactory,
    PipelineContainer,
    PipelineItemType,
    PipelineMiddleware,
    PipelineMiddlewareStage,
)

# Module-level logger named after this module; used for debug output when
# pipelines and middlewares are registered.
logger = logging.getLogger(__name__)


class PipelineHolder:
    """Keeps pipeline containers and exposes decorator helpers."""

    def __init__(self):
        # Registered containers, keyed by the item type they were registered for.
        self.pipelines: dict[Any, PipelineContainer] = {}
        # Global middleware factories, kept in registration order.
        self.global_middleware_factories: list[GlobalPipelineMiddlewareFactory[Any]] = []

    def __call__(
        self,
        item_type: type[PipelineItemType],
        *args,
        **kwargs,
    ) -> Callable[[type[BasePipeline[PipelineItemType]]], type[BasePipeline[PipelineItemType]]]:
        """Return a decorator that instantiates and registers a pipeline class for the given item type.

        Args:
            item_type: Item type the pipeline is registered under.
            *args: Positional arguments forwarded to the pipeline class constructor.
            **kwargs: Keyword arguments forwarded to the pipeline class constructor.

        Returns:
            A class decorator. It returns the decorated class unchanged; the
            instance it creates is registered through :meth:`add`.

        Raises:
            AIOScraperException: When the decorator is applied and the pipeline
                class cannot be instantiated with the given arguments, or when
                :meth:`add` rejects the instance.
        """

        def decorator(pipeline_class: type[BasePipeline[PipelineItemType]]) -> type[BasePipeline[PipelineItemType]]:
            try:
                pipeline = pipeline_class(*args, **kwargs)
            except Exception as e:
                # Wrap any constructor failure so callers see a library error
                # that names the offending class; the cause is kept via chaining.
                raise AIOScraperException(
                    f"Failed to instantiate pipeline {pipeline_class.__name__} with provided arguments",
                ) from e

            self.add(item_type, pipeline)
            return pipeline_class

        return decorator

    def add(self, item_type: type[PipelineItemType], *pipelines: BasePipeline[PipelineItemType]):
        """Add pipelines to process scraped data.

        Every pipeline is validated before anything is registered, so a failing
        pipeline leaves the holder unchanged.

        Args:
            item_type: Item type the pipelines are registered under.
            *pipelines: Pipeline instances to append, in order.

        Raises:
            AIOScraperException: If a pipeline fails the ``isinstance`` check
                against ``BasePipeline``, or if that check itself raises
                ``TypeError``.
        """
        for pipeline in pipelines:
            # runtime protocol check to ensure BasePipeline interface compliance
            try:
                ok = isinstance(pipeline, BasePipeline)
            except TypeError as exc:
                raise AIOScraperException(
                    f"Invalid pipeline type {type(pipeline)!r}; "
                    "expected an instance implementing BasePipeline protocol",
                ) from exc

            pipeline_type = type(pipeline).__name__

            if not ok:
                raise AIOScraperException(f"Pipeline {pipeline_type} does not implement required BasePipeline methods")

            logger.debug("Installing pipeline %s: type=%s", pipeline_type, item_type.__name__)

        # Register once, after the whole batch has been validated.
        if item_type not in self.pipelines:
            self.pipelines[item_type] = PipelineContainer(pipelines=[*pipelines])
        else:
            self.pipelines[item_type].pipelines.extend(pipelines)

    def middleware(
        self,
        middleware_type: PipelineMiddlewareStage,
        item_type: type[PipelineItemType],
    ) -> Callable[[PipelineMiddleware[PipelineItemType]], PipelineMiddleware[PipelineItemType]]:
        """Return a decorator that registers a pipeline middleware for the given stage.

        Args:
            middleware_type: Stage to register for (``"pre"`` or ``"post"``).
            item_type: Item type whose container receives the middleware.

        Returns:
            A decorator that registers the decorated middleware through
            :meth:`add_middlewares` and returns it unchanged.
        """

        def decorator(middleware: PipelineMiddleware[PipelineItemType]) -> PipelineMiddleware[PipelineItemType]:
            self.add_middlewares(middleware_type, item_type, middleware)
            return middleware

        return decorator

    def add_middlewares(
        self,
        middleware_type: PipelineMiddlewareStage,
        item_type: type[PipelineItemType],
        *middlewares: PipelineMiddleware[PipelineItemType],
    ):
        """Add pipeline processing middlewares.

        Args:
            middleware_type: Stage to register for; ``"pre"`` appends to the
                container's ``pre_middlewares``, ``"post"`` to its
                ``post_middlewares``.
            item_type: Item type whose container receives the middlewares. A
                container is created if none is registered yet.
            *middlewares: Middlewares to append, in order.

        Raises:
            ValueError: If ``middleware_type`` is neither ``"pre"`` nor
                ``"post"``. Raised before any state is modified.
        """
        # Validate the stage first so an unsupported value does not leave an
        # empty container behind or log installs that never happen.
        if middleware_type not in ("pre", "post"):
            raise ValueError(f"Unsupported pipeline middleware type: {middleware_type}")

        if item_type not in self.pipelines:
            container = self.pipelines[item_type] = PipelineContainer()
        else:
            container = self.pipelines[item_type]

        for middleware in middlewares:
            logger.debug("Installing pipeline middleware %s: type=%s", get_log_name(middleware), middleware_type)

        if middleware_type == "pre":
            container.pre_middlewares.extend(middlewares)
        else:
            container.post_middlewares.extend(middlewares)

    def global_middleware(
        self,
        factory: GlobalPipelineMiddlewareFactory[PipelineItemType],
    ) -> GlobalPipelineMiddlewareFactory[PipelineItemType]:
        """Decorator form of :meth:`add_global_middlewares`.

        Args:
            factory: Global middleware factory to register.

        Returns:
            The same factory, unchanged.
        """
        self.add_global_middlewares(factory)
        return factory

    def add_global_middlewares(self, *factories: GlobalPipelineMiddlewareFactory[PipelineItemType]):
        """
        Register global pipeline middleware factories in order.

        Each factory can accept injected dependencies and must return a middleware with signature
        ``async def mw(handler, item): ...`` which wraps the entire pipeline chain for every item type.
        """
        for factory in factories:
            logger.debug("Installing global pipeline middleware factory %s", get_log_name(factory))
            self.global_middleware_factories.append(factory)