Workers

Workers process queued jobs. taskito runs workers as OS threads within a single process, managed by a Rust scheduler.

Starting a Worker

taskito worker --app myapp.tasks:queue

Flag       Description
--app      Python path to your Queue instance (module:attribute)
--queues   Comma-separated queue names (default: all registered)
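Resolving a module:attribute spec like myapp.tasks:queue is a standard dynamic import. A minimal sketch of how such a spec can be parsed — the resolve_app helper is illustrative, not taskito's actual code:

```python
import importlib

def resolve_app(spec: str):
    """Resolve a 'module:attribute' spec such as 'myapp.tasks:queue'.

    Illustrative helper; taskito's CLI does something equivalent internally.
    """
    module_name, _, attr = spec.partition(":")
    if not attr:
        raise ValueError(f"expected 'module:attribute', got {spec!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# The same mechanism resolves any importable attribute, e.g. a stdlib one:
pi = resolve_app("math:pi")
```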
You can also start a worker from Python:

# Blocks the current thread
queue.run_worker()

# With specific queues
queue.run_worker(queues=["emails", "reports"])
To keep your application running alongside the worker, start it in a background thread:

import threading

t = threading.Thread(target=queue.run_worker, daemon=True)
t.start()

# Your application continues...
In async applications, use the async variant:

import asyncio

async def main():
    # Runs the worker in a thread pool, non-blocking
    await queue.arun_worker()

asyncio.run(main())
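Since arun_worker runs the worker in a thread pool, it is effectively the blocking run_worker handed off to an executor so the event loop stays responsive. A self-contained sketch of that pattern using only the standard library — blocking_worker here is a stand-in for run_worker, not taskito code:

```python
import asyncio
import time

def blocking_worker(duration: float) -> str:
    # Stand-in for a blocking run_worker() call.
    time.sleep(duration)
    return "worker finished"

async def main() -> list:
    # asyncio.to_thread runs the blocking call in the default executor,
    # so other coroutines keep running concurrently.
    worker = asyncio.create_task(asyncio.to_thread(blocking_worker, 0.1))
    heartbeat = asyncio.create_task(
        asyncio.sleep(0.05, result="app still responsive")
    )
    return await asyncio.gather(worker, heartbeat)

results = asyncio.run(main())
```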

Worker Count

By default, taskito auto-detects the number of CPU cores:

queue = Queue(db_path="myapp.db", workers=0)  # Auto-detect (default)
queue = Queue(db_path="myapp.db", workers=8)  # Explicit count
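The workers=0 convention presumably amounts to querying the machine's CPU count. A stdlib sketch of that resolution logic — effective_workers is illustrative, not taskito's internal code:

```python
import os

def effective_workers(requested: int = 0) -> int:
    """Mirror the workers=0 auto-detect convention (illustrative)."""
    if requested > 0:
        return requested
    # os.cpu_count() can return None on some platforms; fall back to 1.
    return os.cpu_count() or 1

print(effective_workers(0))  # number of CPU cores on this machine
print(effective_workers(8))  # → 8
```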

Note

Workers are OS threads, not processes. Each worker acquires the Python GIL only during task execution, so the scheduler and dispatch logic run without GIL contention.

Graceful Shutdown

taskito supports graceful shutdown via Ctrl+C:

  1. First Ctrl+C: Stops accepting new jobs, waits up to 30 seconds for in-flight tasks to complete
  2. Second Ctrl+C: Force-kills immediately
$ taskito worker --app myapp:queue
[taskito] Starting worker...
[taskito] Registered tasks: 3
[taskito] Queues: default, emails
^C
[taskito] Shutting down gracefully (waiting for in-flight jobs)...
[taskito] Worker stopped.

Programmatic Shutdown

# From another thread or signal handler
queue._inner.request_shutdown()
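The two-phase Ctrl+C behavior described above can be wired up from a signal handler that calls request_shutdown on the first interrupt and force-exits on the second. A self-contained sketch, assuming this structure — the callbacks here are stand-ins (in real code they would be queue._inner.request_shutdown and something like os._exit), and this is not taskito's actual implementation:

```python
import signal

class TwoPhaseShutdown:
    """Illustrative two-phase Ctrl+C handling.

    First SIGINT requests a graceful stop; a second one forces exit.
    """

    def __init__(self, request_shutdown, force_exit):
        self.request_shutdown = request_shutdown  # stand-in for queue._inner.request_shutdown
        self.force_exit = force_exit              # stand-in for a hard exit
        self.interrupts = 0

    def handle(self, signum=signal.SIGINT, frame=None):
        self.interrupts += 1
        if self.interrupts == 1:
            self.request_shutdown()
        else:
            self.force_exit()

events = []
handler = TwoPhaseShutdown(
    request_shutdown=lambda: events.append("graceful"),
    force_exit=lambda: events.append("forced"),
)
signal.signal(signal.SIGINT, handler.handle)

# Simulate two Ctrl+C presses by invoking the handler directly:
handler.handle()  # first Ctrl+C → graceful shutdown requested
handler.handle()  # second Ctrl+C → force exit
```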

How Workers Work

graph LR
    S["Scheduler<br/>(Tokio async)"] -->|Job| CH["Bounded Channel"]

    CH --> W1["Worker 1"]
    CH --> W2["Worker 2"]
    CH --> WN["Worker N"]

    W1 -->|Result| RCH["Result Channel"]
    W2 -->|Result| RCH
    WN -->|Result| RCH

    RCH --> ML["Result Handler"]
    ML -->|"complete / retry / DLQ"| DB[("SQLite")]
  1. The scheduler runs in a dedicated Tokio async thread, polling SQLite for ready jobs every 50ms
  2. Ready jobs are sent to the worker pool via a bounded crossbeam channel
  3. Each worker thread acquires the GIL, deserializes arguments, and runs the Python function
  4. Results flow back through a result channel to the main loop
  5. The main loop updates job status in SQLite (complete, retry, or DLQ)
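The pipeline above can be sketched in Python with standard-library queues standing in for the crossbeam channels. This is purely illustrative — the real scheduler and dispatch run in Rust — but it shows the same shape: a bounded job channel feeding a worker pool, with results funneled back to a single handler that records final status:

```python
import queue
import threading

job_channel = queue.Queue(maxsize=8)  # bounded channel: scheduler → workers
result_channel = queue.Queue()        # workers → result handler
STOP = object()                       # sentinel telling a worker to exit

def worker() -> None:
    # Each worker pulls jobs until it sees the STOP sentinel.
    while (job := job_channel.get()) is not STOP:
        result_channel.put((job, job * 2))  # "run" the job

workers = [threading.Thread(target=worker) for _ in range(3)]
for t in workers:
    t.start()

# Scheduler side: enqueue ready jobs, then one sentinel per worker.
for job in range(5):
    job_channel.put(job)
for _ in workers:
    job_channel.put(STOP)
for t in workers:
    t.join()

# Result handler: drain results and record each job's final status.
statuses = {}
while not result_channel.empty():
    job, result = result_channel.get()
    statuses[job] = ("complete", result)
```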