Redis Transport

The Redis transport enables distributed pub/sub across multiple servers and processes.

Install the package:

bun add @pubsubjs/transport-redis

Basic usage:

import { RedisTransport } from "@pubsubjs/transport-redis";
import { Publisher, Subscriber } from "@pubsubjs/core";
import { events } from "./events";

const transport = new RedisTransport({
  url: "redis://localhost:6379",
});

const publisher = new Publisher({ events, transport });
const subscriber = new Subscriber({ events, transport });

subscriber.on("order.placed", async (payload) => {
  console.log(`New order: ${payload.orderId}`);
  await processOrder(payload);
});

await subscriber.subscribe();

Configuration options:

import fs from "node:fs";

const transport = new RedisTransport({
  // Connection URL
  url: "redis://localhost:6379",

  // Or use host/port instead of a URL
  host: "localhost",
  port: 6379,

  // Authentication
  password: "your-password",
  username: "default",

  // Database number (0-15)
  db: 0,

  // TLS/SSL
  tls: {
    ca: fs.readFileSync("ca.pem"),
    cert: fs.readFileSync("cert.pem"),
    key: fs.readFileSync("key.pem"),
  },

  // Connection pool
  maxConnections: 10,

  // Retry strategy: delay grows with each attempt, capped at 3000
  retryStrategy: (times) => Math.min(times * 100, 3000),
});
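
The retryStrategy function in the configuration above is a capped linear backoff. The sketch below evaluates the same formula for a few attempts to show how the delay grows. It assumes that `times` is the 1-based reconnection attempt number and that the return value is a delay in milliseconds; the page does not state the callback's exact contract. The names `retryDelay`, `attempts`, and `delays` are illustrative only.

```typescript
// Same formula as the retryStrategy option above.
// Assumption: `times` is the attempt number and the result is in milliseconds.
function retryDelay(times: number): number {
  return Math.min(times * 100, 3000);
}

// Delay for a few sample attempts.
const attempts: number[] = [1, 2, 10, 30, 50];
const delays: number[] = attempts.map(retryDelay);

console.log(delays); // [100, 200, 1000, 3000, 3000]
```

With this formula the cap is reached at attempt 30, after which every retry waits the same 3000.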

The Redis transport uses Redis Pub/Sub under the hood:

  1. Publisher sends messages to Redis channels
  2. Redis broadcasts to all subscribers of that channel
  3. Subscriber receives messages from subscribed channels

┌──────────┐     ┌─────────┐     ┌──────────┐
│ Server 1 │────▶│  Redis  │────▶│ Server 2 │
│ Publisher│     │ Pub/Sub │     │Subscriber│
└──────────┘     └─────────┘     └──────────┘
                                 ┌──────────┐
                                 │ Server 3 │
                                 │Subscriber│
                                 └──────────┘
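
The broadcast behaviour described above can be simulated in memory to make the delivery semantics concrete. This is only a sketch: `FakeBroker` is a hypothetical stand-in for a Redis server and is not part of @pubsubjs. It shows that a message reaches every current subscriber of a channel, and that a message published while nobody is subscribed is simply dropped.

```typescript
type Handler = (message: string) => void;

// Hypothetical in-memory stand-in for a Redis server (illustration only).
class FakeBroker {
  private channels = new Map<string, Set<Handler>>();

  subscribe(channel: string, handler: Handler): void {
    const handlers = this.channels.get(channel) ?? new Set<Handler>();
    handlers.add(handler);
    this.channels.set(channel, handlers);
  }

  // Returns how many subscribers received the message, similar to the
  // integer reply of the Redis PUBLISH command.
  publish(channel: string, message: string): number {
    const handlers = this.channels.get(channel);
    if (!handlers) return 0; // nobody is listening: the message is dropped
    handlers.forEach((handler) => handler(message));
    return handlers.size;
  }
}

const broker = new FakeBroker();
const server2: string[] = [];
const server3: string[] = [];

// Published before anyone subscribed: dropped.
const early = broker.publish("order.placed", "order-1");

broker.subscribe("order.placed", (m) => server2.push(m));
broker.subscribe("order.placed", (m) => server3.push(m));

// Published after both subscribed: delivered to both.
const late = broker.publish("order.placed", "order-2");

console.log(early, late); // 0 2
console.log(server2, server3); // [ 'order-2' ] [ 'order-2' ]
```

Neither subscriber ever sees `order-1`, which is the message-loss behaviour to keep in mind when choosing this transport.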

Communication between microservices:

// order-service.ts
const publisher = new Publisher({ events, transport });

await publisher.publish("order.placed", {
  orderId: "123",
  items: [...],
  total: 99.99,
});

// inventory-service.ts
const subscriber = new Subscriber({ events, transport });

subscriber.on("order.placed", async (payload) => {
  await reserveInventory(payload.items);
});

// notification-service.ts
const subscriber = new Subscriber({ events, transport });

subscriber.on("order.placed", async (payload) => {
  await sendOrderConfirmationEmail(payload);
});

Serverless functions:

// AWS Lambda / Vercel Functions
import { RedisTransport } from "@pubsubjs/transport-redis";
import { Publisher } from "@pubsubjs/core";
import { events } from "./events";

export async function handler(event) {
  const transport = new RedisTransport({
    url: process.env.REDIS_URL,
  });
  const publisher = new Publisher({ events, transport });

  try {
    await publisher.publish("webhook.received", {
      source: "stripe",
      payload: event.body,
    });
  } finally {
    // Close the connection even if publishing fails
    await transport.disconnect();
  }

  return { statusCode: 200 };
}

Distributing jobs to workers:

// Producer
await publisher.publish("job.queued", {
  jobId: "job-123",
  type: "video-transcode",
  input: "s3://bucket/video.mp4",
});

// Worker (can be on different server)
subscriber.on("job.queued", async (payload) => {
  console.log(`Processing job: ${payload.jobId}`);
  await processJob(payload);

  await publisher.publish("job.completed", {
    jobId: payload.jobId,
    output: "s3://bucket/video-720p.mp4",
  });
});

Connect to Redis Cluster:

const transport = new RedisTransport({
  cluster: [
    { host: "node1.redis.example.com", port: 6379 },
    { host: "node2.redis.example.com", port: 6379 },
    { host: "node3.redis.example.com", port: 6379 },
  ],
  clusterOptions: {
    scaleReads: "slave",
  },
});

High availability with Sentinel:

const transport = new RedisTransport({
  sentinels: [
    { host: "sentinel1.example.com", port: 26379 },
    { host: "sentinel2.example.com", port: 26379 },
    { host: "sentinel3.example.com", port: 26379 },
  ],
  sentinelName: "mymaster",
});

// Reuse transport across your application
const transport = new RedisTransport({ url: process.env.REDIS_URL });

// Graceful shutdown
process.on("SIGTERM", async () => {
  await subscriber.unsubscribe();
  await transport.disconnect();
  process.exit(0);
});

Redis Pub/Sub doesn’t persist messages. For durability, consider:

  1. Redis Streams for message persistence
  2. Dead letter queues for failed messages
  3. Idempotency middleware for message deduplication

const subscriber = new Subscriber({
  events,
  transport,
  middleware: [
    createIdempotencyMiddleware({
      // EXISTS resolves to a count (0 or 1), so compare it to get a boolean
      hasProcessed: async (id) => (await redis.exists(`processed:${id}`)) === 1,
      // Remember the id for 24 hours (86400 seconds)
      markProcessed: async (id) => redis.setex(`processed:${id}`, 86400, "1"),
    }),
  ],
});
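
To exercise the hasProcessed and markProcessed callbacks above without a Redis server, an in-memory store with the same expiry behaviour can stand in for the `processed:` keys. This is a sketch under assumptions: `InMemoryProcessedStore`, `idempotencyOptions`, and the fake clock are hypothetical names, and only the callback shape is taken from the example above.

```typescript
// Hypothetical in-memory replacement for the Redis keys used above.
class InMemoryProcessedStore {
  private expiresAt = new Map<string, number>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  has(id: string): boolean {
    const expiry = this.expiresAt.get(id);
    if (expiry === undefined) return false;
    if (expiry <= this.now()) {
      this.expiresAt.delete(id); // expired, like a key past its SETEX TTL
      return false;
    }
    return true;
  }

  mark(id: string): void {
    this.expiresAt.set(id, this.now() + this.ttlMs);
  }
}

// Fake clock so the example is deterministic.
let clock = 0;
const dayMs = 86400 * 1000;
const store = new InMemoryProcessedStore(dayMs, () => clock);

// Same callback shape as the options object in the example above.
const idempotencyOptions = {
  hasProcessed: async (id: string): Promise<boolean> => store.has(id),
  markProcessed: async (id: string): Promise<void> => store.mark(id),
};

const before = store.has("msg-1"); // not seen yet
store.mark("msg-1");
const after = store.has("msg-1"); // seen, within the TTL
clock += dayMs;
const expired = store.has("msg-1"); // TTL elapsed, treated as new again

console.log(before, after, expired); // false true false
```

The expiry matters: once the TTL has passed, a redelivered message with the same id would be processed again, so the TTL should exceed the longest plausible redelivery delay.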

// Monitor Redis connection
transport.on("connected", () => {
  metrics.gauge("redis_connected", 1);
});

transport.on("disconnected", () => {
  metrics.gauge("redis_connected", 0);
});

transport.on("error", (error) => {
  metrics.counter("redis_errors", 1);
  logger.error("Redis error:", error);
});

Limitations:

  • No message persistence: Messages are lost if no subscriber is listening
  • No message ordering: Messages may arrive out of order under high load
  • No acknowledgments: No built-in delivery confirmation

For these features, consider using Redis Streams or a dedicated message queue.
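
Before switching transports, an application might at least want to notice when messages were lost or reordered. One possible approach, sketched below, is to have each publisher attach an increasing sequence number to its payloads and have subscribers track the last number seen. Nothing here is part of @pubsubjs: `SequenceTracker`, the publisher id, and the sequence field are all assumptions made for illustration.

```typescript
type SequenceCheck = "in-order" | "gap" | "duplicate-or-late";

// Tracks the highest sequence number seen per publisher (hypothetical helper).
class SequenceTracker {
  private lastSeen = new Map<string, number>();

  check(publisherId: string, seq: number): SequenceCheck {
    const last = this.lastSeen.get(publisherId);
    // A number at or below the last one seen is a duplicate or arrived late.
    if (last !== undefined && seq <= last) return "duplicate-or-late";
    this.lastSeen.set(publisherId, seq);
    // Skipping ahead means at least one message is missing so far.
    if (last !== undefined && seq > last + 1) return "gap";
    return "in-order";
  }
}

const tracker = new SequenceTracker();
const results: SequenceCheck[] = [1, 2, 4, 3].map((seq) =>
  tracker.check("order-service", seq),
);

console.log(results); // [ 'in-order', 'in-order', 'gap', 'duplicate-or-late' ]
```

This only detects problems; it does not recover the missing messages, which is where Redis Streams or a dedicated queue comes in.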