Example: Web Scraper Pipeline

A complete multi-stage web scraper demonstrating rate limiting, retries with backoff, named queues, hooks, workflows (group/chord), batch enqueueing, periodic cleanup, and result TTL.

Project Structure

scraper/
  tasks.py      # Task definitions
  worker.py     # Worker entry point
  run.py        # Enqueue scraping jobs

tasks.py

import json
import time
from taskito import Queue

queue = Queue(
    db_path=".taskito/scraper.db",
    workers=4,
    default_retry=3,
    default_timeout=60,
    result_ttl=3600,  # Auto-cleanup results after 1 hour
)

# ── Hooks ────────────────────────────────────────────────

@queue.before_task
def log_start(task_name, args, kwargs):
    print(f"[START] {task_name}")

@queue.on_success
def log_success(task_name, args, kwargs, result):
    print(f"[DONE]  {task_name}")

@queue.on_failure
def log_failure(task_name, args, kwargs, error):
    print(f"[FAIL]  {task_name}: {error}")

# ── Tasks ────────────────────────────────────────────────

@queue.task(
    rate_limit="30/m",       # Max 30 requests per minute
    max_retries=5,
    retry_backoff=2.0,
    queue="scraping",
)
def fetch_page(url):
    """Fetch a single URL. Rate-limited and retried on failure."""
    import urllib.request
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode("utf-8")

@queue.task(queue="processing")
def extract_links(html):
    """Extract all links from an HTML page."""
    import re
    return re.findall(r'href="(https?://[^"]+)"', html)

@queue.task(queue="processing")
def extract_title(html):
    """Extract the page title."""
    import re
    match = re.search(r"<title>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else "No title"

@queue.task(queue="storage")
def store_results(results, url=""):
    """Store scraped data to a JSON file."""
    data = {"url": url, "results": results, "scraped_at": time.time()}
    filename = f"output_{int(time.time())}.json"
    with open(filename, "w") as f:
        json.dump(data, f, indent=2)
    return filename

@queue.task(queue="processing")
def summarize(pages):
    """Aggregate results from multiple pages."""
    total_links = sum(len(p.get("links", [])) for p in pages)
    titles = [p.get("title", "?") for p in pages]
    return {
        "pages_scraped": len(pages),
        "total_links": total_links,
        "titles": titles,
    }

@queue.task()
def scrape_page(url):
    """Full pipeline for a single page: fetch → extract links + title."""
    html = fetch_page(url)  # Direct call (not queued)
    links = extract_links(html)
    title = extract_title(html)
    return {"url": url, "title": title, "links": links}

# ── Periodic cleanup ────────────────────────────────────

@queue.periodic(cron="0 0 * * * *")
def hourly_cleanup():
    """Purge completed jobs and dead letters every hour."""
    completed = queue.purge_completed(older_than=3600)
    dead = queue.purge_dead(older_than=86400)
    print(f"Cleanup: purged {completed} completed, {dead} dead")

run.py

"""Enqueue scraping jobs."""
from tasks import queue, scrape_page, summarize, store_results
from taskito import group, chord

urls = [
    "https://example.com",
    "https://httpbin.org/html",
    "https://jsonplaceholder.typicode.com",
]

# ── Option 1: Simple parallel scraping ──────────────────

print("Enqueuing scrape jobs...")
jobs = [scrape_page.delay(url) for url in urls]

# Wait for all results
for job in jobs:
    result = job.result(timeout=30)
    print(f"  {result['title']}{len(result['links'])} links")

# ── Option 2: Chord — scrape in parallel, then summarize ─

print("\nRunning chord pipeline...")
result = chord(
    group(*[scrape_page.s(url) for url in urls]),
    summarize.s(),
).apply(queue)

summary = result.result(timeout=60)
print(f"  Scraped {summary['pages_scraped']} pages")
print(f"  Found {summary['total_links']} total links")
print(f"  Titles: {summary['titles']}")

# ── Option 3: Batch enqueue with .map() ─────────────────

print("\nBatch enqueue with .map()...")
jobs = scrape_page.map([(url,) for url in urls])
results = [j.result(timeout=30) for j in jobs]
print(f"  Scraped {len(results)} pages in batch")

# ── Check stats ─────────────────────────────────────────

stats = queue.stats()
print(f"\nQueue stats: {stats}")

worker.py

"""Start the worker."""
from tasks import queue

if __name__ == "__main__":
    print("Starting scraper worker...")
    queue.run_worker(queues=["scraping", "processing", "storage"])
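
Because the queues are named, you can also dedicate worker processes to specific
stages so the rate-limited scraping queue does not compete with processing and
storage for worker slots. A minimal sketch, assuming run_worker() accepts any
subset of the queue names (the scrape_worker.py filename is hypothetical):

"""scrape_worker.py: a worker dedicated to the rate-limited fetch stage."""
from tasks import queue

if __name__ == "__main__":
    print("Starting scraping-only worker...")
    # Pulls jobs only from the "scraping" queue; run worker.py alongside it
    # (or another copy with other queue names) for "processing" and "storage".
    queue.run_worker(queues=["scraping"])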

Running It

# Terminal 1: start the worker
python worker.py

# Terminal 2: enqueue the scraping jobs
python run.py

# Optional: watch queue activity while it runs
taskito info --app tasks:queue --watch

Key Patterns Demonstrated

Pattern              Where
-------------------  ----------------------------------------------
Rate limiting        fetch_page — 30 requests/min
Retry with backoff   fetch_page — 5 retries, 2.0x backoff
Named queues         scraping, processing, storage
Hooks                log_start, log_success, log_failure
Workflows (chord)    Parallel scrape → summarize
Batch enqueue        .map() for bulk job creation
Periodic tasks       hourly_cleanup runs every hour
Result TTL           Auto-cleanup of completed jobs after 1 hour
Direct call          scrape_page calls fetch_page() directly