Source code for aioscraper.core.runner
import asyncio
import logging
import signal
from contextlib import suppress
from functools import partial
from .scraper import AIOScraper
logger = logging.getLogger(__name__)
def _setup_signal_handlers(loop: asyncio.AbstractEventLoop, shutdown: asyncio.Event, force_exit: asyncio.Event):
"Register SIGINT/SIGTERM handlers; repeated signal triggers force_exit."
def _trigger(sig_name: str):
if shutdown.is_set():
if not force_exit.is_set():
logger.warning("Received second %s, ignore shutdown timeout", sig_name)
force_exit.set()
return
logger.info("Received %s, starting shutdown", sig_name)
shutdown.set()
for sig in (signal.SIGINT, signal.SIGTERM):
sig_name = signal.Signals(sig).name
try:
loop.add_signal_handler(sig, partial(_trigger, sig_name))
except NotImplementedError:
# Windows / limited envs: fallback to signal.signal
try:
signal.signal(sig, lambda *_, s=sig_name: loop.call_soon_threadsafe(_trigger, s))
except (ValueError, RuntimeError):
logger.debug("Signal handler for %s was not installed", sig_name)
async def _run_scraper_without_force_exit(scraper: AIOScraper, shutdown_event: asyncio.Event):
"Run scraper with shutdown handling, ignoring force-exit logic."
shutdown_task = asyncio.create_task(shutdown_event.wait())
async with scraper:
scraper_task = asyncio.create_task(scraper.wait())
wait_set = {scraper_task, shutdown_task}
done, _ = await asyncio.wait(wait_set, return_when=asyncio.FIRST_COMPLETED)
if scraper_task in done:
shutdown_task.cancel()
with suppress(asyncio.CancelledError):
await shutdown_task
return await scraper_task
logger.warning("Shutdown requested, cancelling tasks")
scraper_task.cancel()
try:
await asyncio.wait_for(scraper_task, timeout=scraper.config.execution.shutdown_timeout)
except asyncio.TimeoutError:
logger.exception("Shutdown timeout expired")
except asyncio.CancelledError:
pass
finally:
with suppress(asyncio.CancelledError):
await shutdown_task
async def _run_scraper(
scraper: AIOScraper,
*,
shutdown_event: asyncio.Event | None = None,
force_exit_event: asyncio.Event | None = None,
install_signal_handlers: bool = True,
):
"Main runner: wires signal handlers, listens for force-exit, delegates shutdown-aware execution."
loop = asyncio.get_running_loop()
shutdown = shutdown_event or asyncio.Event()
force_exit = force_exit_event or asyncio.Event()
if install_signal_handlers:
_setup_signal_handlers(loop, shutdown, force_exit)
force_exit_task = asyncio.create_task(force_exit.wait())
scraper_task = asyncio.create_task(_run_scraper_without_force_exit(scraper, shutdown))
done, _ = await asyncio.wait([force_exit_task, scraper_task], return_when=asyncio.FIRST_COMPLETED)
if force_exit_task in done:
scraper_task.cancel()
with suppress(asyncio.CancelledError):
await scraper_task
return
await scraper_task
[docs]
async def run_scraper(scraper: AIOScraper):
"Public entrypoint to run scraper with signal handling."
await _run_scraper(scraper)