Source code for aioscraper.core.scraper

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager, suppress
from logging import getLogger
from types import TracebackType
from typing import Any, AsyncGenerator, Callable, Self

from aioscraper._helpers.log import get_log_name
from aioscraper.config import Config, load_config
from aioscraper.holders import MiddlewareHolder, PipelineHolder
from aioscraper.middlewares import RetryMiddleware
from aioscraper.types import Scraper, SendRequest

from .executor import ScraperExecutor
from .pipeline import PipelineDispatcher
from .session import SessionMakerFactory, get_sessionmaker

# Module-level logger, named after this module's import path.
logger = getLogger(__name__)

# Signature of a user-supplied lifespan hook: a callable that receives the
# ``AIOScraper`` instance and returns an async generator. ``AIOScraper`` wraps
# it with ``contextlib.asynccontextmanager`` before entering it.
Lifespan = Callable[["AIOScraper"], AsyncGenerator[None, None]]


[docs] class AIOScraper: """Core entrypoint that wires scrapers, middlewares, and pipelines. Args: *scrapers (Scraper): Callable scrapers queued on startup. config (Config | None): Pre-built configuration; when ``None`` the scraper loads one lazily via :func:`load_config` on ``start``. lifespan (Lifespan | None): Optional async context manager factory that wraps the scraper's lifecycle (setup/teardown). sessionmaker_factory (SessionMakerFactory | None): Override the function that builds HTTP sessions (defaults to :func:`aioscraper.core.session.factory.get_sessionmaker`). """ def __init__( self, *scrapers: Scraper, config: Config | None = None, lifespan: Lifespan | None = None, sessionmaker_factory: SessionMakerFactory | None = None, ): self.scrapers = [*scrapers] self.config = config or load_config() self.dependencies: dict[str, Any] = {} self._sessionmaker_factory = sessionmaker_factory or get_sessionmaker @asynccontextmanager async def default_lifespan(_: Self): yield self._lifespan = asynccontextmanager(lifespan) if lifespan is not None else default_lifespan self._lifespan_exit_stack = AsyncExitStack() self._middleware_holder = MiddlewareHolder() self._pipeline_holder = PipelineHolder() self._task: asyncio.Task[None] | None = None
[docs] def __call__(self, scraper: Scraper) -> Scraper: "Add a scraper callable and return it for decorator use." logger.debug("Adding scraper %s", get_log_name(scraper)) self.scrapers.append(scraper) return scraper
[docs] def add_dependencies(self, **kwargs: Any): "Add shared dependencies to inject into scraper callbacks." self.dependencies.update(kwargs)
[docs] def lifespan(self, lifespan: Lifespan) -> Lifespan: "Attach a lifespan callback to run before/after scraping." self._lifespan = asynccontextmanager(lifespan) return lifespan
@property def middleware(self) -> MiddlewareHolder: "Access the middleware registry for request/response hooks." return self._middleware_holder @property def pipeline(self) -> PipelineHolder: "Access the pipeline registry and middleware helpers." return self._pipeline_holder async def __aenter__(self) -> Self: await self._lifespan_exit_stack.enter_async_context(self._lifespan(self)) self.start() return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ): try: await self.close() finally: await self._lifespan_exit_stack.__aexit__(exc_type, exc_val, exc_tb)
[docs] def start(self): "Start the scraper and run it in the background." if self._task is None: self._task = asyncio.create_task(self._run())
async def _run(self): """Initialize and run the scraper with the configured settings.""" self._install_builtin_middlewares(self.config) executor = ScraperExecutor( config=self.config, scrapers=self.scrapers, dependencies=self.dependencies, middleware_holder=self._middleware_holder, pipeline_dispatcher=PipelineDispatcher( self.config.pipeline, pipelines=self._pipeline_holder.pipelines, global_middleware_factories=self._pipeline_holder.global_middleware_factories, dependencies=self.dependencies, ), sessionmaker=self._sessionmaker_factory(self.config.session), ) try: logger.debug("Starting executor") await executor.run() logger.debug("Scraper execution completed successfully") finally: logger.debug("Closing executor resources") await executor.close()
[docs] async def shutdown(self): "Trigger a graceful shutdown of the scraper." if self._task is None: logger.debug("Shutdown called but scraper is not running") return logger.debug("Initiating graceful shutdown (timeout=%0.10gs)", self.config.execution.shutdown_timeout) try: await self.wait(timeout=self.config.execution.shutdown_timeout) finally: await self.close()
[docs] async def wait(self, timeout: float | None = None): # noqa: ASYNC109 "Wait for the scraper to finish." if self._task is None: logger.debug("Wait called but scraper is not running") return log_level = self.config.execution.log_level timeout = timeout or self.config.execution.timeout logger.debug("Waiting for scraper to finish (timeout=%ss)", timeout) try: await asyncio.wait_for(self._task, timeout=timeout) except asyncio.TimeoutError: logger.log(log_level, "wait timeout exceeded (%ss) - forcing shutdown", timeout)
[docs] async def close(self): "Close the scraper and its associated resources." if self._task is None: logger.debug("Close called but scraper is not running") return self._task.cancel() with suppress(asyncio.CancelledError): await self._task
def _install_builtin_middlewares(self, config: Config): retry_config = config.session.retry if not retry_config.enabled: return def retry_factory(send_request: SendRequest) -> RetryMiddleware: return RetryMiddleware(retry_config, send_request) self._middleware_holder.add(retry_factory)