Backend Developer Guide

Learn how to create type-safe RPC handlers using Pydantic models and the @websocket_rpc decorator.

Creating Your First Handler

1. Define Pydantic Models

Create request and response models using Pydantic v2:

# core/centrifugo_handlers.py
from pydantic import BaseModel, Field
from typing import List, Optional


class TaskStatsParams(BaseModel):
    """Request parameters for task statistics."""

    user_id: str = Field(..., description="User ID to fetch stats for")
    include_completed: bool = Field(True, description="Include completed tasks")


class TaskStatsResult(BaseModel):
    """Task statistics response."""

    total: int = Field(..., description="Total number of tasks")
    completed: int = Field(..., description="Number of completed tasks")
    pending: int = Field(..., description="Number of pending tasks")
    user_id: str = Field(..., description="User ID")

2. Create RPC Handler

Use the @websocket_rpc decorator to register your handler:

from django_cfg.apps.integrations.centrifugo.decorators import websocket_rpc


@websocket_rpc("tasks.get_stats")
async def get_task_stats(conn, params: TaskStatsParams) -> TaskStatsResult:
    """
    Get task statistics for a user.

    This handler retrieves comprehensive task statistics including
    total, completed, and pending task counts.
    """
    from apps.tasks.models import Task

    # Use params as Pydantic model
    user_id = params.user_id

    # Query the database with the async ORM methods; the sync count()
    # raises SynchronousOnlyOperation inside an async handler
    tasks = Task.objects.filter(user_id=user_id)
    total = await tasks.acount()
    completed = await tasks.filter(status="completed").acount()
    pending = total - completed

    # Return Pydantic model
    return TaskStatsResult(
        total=total,
        completed=completed,
        pending=pending,
        user_id=user_id,
    )

3. Register Handlers

Import handlers in your AppConfig.ready():

# core/apps.py
from django.apps import AppConfig


class CoreConfig(AppConfig):
    name = "core"
    default_auto_field = "django.db.models.BigAutoField"

    def ready(self):
        """Import handlers to register them with the router."""
        from . import centrifugo_handlers  # noqa: F401 (imported for its side effects)

Handler Best Practices

Use Type Hints

Type hints are required because they drive code generation:

# ✅ GOOD - Pydantic models with type hints
@websocket_rpc("user.get")
async def get_user(conn, params: GetUserParams) -> UserResult:
    ...


# ❌ BAD - No type hints
@websocket_rpc("user.get")
async def get_user(conn, params):
    ...
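The sketch below illustrates why an untyped handler gives a code generator nothing to work with. It uses the standard library's `typing.get_type_hints`; the `describe` helper and the plain stand-in classes are assumptions made for this example, and the actual generator may inspect handlers differently.

```python
from typing import get_type_hints


class GetUserParams:  # stand-in for a Pydantic request model
    user_id: str


class UserResult:  # stand-in for a Pydantic response model
    username: str


async def typed(conn, params: GetUserParams) -> UserResult: ...


async def untyped(conn, params): ...


def describe(handler):
    """Return the (params type, return type) pair visible on a handler."""
    hints = get_type_hints(handler)
    return hints.get("params"), hints.get("return")


typed_spec = describe(typed)      # both model classes are recoverable
untyped_spec = describe(untyped)  # nothing to generate a client from
```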

Add Field Descriptions

Descriptions appear in generated client documentation:

class UserParams(BaseModel):
    user_id: str = Field(..., description="Unique user identifier")
    include_profile: bool = Field(
        True,
        description="Include full profile data in response"
    )

Generates TypeScript documentation:

interface UserParams {
  /** Unique user identifier */
  user_id: string;
  /** Include full profile data in response */
  include_profile?: boolean;
}
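One way to see how descriptions can travel from a model into client documentation is to read the model's JSON schema. The sketch below uses Pydantic v2's `model_json_schema()`; the `to_ts_interface` function and its small type map are hypothetical and far simpler than a real generator.

```python
from pydantic import BaseModel, Field


class UserParams(BaseModel):
    user_id: str = Field(..., description="Unique user identifier")
    include_profile: bool = Field(
        True, description="Include full profile data in response"
    )


# Deliberately tiny type map; a real generator handles many more cases.
TS_TYPES = {
    "string": "string",
    "boolean": "boolean",
    "integer": "number",
    "number": "number",
}


def to_ts_interface(model: type[BaseModel]) -> str:
    """Render a flat model as a TypeScript interface (illustrative only)."""
    schema = model.model_json_schema()
    required = set(schema.get("required", []))
    lines = [f"interface {schema['title']} {{"]
    for name, prop in schema["properties"].items():
        if "description" in prop:
            lines.append(f"  /** {prop['description']} */")
        optional = "" if name in required else "?"
        lines.append(f"  {name}{optional}: {TS_TYPES[prop['type']]};")
    lines.append("}")
    return "\n".join(lines)


ts_source = to_ts_interface(UserParams)
```

Fields with a default are absent from the schema's `required` list, which is what lets the sketch mark `include_profile` as optional.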

Write Handler Docstrings

Docstrings appear in README files:

@websocket_rpc("tasks.create")
async def create_task(conn, params: CreateTaskParams) -> TaskResult:
    """
    Create a new task.

    Creates a task for the specified user with the given title and description.
    Automatically sets created_at timestamp and initializes status to 'pending'.
    Returns the created task with its generated ID.
    """
    ...
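A documentation tool can read a handler's docstring with the standard library's `inspect.getdoc`, which strips the indentation and surrounding blank lines. The sketch below only demonstrates that call; how the README generator actually extracts docstrings is not specified here.

```python
import inspect


async def create_task(conn, params):
    """
    Create a new task.

    Creates a task for the specified user with the given title.
    """


doc = inspect.getdoc(create_task)  # dedented, outer blank lines removed
summary = doc.splitlines()[0]      # the first line works as a one-line summary
```

Keeping the first docstring line short and self-contained makes it usable as a summary wherever only one line fits.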

Use Async Handlers

All handlers must be async:

# ✅ GOOD
@websocket_rpc("tasks.list")
async def list_tasks(conn, params: ListParams) -> ListResult:
    # Querysets have no alist() method; collect rows with an async comprehension
    tasks = [t async for t in Task.objects.filter(user_id=params.user_id)]
    return ListResult(tasks=[...])


# ❌ BAD - sync handler
@websocket_rpc("tasks.list")
def list_tasks(conn, params: ListParams) -> ListResult:
    ...
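When a handler has to call code that only exists in blocking form, the call can be moved off the event loop. The sketch below uses the standard library's `asyncio.to_thread`; `slow_report` and `get_report` are hypothetical names, and for Django ORM calls the usual adapter is `asgiref.sync.sync_to_async` rather than a bare thread.

```python
import asyncio
import time


def slow_report(user_id: str) -> dict:
    """Blocking function, standing in for sync-only code."""
    time.sleep(0.05)
    return {"user_id": user_id, "total": 3}


async def get_report(conn, params: dict) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_report, params["user_id"])


async def main():
    # The two calls overlap instead of running back to back.
    return await asyncio.gather(
        get_report(None, {"user_id": "a"}),
        get_report(None, {"user_id": "b"}),
    )


reports = asyncio.run(main())
```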

Naming Conventions

Method Names

Use dot notation for namespacing:

@websocket_rpc("tasks.list")      # List tasks
@websocket_rpc("tasks.create")    # Create task
@websocket_rpc("tasks.update")    # Update task
@websocket_rpc("tasks.delete")    # Delete task
@websocket_rpc("users.get")       # Get user
@websocket_rpc("users.profile")   # Get user profile
@websocket_rpc("system.health")   # System health check
@websocket_rpc("system.stats")    # System statistics

This generates organized client methods:

# Python
api.tasks_list(...)
api.tasks_create(...)
api.users_get(...)
api.system_health(...)

// TypeScript
api.tasksList(...)
api.tasksCreate(...)
api.usersGet(...)
api.systemHealth(...)

// Go
api.TasksList(ctx, ...)
api.TasksCreate(ctx, ...)
api.UsersGet(ctx, ...)
api.SystemHealth(ctx, ...)
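The mapping from dotted method names to client method names can be sketched as three small functions. These helpers are hypothetical and only reproduce the examples above; in particular, treating underscores as word boundaries for TypeScript and Go is an assumption, not documented generator behavior.

```python
import re


def _words(method: str) -> list[str]:
    # "tasks.get_stats" -> ["tasks", "get", "stats"]
    return [w for w in re.split(r"[._]", method) if w]


def python_name(method: str) -> str:
    """snake_case: dots become underscores."""
    return method.replace(".", "_")


def typescript_name(method: str) -> str:
    """camelCase: first word lowercase, the rest capitalized."""
    first, *rest = _words(method)
    return first + "".join(w.capitalize() for w in rest)


def go_name(method: str) -> str:
    """PascalCase: every word capitalized."""
    return "".join(w.capitalize() for w in _words(method))
```

For example, `typescript_name("tasks.list")` yields `tasksList` and `go_name("system.health")` yields `SystemHealth`, matching the generated names listed above.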

Model Names

Follow these conventions:

# Request models: <Action><Entity>Params
class CreateTaskParams(BaseModel): ...
class UpdateUserParams(BaseModel): ...
class ListTasksParams(BaseModel): ...

# Response models: <Entity>Result or <Action><Entity>Result
class TaskResult(BaseModel): ...
class UserResult(BaseModel): ...
class TaskListResult(BaseModel): ...

Advanced Patterns

Optional Parameters

Use Optional for optional fields:

from typing import Optional


class SearchParams(BaseModel):
    query: str = Field(..., description="Search query")
    limit: Optional[int] = Field(None, description="Max results")
    offset: Optional[int] = Field(None, description="Result offset")

Generates:

interface SearchParams {
  query: string;
  limit?: number | null;
  offset?: number | null;
}
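On the Python side, the same model behaves as follows under Pydantic v2: optional fields fall back to `None`, and a missing required field is rejected. This is a small sketch using only documented Pydantic calls; the variable names are chosen for the example.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class SearchParams(BaseModel):
    query: str = Field(..., description="Search query")
    limit: Optional[int] = Field(None, description="Max results")
    offset: Optional[int] = Field(None, description="Result offset")


params = SearchParams(query="django")
# Drop fields that are still None when building a payload.
payload = params.model_dump(exclude_none=True)

try:
    SearchParams(limit=5)  # `query` is required, so validation fails
    missing_query_rejected = False
except ValidationError:
    missing_query_rejected = True
```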

Lists and Nested Models

Use List for arrays and nest models:

from typing import List


class Tag(BaseModel):
    name: str
    color: str


class Task(BaseModel):
    id: int
    title: str
    tags: List[Tag]


class TaskListResult(BaseModel):
    tasks: List[Task]
    total: int

Generates:

interface Tag {
  name: string;
  color: string;
}

interface Task {
  id: number;
  title: string;
  tags: Tag[];
}

interface TaskListResult {
  tasks: Task[];
  total: number;
}
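Nested models validate recursively, so a plain dictionary (for example, decoded JSON) becomes a tree of typed objects. The sketch below shows this with Pydantic v2's `model_validate` and `model_dump`; the sample data is made up for the example.

```python
from typing import List

from pydantic import BaseModel


class Tag(BaseModel):
    name: str
    color: str


class Task(BaseModel):
    id: int
    title: str
    tags: List[Tag]


class TaskListResult(BaseModel):
    tasks: List[Task]
    total: int


raw = {
    "tasks": [
        {"id": 1, "title": "Write docs", "tags": [{"name": "docs", "color": "blue"}]}
    ],
    "total": 1,
}

result = TaskListResult.model_validate(raw)  # nested dicts become model instances
first_tag = result.tasks[0].tags[0]
round_trip = result.model_dump()             # and serialize back to plain dicts
```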

Using Connection Object

The conn parameter provides access to connection metadata:

@websocket_rpc("user.profile")
async def get_profile(conn, params: ProfileParams) -> ProfileResult:
    # Access user from connection
    user_id = conn.user_id  # From JWT token

    # Use in business logic (aget() is the awaitable counterpart of get())
    profile = await UserProfile.objects.aget(user_id=user_id)

    return ProfileResult(
        username=profile.username,
        email=profile.email,
    )
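Because a handler reads identity from `conn`, it can be exercised with a simple stub object. The sketch below uses `types.SimpleNamespace` as a stand-in connection and an in-memory dict in place of the database; both are assumptions for illustration, and only the `user_id` attribute shown above is relied on.

```python
import asyncio
from types import SimpleNamespace

# In-memory stand-in for a profile table.
PROFILES = {"u1": {"username": "alice", "email": "alice@example.com"}}


async def get_profile(conn, params: dict) -> dict:
    # Identity comes from the connection, not from client-supplied params.
    return PROFILES[conn.user_id]


conn = SimpleNamespace(user_id="u1")  # stub exposing only `user_id`
profile = asyncio.run(get_profile(conn, {"user_id": "someone-else"}))
```

Reading the user ID from the connection rather than from the request parameters means a client cannot ask for another user's data by changing its payload.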

Error Handling

Raise exceptions for error cases:

from django.core.exceptions import ObjectDoesNotExist


@websocket_rpc("tasks.get")
async def get_task(conn, params: GetTaskParams) -> TaskResult:
    try:
        task = await Task.objects.aget(id=params.task_id)
        return TaskResult(...)
    except ObjectDoesNotExist:
        raise ValueError(f"Task {params.task_id} not found")

Clients receive error responses:

try:
    result = await api.tasks_get(GetTaskParams(task_id=999))
except Exception as e:
    print(f"Error: {e}")  # "Task 999 not found"
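Conceptually, something between the handler and the client has to turn a raised exception into a response. The sketch below shows one way that could look; `call_safely` and the `{"error": ...}` envelope are hypothetical, and the actual wire format used by the router may differ.

```python
import asyncio


async def get_task(conn, params: dict) -> dict:
    # Stand-in handler that always fails, to exercise the error path.
    raise ValueError(f"Task {params['task_id']} not found")


async def call_safely(handler, conn, params) -> dict:
    """Await a handler and fold any exception into an error payload."""
    try:
        return {"result": await handler(conn, params)}
    except Exception as exc:
        # Hypothetical envelope; the real wire format may differ.
        return {"error": {"type": type(exc).__name__, "message": str(exc)}}


response = asyncio.run(call_safely(get_task, None, {"task_id": 999}))
```

Since the exception message can end up in front of the client, it is worth keeping it free of internal details.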

Database Queries

Use Django ORM async methods:

@websocket_rpc("tasks.list")
async def list_tasks(conn, params: ListParams) -> ListResult:
    # Async query: querysets have no alist() method, so iterate with `async for`
    queryset = Task.objects.filter(
        user_id=params.user_id
    ).order_by('-created_at')

    # Transform to Pydantic
    task_list = [
        TaskItem(id=t.id, title=t.title, status=t.status)
        async for t in queryset
    ]

    return ListResult(tasks=task_list, total=len(task_list))
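Django querysets support `async for`, so rows can be collected with an async comprehension. The sketch below shows the pattern without a database: `fetch_rows` is a hypothetical async generator standing in for an async-iterable queryset.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Row:
    id: int
    title: str


async def fetch_rows():
    """Async generator standing in for an async-iterable queryset."""
    for i, title in enumerate(["first", "second"], start=1):
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield Row(id=i, title=title)


async def list_titles() -> list:
    # Collect every row with an async comprehension.
    rows = [row async for row in fetch_rows()]
    return [r.title for r in rows]


titles = asyncio.run(list_titles())
```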

Testing Handlers

Unit Tests

Test handlers directly:

# core/tests/test_centrifugo_handlers.py
import pytest
from django.contrib.auth import get_user_model

from apps.tasks.models import Task
from core.centrifugo_handlers import get_task_stats, TaskStatsParams

User = get_user_model()


@pytest.mark.django_db(transaction=True)  # the test needs database access
@pytest.mark.asyncio
async def test_get_task_stats():
    # Create test data
    user = await User.objects.acreate(username="test")
    await Task.objects.acreate(user=user, status="completed")
    await Task.objects.acreate(user=user, status="pending")

    # Call handler
    result = await get_task_stats(
        conn=None,  # Mock connection (this handler does not read it)
        params=TaskStatsParams(user_id=str(user.id)),  # user_id is declared as str
    )

    # Assert results
    assert result.total == 2
    assert result.completed == 1
    assert result.pending == 1

Integration Tests

Test with generated clients:

import pytest
from django.conf import settings

from core.centrifugo_handlers import TaskStatsParams


@pytest.mark.asyncio
async def test_rpc_client_integration():
    from opensdk.python import CentrifugoRPCClient, APIClient

    # Setup client
    # generate_test_token() is a project-specific helper that is not shown here
    rpc = CentrifugoRPCClient(
        url=settings.CENTRIFUGO_URL,
        token=generate_test_token(),
        user_id="test-user",
    )
    await rpc.connect()

    api = APIClient(rpc)

    # Call RPC method
    result = await api.tasks_get_stats(TaskStatsParams(user_id="test-user"))

    # Verify
    assert result.total >= 0
    assert result.completed >= 0

Complete Example

Here’s a complete CRUD example for a Todo app:

# core/centrifugo_handlers.py
from pydantic import BaseModel, Field
from typing import List, Optional
from django_cfg.apps.integrations.centrifugo.decorators import websocket_rpc
from apps.todos.models import Todo


# ========================================
# Models
# ========================================

class TodoItem(BaseModel):
    id: int
    title: str
    completed: bool
    created_at: str


class CreateTodoParams(BaseModel):
    title: str = Field(..., description="Todo title")


class UpdateTodoParams(BaseModel):
    id: int = Field(..., description="Todo ID")
    title: Optional[str] = Field(None, description="New title")
    completed: Optional[bool] = Field(None, description="Completion status")


class DeleteTodoParams(BaseModel):
    id: int = Field(..., description="Todo ID to delete")


class ListTodosParams(BaseModel):
    completed: Optional[bool] = Field(None, description="Filter by completion")
    limit: Optional[int] = Field(10, description="Max results")


class TodoResult(BaseModel):
    todo: TodoItem


class TodoListResult(BaseModel):
    todos: List[TodoItem]
    total: int


class SuccessResult(BaseModel):
    success: bool
    message: str


# ========================================
# Handlers
# ========================================

@websocket_rpc("todos.create")
async def create_todo(conn, params: CreateTodoParams) -> TodoResult:
    """Create a new todo item."""
    todo = await Todo.objects.acreate(
        user_id=conn.user_id,
        title=params.title,
        completed=False,
    )

    return TodoResult(
        todo=TodoItem(
            id=todo.id,
            title=todo.title,
            completed=todo.completed,
            created_at=todo.created_at.isoformat(),
        )
    )


@websocket_rpc("todos.update")
async def update_todo(conn, params: UpdateTodoParams) -> TodoResult:
    """Update an existing todo item."""
    # Filtering on user_id ensures users can only update their own todos
    todo = await Todo.objects.aget(id=params.id, user_id=conn.user_id)

    if params.title is not None:
        todo.title = params.title
    if params.completed is not None:
        todo.completed = params.completed

    await todo.asave()

    return TodoResult(
        todo=TodoItem(
            id=todo.id,
            title=todo.title,
            completed=todo.completed,
            created_at=todo.created_at.isoformat(),
        )
    )


@websocket_rpc("todos.delete")
async def delete_todo(conn, params: DeleteTodoParams) -> SuccessResult:
    """Delete a todo item."""
    await Todo.objects.filter(id=params.id, user_id=conn.user_id).adelete()

    return SuccessResult(
        success=True,
        message=f"Todo {params.id} deleted",
    )


@websocket_rpc("todos.list")
async def list_todos(conn, params: ListTodosParams) -> TodoListResult:
    """List todos with optional filtering."""
    query = Todo.objects.filter(user_id=conn.user_id)

    if params.completed is not None:
        query = query.filter(completed=params.completed)

    query = query.order_by('-created_at')[:params.limit]

    # Querysets have no alist() method; iterate with `async for` instead
    todo_items = [
        TodoItem(
            id=t.id,
            title=t.title,
            completed=t.completed,
            created_at=t.created_at.isoformat(),
        )
        async for t in query
    ]

    return TodoListResult(
        todos=todo_items,
        total=len(todo_items),
    )

Next Steps


Handler Checklist

Before generating clients:

  • ✅ All handlers use Pydantic models
  • ✅ Type hints on all parameters and returns
  • ✅ Descriptive field descriptions
  • ✅ Handler docstrings written
  • ✅ Handlers are async
  • ✅ Handlers registered in apps.py