
Events

@veloxts/events provides real-time event broadcasting using WebSocket or Server-Sent Events (SSE), with optional Redis pub/sub for horizontal scaling.

pnpm add @veloxts/events

import { veloxApp } from '@veloxts/core';
import { eventsPlugin } from '@veloxts/events';

const app = veloxApp();

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
}));

await app.start();
.env
REDIS_URL=redis://localhost:6379
EVENTS_SECRET=your-32-char-secret-for-signing-tokens

WebSocket

Bidirectional real-time communication. Recommended for most use cases.

SSE

Unidirectional, server-to-client streaming. Use it as a fallback when WebSocket is unavailable.

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
  redis: process.env.REDIS_URL, // For horizontal scaling
  authSecret: process.env.EVENTS_SECRET, // For private/presence channels
  pingInterval: 30000, // Keep-alive (default: 30s)
  maxPayloadSize: 1048576, // Max message size (default: 1MB)
}));

app.register(eventsPlugin({
  driver: 'sse',
  path: '/events',
  heartbeatInterval: 15000, // Keep-alive (default: 15s)
  retryInterval: 3000, // Client reconnect delay (default: 3s)
}));
import { procedure, procedures } from '@veloxts/router';
import { z } from 'zod';

export const orderProcedures = procedures('orders', {
  createOrder: procedure()
    .input(z.object({ productId: z.string(), quantity: z.number() }))
    .mutation(async ({ input, ctx }) => {
      const order = await ctx.db.order.create({ data: input });

      // Broadcast to public channel
      await ctx.events.broadcast('orders', 'order.created', {
        orderId: order.id,
        total: order.total,
      });

      // Broadcast to user's private channel
      await ctx.events.broadcast(
        `private-user.${ctx.user.id}`,
        'order.confirmed',
        order
      );

      return order;
    }),
});

// Basic broadcast
await ctx.events.broadcast('channel', 'event-name', { data: 'value' });

// Broadcast to multiple channels
await ctx.events.broadcastToMany(
  ['user.1', 'user.2', 'user.3'],
  'notification',
  { message: 'System maintenance scheduled' }
);

// Broadcast to all except sender (e.g., chat)
await ctx.events.broadcastExcept(
  'chat.room-1',
  'message.sent',
  { text: 'Hello!' },
  senderSocketId
);
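| Type     | Prefix    | Access                   | Use Case                          |
|----------|-----------|--------------------------|-----------------------------------|
| Public   | (none)    | Anyone                   | Announcements, public updates     |
| Private  | private-  | Authenticated            | User notifications, private data  |
| Presence | presence- | Authenticated + tracking | Chat rooms, collaborative editing |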
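The prefix convention in the table above can be expressed as a small helper. This is a sketch only: `channelTypeOf` is a hypothetical name, not part of @veloxts/events, and it assumes the channel type is determined purely by the name prefix.

```typescript
// Hypothetical helper (not part of @veloxts/events): derive a channel's
// type from its name, following the prefix convention in the table.
type ChannelType = 'public' | 'private' | 'presence';

function channelTypeOf(name: string): ChannelType {
  if (name.startsWith('presence-')) return 'presence';
  if (name.startsWith('private-')) return 'private';
  return 'public'; // no recognised prefix
}

console.log(channelTypeOf('announcements'));        // public
console.log(channelTypeOf('private-user.123'));     // private
console.log(channelTypeOf('presence-chat.room-1')); // presence
```

A helper like this can be handy on the client for deciding whether an auth token must be fetched before subscribing.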
// Server
await ctx.events.broadcast('announcements', 'new-feature', {
  title: 'Dark Mode Released!',
});

// Client
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'announcements',
}));

// Server
await ctx.events.broadcast('private-user.123', 'notification', {
  message: 'You have a new message',
});

// Client (requires auth token)
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'private-user.123',
  auth: authToken,
}));

// Server - track who's online
const members = await ctx.events.presenceMembers('presence-chat.room-1');
// [{ id: '123', info: { name: 'Alice' } }, { id: '456', info: { name: 'Bob' } }]

// Client - receives member_added/member_removed automatically
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'presence-chat.room-1',
  data: { id: 'user-123', name: 'Alice' },
  auth: authToken,
}));
app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
  authSecret: process.env.EVENTS_SECRET,
  authorizer: async (channel, request) => {
    // Public channels - allow all
    if (channel.type === 'public') {
      return { authorized: true };
    }

    // Require authentication
    const user = request.user;
    if (!user) {
      return { authorized: false, error: 'Not authenticated' };
    }

    // Private user channels - only owner
    if (channel.name.startsWith('private-user.')) {
      const channelUserId = channel.name.replace('private-user.', '');
      if (channelUserId !== user.id) {
        return { authorized: false, error: 'Access denied' };
      }
    }

    // Presence channels - include member info
    if (channel.type === 'presence') {
      return {
        authorized: true,
        member: {
          id: user.id,
          info: { name: user.name, avatar: user.avatar },
        },
      };
    }

    return { authorized: true };
  },
}));
const socket = new WebSocket('ws://localhost:3030/ws');

socket.onopen = () => {
  // Subscribe to public channel
  socket.send(JSON.stringify({
    type: 'subscribe',
    channel: 'orders',
  }));
};

socket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'event':
      console.log(`${message.event}:`, message.data);
      break;
    case 'subscription_succeeded':
      console.log(`Subscribed to ${message.channel}`);
      break;
    case 'subscription_error':
      console.error(`Failed: ${message.error}`);
      break;
  }
};

// Unsubscribe
socket.send(JSON.stringify({
  type: 'unsubscribe',
  channel: 'orders',
}));
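For TypeScript clients, the incoming frames can be modelled as a discriminated union. The sketch below infers the message shapes from the client example above; the real protocol may carry additional fields or message types, and `describeMessage` is a hypothetical helper, not part of the package.

```typescript
// Shapes inferred from the client example; treat them as assumptions.
type ServerMessage =
  | { type: 'event'; event: string; data: unknown }
  | { type: 'subscription_succeeded'; channel: string }
  | { type: 'subscription_error'; error: string };

// Hypothetical helper: turn a raw frame into a log line, or return null
// when the frame is not a JSON object or has an unrecognised type.
function describeMessage(raw: string): string | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return null;

  const message = parsed as ServerMessage;
  switch (message.type) {
    case 'event':
      return `${message.event}: ${JSON.stringify(message.data)}`;
    case 'subscription_succeeded':
      return `Subscribed to ${message.channel}`;
    case 'subscription_error':
      return `Failed: ${message.error}`;
    default:
      return null; // unknown message type
  }
}
```

Inside `socket.onmessage`, the handler body then reduces to calling `describeMessage(event.data)` and logging the result when it is not null.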
async function subscribeToPrivateChannel(socket, channel) {
  // Get auth token from server
  const response = await fetch('/ws/auth', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    credentials: 'include',
    body: JSON.stringify({
      // Note: socketId is not part of the standard WebSocket API; this
      // assumes the app has attached it to the socket object.
      socketId: socket.socketId,
      channel,
    }),
  });
  const { auth, channel_data } = await response.json();

  socket.send(JSON.stringify({
    type: 'subscribe',
    channel,
    auth,
    channel_data,
  }));
}
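This page does not describe how the auth token is constructed. As a rough illustration of why a signing secret is needed, the sketch below signs the socket ID and channel name with HMAC-SHA256 using Node's built-in `crypto` module. The scheme and the names `signChannelAuth` and `verifyChannelAuth` are assumptions for illustration, not the documented @veloxts/events token format.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical scheme (NOT the documented @veloxts/events format):
// sign "socketId:channel" so a token cannot be reused for a different
// socket or a different channel.
function signChannelAuth(secret: string, socketId: string, channel: string): string {
  return createHmac('sha256', secret).update(`${socketId}:${channel}`).digest('hex');
}

function verifyChannelAuth(
  secret: string,
  socketId: string,
  channel: string,
  token: string,
): boolean {
  const expected = Buffer.from(signChannelAuth(secret, socketId, channel), 'hex');
  const received = Buffer.from(token, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

Binding the token to both values means a token issued for one channel fails verification when presented for another, which is the property the `authSecret` option exists to provide.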
socket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  if (message.event === 'member_added') {
    console.log('User joined:', message.data);
  }
  if (message.event === 'member_removed') {
    console.log('User left:', message.data);
  }
};
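A client usually wants to keep a local "who's online" list in sync with these events. The sketch below assumes the `member_added` and `member_removed` payloads carry the same `{ id, info }` shape returned by `presenceMembers()`; that is an assumption, and `applyPresenceEvent` is a hypothetical helper.

```typescript
// Member shape taken from the presenceMembers() example output; assuming
// presence event payloads use the same shape.
interface PresenceMember {
  id: string;
  info?: Record<string, unknown>;
}

interface PresenceMessage {
  event: string;
  data: PresenceMember;
}

// Hypothetical helper: apply one presence event to a local roster.
function applyPresenceEvent(
  roster: Map<string, PresenceMember>,
  message: PresenceMessage,
): void {
  if (message.event === 'member_added') {
    roster.set(message.data.id, message.data);
  } else if (message.event === 'member_removed') {
    roster.delete(message.data.id);
  }
  // Any other event leaves the roster unchanged.
}

const roster = new Map<string, PresenceMember>();
applyPresenceEvent(roster, { event: 'member_added', data: { id: '123', info: { name: 'Alice' } } });
applyPresenceEvent(roster, { event: 'member_added', data: { id: '456', info: { name: 'Bob' } } });
applyPresenceEvent(roster, { event: 'member_removed', data: { id: '123' } });
console.log(Array.from(roster.keys())); // [ '456' ]
```

Keying the roster by member ID makes a repeated `member_added` for the same user idempotent.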
// Get subscriber count
const count = await ctx.events.subscriberCount('orders');
// Check if channel has subscribers
const hasSubscribers = await ctx.events.hasSubscribers('orders');
// Get all active channels
const channels = await ctx.events.channels();
// Get presence members
const members = await ctx.events.presenceMembers('presence-chat.room-1');
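One use for these checks is to skip expensive work when nobody is listening. The sketch below declares a minimal `EventsLike` interface covering only `hasSubscribers` and `broadcast`; the parameter and return types are assumptions based on the calls shown above, and `broadcastIfListening` is a hypothetical helper.

```typescript
// Minimal structural type for the two methods used here. The exact
// signatures in @veloxts/events may differ; these are assumptions.
interface EventsLike {
  hasSubscribers(channel: string): Promise<boolean>;
  broadcast(channel: string, event: string, data: unknown): Promise<void>;
}

// Hypothetical helper: only build and send the payload when the channel
// currently has subscribers. Resolves to true if a broadcast was sent.
async function broadcastIfListening(
  events: EventsLike,
  channel: string,
  event: string,
  buildPayload: () => Promise<unknown> | unknown,
): Promise<boolean> {
  if (!(await events.hasSubscribers(channel))) return false;
  await events.broadcast(channel, event, await buildPayload());
  return true;
}
```

In a procedure this would be called as `broadcastIfListening(ctx.events, 'orders', 'order.stats', () => computeStats())`, where `computeStats` stands in for whatever costly query produces the payload.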
  1. Redis for horizontal scaling - Required when running multiple instances
  2. Auth secret for private channels - Required for private- and presence- channels
  3. Channel authorizer - Control who can subscribe to which channels
  4. Secure WebSocket URL - Use wss:// in production (TLS)
src/index.ts
import { veloxApp } from '@veloxts/core';
import { eventsPlugin } from '@veloxts/events';

const app = veloxApp();

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',

  // Required: Redis for multi-instance deployments
  redis: process.env.REDIS_URL,

  // Required: Secret for signing private channel tokens
  authSecret: process.env.EVENTS_SECRET,

  // Required: Control channel access
  authorizer: async (channel, request) => {
    // Public channels - anyone can subscribe
    if (channel.type === 'public') {
      return { authorized: true };
    }

    // Private/presence channels require authentication
    const user = request.user;
    if (!user) {
      return { authorized: false, error: 'Authentication required' };
    }

    // Example: User can only subscribe to their own private channel
    if (channel.name.startsWith('private-user.')) {
      const channelUserId = channel.name.replace('private-user.', '');
      if (channelUserId !== user.id) {
        return { authorized: false, error: 'Access denied' };
      }
    }

    // Presence channels - include member info for "who's online"
    if (channel.type === 'presence') {
      return {
        authorized: true,
        member: { id: user.id, info: { name: user.name } },
      };
    }

    return { authorized: true };
  },
}));

await app.start();
.env
# Redis connection (Upstash, Redis Cloud, or self-hosted)
REDIS_URL=redis://user:password@your-redis-host:6379
# Secret for signing auth tokens (min 32 characters)
EVENTS_SECRET=your-very-long-secret-key-minimum-32-chars
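Because both variables are required in production, it can help to fail fast at startup when one is missing or too weak. The sketch below is a hypothetical check, not part of @veloxts/events; the 32-character minimum comes from the comment above, and accepting `rediss://` (the TLS variant of the Redis URL scheme) is an added assumption.

```typescript
// Hypothetical startup check: collect human-readable problems with the
// events-related environment variables.
interface EventsEnv {
  REDIS_URL?: string;
  EVENTS_SECRET?: string;
}

function validateEventsEnv(env: EventsEnv): string[] {
  const problems: string[] = [];

  if (!env.REDIS_URL) {
    problems.push('REDIS_URL is not set (needed for multi-instance deployments)');
  } else if (!/^rediss?:\/\//.test(env.REDIS_URL)) {
    problems.push('REDIS_URL should start with redis:// or rediss://');
  }

  if (!env.EVENTS_SECRET) {
    problems.push('EVENTS_SECRET is not set (needed for private- and presence- channels)');
  } else if (env.EVENTS_SECRET.length < 32) {
    problems.push('EVENTS_SECRET must be at least 32 characters');
  }

  return problems;
}
```

Calling `validateEventsEnv(process.env)` before `app.register(...)` and throwing when the returned array is non-empty surfaces misconfiguration at boot rather than at the first private-channel subscription.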

For multi-instance deployments behind a load balancer:

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Instance 1    │     │   Instance 2    │     │   Instance 3    │
│   WebSocket     │     │   WebSocket     │     │   WebSocket     │
└────────┬────────┘     └────────┬────────┘     └────────┬────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                    ┌────────────▼────────────┐
                    │     Redis (pub/sub)     │
                    └─────────────────────────┘

How it works:

  1. Your server code calls ctx.events.broadcast() on Instance 1
  2. Event is sent to local WebSocket clients on Instance 1
  3. Event is published to Redis
  4. Instances 2 and 3 receive from Redis
  5. Each instance delivers to its local clients
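The steps above can be modelled with an in-memory stand-in for Redis. This is a simulation for illustration only: `FakeBus` and `Instance` are hypothetical classes, and tagging each message with its origin so the sender does not deliver it twice is an assumption about how duplicates are avoided, not a description of the library's internals.

```typescript
// In-memory stand-in for Redis pub/sub (illustration only).
type Envelope = { origin: string; channel: string; event: string; data: unknown };

class FakeBus {
  private listeners: Array<(msg: Envelope) => void> = [];

  subscribe(listener: (msg: Envelope) => void): void {
    this.listeners.push(listener);
  }

  publish(msg: Envelope): void {
    for (const listener of this.listeners) listener(msg);
  }
}

class Instance {
  // Events delivered to this instance's local clients.
  readonly delivered: string[] = [];
  private readonly id: string;
  private readonly bus: FakeBus;

  constructor(id: string, bus: FakeBus) {
    this.id = id;
    this.bus = bus;
    // Steps 4-5: receive from the bus and deliver locally, skipping our
    // own messages because step 2 already delivered them.
    bus.subscribe((msg) => {
      if (msg.origin !== this.id) this.deliverLocal(msg);
    });
  }

  broadcast(channel: string, event: string, data: unknown): void {
    const msg: Envelope = { origin: this.id, channel, event, data };
    this.deliverLocal(msg); // step 2: local clients first
    this.bus.publish(msg);  // step 3: fan out to other instances
  }

  private deliverLocal(msg: Envelope): void {
    this.delivered.push(`${msg.channel}/${msg.event}`);
  }
}

const bus = new FakeBus();
const instance1 = new Instance('1', bus);
const instance2 = new Instance('2', bus);
const instance3 = new Instance('3', bus);

instance1.broadcast('orders', 'order.created', { orderId: 'abc' });
console.log(instance1.delivered.length, instance2.delivered.length, instance3.delivered.length); // 1 1 1
```

Each instance ends up delivering the event exactly once, which is the behaviour the numbered steps describe.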
| Provider    | Best For                              |
|-------------|---------------------------------------|
| Upstash     | Serverless, pay-per-request           |
| Redis Cloud | Managed Redis clusters                |
| Railway     | Simple Redis add-on                   |
| Self-hosted | Full control, existing infrastructure |

Use events outside of a Fastify request context (background jobs, CLI):

import { getEvents, closeEvents } from '@veloxts/events';

const events = await getEvents({
  driver: 'ws',
  redis: process.env.REDIS_URL,
});

await events.broadcast('jobs', 'job.completed', { jobId: '123' });

// Clean up on shutdown
await closeEvents();