Pipelines
Pipelines are ordered processors for your scraped items. Items are routed by their Python type: register pipelines against the item class and the dispatcher will use type(item) to find them. Add pipelines with scraper.pipeline.add or decorate pipeline classes with @scraper.pipeline(ItemType, *args, **kwargs); wrap their flow with middleware decorators.
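Because routing uses type(item), the lookup is by exact class. The sketch below assumes the registry behaves like a plain dict keyed by class. That is an assumption; aioscraper's internal structure may differ, and `registry` and `find_pipelines` are hypothetical names. Under that assumption, an instance of a subclass does not reach the pipelines registered for its parent class.

```python
from dataclasses import dataclass


@dataclass
class Article:
    title: str


@dataclass
class FeaturedArticle(Article):
    badge: str = "featured"


# Hypothetical registry: item class -> names of the pipelines registered for it
registry: dict[type, list[str]] = {Article: ["SaveArticlePipeline"]}


def find_pipelines(item: object) -> list[str]:
    # Exact-type lookup, mirroring the type(item) routing described above
    return registry.get(type(item), [])


print(find_pipelines(Article("a")))          # ['SaveArticlePipeline']
print(find_pipelines(FeaturedArticle("b")))  # [] since the subclass itself is not registered
```

If the lookup does work this way, register pipelines for every concrete item class your callbacks emit.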
Core
- Implement the BasePipeline protocol: provide put_item (persist/transform/fan out and return the item) and close for cleanup.
- Pipelines are keyed by item type; every pipeline registered for that type runs sequentially.
- Missing pipeline handling is controlled by PipelineConfig.strict (defaults to raising; set PIPELINE_STRICT=false to warn and continue).
- scraper.pipeline.add(...) adds one or more pipeline instances for a given item type.
- @scraper.pipeline(ItemType, *args, **kwargs) is a convenient decorator that instantiates the pipeline class and registers it for you (handy when the pipeline needs constructor arguments).
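The contract above can be sketched in plain Python with no aioscraper import. The sketch covers four pieces: BasePipeline as a typing.Protocol, a registry keyed by type(item), a decorator that instantiates the class with constructor arguments, and a strict flag that either raises or warns. Several details are assumptions for illustration, not aioscraper's real implementation: PipelineRegistry, LookupError as the strict-mode failure, and passing the item through untouched in lenient mode.

```python
import asyncio
import warnings
from dataclasses import dataclass
from typing import Any, Callable, Protocol


class BasePipeline(Protocol):
    # Shape described above: put_item returns the item, close releases resources
    async def put_item(self, item: Any) -> Any: ...

    async def close(self) -> None: ...


class PipelineRegistry:
    """Hypothetical stand-in for scraper.pipeline; not aioscraper's actual class."""

    def __init__(self, strict: bool = True) -> None:
        self.strict = strict  # plays the role of PipelineConfig.strict
        self._pipelines: dict[type, list[BasePipeline]] = {}

    def add(self, item_type: type, *pipelines: BasePipeline) -> None:
        self._pipelines.setdefault(item_type, []).extend(pipelines)

    def __call__(self, item_type: type, *args: Any, **kwargs: Any) -> Callable[[type], type]:
        # Decorator form: build the pipeline with the given arguments, then register it
        def decorator(cls: type) -> type:
            self.add(item_type, cls(*args, **kwargs))
            return cls

        return decorator

    async def dispatch(self, item: Any) -> Any:
        pipelines = self._pipelines.get(type(item))
        if not pipelines:
            message = f"no pipeline registered for {type(item).__name__}"
            if self.strict:
                raise LookupError(message)
            warnings.warn(message)
            return item  # lenient mode (assumed): pass the item through untouched
        for pipeline in pipelines:  # sequentially, in registration order
            item = await pipeline.put_item(item)
        return item


@dataclass
class Article:
    title: str


registry = PipelineRegistry(strict=True)


@registry(Article, suffix="!")
class ShoutPipeline:
    def __init__(self, suffix: str) -> None:
        self.suffix = suffix

    async def put_item(self, item: Article) -> Article:
        return Article(title=item.title + self.suffix)

    async def close(self) -> None:
        pass


print(asyncio.run(registry.dispatch(Article("hi"))))  # Article(title='hi!')
```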
from dataclasses import dataclass

from aioscraper import AIOScraper, Request, SendRequest, Response, Pipeline, ItemHandler

scraper = AIOScraper()


# Mock database client (replace with a real DB client in production)
class DatabaseClient:
    async def connect(self):
        """Establish database connection"""
        print("Connected to database")

    async def save_article(self, title: str, url: str):
        """Save article to database"""
        print(f"Saved: {title} -> {url}")

    def log(self, message: str):
        """Record a log line (used by the global middleware example below)"""
        print(f"[db] {message}")

    async def close(self):
        """Close database connection"""
        print("Closed database connection")


@dataclass(slots=True)
class Article:
    title: str
    url: str


class SaveArticlePipeline:
    """Save articles to database using DB client"""

    def __init__(self, db: DatabaseClient):
        self.db = db

    async def put_item(self, item: Article) -> Article:
        await self.db.save_article(item.title, item.url)
        return item

    async def close(self):
        """Cleanup when scraper shuts down"""
        pass


# Lifespan: setup and teardown for resources
@scraper.lifespan
async def lifespan(scraper: AIOScraper):
    """
    Manage resources lifecycle.
    Setup phase: create DB client, connect, register as dependency.
    Teardown phase: close connections.
    """
    db = DatabaseClient()
    await db.connect()

    # Register db as dependency - it will be injected into callbacks/errbacks/middlewares
    scraper.add_dependencies(db=db)

    # Register pipeline for Article type - will handle all Article items
    scraper.pipeline.add(Article, SaveArticlePipeline(db))

    yield  # Scraper runs here

    # Cleanup
    await db.close()


# Entry point: schedule requests to fetch articles
@scraper
async def get_article(send_request: SendRequest):
    """Scraper entry point - sends request to article API"""
    await send_request(Request(url="https://api.article.com", callback=callback))


async def callback(response: Response, pipeline: Pipeline):
    """
    Process article response and send to pipeline.
    The pipeline dependency is automatically injected by aioscraper.
    """
    data = await response.json()
    # Send item to pipeline - it will be saved to DB via SaveArticlePipeline
    await pipeline(Article(title=data["title"], url=response.url))
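The lifespan above follows the setup, yield, teardown shape familiar from FastAPI. The sketch below assumes @scraper.lifespan drives the generator the way contextlib.asynccontextmanager does. That is an assumption about aioscraper's internals, and FakeDB and run_scraper are hypothetical names. Under asynccontextmanager semantics, code after a bare yield is skipped when the body raises. Wrapping the yield in try/finally makes the cleanup run in both cases.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


class FakeDB:
    async def connect(self) -> None:
        events.append("connect")

    async def close(self) -> None:
        events.append("close")


@asynccontextmanager
async def lifespan():
    db = FakeDB()
    await db.connect()
    try:
        yield  # the scraper body runs while the generator is suspended here
    finally:
        await db.close()  # runs on normal exit and when the body raises


async def run_scraper(fail: bool) -> None:
    async with lifespan():
        events.append("scrape")
        if fail:
            raise RuntimeError("request failed")


asyncio.run(run_scraper(fail=False))
try:
    asyncio.run(run_scraper(fail=True))
except RuntimeError:
    pass
print(events)  # ['connect', 'scrape', 'close', 'connect', 'scrape', 'close']
```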
Middlewares around pipelines
Pipeline middlewares let you step in before the first pipeline sees an item and after the last one finishes.
Use @scraper.pipeline.middleware("pre", ItemType) to normalize or enrich items on the way in, and @scraper.pipeline.middleware("post", ItemType) to finalize, log, or fan out results on the way out.
Global middlewares registered via @scraper.pipeline.global_middleware wrap the entire chain for every item type. The decorated function is a factory: it receives injected dependencies and returns an async middleware(handler, item). Like a FastAPI-style wrapper, that middleware must await handler(item) to keep the item moving, then return the result.
If you need to bail out of a pre/post stage, raise StopMiddlewareProcessing to skip the remaining middlewares in that stage but continue the rest of the flow, or raise StopItemProcessing to stop processing the current item altogether.
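The stage semantics can be modelled with plain functions, as in the sketch below. Several pieces are assumptions. The two exception classes are local stand-ins for aioscraper's own, whose import path is not shown here. Plain lists replace the decorators. A stopped item is assumed to produce None. The helper names (run_stage, process, and the example middlewares) are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Stage = list[Callable[[Any], Awaitable[Any]]]


class StopMiddlewareProcessing(Exception):
    """Local stand-in: skip the remaining middlewares in the current stage."""


class StopItemProcessing(Exception):
    """Local stand-in: stop processing the current item altogether."""


async def run_stage(middlewares: Stage, item: Any) -> Any:
    for middleware in middlewares:
        try:
            item = await middleware(item)
        except StopMiddlewareProcessing:
            break  # later middlewares in this stage are skipped; the flow continues
    return item


async def process(item: Any, pre: Stage, pipelines: Stage, post: Stage) -> Optional[Any]:
    try:
        item = await run_stage(pre, item)
        for put_item in pipelines:
            item = await put_item(item)
        return await run_stage(post, item)
    except StopItemProcessing:
        return None  # assumption: a stopped item yields no result


async def drop_drafts(item: dict) -> dict:
    if item.get("draft"):
        raise StopItemProcessing
    return item


async def strip_title(item: dict) -> dict:
    return {**item, "title": item["title"].strip()}


async def stop_if_short(item: dict) -> dict:
    if len(item["title"]) < 3:
        raise StopMiddlewareProcessing
    return item


async def upper_title(item: dict) -> dict:
    return {**item, "title": item["title"].upper()}


async def save(item: dict) -> dict:
    return {**item, "saved": True}


async def add_slug(item: dict) -> dict:
    return {**item, "slug": item["title"].lower()}


PRE = [drop_drafts, strip_title, stop_if_short, upper_title]


async def main() -> list:
    items = [{"title": " hello "}, {"title": " ab "}, {"title": "x", "draft": True}]
    return [await process(item, PRE, [save], [add_slug]) for item in items]


results = asyncio.run(main())
print(results)
```

The second item shows StopMiddlewareProcessing at work: upper_title is skipped, but save and add_slug still run. The third item shows StopItemProcessing: nothing after drop_drafts runs.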
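@scraper.pipeline.middleware("pre", Article)
async def pre_process(item: Article) -> Article:
    # Normalize or enrich the item before the first pipeline sees it
    return item


@scraper.pipeline.middleware("post", Article)
async def post_process(item: Article) -> Article:
    # Finalize, log, or fan out the item after the last pipeline
    return item


@scraper.pipeline.global_middleware
def wrap_pipeline(db: DatabaseClient):
    # db is injected from the dependencies registered in lifespan
    async def middleware(handler: ItemHandler, item: Article) -> Article:
        db.log("start")
        item = await handler(item)  # run the rest of the chain
        db.log("end")
        return item

    return middleware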
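The factory shape of wrap_pipeline can be exercised without aioscraper, as in the sketch below. It assumes dependencies are matched to factory parameters by name, as db=db in lifespan suggests. build_global_middleware and FakeDB are hypothetical helpers, and core_chain stands in for the pre middlewares, pipelines, and post middlewares.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable

ItemHandler = Callable[[Any], Awaitable[Any]]


class FakeDB:
    def __init__(self) -> None:
        self.lines: list[str] = []

    def log(self, message: str) -> None:
        self.lines.append(message)


def build_global_middleware(factory: Callable[..., Any], dependencies: dict[str, Any]) -> Any:
    # Hypothetical injection: each factory parameter receives the dependency with the same name
    names = inspect.signature(factory).parameters
    return factory(**{name: dependencies[name] for name in names})


def wrap_pipeline(db: FakeDB):
    async def middleware(handler: ItemHandler, item: Any) -> Any:
        db.log("start")
        item = await handler(item)  # hand the item to the inner chain
        db.log("end")
        return item

    return middleware


async def core_chain(item: str) -> str:
    # Stand-in for pre middlewares -> pipelines -> post middlewares
    return item.upper()


db = FakeDB()
middleware = build_global_middleware(wrap_pipeline, {"db": db, "http": object()})
result = asyncio.run(middleware(core_chain, "article"))
print(result, db.lines)  # ARTICLE ['start', 'end']
```

In this sketch, a wrapper that skipped await handler(item) would never run core_chain, and the item would come back unprocessed.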
Flow
Picture the flow as nested wrappers (matryoshka style): global middlewares form the outer shells around the per-type chain. If you’ve used FastAPI middleware, it’s the same shape: a wrapper receives handler and must await handler(item) to keep the item moving.
global mw 1
  global mw 2
    global mw 3
      pre middlewares -> pipelines -> post middlewares
    global mw 3
  global mw 2
global mw 1
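The nesting in the diagram can be reproduced with functools.partial. This sketch assumes the first registered global middleware becomes the outermost shell (global mw 1 in the diagram). make_global, core, and compose are hypothetical names, not aioscraper functions.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Handler, Any], Awaitable[Any]]

trace: list[str] = []


def make_global(name: str) -> Middleware:
    async def middleware(handler: Handler, item: Any) -> Any:
        trace.append(f"enter {name}")
        item = await handler(item)
        trace.append(f"exit {name}")
        return item

    return middleware


async def core(item: Any) -> Any:
    trace.append("pre middlewares -> pipelines -> post middlewares")
    return item


def compose(middlewares: list[Middleware], inner: Handler) -> Handler:
    handler = inner
    # Wrap from the inside out so the first middleware in the list ends up outermost
    for middleware in reversed(middlewares):
        handler = partial(middleware, handler)
    return handler


chain = compose([make_global("global mw 1"), make_global("global mw 2"), make_global("global mw 3")], core)
asyncio.run(chain("item"))
print("\n".join(trace))
```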
When you call await pipeline(item):
1. The dispatcher picks the container by type(item); if none is registered, it raises or warns depending on PipelineConfig.strict.
2. Global middlewares run outer-to-inner. Each wrapper does its work and awaits handler(item) to keep going; the final result bubbles back out through them in reverse order.
3. Inside the core chain: run all pre-middlewares in registration order (each can mutate/replace the item).
4. Run each pipeline instance in order; each must return the (possibly mutated) item for the next step.
5. Run all post-middlewares in registration order.
6. The returned item is whatever the last post-middleware (or pipeline, if there are no post-middlewares) produced.