Skip to Content

Django-RQ Examples

Comprehensive collection of real-world examples demonstrating Django-RQ usage in django-cfg projects, from simple tasks to advanced patterns.


Table of Contents

Basics

Real-World Examples

Advanced Patterns

Best Practices


Simple Task

Basic Task Function

# apps/myapp/tasks.py
def say_hello(name: str) -> str:
    """Simple task that returns a greeting."""
    return f"Hello, {name}!"


# Enqueue the task
import django_rq

queue = django_rq.get_queue('default')
job = queue.enqueue('apps.myapp.tasks.say_hello', name="Alice")

# Get result
print(job.result)  # "Hello, Alice!"

Task with Type Hints

from typing import Any, Dict, Optional


def process_data(data: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Process data with optional configuration.

    Args:
        data: Input data to process
        options: Optional processing options

    Returns:
        Processed data with statistics
    """
    # FIX: the parameter defaults to None, so the annotation must be Optional.
    options = options or {}

    # Process data
    result = {
        "success": True,
        "processed": len(data),
        "options": options,
    }
    return result


# Enqueue
job = queue.enqueue(
    'apps.myapp.tasks.process_data',
    data={"key": "value"},
    options={"format": "json"},
)

Task with Arguments

Positional and Keyword Arguments

from typing import Optional


def create_user_profile(user_id: int, bio: str = "", interests: Optional[list] = None) -> dict:
    """Create user profile with given data."""
    from apps.profiles.models import UserProfile
    from django.contrib.auth import get_user_model

    User = get_user_model()
    user = User.objects.get(pk=user_id)

    profile = UserProfile.objects.create(
        user=user,
        bio=bio,
        # FIX annotation above: defaults to None, so it is Optional[list].
        interests=interests or [],
    )

    return {
        "success": True,
        "profile_id": profile.id,
        "user_id": user_id,
    }


# Enqueue with positional args
job = queue.enqueue(
    'apps.profiles.tasks.create_user_profile',
    123,                    # user_id
    "Software developer",   # bio
    ["Python", "Django"],   # interests
)

# Enqueue with keyword args
job = queue.enqueue(
    'apps.profiles.tasks.create_user_profile',
    user_id=123,
    bio="Software developer",
    interests=["Python", "Django"],
)

Task with Complex Arguments

from dataclasses import dataclass
from typing import List


@dataclass
class ImportConfig:
    source: str
    batch_size: int
    validate: bool


def import_data_batch(config: dict, items: List[dict]) -> dict:
    """
    Import batch of items with configuration.

    Note: Complex objects are serialized to dict/JSON.
    """
    # Reconstruct config from dict
    source = config['source']
    batch_size = config['batch_size']
    validate = config['validate']

    imported = 0
    for item in items[:batch_size]:
        if validate:
            # Validate item
            pass
        # Import item
        imported += 1

    return {
        "success": True,
        "imported": imported,
        "source": source,
    }


# Enqueue with complex args
config = {
    "source": "api",
    "batch_size": 100,
    "validate": True,
}
# NOTE: the trailing "..." is a documentation placeholder for more items
# (it is a literal Ellipsis, not valid data).
items = [{"id": 1, "name": "Item 1"}, ...]

job = queue.enqueue(
    'apps.myapp.tasks.import_data_batch',
    config=config,
    items=items,
)

Real-World Example: Update Cryptocurrency Prices

Task Function

# apps/crypto/tasks.py
import logging
import random
from decimal import Decimal
from typing import Optional

from django.utils import timezone

from apps.crypto.models import Coin

logger = logging.getLogger(__name__)


def update_coin_prices(
    limit: int = 100,
    verbosity: int = 0,
    days: Optional[int] = None,
    force: bool = False,
) -> dict:
    """
    Update cryptocurrency prices for active coins.

    In production, this would fetch real data from CoinGecko/CoinMarketCap API.
    For demo purposes, it generates realistic price movements.

    Args:
        limit: Maximum number of coins to update
        verbosity: Logging verbosity (0=quiet, 1=normal, 2=verbose)
        days: Days of historical data to update (not used in demo)
        force: Force update even if recently updated

    Returns:
        dict with update statistics
    """
    if verbosity > 0:
        logger.info(f"Starting coin price update (limit={limit}, force={force})")

    # Get active coins to update
    queryset = Coin.objects.filter(is_active=True)[:limit]

    if not queryset.exists():
        logger.warning("No active coins found to update")
        return {
            "success": True,
            "updated": 0,
            "skipped": 0,
            "failed": 0,
            "message": "No active coins to update"
        }

    updated_count = 0
    skipped_count = 0
    failed_count = 0

    for coin in queryset:
        try:
            # Demo: Generate realistic price movements
            # In production: price = fetch_from_api(coin.symbol)

            # Generate random price change between -10% and +10%
            price_change_pct = Decimal(str(random.uniform(-0.10, 0.10)))
            old_price = coin.current_price_usd

            if old_price > 0:
                new_price = old_price * (1 + price_change_pct)
            else:
                # If no price set, generate a random starting price
                new_price = Decimal(str(random.uniform(0.01, 50000)))

            # Update coin data
            coin.current_price_usd = new_price
            coin.price_change_24h_percent = price_change_pct * 100

            # Generate random volume and market cap
            coin.volume_24h_usd = new_price * Decimal(str(random.randint(1000000, 100000000)))
            coin.market_cap_usd = new_price * Decimal(str(random.randint(1000000, 1000000000)))

            coin.save(update_fields=[
                'current_price_usd',
                'price_change_24h_percent',
                'volume_24h_usd',
                'market_cap_usd',
                'updated_at'
            ])

            updated_count += 1

            if verbosity >= 2:
                logger.info(
                    f"✓ Updated {coin.symbol}: "
                    f"${old_price:.2f} → ${new_price:.2f} "
                    f"({price_change_pct * 100:+.2f}%)"
                )

        except Exception as e:
            failed_count += 1
            logger.error(f"Failed to update {coin.symbol}: {e}")

    result = {
        "success": True,
        "updated": updated_count,
        "skipped": skipped_count,
        "failed": failed_count,
        "timestamp": timezone.now().isoformat(),
        "message": f"Successfully updated {updated_count} coins"
    }

    if verbosity > 0:
        logger.info(
            f"Price update completed: {updated_count} updated, "
            f"{skipped_count} skipped, {failed_count} failed"
        )

    return result

Enqueue from View

# apps/crypto/views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
import django_rq


@api_view(['POST'])
def trigger_price_update(request):
    """Trigger cryptocurrency price update."""
    limit = request.data.get('limit', 100)
    verbosity = request.data.get('verbosity', 1)

    # Enqueue job (job_timeout is consumed by enqueue(), not passed to the task)
    queue = django_rq.get_queue('default')
    job = queue.enqueue(
        'apps.crypto.tasks.update_coin_prices',
        limit=limit,
        verbosity=verbosity,
        job_timeout=360,  # 6 minutes
    )

    return Response({
        "success": True,
        "job_id": job.id,
        "queue": "default",
        "message": f"Price update job enqueued (limit={limit})",
    })

Schedule Task

# api/config.py
class MyConfig(DjangoConfig):
    django_rq: DjangoRQConfig = DjangoRQConfig(
        enabled=True,
        queues=[...],
        schedules=[
            # Update prices every 5 minutes
            RQScheduleConfig(
                func="apps.crypto.tasks.update_coin_prices",
                interval=300,  # seconds
                queue="default",
                limit=50,
                verbosity=0,
                description="Update coin prices (frequent)",
            ),
            # Update all coins hourly with verbose logging
            RQScheduleConfig(
                func="apps.crypto.tasks.update_coin_prices",
                interval=3600,
                queue="default",
                limit=100,
                verbosity=1,
                description="Update coin prices (hourly)",
            ),
        ],
    )

Management Command

# apps/crypto/management/commands/update_coin_prices.py
from django.core.management.base import BaseCommand

from apps.crypto.tasks import update_coin_prices


class Command(BaseCommand):
    help = 'Update cryptocurrency prices from CoinGecko API'

    def add_arguments(self, parser):
        parser.add_argument('--limit', type=int, default=100)
        # NOTE: BaseCommand already registers a global -v/--verbosity option,
        # so re-adding '--verbosity' here raises an argparse conflict.
        # Rely on the built-in option instead (options['verbosity'] defaults to 1).
        parser.add_argument('--force', action='store_true')

    def handle(self, *args, **options):
        limit = options.get('limit', 100)
        verbosity = options.get('verbosity', 1)
        force = options.get('force', False)

        # Call task function directly (synchronous)
        result = update_coin_prices(
            limit=limit,
            verbosity=verbosity,
            force=force
        )

        # Display results
        if result['success']:
            self.stdout.write(
                self.style.SUCCESS(
                    f"✓ Price update completed:\n"
                    f"  - Updated: {result['updated']}\n"
                    f"  - Skipped: {result['skipped']}\n"
                    f"  - Failed: {result['failed']}"
                )
            )
        else:
            self.stdout.write(
                self.style.ERROR(f"✗ Price update failed: {result.get('message')}")
            )

Real-World Example: Import Data

Task Function

# apps/crypto/tasks.py
def import_coins(source: str = "demo", batch_size: int = 10) -> dict:
    """
    Import new cryptocurrency coins.

    In production, this would fetch data from external APIs or CSV files.
    For demo purposes, it creates sample coins.

    Args:
        source: Data source (demo, api, csv)
        batch_size: Number of coins to import in this batch

    Returns:
        dict with import statistics
    """
    logger.info(f"Starting coin import from {source} (batch_size={batch_size})")

    # Demo coin data
    demo_coins = [
        {"symbol": "BTC", "name": "Bitcoin", "rank": 1},
        {"symbol": "ETH", "name": "Ethereum", "rank": 2},
        {"symbol": "BNB", "name": "Binance Coin", "rank": 3},
        {"symbol": "SOL", "name": "Solana", "rank": 4},
        {"symbol": "XRP", "name": "Ripple", "rank": 5},
        {"symbol": "ADA", "name": "Cardano", "rank": 6},
        {"symbol": "DOGE", "name": "Dogecoin", "rank": 7},
        {"symbol": "TRX", "name": "TRON", "rank": 8},
        {"symbol": "AVAX", "name": "Avalanche", "rank": 9},
        {"symbol": "DOT", "name": "Polkadot", "rank": 10},
    ]

    imported_count = 0
    updated_count = 0
    skipped_count = 0

    for coin_data in demo_coins[:batch_size]:
        symbol = coin_data["symbol"]

        # Check if coin already exists
        coin, created = Coin.objects.get_or_create(
            symbol=symbol,
            defaults={
                "name": coin_data["name"],
                "slug": coin_data["name"].lower().replace(" ", "-"),
                "rank": coin_data["rank"],
                "is_active": True,
                "is_tradeable": True,
                "current_price_usd": Decimal(str(random.uniform(0.01, 50000))),
                "market_cap_usd": Decimal(str(random.randint(1000000, 1000000000))),
                "volume_24h_usd": Decimal(str(random.randint(1000000, 100000000))),
            }
        )

        if created:
            imported_count += 1
            logger.info(f"✓ Imported new coin: {symbol} - {coin_data['name']}")
        else:
            # Update existing coin
            coin.name = coin_data["name"]
            coin.rank = coin_data["rank"]
            coin.save(update_fields=['name', 'rank', 'updated_at'])
            updated_count += 1
            logger.info(f"↻ Updated existing coin: {symbol}")

    result = {
        "success": True,
        "imported": imported_count,
        "updated": updated_count,
        "skipped": skipped_count,
        "timestamp": timezone.now().isoformat(),
        "message": f"Imported {imported_count} new coins, updated {updated_count}"
    }

    logger.info(
        f"Import completed: {imported_count} imported, "
        f"{updated_count} updated, {skipped_count} skipped"
    )

    return result

Schedule Daily Import

schedules=[
    # Import new coins daily at 3 AM
    RQScheduleConfig(
        func="apps.crypto.tasks.import_coins",
        cron="0 3 * * *",
        queue="low",
        source="api",
        batch_size=50,
        description="Daily coin import from API",
    ),
]

Real-World Example: Generate Reports

Task Function

# apps/crypto/tasks.py
from django.db.models import Avg, Sum


def generate_report(report_type: str = "daily") -> dict:
    """
    Generate cryptocurrency market report.

    Collects statistics and generates insights about the crypto market.
    In production, this could send emails, store in DB, or export to PDF.

    Args:
        report_type: Type of report (daily, weekly, monthly)

    Returns:
        dict with report data
    """
    logger.info(f"Generating {report_type} crypto market report")

    # Get statistics
    total_coins = Coin.objects.filter(is_active=True).count()

    if total_coins == 0:
        logger.warning("No coins found for report generation")
        return {
            "success": False,
            "message": "No coins available for reporting"
        }

    # Calculate aggregate statistics
    stats = Coin.objects.filter(is_active=True).aggregate(
        avg_price_change_24h=Avg('price_change_24h_percent'),
        total_volume_24h=Sum('volume_24h_usd'),
        total_market_cap=Sum('market_cap_usd'),
    )

    # Get top gainers and losers
    top_gainers = list(
        Coin.objects.filter(is_active=True)
        .order_by('-price_change_24h_percent')[:5]
        .values('symbol', 'name', 'price_change_24h_percent', 'current_price_usd')
    )

    top_losers = list(
        Coin.objects.filter(is_active=True)
        .order_by('price_change_24h_percent')[:5]
        .values('symbol', 'name', 'price_change_24h_percent', 'current_price_usd')
    )

    # Get top by market cap
    top_by_market_cap = list(
        Coin.objects.filter(is_active=True)
        .order_by('-market_cap_usd')[:10]
        .values('symbol', 'name', 'market_cap_usd', 'current_price_usd')
    )

    # FIX: Avg() returns None when every row is NULL; coerce once and reuse
    # so the "{...:.2f}" log format below cannot raise on None.
    avg_change_24h = float(stats['avg_price_change_24h'] or 0)

    report = {
        "success": True,
        "report_type": report_type,
        "generated_at": timezone.now().isoformat(),
        "summary": {
            "total_coins": total_coins,
            "avg_price_change_24h": avg_change_24h,
            "total_volume_24h": float(stats['total_volume_24h'] or 0),
            "total_market_cap": float(stats['total_market_cap'] or 0),
        },
        "top_gainers": top_gainers,
        "top_losers": top_losers,
        "top_by_market_cap": top_by_market_cap,
    }

    logger.info(
        f"Report generated: {total_coins} coins, "
        f"avg 24h change: {avg_change_24h:.2f}%"
    )

    # In production, you could:
    # - Send email with report
    # - Store in database
    # - Export to PDF
    # - Send to Telegram/Slack

    return report

Schedule Report Generation

schedules=[
    # Daily report at midnight
    RQScheduleConfig(
        func="apps.crypto.tasks.generate_report",
        cron="0 0 * * *",
        queue="low",
        report_type="daily",
        description="Daily market report",
    ),
    # Weekly report on Mondays at 9 AM
    RQScheduleConfig(
        func="apps.crypto.tasks.generate_report",
        cron="0 9 * * 1",
        queue="low",
        report_type="weekly",
        description="Weekly market summary",
    ),
]

Send Emails

Simple Email Task

def send_welcome_email(user_id: int) -> dict:
    """Send welcome email to new user."""
    from django.contrib.auth import get_user_model
    from django.core.mail import send_mail
    from django.template.loader import render_to_string

    User = get_user_model()
    user = User.objects.get(pk=user_id)

    # Render email template
    html_message = render_to_string('emails/welcome.html', {
        'user': user,
    })

    # Send email
    send_mail(
        subject="Welcome to Our Platform!",
        message=f"Hi {user.username}!",
        from_email="[email protected]",
        recipient_list=[user.email],
        html_message=html_message,
    )

    return {
        "success": True,
        "user_id": user_id,
        "email": user.email,
    }


# Enqueue from signal
from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver
import django_rq


# FIX: the original used `sender=User`, but `User` was only defined inside
# send_welcome_email(). Referencing the user model via settings.AUTH_USER_MODEL
# (a lazy string reference) is the documented way to register this receiver.
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def send_welcome_email_on_signup(sender, instance, created, **kwargs):
    """Send welcome email when user signs up."""
    if created:
        queue = django_rq.get_queue('default')
        queue.enqueue('apps.accounts.tasks.send_welcome_email', user_id=instance.id)

Bulk Email Task

def send_newsletter(newsletter_id: int, batch_size: int = 100) -> dict:
    """Send newsletter to all subscribers, streaming from the DB in batches."""
    from apps.newsletter.models import Newsletter, Subscriber

    newsletter = Newsletter.objects.get(pk=newsletter_id)
    subscribers = Subscriber.objects.filter(is_active=True)

    sent = 0
    failed = 0

    # FIX: the original never used batch_size; stream rows from the database
    # in chunks of batch_size so huge subscriber lists don't load into memory.
    for subscriber in subscribers.iterator(chunk_size=batch_size):
        try:
            send_mail(
                subject=newsletter.subject,
                message=newsletter.text_content,
                from_email="[email protected]",
                recipient_list=[subscriber.email],
                html_message=newsletter.html_content,
            )
            sent += 1
        except Exception as e:
            logger.error(f"Failed to send to {subscriber.email}: {e}")
            failed += 1

    return {
        "success": True,
        "newsletter_id": newsletter_id,
        "sent": sent,
        "failed": failed,
    }


# Enqueue newsletter
queue = django_rq.get_queue('low')  # Use low priority for bulk operations
job = queue.enqueue(
    'apps.newsletter.tasks.send_newsletter',
    newsletter_id=123,
    batch_size=100,
    job_timeout=1800,  # 30 minutes for bulk operation
)

Long-Running Tasks

Task with Progress Tracking

def process_large_dataset(dataset_id: int) -> dict:
    """Process large dataset with progress tracking."""
    from rq import get_current_job
    from apps.data.models import Dataset, DataRow

    # NOTE(review): get_current_job() returns None when called outside an RQ
    # worker — calling this function synchronously would crash on job.meta.
    job = get_current_job()
    dataset = Dataset.objects.get(pk=dataset_id)
    rows = DataRow.objects.filter(dataset=dataset)
    total = rows.count()
    processed = 0

    for row in rows:
        # Process row
        process_row(row)
        processed += 1

        # Update progress every 100 rows
        if processed % 100 == 0:
            progress = (processed / total) * 100
            job.meta['progress'] = progress
            job.meta['processed'] = processed
            job.meta['total'] = total
            job.save_meta()
            logger.info(f"Progress: {progress:.1f}% ({processed}/{total})")

    return {
        "success": True,
        "dataset_id": dataset_id,
        "processed": processed,
        "total": total,
    }


# Check progress from view
@api_view(['GET'])
def check_job_progress(request, job_id):
    """Check job progress."""
    queue = django_rq.get_queue('default')
    job = queue.fetch_job(job_id)

    if not job:
        return Response({"error": "Job not found"}, status=404)

    return Response({
        "job_id": job_id,
        "status": job.get_status(),
        "progress": job.meta.get('progress', 0),
        "processed": job.meta.get('processed', 0),
        "total": job.meta.get('total', 0),
    })

Task with Retry

Automatic Retry

from rq import Retry


def fetch_api_data(url: str) -> dict:
    """Fetch data from API with automatic retry."""
    import requests

    response = requests.get(url, timeout=30)
    response.raise_for_status()  # Raises exception on HTTP error
    return response.json()


# Enqueue with retry
queue.enqueue(
    'apps.myapp.tasks.fetch_api_data',
    url="https://api.example.com/data",
    retry=Retry(max=3, interval=[10, 30, 60]),  # Retry 3 times with delays
)

Manual Retry Logic

def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
    """Fetch data with manual retry logic (exponential backoff)."""
    import requests
    import time

    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return {
                "success": True,
                "data": response.json(),
                "attempts": attempt + 1,
            }
        except requests.RequestException as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                logger.warning(f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {e}")
                time.sleep(wait_time)
            else:
                logger.error(f"All {max_retries} attempts failed: {e}")
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": max_retries,
                }

Job Dependencies

Chain Jobs

# Job 1: Import data
job1 = queue.enqueue('apps.data.tasks.import_data', source_url='...')

# Job 2: Process data (runs after job1 completes)
job2 = queue.enqueue('apps.data.tasks.process_data', depends_on=job1)

# Job 3: Generate report (runs after job2 completes)
job3 = queue.enqueue('apps.data.tasks.generate_report', depends_on=job2)

# Check status
print(f"Job 1: {job1.get_status()}")  # finished
print(f"Job 2: {job2.get_status()}")  # started
print(f"Job 3: {job3.get_status()}")  # deferred (waiting for job2)

Parallel Jobs with Final Step

# Start multiple parallel jobs
job1 = queue.enqueue('apps.data.tasks.process_batch', batch_id=1)
job2 = queue.enqueue('apps.data.tasks.process_batch', batch_id=2)
job3 = queue.enqueue('apps.data.tasks.process_batch', batch_id=3)

# Final job depends on all parallel jobs
final_job = queue.enqueue(
    'apps.data.tasks.merge_results',
    depends_on=[job1, job2, job3]
)

Scheduled Tasks

Cron-based Scheduling

schedules=[
    # Every 5 minutes
    RQScheduleConfig(
        func="apps.monitoring.tasks.health_check",
        cron="*/5 * * * *",
        queue="high",
    ),
    # Daily at 2 AM
    RQScheduleConfig(
        func="apps.data.tasks.cleanup_old_records",
        cron="0 2 * * *",
        queue="low",
        days=30,  # Delete records older than 30 days
    ),
    # Weekly on Sunday at midnight
    RQScheduleConfig(
        func="apps.reports.tasks.weekly_summary",
        cron="0 0 * * 0",
        queue="low",
    ),
    # Monthly on 1st at 3 AM
    RQScheduleConfig(
        func="apps.billing.tasks.generate_invoices",
        cron="0 3 1 * *",
        queue="default",
    ),
]

Interval-based Scheduling

schedules=[
    # Every 30 seconds
    RQScheduleConfig(
        func="apps.monitoring.tasks.check_workers",
        interval=30,
        queue="high",
    ),
    # Every hour
    RQScheduleConfig(
        func="apps.cache.tasks.warm_cache",
        interval=3600,
        queue="default",
    ),
    # Every 6 hours
    RQScheduleConfig(
        func="apps.data.tasks.sync_external_data",
        interval=21600,
        queue="low",
    ),
]

Best Practices

Error Handling

def robust_task(item_id: int) -> dict:
    """Task with comprehensive error handling: each stage fails independently."""
    try:
        # Get item
        try:
            item = MyModel.objects.get(pk=item_id)
        except MyModel.DoesNotExist:
            logger.error(f"Item {item_id} not found")
            return {
                "success": False,
                "error": "Item not found",
                "item_id": item_id,
            }

        # Process item
        try:
            result = process_item(item)
        except ValidationError as e:
            logger.error(f"Validation failed for item {item_id}: {e}")
            return {
                "success": False,
                "error": "Validation failed",
                "details": str(e),
                "item_id": item_id,
            }

        # Save result
        try:
            save_result(result)
        except Exception as e:
            logger.error(f"Failed to save result for item {item_id}: {e}")
            return {
                "success": False,
                "error": "Save failed",
                "details": str(e),
                "item_id": item_id,
            }

        return {
            "success": True,
            "item_id": item_id,
            "result": result,
        }

    except Exception as e:
        # Catch-all for unexpected errors (logger.exception records traceback)
        logger.exception(f"Unexpected error in robust_task for item {item_id}")
        return {
            "success": False,
            "error": "Unexpected error",
            "details": str(e),
            "item_id": item_id,
        }

Testing Tasks

# tests/test_tasks.py
from django.test import TestCase

from apps.crypto.tasks import update_coin_prices
from apps.crypto.models import Coin


class TaskTests(TestCase):
    def setUp(self):
        """Create test data."""
        self.coin = Coin.objects.create(
            symbol="BTC",
            name="Bitcoin",
            is_active=True,
            current_price_usd=50000,
        )

    def test_update_coin_prices(self):
        """Test update_coin_prices task."""
        # Call task directly (synchronous)
        result = update_coin_prices(limit=10, verbosity=0)

        # Assert results
        self.assertTrue(result['success'])
        self.assertEqual(result['updated'], 1)
        self.assertEqual(result['failed'], 0)

        # Check coin was updated
        self.coin.refresh_from_db()
        self.assertGreater(self.coin.updated_at, self.coin.created_at)

    def test_update_coin_prices_no_coins(self):
        """Test update with no active coins."""
        Coin.objects.all().delete()

        result = update_coin_prices(limit=10)

        self.assertTrue(result['success'])
        self.assertEqual(result['updated'], 0)
        self.assertIn("No active coins", result['message'])

Performance Tips

# ✅ Good: Bulk operations
def process_items_bulk(item_ids: list) -> dict:
    """Process multiple items in single query."""
    items = MyModel.objects.filter(id__in=item_ids)

    # Process all items (iterating caches the queryset, so bulk_update below
    # sees the same in-memory objects, not fresh DB rows)
    for item in items:
        item.status = "processed"

    # Bulk update
    MyModel.objects.bulk_update(items, ['status'])

    return {"success": True, "processed": len(items)}


# ❌ Bad: Individual queries
def process_items_slow(item_ids: list) -> dict:
    """Process items one by one (slow)."""
    for item_id in item_ids:
        item = MyModel.objects.get(pk=item_id)  # N queries!
        item.status = "processed"
        item.save()

    return {"success": True, "processed": len(item_ids)}


# ✅ Good: Select related
def process_with_relations(order_id: int) -> dict:
    """Process order with relations efficiently."""
    order = Order.objects.select_related('user', 'shipping').prefetch_related('items').get(pk=order_id)

    # All data loaded in 1-2 queries
    user = order.user
    items = order.items.all()
    shipping = order.shipping

    return {"success": True}

See Also

Documentation

Reference