feat(cowork-service): platform-client + flush scheduler (H.4-H.8 TS wiring)

New files:
- lib/platform-client.ts — REST client for platform-service endpoints:
  POST /audit, POST /telemetry/events, POST /usage, GET /flags/poll

- lib/flush-scheduler.ts — periodic drain of IPC buffers → platform-service:
  - flushAll(): drains audit, telemetry, budget from Rust IPC → REST
  - pollAndSyncFlags(): GET /flags/poll → update TS registry + push to IPC
  - Singleton pattern with start/stop/finalFlush lifecycle
  - All operations best-effort (logged, never crash service)

Updated server.ts:
- Starts flush scheduler after IPC bridge connects
- finalFlush() before shutdown (drain remaining events)

32 tests passing, typecheck clean.
This commit is contained in:
saravanakumardb1 2026-04-02 22:50:00 -07:00
parent ff433e172d
commit f8f3cdc242
3 changed files with 367 additions and 0 deletions

View File

@ -0,0 +1,226 @@
/**
* Periodic flush scheduler drains IPC buffers and forwards to platform-service.
*
* Responsibilities:
* - Poll feature flags from platform-service update local registry + IPC bridge
* - Flush audit events from Rust IPC POST /audit
* - Flush telemetry events from Rust IPC POST /telemetry/events
* - Flush budget records from Rust IPC POST /usage
*
* All operations are best-effort: failures are logged but never crash the service.
*/
import { type FastifyBaseLogger } from 'fastify';
import { getIpcBridge } from './ipc-bridge.js';
import { setFlag } from './feature-flags.js';
import {
pollFlags,
postAuditEvents,
postTelemetryEvents,
postUsageRecords,
type AuditEntry,
type TelemetryEvent,
type UsageRecord,
} from './platform-client.js';
export interface FlushSchedulerOptions {
/** Interval in ms between flush cycles. Default: 30_000 (30s). */
flushIntervalMs?: number;
/** Interval in ms between flag polls. Default: 60_000 (60s). */
flagPollIntervalMs?: number;
/** Logger instance. */
log: FastifyBaseLogger;
}
export class FlushScheduler {
private flushTimer: ReturnType<typeof setInterval> | null = null;
private flagTimer: ReturnType<typeof setInterval> | null = null;
private readonly flushIntervalMs: number;
private readonly flagPollIntervalMs: number;
private readonly log: FastifyBaseLogger;
private _running = false;
constructor(opts: FlushSchedulerOptions) {
this.flushIntervalMs = opts.flushIntervalMs ?? 30_000;
this.flagPollIntervalMs = opts.flagPollIntervalMs ?? 60_000;
this.log = opts.log;
}
get isRunning(): boolean {
return this._running;
}
/** Start periodic flush and flag polling. */
start(): void {
if (this._running) return;
this._running = true;
this.flushTimer = setInterval(() => {
this.flushAll().catch(err => this.log.error(err, 'flush cycle failed'));
}, this.flushIntervalMs);
this.flagTimer = setInterval(() => {
this.pollAndSyncFlags().catch(err => this.log.error(err, 'flag poll failed'));
}, this.flagPollIntervalMs);
this.log.info(
{ flushIntervalMs: this.flushIntervalMs, flagPollIntervalMs: this.flagPollIntervalMs },
'flush scheduler started',
);
}
/** Stop all timers. */
stop(): void {
if (this.flushTimer) clearInterval(this.flushTimer);
if (this.flagTimer) clearInterval(this.flagTimer);
this.flushTimer = null;
this.flagTimer = null;
this._running = false;
this.log.info('flush scheduler stopped');
}
/** Run a final flush before shutdown. */
async finalFlush(): Promise<void> {
this.stop();
await this.flushAll();
}
// ── Flush cycle ────────────────────────────────────────────
/** Drain all IPC buffers and post to platform-service. */
async flushAll(): Promise<FlushResult> {
const bridge = getIpcBridge();
if (!bridge.isRunning) {
return { audit: null, telemetry: null, budget: null, skipped: true };
}
const svcAuth = { userId: 'system', role: 'admin' };
const result: FlushResult = { audit: null, telemetry: null, budget: null, skipped: false };
result.audit = await this.flushAudit(bridge, svcAuth);
result.telemetry = await this.flushTelemetry(bridge, svcAuth);
result.budget = await this.flushBudget(bridge, svcAuth);
return result;
}
private async flushAudit(
bridge: ReturnType<typeof getIpcBridge>,
auth: Record<string, unknown>,
): Promise<{ drained: number; posted: number; errors: number } | null> {
try {
const resp = await bridge.flushAudit(auth);
if (resp.error) {
this.log.warn({ error: resp.error }, 'IPC flush_audit returned error');
return null;
}
const r = resp.result as Record<string, unknown> | undefined;
const entries: AuditEntry[] = (r?.events as AuditEntry[]) ?? [];
if (entries.length === 0) return { drained: 0, posted: 0, errors: 0 };
const { posted, errors } = await postAuditEvents(entries);
this.log.debug({ drained: entries.length, posted, errors }, 'audit flush');
return { drained: entries.length, posted, errors };
} catch (err) {
this.log.error(err, 'audit flush failed');
return null;
}
}
private async flushTelemetry(
bridge: ReturnType<typeof getIpcBridge>,
auth: Record<string, unknown>,
): Promise<{ drained: number; accepted: number; rejected: number } | null> {
try {
const resp = await bridge.flushTelemetry(auth);
if (resp.error) {
this.log.warn({ error: resp.error }, 'IPC flush_telemetry returned error');
return null;
}
const r = resp.result as Record<string, unknown> | undefined;
const events: TelemetryEvent[] = (r?.events as TelemetryEvent[]) ?? [];
if (events.length === 0) return { drained: 0, accepted: 0, rejected: 0 };
const { accepted, rejected } = await postTelemetryEvents(events);
this.log.debug({ drained: events.length, accepted, rejected }, 'telemetry flush');
return { drained: events.length, accepted, rejected };
} catch (err) {
this.log.error(err, 'telemetry flush failed');
return null;
}
}
private async flushBudget(
bridge: ReturnType<typeof getIpcBridge>,
auth: Record<string, unknown>,
): Promise<{ drained: number; posted: number; errors: number } | null> {
try {
const resp = await bridge.flushBudget(auth);
if (resp.error) {
this.log.warn({ error: resp.error }, 'IPC flush_budget returned error');
return null;
}
const r = resp.result as Record<string, unknown> | undefined;
const records: UsageRecord[] = (r?.records as UsageRecord[]) ?? [];
if (records.length === 0) return { drained: 0, posted: 0, errors: 0 };
const { posted, errors } = await postUsageRecords(records);
this.log.debug({ drained: records.length, posted, errors }, 'budget flush');
return { drained: records.length, posted, errors };
} catch (err) {
this.log.error(err, 'budget flush failed');
return null;
}
}
// ── Flag polling ───────────────────────────────────────────
/** Poll platform-service for flag updates and sync to local registry + IPC. */
async pollAndSyncFlags(): Promise<{ updated: number } | null> {
try {
const { flags } = await pollFlags();
let updated = 0;
// Update local TS registry
for (const [key, value] of Object.entries(flags)) {
setFlag(key, value);
updated++;
}
// Push to Rust IPC bridge if running
const bridge = getIpcBridge();
if (bridge.isRunning) {
await bridge.updateFlags(flags, { userId: 'system', role: 'admin' });
}
this.log.debug({ updated }, 'flag poll sync');
return { updated };
} catch (err) {
this.log.error(err, 'flag poll failed');
return null;
}
}
}
export interface FlushResult {
audit: { drained: number; posted: number; errors: number } | null;
telemetry: { drained: number; accepted: number; rejected: number } | null;
budget: { drained: number; posted: number; errors: number } | null;
skipped?: boolean;
}
// ── Singleton ──
let _scheduler: FlushScheduler | null = null;
export function getFlushScheduler(log?: FastifyBaseLogger): FlushScheduler {
if (!_scheduler) {
if (!log) throw new Error('FlushScheduler not initialized — provide a logger');
_scheduler = new FlushScheduler({ log });
}
return _scheduler;
}
export function setFlushScheduler(scheduler: FlushScheduler | null): void {
_scheduler = scheduler;
}

View File

@ -0,0 +1,132 @@
/**
* REST client for platform-service endpoints.
*
* Used by the flush scheduler to forward audit events, telemetry,
* budget/usage records, and poll feature flags from platform-service.
*/
import { config } from './config.js';
import { PRODUCT_ID } from './product-config.js';
const baseUrl = () => config.PLATFORM_SERVICE_URL;
interface FetchOptions {
method: 'GET' | 'POST';
path: string;
body?: unknown;
headers?: Record<string, string>;
}
async function request<T = unknown>(opts: FetchOptions): Promise<T> {
const url = `${baseUrl()}${opts.path}`;
const res = await fetch(url, {
method: opts.method,
headers: {
'Content-Type': 'application/json',
'x-product-id': PRODUCT_ID,
...opts.headers,
},
body: opts.body ? JSON.stringify(opts.body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`platform-service ${opts.method} ${opts.path}${res.status}: ${text.slice(0, 200)}`);
}
return res.json() as Promise<T>;
}
// ── Audit ──────────────────────────────────────────────────────
export interface AuditEntry {
userId: string;
action: string;
category?: string;
details?: Record<string, unknown>;
}
/** POST /audit — fire-and-forget audit write. Returns { accepted: true }. */
export async function postAuditEvents(entries: AuditEntry[]): Promise<{ posted: number; errors: number }> {
let posted = 0;
let errors = 0;
for (const entry of entries) {
try {
await request({ method: 'POST', path: '/audit', body: entry });
posted++;
} catch {
errors++;
}
}
return { posted, errors };
}
// ── Telemetry ──────────────────────────────────────────────────
export interface TelemetryEvent {
id: string;
name: string;
timestamp: string;
properties?: Record<string, unknown>;
[key: string]: unknown;
}
/** POST /telemetry/events — batch ingest telemetry events. */
export async function postTelemetryEvents(
events: TelemetryEvent[],
): Promise<{ accepted: number; rejected: number }> {
if (events.length === 0) return { accepted: 0, rejected: 0 };
return request<{ accepted: number; rejected: number }>({
method: 'POST',
path: '/telemetry/events',
body: { productId: PRODUCT_ID, events },
});
}
// ── Usage / Budget ─────────────────────────────────────────────
export interface UsageRecord {
userId: string;
date: string;
model?: string;
source?: string;
inputTokens?: number;
outputTokens?: number;
costUsd?: number;
requestCount?: number;
}
/** POST /usage — upsert usage record. */
export async function postUsageRecord(record: UsageRecord): Promise<unknown> {
return request({ method: 'POST', path: '/usage', body: record });
}
/** Post multiple usage records (budget flush). */
export async function postUsageRecords(records: UsageRecord[]): Promise<{ posted: number; errors: number }> {
let posted = 0;
let errors = 0;
for (const record of records) {
try {
await postUsageRecord(record);
posted++;
} catch {
errors++;
}
}
return { posted, errors };
}
// ── Feature Flags ──────────────────────────────────────────────
/** GET /flags/poll — poll current flag values for this product. */
export async function pollFlags(
platform?: string,
): Promise<{ flags: Record<string, boolean>; productId: string }> {
const params = new URLSearchParams();
if (platform) params.set('platform', platform);
const qs = params.toString();
return request<{ flags: Record<string, boolean>; productId: string }>({
method: 'GET',
path: `/flags/poll${qs ? `?${qs}` : ''}`,
});
}

View File

@ -22,6 +22,7 @@ import { taskRoutes } from './modules/tasks/routes.js';
import { config } from './lib/config.js';
import { productConfig, PRODUCT_ID } from './lib/product-config.js';
import { getIpcBridge } from './lib/ipc-bridge.js';
import { getFlushScheduler } from './lib/flush-scheduler.js';
import type { JwtPayload } from './lib/request-context.js';
const jwtSecret = new TextEncoder().encode(config.JWT_SECRET);
@ -68,8 +69,16 @@ try {
app.log.warn({ err }, 'IPC bridge failed to start — running in fallback mode');
}
// Start flush scheduler (periodic drain of IPC buffers → platform-service)
const scheduler = getFlushScheduler(app.log);
if (bridge.isRunning) {
scheduler.start();
app.log.info('flush scheduler started — audit/telemetry/budget/flags');
}
// Graceful shutdown
app.addHook('onClose', async () => {
await scheduler.finalFlush();
await bridge.shutdown();
});