Source code for aioscraper.core.executor

import asyncio
from logging import getLogger
from typing import Any

from aioscraper._helpers.asyncio import execute_coroutines
from aioscraper._helpers.func import get_func_kwargs
from aioscraper.config import Config
from aioscraper.holders import MiddlewareHolder
from aioscraper.types import Scraper

from .pipeline import PipelineDispatcher
from .request_manager import RequestManager
from .session import SessionMaker

logger = getLogger(__name__)


[docs] class ScraperExecutor: """ Executes scrapers and manages the scraping process. This class is responsible for running scraper functions, managing the request scheduler, and handling the graceful shutdown of the scraping process. """ def __init__( self, config: Config, scrapers: list[Scraper], dependencies: dict[str, Any], middleware_holder: MiddlewareHolder, pipeline_dispatcher: PipelineDispatcher, sessionmaker: SessionMaker, ): self._config = config self._scrapers = scrapers self._dependencies = {"config": config, "pipeline": pipeline_dispatcher.put_item, **dependencies} self._pipeline_dispatcher = pipeline_dispatcher self._request_manager = RequestManager( scheduler_config=self._config.scheduler, rate_limit_config=self._config.session.rate_limit, retry_config=self._config.session.retry, shutdown_check_interval=self._config.execution.shutdown_check_interval, sessionmaker=sessionmaker, dependencies=self._dependencies, middleware_holder=middleware_holder, )
[docs] async def run(self): "Start the scraping process." self._request_manager.start_listening() try: logger.debug("Running %d scraper(s) concurrently", len(self._scrapers)) await asyncio.gather( *[ scraper( **get_func_kwargs( scraper, send_request=self._request_manager.sender, **self._dependencies, ), ) for scraper in self._scrapers ], ) logger.debug("Waiting for pending requests") await self._request_manager.wait() logger.info("Executor finished: all scrapers and requests completed") finally: await self._request_manager.shutdown()
[docs] async def close(self): "Close all resources and cleanup." await execute_coroutines(self._request_manager.close(), self._pipeline_dispatcher.close()) logger.debug("Executor closed successfully")