learning_ai_common_plat/packages/fastify-sse/src/hub.ts

144 lines
3.0 KiB
TypeScript

/**
* SSE Hub — manages connected clients and broadcasts events.
* Product backends create an SSEHub instance and push events to it;
* the hub fans out to all connected SSE clients.
*/
import type { ServerResponse } from 'node:http';
/**
* A single server-sent event as accepted by the hub.
* Each property corresponds to the event-stream field of the same name
* that the hub writes to connected clients.
*/
export interface SSEMessage {
/** Optional event name for the `event:` field. */
event?: string;
/** Payload for the `data:` field. The only required property. */
data: string;
/** Optional value for the `id:` field. */
id?: string;
/**
* Optional value for the `retry:` field (per the EventSource specification,
* the client's reconnection delay in milliseconds).
*/
retry?: number;
}
/**
* Bookkeeping record the hub keeps for each open SSE connection.
* Not exported; only the hub creates and reads these.
*/
interface ConnectedClient {
/** Hub-generated identifier of the form `sse_<counter>_<timestamp>`. */
id: string;
/** Optional owner of the connection; used to target per-user sends. */
userId?: string;
/** Raw Node response object that events are written to. */
res: ServerResponse;
/** ISO 8601 timestamp taken when the client was registered. */
connectedAt: string;
}
/**
 * Registry of open SSE connections with fan-out helpers.
 *
 * Callers hand a raw `ServerResponse` to {@link SSEHub.addClient}; the hub
 * sends the SSE response headers, remembers the connection, and forgets it
 * again when the response closes or is found to be no longer writable.
 */
export class SSEHub {
  /** Open connections keyed by hub-generated client ID. */
  private readonly clients = new Map<string, ConnectedClient>();
  /** Incremented for every registration and folded into the client ID. */
  private clientCounter = 0;

  /**
   * Add a new SSE client connection.
   *
   * Writes the SSE response headers and an initial `connected` event that
   * carries the generated client ID, then tracks the response until it closes.
   *
   * @param res - Response to turn into an event stream. Headers must not have
   *   been sent yet, because this method calls `writeHead`.
   * @param userId - Optional owner, used later by {@link SSEHub.sendToUser}.
   * @returns The generated client ID (`sse_<counter>_<timestamp>`).
   */
  addClient(res: ServerResponse, userId?: string): string {
    const id = `sse_${++this.clientCounter}_${Date.now()}`;

    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      // Asks nginx-style reverse proxies not to buffer the stream.
      'X-Accel-Buffering': 'no',
    });

    // Send initial connection event
    res.write(`event: connected\ndata: ${JSON.stringify({ clientId: id })}\n\n`);

    const client: ConnectedClient = {
      id,
      userId,
      res,
      connectedAt: new Date().toISOString(),
    };
    this.clients.set(id, client);

    // Clean up on close
    res.on('close', () => {
      this.clients.delete(id);
    });

    return id;
  }

  /**
   * Broadcast an SSE message to all connected clients.
   *
   * @param message - Event to serialize and send.
   * @returns Number of clients the message was actually written to.
   */
  broadcast(message: SSEMessage): number {
    const formatted = formatSSE(message);
    let sent = 0;
    for (const [id, client] of this.clients) {
      if (this.writeToClient(id, client, formatted)) sent++;
    }
    return sent;
  }

  /**
   * Send an SSE message to a specific user's connections.
   *
   * @param userId - Only clients registered with this exact user ID receive it.
   * @param message - Event to serialize and send.
   * @returns Number of that user's connections the message was written to.
   */
  sendToUser(userId: string, message: SSEMessage): number {
    const formatted = formatSSE(message);
    let sent = 0;
    for (const [id, client] of this.clients) {
      if (client.userId !== userId) continue;
      if (this.writeToClient(id, client, formatted)) sent++;
    }
    return sent;
  }

  /**
   * Send a heartbeat (comment) to all clients to keep connections alive.
   * Lines starting with `:` are comments in the event-stream format, so
   * clients do not dispatch an event for them.
   */
  heartbeat(): void {
    for (const [id, client] of this.clients) {
      this.writeToClient(id, client, ': heartbeat\n\n');
    }
  }

  /**
   * Get count of connected clients.
   */
  get clientCount(): number {
    return this.clients.size;
  }

  /**
   * Disconnect all clients: end every tracked response and forget them.
   */
  disconnectAll(): void {
    for (const client of this.clients.values()) {
      try {
        client.res.end();
      } catch {
        /* already closed */
      }
    }
    this.clients.clear();
  }

  /**
   * Write a pre-formatted payload to one client, dropping the client if its
   * response can no longer accept data.
   *
   * `ServerResponse.write()` does not throw on an ended or destroyed response
   * (it reports the failure asynchronously), so the writable state is checked
   * up front instead of relying on the `catch` alone. Deleting the entry that
   * is currently being visited is safe while iterating a `Map`.
   *
   * @returns `true` if the payload was handed to the response, else `false`.
   */
  private writeToClient(id: string, client: ConnectedClient, payload: string): boolean {
    const { res } = client;
    if (res.writableEnded || res.destroyed) {
      this.clients.delete(id);
      return false;
    }
    try {
      res.write(payload);
      return true;
    } catch {
      this.clients.delete(id);
      return false;
    }
  }
}
/**
 * Serialize a message into the `text/event-stream` wire format.
 *
 * Fields are emitted in the order `id`, `event`, `retry`, `data`, followed by
 * the blank line that terminates the event.
 *
 * - `id` and `event` are skipped when absent or empty; any CR/LF inside them
 *   is removed, since a line break would end the field early and let the
 *   remainder be parsed as additional fields.
 * - `retry` is emitted only for non-negative integers (including `0`), because
 *   clients ignore a `retry` value that is not made up solely of digits.
 * - `data` is split on CRLF, CR or LF and written as one `data:` line per
 *   line of payload, which is how the format represents multi-line data.
 *
 * @param message - Event to serialize.
 * @returns The encoded event, ending in a blank line.
 */
function formatSSE(message: SSEMessage): string {
  const stripLineBreaks = (value: string): string => value.replace(/[\r\n]/g, '');

  const lines: string[] = [];
  if (message.id) lines.push(`id: ${stripLineBreaks(message.id)}`);
  if (message.event) lines.push(`event: ${stripLineBreaks(message.event)}`);
  if (
    message.retry !== undefined &&
    Number.isSafeInteger(message.retry) &&
    message.retry >= 0
  ) {
    lines.push(`retry: ${message.retry}`);
  }
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // Each field line ends with "\n"; the extra "\n" is the event terminator.
  return `${lines.join('\n')}\n\n`;
}