WebSocket Transport
The WebSocket transport enables real-time bidirectional communication between browser clients and servers.
Installation
Section titled “Installation”bun add @pubsubjs/transport-websocketServer Transport
Section titled “Server Transport”Basic Setup
Section titled “Basic Setup”import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";import { Publisher, Subscriber } from "@pubsubjs/core";import { events } from "./events";
const transport = new WebSocketServerTransport({ port: 8080,});
const publisher = new Publisher({ events, transport });const subscriber = new Subscriber({ events, transport });
subscriber.on("message.sent", (payload) => { console.log(`New message: ${payload.content}`);});
await subscriber.subscribe();console.log("WebSocket server running on ws://localhost:8080");Server Options
Section titled “Server Options”const transport = new WebSocketServerTransport({ // Port to listen on port: 8080,
// Host to bind to (default: "0.0.0.0") host: "localhost",
// Path for WebSocket connections (default: "/") path: "/ws",
// Maximum payload size in bytes maxPayloadLength: 16 * 1024 * 1024, // 16MB
// Idle timeout in seconds idleTimeout: 120,
// Enable compression compression: true,});With Bun.serve
Section titled “With Bun.serve”Integrate with Bun’s HTTP server:
import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";
const transport = new WebSocketServerTransport();
Bun.serve({ port: 3000, routes: { "/": new Response("Hello World"), "/api/status": new Response(JSON.stringify({ status: "ok" })), }, websocket: transport.websocketHandler, fetch(req, server) { // Upgrade WebSocket connections if (req.url.endsWith("/ws")) { const upgraded = server.upgrade(req); if (upgraded) return undefined; } return new Response("Not Found", { status: 404 }); },});Broadcasting
Section titled “Broadcasting”Send messages to all connected clients:
await publisher.publish("notification", { message: "Server maintenance in 5 minutes",});Targeting Specific Clients
Section titled “Targeting Specific Clients”Send messages to specific connections:
await publisher.publish("private.message", payload, { targetIds: ["connection-id-1", "connection-id-2"],});Connection Events
Section titled “Connection Events”transport.on("connection", (connectionId) => { console.log(`Client connected: ${connectionId}`);});
transport.on("disconnection", (connectionId) => { console.log(`Client disconnected: ${connectionId}`);});Client Transport
Section titled “Client Transport”Basic Setup
Section titled “Basic Setup”import { WebSocketClientTransport } from "@pubsubjs/transport-websocket";import { Publisher, Subscriber } from "@pubsubjs/core";import { events } from "./events";
const transport = new WebSocketClientTransport({ url: "ws://localhost:8080",});
const publisher = new Publisher({ events, transport });const subscriber = new Subscriber({ events, transport });
subscriber.on("notification", (payload) => { console.log(`Notification: ${payload.message}`);});
await subscriber.subscribe();Client Options
Section titled “Client Options”const transport = new WebSocketClientTransport({ // WebSocket server URL url: "wss://api.example.com/ws",
// Protocols (optional) protocols: ["v1"],
// Reconnection settings reconnect: true, reconnectInterval: 1000, maxReconnectAttempts: 10,});React Integration
Section titled “React Integration”import { useEffect, useState } from "react";import { WebSocketClientTransport } from "@pubsubjs/transport-websocket";import { Subscriber } from "@pubsubjs/core";
function useWebSocketSubscriber() { const [messages, setMessages] = useState([]);
useEffect(() => { const transport = new WebSocketClientTransport({ url: "ws://localhost:8080", });
const subscriber = new Subscriber({ events, transport });
subscriber.on("message.received", (payload) => { setMessages((prev) => [...prev, payload]); });
subscriber.subscribe();
return () => { subscriber.unsubscribe(); }; }, []);
return messages;}Authentication
Section titled “Authentication”Server-Side Auth
Section titled “Server-Side Auth”const transport = new WebSocketServerTransport({ port: 8080, verifyClient: async (req) => { const token = req.headers.get("authorization"); if (!token) return false;
try { const user = await verifyToken(token); return { userId: user.id }; // Attach data to connection } catch { return false; } },});Client-Side Auth
Section titled “Client-Side Auth”const transport = new WebSocketClientTransport({ url: "ws://localhost:8080", headers: { Authorization: `Bearer ${token}`, },});Heartbeat / Ping-Pong
Section titled “Heartbeat / Ping-Pong”Keep connections alive:
const transport = new WebSocketServerTransport({ port: 8080, pingInterval: 30000, // Send ping every 30 seconds pongTimeout: 5000, // Close if no pong within 5 seconds});Scaling
Section titled “Scaling”Multiple Servers
Section titled “Multiple Servers”For horizontal scaling, use Redis as a message broker:
import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";import { RedisTransport } from "@pubsubjs/transport-redis";
// Each server has its own WebSocket transportconst wsTransport = new WebSocketServerTransport({ port: 8080 });
// All servers share Redis for message routingconst redisTransport = new RedisTransport({ url: "redis://localhost:6379" });
// Subscribe to Redis, publish to WebSocket clientsconst subscriber = new Subscriber({ events, transport: redisTransport,});
const publisher = new Publisher({ events, transport: wsTransport,});
subscriber.on("broadcast", async (payload) => { await publisher.publish("broadcast", payload);});Error Handling
Section titled “Error Handling”transport.on("error", (error) => { console.error("WebSocket error:", error);});
transport.on("disconnected", () => { console.log("Connection lost, attempting to reconnect...");});Next Steps
Section titled “Next Steps”- Redis Transport - For distributed systems
- SSE Transport - Server-to-client streaming
- React Integration - Use with React