Skip to content

Example: Notification Service

A notification service demonstrating delayed scheduling, unique tasks, priority queues, and job cancellation.

Project Structure

notifications/
  tasks.py     # Notification task definitions
  service.py   # Enqueue notifications with scheduling

tasks.py

"""Notification tasks with priority and deduplication."""

import time

from taskito import Queue

queue = Queue(
    db_path=".taskito/notifications.db",
    workers=4,
    default_retry=3,
    default_timeout=30,
    result_ttl=7200,  # auto-cleanup after 2 hours
)

# ── Notification Tasks ───────────────────────────────────

@queue.task(priority=10)
def send_urgent_email(to: str, subject: str, body: str) -> dict:
    """High-priority email — runs before bulk notifications."""
    # Simulate sending
    time.sleep(0.2)
    print(f"[URGENT] Email to {to}: {subject}")
    return {"to": to, "subject": subject, "sent": True}

@queue.task(priority=0)
def send_bulk_email(to: str, subject: str, body: str) -> dict:
    """Low-priority bulk email."""
    time.sleep(0.1)
    print(f"[BULK] Email to {to}: {subject}")
    return {"to": to, "subject": subject, "sent": True}

@queue.task(priority=5, max_retries=5, retry_backoff=2.0)
def send_push(user_id: str, title: str, message: str) -> dict:
    """Push notification with retries."""
    time.sleep(0.3)
    print(f"[PUSH] {user_id}: {title}")
    return {"user_id": user_id, "title": title, "sent": True}

@queue.task()
def send_sms(phone: str, message: str) -> dict:
    """SMS notification."""
    time.sleep(0.5)
    print(f"[SMS] {phone}: {message}")
    return {"phone": phone, "sent": True}

# ── Periodic Digest ──────────────────────────────────────

@queue.periodic(cron="0 9 * * * *")
def daily_digest():
    """Send daily digest email every day at 9 AM."""
    print("[DIGEST] Sending daily digest to all subscribers...")

service.py

"""Notification service — enqueue with scheduling, deduplication, and cancellation."""

import time
from tasks import (
    queue,
    send_urgent_email,
    send_bulk_email,
    send_push,
    send_sms,
)

# ── 1. Delayed Scheduling ───────────────────────────────
# Schedule a reminder 30 minutes from now

print("1. Delayed scheduling")
reminder = send_push.apply_async(
    args=["user_123", "Reminder", "Your meeting starts in 5 minutes"],
    delay=1800,  # 30 minutes from now
)
print(f"   Scheduled reminder: {reminder.id} (runs in 30 min)")

# Schedule a follow-up email for 1 hour later
followup = send_bulk_email.apply_async(
    args=["user@example.com", "How was your meeting?", "We'd love your feedback."],
    delay=3600,  # 1 hour from now
)
print(f"   Scheduled follow-up: {followup.id} (runs in 1 hour)")

# ── 2. Unique Tasks (Deduplication) ─────────────────────
# Prevent duplicate notifications for the same event

print("\n2. Unique tasks")
job1 = send_push.apply_async(
    args=["user_456", "New message", "You have a new message"],
    unique_key="notify:user_456:new_message",
)
job2 = send_push.apply_async(
    args=["user_456", "New message", "You have a new message"],
    unique_key="notify:user_456:new_message",
)
print(f"   First enqueue:  {job1.id}")
print(f"   Second enqueue: {job2.id}")
print(f"   Same job? {job1.id == job2.id}")  # True — deduplicated

# ── 3. Priority Queues ──────────────────────────────────
# Urgent notifications run before bulk

print("\n3. Priority queues")
# Enqueue bulk emails first
bulk_jobs = send_bulk_email.map([
    ("alice@example.com", "Newsletter", "This week's updates..."),
    ("bob@example.com", "Newsletter", "This week's updates..."),
    ("carol@example.com", "Newsletter", "This week's updates..."),
])
print(f"   Enqueued {len(bulk_jobs)} bulk emails (priority=0)")

# Enqueue urgent email after — but it runs first due to priority=10
urgent = send_urgent_email.delay(
    "admin@example.com",
    "Server Alert",
    "CPU usage exceeded 90%",
)
print(f"   Enqueued urgent email (priority=10) — runs before bulk")

# ── 4. Job Cancellation ─────────────────────────────────
# Cancel a scheduled notification before it sends

print("\n4. Job cancellation")
scheduled = send_sms.apply_async(
    args=["+1234567890", "Your order ships tomorrow"],
    delay=7200,  # 2 hours from now
)
print(f"   Scheduled SMS: {scheduled.id}")

# User updated their preference — cancel the SMS
cancelled = queue.cancel_job(scheduled.id)
print(f"   Cancelled: {cancelled}")  # True

job = queue.get_job(scheduled.id)
print(f"   Status: {job.status}")  # "cancelled"

# ── 5. Inspect Pending Notifications ─────────────────────

print("\n5. Pending notifications")
pending = queue.list_jobs(status="pending", limit=10)
for j in pending:
    d = j.to_dict()
    print(f"   {d['id'][:12]}... | {d['task_name']} | priority={d['priority']}")

stats = queue.stats()
print(f"\nQueue stats: {stats}")

Running It

taskito worker --app tasks:queue
python service.py
taskito info --app tasks:queue --watch

Key Patterns Demonstrated

Pattern Where
Delayed scheduling delay=1800 — reminder runs 30 min later
Unique tasks unique_key="notify:user_456:..." — deduplicates
Priority queues priority=10 urgent runs before priority=0 bulk
Job cancellation queue.cancel_job() revokes a scheduled SMS
Batch enqueue send_bulk_email.map() for newsletter
Periodic tasks daily_digest runs every day at 9 AM
Result TTL result_ttl=7200 — auto-cleanup after 2 hours
Retry with backoff send_push — 5 retries, 2x backoff
Job inspection queue.list_jobs() to view pending notifications