import asyncio
import heapq
from contextlib import AsyncExitStack
from logging import getLogger
from time import monotonic
from typing import Any
from aiojobs import Scheduler
from aioscraper._helpers.asyncio import execute_coroutine, execute_coroutines
from aioscraper._helpers.func import get_func_kwargs
from aioscraper._helpers.http import parse_retry_after, parse_url
from aioscraper.config import RateLimitConfig, RequestRetryConfig, SchedulerConfig
from aioscraper.exceptions import AIOScraperException, HTTPException, InvalidRequestData
from aioscraper.holders import MiddlewareHolder
from aioscraper.types import RequestHandler, RequestMiddleware, Response
from aioscraper.types.session import PRequest, Request, SendRequest
from .rate_limiter import RateLimitManager, RequestOutcome
from .session import SessionMaker
# Module-level logger, named after this module.
logger = getLogger(__name__)
# asyncio priority queue holding PRequest entries that are ready to be dispatched.
_RequestQueue = asyncio.PriorityQueue[PRequest]
# Plain list of PRequest entries; it is manipulated with ``heapq`` as the
# min-heap of delayed requests.
_RequestHead = list[PRequest]
def _get_request_sender(queue: _RequestQueue, heap: _RequestHead) -> SendRequest:
    """Build the coroutine used to submit requests for execution.

    The returned ``sender`` validates the request payload and then either
    pushes the request onto ``heap`` (when ``request.delay`` is truthy, keyed
    by the monotonic time at which it becomes due) or awaits its insertion
    into ``queue`` (keyed by ``request.priority``).

    Args:
        queue: Priority queue of requests ready to be dispatched.
        heap: List used as a ``heapq`` min-heap of delayed requests.

    Returns:
        The ``sender`` coroutine function; it returns the request it was given.
    """

    async def sender(request: Request) -> Request:
        # Captured first so the due time is measured from submission.
        submitted_at = monotonic()

        # ``json_data`` is mutually exclusive with both ``data`` and ``files``.
        has_json = request.json_data is not None
        if has_json and request.data is not None:
            raise InvalidRequestData("Cannot send both data and json_data")
        if has_json and request.files is not None:
            raise InvalidRequestData("Cannot send both files and json_data")

        if not request.delay:
            # No delay requested: hand the request straight to the ready queue.
            await queue.put(PRequest(priority=request.priority, request=request))
            return request

        # Delayed request: the heap is ordered by the moment it becomes due.
        due_at = submitted_at + request.delay
        heapq.heappush(heap, PRequest(priority=due_at, request=request))
        return request

    return sender
[docs]
class RequestManager:
    """
    Manages HTTP requests with priority queuing, rate limiting, and middleware support.

    Requests submitted through :attr:`sender` land either in the ready queue or,
    when they carry a delay, in a heap ordered by due time. A background listener
    task (see :meth:`start_listening`) moves due requests into the ready queue and
    feeds ready requests to the rate limiter, which schedules their execution.

    Args:
        scheduler_config (SchedulerConfig): Configuration for the request scheduler.
        rate_limit_config (RateLimitConfig): Configuration for the request rate limiter.
        retry_config (RequestRetryConfig): Configuration for request retries.
        shutdown_check_interval (float): Interval between shutdown checks in seconds
        sessionmaker (SessionMaker): A factory for creating session objects.
        dependencies (dict[str, Any]): Additional dependencies to be injected into middleware and callbacks.
        middleware_holder (MiddlewareHolder): A container for middleware collections.
    """

    # Longest time (seconds) the listener blocks on the ready queue before it
    # re-checks the delayed heap and the loop's exit condition.
    _IDLE_POLL_INTERVAL = 0.05

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        rate_limit_config: RateLimitConfig,
        retry_config: RequestRetryConfig,
        shutdown_check_interval: float,
        sessionmaker: SessionMaker,
        dependencies: dict[str, Any],
        middleware_holder: MiddlewareHolder,
    ):
        logger.info(
            "Creating scheduler: concurrent_requests=%s, pending_requests=%s, close_timeout=%s",
            scheduler_config.concurrent_requests,
            scheduler_config.pending_requests,
            scheduler_config.close_timeout,
        )
        self._scheduler = Scheduler(
            limit=scheduler_config.concurrent_requests,
            pending_limit=scheduler_config.pending_requests,
            close_timeout=scheduler_config.close_timeout,
        )
        # NOTE(review): stored but not read anywhere in this class; the listener
        # polls with _IDLE_POLL_INTERVAL instead. Confirm whether that is intended.
        self._shutdown_check_interval = shutdown_check_interval
        self._session = sessionmaker()
        self._ready_queue: _RequestQueue = asyncio.PriorityQueue(maxsize=scheduler_config.ready_queue_max_size)
        self._delayed_heap: _RequestHead = []
        self._request_sender = _get_request_sender(self._ready_queue, self._delayed_heap)
        # ``send_request`` is listed first, so a same-named entry in
        # ``dependencies`` overrides it.
        self._dependencies: dict[str, Any] = {"send_request": self._request_sender, **dependencies}
        self._middleware_holder = middleware_holder
        self._rate_limiter_manager = RateLimitManager(
            rate_limit_config,
            retry_config=retry_config,
            schedule=lambda pr: self._scheduler.spawn(execute_coroutine(self._send_request(pr.request))),
        )
        self._middlewares: list[RequestMiddleware] = self._instantiate_middlewares()
        # Set by wait()/shutdown(); until then the listener never exits on its own.
        self._initialized = False
        self._completed = asyncio.Event()
        self._task: asyncio.Task[None] | None = None

    @property
    def sender(self) -> SendRequest:
        """The coroutine function used to submit requests to this manager."""
        return self._request_sender

    def _instantiate_middlewares(self) -> list[RequestMiddleware]:
        """Instantiate registered middleware factories once, injecting dependencies.

        Raises:
            AIOScraperException: If any factory raises while being instantiated;
                the original error is chained as the cause.
        """
        middlewares: list[RequestMiddleware] = []
        for factory in self._middleware_holder:
            try:
                middlewares.append(factory(**get_func_kwargs(factory, **self._dependencies)))
            except Exception as exc:
                raise AIOScraperException(
                    f"Failed to instantiate request middleware factory {factory!r}",
                ) from exc
        return middlewares

    def _build_handler(self, stack: AsyncExitStack) -> RequestHandler:
        """Compose the middleware chain around the innermost dispatch.

        Args:
            stack: Exit stack that takes ownership of the response context, so
                the response stays open until the caller leaves the stack.

        Returns:
            A handler that runs every middleware (first registered outermost)
            and finally performs the HTTP request.
        """

        async def dispatch(request: Request) -> Response | None:
            response = await stack.enter_async_context(self._session.make_request(request))
            if not response.ok:
                raise HTTPException(
                    url=str(parse_url(request.url, request.params)),
                    method=response.method,
                    headers=response.headers,
                    status_code=response.status,
                    message=await response.text(errors="replace"),
                )
            return response

        handler: RequestHandler = dispatch
        # Wrap from the inside out so the first middleware ends up outermost.
        for middleware in reversed(self._middlewares):
            next_handler = handler

            # Defaults bind the current middleware/handler at definition time,
            # avoiding the late-binding closure pitfall.
            async def wrapped(
                request: Request,
                _mw: RequestMiddleware = middleware,
                _next: RequestHandler = next_handler,
            ) -> Response | None:
                return await _mw(_next, request)

            handler = wrapped
        return handler

    async def _send_request(self, request: Request):
        """Execute one request through the middleware chain and report its outcome.

        On success the request's callback is invoked; on any ``Exception`` the
        errback path in :meth:`_handle_exception` runs. When the rate limiter uses
        an adaptive strategy, a ``RequestOutcome`` is reported in every case.
        """
        start_time = monotonic()
        latency = status_code = exception_type = retry_after = None
        url = parse_url(request.url, request.params)
        try:
            async with AsyncExitStack() as stack:
                handler = self._build_handler(stack)
                logger.debug("Sending request: %s %s", request.method, url)
                response = await handler(request)
                if response is None:
                    # The chain returned no response: nothing to pass to the callback.
                    logger.debug(
                        "Request handled by middleware chain: %s %s",
                        request.method,
                        url,
                    )
                    return
                latency = monotonic() - start_time
                status_code = response.status
                logger.debug(
                    "Response received: %s %s - status=%d, latency=%.3fs",
                    request.method,
                    url,
                    response.status,
                    latency,
                )
                # Runs inside the stack so the response is still open for the callback.
                await self._callback(request, response)
        except Exception as exc:
            latency = monotonic() - start_time
            exception_type = type(exc)
            if isinstance(exc, HTTPException):
                status_code = exc.status_code
                if self._rate_limiter_manager.adaptive_strategy:
                    retry_after = parse_retry_after(exc)
            logger.debug("Request exception: %s %s - %s: %s", request.method, url, type(exc).__name__, exc)
            await self._handle_exception(request, exc)
        finally:
            if self._rate_limiter_manager.adaptive_strategy:
                if latency is None:
                    # Reached when the chain returned None or a non-Exception
                    # (e.g. cancellation) interrupted the request.
                    latency = monotonic() - start_time
                self._rate_limiter_manager.on_request_outcome(
                    RequestOutcome(
                        group_key=self._rate_limiter_manager.get_group_key(request),
                        latency=latency,
                        retry_after=retry_after,
                        status_code=status_code,
                        exception_type=exception_type,
                    ),
                )

    async def _invoke_hook(self, hook: Any, /, **kwargs: Any) -> None:
        """Await a user callback/errback with the given keyword arguments.

        Hooks exposing ``__compiled__`` receive every keyword argument as is;
        all other hooks receive the result of ``get_func_kwargs(hook, **kwargs)``.
        ``hook`` is positional-only so that ``kwargs`` may contain any key.
        """
        if hasattr(hook, "__compiled__"):
            await hook(**kwargs)
        else:
            await hook(**get_func_kwargs(hook, **kwargs))

    async def _callback(self, request: Request, response: Response):
        """Invoke ``request.callback`` (if any) with the response and dependencies."""
        if request.callback is None:
            return
        await self._invoke_hook(
            request.callback,
            request=request,
            response=response,
            **request.cb_kwargs,
            **self._dependencies,
        )

    async def _handle_exception(self, request: Request, exc: Exception):
        """Route a failed request to its errback, or log it when there is none.

        Raises:
            ExceptionGroup: If the errback itself raises; the group holds both
                the original exception and the errback's exception.
        """
        if request.errback is None:
            logger.error("%s: %s: %s", request.method, request.url, exc, exc_info=exc)
            return

        try:
            await self._invoke_hook(
                request.errback,
                request=request,
                exc=exc,
                **request.cb_kwargs,
                **self._dependencies,
            )
        except Exception as errback_exc:
            logger.exception(
                "Errback failed for %s %s: original=%s, errback=%s",
                request.method,
                request.url,
                type(exc).__name__,
                type(errback_exc).__name__,
            )
            raise ExceptionGroup("Errback failed", [exc, errback_exc]) from None

    async def wait(self):
        """Allow the listener to finish once idle and block until it has completed."""
        logger.debug("Request manager waiting for completion")
        self._initialized = True
        await self._completed.wait()
        logger.debug("Request manager wait completed")

    async def shutdown(self):
        """Allow the listener to finish once idle and await its task, if started."""
        logger.debug("Request manager shutting down")
        self._initialized = True
        if self._task is not None:
            await self._task
        logger.debug("Request manager shutdown completed")

    def start_listening(self):
        """Start the background task that drains the request queues."""
        logger.debug("Request manager starting queue listener")
        self._task = asyncio.create_task(self._listen_queue())

    async def _listen_queue(self):
        """Process requests from the queue using the rate limiter.

        The loop keeps running until ``wait()``/``shutdown()`` has been called
        and no work remains in the scheduler, rate limiter, ready queue or
        delayed heap.
        """
        while (
            not self._initialized
            or len(self._scheduler) > 0
            or self._rate_limiter_manager.active
            or not self._ready_queue.empty()
            or len(self._delayed_heap) > 0
            # Evaluated only when every check above is false.
            # NOTE(review): presumably returns truthy while the rate limiter
            # still has work to drain — confirm against RateLimitManager.
            or await self._rate_limiter_manager.shutdown()
        ):
            await self._pop_due_delayed()
            timeout = self._next_timeout()
            try:
                pr = await asyncio.wait_for(self._ready_queue.get(), timeout)
            except asyncio.TimeoutError:
                # Nothing became ready in time: re-check delayed requests and
                # the exit condition.
                continue
            try:
                # Shielded so cancelling this task does not cancel the rate
                # limiter call that is already in flight.
                await asyncio.shield(self._rate_limiter_manager(pr))
            except asyncio.CancelledError:
                logger.debug("Queue listener cancelled")
                break
        self._completed.set()
        logger.info("Queue listener completed: all requests processed")

    async def _pop_due_delayed(self):
        """Move every delayed request that is now due into the ready queue.

        Heap entries are keyed by their due time. Once due, each request is
        re-wrapped with its own ``request.priority`` so that it is ordered in
        the ready queue on the same scale as requests sent without a delay.
        """
        now = monotonic()
        while self._delayed_heap and self._delayed_heap[0].priority <= now:
            due = heapq.heappop(self._delayed_heap)
            request = due.request
            # Clear the delay so the request is not treated as delayed again
            # if it is re-sent.
            request.delay = None
            # NOTE(review): this runs on the listener task, which is the only
            # consumer of the ready queue visible in this class; with a bounded,
            # full queue this put() would wait — confirm this cannot stall.
            await self._ready_queue.put(PRequest(priority=request.priority, request=request))

    def _next_timeout(self) -> float:
        """Return how long the listener may block waiting for a ready request.

        The wait is never longer than ``_IDLE_POLL_INTERVAL``: delayed requests
        are pushed onto the heap without waking the listener, so a request that
        becomes due earlier than the current heap head must still be noticed
        promptly. Returns ``0.0`` when the heap head is already due.
        """
        if not self._delayed_heap:
            return self._IDLE_POLL_INTERVAL
        remaining = self._delayed_heap[0].priority - monotonic()
        if remaining <= 0:
            return 0.0
        return min(remaining, self._IDLE_POLL_INTERVAL)

    async def close(self):
        """Close the rate limiter, the scheduler and the underlying session."""
        await execute_coroutines(self._rate_limiter_manager.close(), self._scheduler.close(), self._session.close())
        logger.debug("Request manager closed successfully")