Example: Web Scraper Pipeline¶
A complete multi-stage web scraper demonstrating rate limiting, retries with backoff, workflows, hooks, batch enqueueing, periodic cleanup, and named queues.
Project Structure¶
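The pipeline is split across three files (the directory name is illustrative):

```
scraper/
├── tasks.py    # queue configuration, hooks, and task definitions
├── run.py      # enqueues jobs and collects results
└── worker.py   # starts a worker process
```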
tasks.py¶
```python
import json
import time

from taskito import Queue

queue = Queue(
    db_path=".taskito/scraper.db",
    workers=4,
    default_retry=3,
    default_timeout=60,
    result_ttl=3600,  # Auto-cleanup results after 1 hour
)

# ── Hooks ────────────────────────────────────────────────

@queue.before_task
def log_start(task_name, args, kwargs):
    print(f"[START] {task_name}")

@queue.on_success
def log_success(task_name, args, kwargs, result):
    print(f"[DONE] {task_name}")

@queue.on_failure
def log_failure(task_name, args, kwargs, error):
    print(f"[FAIL] {task_name}: {error}")

# ── Tasks ────────────────────────────────────────────────

@queue.task(
    rate_limit="30/m",  # Max 30 requests per minute
    max_retries=5,
    retry_backoff=2.0,
    queue="scraping",
)
def fetch_page(url):
    """Fetch a single URL. Rate-limited and retried on failure."""
    import urllib.request
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode("utf-8")

@queue.task(queue="processing")
def extract_links(html):
    """Extract all links from an HTML page."""
    import re
    return re.findall(r'href="(https?://[^"]+)"', html)

@queue.task(queue="processing")
def extract_title(html):
    """Extract the page title."""
    import re
    match = re.search(r"<title>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else "No title"

@queue.task(queue="storage")
def store_results(results, url=""):
    """Store scraped data to a JSON file."""
    data = {"url": url, "results": results, "scraped_at": time.time()}
    filename = f"output_{int(time.time())}.json"
    with open(filename, "w") as f:
        json.dump(data, f, indent=2)
    return filename

@queue.task(queue="processing")
def summarize(pages):
    """Aggregate results from multiple pages."""
    total_links = sum(len(p.get("links", [])) for p in pages)
    titles = [p.get("title", "?") for p in pages]
    return {
        "pages_scraped": len(pages),
        "total_links": total_links,
        "titles": titles,
    }

@queue.task()
def scrape_page(url):
    """Full pipeline for a single page: fetch → extract links + title."""
    html = fetch_page(url)  # Direct call (not queued)
    links = extract_links(html)
    title = extract_title(html)
    return {"url": url, "title": title, "links": links}

# ── Periodic cleanup ────────────────────────────────────

@queue.periodic(cron="0 0 * * * *")
def hourly_cleanup():
    """Purge completed jobs and dead letters every hour."""
    completed = queue.purge_completed(older_than=3600)
    dead = queue.purge_dead(older_than=86400)
    print(f"Cleanup: purged {completed} completed, {dead} dead")
```
run.py¶
"""Enqueue scraping jobs."""
from tasks import queue, scrape_page, summarize, store_results
from taskito import group, chord
urls = [
"https://example.com",
"https://httpbin.org/html",
"https://jsonplaceholder.typicode.com",
]
# ── Option 1: Simple parallel scraping ──────────────────
print("Enqueuing scrape jobs...")
jobs = [scrape_page.delay(url) for url in urls]
# Wait for all results
for job in jobs:
result = job.result(timeout=30)
print(f" {result['title']} — {len(result['links'])} links")
# ── Option 2: Chord — scrape in parallel, then summarize ─
print("\nRunning chord pipeline...")
result = chord(
group(*[scrape_page.s(url) for url in urls]),
summarize.s(),
).apply(queue)
summary = result.result(timeout=60)
print(f" Scraped {summary['pages_scraped']} pages")
print(f" Found {summary['total_links']} total links")
print(f" Titles: {summary['titles']}")
# ── Option 3: Batch enqueue with .map() ─────────────────
print("\nBatch enqueue with .map()...")
jobs = scrape_page.map([(url,) for url in urls])
results = [j.result(timeout=30) for j in jobs]
print(f" Scraped {len(results)} pages in batch")
# ── Check stats ─────────────────────────────────────────
stats = queue.stats()
print(f"\nQueue stats: {stats}")
worker.py¶
"""Start the worker."""
from tasks import queue
if __name__ == "__main__":
print("Starting scraper worker...")
queue.run_worker(queues=["scraping", "processing", "storage"])
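A worker can also be pinned to a subset of queues, for example to keep slow network fetches separate from processing and storage. A minimal sketch, assuming run_worker accepts the same queues argument in a separate process:

```python
"""Start a worker dedicated to network fetches only."""
from tasks import queue

if __name__ == "__main__":
    queue.run_worker(queues=["scraping"])
```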
Running It¶
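Start a worker in one terminal, then enqueue jobs from another; the commands assume the files above are in the working directory:

```bash
# Terminal 1: start a worker consuming the scraping, processing, and storage queues
python worker.py

# Terminal 2: enqueue the jobs and print results
python run.py
```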
Key Patterns Demonstrated¶
| Pattern | Where |
|---|---|
| Rate limiting | fetch_page — 30 requests/min |
| Retry with backoff | fetch_page — 5 retries, 2.0x backoff |
| Named queues | scraping, processing, storage |
| Hooks | log_start, log_success, log_failure |
| Workflows (chord) | Parallel scrape → summarize |
| Batch enqueue | .map() for bulk job creation |
| Periodic tasks | hourly_cleanup runs every hour |
| Result TTL | Auto-cleanup completed jobs after 1 hour |
| Direct call | scrape_page calls fetch_page() directly |