Monitoring & Hooks¶
Queue Statistics¶
Get a snapshot of job counts by status:
stats = queue.stats()
# {'pending': 12, 'running': 3, 'completed': 450, 'failed': 2, 'dead': 1, 'cancelled': 0}
Async variant:
CLI Monitoring¶
One-Shot Stats¶
taskito queue statistics
------------------------------
pending 12
running 3
completed 450
failed 2
dead 1
cancelled 0
------------------------------
total 468
Live Dashboard¶
Refreshes every 2 seconds with throughput calculation (completed jobs per second).
Progress Tracking¶
Report progress from inside tasks using current_job:
from taskito import current_job
@queue.task()
def process_batch(items):
total = len(items)
for i, item in enumerate(items):
process(item)
current_job.update_progress(int((i + 1) / total * 100))
return f"Processed {total} items"
Read progress from outside:
job = process_batch.delay(items)
# Poll progress
fetched = queue.get_job(job.id)
print(fetched.progress) # 0-100 or None
Job Context¶
Inside a running task, current_job provides:
| Property | Type | Description |
|---|---|---|
current_job.id |
str |
The current job ID |
current_job.task_name |
str |
The registered task name |
current_job.retry_count |
int |
Current retry attempt (0 = first run) |
current_job.queue_name |
str |
The queue this job is running on |
from taskito import current_job
@queue.task()
def my_task():
print(f"Running job {current_job.id}")
print(f"Task: {current_job.task_name}")
print(f"Attempt: {current_job.retry_count}")
print(f"Queue: {current_job.queue_name}")
Warning
current_job properties raise RuntimeError when accessed outside of a running task.
Worker Heartbeat¶
Monitor active workers and their health:
workers = queue.workers()
for w in workers:
print(f"Worker {w['worker_id']}: {w['status']} (last seen: {w['last_heartbeat']})")
Async variant:
The worker heartbeat is also available via the dashboard REST API at GET /api/workers. See the Dashboard guide for details.
Hooks¶
Run code before/after every task, or on success/failure.
@queue.before_task¶
Called before each task executes:
@queue.after_task¶
Called after each task, regardless of success or failure:
@queue.after_task
def log_end(task_name, args, kwargs, result, error):
status = "OK" if error is None else f"FAILED: {error}"
print(f"[END] {task_name} - {status}")
@queue.on_success¶
Called only when a task succeeds:
@queue.on_success
def track_metrics(task_name, args, kwargs, result):
metrics.increment(f"task.{task_name}.success")
@queue.on_failure¶
Called only when a task raises an exception:
@queue.on_failure
def alert_on_error(task_name, args, kwargs, error):
sentry_sdk.capture_exception(error)
Hook Signatures¶
| Hook | Signature |
|---|---|
before_task |
fn(task_name, args, kwargs) |
after_task |
fn(task_name, args, kwargs, result, error) |
on_success |
fn(task_name, args, kwargs, result) |
on_failure |
fn(task_name, args, kwargs, error) |
Multiple hooks
You can register multiple hooks of the same type. They execute in registration order.