Async gRPC Server
High-concurrency async gRPC server with full streaming support.
Overview
The Django-CFG gRPC server is built on grpc.aio and runs on an asyncio event loop, enabling:
- 1000+ concurrent connections
- Server-side, client-side, and bidirectional streaming
- Non-blocking I/O operations
- Full Django ORM integration via async wrappers
┌─────────────────────────────────────────┐
│  python manage.py rungrpc               │
│                                         │
│  ┌─────────────────────────────────┐    │
│  │ grpc.aio.server()  [ASYNC]      │    │
│  │  └─ asyncio event loop          │    │
│  │      ├─ API Key Auth (async)    │    │
│  │      ├─ Logging (async)         │    │
│  │      ├─ Streaming Services      │    │
│  │      └─ Unary Services          │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘

Streaming Types
Unary (Request-Response)
Traditional single request, single response:
class UserService(BaseService):
    async def GetUser(self, request, context):
        user = await User.objects.aget(id=request.user_id)
        return UserResponse(id=user.id, name=user.name)

Server-Side Streaming
Server sends multiple responses to one request:
import asyncio

class EventService(BaseService):
    async def StreamEvents(self, request, context):
        """Stream events to client."""
        async for event in Event.objects.filter(
            user_id=request.user_id
        ).aiterator():
            yield EventResponse(
                id=event.id,
                type=event.type,
                data=event.data,
            )
            await asyncio.sleep(0)  # Yield to event loop

Client-Side Streaming
Client sends multiple requests, server responds once:
class LogService(BaseService):
    async def UploadLogs(self, request_iterator, context):
        """Receive stream of logs from client."""
        count = 0
        async for log_entry in request_iterator:
            await Log.objects.acreate(
                message=log_entry.message,
                level=log_entry.level,
            )
            count += 1
        return UploadResponse(logs_received=count)

Bidirectional Streaming
Both client and server stream simultaneously:
import time

class ChatService(BaseService):
    async def Chat(self, request_iterator, context):
        """Bidirectional chat stream."""
        async for message in request_iterator:
            # Process incoming message
            await Message.objects.acreate(
                text=message.text,
                user_id=message.user_id,
            )
            # Send response
            yield ChatResponse(
                text=f"Received: {message.text}",
                timestamp=int(time.time()),
            )

For production bidirectional streaming, use BidirectionalStreamingService, which provides ping/keepalive, connection management, and proper error handling. See Streaming Patterns.
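The handler shape above (consume an async iterator, yield one reply per message) can be tried without a gRPC server. The following is a minimal sketch in plain asyncio, not Django-CFG code: ChatMessage, client_messages, and chat are hypothetical stand-ins for the generated protobuf message, the request_iterator that gRPC supplies, and the Chat handler.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ChatMessage:
    """Hypothetical stand-in for a generated protobuf request message."""
    user_id: int
    text: str


async def client_messages():
    """Simulates the request_iterator that gRPC passes to a streaming handler."""
    for text in ("hello", "how are you?"):
        yield ChatMessage(user_id=1, text=text)
        await asyncio.sleep(0)  # let other tasks run between messages


async def chat(request_iterator):
    """Same shape as the Chat handler: one reply per incoming message."""
    async for message in request_iterator:
        yield f"Received: {message.text}"


async def collect_replies():
    # Drain the reply stream the way a client would.
    return [reply async for reply in chat(client_messages())]


replies = asyncio.run(collect_replies())
print(replies)
```

Because the handler is an async generator, each reply is produced as soon as its request arrives; nothing is buffered until the client finishes sending.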
BidirectionalStreamingService
For complex bidirectional streaming scenarios, use the built-in BidirectionalStreamingService:
import logging

from django_cfg.apps.integrations.grpc.services.streaming import (
    BidirectionalStreamingService,
    BidirectionalStreamingConfig,
    StreamingMode,
    PingStrategy,
)

logger = logging.getLogger(__name__)

class BotService(BidirectionalStreamingService):
    config = BidirectionalStreamingConfig(
        streaming_mode=StreamingMode.ANEXT,
        ping_strategy=PingStrategy.INTERVAL,
        ping_interval=5.0,
        ping_timeout=180.0,
        connection_timeout=None,  # CRITICAL: Must be None!
    )

    async def on_client_connected(self, client_id: str, context):
        logger.info(f"Client {client_id} connected")

    async def on_message_received(self, client_id: str, message, context):
        # Handle message based on type
        return await self.dispatch_message(message)

See Streaming Patterns for complete documentation.
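To give a feel for what an interval ping strategy involves, here is a rough sketch in plain asyncio. It is not the implementation of BidirectionalStreamingService, whose internals are not shown here. The names with_pings, slow_client, and the "PING" marker are hypothetical. The idea is to wait on the next client message with a timeout and emit a ping whenever the stream stays idle for a full interval.

```python
import asyncio

_DONE = object()  # sentinel: the client stream has ended


async def _read_next(iterator):
    """Read one message, returning _DONE instead of raising at end of stream."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def slow_client():
    """Hypothetical client stream: a quiet gap, then a single message."""
    await asyncio.sleep(0.1)
    yield "status-update"


async def with_pings(request_iterator, ping_interval):
    """Yield incoming messages, plus "PING" after each idle ping_interval."""
    iterator = request_iterator.__aiter__()
    pending = asyncio.ensure_future(_read_next(iterator))
    try:
        while True:
            # asyncio.wait does not cancel the read on timeout, so the same
            # pending read is reused after a ping has been sent.
            done, _ = await asyncio.wait({pending}, timeout=ping_interval)
            if not done:
                yield "PING"
                continue
            message = pending.result()
            if message is _DONE:
                return
            yield message
            pending = asyncio.ensure_future(_read_next(iterator))
    finally:
        pending.cancel()  # no-op if the read already finished


async def main():
    return [item async for item in with_pings(slow_client(), ping_interval=0.03)]


events = asyncio.run(main())
print(events)
```

The sketch waits with asyncio.wait rather than asyncio.wait_for because wait_for cancels the read on timeout, and cancelling a read on an async generator closes that generator.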
Django ORM with Async
Using Django Async ORM (Django 4.1+)
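# Async ORM methods (aget, acreate, aiterator: Django 4.1+)
user = await User.objects.aget(id=1)
users = [user async for user in User.objects.filter(active=True).aiterator()]
await User.objects.acreate(name="John")

# Instance methods asave() and adelete() require Django 4.2+
await user.asave()
await user.adelete()

# Async related objects
# A single-valued accessor such as user.profile returns a model instance and
# queries synchronously, so fetch it through the related model's manager.
# (Profile is assumed here to be the model behind user.profile.)
profile = await Profile.objects.aget(user=user)
orders = [order async for order in user.orders.all().aiterator()]

Using asyncio.to_thread for Sync Operations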
For operations the async ORM does not cover, run the synchronous call in a worker thread. Django 4.1+ already provides acount() and aexists(), so the first two calls below only illustrate the pattern:

import asyncio

# Wrap sync ORM calls in to_thread
count = await asyncio.to_thread(User.objects.count)
exists = await asyncio.to_thread(User.objects.filter(email=email).exists)

# Complex queries
result = await asyncio.to_thread(
    lambda: User.objects.select_related('profile')
    .prefetch_related('orders')
    .get(id=user_id)
)

Configuration
Server Configuration
from django_cfg import DjangoConfig, GRPCConfig, GRPCServerConfig

class MyConfig(DjangoConfig):
    grpc = GRPCConfig(
        enabled=True,
        server=GRPCServerConfig(
            host="[::]",
            port=50051,
            # Async options
            max_concurrent_streams=1000,
            asyncio_debug=False,
            # Standard options
            enable_reflection=True,
            enable_health_check=True,
        ),
    )

Streaming Configuration
from django_cfg.apps.integrations.grpc.services.streaming.config import (
    BidirectionalStreamingConfig,
    StreamingMode,
    PingStrategy,
)

config = BidirectionalStreamingConfig(
    streaming_mode=StreamingMode.ANEXT,
    ping_strategy=PingStrategy.INTERVAL,
    ping_interval=5.0,
    ping_timeout=180.0,
    connection_timeout=None,  # CRITICAL!
    max_queue_size=1000,
    enable_sleep_zero=True,
    enable_logging=True,
)

Performance
Concurrency Comparison
| Metric | Sync Server | Async Server |
|---|---|---|
| Max concurrent requests | ~10 | 1000+ |
| Memory per request | 2-4 MB | 50-100 KB |
| Streaming support | Limited | Full |
| I/O-bound workloads | Slow | Fast |
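The mechanism behind the async column can be illustrated in isolation. This is a minimal sketch in plain asyncio, not a benchmark of the figures in the table: handle_request is a hypothetical handler whose asyncio.sleep call stands in for a database query or an external API call.

```python
import asyncio
import time


async def handle_request(request_id):
    """Hypothetical I/O-bound handler; the sleep stands in for a query."""
    await asyncio.sleep(0.05)
    return request_id


async def serve_many(n):
    start = time.perf_counter()
    # All n handlers wait concurrently on a single thread.
    results = await asyncio.gather(*(handle_request(i) for i in range(n)))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(serve_many(1000))
print(f"{len(results)} requests in {elapsed:.2f}s")
```

Run one after another, the same 1000 waits would take about 50 seconds. Run concurrently, the total stays close to the duration of a single wait, because a waiting coroutine costs no thread.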
When to Use Async
Use async for:
- High-concurrency scenarios (100+ concurrent clients)
- Streaming (server-side, client-side, bidirectional)
- Long-running connections (bots, real-time updates)
- I/O-bound operations (external APIs, databases)
Sync is fine for:
- Low-medium concurrency (under 50 clients)
- Simple CRUD operations
- CPU-bound workloads
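One reason the distinction matters: a blocking call inside an async handler stalls every other connection on the event loop. The sketch below is plain asyncio with hypothetical names (heartbeat, measure). It counts how often a background task runs while a blocking time.sleep executes directly on the loop and again via asyncio.to_thread.

```python
import asyncio
import time


async def heartbeat(ticks):
    """Records a tick every 10 ms whenever the event loop is free to run it."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.perf_counter())


async def measure(use_thread):
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat task start
    if use_thread:
        # The blocking call runs in a worker thread; the loop stays responsive.
        await asyncio.to_thread(time.sleep, 0.2)
    else:
        # The blocking call runs on the event loop thread; nothing else runs.
        time.sleep(0.2)
    count = len(ticks)
    task.cancel()
    return count


ticks_blocked = asyncio.run(measure(use_thread=False))
ticks_threaded = asyncio.run(measure(use_thread=True))
print(ticks_blocked, ticks_threaded)
```

Here time.sleep stands in for any blocking call. Offloading to a thread helps with blocking I/O; pure-Python CPU-bound work in a thread still competes for the interpreter lock, which is one reason a sync server can be adequate for such workloads.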
Best Practices
1. Yield to Event Loop
In tight loops, yield control to prevent blocking:
async def StreamData(self, request, context):
    for item in large_dataset:
        yield ItemResponse(data=item)
        await asyncio.sleep(0)  # Yield to event loop

2. Handle Cancellation
Check whether the stream has been cancelled before doing more work:

async def StreamEvents(self, request, context):
    while not context.cancelled():
        events = await self.get_new_events()
        for event in events:
            yield EventResponse(event=event)
        await asyncio.sleep(1)

3. Use Async Context Managers
async def ProcessStream(self, request_iterator, context):
    async with self.get_async_connection() as conn:
        async for request in request_iterator:
            await conn.process(request)
    # A client-streaming handler must still return its single response message here.

4. Batch Database Operations
async def UploadItems(self, request_iterator, context):
    batch = []
    async for item in request_iterator:
        batch.append(Item(data=item.data))
        if len(batch) >= 100:
            await Item.objects.abulk_create(batch)
            batch = []
    # Flush the final partial batch
    if batch:
        await Item.objects.abulk_create(batch)
    # A client-streaming handler must still return its single response message here.

Running the Server
# Development
python manage.py rungrpc

# Production
python manage.py rungrpc --host 0.0.0.0 --port 50051

# With debug mode
python manage.py rungrpc --asyncio-debug

Debugging
Enable Asyncio Debug
GRPCServerConfig(
    asyncio_debug=True,  # Shows async warnings
)

Or via command line:

python manage.py rungrpc --asyncio-debug

Common Issues
| Issue | Solution |
|---|---|
| RuntimeError: no running event loop | Make method async def |
| SynchronousOnlyOperation | Use await asyncio.to_thread() or async ORM |
| Stream closes unexpectedly | See Troubleshooting |
| Memory usage growing | Check queue sizes and cleanup |
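For the memory-growth row, a common cause in streaming code is an unbounded queue between a fast producer and a slow consumer. The following is a minimal sketch of a bounded asyncio.Queue applying backpressure. The producer and consumer functions are hypothetical, and how this maps onto the max_queue_size setting shown earlier is an assumption rather than something this page confirms.

```python
import asyncio


async def producer(queue, n, sizes):
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full
        sizes.append(queue.qsize())
    await queue.put(None)  # sentinel: no more items


async def consumer(queue, received):
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
        await asyncio.sleep(0)  # simulate a client that drains slowly


async def main():
    queue = asyncio.Queue(maxsize=10)  # bound on buffered items
    received, sizes = [], []
    await asyncio.gather(producer(queue, 100, sizes), consumer(queue, received))
    return received, max(sizes)


received, peak = asyncio.run(main())
print(len(received), peak)
```

With maxsize set, the producer pauses instead of buffering without limit, so memory use stays proportional to the bound rather than to the gap between producer and consumer speed.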
Related Documentation
- Streaming Patterns - Production streaming patterns
- Troubleshooting - Common issues and solutions
- Configuration - Full configuration options
- Architecture - System architecture