Skip to content

Example: FastAPI Image Processing Service

A REST API for image processing that demonstrates FastAPI integration, progress tracking, async result fetching, job cancellation, and SSE progress streaming.

Project Structure

image-service/
  app.py       # FastAPI app + task definitions
  client.py    # Example client script

app.py

"""FastAPI image processing service with taskito."""

import time

from fastapi import FastAPI, HTTPException
from taskito import Queue, current_job
from taskito.contrib.fastapi import TaskitoRouter

queue = Queue(db_path=".taskito/images.db", workers=4, result_ttl=3600)

# ── Tasks ────────────────────────────────────────────────

@queue.task(timeout=300)
def resize_image(image_url: str, sizes: list[int]) -> dict:
    """Resize an image to multiple sizes with progress updates."""
    results = {}
    for i, size in enumerate(sizes):
        # Simulate resize work
        time.sleep(1)
        results[f"{size}x{size}"] = f"{image_url}?w={size}&h={size}"
        current_job.update_progress(int((i + 1) / len(sizes) * 100))
    return results

@queue.task(max_retries=3, retry_backoff=2.0)
def generate_thumbnail(image_url: str) -> str:
    """Generate a thumbnail — retries on failure."""
    time.sleep(0.5)
    return f"{image_url}?thumb=true"

@queue.task(timeout=600)
def apply_filters(image_url: str, filters: list[str]) -> dict:
    """Apply a sequence of filters with progress."""
    results = {}
    for i, f in enumerate(filters):
        time.sleep(2)
        results[f] = f"{image_url}?filter={f}"
        current_job.update_progress(int((i + 1) / len(filters) * 100))
    return results

# ── FastAPI App ──────────────────────────────────────────

app = FastAPI(title="Image Processing Service")

# Mount the taskito router — adds /tasks/stats, /tasks/jobs/{id}, etc.
app.include_router(TaskitoRouter(queue), prefix="/tasks")

@app.post("/process")
async def submit_job(image_url: str, sizes: list[int] | None = None):
    """Submit an image processing job and return the job ID."""
    if sizes is None:
        sizes = [128, 256, 512, 1024]
    job = resize_image.delay(image_url, sizes)
    return {"job_id": job.id, "status_url": f"/tasks/jobs/{job.id}"}

@app.post("/process/{job_id}/cancel")
async def cancel_job(job_id: str):
    """Cancel a pending image processing job."""
    cancelled = await queue.acancel_job(job_id)
    if not cancelled:
        raise HTTPException(400, "Job is not in a cancellable state")
    return {"cancelled": True, "job_id": job_id}

@app.get("/process/{job_id}/result")
async def get_result(job_id: str, timeout: float = 0):
    """Get the result, optionally blocking until complete."""
    job = queue.get_job(job_id)
    if job is None:
        raise HTTPException(404, "Job not found")
    if timeout > 0:
        try:
            result = await job.aresult(timeout=timeout)
            return {"status": "complete", "result": result}
        except TimeoutError:
            return {"status": job.status, "result": None}
    return {"status": job.status, "progress": job.progress}

client.py

"""Example client for the image processing service."""

import httpx
import json
import time

BASE = "http://localhost:8000"

# 1. Submit a job
resp = httpx.post(f"{BASE}/process", params={
    "image_url": "https://example.com/photo.jpg",
    "sizes": [128, 256, 512, 1024],
})
data = resp.json()
job_id = data["job_id"]
print(f"Submitted job: {job_id}")

# 2. Stream progress via SSE
print("\nStreaming progress:")
with httpx.stream("GET", f"{BASE}/tasks/jobs/{job_id}/progress") as r:
    for line in r.iter_lines():
        if line.startswith("data:"):
            payload = json.loads(line[5:].strip())
            print(f"  Progress: {payload['progress']}% — Status: {payload['status']}")
            if payload["status"] in ("complete", "failed", "dead", "cancelled"):
                break

# 3. Fetch the final result
result = httpx.get(f"{BASE}/tasks/jobs/{job_id}/result", params={"timeout": 5})
print(f"\nResult: {result.json()}")

Running It

taskito worker --app app:queue
uvicorn app:app --reload
python client.py

SSE from the Browser

const jobId = "01H5K6X...";
const source = new EventSource(`/tasks/jobs/${jobId}/progress`);

const progressBar = document.getElementById("progress");

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  progressBar.style.width = `${data.progress}%`;
  progressBar.textContent = `${data.progress}%`;

  if (["complete", "failed", "dead", "cancelled"].includes(data.status)) {
    source.close();
  }
};

Key Patterns Demonstrated

Pattern Where
FastAPI integration TaskitoRouter(queue) mounted at /tasks
Progress tracking current_job.update_progress() in resize_image and apply_filters
Async result fetch await job.aresult(timeout=...) in get_result endpoint
Async cancellation await queue.acancel_job() in cancel_job endpoint
SSE streaming /tasks/jobs/{id}/progress endpoint from TaskitoRouter
Retry with backoff generate_thumbnail — 3 retries, 2x backoff
Result TTL result_ttl=3600 — auto-cleanup after 1 hour