WebSocket
Bidirectional real-time communication. Recommended for most use cases.
@veloxts/events provides real-time event broadcasting using WebSocket or Server-Sent Events (SSE), with optional Redis pub/sub for horizontal scaling.
```bash
pnpm add @veloxts/events
```

```js
import { veloxApp } from '@veloxts/core';
import { eventsPlugin } from '@veloxts/events';

const app = veloxApp();

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
}));

await app.start();
```

```js
import { veloxApp } from '@veloxts/core';
import { eventsPlugin } from '@veloxts/events';

const app = veloxApp();

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
  redis: process.env.REDIS_URL,
  authSecret: process.env.EVENTS_SECRET,
  authorizer: async (channel, request) => {
    if (channel.type === 'public') {
      return { authorized: true };
    }

    const user = request.user;
    if (!user) {
      return { authorized: false, error: 'Authentication required' };
    }

    if (channel.type === 'presence') {
      return {
        authorized: true,
        member: { id: user.id, info: { name: user.name } },
      };
    }

    return { authorized: true };
  },
}));

await app.start();
```

```bash
REDIS_URL=redis://localhost:6379
EVENTS_SECRET=your-32-char-secret-for-signing-tokens
```

WebSocket
Bidirectional real-time communication. Recommended for most use cases.
SSE
Unidirectional, server-to-client. Use as a fallback when WebSocket is unavailable.
```js
app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
  redis: process.env.REDIS_URL,           // For horizontal scaling
  authSecret: process.env.EVENTS_SECRET,  // For private/presence channels
  pingInterval: 30000,                    // Keep-alive (default: 30s)
  maxPayloadSize: 1048576,                // Max message size (default: 1MB)
}));
```

```js
app.register(eventsPlugin({
  driver: 'sse',
  path: '/events',
  heartbeatInterval: 15000, // Keep-alive (default: 15s)
  retryInterval: 3000,      // Client reconnect delay (default: 3s)
}));
```

```js
import { procedure, procedures } from '@veloxts/router';
import { z } from 'zod';

export const orderProcedures = procedures('orders', {
  createOrder: procedure()
    .input(z.object({ productId: z.string(), quantity: z.number() }))
    .mutation(async ({ input, ctx }) => {
      const order = await ctx.db.order.create({ data: input });

      // Broadcast to public channel
      await ctx.events.broadcast('orders', 'order.created', {
        orderId: order.id,
        total: order.total,
      });

      // Broadcast to user's private channel
      await ctx.events.broadcast(
        `private-user.${ctx.user.id}`,
        'order.confirmed',
        order
      );

      return order;
    }),
});
```

```js
// Basic broadcast
await ctx.events.broadcast('channel', 'event-name', { data: 'value' });

// Broadcast to multiple channels
await ctx.events.broadcastToMany(
  ['user.1', 'user.2', 'user.3'],
  'notification',
  { message: 'System maintenance scheduled' }
);

// Broadcast to all except sender (e.g., chat)
await ctx.events.broadcastExcept(
  'chat.room-1',
  'message.sent',
  { text: 'Hello!' },
  senderSocketId
);
```

| Type | Prefix | Access | Use Case |
|---|---|---|---|
| Public | (none) | Anyone | Announcements, public updates |
| Private | private- | Authenticated | User notifications, private data |
| Presence | presence- | Authenticated + tracking | Chat rooms, collaborative editing |
```js
// Server
await ctx.events.broadcast('announcements', 'new-feature', {
  title: 'Dark Mode Released!',
});

// Client
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'announcements',
}));
```

```js
// Server
await ctx.events.broadcast('private-user.123', 'notification', {
  message: 'You have a new message',
});

// Client (requires auth token)
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'private-user.123',
  auth: authToken,
}));
```

```js
// Server - track who's online
const members = await ctx.events.presenceMembers('presence-chat.room-1');
// [{ id: '123', info: { name: 'Alice' } }, { id: '456', info: { name: 'Bob' } }]

// Client - receives member_added/member_removed automatically
socket.send(JSON.stringify({
  type: 'subscribe',
  channel: 'presence-chat.room-1',
  data: { id: 'user-123', name: 'Alice' },
  auth: authToken,
}));
```

```js
app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',
  authSecret: process.env.EVENTS_SECRET,
  authorizer: async (channel, request) => {
    // Public channels - allow all
    if (channel.type === 'public') {
      return { authorized: true };
    }

    // Require authentication
    const user = request.user;
    if (!user) {
      return { authorized: false, error: 'Not authenticated' };
    }

    // Private user channels - only owner
    if (channel.name.startsWith('private-user.')) {
      const channelUserId = channel.name.replace('private-user.', '');
      if (channelUserId !== user.id) {
        return { authorized: false, error: 'Access denied' };
      }
    }

    // Presence channels - include member info
    if (channel.type === 'presence') {
      return {
        authorized: true,
        member: {
          id: user.id,
          info: { name: user.name, avatar: user.avatar },
        },
      };
    }

    return { authorized: true };
  },
}));
```

```js
const socket = new WebSocket('ws://localhost:3030/ws');

socket.onopen = () => {
  // Subscribe to public channel
  socket.send(JSON.stringify({
    type: 'subscribe',
    channel: 'orders',
  }));
};

socket.onmessage = (event) => {
  const message = JSON.parse(event.data);

  switch (message.type) {
    case 'event':
      console.log(`${message.event}:`, message.data);
      break;
    case 'subscription_succeeded':
      console.log(`Subscribed to ${message.channel}`);
      break;
    case 'subscription_error':
      console.error(`Failed: ${message.error}`);
      break;
  }
};

// Unsubscribe
socket.send(JSON.stringify({
  type: 'unsubscribe',
  channel: 'orders',
}));
```

```js
async function subscribeToPrivateChannel(socket, channel) {
  // Get auth token from server
  const response = await fetch('/ws/auth', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    credentials: 'include',
    body: JSON.stringify({
      socketId: socket.socketId,
      channel,
    }),
  });

  const { auth, channel_data } = await response.json();

  socket.send(JSON.stringify({
    type: 'subscribe',
    channel,
    auth,
    channel_data,
  }));
}
```

```js
socket.onmessage = (event) => {
  const message = JSON.parse(event.data);

  if (message.event === 'member_added') {
    console.log('User joined:', message.data);
  }

  if (message.event === 'member_removed') {
    console.log('User left:', message.data);
  }
};
```

```js
// Get subscriber count
const count = await ctx.events.subscriberCount('orders');

// Check if channel has subscribers
const hasSubscribers = await ctx.events.hasSubscribers('orders');

// Get all active channels
const channels = await ctx.events.channels();

// Get presence members
const members = await ctx.events.presenceMembers('presence-chat.room-1');
```

- `private-` and `presence-` channels
- `wss://` in production (TLS)

```js
import { veloxApp } from '@veloxts/core';
import { eventsPlugin } from '@veloxts/events';

const app = veloxApp();

app.register(eventsPlugin({
  driver: 'ws',
  path: '/ws',

  // Required: Redis for multi-instance deployments
  redis: process.env.REDIS_URL,

  // Required: Secret for signing private channel tokens
  authSecret: process.env.EVENTS_SECRET,

  // Required: Control channel access
  authorizer: async (channel, request) => {
    // Public channels - anyone can subscribe
    if (channel.type === 'public') {
      return { authorized: true };
    }

    // Private/presence channels require authentication
    const user = request.user;
    if (!user) {
      return { authorized: false, error: 'Authentication required' };
    }

    // Example: User can only subscribe to their own private channel
    if (channel.name.startsWith('private-user.')) {
      const channelUserId = channel.name.replace('private-user.', '');
      if (channelUserId !== user.id) {
        return { authorized: false, error: 'Access denied' };
      }
    }

    // Presence channels - include member info for "who's online"
    if (channel.type === 'presence') {
      return {
        authorized: true,
        member: { id: user.id, info: { name: user.name } },
      };
    }

    return { authorized: true };
  },
}));

await app.start();
```

```bash
# Redis connection (Upstash, Redis Cloud, or self-hosted)
REDIS_URL=redis://user:password@your-redis-host:6379

# Secret for signing auth tokens (min 32 characters)
EVENTS_SECRET=your-very-long-secret-key-minimum-32-chars
```

For multi-instance deployments behind a load balancer:
```text
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Instance 1    │     │   Instance 2    │     │   Instance 3    │
│    WebSocket    │     │    WebSocket    │     │    WebSocket    │
└────────┬────────┘     └────────┬────────┘     └────────┬────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌────────────▼────────────┐
                    │     Redis (pub/sub)     │
                    └─────────────────────────┘
```

How it works:

- `ctx.events.broadcast()` on Instance 1

| Provider | Best For |
|---|---|
| Upstash | Serverless, pay-per-request |
| Redis Cloud | Managed Redis clusters |
| Railway | Simple Redis add-on |
| Self-hosted | Full control, existing infrastructure |
Use events outside of Fastify request context (background jobs, CLI):
```js
import { getEvents, closeEvents } from '@veloxts/events';

const events = await getEvents({
  driver: 'ws',
  redis: process.env.REDIS_URL,
});

await events.broadcast('jobs', 'job.completed', { jobId: '123' });

// Clean up on shutdown
await closeEvents();
```