Skip to Content

API Reference

Complete API documentation for Centrifugo WebSocket RPC integration.

Python API

Decorator

@websocket_rpc(method_name: str)

Registers an async function as an RPC handler.

Parameters:

  • method_name (str): RPC method name (e.g., "tasks.get_stats")

Example:

@websocket_rpc("tasks.get_stats") async def get_task_stats(conn, params: TaskStatsParams) -> TaskStatsResult: return TaskStatsResult(total=100)

Handler Signature:

async def handler_name( conn: ConnectionContext, params: ParamsModel ) -> ResultModel: ...

Client Classes

CentrifugoRPCClient

Base RPC client handling WebSocket and correlation IDs.

Constructor:

CentrifugoRPCClient( url: str, token: str, user_id: str )

Methods:

async connect() -> None

Establishes WebSocket connection and subscribes to user channel.

Example:

await rpc.connect()
async disconnect() -> None

Closes WebSocket connection.

Example:

await rpc.disconnect()
async call(method: str, params: dict) -> dict

Calls an RPC method with correlation ID pattern.

Parameters:

  • method (str): RPC method name
  • params (dict): Request parameters

Returns:

  • dict: Response data

Example:

result = await rpc.call("tasks.get_stats", {"user_id": "123"})

APIClient

Type-safe wrapper with generated methods.

Constructor:

APIClient(rpc_client: CentrifugoRPCClient)

Generated Methods:

Each RPC handler generates a typed method:

async def <method_name>( self, params: <ParamsModel> ) -> <ResultModel>

Example:

api = APIClient(rpc) result = await api.tasks_get_stats(TaskStatsParams(user_id="123"))

Configuration

DjangoCfgCentrifugoConfig

Configuration model for Centrifugo integration.

Parameters:

  • enabled (bool): Enable Centrifugo integration
  • wrapper_url (str): Django wrapper service URL
  • wrapper_api_key (str): Wrapper API key
  • centrifugo_url (str): Centrifugo WebSocket URL
  • centrifugo_api_url (str): Centrifugo HTTP API URL
  • centrifugo_api_key (str): Centrifugo API key
  • centrifugo_token_hmac_secret (str): HMAC secret for JWT
  • ack_timeout (int, optional): RPC timeout in seconds (default: 30)
  • log_level (str, optional): Logging level (default: “INFO”)
  • log_all_calls (bool, optional): Log all RPC calls (default: True)
  • log_only_with_ack (bool, optional): Log only ack calls (default: False)

Example:

centrifugo: DjangoCfgCentrifugoConfig = DjangoCfgCentrifugoConfig( enabled=True, wrapper_url="http://localhost:8001", wrapper_api_key="secret-key", centrifugo_url="ws://localhost:8000/connection/websocket", centrifugo_api_url="http://localhost:8000/api", centrifugo_api_key="api-key", centrifugo_token_hmac_secret="hmac-secret", ack_timeout=60, log_level="DEBUG" )

TypeScript API

Client Classes

CentrifugoRPCClient

Base RPC client.

Constructor:

constructor( url: string, token: string, userId: string )

Methods:

async connect(): Promise<void>

Connects to Centrifugo.

Example:

await rpc.connect();
disconnect(): void

Disconnects from Centrifugo.

Example:

rpc.disconnect();
async call(method: string, params: any): Promise<any>

Calls RPC method.

Parameters:

  • method (string): RPC method name
  • params (any): Request parameters

Returns:

  • Promise<any>: Response data

Example:

const result = await rpc.call('tasks.get_stats', { user_id: '123' });

APIClient

Type-safe API wrapper.

Constructor:

constructor(rpc: CentrifugoRPCClient)

Generated Methods:

async <methodName>(params: <ParamsType>): Promise<ResultType>

Example:

const api = new APIClient(rpc); const result = await api.tasksGetStats({ user_id: '123' });

Type Interfaces

Generated TypeScript interfaces from Pydantic models:

interface TaskStatsParams { user_id: string; include_completed?: boolean; } interface TaskStatsResult { total: number; completed: number; pending: number; }

Go API

Client Struct

APIClient

Combined RPC and API client.

Constructor:

func NewAPIClient(url, token, userID string) *APIClient

Methods:

func (c *APIClient) Connect(ctx context.Context) error

Connects to Centrifugo.

Example:

err := client.Connect(ctx)
func (c *APIClient) Disconnect()

Disconnects from Centrifugo.

Example:

defer client.Disconnect()
func (c *APIClient) Call(ctx context.Context, method string, params interface{}) ([]byte, error)

Calls RPC method.

Parameters:

  • ctx (context.Context): Context for timeout/cancellation
  • method (string): RPC method name
  • params (interface): Request parameters

Returns:

  • []byte: Response JSON
  • error: Error if any

Example:

result, err := client.Call(ctx, "tasks.get_stats", params)

Generated Methods

Each handler generates a typed method:

func (api *APIClient) <MethodName>( ctx context.Context, params <ParamsStruct> ) (*<ResultStruct>, error)

Example:

result, err := api.TasksGetStats(ctx, TaskStatsParams{ UserId: "123", IncludeCompleted: true, })

Type Structs

Generated Go structs from Pydantic models:

type TaskStatsParams struct { UserId string `json:"user_id"` IncludeCompleted bool `json:"include_completed"` } type TaskStatsResult struct { Total int64 `json:"total"` Completed int64 `json:"completed"` Pending int64 `json:"pending"` }

Management Commands

generate_centrifugo_clients

Generates type-safe clients from RPC handlers.

Usage:

python manage.py generate_centrifugo_clients [OPTIONS]

Options:

  • --output DIR: Output directory (required)
  • --all: Generate all clients (Python, TypeScript, Go)
  • --python: Generate Python client
  • --typescript: Generate TypeScript client
  • --go: Generate Go client
  • --verbose: Verbose output

Examples:

# All clients python manage.py generate_centrifugo_clients --output ./opensdk --all # Specific languages python manage.py generate_centrifugo_clients --output ./opensdk --python --typescript # With verbose output python manage.py generate_centrifugo_clients --output ./opensdk --all --verbose

Error Handling

Python Exceptions

# Validation errors from pydantic import ValidationError try: params = TaskStatsParams(user_id=123) # Wrong type except ValidationError as e: print(e) # Connection errors try: await rpc.connect() except ConnectionError as e: print(f"Failed to connect: {e}") # RPC errors try: result = await api.tasks_get_stats(params) except ValueError as e: print(f"RPC error: {e}")

TypeScript Errors

// Connection errors try { await rpc.connect(); } catch (error) { console.error('Connection failed:', error); } // RPC errors try { const result = await api.tasksGetStats(params); } catch (error) { if (error.code) { console.error(`RPC error ${error.code}: ${error.message}`); } }

Go Errors

// Connection errors if err := client.Connect(ctx); err != nil { log.Fatalf("Connection failed: %v", err) } // RPC errors result, err := api.TasksGetStats(ctx, params) if err != nil { if rpcErr, ok := err.(*RPCError); ok { log.Printf("RPC error %d: %s", rpcErr.Code, rpcErr.Message) } else { log.Printf("Error: %v", err) } }

Environment Variables

Centrifugo Server (Docker container)

# Used by Centrifugo Docker container CENTRIFUGO_API_KEY=your-api-key-min-32-chars CENTRIFUGO_TOKEN_HMAC_SECRET=your-hmac-secret-min-32-chars CENTRIFUGO_ADMIN_PASSWORD=admin CENTRIFUGO_ADMIN_SECRET=admin-secret CENTRIFUGO_LOG_LEVEL=info CENTRIFUGO_RPC_PROXY_ENDPOINT=http://django:8000/centrifugo/rpc/

Django Client (pydantic-settings nested format)

# Used by Django application CENTRIFUGO__ENABLED=true CENTRIFUGO__CENTRIFUGO_URL=ws://localhost:8000/connection/websocket CENTRIFUGO__CENTRIFUGO_API_URL=http://centrifugo:8000/api CENTRIFUGO__CENTRIFUGO_API_KEY=your-api-key-min-32-chars CENTRIFUGO__CENTRIFUGO_TOKEN_HMAC_SECRET=your-hmac-secret-min-32-chars CENTRIFUGO__DEFAULT_ACK_TIMEOUT=30 CENTRIFUGO__LOG_LEVEL=INFO CENTRIFUGO__LOG_ALL_CALLS=true

Important: CENTRIFUGO_TOKEN_HMAC_SECRET (Centrifugo server) and CENTRIFUGO__CENTRIFUGO_TOKEN_HMAC_SECRET (Django) must have the same value for JWT token validation to work.


Next Steps