
Async gRPC Server

High-concurrency async gRPC server with full streaming support.

Overview

The Django-CFG gRPC server uses grpc.aio for async operations, enabling:

  • 1000+ concurrent connections
  • Server-side, client-side, and bidirectional streaming
  • Non-blocking I/O operations
  • Full Django ORM integration via async wrappers
```
┌─────────────────────────────────────────┐
│ python manage.py rungrpc                │
│                                         │
│   ┌─────────────────────────────────┐   │
│   │ grpc.aio.server() [ASYNC]       │   │
│   │  └─ asyncio event loop          │   │
│   │     ├─ API Key Auth (async)     │   │
│   │     ├─ Logging (async)          │   │
│   │     ├─ Streaming Services       │   │
│   │     └─ Unary Services           │   │
│   └─────────────────────────────────┘   │
└─────────────────────────────────────────┘
```

Streaming Types

Unary (Request-Response)

Traditional single request, single response:

```python
class UserService(BaseService):
    async def GetUser(self, request, context):
        user = await User.objects.aget(id=request.user_id)
        return UserResponse(id=user.id, name=user.name)
```

Server-Side Streaming

Server sends multiple responses to one request:

```python
class EventService(BaseService):
    async def StreamEvents(self, request, context):
        """Stream events to client."""
        async for event in Event.objects.filter(
            user_id=request.user_id
        ).aiterator():
            yield EventResponse(
                id=event.id,
                type=event.type,
                data=event.data,
            )
            await asyncio.sleep(0)  # Yield to event loop
```

Client-Side Streaming

Client sends multiple requests, server responds once:

```python
class LogService(BaseService):
    async def UploadLogs(self, request_iterator, context):
        """Receive stream of logs from client."""
        count = 0
        async for log_entry in request_iterator:
            await Log.objects.acreate(
                message=log_entry.message,
                level=log_entry.level,
            )
            count += 1
        return UploadResponse(logs_received=count)
```

Bidirectional Streaming

Both client and server stream simultaneously:

```python
class ChatService(BaseService):
    async def Chat(self, request_iterator, context):
        """Bidirectional chat stream."""
        async for message in request_iterator:
            # Process incoming message
            await Message.objects.acreate(
                text=message.text,
                user_id=message.user_id,
            )
            # Send response
            yield ChatResponse(
                text=f"Received: {message.text}",
                timestamp=int(time.time()),
            )
```

For production bidirectional streaming, use BidirectionalStreamingService, which provides ping/keepalive, connection management, and proper error handling. See Streaming Patterns.

BidirectionalStreamingService

For complex bidirectional streaming scenarios, use the built-in BidirectionalStreamingService:

```python
from django_cfg.apps.integrations.grpc.services.streaming import (
    BidirectionalStreamingService,
    BidirectionalStreamingConfig,
    StreamingMode,
    PingStrategy,
)

class BotService(BidirectionalStreamingService):
    config = BidirectionalStreamingConfig(
        streaming_mode=StreamingMode.ANEXT,
        ping_strategy=PingStrategy.INTERVAL,
        ping_interval=5.0,
        ping_timeout=180.0,
        connection_timeout=None,  # CRITICAL: Must be None!
    )

    async def on_client_connected(self, client_id: str, context):
        logger.info(f"Client {client_id} connected")

    async def on_message_received(self, client_id: str, message, context):
        # Handle message based on type
        return await self.dispatch_message(message)
```

See Streaming Patterns for complete documentation.

Django ORM with Async

Using Django Async ORM (Django 4.1+)

```python
# Async ORM methods
user = await User.objects.aget(id=1)
users = [user async for user in User.objects.filter(active=True).aiterator()]

await User.objects.acreate(name="John")
await user.asave()
await user.adelete()

# Async related objects
profile = await user.profile.aget()
orders = [order async for order in user.orders.all().aiterator()]
```

Using asyncio.to_thread for Sync Operations

For operations not yet supported by async ORM:

```python
import asyncio

# Wrap sync ORM calls in to_thread
count = await asyncio.to_thread(User.objects.count)
exists = await asyncio.to_thread(User.objects.filter(email=email).exists)

# Complex queries
result = await asyncio.to_thread(
    lambda: User.objects.select_related('profile')
    .prefetch_related('orders')
    .get(id=user_id)
)
```
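Because the blocking call runs in a worker thread, the event loop stays free to run other coroutines. A stdlib-only sketch of the same pattern, with `time.sleep` standing in for a sync ORM call (the `blocking_count` and `heartbeat` names are illustrative, not part of Django-CFG):

```python
import asyncio
import time

def blocking_count() -> int:
    # Stand-in for a sync ORM call such as User.objects.count().
    time.sleep(0.2)
    return 42

async def heartbeat(stop: asyncio.Event) -> int:
    # Keeps running on the event loop while the sync call blocks a thread.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.05)
    return ticks

async def main():
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    # The blocking call runs in a worker thread; the loop keeps ticking.
    count = await asyncio.to_thread(blocking_count)
    stop.set()
    ticks = await beat
    return count, ticks

count, ticks = asyncio.run(main())
print(count, ticks)  # 42 and several heartbeat ticks
```

If `blocking_count` were awaited directly on the loop (or called synchronously), the heartbeat would stall for the full 0.2 seconds.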

Configuration

Server Configuration

```python
from django_cfg import DjangoConfig, GRPCConfig, GRPCServerConfig

class MyConfig(DjangoConfig):
    grpc = GRPCConfig(
        enabled=True,
        server=GRPCServerConfig(
            host="[::]",
            port=50051,
            # Async options
            max_concurrent_streams=1000,
            asyncio_debug=False,
            # Standard options
            enable_reflection=True,
            enable_health_check=True,
        ),
    )
```

Streaming Configuration

```python
from django_cfg.apps.integrations.grpc.services.streaming.config import (
    BidirectionalStreamingConfig,
    StreamingMode,
    PingStrategy,
)

config = BidirectionalStreamingConfig(
    streaming_mode=StreamingMode.ANEXT,
    ping_strategy=PingStrategy.INTERVAL,
    ping_interval=5.0,
    ping_timeout=180.0,
    connection_timeout=None,  # CRITICAL!
    max_queue_size=1000,
    enable_sleep_zero=True,
    enable_logging=True,
)
```

Performance

Concurrency Comparison

| Metric | Sync Server | Async Server |
|---|---|---|
| Max concurrent requests | ~10 | 1000+ |
| Memory per request | 2-4 MB | 50-100 KB |
| Streaming support | Limited | Full |
| I/O-bound workloads | Slow | Fast |

When to Use Async

Use async for:

  • High-concurrency scenarios (100+ concurrent clients)
  • Streaming (server-side, client-side, bidirectional)
  • Long-running connections (bots, real-time updates)
  • I/O-bound operations (external APIs, databases)

Sync is fine for:

  • Low-medium concurrency (under 50 clients)
  • Simple CRUD operations
  • CPU-bound workloads
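The I/O-bound advantage can be illustrated with plain asyncio, no gRPC required: ten simulated 0.1-second waits complete concurrently in roughly the time of one, because the waits overlap on a single event loop thread.

```python
import asyncio
import time

async def fake_io_call(i: int) -> int:
    # Simulate a 0.1 s network or database wait (non-blocking).
    await asyncio.sleep(0.1)
    return i

async def main() -> float:
    start = time.perf_counter()
    # All ten "requests" wait concurrently on one event loop thread.
    results = await asyncio.gather(*(fake_io_call(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    assert results == list(range(10))
    return elapsed

elapsed = asyncio.run(main())
print(f"10 concurrent 0.1s waits took {elapsed:.2f}s")  # ~0.1s, not ~1.0s
```

A sync server would need ten threads (or ~1 second of wall time) for the same work, which is where the memory and concurrency numbers in the table above come from.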

Best Practices

1. Yield to Event Loop

In tight loops, yield control to prevent blocking:

```python
async def StreamData(self, request, context):
    for item in large_dataset:
        yield ItemResponse(data=item)
        await asyncio.sleep(0)  # Yield to event loop
```
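To see what `await asyncio.sleep(0)` buys you, here is a stdlib-only sketch: two tight-looping tasks interleave only because each yields control back to the event loop on every iteration (the `producer`/`watcher` names are illustrative).

```python
import asyncio

order = []

async def producer() -> None:
    for i in range(3):
        order.append(f"producer-{i}")
        # Without this line the loop would run to completion before
        # the watcher task ever got scheduled.
        await asyncio.sleep(0)

async def watcher() -> None:
    for i in range(3):
        order.append(f"watcher-{i}")
        await asyncio.sleep(0)

async def main() -> None:
    await asyncio.gather(producer(), watcher())

asyncio.run(main())
print(order)  # producer and watcher entries interleave
```

Remove the `sleep(0)` calls and all three `producer` entries land before any `watcher` entry, which is exactly the starvation a streaming handler causes for its neighbors.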

2. Handle Cancellation

Check if stream was cancelled:

```python
async def StreamEvents(self, request, context):
    while not context.cancelled():
        events = await self.get_new_events()
        for event in events:
            yield EventResponse(event=event)
        await asyncio.sleep(1)
```
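In plain asyncio terms, cancellation is delivered as `asyncio.CancelledError`, which a polling loop can catch to clean up before exiting. A stdlib sketch (names here are illustrative, not part of Django-CFG):

```python
import asyncio

async def poll_events(seen: list) -> None:
    try:
        while True:
            seen.append("poll")
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        # Clean-up hook: release resources before the task ends.
        seen.append("cancelled")
        raise  # Re-raise so the task is marked as cancelled.

async def main() -> list:
    seen = []
    task = asyncio.create_task(poll_events(seen))
    await asyncio.sleep(0.12)
    task.cancel()  # Analogous to the client dropping the stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen

seen = asyncio.run(main())
print(seen)
```

Re-raising after cleanup matters: swallowing `CancelledError` leaves the task looking like it completed normally.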

3. Use Async Context Managers

```python
async def ProcessStream(self, request_iterator, context):
    async with self.get_async_connection() as conn:
        async for request in request_iterator:
            await conn.process(request)
```

4. Batch Database Operations

```python
async def UploadItems(self, request_iterator, context):
    batch = []
    async for item in request_iterator:
        batch.append(Item(data=item.data))
        if len(batch) >= 100:
            await Item.objects.abulk_create(batch)
            batch = []
    if batch:
        await Item.objects.abulk_create(batch)
```
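The batch-and-flush logic can be exercised without the ORM; this sketch replaces `abulk_create` with a hypothetical async `flush` callback so the pattern is runnable standalone:

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List

async def batched_upload(
    items: AsyncIterator[int],
    flush: Callable[[List[int]], Awaitable[None]],
    batch_size: int = 100,
) -> int:
    """Collect items into batches and flush each full batch."""
    batch: List[int] = []
    total = 0
    async for item in items:
        batch.append(item)
        total += 1
        if len(batch) >= batch_size:
            await flush(batch)
            batch = []
    if batch:  # Flush the final partial batch.
        await flush(batch)
    return total

async def main() -> list:
    flushed = []  # Records the size of each flushed batch.

    async def flush(batch):
        flushed.append(len(batch))

    async def gen():
        for i in range(250):
            yield i

    total = await batched_upload(gen(), flush, batch_size=100)
    assert total == 250
    return flushed

flushed = asyncio.run(main())
print(flushed)  # [100, 100, 50]
```

The trailing `if batch:` flush is the part most often forgotten; without it, any items after the last full batch are silently dropped.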

Running the Server

```bash
# Development
python manage.py rungrpc

# Production
python manage.py rungrpc --host 0.0.0.0 --port 50051

# With debug mode
python manage.py rungrpc --asyncio-debug
```

Debugging

Enable Asyncio Debug

```python
GRPCServerConfig(
    asyncio_debug=True,  # Shows async warnings
)
```

Or via command line:

```bash
python manage.py rungrpc --asyncio-debug
```

Common Issues

| Issue | Solution |
|---|---|
| `RuntimeError: no running event loop` | Make the method `async def` |
| `SynchronousOnlyOperation` | Use `await asyncio.to_thread()` or the async ORM |
| Stream closes unexpectedly | See Troubleshooting |
| Memory usage growing | Check queue sizes and cleanup |