Django-RQ Examples
Comprehensive collection of real-world examples demonstrating Django-RQ usage in django-cfg projects, from simple tasks to advanced patterns.
Table of Contents
Basics
Real-World Examples
Advanced Patterns
Best Practices
Simple Task
Basic Task Function
# apps/myapp/tasks.py
def say_hello(name: str) -> str:
    """Build the greeting for ``name``, e.g. ``"Hello, Alice!"``."""
    greeting = "Hello, {}!".format(name)
    return greeting
# Enqueue the task
import django_rq
queue = django_rq.get_queue('default')
job = queue.enqueue('apps.myapp.tasks.say_hello', name="Alice")
# Get result
print(job.result) # "Hello, Alice!" once a worker has finished the job (None before that)
Task with Type Hints
from typing import Dict, Any
def process_data(data: Dict[str, Any], options: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Summarise ``data`` and echo back the options that were applied.

    Args:
        data: Input data to process
        options: Optional processing options; an empty dict is used when omitted

    Returns:
        Dict with ``success``, the number of entries in ``data`` under
        ``processed``, and the effective ``options``
    """
    effective_options = options if options else {}
    return {
        "success": True,
        "processed": len(data),
        "options": effective_options,
    }
# Enqueue
job = queue.enqueue(
'apps.myapp.tasks.process_data',
data={"key": "value"},
options={"format": "json"},
)
Task with Arguments
Positional and Keyword Arguments
def create_user_profile(user_id: int, bio: str = "", interests: list = None) -> dict:
    """
    Create a ``UserProfile`` row for an existing user.

    Args:
        user_id: Primary key of the user who owns the new profile
        bio: Biography text stored on the profile
        interests: Interests to store; an empty list is stored when omitted

    Returns:
        Dict with ``success``, the new ``profile_id`` and the ``user_id``
    """
    # Imported inside the task so the module can be loaded before the apps are ready
    from apps.profiles.models import UserProfile
    from django.contrib.auth import get_user_model

    owner = get_user_model().objects.get(pk=user_id)
    stored_interests = interests if interests else []
    new_profile = UserProfile.objects.create(
        user=owner,
        bio=bio,
        interests=stored_interests,
    )
    return {"success": True, "profile_id": new_profile.id, "user_id": user_id}
# Enqueue with positional args
job = queue.enqueue(
'apps.profiles.tasks.create_user_profile',
123, # user_id
"Software developer", # bio
["Python", "Django"], # interests
)
# Enqueue with keyword args
job = queue.enqueue(
'apps.profiles.tasks.create_user_profile',
user_id=123,
bio="Software developer",
interests=["Python", "Django"],
)
Task with Complex Arguments
from dataclasses import dataclass
from typing import List
@dataclass
class ImportConfig:
    """Typed view of the import settings that reach the task as a plain dict."""

    source: str  # label of the data source, echoed back in the result
    batch_size: int  # maximum number of items handled by one call
    validate: bool  # whether each item passes through the validation step


def import_data_batch(config: dict, items: List[dict]) -> dict:
    """
    Import batch of items with configuration.

    Note: Complex objects are serialized to dict/JSON, so the configuration
    arrives as a dict and is rebuilt into an ``ImportConfig`` here.

    Args:
        config: Dict with the keys ``source``, ``batch_size`` and ``validate``
        items: Items to import; only the first ``batch_size`` are handled

    Returns:
        Dict with ``success``, the number of ``imported`` items and the ``source``

    Raises:
        KeyError: If ``config`` lacks one of the three required keys
    """
    # Explicit key access (rather than ImportConfig(**config)) keeps the
    # KeyError for missing keys and tolerates extra keys in the dict.
    settings = ImportConfig(
        source=config['source'],
        batch_size=config['batch_size'],
        validate=config['validate'],
    )

    imported = 0
    for _item in items[:settings.batch_size]:
        if settings.validate:
            # Validate item
            pass
        # Import item
        imported += 1

    return {
        "success": True,
        "imported": imported,
        "source": settings.source,
    }
# Enqueue with complex args
config = {
"source": "api",
"batch_size": 100,
"validate": True,
}
items = [{"id": 1, "name": "Item 1"}, ...]
job = queue.enqueue(
'apps.myapp.tasks.import_data_batch',
config=config,
items=items,
)
Real-World Example: Update Cryptocurrency Prices
Task Function
# apps/crypto/tasks.py
import logging
import random
from decimal import Decimal
from typing import Optional
from django.utils import timezone
from apps.crypto.models import Coin
logger = logging.getLogger(__name__)
def update_coin_prices(limit: int = 100, verbosity: int = 0, days: Optional[int] = None, force: bool = False) -> dict:
    """
    Update cryptocurrency prices for active coins.

    In production, this would fetch real data from CoinGecko/CoinMarketCap API.
    For demo purposes, it generates realistic price movements.

    Args:
        limit: Maximum number of coins to update
        verbosity: Logging verbosity (0=quiet, 1=normal, 2=verbose)
        days: Days of historical data to update (not used in demo)
        force: Force update even if recently updated (only logged in this demo)

    Returns:
        dict with ``success``, ``updated``, ``skipped``, ``failed`` and
        ``message``; ``timestamp`` is included whenever at least one coin
        was selected
    """
    if verbosity > 0:
        logger.info(f"Starting coin price update (limit={limit}, force={force})")

    # Evaluate the sliced queryset once instead of an exists() query
    # followed by a second fetch for the loop.
    coins = list(Coin.objects.filter(is_active=True)[:limit])
    if not coins:
        logger.warning("No active coins found to update")
        return {
            "success": True,
            "updated": 0,
            "skipped": 0,
            "failed": 0,
            "message": "No active coins to update"
        }

    updated_count = 0
    skipped_count = 0  # kept for a stable result shape; the demo never skips a coin
    failed_count = 0

    for coin in coins:
        try:
            # Demo: Generate realistic price movements
            # In production: price = fetch_from_api(coin.symbol)
            # Generate random price change between -10% and +10%
            price_change_pct = Decimal(str(random.uniform(-0.10, 0.10)))

            # An unset (None) price is treated like a zero price so the coin
            # gets a starting price instead of failing on ``None > 0``.
            old_price = coin.current_price_usd or Decimal("0")
            if old_price > 0:
                new_price = old_price * (1 + price_change_pct)
            else:
                # If no price set, generate a random starting price
                new_price = Decimal(str(random.uniform(0.01, 50000)))

            # Update coin data
            coin.current_price_usd = new_price
            coin.price_change_24h_percent = price_change_pct * 100
            # Generate random volume and market cap
            coin.volume_24h_usd = new_price * Decimal(str(random.randint(1000000, 100000000)))
            coin.market_cap_usd = new_price * Decimal(str(random.randint(1000000, 1000000000)))
            coin.save(update_fields=[
                'current_price_usd',
                'price_change_24h_percent',
                'volume_24h_usd',
                'market_cap_usd',
                'updated_at'
            ])
            updated_count += 1

            if verbosity >= 2:
                logger.info(
                    f"✓ Updated {coin.symbol}: "
                    f"${old_price:.2f} → ${new_price:.2f} "
                    f"({price_change_pct * 100:+.2f}%)"
                )
        except Exception as e:
            # One bad coin must not abort the batch; count it and keep the traceback
            failed_count += 1
            logger.exception(f"Failed to update {coin.symbol}: {e}")

    result = {
        "success": True,
        "updated": updated_count,
        "skipped": skipped_count,
        "failed": failed_count,
        "timestamp": timezone.now().isoformat(),
        "message": f"Successfully updated {updated_count} coins"
    }

    if verbosity > 0:
        logger.info(
            f"Price update completed: {updated_count} updated, "
            f"{skipped_count} skipped, {failed_count} failed"
        )
    return result


# Enqueue from View
# apps/crypto/views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
import django_rq
@api_view(['POST'])
def trigger_price_update(request):
    """
    Trigger cryptocurrency price update.

    Reads ``limit`` (default 100) and ``verbosity`` (default 1) from the
    request body, enqueues ``update_coin_prices`` on the ``default`` queue
    and returns the job id. Responds with HTTP 400 when either value is not
    an integer or ``limit`` is negative.
    """
    # request.data is client-controlled: coerce it here so a bad value is
    # rejected immediately instead of failing later inside the worker.
    try:
        limit = int(request.data.get('limit', 100))
        verbosity = int(request.data.get('verbosity', 1))
    except (TypeError, ValueError):
        return Response(
            {"success": False, "error": "limit and verbosity must be integers"},
            status=400,
        )
    if limit < 0:
        return Response(
            {"success": False, "error": "limit must not be negative"},
            status=400,
        )

    # Enqueue job
    queue = django_rq.get_queue('default')
    job = queue.enqueue(
        'apps.crypto.tasks.update_coin_prices',
        limit=limit,
        verbosity=verbosity,
        job_timeout=360,  # 6 minutes
    )
    return Response({
        "success": True,
        "job_id": job.id,
        "queue": "default",
        "message": f"Price update job enqueued (limit={limit})",
    })


# Schedule Task
# api/config.py
# Project configuration that enables Django-RQ and registers two recurring
# schedules for the same task function, each with its own interval and arguments.
class MyConfig(DjangoConfig):
    django_rq: DjangoRQConfig = DjangoRQConfig(
        enabled=True,
        queues=[...],  # placeholder: the queue definitions are elided in this example
        schedules=[
            # Update prices every 5 minutes
            RQScheduleConfig(
                func="apps.crypto.tasks.update_coin_prices",  # dotted path to the task
                interval=300,  # seconds
                queue="default",
                # limit/verbosity match parameters of update_coin_prices;
                # presumably forwarded to it as keyword arguments — TODO confirm
                limit=50,
                verbosity=0,
                description="Update coin prices (frequent)",
            ),
            # Update all coins hourly with verbose logging
            RQScheduleConfig(
                func="apps.crypto.tasks.update_coin_prices",
                interval=3600,  # seconds (one hour)
                queue="default",
                limit=100,
                verbosity=1,
                description="Update coin prices (hourly)",
            ),
        ],
    )


# Management Command
# apps/crypto/management/commands/update_coin_prices.py
from django.core.management.base import BaseCommand
from apps.crypto.tasks import update_coin_prices
class Command(BaseCommand):
    """Run ``update_coin_prices`` synchronously and print its counters."""

    help = 'Update cryptocurrency prices from CoinGecko API'

    def add_arguments(self, parser):
        """
        Register the command-specific options.

        ``--verbosity`` is intentionally not declared here: ``BaseCommand``
        already registers ``-v/--verbosity``, and declaring it a second time
        makes argparse abort with a conflicting-option error.
        """
        parser.add_argument('--limit', type=int, default=100)
        parser.add_argument('--force', action='store_true')

    def handle(self, *args, **options):
        """Call the task in-process and report updated/skipped/failed counts."""
        limit = options.get('limit', 100)
        # Supplied by BaseCommand's built-in --verbosity option
        verbosity = options.get('verbosity', 1)
        force = options.get('force', False)

        # Call task function directly (synchronous)
        result = update_coin_prices(
            limit=limit,
            verbosity=verbosity,
            force=force
        )

        # Display results
        if result['success']:
            self.stdout.write(
                self.style.SUCCESS(
                    f"✓ Price update completed:\n"
                    f" - Updated: {result['updated']}\n"
                    f" - Skipped: {result['skipped']}\n"
                    f" - Failed: {result['failed']}"
                )
            )
        else:
            self.stdout.write(
                self.style.ERROR(f"✗ Price update failed: {result.get('message')}")
            )


# Real-World Example: Import Data
Task Function
# apps/crypto/tasks.py
def import_coins(source: str = "demo", batch_size: int = 10) -> dict:
    """
    Import new cryptocurrency coins.

    In production, this would fetch data from external APIs or CSV files.
    For demo purposes, it creates sample coins from a built-in list.

    Args:
        source: Data source (demo, api, csv); only logged by this demo
        batch_size: Number of coins to import in this batch

    Returns:
        dict with import statistics (``imported``, ``updated``, ``skipped``),
        a ``timestamp`` and a summary ``message``
    """
    logger.info(f"Starting coin import from {source} (batch_size={batch_size})")

    # Demo coin data as (symbol, name), listed in rank order starting at 1
    ranked_pairs = [
        ("BTC", "Bitcoin"),
        ("ETH", "Ethereum"),
        ("BNB", "Binance Coin"),
        ("SOL", "Solana"),
        ("XRP", "Ripple"),
        ("ADA", "Cardano"),
        ("DOGE", "Dogecoin"),
        ("TRX", "TRON"),
        ("AVAX", "Avalanche"),
        ("DOT", "Polkadot"),
    ]
    demo_coins = [
        {"symbol": ticker, "name": title, "rank": position}
        for position, (ticker, title) in enumerate(ranked_pairs, start=1)
    ]

    tally = {"imported": 0, "updated": 0, "skipped": 0}
    for entry in demo_coins[:batch_size]:
        symbol = entry["symbol"]
        coin_name = entry["name"]

        # Values applied only when the coin does not exist yet
        initial_values = {
            "name": coin_name,
            "slug": coin_name.lower().replace(" ", "-"),
            "rank": entry["rank"],
            "is_active": True,
            "is_tradeable": True,
            "current_price_usd": Decimal(str(random.uniform(0.01, 50000))),
            "market_cap_usd": Decimal(str(random.randint(1000000, 1000000000))),
            "volume_24h_usd": Decimal(str(random.randint(1000000, 100000000))),
        }
        coin, created = Coin.objects.get_or_create(symbol=symbol, defaults=initial_values)

        if not created:
            # Existing coin: refresh only its name and rank
            coin.name = coin_name
            coin.rank = entry["rank"]
            coin.save(update_fields=['name', 'rank', 'updated_at'])
            tally["updated"] += 1
            logger.info(f"↻ Updated existing coin: {symbol}")
            continue

        tally["imported"] += 1
        logger.info(f"✓ Imported new coin: {symbol} - {coin_name}")

    imported_count = tally["imported"]
    updated_count = tally["updated"]
    skipped_count = tally["skipped"]
    result = {
        "success": True,
        "imported": imported_count,
        "updated": updated_count,
        "skipped": skipped_count,
        "timestamp": timezone.now().isoformat(),
        "message": f"Imported {imported_count} new coins, updated {updated_count}"
    }
    logger.info(
        f"Import completed: {imported_count} imported, "
        f"{updated_count} updated, {skipped_count} skipped"
    )
    return result


# Schedule Daily Import
schedules=[
# Import new coins daily at 3 AM
RQScheduleConfig(
func="apps.crypto.tasks.import_coins",
cron="0 3 * * *",
queue="low",
source="api",
batch_size=50,
description="Daily coin import from API",
),
]
Real-World Example: Generate Reports
Task Function
# apps/crypto/tasks.py
from django.db.models import Avg, Sum
def generate_report(report_type: str = "daily") -> dict:
    """
    Generate cryptocurrency market report.

    Collects statistics and generates insights about the crypto market.
    In production, this could send emails, store in DB, or export to PDF.

    Args:
        report_type: Type of report (daily, weekly, monthly); copied into the result

    Returns:
        dict with report data: a ``summary`` of aggregates plus the
        ``top_gainers``, ``top_losers`` and ``top_by_market_cap`` lists, or
        ``{"success": False, ...}`` when there are no active coins
    """
    logger.info(f"Generating {report_type} crypto market report")

    active_coins = Coin.objects.filter(is_active=True)

    # Get statistics
    total_coins = active_coins.count()
    if total_coins == 0:
        logger.warning("No coins found for report generation")
        return {
            "success": False,
            "message": "No coins available for reporting"
        }

    # Calculate aggregate statistics
    stats = active_coins.aggregate(
        avg_price_change_24h=Avg('price_change_24h_percent'),
        total_volume_24h=Sum('volume_24h_usd'),
        total_market_cap=Sum('market_cap_usd'),
    )
    # Aggregates are None when every contributing value is NULL; normalise
    # once so the summary and the log line below use the same safe number.
    avg_price_change = float(stats['avg_price_change_24h'] or 0)

    # Get top gainers and losers
    top_gainers = list(
        active_coins
        .order_by('-price_change_24h_percent')[:5]
        .values('symbol', 'name', 'price_change_24h_percent', 'current_price_usd')
    )
    top_losers = list(
        active_coins
        .order_by('price_change_24h_percent')[:5]
        .values('symbol', 'name', 'price_change_24h_percent', 'current_price_usd')
    )

    # Get top by market cap
    top_by_market_cap = list(
        active_coins
        .order_by('-market_cap_usd')[:10]
        .values('symbol', 'name', 'market_cap_usd', 'current_price_usd')
    )

    report = {
        "success": True,
        "report_type": report_type,
        "generated_at": timezone.now().isoformat(),
        "summary": {
            "total_coins": total_coins,
            "avg_price_change_24h": avg_price_change,
            "total_volume_24h": float(stats['total_volume_24h'] or 0),
            "total_market_cap": float(stats['total_market_cap'] or 0),
        },
        "top_gainers": top_gainers,
        "top_losers": top_losers,
        "top_by_market_cap": top_by_market_cap,
    }

    logger.info(
        f"Report generated: {total_coins} coins, "
        f"avg 24h change: {avg_price_change:.2f}%"
    )

    # In production, you could:
    # - Send email with report
    # - Store in database
    # - Export to PDF
    # - Send to Telegram/Slack
    return report


# Schedule Report Generation
schedules=[
# Daily report at midnight
RQScheduleConfig(
func="apps.crypto.tasks.generate_report",
cron="0 0 * * *",
queue="low",
report_type="daily",
description="Daily market report",
),
# Weekly report on Mondays at 9 AM
RQScheduleConfig(
func="apps.crypto.tasks.generate_report",
cron="0 9 * * 1",
queue="low",
report_type="weekly",
description="Weekly market summary",
),
]
Send Emails
Simple Email Task
def send_welcome_email(user_id: int) -> dict:
    """
    Send welcome email to new user.

    Args:
        user_id: Primary key of the user to greet

    Returns:
        Dict with ``success``, the ``user_id`` and the address mailed
    """
    from django.contrib.auth import get_user_model
    from django.core.mail import send_mail
    from django.template.loader import render_to_string

    User = get_user_model()
    user = User.objects.get(pk=user_id)

    # Render email template
    html_message = render_to_string('emails/welcome.html', {
        'user': user,
    })

    # Send email
    send_mail(
        subject="Welcome to Our Platform!",
        message=f"Hi {user.username}!",
        # The literal previously here was an e-mail-obfuscation placeholder,
        # not a real address; None makes Django use settings.DEFAULT_FROM_EMAIL.
        from_email=None,
        recipient_list=[user.email],
        html_message=html_message,
    )

    return {
        "success": True,
        "user_id": user_id,
        "email": user.email,
    }
# Enqueue from signal
from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver
import django_rq
# settings.AUTH_USER_MODEL is used as the sender because no ``User`` name exists
# at module level here; model signals accept the "app_label.ModelName" string.
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def send_welcome_email_on_signup(sender, instance, created, **kwargs):
    """
    Send welcome email when user signs up.

    Enqueues ``send_welcome_email`` on the ``default`` queue for newly
    created users only; later saves of the same user are ignored.
    """
    if not created:
        return
    # NOTE(review): if the save happens inside a transaction, the worker may
    # run before the commit — consider transaction.on_commit; confirm with callers.
    queue = django_rq.get_queue('default')
    queue.enqueue('apps.accounts.tasks.send_welcome_email', user_id=instance.id)


# Bulk Email Task
def send_newsletter(newsletter_id: int, batch_size: int = 100) -> dict:
    """
    Send newsletter to all subscribers in batches.

    Args:
        newsletter_id: Primary key of the newsletter to send
        batch_size: Number of subscriber rows fetched from the database per
            chunk; values below 1 are treated as 1

    Returns:
        Dict with ``success``, the ``newsletter_id`` and the ``sent`` /
        ``failed`` counters
    """
    # send_mail is imported here because nothing at module level provides it
    from django.core.mail import send_mail
    from apps.newsletter.models import Newsletter, Subscriber

    newsletter = Newsletter.objects.get(pk=newsletter_id)
    subscribers = Subscriber.objects.filter(is_active=True)

    sent = 0
    failed = 0
    # Stream subscribers chunk by chunk instead of loading the whole list;
    # iterator() requires a strictly positive chunk size.
    for subscriber in subscribers.iterator(chunk_size=max(1, batch_size)):
        try:
            send_mail(
                subject=newsletter.subject,
                message=newsletter.text_content,
                # None makes Django use settings.DEFAULT_FROM_EMAIL; the
                # previous literal was an obfuscation placeholder.
                from_email=None,
                recipient_list=[subscriber.email],
                html_message=newsletter.html_content,
            )
            sent += 1
        except Exception as e:
            # Best effort: one failed recipient must not stop the run
            logger.error(f"Failed to send to {subscriber.email}: {e}")
            failed += 1

    return {
        "success": True,
        "newsletter_id": newsletter_id,
        "sent": sent,
        "failed": failed,
    }
# Enqueue newsletter
queue = django_rq.get_queue('low') # Use low priority for bulk operations
job = queue.enqueue(
'apps.newsletter.tasks.send_newsletter',
newsletter_id=123,
batch_size=100,
job_timeout=1800, # 30 minutes for bulk operation
)
Long-Running Tasks
Task with Progress Tracking
def _store_progress(job, processed: int, total: int) -> float:
    """Return the completion percentage and, when a job is available, save it in ``job.meta``."""
    progress = (processed / total) * 100
    if job is not None:
        job.meta['progress'] = progress
        job.meta['processed'] = processed
        job.meta['total'] = total
        job.save_meta()
    return progress


def process_large_dataset(dataset_id: int) -> dict:
    """
    Process large dataset with progress tracking.

    Progress is written to the current RQ job's ``meta`` every 100 rows and
    once more at the end, so a finished job reports its final count. When
    the function runs outside a worker (no current job) it still processes
    the rows and simply skips the meta updates.

    Args:
        dataset_id: Primary key of the dataset whose rows are processed

    Returns:
        Dict with ``success``, ``dataset_id``, ``processed`` and ``total``
    """
    from rq import get_current_job
    from apps.data.models import Dataset, DataRow

    job = get_current_job()  # None when called directly rather than by a worker
    dataset = Dataset.objects.get(pk=dataset_id)
    rows = DataRow.objects.filter(dataset=dataset)
    total = rows.count()
    processed = 0

    for row in rows:
        # Process row
        process_row(row)
        processed += 1

        # Update progress every 100 rows
        if processed % 100 == 0:
            progress = _store_progress(job, processed, total)
            logger.info(f"Progress: {progress:.1f}% ({processed}/{total})")

    # Store the final state unless the last row already triggered an update
    if processed % 100 != 0:
        _store_progress(job, processed, total)

    return {
        "success": True,
        "dataset_id": dataset_id,
        "processed": processed,
        "total": total,
    }
# Check progress from view
@api_view(['GET'])
def check_job_progress(request, job_id):
    """
    Report status and stored progress for a job on the ``default`` queue.

    Responds with 404 when the job cannot be fetched; progress fields that
    were never written to ``job.meta`` are reported as 0.
    """
    job = django_rq.get_queue('default').fetch_job(job_id)
    if not job:
        return Response({"error": "Job not found"}, status=404)

    payload = {"job_id": job_id, "status": job.get_status()}
    for field in ('progress', 'processed', 'total'):
        payload[field] = job.meta.get(field, 0)
    return Response(payload)


# Task with Retry
Automatic Retry
from rq import Retry
def fetch_api_data(url: str) -> dict:
    """
    GET ``url`` and return the decoded JSON body.

    The function itself makes a single attempt; retries happen only when the
    caller enqueues it with a retry policy (``retry=Retry(...)``).
    """
    import requests

    reply = requests.get(url, timeout=30)
    # Turn HTTP error statuses into exceptions so the failure propagates
    reply.raise_for_status()
    payload = reply.json()
    return payload
# Enqueue with retry
queue.enqueue(
'apps.myapp.tasks.fetch_api_data',
url="https://api.example.com/data",
retry=Retry(max=3, interval=[10, 30, 60]), # Retry 3 times with delays
)
Manual Retry Logic
def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
    """
    Fetch data with manual retry logic.

    Args:
        url: Address to GET (30 second timeout per attempt)
        max_retries: Total number of attempts; must be at least 1

    Returns:
        On success ``{"success": True, "data": ..., "attempts": n}``; when
        every attempt fails, or ``max_retries`` is below 1,
        ``{"success": False, "error": ..., "attempts": n}``
    """
    # Without this guard the loop body never runs for max_retries <= 0 and the
    # function would fall through and return None instead of a result dict.
    if max_retries < 1:
        return {
            "success": False,
            "error": "max_retries must be at least 1",
            "attempts": 0,
        }

    import requests
    import time

    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            if attempt == max_retries - 1:
                logger.error(f"All {max_retries} attempts failed: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": max_retries,
                }
            wait_time = 2 ** attempt  # Exponential backoff
            logger.warning(f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {e}")
            time.sleep(wait_time)
        else:
            return {
                "success": True,
                "data": response.json(),
                "attempts": attempt + 1,
            }


# Job Dependencies
Chain Jobs
# Job 1: Import data
job1 = queue.enqueue('apps.data.tasks.import_data', source_url='...')
# Job 2: Process data (runs after job1 completes)
job2 = queue.enqueue('apps.data.tasks.process_data', depends_on=job1)
# Job 3: Generate report (runs after job2 completes)
job3 = queue.enqueue('apps.data.tasks.generate_report', depends_on=job2)
# Check status
print(f"Job 1: {job1.get_status()}") # finished
print(f"Job 2: {job2.get_status()}") # started
print(f"Job 3: {job3.get_status()}") # deferred (waiting for job2)
Parallel Jobs with Final Step
# Start multiple parallel jobs
job1 = queue.enqueue('apps.data.tasks.process_batch', batch_id=1)
job2 = queue.enqueue('apps.data.tasks.process_batch', batch_id=2)
job3 = queue.enqueue('apps.data.tasks.process_batch', batch_id=3)
# Final job depends on all parallel jobs
final_job = queue.enqueue(
'apps.data.tasks.merge_results',
depends_on=[job1, job2, job3]
)
Scheduled Tasks
Cron-based Scheduling
schedules=[
# Every 5 minutes
RQScheduleConfig(
func="apps.monitoring.tasks.health_check",
cron="*/5 * * * *",
queue="high",
),
# Daily at 2 AM
RQScheduleConfig(
func="apps.data.tasks.cleanup_old_records",
cron="0 2 * * *",
queue="low",
days=30, # Delete records older than 30 days
),
# Weekly on Sunday at midnight
RQScheduleConfig(
func="apps.reports.tasks.weekly_summary",
cron="0 0 * * 0",
queue="low",
),
# Monthly on 1st at 3 AM
RQScheduleConfig(
func="apps.billing.tasks.generate_invoices",
cron="0 3 1 * *",
queue="default",
),
]
Interval-based Scheduling
schedules=[
# Every 30 seconds
RQScheduleConfig(
func="apps.monitoring.tasks.check_workers",
interval=30,
queue="high",
),
# Every hour
RQScheduleConfig(
func="apps.cache.tasks.warm_cache",
interval=3600,
queue="default",
),
# Every 6 hours
RQScheduleConfig(
func="apps.data.tasks.sync_external_data",
interval=21600,
queue="low",
),
]
Best Practices
Error Handling
def robust_task(item_id: int) -> dict:
    """
    Load, process and save one item, reporting every failure as a result dict.

    No exception leaves this function: each stage returns
    ``{"success": False, "error": ...}`` with the ``item_id`` (and
    ``details`` where an exception message exists) instead of raising.
    """

    def failure(error: str, exc=None) -> dict:
        # Key order mirrors the result shape: success, error, [details], item_id
        outcome = {"success": False, "error": error}
        if exc is not None:
            outcome["details"] = str(exc)
        outcome["item_id"] = item_id
        return outcome

    try:
        # Stage 1: fetch the item
        try:
            item = MyModel.objects.get(pk=item_id)
        except MyModel.DoesNotExist:
            logger.error(f"Item {item_id} not found")
            return failure("Item not found")

        # Stage 2: process it
        try:
            result = process_item(item)
        except ValidationError as e:
            logger.error(f"Validation failed for item {item_id}: {e}")
            return failure("Validation failed", e)

        # Stage 3: persist the outcome
        try:
            save_result(result)
        except Exception as e:
            logger.error(f"Failed to save result for item {item_id}: {e}")
            return failure("Save failed", e)

        return {"success": True, "item_id": item_id, "result": result}
    except Exception as e:
        # Anything not handled by a stage above ends up here, with traceback
        logger.exception(f"Unexpected error in robust_task for item {item_id}")
        return failure("Unexpected error", e)


# Testing Tasks
# tests/test_tasks.py
from django.test import TestCase
from apps.crypto.tasks import update_coin_prices
from apps.crypto.models import Coin
class TaskTests(TestCase):
    """Exercise ``update_coin_prices`` by calling it in-process, without a worker."""

    def setUp(self):
        """Insert the single active coin every test starts from."""
        coin_fields = {
            "symbol": "BTC",
            "name": "Bitcoin",
            "is_active": True,
            "current_price_usd": 50000,
        }
        self.coin = Coin.objects.create(**coin_fields)

    def test_update_coin_prices(self):
        """One active coin yields one update and no failures."""
        # Synchronous call: the task is a plain function
        outcome = update_coin_prices(limit=10, verbosity=0)

        self.assertTrue(outcome['success'])
        self.assertEqual(outcome['updated'], 1)
        self.assertEqual(outcome['failed'], 0)

        # The stored row must carry a newer modification time
        self.coin.refresh_from_db()
        self.assertGreater(self.coin.updated_at, self.coin.created_at)

    def test_update_coin_prices_no_coins(self):
        """With no coins at all the task succeeds and reports nothing updated."""
        Coin.objects.all().delete()

        outcome = update_coin_prices(limit=10)

        self.assertTrue(outcome['success'])
        self.assertEqual(outcome['updated'], 0)
        self.assertIn("No active coins", outcome['message'])


# Performance Tips
# ✅ Good: Bulk operations
def process_items_bulk(item_ids: list) -> dict:
    """Mark all given items as processed: one fetch, then one bulk write of ``status``."""
    items = MyModel.objects.filter(id__in=item_ids)

    # Change the instances in memory first ...
    for record in items:
        record.status = "processed"

    # ... then write the status column for all of them together
    MyModel.objects.bulk_update(items, ['status'])
    processed_total = len(items)
    return {"success": True, "processed": processed_total}
# ❌ Bad: Individual queries
def process_items_slow(item_ids: list) -> dict:
    """Counter-example: load and save each item separately (slow on purpose)."""
    for current_id in item_ids:
        record = MyModel.objects.get(pk=current_id)  # one query per item!
        record.status = "processed"
        record.save()  # plus one write per item
    processed_total = len(item_ids)
    return {"success": True, "processed": processed_total}
# ✅ Good: Select related
def process_with_relations(order_id: int) -> dict:
    """Load one order together with its user, shipping and items up front."""
    orders = Order.objects.select_related('user', 'shipping')
    orders = orders.prefetch_related('items')
    order = orders.get(pk=order_id)

    # The related objects were requested with the order above, so these
    # accesses are meant to need no further queries (1-2 queries in total)
    user = order.user
    items = order.items.all()
    shipping = order.shipping
    return {"success": True}


# See Also
Documentation
- Overview - Introduction and features
- Architecture - System design
- Configuration - Setup guide
- Monitoring - Monitoring and management
Reference
- Django-RQ Docs - Official documentation
- RQ Docs - Core RQ documentation
- Best Practices - RQ patterns