Skip to content

WebSocket Transport

The WebSocket transport enables real-time bidirectional communication between browser clients and servers.

Terminal window
bun add @pubsubjs/transport-websocket
import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";
import { Publisher, Subscriber } from "@pubsubjs/core";
import { events } from "./events";
const transport = new WebSocketServerTransport({
port: 8080,
});
const publisher = new Publisher({ events, transport });
const subscriber = new Subscriber({ events, transport });
subscriber.on("message.sent", (payload) => {
console.log(`New message: ${payload.content}`);
});
await subscriber.subscribe();
console.log("WebSocket server running on ws://localhost:8080");
const transport = new WebSocketServerTransport({
// Port to listen on
port: 8080,
// Host to bind to (default: "0.0.0.0")
host: "localhost",
// Path for WebSocket connections (default: "/")
path: "/ws",
// Maximum payload size in bytes
maxPayloadLength: 16 * 1024 * 1024, // 16MB
// Idle timeout in seconds
idleTimeout: 120,
// Enable compression
compression: true,
});

Integrate with Bun’s HTTP server:

import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";
const transport = new WebSocketServerTransport();
Bun.serve({
port: 3000,
routes: {
"/": new Response("Hello World"),
"/api/status": new Response(JSON.stringify({ status: "ok" })),
},
websocket: transport.websocketHandler,
fetch(req, server) {
// Upgrade WebSocket connections
if (req.url.endsWith("/ws")) {
const upgraded = server.upgrade(req);
if (upgraded) return undefined;
}
return new Response("Not Found", { status: 404 });
},
});

Send messages to all connected clients:

await publisher.publish("notification", {
message: "Server maintenance in 5 minutes",
});

Send messages to specific connections:

await publisher.publish("private.message", payload, {
targetIds: ["connection-id-1", "connection-id-2"],
});
transport.on("connection", (connectionId) => {
console.log(`Client connected: ${connectionId}`);
});
transport.on("disconnection", (connectionId) => {
console.log(`Client disconnected: ${connectionId}`);
});
import { WebSocketClientTransport } from "@pubsubjs/transport-websocket";
import { Publisher, Subscriber } from "@pubsubjs/core";
import { events } from "./events";
const transport = new WebSocketClientTransport({
url: "ws://localhost:8080",
});
const publisher = new Publisher({ events, transport });
const subscriber = new Subscriber({ events, transport });
subscriber.on("notification", (payload) => {
console.log(`Notification: ${payload.message}`);
});
await subscriber.subscribe();
const transport = new WebSocketClientTransport({
// WebSocket server URL
url: "wss://api.example.com/ws",
// Protocols (optional)
protocols: ["v1"],
// Reconnection settings
reconnect: true,
reconnectInterval: 1000,
maxReconnectAttempts: 10,
});
import { useEffect, useState } from "react";
import { WebSocketClientTransport } from "@pubsubjs/transport-websocket";
import { Subscriber } from "@pubsubjs/core";
function useWebSocketSubscriber() {
const [messages, setMessages] = useState([]);
useEffect(() => {
const transport = new WebSocketClientTransport({
url: "ws://localhost:8080",
});
const subscriber = new Subscriber({ events, transport });
subscriber.on("message.received", (payload) => {
setMessages((prev) => [...prev, payload]);
});
subscriber.subscribe();
return () => {
subscriber.unsubscribe();
};
}, []);
return messages;
}
const transport = new WebSocketServerTransport({
port: 8080,
verifyClient: async (req) => {
const token = req.headers.get("authorization");
if (!token) return false;
try {
const user = await verifyToken(token);
return { userId: user.id }; // Attach data to connection
} catch {
return false;
}
},
});
const transport = new WebSocketClientTransport({
url: "ws://localhost:8080",
headers: {
Authorization: `Bearer ${token}`,
},
});

Keep connections alive:

const transport = new WebSocketServerTransport({
port: 8080,
pingInterval: 30000, // Send ping every 30 seconds
pongTimeout: 5000, // Close if no pong within 5 seconds
});

For horizontal scaling, use Redis as a message broker:

import { WebSocketServerTransport } from "@pubsubjs/transport-websocket";
import { RedisTransport } from "@pubsubjs/transport-redis";
// Each server has its own WebSocket transport
const wsTransport = new WebSocketServerTransport({ port: 8080 });
// All servers share Redis for message routing
const redisTransport = new RedisTransport({ url: "redis://localhost:6379" });
// Subscribe to Redis, publish to WebSocket clients
const subscriber = new Subscriber({
events,
transport: redisTransport,
});
const publisher = new Publisher({
events,
transport: wsTransport,
});
subscriber.on("broadcast", async (payload) => {
await publisher.publish("broadcast", payload);
});
transport.on("error", (error) => {
console.error("WebSocket error:", error);
});
transport.on("disconnected", () => {
console.log("Connection lost, attempting to reconnect...");
});