Source code for aioscraper.core.pipeline
from logging import getLogger
from typing import Any, Mapping
from aioscraper._helpers.func import get_func_kwargs
from aioscraper._helpers.log import get_log_name
from aioscraper.config import PipelineConfig
from aioscraper.exceptions import PipelineException, StopItemProcessing, StopMiddlewareProcessing
from aioscraper.types.pipeline import (
GlobalPipelineMiddleware,
GlobalPipelineMiddlewareFactory,
Pipeline,
PipelineContainer,
PipelineItemType,
)
# Module-level logger, named after this module via ``__name__``.
logger = getLogger(__name__)
[docs]
class PipelineDispatcher:
    """Dispatch items to the pipeline container registered for their type.

    An item is looked up by its exact ``type(item)`` in the ``pipelines``
    mapping and run through that container's pre-middlewares, pipelines and
    post-middlewares. Global middlewares produced by
    ``global_middleware_factories`` wrap that per-item processing.
    """

    def __init__(
        self,
        config: PipelineConfig,
        pipelines: Mapping[Any, PipelineContainer],
        global_middleware_factories: list[GlobalPipelineMiddlewareFactory[Any]] | None = None,
        dependencies: Mapping[str, Any] | None = None,
    ):
        """Store the configuration and build the handler chain.

        Args:
            config: Pipeline settings. ``config.strict`` decides whether an
                item with no registered container raises or is skipped.
            pipelines: Mapping from item type to its pipeline container.
            global_middleware_factories: Factories called once, during
                construction, to create the global middlewares. ``None`` is
                treated as an empty list.
            dependencies: Values offered to the factories as keyword
                arguments via ``get_func_kwargs``. ``None`` is treated as an
                empty mapping.

        Raises:
            PipelineException: If a global middleware factory cannot be
                instantiated.
        """
        factories = global_middleware_factories if global_middleware_factories else []
        self._config = config
        self._pipelines = pipelines
        self._global_middleware_factories = factories
        self._dependencies: Mapping[str, Any] = dependencies if dependencies else {}

        logger.info(
            "Pipeline dispatcher created: pipelines=%d, global_middleware_factories=%d, strict=%s",
            len(pipelines),
            len(factories),
            config.strict,
        )

        # Built last: instantiating the factories reads the attributes set above.
        self._handler = self._build_handler()

    async def _put_item(self, item: PipelineItemType) -> PipelineItemType:
        """Run one item through the three stages of its container.

        The stages run in order: pre-middlewares, pipelines, post-middlewares.
        ``StopMiddlewareProcessing`` raised by a middleware ends only the
        middleware stage it was raised in. ``StopItemProcessing`` raised by a
        middleware ends processing and returns the item as it stands at that
        point. Exceptions raised by a pipeline's ``put_item`` are not handled
        here.

        Args:
            item: The item to process.

        Returns:
            The item after processing, or the item unchanged when no
            container is registered and strict mode is off.

        Raises:
            PipelineException: In strict mode, when no container is
                registered for ``type(item)``.
        """
        item_type = type(item).__name__
        logger.debug("Pipeline item received: %s", item)

        try:
            container = self._pipelines[type(item)]
        except KeyError:
            if not self._config.strict:
                logger.warning("Pipeline not found for item type %s, skipping", item_type)
                return item
            logger.exception("Pipeline not found for item type %s (strict mode)", item_type)
            raise PipelineException(f"Pipelines for item {item_type} not found") from None

        # Stage 1: pre-middlewares.
        for pre_step in container.pre_middlewares:
            try:
                item = await pre_step(item)
            except StopMiddlewareProcessing:
                logger.debug("StopMiddlewareProcessing in pre middleware for %s: stopping pre chain", item_type)
                break
            except StopItemProcessing:
                logger.debug("StopItemProcessing in pre middleware for %s: aborting", item_type)
                return item

        # Stage 2: the pipelines themselves.
        for pipe in container.pipelines:
            item = await pipe.put_item(item)

        # Stage 3: post-middlewares.
        for post_step in container.post_middlewares:
            try:
                item = await post_step(item)
            except StopMiddlewareProcessing:
                logger.debug("StopMiddlewareProcessing in post middleware for %s: stopping post chain", item_type)
                break
            except StopItemProcessing:
                logger.debug("StopItemProcessing in post middleware for %s: aborting", item_type)
                return item

        return item

    def _build_handler(self) -> Pipeline[Any]:
        """Compose the global middlewares around ``_put_item``.

        Factories are instantiated in list order, and each new middleware
        wraps the chain built so far, so the middleware from the last factory
        ends up outermost.

        Returns:
            The outermost callable of the chain; with no factories this is a
            plain wrapper around ``_put_item``.

        Raises:
            PipelineException: If calling a factory raises; the original
                exception is attached as the cause.
        """

        async def innermost(item: PipelineItemType) -> PipelineItemType:
            return await self._put_item(item)

        def bind(
            mw: GlobalPipelineMiddleware[PipelineItemType],
            downstream: Pipeline[PipelineItemType],
        ):
            # A dedicated scope per layer pins ``mw`` and ``downstream`` so
            # later loop iterations cannot rebind them.
            async def wrapped(item: PipelineItemType):
                return await mw(downstream, item)

            return wrapped

        chain = innermost
        for factory in self._global_middleware_factories:
            try:
                logger.debug("Instantiating global pipeline middleware factory: %s", get_log_name(factory))
                instance = factory(**get_func_kwargs(factory, **self._dependencies))
            except Exception as exc:
                raise PipelineException(
                    f"Failed to instantiate global pipeline middleware factory {get_log_name(factory)}",
                ) from exc
            chain = bind(instance, chain)

        return chain

    async def put_item(self, item: PipelineItemType) -> PipelineItemType:
        """Dispatch an item through the global middlewares and its pipelines.

        Args:
            item: The item to process.

        Returns:
            The result of the handler chain. If ``StopItemProcessing``
            escapes the chain, the item originally passed in is returned
            instead.
        """
        try:
            outcome = await self._handler(item)
        except StopItemProcessing:
            logger.debug("StopItemProcessing in pipeline handler: aborting item processing")
            return item
        return outcome

    async def close(self):
        """Close every pipeline in every registered container.

        ``close()`` is awaited on each pipeline in turn. An ``Exception``
        raised by one pipeline is logged and does not stop the remaining
        pipelines from being closed.
        """
        pipeline_count = 0
        for container in self._pipelines.values():
            pipeline_count += len(container.pipelines)
        logger.debug("Closing pipeline dispatcher: %d pipeline(s) to close", pipeline_count)

        registered = (
            (item_type, pipe)
            for item_type, container in self._pipelines.items()
            for pipe in container.pipelines
        )
        for item_type, pipe in registered:
            try:
                await pipe.close()
            except Exception:
                logger.exception("Error closing pipeline for type %s", get_log_name(item_type))