Example: FastAPI Image Processing Service¶
A REST API for image processing that demonstrates FastAPI integration, progress tracking, async result fetching, job cancellation, and SSE progress streaming.
Project Structure¶
app.py¶
"""FastAPI image processing service with taskito."""
import time
from fastapi import FastAPI, HTTPException
from taskito import Queue, current_job
from taskito.contrib.fastapi import TaskitoRouter
queue = Queue(db_path=".taskito/images.db", workers=4, result_ttl=3600)
# ── Tasks ────────────────────────────────────────────────
@queue.task(timeout=300)
def resize_image(image_url: str, sizes: list[int]) -> dict:
"""Resize an image to multiple sizes with progress updates."""
results = {}
for i, size in enumerate(sizes):
# Simulate resize work
time.sleep(1)
results[f"{size}x{size}"] = f"{image_url}?w={size}&h={size}"
current_job.update_progress(int((i + 1) / len(sizes) * 100))
return results
@queue.task(max_retries=3, retry_backoff=2.0)
def generate_thumbnail(image_url: str) -> str:
"""Generate a thumbnail — retries on failure."""
time.sleep(0.5)
return f"{image_url}?thumb=true"
@queue.task(timeout=600)
def apply_filters(image_url: str, filters: list[str]) -> dict:
"""Apply a sequence of filters with progress."""
results = {}
for i, f in enumerate(filters):
time.sleep(2)
results[f] = f"{image_url}?filter={f}"
current_job.update_progress(int((i + 1) / len(filters) * 100))
return results
# ── FastAPI App ──────────────────────────────────────────
app = FastAPI(title="Image Processing Service")
# Mount the taskito router — adds /tasks/stats, /tasks/jobs/{id}, etc.
app.include_router(TaskitoRouter(queue), prefix="/tasks")
@app.post("/process")
async def submit_job(image_url: str, sizes: list[int] | None = None):
"""Submit an image processing job and return the job ID."""
if sizes is None:
sizes = [128, 256, 512, 1024]
job = resize_image.delay(image_url, sizes)
return {"job_id": job.id, "status_url": f"/tasks/jobs/{job.id}"}
@app.post("/process/{job_id}/cancel")
async def cancel_job(job_id: str):
"""Cancel a pending image processing job."""
cancelled = await queue.acancel_job(job_id)
if not cancelled:
raise HTTPException(400, "Job is not in a cancellable state")
return {"cancelled": True, "job_id": job_id}
@app.get("/process/{job_id}/result")
async def get_result(job_id: str, timeout: float = 0):
"""Get the result, optionally blocking until complete."""
job = queue.get_job(job_id)
if job is None:
raise HTTPException(404, "Job not found")
if timeout > 0:
try:
result = await job.aresult(timeout=timeout)
return {"status": "complete", "result": result}
except TimeoutError:
return {"status": job.status, "result": None}
return {"status": job.status, "progress": job.progress}
client.py¶
"""Example client for the image processing service."""
import httpx
import json
import time
BASE = "http://localhost:8000"
# 1. Submit a job
resp = httpx.post(f"{BASE}/process", params={
"image_url": "https://example.com/photo.jpg",
"sizes": [128, 256, 512, 1024],
})
data = resp.json()
job_id = data["job_id"]
print(f"Submitted job: {job_id}")
# 2. Stream progress via SSE
print("\nStreaming progress:")
with httpx.stream("GET", f"{BASE}/tasks/jobs/{job_id}/progress") as r:
for line in r.iter_lines():
if line.startswith("data:"):
payload = json.loads(line[5:].strip())
print(f" Progress: {payload['progress']}% — Status: {payload['status']}")
if payload["status"] in ("complete", "failed", "dead", "cancelled"):
break
# 3. Fetch the final result
result = httpx.get(f"{BASE}/tasks/jobs/{job_id}/result", params={"timeout": 5})
print(f"\nResult: {result.json()}")
Running It¶
SSE from the Browser¶
const jobId = "01H5K6X...";
const source = new EventSource(`/tasks/jobs/${jobId}/progress`);
const progressBar = document.getElementById("progress");
source.onmessage = (event) => {
const data = JSON.parse(event.data);
progressBar.style.width = `${data.progress}%`;
progressBar.textContent = `${data.progress}%`;
if (["complete", "failed", "dead", "cancelled"].includes(data.status)) {
source.close();
}
};
Key Patterns Demonstrated¶
| Pattern | Where |
|---|---|
| FastAPI integration | TaskitoRouter(queue) mounted at /tasks |
| Progress tracking | current_job.update_progress() in resize_image and apply_filters |
| Async result fetch | await job.aresult(timeout=...) in get_result endpoint |
| Async cancellation | await queue.acancel_job() in cancel_job endpoint |
| SSE streaming | /tasks/jobs/{id}/progress endpoint from TaskitoRouter |
| Retry with backoff | generate_thumbnail — 3 retries, 2x backoff |
| Result TTL | result_ttl=3600 — auto-cleanup after 1 hour |