Quickstart
Build your first API data collector in minutes. You’ll fetch data from GitHub’s REST API, extract repository stats, aggregate them in a pipeline, and run everything from the CLI.
Before you start, install aioscraper with an HTTP backend; see the Installation Guide for details.
Create your first scraper
Save this as scraper.py:
import logging
from dataclasses import dataclass

from aioscraper import AIOScraper, Request, Response, SendRequest, Pipeline

logger = logging.getLogger("github_repos")
scraper = AIOScraper()


@dataclass(slots=True)
class RepoStats:
    """Data model for extracted repository stats."""

    name: str
    stars: int
    language: str


# This decorator registers the pipeline to handle RepoStats items.
@scraper.pipeline(RepoStats)
class StatsPipeline:
    """Pipeline for processing extracted repository data."""

    def __init__(self):
        self.total_stars = 0

    async def put_item(self, item: RepoStats) -> RepoStats:
        """
        Called for each extracted item.

        This is where you'd:
        - Save to database
        - Send to message queue
        - Perform validation/transformation
        - Aggregate statistics
        """
        self.total_stars += item.stars
        logger.info("✓ %s: ⭐ %s (%s)", item.name, item.stars, item.language)
        return item

    async def close(self):
        """
        Called when scraper shuts down.

        Use for:
        - Final aggregations
        - Closing database connections
        - Cleanup operations
        """
        logger.info("Total stars collected: %s", self.total_stars)


# This decorator marks the function as the scraper's entry point.
@scraper
async def get_repos(send_request: SendRequest):
    """
    Entry point: defines what to scrape.

    Receives send_request - a function to schedule HTTP requests.
    """
    repos = (
        "django/django",
        "fastapi/fastapi",
        "pallets/flask",
        "encode/httpx",
        "aio-libs/aiohttp",
    )
    for repo in repos:
        await send_request(
            Request(
                url=f"https://api.github.com/repos/{repo}",  # API endpoint
                callback=parse_repo,  # Success handler
                errback=on_failure,  # Error handler (network failures, timeouts)
                cb_kwargs={"repo": repo},  # Additional arguments to pass to callbacks
                headers={"Accept": "application/vnd.github+json"},  # Media type recommended by GitHub's REST API
            )
        )


async def parse_repo(response: Response, pipeline: Pipeline):
    """
    Success callback: parse response and extract data.

    The `pipeline` dependency is automatically injected by aioscraper.
    """
    data = await response.json()  # Parse JSON response from API
    await pipeline(  # Send extracted item to pipeline
        RepoStats(
            name=data["full_name"],
            stars=data["stargazers_count"],
            # GitHub returns null when no language is detected, so a .get() default alone is not enough
            language=data.get("language") or "Unknown",
        )
    )


async def on_failure(exc: Exception, repo: str):
    """
    Error callback: handle request/processing failures.

    Use for:
    - Logging errors
    - Sending alerts
    - Custom retry logic
    """
    logger.error("%s: request failed: %s", repo, exc)
Run it
Execute your scraper from the command line:
aioscraper scraper --concurrent-requests=4
The --concurrent-requests flag controls how many requests run simultaneously. Without it, the default concurrency limit of 64 applies.
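To make the effect of a concurrency limit concrete, here is a minimal stand-alone sketch using asyncio.Semaphore. It illustrates the general idea only; it is not aioscraper's scheduler, and run_limited and fake_request are hypothetical names.

```python
import asyncio


async def run_limited(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake requests with at most `limit` in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:  # waits here once `limit` requests are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for network latency
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


# Five requests with a limit of four, matching the command above
peak = asyncio.run(run_limited(n_tasks=5, limit=4))
print(f"peak concurrent requests: {peak}")
```

With five requests and a limit of four, the fifth request starts only after one of the first four finishes.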
What happens when it runs
1. CLI loads the module: The aioscraper command finds your scraper.py file and locates the AIOScraper instance.
2. Entry point executes: Your get_repos() function runs and schedules 5 requests to GitHub’s API.
3. Concurrent execution: The 5 requests run concurrently, up to the --concurrent-requests limit (4 in the command above). The async HTTP client makes non-blocking calls, so responses can arrive in any order.
4. Callbacks process responses:
   - If successful: parse_repo() extracts data and sends it to the pipeline
   - If failed: on_failure() logs the error
5. Pipeline processes items: StatsPipeline.put_item() runs for each RepoStats item, aggregating the total stars.
6. Cleanup on shutdown: After all requests complete, StatsPipeline.close() logs the total stars collected.
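The callback and errback routing in the steps above can be sketched without the library. This toy dispatcher is an assumption about the general pattern, not aioscraper's implementation; dispatch, ok, broken, parse, and record_failure are hypothetical names. It routes both request and processing errors to the errback, following the wording of the on_failure docstring.

```python
import asyncio
from typing import Awaitable, Callable

events: list[str] = []


async def dispatch(
    fetch: Callable[[], Awaitable[dict]],
    callback: Callable[[dict], Awaitable[None]],
    errback: Callable[[Exception], Awaitable[None]],
) -> None:
    """Await one request and route the outcome to the success or error handler."""
    try:
        data = await fetch()
        await callback(data)
    except Exception as exc:  # request or processing failure
        await errback(exc)


async def ok() -> dict:
    return {"full_name": "django/django"}


async def broken() -> dict:
    raise ConnectionError("timed out")


async def parse(data: dict) -> None:
    events.append(f"parsed {data['full_name']}")


async def record_failure(exc: Exception) -> None:
    events.append(f"failed: {exc}")


async def main() -> None:
    # Both requests run concurrently; completion order is not guaranteed in general
    await asyncio.gather(
        dispatch(ok, parse, record_failure),
        dispatch(broken, parse, record_failure),
    )


asyncio.run(main())
print(events)
```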
Customize for your use case
- Change the API
Replace the GitHub API with your target API:
await send_request(
    Request(
        url="https://api.example.com/products",
        callback=parse_product,
        headers={"Authorization": "Bearer YOUR_TOKEN"},
    )
)
- Add query parameters
Use the params argument:
Request(
    url="https://api.example.com/search",
    params={"q": "python", "limit": 100},
    callback=parse_results,
)
- Save to database
In put_item(), use your ORM or database client:
async def put_item(self, item: RepoStats) -> RepoStats:
    await self.db.execute(
        "INSERT INTO repos (name, stars, language) VALUES (?, ?, ?)",
        (item.name, item.stars, item.language)
    )
    return item
- Handle pagination
Send follow-up requests from callbacks:
async def parse_page(response: Response, send_request: SendRequest, page: int):
    data = await response.json()
    # Process items...
    if data.get("next_page"):
        await send_request(
            Request(
                url=data["next_page"],
                callback=parse_page,
                cb_kwargs={"page": page + 1},
            )
        )
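Three of the customizations above (query parameters, database writes, and pagination) can be tried together without network access or aioscraper installed. The following is a stand-alone sketch under stated assumptions: the endpoint, fake_fetch, build_url, and crawl are hypothetical; sqlite3 stands in for whatever database client you use; and a plain loop replaces the callback-driven follow-up requests shown above.

```python
import asyncio
import sqlite3
from urllib.parse import parse_qs, urlencode, urlsplit

BASE_URL = "https://api.example.com/search"  # hypothetical endpoint


def build_url(page: int) -> str:
    """Encode query parameters the way a params dict is typically serialized."""
    return f"{BASE_URL}?{urlencode({'q': 'python', 'page': page})}"


async def fake_fetch(url: str) -> dict:
    """Stand-in for an HTTP call: serves three pages of two items each."""
    page = int(parse_qs(urlsplit(url).query)["page"][0])
    items = [{"name": f"repo-{page}-{i}", "stars": page * 10 + i} for i in range(2)]
    next_page = build_url(page + 1) if page < 3 else None
    return {"items": items, "next_page": next_page}


async def crawl(conn: sqlite3.Connection) -> int:
    """Follow next_page links until none remain, storing every item; return pages visited."""
    url = build_url(1)
    pages = 0
    while url:
        data = await fake_fetch(url)
        pages += 1
        # Parameterized insert: values are bound, not interpolated into the SQL text
        conn.executemany(
            "INSERT INTO repos (name, stars) VALUES (?, ?)",
            [(item["name"], item["stars"]) for item in data["items"]],
        )
        url = data.get("next_page")
    conn.commit()
    return pages


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE repos (name TEXT PRIMARY KEY, stars INTEGER)")
pages_visited = asyncio.run(crawl(conn))
row_count = conn.execute("SELECT COUNT(*) FROM repos").fetchone()[0]
print(pages_visited, row_count)
```

The sqlite3 module is blocking, which is acceptable for a sketch; for a real scraper an async database driver is likely a better fit inside put_item().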
Production configuration
For production use, configure retries, rate limiting, and concurrency via environment variables:
# Enable retries for transient failures
export RETRY_ENABLED=true
export RETRY_MAX_ATTEMPTS=3
# Enable rate limiting
export RATE_LIMIT_ENABLED=true
export RATE_LIMIT_DEFAULT_INTERVAL=1.0
# Set concurrency
export SCHEDULER_CONCURRENT_REQUESTS=10
aioscraper scraper
See CLI for all available configuration options.
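As a rough illustration of what a retry limit combined with a fixed interval means, here is a stand-alone sketch. It is not aioscraper's retry logic: with_retries and flaky are hypothetical, and the assumption that the attempt count includes the first try may not match how RETRY_MAX_ATTEMPTS is interpreted by the library.

```python
import asyncio
from typing import Awaitable, Callable

state = {"calls": 0}


async def with_retries(
    operation: Callable[[], Awaitable[str]],
    max_attempts: int = 3,
    interval: float = 1.0,
) -> str:
    """Retry a coroutine on ConnectionError, waiting `interval` seconds between tries."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(interval)
    raise AssertionError("unreachable")


async def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


# interval=0.0 keeps the demo fast; a real interval would space out the attempts
result = asyncio.run(with_retries(flaky, max_attempts=3, interval=0.0))
print(result, state["calls"])
```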
Next steps
Learn about pipelines: See Pipelines for advanced item processing, error handling, and multiple pipelines.
Add middlewares: See Middlewares for request/response transformation, auth, logging, and circuit breaking.
Manage resources: See Lifespan for setting up database connections, external services, and cleanup.
Dependency injection: See Wiring scrapers and dependencies to inject custom dependencies into callbacks and pipelines.
Configuration: See Configuration for programmatic configuration and advanced settings.