Task Dependencies¶
taskito supports declaring dependencies between jobs, allowing you to build DAG-style workflows where a job only runs after its upstream dependencies have completed successfully.
Basic Usage¶
Pass `depends_on` when enqueuing a job to declare that it should wait for one or more other jobs to finish.
The depends_on parameter accepts:
| Value | Description |
|---|---|
| `str` | A single job ID |
| `list[str]` | Multiple job IDs (all must complete) |
| `None` (default) | No dependencies |
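Conceptually, the three accepted forms reduce to a single list of job IDs. A minimal normalization sketch (illustrative only, not taskito internals):

```python
def normalize_depends_on(value):
    """Reduce the three accepted depends_on forms to a list of job IDs."""
    if value is None:            # no dependencies
        return []
    if isinstance(value, str):   # a single job ID
        return [value]
    return list(value)           # multiple job IDs (all must complete)


print(normalize_depends_on(None))        # []
print(normalize_depends_on("01H5K6X"))   # ['01H5K6X']
print(normalize_depends_on(["a", "b"]))  # ['a', 'b']
```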
Tip
You can also use `depends_on` with `queue.enqueue()` directly.
How It Works¶
- When a job with `depends_on` is enqueued, it enters a waiting state
- The scheduler periodically checks waiting jobs to see if all dependencies have completed
- Once every dependency has `status=completed`, the job transitions to `pending` and becomes eligible for dispatch
- If any dependency fails, dies, or is cancelled, the dependent job is cascade cancelled
```mermaid
flowchart TD
    E["Enqueue with depends_on"] --> W["Status: Waiting"]
    W --> CHECK{"All deps completed?"}
    CHECK -->|Yes| P["Status: Pending"]
    CHECK -->|"Any dep failed/dead/cancelled"| CC["Cascade Cancel"]
    P --> R["Dispatched to worker"]
    CC --> DONE["Status: Cancelled<br/>reason: dependency_failed"]
```
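The transition rules above can be sketched as a single scheduler pass over an in-memory job table. This is a toy model of the behaviour, not taskito's implementation:

```python
TERMINAL_FAILURE = {"dead", "cancelled"}


def tick(jobs):
    """One scheduler pass. jobs = {job_id: {"status": ..., "deps": [...]}}."""
    for job in jobs.values():
        if job["status"] != "waiting":
            continue
        dep_statuses = [jobs[d]["status"] for d in job["deps"]]
        if any(s in TERMINAL_FAILURE for s in dep_statuses):
            job["status"] = "cancelled"   # cascade cancel
        elif all(s == "completed" for s in dep_statuses):
            job["status"] = "pending"     # eligible for dispatch


jobs = {
    "a": {"status": "completed", "deps": []},
    "b": {"status": "waiting", "deps": ["a"]},
}
tick(jobs)
print(jobs["b"]["status"])  # pending
```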
Cascade Cancel¶
When a dependency fails (exhausts retries and moves to DLQ), dies, or is cancelled, all downstream dependents are automatically cancelled. This propagates transitively through the entire dependency graph:
```python
job_a = step_one.delay()
job_b = step_two.apply_async(args=(), depends_on=job_a.id)
job_c = step_three.apply_async(args=(), depends_on=job_b.id)

# If job_a fails permanently:
# - job_b is cascade cancelled
# - job_c is cascade cancelled (transitive)
```
Cascade is immediate
As soon as a dependency enters a terminal failure state (dead or cancelled), all downstream dependents are cancelled in the same scheduler tick. There is no grace period.
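The transitive propagation can be modelled as a breadth-first walk over the reverse (dependents) edges. A sketch of the semantics, not the actual scheduler code:

```python
from collections import deque


def cascade_cancel(failed_id, dependents, status):
    """Cancel every job downstream of failed_id, transitively.

    dependents: {job_id: [job_ids that depend on it]}
    status:     {job_id: current status} (mutated in place)
    """
    queue = deque(dependents.get(failed_id, []))
    while queue:
        job_id = queue.popleft()
        if status[job_id] == "cancelled":
            continue  # already visited
        status[job_id] = "cancelled"
        queue.extend(dependents.get(job_id, []))


# The a -> b -> c chain from the example above
dependents = {"a": ["b"], "b": ["c"]}
status = {"a": "dead", "b": "waiting", "c": "waiting"}
cascade_cancel("a", dependents, status)
print(status)  # {'a': 'dead', 'b': 'cancelled', 'c': 'cancelled'}
```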
Inspecting Dependencies¶
job.dependencies¶
Returns the list of job IDs this job depends on:
```python
job_c = merge.apply_async(
    args=(),
    depends_on=[job_a.id, job_b.id],
)

fetched = queue.get_job(job_c.id)
print(fetched.dependencies)  # ['01H5K6X...', '01H5K7Y...']
```
job.dependents¶
Returns the list of job IDs that depend on this job.
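Conceptually, `dependents` is just the reverse index of `dependencies`. An illustrative sketch of that relationship (not taskito internals):

```python
def build_dependents(dependencies):
    """Invert {job_id: [dep_ids]} into {dep_id: [job_ids that depend on it]}."""
    dependents = {}
    for job_id, deps in dependencies.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(job_id)
    return dependents


# job_c depends on job_a and job_b, so both list job_c as a dependent
deps = {"job_c": ["job_a", "job_b"]}
print(build_dependents(deps))  # {'job_a': ['job_c'], 'job_b': ['job_c']}
```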
Error Handling¶
Missing Dependencies¶
If you reference a job ID that does not exist, enqueue raises a ValueError:
```python
try:
    job = transform.apply_async(
        args=(),
        depends_on="nonexistent-job-id",
    )
except ValueError as e:
    print(e)  # "Dependency job 'nonexistent-job-id' not found"
```
Already-Dead Dependencies¶
If a dependency is already in a terminal failure state (dead or cancelled) at enqueue time, the dependent job is immediately cancelled:
```python
dead_job = queue.get_job(some_dead_id)
assert dead_job.status == "dead"

# This job is immediately cancelled; it will never run
job = transform.apply_async(
    args=(),
    depends_on=dead_job.id,
)

fetched = queue.get_job(job.id)
print(fetched.status)  # "cancelled"
```
DAG Workflow Examples¶
Diamond Pattern¶
A classic diamond dependency graph where two branches converge:
```mermaid
graph TD
    A["extract.delay()"] --> B["transform_a.apply_async(depends_on=A)"]
    A --> C["transform_b.apply_async(depends_on=A)"]
    B --> D["load.apply_async(depends_on=[B, C])"]
    C --> D
```
```python
# Extract
job_a = extract.delay(source_url)

# Two parallel transforms, each depending on extract
job_b = transform_a.apply_async(
    args=("schema_a",),
    depends_on=job_a.id,
)
job_c = transform_b.apply_async(
    args=("schema_b",),
    depends_on=job_a.id,
)

# Load waits for both transforms
job_d = load.apply_async(
    args=(),
    depends_on=[job_b.id, job_c.id],
)
```
Multi-Stage Pipeline¶
A sequential pipeline with fan-out at one stage:
```python
# Stage 1: Download
download_jobs = [
    download.delay(url) for url in urls
]

# Stage 2: Process each download (each depends on its own download)
process_jobs = [
    process.apply_async(
        args=(url,),
        depends_on=dl.id,
    )
    for dl, url in zip(download_jobs, urls)
]

# Stage 3: Aggregate all results
aggregate_job = aggregate.apply_async(
    args=(),
    depends_on=[j.id for j in process_jobs],
)
```
Conditional Branching¶
Combine dependencies with metadata to build conditional workflows:
```python
job_a = validate.delay(data)

# Both branches depend on validation
job_success = on_valid.apply_async(
    args=(data,),
    depends_on=job_a.id,
    metadata='{"branch": "success"}',
)

# Use a separate task to handle the "validation failed" path
# by inspecting job_a's result in the task body
```
Dependencies vs. Workflows
depends_on is a lower-level primitive than chains, groups, and chords. Use depends_on when you need fine-grained control over a custom DAG. Use the workflow primitives when your pipeline fits a standard pattern.
Combining with Other Features¶
Dependencies compose naturally with other taskito features:
```python
job = transform.apply_async(
    args=(data,),
    depends_on=job_a.id,
    priority=10,                   # High priority once unblocked
    queue="processing",            # Target queue
    max_retries=5,                 # Retry policy
    delay=60,                      # Additional delay after deps resolve
    unique_key="transform-daily",  # Deduplication
)
```
Delay + depends_on
When both delay and depends_on are set, the job first waits for all dependencies to complete, then waits for the additional delay before becoming eligible for dispatch.
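Under that rule, the earliest dispatch time is the completion time of the last dependency plus the configured delay. A toy calculation illustrating the ordering (names assumed for illustration):

```python
def eligible_at(dep_completion_times, delay_seconds):
    """Earliest dispatch time: last dependency completes, then the delay runs."""
    return max(dep_completion_times) + delay_seconds


# Dependencies finish at t=100s and t=130s; delay=60
# -> the job becomes eligible at t=190s
print(eligible_at([100, 130], 60))  # 190
```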