Source code for aioscraper.core.session.base
import abc
from contextlib import AsyncExitStack
from types import TracebackType
from aioscraper.types import Request, Response
[docs]
class BaseRequestContextManager(abc.ABC):
"""Asynchronous context manager that encapsulates request execution lifecycle."""
def __init__(self, request: Request):
self._request = request
self._exit_stack = AsyncExitStack()
[docs]
@abc.abstractmethod
async def __aenter__(self) -> Response:
"""Send the HTTP request and return a populated :class:`Response`."""
[docs]
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
):
"""Tear down resources registered in the exit stack when the request finishes."""
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
[docs]
class BaseSession(abc.ABC):
"Base abstract class for HTTP session."
[docs]
@abc.abstractmethod
def make_request(self, request: Request) -> BaseRequestContextManager:
"""Build a context manager responsible for executing ``request``."""
...
[docs]
@abc.abstractmethod
async def close(self):
"""
Close the session and release all resources.
This method should be called after finishing work with the session
to properly release resources.
"""
...