Source code for aioscraper.middlewares.retry
import logging
from aioscraper._helpers.http import parse_retry_after
from aioscraper.config import RequestRetryConfig
from aioscraper.exceptions import HTTPException
from aioscraper.types import Request, RequestHandler, Response, SendRequest
RETRY_STATE_KEY = "_aioscraper_retry_attempts"
logger = logging.getLogger(__name__)
[docs]
class RetryMiddleware:
"""Request middleware that retries failed requests based on configuration."""
def __init__(self, config: RequestRetryConfig, send_request: SendRequest):
self._enabled = config.enabled
self._attempts = max(0, config.attempts)
self._retry_delay_factory = config.delay_factory
self._statuses = set(config.statuses)
self._exception_types = tuple(config.exceptions)
self._send_request = send_request
if self._enabled:
logger.info(
"Retry middleware enabled: attempts=%d, backoff=%s, base_delay=%0.10g, max_delay=%0.10g, "
"statuses=%s, exceptions=%s",
self._attempts,
config.backoff,
config.base_delay,
config.max_delay,
",".join(map(str, sorted(self._statuses))),
",".join(exc.__module__ + "." + exc.__qualname__ for exc in self._exception_types),
)
async def __call__(self, call_next: RequestHandler, request: Request) -> Response | None:
try:
return await call_next(request)
except Exception as exc:
if not self._enabled or not self._should_retry(exc):
raise
attempts_used = request.state.get(RETRY_STATE_KEY, 0)
if attempts_used >= self._attempts:
raise
attempts = request.state[RETRY_STATE_KEY] = attempts_used + 1
if retry_after_delay := parse_retry_after(exc):
request.delay = min(600.0, round(retry_after_delay, 6))
else:
request.delay = round(self._retry_delay_factory(attempts), 6)
await self._send_request(request)
return None
def _should_retry(self, exc: Exception) -> bool:
if self._statuses and isinstance(exc, HTTPException) and exc.status_code in self._statuses:
return True
if self._exception_types and isinstance(exc, self._exception_types):
return True
return False