
Advanced

Unique Tasks

Deduplicate active jobs by key — if a job with the same unique_key is already pending or running, the existing job is returned instead of creating a new one:

job1 = process.apply_async(args=("report",), unique_key="daily-report")
job2 = process.apply_async(args=("report",), unique_key="daily-report")
assert job1.id == job2.id  # Same job, not duplicated

Once the original job completes (or exhausts its retries and moves to the dead letter queue), the key is released and a new job can be created with the same key.

Implementation

Deduplication uses a partial unique index: CREATE UNIQUE INDEX ... ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1). Only pending and running jobs participate. The check-and-insert is atomic (transaction-protected), so concurrent calls with the same unique_key are handled gracefully without race conditions.
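The check-and-insert pattern can be sketched with the stdlib sqlite3 module. The table and column names below are illustrative, not taskito's actual schema; the point is how a partial unique index plus `INSERT OR IGNORE` makes deduplication atomic:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        id INTEGER PRIMARY KEY,
        unique_key TEXT,
        status INTEGER NOT NULL DEFAULT 0  -- 0=pending, 1=running, 2=done
    )
""")
# Only pending (0) and running (1) jobs participate in deduplication.
conn.execute("""
    CREATE UNIQUE INDEX jobs_unique_key ON jobs(unique_key)
    WHERE unique_key IS NOT NULL AND status IN (0, 1)
""")

def enqueue(key):
    # Atomic check-and-insert: the INSERT is a no-op if an active job
    # with the same key exists, and we return that job's id either way.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO jobs (unique_key) VALUES (?)", (key,)
        )
        row = conn.execute(
            "SELECT id FROM jobs WHERE unique_key = ? AND status IN (0, 1)",
            (key,),
        ).fetchone()
        return row[0]

job1 = enqueue("daily-report")
job2 = enqueue("daily-report")
assert job1 == job2  # deduplicated

# Once the job leaves the active states, the key is released:
conn.execute("UPDATE jobs SET status = 2 WHERE id = ?", (job1,))
job3 = enqueue("daily-report")
assert job3 != job1  # a fresh job with the same key
```

Because the uniqueness check is enforced by the index itself rather than by an application-level SELECT, two concurrent enqueues cannot both insert.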

Job Cancellation

Cancel a pending job before it starts:

job = send_email.delay("user@example.com", "Hello", "World")
cancelled = queue.cancel_job(job.id)  # True if the job was pending

  • Returns True if the job was pending and is now cancelled
  • Returns False if the job was already running, completed, or in another non-pending state
  • Cancelled jobs cannot be un-cancelled

Result TTL & Auto-Cleanup

Manual Cleanup

# Purge completed jobs older than 1 hour
deleted = queue.purge_completed(older_than=3600)

# Purge dead letters older than 24 hours
deleted = queue.purge_dead(older_than=86400)

Automatic Cleanup

Set result_ttl on the Queue to automatically purge old jobs while the worker runs:

queue = Queue(
    db_path="myapp.db",
    result_ttl=3600,  # Auto-purge completed/dead jobs older than 1 hour
)

The scheduler checks every ~60 seconds and purges:

  • Completed jobs older than result_ttl
  • Dead letter entries older than result_ttl
  • Error history records older than result_ttl

Set to None (default) to disable auto-cleanup.
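The periodic purge amounts to a timestamp-based DELETE. A minimal sketch with the stdlib sqlite3 module, assuming an illustrative schema (a `status` code for completed jobs and a `finished_at` unix timestamp; taskito's actual columns may differ):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        id INTEGER PRIMARY KEY,
        status INTEGER NOT NULL,  -- 2 = completed (illustrative)
        finished_at REAL          -- unix timestamp
    )
""")

RESULT_TTL = 3600  # seconds, mirroring result_ttl=3600

def purge_expired(now):
    # One cleanup pass: delete terminal jobs whose finish time
    # is older than the TTL.
    with conn:
        cur = conn.execute(
            "DELETE FROM jobs WHERE status = 2 AND finished_at < ?",
            (now - RESULT_TTL,),
        )
    return cur.rowcount

now = time.time()
conn.execute("INSERT INTO jobs (status, finished_at) VALUES (2, ?)", (now - 7200,))
conn.execute("INSERT INTO jobs (status, finished_at) VALUES (2, ?)", (now - 60,))
assert purge_expired(now) == 1  # only the 2-hour-old job is removed
```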

Cascade Cleanup

When jobs are purged — either manually via purge_completed() or automatically via result_ttl — their related child records are also deleted:

  • Error history (job_errors)
  • Task logs (task_logs)
  • Task metrics (task_metrics)
  • Job dependencies (job_dependencies)
  • Replay history (replay_history)

This prevents orphaned records from accumulating when parent jobs are removed.

# Manual purge — child records are cleaned up automatically
deleted = queue.purge_completed(older_than=3600)
print(f"Purged {deleted} jobs and their related records")

# With per-job TTL — cascade cleanup still applies
job = resize_image.apply_async(
    args=("photo.jpg",),
    result_ttl=600,  # This job's results expire after 10 minutes
)
# When this job is purged (after 10 min), its errors, logs,
# metrics, dependencies, and replay history are also removed.
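Whether taskito uses explicit child deletes or SQLite foreign keys internally isn't stated here, but the observable effect matches `ON DELETE CASCADE`. A self-contained sketch of that mechanism (simplified schema, only two of the child tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # must be enabled per connection
conn.executescript("""
    CREATE TABLE jobs (id INTEGER PRIMARY KEY);
    CREATE TABLE job_errors (
        id INTEGER PRIMARY KEY,
        job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE
    );
    CREATE TABLE task_logs (
        id INTEGER PRIMARY KEY,
        job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE
    );
""")
conn.execute("INSERT INTO jobs (id) VALUES (1)")
conn.execute("INSERT INTO job_errors (job_id) VALUES (1)")
conn.execute("INSERT INTO task_logs (job_id) VALUES (1)")

# Purging the parent removes its child records in the same statement.
conn.execute("DELETE FROM jobs WHERE id = 1")
assert conn.execute("SELECT COUNT(*) FROM job_errors").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM task_logs").fetchone()[0] == 0
```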

Note

Dead letter entries are not cascade-deleted — they have their own lifecycle managed by purge_dead(). Timestamp-based cleanup (result_ttl) of error history, logs, and metrics also continues to run independently, catching old records regardless of whether the parent job still exists.

Async Support

All inspection methods have async variants that run in a thread pool:

# Sync
stats = queue.stats()
dead = queue.dead_letters()
new_id = queue.retry_dead(dead_id)
cancelled = queue.cancel_job(job_id)
result = job.result(timeout=30)

# Async equivalents
stats = await queue.astats()
dead = await queue.adead_letters()
new_id = await queue.aretry_dead(dead_id)
cancelled = await queue.acancel_job(job_id)
result = await job.aresult(timeout=30)
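"Runs in a thread pool" likely means the pattern below: the async variant offloads the blocking sync call so the event loop stays free during SQLite I/O. A minimal sketch with a stand-in for a sync queue method (the `stats` function here is a stub, not taskito's):

```python
import asyncio
import time

def stats():
    # Stand-in for a blocking, synchronous queue method.
    time.sleep(0.01)  # simulates SQLite I/O
    return {"pending": 3}

async def astats():
    # Offload the sync call to the default thread pool; other
    # coroutines keep running while the thread blocks.
    return await asyncio.to_thread(stats)

result = asyncio.run(astats())
assert result == {"pending": 3}
```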

Async Worker

async def main():
    await queue.arun_worker(queues=["default"])

asyncio.run(main())

Batch Enqueue

Insert many jobs in a single SQLite transaction for high throughput:

task.map()

@queue.task()
def process(item_id):
    return fetch_and_process(item_id)

# Enqueue 1000 jobs in one transaction
jobs = process.map([(i,) for i in range(1000)])

queue.enqueue_many()

jobs = queue.enqueue_many(
    task_name="myapp.process",
    args_list=[(i,) for i in range(1000)],
    kwargs_list=None,   # Optional per-job kwargs
    priority=5,         # Same priority for all
    queue="processing", # Same queue for all
)

SQLite Configuration

taskito configures SQLite for optimal performance:

Pragma              Value    Purpose
journal_mode        WAL      Concurrent reads during writes
busy_timeout        5000 ms  Wait instead of failing on lock contention
synchronous         NORMAL   Balance between safety and speed
journal_size_limit  64 MB    Prevent unbounded WAL growth

The connection pool uses up to 8 connections via r2d2.
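For reference, the equivalent configuration in a raw sqlite3 connection looks like this (WAL mode persists in the database file; the other pragmas must be set on every new connection):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "myapp.db")  # WAL needs a real file
conn = sqlite3.connect(path)

conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA busy_timeout = 5000")            # milliseconds
conn.execute("PRAGMA synchronous = NORMAL")
conn.execute("PRAGMA journal_size_limit = 67108864")  # 64 MB in bytes

assert conn.execute("PRAGMA journal_mode").fetchone()[0] == "wal"
```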

FastAPI Integration

taskito provides a first-class FastAPI integration via taskito.contrib.fastapi. It gives you a pre-built APIRouter with endpoints for job status, progress streaming via SSE, and dead letter queue management.

Installation

pip install taskito[fastapi]

This installs fastapi and pydantic as extras.

Quick Setup

from fastapi import FastAPI
from taskito import Queue
from taskito.contrib.fastapi import TaskitoRouter

queue = Queue(db_path="myapp.db")

@queue.task()
def process_data(payload: dict) -> str:
    return "done"

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")

Run with:

uvicorn myapp:app --reload

All taskito endpoints are now available under /tasks/.

Endpoints

The TaskitoRouter exposes the following endpoints:

Method  Path                           Description
GET     /stats                         Queue statistics (pending, running, completed, etc.)
GET     /jobs/{job_id}                 Job status, progress, and metadata
GET     /jobs/{job_id}/errors          Error history for a job
GET     /jobs/{job_id}/result          Job result (optional blocking with ?timeout=N)
GET     /jobs/{job_id}/progress        SSE stream of progress updates
POST    /jobs/{job_id}/cancel          Cancel a pending job
GET     /dead-letters                  List dead letter entries (paginated)
POST    /dead-letters/{dead_id}/retry  Re-enqueue a dead letter

Blocking Result Fetch

The /jobs/{job_id}/result endpoint supports an optional timeout query parameter (0–300 seconds). When timeout > 0, the request blocks until the job completes or the timeout elapses:

# Non-blocking (default)
curl http://localhost:8000/tasks/jobs/01H5K6X.../result

# Block up to 30 seconds for the result
curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=30
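The blocking semantics can be understood as a poll loop with a deadline. This is an illustrative sketch, not taskito's actual endpoint implementation; `get_status` is a hypothetical stand-in for a non-blocking status check:

```python
import time

def wait_for_result(get_status, timeout):
    # Poll until the job reaches a terminal state or the deadline passes.
    deadline = time.monotonic() + timeout
    while True:
        status, result = get_status()
        if status in ("completed", "failed"):
            return status, result
        if time.monotonic() >= deadline:
            return "timeout", None
        time.sleep(0.05)  # poll interval

# Stub that reports "running" twice, then completes.
calls = {"n": 0}
def fake_status():
    calls["n"] += 1
    return ("completed", 42) if calls["n"] >= 3 else ("running", None)

assert wait_for_result(fake_status, timeout=5) == ("completed", 42)
```

With `timeout=0` the loop body runs once and returns immediately, which matches the non-blocking default.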

SSE Progress Streaming

Stream real-time progress for a running job using Server-Sent Events:

import httpx

with httpx.stream("GET", "http://localhost:8000/tasks/jobs/01H5K6X.../progress") as r:
    for line in r.iter_lines():
        print(line)
        # data: {"progress": 25, "status": "running"}
        # data: {"progress": 50, "status": "running"}
        # data: {"progress": 100, "status": "completed"}

From the browser:

const source = new EventSource("/tasks/jobs/01H5K6X.../progress");
source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log(`Progress: ${data.progress}%`);
  if (data.status === "completed" || data.status === "failed") {
    source.close();
  }
};

The stream sends a JSON event every 0.5 seconds while the job is active, then a final event when the job reaches a terminal state.
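The events on the wire follow standard Server-Sent Events framing: a `data:` line with a JSON payload, terminated by a blank line. A minimal sketch of producing such a stream (not taskito's internals):

```python
import json

def sse_events(progress_updates):
    # Each SSE event is "data: <payload>\n" followed by an empty line.
    for update in progress_updates:
        yield f"data: {json.dumps(update)}\n\n"

frames = list(sse_events([
    {"progress": 25, "status": "running"},
    {"progress": 100, "status": "completed"},
]))
assert frames[0] == 'data: {"progress": 25, "status": "running"}\n\n'
```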

Pydantic Response Models

All endpoints return validated Pydantic models with clean OpenAPI docs. You can import them for type-safe client code:

from taskito.contrib.fastapi import (
    StatsResponse,
    JobResponse,
    JobErrorResponse,
    JobResultResponse,
    CancelResponse,
    DeadLetterResponse,
    RetryResponse,
)

Custom Tags and Dependencies

Customize the router with FastAPI tags and dependency injection:

from fastapi import Depends, FastAPI, Header, HTTPException
from taskito.contrib.fastapi import TaskitoRouter

async def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != "secret":
        raise HTTPException(status_code=401, detail="Invalid API key")

app = FastAPI()

router = TaskitoRouter(
    queue,
    tags=["task-queue"],              # OpenAPI tags
    dependencies=[Depends(verify_api_key)],  # Applied to all endpoints
)

app.include_router(router, prefix="/tasks")

Full Example

from fastapi import FastAPI, Header, HTTPException, Depends
from taskito import Queue, current_job
from taskito.contrib.fastapi import TaskitoRouter

queue = Queue(db_path="myapp.db")

@queue.task()
def resize_image(image_url: str, sizes: list[int]) -> dict:
    results = {}
    for i, size in enumerate(sizes):
        results[size] = do_resize(image_url, size)
        current_job.update_progress(int((i + 1) / len(sizes) * 100))
    return results

async def require_auth(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(401)

app = FastAPI(title="Image Service")
app.include_router(
    TaskitoRouter(queue, dependencies=[Depends(require_auth)]),
    prefix="/tasks",
    tags=["tasks"],
)

# Start worker in a separate process:
#   taskito worker --app myapp:queue

Then query the API from the command line:

# Check job status
curl http://localhost:8000/tasks/jobs/01H5K6X... \
  -H "Authorization: Bearer mytoken"

# Stream progress
curl -N http://localhost:8000/tasks/jobs/01H5K6X.../progress \
  -H "Authorization: Bearer mytoken"

# Block for result (up to 60s)
curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=60 \
  -H "Authorization: Bearer mytoken"