Queue¶
::: taskito.app.Queue
The central class for creating and managing a task queue.
Constructor¶
Queue(
db_path: str = ".taskito/taskito.db",
workers: int = 0,
default_retry: int = 3,
default_timeout: int = 300,
default_priority: int = 0,
result_ttl: int | None = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
db_path |
str |
".taskito/taskito.db" |
Path to SQLite database file. Parent directories are created automatically. |
workers |
int |
0 |
Number of worker threads (0 = auto-detect CPU count) |
default_retry |
int |
3 |
Default max retry attempts for tasks |
default_timeout |
int |
300 |
Default task timeout in seconds |
default_priority |
int |
0 |
Default task priority (higher = more urgent) |
result_ttl |
int \| None |
None |
Auto-cleanup completed/dead jobs older than this many seconds. None disables. |
Task Registration¶
@queue.task()¶
@queue.task(
name: str | None = None,
max_retries: int = 3,
retry_backoff: float = 1.0,
timeout: int = 300,
priority: int = 0,
rate_limit: str | None = None,
queue: str = "default",
) -> TaskWrapper
Register a function as a background task. Returns a TaskWrapper.
@queue.periodic()¶
@queue.periodic(
cron: str,
name: str | None = None,
args: tuple = (),
kwargs: dict | None = None,
queue: str = "default",
) -> TaskWrapper
Register a periodic (cron-scheduled) task. Uses 6-field cron expressions with seconds.
Enqueue Methods¶
queue.enqueue()¶
queue.enqueue(
task_name: str,
args: tuple = (),
kwargs: dict | None = None,
priority: int | None = None,
delay: float | None = None,
queue: str | None = None,
max_retries: int | None = None,
timeout: int | None = None,
unique_key: str | None = None,
metadata: str | None = None,
depends_on: str | list[str] | None = None,
) -> JobResult
Enqueue a task for execution. Returns a JobResult handle.
| Parameter | Type | Default | Description |
|---|---|---|---|
depends_on |
str \| list[str] \| None |
None |
Job ID(s) this job depends on. See Dependencies. |
queue.enqueue_many()¶
queue.enqueue_many(
task_name: str,
args_list: list[tuple],
kwargs_list: list[dict] | None = None,
priority: int | None = None,
queue: str | None = None,
max_retries: int | None = None,
timeout: int | None = None,
) -> list[JobResult]
Enqueue multiple jobs in a single SQLite transaction for high throughput.
Job Management¶
queue.get_job()¶
Retrieve a job by ID. Returns None if not found.
queue.list_jobs()¶
queue.list_jobs(
status: str | None = None,
queue: str | None = None,
task_name: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[JobResult]
List jobs with optional filters. Returns a list of JobResult handles ordered by creation time (newest first).
| Parameter | Type | Default | Description |
|---|---|---|---|
status |
str \| None |
None |
Filter by status: pending, running, completed, failed, dead, cancelled |
queue |
str \| None |
None |
Filter by queue name |
task_name |
str \| None |
None |
Filter by task name |
limit |
int |
50 |
Maximum results to return |
offset |
int |
0 |
Pagination offset |
queue.cancel_job()¶
Cancel a pending job. Returns True if cancelled, False if not pending. If the job has dependents, they are cascade-cancelled.
queue.update_progress()¶
Update progress for a running job (0–100).
queue.job_errors()¶
Get error history for a job. Returns a list of dicts with id, job_id, attempt, error, failed_at.
Statistics¶
queue.stats()¶
Returns {"pending": N, "running": N, "completed": N, "failed": N, "dead": N, "cancelled": N}.
Dead Letter Queue¶
queue.dead_letters()¶
List dead letter entries. Each dict contains: id, original_job_id, queue, task_name, error, retry_count, failed_at, metadata.
queue.retry_dead()¶
Re-enqueue a dead letter job. Returns the new job ID.
queue.purge_dead()¶
Purge dead letter entries older than older_than seconds. Returns count deleted.
Cleanup¶
queue.purge_completed()¶
Purge completed jobs older than older_than seconds. Returns count deleted.
Worker¶
queue.run_worker()¶
Start the worker loop. Blocks until interrupted. Pass queues to limit which queues are processed.
Hooks¶
@queue.before_task¶
@queue.after_task¶
@queue.after_task
def hook(task_name: str, args: tuple, kwargs: dict, result: Any, error: Exception | None) -> None: ...
@queue.on_success¶
@queue.on_failure¶
@queue.on_failure
def hook(task_name: str, args: tuple, kwargs: dict, error: Exception) -> None: ...
Async Methods¶
| Sync | Async |
|---|---|
queue.stats() |
await queue.astats() |
queue.dead_letters() |
await queue.adead_letters() |
queue.retry_dead() |
await queue.aretry_dead() |
queue.cancel_job() |
await queue.acancel_job() |
queue.run_worker() |
await queue.arun_worker() |