Example: Notification Service¶
A notification service demonstrating delayed scheduling, unique tasks, priority queues, and job cancellation.
Project Structure¶
notifications/
tasks.py # Notification task definitions
service.py # Enqueue notifications with scheduling
tasks.py¶
"""Notification tasks with priority and deduplication."""
import time
from taskito import Queue
queue = Queue(
db_path=".taskito/notifications.db",
workers=4,
default_retry=3,
default_timeout=30,
result_ttl=7200, # auto-cleanup after 2 hours
)
# ── Notification Tasks ───────────────────────────────────
@queue.task(priority=10)
def send_urgent_email(to: str, subject: str, body: str) -> dict:
"""High-priority email — runs before bulk notifications."""
# Simulate sending
time.sleep(0.2)
print(f"[URGENT] Email to {to}: {subject}")
return {"to": to, "subject": subject, "sent": True}
@queue.task(priority=0)
def send_bulk_email(to: str, subject: str, body: str) -> dict:
"""Low-priority bulk email."""
time.sleep(0.1)
print(f"[BULK] Email to {to}: {subject}")
return {"to": to, "subject": subject, "sent": True}
@queue.task(priority=5, max_retries=5, retry_backoff=2.0)
def send_push(user_id: str, title: str, message: str) -> dict:
"""Push notification with retries."""
time.sleep(0.3)
print(f"[PUSH] {user_id}: {title}")
return {"user_id": user_id, "title": title, "sent": True}
@queue.task()
def send_sms(phone: str, message: str) -> dict:
"""SMS notification."""
time.sleep(0.5)
print(f"[SMS] {phone}: {message}")
return {"phone": phone, "sent": True}
# ── Periodic Digest ──────────────────────────────────────
@queue.periodic(cron="0 9 * * * *")
def daily_digest():
"""Send daily digest email every day at 9 AM."""
print("[DIGEST] Sending daily digest to all subscribers...")
service.py¶
"""Notification service — enqueue with scheduling, deduplication, and cancellation."""
import time
from tasks import (
queue,
send_urgent_email,
send_bulk_email,
send_push,
send_sms,
)
# ── 1. Delayed Scheduling ───────────────────────────────
# Schedule a reminder 30 minutes from now
print("1. Delayed scheduling")
reminder = send_push.apply_async(
args=["user_123", "Reminder", "Your meeting starts in 5 minutes"],
delay=1800, # 30 minutes from now
)
print(f" Scheduled reminder: {reminder.id} (runs in 30 min)")
# Schedule a follow-up email for 1 hour later
followup = send_bulk_email.apply_async(
args=["user@example.com", "How was your meeting?", "We'd love your feedback."],
delay=3600, # 1 hour from now
)
print(f" Scheduled follow-up: {followup.id} (runs in 1 hour)")
# ── 2. Unique Tasks (Deduplication) ─────────────────────
# Prevent duplicate notifications for the same event
print("\n2. Unique tasks")
job1 = send_push.apply_async(
args=["user_456", "New message", "You have a new message"],
unique_key="notify:user_456:new_message",
)
job2 = send_push.apply_async(
args=["user_456", "New message", "You have a new message"],
unique_key="notify:user_456:new_message",
)
print(f" First enqueue: {job1.id}")
print(f" Second enqueue: {job2.id}")
print(f" Same job? {job1.id == job2.id}") # True — deduplicated
# ── 3. Priority Queues ──────────────────────────────────
# Urgent notifications run before bulk
print("\n3. Priority queues")
# Enqueue bulk emails first
bulk_jobs = send_bulk_email.map([
("alice@example.com", "Newsletter", "This week's updates..."),
("bob@example.com", "Newsletter", "This week's updates..."),
("carol@example.com", "Newsletter", "This week's updates..."),
])
print(f" Enqueued {len(bulk_jobs)} bulk emails (priority=0)")
# Enqueue urgent email after — but it runs first due to priority=10
urgent = send_urgent_email.delay(
"admin@example.com",
"Server Alert",
"CPU usage exceeded 90%",
)
print(f" Enqueued urgent email (priority=10) — runs before bulk")
# ── 4. Job Cancellation ─────────────────────────────────
# Cancel a scheduled notification before it sends
print("\n4. Job cancellation")
scheduled = send_sms.apply_async(
args=["+1234567890", "Your order ships tomorrow"],
delay=7200, # 2 hours from now
)
print(f" Scheduled SMS: {scheduled.id}")
# User updated their preference — cancel the SMS
cancelled = queue.cancel_job(scheduled.id)
print(f" Cancelled: {cancelled}") # True
job = queue.get_job(scheduled.id)
print(f" Status: {job.status}") # "cancelled"
# ── 5. Inspect Pending Notifications ─────────────────────
print("\n5. Pending notifications")
pending = queue.list_jobs(status="pending", limit=10)
for j in pending:
d = j.to_dict()
print(f" {d['id'][:12]}... | {d['task_name']} | priority={d['priority']}")
stats = queue.stats()
print(f"\nQueue stats: {stats}")
Running It¶
Key Patterns Demonstrated¶
| Pattern | Where |
|---|---|
| Delayed scheduling | delay=1800 — reminder runs 30 min later |
| Unique tasks | unique_key="notify:user_456:..." — deduplicates |
| Priority queues | priority=10 urgent runs before priority=0 bulk |
| Job cancellation | queue.cancel_job() revokes a scheduled SMS |
| Batch enqueue | send_bulk_email.map() for newsletter |
| Periodic tasks | daily_digest runs every day at 9 AM |
| Result TTL | result_ttl=7200 — auto-cleanup after 2 hours |
| Retry with backoff | send_push — 5 retries, 2x backoff |
| Job inspection | queue.list_jobs() to view pending notifications |