# Testing
taskito includes a built-in test mode that runs tasks synchronously in the calling thread — no worker, no Rust scheduler, no SQLite. This makes tests fast, deterministic, and easy to write.
## Quick Example

```python
from taskito import Queue

queue = Queue()

@queue.task()
def add(a: int, b: int) -> int:
    return a + b

def test_add():
    with queue.test_mode() as results:
        add.delay(2, 3)

        assert len(results) == 1
        assert results[0].return_value == 5
        assert results[0].succeeded
```
## How It Works

When you enter `queue.test_mode()`, taskito patches the `enqueue()` method so that every `.delay()` or `.apply_async()` call:

- Looks up the task function in the registry
- Calls it immediately in the current thread
- Captures the return value (or exception) in a `TestResult`
- Appends the result to the `TestResults` list

No database is created. No worker threads are spawned. Tasks execute eagerly and synchronously.
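Conceptually, the interception can be sketched as a context manager that runs each task inline and records one outcome per call. The following is a minimal standalone sketch of that idea; `EagerQueue` and all of its internals are illustrative names, not taskito's real implementation:

```python
import contextlib
import traceback

class EagerQueue:
    """Toy stand-in for a queue whose enqueue() runs tasks inline."""

    def __init__(self):
        self._registry = {}   # task name -> function
        self._results = None  # active results list while test_mode() is open

    def task(self, fn):
        self._registry[fn.__name__] = fn
        return fn

    def enqueue(self, task_name, args=(), kwargs=None):
        record = {
            "job_id": f"test-{len(self._results) + 1:06d}",
            "task_name": task_name,
            "return_value": None,
            "error": None,
            "traceback": None,
        }
        try:
            fn = self._registry[task_name]  # look up the task in the registry
            record["return_value"] = fn(*args, **(kwargs or {}))
        except Exception as exc:            # capture the failure, don't raise
            record["error"] = exc
            record["traceback"] = traceback.format_exc()
        self._results.append(record)

    @contextlib.contextmanager
    def test_mode(self):
        self._results = []
        try:
            yield self._results
        finally:
            self._results = None

q = EagerQueue()

@q.task
def add(a, b):
    return a + b

with q.test_mode() as results:
    q.enqueue("add", (2, 3))
    q.enqueue("missing")  # unregistered name -> KeyError is captured

assert results[0]["return_value"] == 5
assert results[0]["job_id"] == "test-000001"
assert results[1]["error"] is not None
```

The key property this models is that every enqueue during the context executes synchronously in the calling thread and leaves an inspectable record behind.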
## queue.test_mode()

| Parameter | Type | Default | Description |
|---|---|---|---|
| `propagate_errors` | `bool` | `False` | If `True`, task exceptions are re-raised immediately instead of being captured in `TestResult.error` |

The context manager yields a `TestResults` list that accumulates results as tasks execute.
## TestResult

Each executed task produces a `TestResult`:

```python
with queue.test_mode() as results:
    add.delay(2, 3)

r = results[0]
r.job_id        # "test-000001"
r.task_name     # "mymodule.add"
r.args          # (2, 3)
r.kwargs        # {}
r.return_value  # 5
r.error         # None
r.traceback     # None
r.succeeded     # True
r.failed        # False
```
| Attribute | Type | Description |
|---|---|---|
| `job_id` | `str` | Synthetic ID like `"test-000001"` |
| `task_name` | `str` | Fully qualified task name |
| `args` | `tuple` | Positional arguments passed to the task |
| `kwargs` | `dict` | Keyword arguments passed to the task |
| `return_value` | `Any` | Return value on success, `None` on failure |
| `error` | `Exception \| None` | The exception if the task failed |
| `traceback` | `str \| None` | Formatted traceback if the task failed |
| `succeeded` | `bool` | `True` if no error |
| `failed` | `bool` | `True` if an error occurred |
## TestResults

`TestResults` is a list of `TestResult` objects with convenience methods:

```python
with queue.test_mode() as results:
    add.delay(2, 3)
    failing_task.delay()
    add.delay(10, 20)

# Filter by outcome
results.succeeded  # TestResults with 2 items
results.failed     # TestResults with 1 item

# Filter by task name
results.filter(task_name="mymodule.add")  # 2 items

# Combine filters
results.filter(task_name="mymodule.add", succeeded=True)  # 2 items
```
### .filter()

| Parameter | Type | Description |
|---|---|---|
| `task_name` | `str \| None` | Filter by exact task name |
| `succeeded` | `bool \| None` | `True` for successes, `False` for failures |
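The filtering behavior can be modeled with a small list subclass whose filters return the same type, so calls chain naturally. This is an illustrative sketch under assumed names (`Result`, `Results`), not taskito's actual `TestResults` class:

```python
from dataclasses import dataclass

@dataclass
class Result:
    """Minimal stand-in carrying only the fields filtering needs."""
    task_name: str
    succeeded: bool

class Results(list):
    """List subclass whose filters return Results so calls can chain."""

    @property
    def succeeded(self):
        return Results(r for r in self if r.succeeded)

    @property
    def failed(self):
        return Results(r for r in self if not r.succeeded)

    def filter(self, task_name=None, succeeded=None):
        out = self
        if task_name is not None:
            out = Results(r for r in out if r.task_name == task_name)
        if succeeded is not None:
            out = Results(r for r in out if r.succeeded == succeeded)
        return out

rs = Results([
    Result("mymodule.add", True),
    Result("mymodule.risky", False),
    Result("mymodule.add", True),
])

assert len(rs.succeeded) == 2
assert len(rs.failed) == 1
assert len(rs.filter(task_name="mymodule.add", succeeded=True)) == 2
```

Returning `Results` rather than a plain `list` is what lets `results.failed.filter(...)` compose.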
## Testing Failures

By default, task exceptions are captured, not raised:

```python
@queue.task()
def risky():
    raise ValueError("something broke")

def test_failure_captured():
    with queue.test_mode() as results:
        risky.delay()

        assert len(results) == 1
        assert results[0].failed
        assert isinstance(results[0].error, ValueError)
        assert "something broke" in str(results[0].error)
        assert results[0].traceback is not None
```
### Propagating Errors

Use `propagate_errors=True` when you want exceptions to bubble up:

```python
import pytest

def test_failure_propagated():
    with queue.test_mode(propagate_errors=True):
        with pytest.raises(ValueError, match="something broke"):
            risky.delay()
```
## Testing Workflows

Chains, groups, and chords work in test mode because they call `enqueue()` internally, which is intercepted by the test-mode patch.
### Chains

```python
from taskito import chain

@queue.task()
def double(n: int) -> int:
    return n * 2

@queue.task()
def add_ten(n: int) -> int:
    return n + 10

def test_chain():
    with queue.test_mode() as results:
        chain(double.s(5), add_ten.s()).apply()

        assert len(results) == 2
        assert results[0].return_value == 10  # double(5)
        assert results[1].return_value == 20  # add_ten(10)
```
### Groups

```python
from taskito import group

def test_group():
    with queue.test_mode() as results:
        group(double.s(1), double.s(2), double.s(3)).apply()

        assert len(results) == 3
        values = [r.return_value for r in results]
        assert values == [2, 4, 6]
```
## Job Context in Tests

`current_job` works inside test mode. The context is set up before each task runs:

```python
from taskito import current_job

@queue.task()
def context_aware():
    return {
        "job_id": current_job.id,
        "task_name": current_job.task_name,
        "retry_count": current_job.retry_count,
        "queue_name": current_job.queue_name,
    }

def test_context():
    with queue.test_mode() as results:
        context_aware.delay()

        ctx = results[0].return_value
        assert ctx["job_id"].startswith("test-")
        assert ctx["retry_count"] == 0
        assert ctx["queue_name"] == "default"
```
## Pytest Integration

### Fixture Pattern

Create a reusable fixture for test mode:

```python
# conftest.py
import pytest

from myapp import queue

@pytest.fixture
def task_results():
    with queue.test_mode() as results:
        yield results
```

```python
# test_tasks.py
def test_add(task_results):
    add.delay(2, 3)
    assert task_results[0].return_value == 5

def test_email(task_results):
    send_email.delay("user@example.com", "Hello", "World")
    assert task_results[0].succeeded
```
### Fixture with Error Propagation

```python
@pytest.fixture
def strict_tasks():
    with queue.test_mode(propagate_errors=True) as results:
        yield results
```
## Testing Async Code

Test mode works with async test functions; the tasks still execute synchronously:

```python
import pytest

@pytest.mark.asyncio
async def test_async_enqueue(task_results):
    add.delay(1, 2)
    assert task_results[0].return_value == 3
```
## What Test Mode Does NOT Cover

Test mode is designed for unit and integration testing of task logic. It does not exercise:

- SQLite storage or queries
- Retry/backoff scheduling
- Rate limiting
- Timeout reaping
- Worker thread pool dispatch
- Priority ordering
For end-to-end tests that exercise the full Rust scheduler, run a real worker in a background thread:

```python
import threading

from taskito import Queue

def test_e2e():
    queue_e2e = Queue(db_path=":memory:")

    @queue_e2e.task()
    def add(a, b):
        return a + b

    t = threading.Thread(target=queue_e2e.run_worker, daemon=True)
    t.start()

    job = add.delay(2, 3)
    result = job.result(timeout=10)
    assert result == 5
```
## Middleware in Test Mode

Per-task and queue-level `TaskMiddleware` hooks (`before`, `after`, `on_retry`) do fire in test mode, since they run in the Python wrapper around your task function. This lets you verify middleware behavior in tests without running a real worker.
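As a standalone illustration of why hooks fire even without a worker, here is a hypothetical wrapper that runs `before` and `after` around a task body in the calling thread. The hook names mirror this page, but the wiring is a sketch, not taskito's code:

```python
calls = []

class RecordingMiddleware:
    """Hypothetical middleware that records each hook invocation."""

    def before(self, task_name, args, kwargs):
        calls.append(("before", task_name))

    def after(self, task_name, result):
        calls.append(("after", task_name))

def run_eagerly(fn, middleware, *args, **kwargs):
    # The hooks live in the Python wrapper, so they fire even when no
    # worker is involved: before -> task body -> after, all in this thread.
    middleware.before(fn.__name__, args, kwargs)
    result = fn(*args, **kwargs)
    middleware.after(fn.__name__, result)
    return result

def add(a, b):
    return a + b

value = run_eagerly(add, RecordingMiddleware(), 2, 3)

assert value == 5
assert calls == [("before", "add"), ("after", "add")]
```

Because the hooks wrap the task body itself rather than the scheduler, a test-mode assertion on recorded hook calls checks the same code path a real worker would execute.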