Redis Transport
The Redis transport enables distributed pub/sub across multiple servers and processes.
Installation

    bun add @pubsubjs/transport-redis

Basic Usage

    import { RedisTransport } from "@pubsubjs/transport-redis";
    import { Publisher, Subscriber } from "@pubsubjs/core";
    import { events } from "./events";

    const transport = new RedisTransport({
      url: "redis://localhost:6379",
    });

    const publisher = new Publisher({ events, transport });
    const subscriber = new Subscriber({ events, transport });

    subscriber.on("order.placed", async (payload) => {
      console.log(`New order: ${payload.orderId}`);
      await processOrder(payload);
    });

    await subscriber.subscribe();

Configuration Options
    import fs from "node:fs";

    const transport = new RedisTransport({
      // Connection URL
      url: "redis://localhost:6379",

      // Or use host/port
      host: "localhost",
      port: 6379,

      // Authentication
      password: "your-password",
      username: "default",

      // Database number (0-15)
      db: 0,

      // TLS/SSL
      tls: {
        ca: fs.readFileSync("ca.pem"),
        cert: fs.readFileSync("cert.pem"),
        key: fs.readFileSync("key.pem"),
      },

      // Connection pool
      maxConnections: 10,

      // Retry strategy
      retryStrategy: (times) => Math.min(times * 100, 3000),
    });

How It Works
The Redis transport uses Redis Pub/Sub under the hood:
- Publisher sends messages to Redis channels
- Redis broadcasts to all subscribers of that channel
- Subscriber receives messages from subscribed channels
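
The three steps above can be modelled with a toy in-memory broker. This is only a sketch of the fan-out behaviour, not how Redis or `@pubsubjs` is implemented; `ToyBroker` and its methods are hypothetical names introduced for illustration.

```typescript
// Toy model of channel fan-out. Hypothetical names; not the Redis or @pubsubjs API.
type Handler = (message: string) => void;

class ToyBroker {
  private channels = new Map<string, Set<Handler>>();

  // Register a handler on a channel (step 3: a subscriber listens).
  subscribe(channel: string, handler: Handler): void {
    const handlers = this.channels.get(channel) ?? new Set<Handler>();
    handlers.add(handler);
    this.channels.set(channel, handlers);
  }

  // Deliver a message to every current subscriber (steps 1 and 2).
  // Returns how many subscribers received it; nothing is stored for later.
  publish(channel: string, message: string): number {
    const handlers = this.channels.get(channel);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(message));
    return handlers.size;
  }
}

const broker = new ToyBroker();
const server2: string[] = [];
const server3: string[] = [];

// Published before anyone subscribes: delivered to nobody and not retained.
const earlyCount = broker.publish("order.placed", "order-1");

broker.subscribe("order.placed", (message) => { server2.push(message); });
broker.subscribe("order.placed", (message) => { server3.push(message); });

// Published with two subscribers: both receive the same message.
const liveCount = broker.publish("order.placed", "order-2");
```

In this sketch `earlyCount` is 0 and `liveCount` is 2: the first message reaches nobody because no subscriber was listening yet, while the second is delivered to both subscribers.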
    ┌──────────┐     ┌─────────┐     ┌──────────┐
    │ Server 1 │────▶│  Redis  │────▶│ Server 2 │
    │ Publisher│     │ Pub/Sub │     │Subscriber│
    └──────────┘     └─────────┘     └──────────┘
                          │
                          ▼
                    ┌──────────┐
                    │ Server 3 │
                    │Subscriber│
                    └──────────┘

Use Cases
Microservices Communication

    const publisher = new Publisher({ events, transport });

    await publisher.publish("order.placed", {
      orderId: "123",
      items: [...],
      total: 99.99,
    });

    // inventory-service.ts
    const subscriber = new Subscriber({ events, transport });

    subscriber.on("order.placed", async (payload) => {
      await reserveInventory(payload.items);
    });

    // notification-service.ts
    const subscriber = new Subscriber({ events, transport });

    subscriber.on("order.placed", async (payload) => {
      await sendOrderConfirmationEmail(payload);
    });

Serverless Functions
    // AWS Lambda / Vercel Functions
    import { RedisTransport } from "@pubsubjs/transport-redis";
    import { Publisher } from "@pubsubjs/core";
    import { events } from "./events";

    export async function handler(event) {
      const transport = new RedisTransport({
        url: process.env.REDIS_URL,
      });

      const publisher = new Publisher({ events, transport });

      try {
        await publisher.publish("webhook.received", {
          source: "stripe",
          payload: event.body,
        });
      } finally {
        // Always release the connection, even if publishing fails
        await transport.disconnect();
      }

      return { statusCode: 200 };
    }

Worker Queues
    // Producer
    await publisher.publish("job.queued", {
      jobId: "job-123",
      type: "video-transcode",
      input: "s3://bucket/video.mp4",
    });

    // Worker (can be on a different server)
    // Note: Pub/Sub broadcasts, so every subscribed worker receives each job
    subscriber.on("job.queued", async (payload) => {
      console.log(`Processing job: ${payload.jobId}`);
      await processJob(payload);

      await publisher.publish("job.completed", {
        jobId: payload.jobId,
        output: "s3://bucket/video-720p.mp4",
      });
    });

Redis Cluster
Connect to a Redis Cluster:

    const transport = new RedisTransport({
      cluster: [
        { host: "node1.redis.example.com", port: 6379 },
        { host: "node2.redis.example.com", port: 6379 },
        { host: "node3.redis.example.com", port: 6379 },
      ],
      clusterOptions: {
        scaleReads: "slave",
      },
    });

Redis Sentinel
High availability with Sentinel:

    const transport = new RedisTransport({
      sentinels: [
        { host: "sentinel1.example.com", port: 26379 },
        { host: "sentinel2.example.com", port: 26379 },
        { host: "sentinel3.example.com", port: 26379 },
      ],
      sentinelName: "mymaster",
    });

Best Practices
Connection Management

    // Reuse transport across your application
    const transport = new RedisTransport({ url: process.env.REDIS_URL });

    // Graceful shutdown
    process.on("SIGTERM", async () => {
      await subscriber.unsubscribe();
      await transport.disconnect();
      process.exit(0);
    });

Message Durability
Redis Pub/Sub doesn’t persist messages. For durability, consider:
- Redis Streams for message persistence
- Dead letter queues for failed messages
- Idempotency middleware for message deduplication
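
To make the deduplication bullet concrete, here is a minimal in-memory sketch. It assumes every message carries a unique ID; `createMemoryStore` and `handleOnce` are hypothetical helpers written for this example and are not part of `@pubsubjs`.

```typescript
// Hypothetical in-memory dedup sketch; not the @pubsubjs middleware itself.
interface DedupStore {
  hasProcessed(id: string): Promise<boolean>;
  markProcessed(id: string): Promise<void>;
}

// Remembers message IDs for `ttlMs` milliseconds. `now` is injectable for testing.
function createMemoryStore(ttlMs: number, now: () => number = Date.now): DedupStore {
  const expiresAt = new Map<string, number>();
  return {
    async hasProcessed(id: string): Promise<boolean> {
      const expiry = expiresAt.get(id);
      if (expiry === undefined) return false;
      if (expiry <= now()) {
        expiresAt.delete(id); // entry expired, treat the ID as unseen
        return false;
      }
      return true;
    },
    async markProcessed(id: string): Promise<void> {
      expiresAt.set(id, now() + ttlMs);
    },
  };
}

// Runs `handler` unless the message ID was already seen. Returns true if it ran.
async function handleOnce(
  store: DedupStore,
  messageId: string,
  handler: () => Promise<void>,
): Promise<boolean> {
  if (await store.hasProcessed(messageId)) return false; // duplicate, skip
  await handler();
  await store.markProcessed(messageId); // mark only after the handler succeeds
  return true;
}

async function demo(): Promise<number> {
  const store = createMemoryStore(24 * 60 * 60 * 1000); // remember IDs for 24 hours
  let runs = 0;
  const handler = async () => { runs += 1; };
  await handleOnce(store, "msg-1", handler); // first delivery runs the handler
  await handleOnce(store, "msg-1", handler); // redelivery is skipped
  return runs;
}
```

The check and the mark are separate steps here, so two concurrent deliveries of the same ID could both run the handler; a production store would need an atomic check-and-set. The library snippet that follows wires the same two callbacks, `hasProcessed` and `markProcessed`, to Redis.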
    // `redis` is assumed to be an existing Redis client instance
    const subscriber = new Subscriber({
      events,
      transport,
      middleware: [
        createIdempotencyMiddleware({
          hasProcessed: async (id) => redis.exists(`processed:${id}`),
          // SETEX takes its TTL in seconds: 86400 s = 24 hours
          markProcessed: async (id) => redis.setex(`processed:${id}`, 86400, "1"),
        }),
      ],
    });

Monitoring
    // Monitor Redis connection
    transport.on("connected", () => {
      metrics.gauge("redis_connected", 1);
    });

    transport.on("disconnected", () => {
      metrics.gauge("redis_connected", 0);
    });

    transport.on("error", (error) => {
      metrics.counter("redis_errors", 1);
      logger.error("Redis error:", error);
    });

Limitations
- No message persistence: Messages are lost if no subscriber is listening
- No message ordering: Messages may arrive out of order under high load
- No acknowledgments: No built-in delivery confirmation
For these features, consider using Redis Streams or a dedicated message queue.
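
As a rough picture of what a log-based alternative adds, the toy in-memory log below retains entries in order and lets a consumer track what it has acknowledged. It is a conceptual sketch only: `ToyLog` and `ToyConsumer` are hypothetical names and do not use the Redis Streams API.

```typescript
// Conceptual sketch only; hypothetical names, not the Redis Streams API.
class ToyLog<T> {
  private entries: T[] = [];

  // Append an entry and return its offset; entries are kept in append order.
  append(entry: T): number {
    this.entries.push(entry);
    return this.entries.length - 1;
  }

  // Read every entry at or after the given offset.
  readFrom(offset: number): T[] {
    return this.entries.slice(offset);
  }
}

class ToyConsumer<T> {
  private log: ToyLog<T>;
  private nextOffset = 0; // offset of the first entry not yet acknowledged

  constructor(log: ToyLog<T>) {
    this.log = log;
  }

  // Entries this consumer has not acknowledged yet, oldest first.
  pending(): T[] {
    return this.log.readFrom(this.nextOffset);
  }

  // Acknowledge `count` entries so they are not returned again.
  ack(count: number): void {
    this.nextOffset += count;
  }
}

const log = new ToyLog<string>();
log.append("job-1"); // appended before any consumer exists
log.append("job-2");

const worker = new ToyConsumer(log);
const firstBatch = worker.pending(); // ["job-1", "job-2"]
worker.ack(1);                       // only job-1 is confirmed
const afterAck = worker.pending();   // ["job-2"]
```

Unlike the Pub/Sub case, entries written before the consumer started are still readable, and an unacknowledged entry is returned again on the next read.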
Next Steps
- WebSocket Transport - Real-time browser apps
- SSE Transport - Server-to-client streaming
- Middleware - Add idempotency and more