Quickstart¶
Build your first task queue in 5 minutes.
1. Define Tasks¶
Create a file called tasks.py:
from taskito import Queue
# Create a queue backed by SQLite
queue = Queue(db_path="tasks.db")
@queue.task()
def add(a: int, b: int) -> int:
return a + b
@queue.task(max_retries=3, retry_backoff=2.0)
def send_email(to: str, subject: str, body: str) -> str:
# Your email sending logic here
print(f"Sending email to {to}: {subject}")
return f"sent to {to}"
2. Enqueue Jobs¶
from tasks import add, send_email
# Enqueue returns a JobResult handle
job = add.delay(2, 3)
print(f"Job ID: {job.id}") # Job ID: 01936...
print(f"Status: {job.status}") # Status: pending
3. Start a Worker¶
4. Get Results¶
from tasks import add
job = add.delay(2, 3)
# Block until complete (with exponential backoff polling)
result = job.result(timeout=30)
print(result) # 5
# Or use async
result = await job.aresult(timeout=30)
5. Monitor¶
from tasks import queue
stats = queue.stats()
print(stats)
# {'pending': 0, 'running': 0, 'completed': 5, 'failed': 0, 'dead': 0, 'cancelled': 0}
Or use the CLI:
# One-shot stats
taskito info --app tasks:queue
# Live dashboard (refreshes every 2s)
taskito info --app tasks:queue --watch
Next Steps¶
- Tasks — decorator options,
.delay()vs.apply_async() - Workers — CLI flags, graceful shutdown, worker count
- Retries — exponential backoff, dead letter queue
- Workflows — chain, group, chord
- Testing — run tasks synchronously in tests with
queue.test_mode() - Migrating from Celery — concept mapping and side-by-side examples