From f8f3cdc24212d602033257d5a510b5d2f6c23807 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Thu, 2 Apr 2026 22:50:00 -0700 Subject: [PATCH] feat(cowork-service): platform-client + flush scheduler (H.4-H.8 TS wiring) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New files: - lib/platform-client.ts — REST client for platform-service endpoints: POST /audit, POST /telemetry/events, POST /usage, GET /flags/poll - lib/flush-scheduler.ts — periodic drain of IPC buffers → platform-service: - flushAll(): drains audit, telemetry, budget from Rust IPC → REST - pollAndSyncFlags(): GET /flags/poll → update TS registry + push to IPC - Singleton pattern with start/stop/finalFlush lifecycle - All operations best-effort (logged, never crash service) Updated server.ts: - Starts flush scheduler after IPC bridge connects - finalFlush() before shutdown (drain remaining events) 32 tests passing, typecheck clean. --- .../cowork-service/src/lib/flush-scheduler.ts | 226 ++++++++++++++++++ .../cowork-service/src/lib/platform-client.ts | 132 ++++++++++ services/cowork-service/src/server.ts | 9 + 3 files changed, 367 insertions(+) create mode 100644 services/cowork-service/src/lib/flush-scheduler.ts create mode 100644 services/cowork-service/src/lib/platform-client.ts diff --git a/services/cowork-service/src/lib/flush-scheduler.ts b/services/cowork-service/src/lib/flush-scheduler.ts new file mode 100644 index 00000000..eb23dfea --- /dev/null +++ b/services/cowork-service/src/lib/flush-scheduler.ts @@ -0,0 +1,226 @@ +/** + * Periodic flush scheduler — drains IPC buffers and forwards to platform-service. + * + * Responsibilities: + * - Poll feature flags from platform-service → update local registry + IPC bridge + * - Flush audit events from Rust IPC → POST /audit + * - Flush telemetry events from Rust IPC → POST /telemetry/events + * - Flush budget records from Rust IPC → POST /usage + * + * All operations are best-effort: failures are logged but never crash the service. + */ + +import { type FastifyBaseLogger } from 'fastify'; +import { getIpcBridge } from './ipc-bridge.js'; +import { setFlag } from './feature-flags.js'; +import { + pollFlags, + postAuditEvents, + postTelemetryEvents, + postUsageRecords, + type AuditEntry, + type TelemetryEvent, + type UsageRecord, +} from './platform-client.js'; + +export interface FlushSchedulerOptions { + /** Interval in ms between flush cycles. Default: 30_000 (30s). */ + flushIntervalMs?: number; + /** Interval in ms between flag polls. Default: 60_000 (60s). */ + flagPollIntervalMs?: number; + /** Logger instance. */ + log: FastifyBaseLogger; +} + +export class FlushScheduler { + private flushTimer: ReturnType | null = null; + private flagTimer: ReturnType | null = null; + private readonly flushIntervalMs: number; + private readonly flagPollIntervalMs: number; + private readonly log: FastifyBaseLogger; + private _running = false; + + constructor(opts: FlushSchedulerOptions) { + this.flushIntervalMs = opts.flushIntervalMs ?? 30_000; + this.flagPollIntervalMs = opts.flagPollIntervalMs ?? 60_000; + this.log = opts.log; + } + + get isRunning(): boolean { + return this._running; + } + + /** Start periodic flush and flag polling. */ + start(): void { + if (this._running) return; + this._running = true; + + this.flushTimer = setInterval(() => { + this.flushAll().catch(err => this.log.error(err, 'flush cycle failed')); + }, this.flushIntervalMs); + + this.flagTimer = setInterval(() => { + this.pollAndSyncFlags().catch(err => this.log.error(err, 'flag poll failed')); + }, this.flagPollIntervalMs); + + this.log.info( + { flushIntervalMs: this.flushIntervalMs, flagPollIntervalMs: this.flagPollIntervalMs }, + 'flush scheduler started', + ); + } + + /** Stop all timers. */ + stop(): void { + if (this.flushTimer) clearInterval(this.flushTimer); + if (this.flagTimer) clearInterval(this.flagTimer); + this.flushTimer = null; + this.flagTimer = null; + this._running = false; + this.log.info('flush scheduler stopped'); + } + + /** Run a final flush before shutdown. */ + async finalFlush(): Promise { + this.stop(); + await this.flushAll(); + } + + // ── Flush cycle ──────────────────────────────────────────── + + /** Drain all IPC buffers and post to platform-service. */ + async flushAll(): Promise { + const bridge = getIpcBridge(); + if (!bridge.isRunning) { + return { audit: null, telemetry: null, budget: null, skipped: true }; + } + + const svcAuth = { userId: 'system', role: 'admin' }; + const result: FlushResult = { audit: null, telemetry: null, budget: null, skipped: false }; + + result.audit = await this.flushAudit(bridge, svcAuth); + result.telemetry = await this.flushTelemetry(bridge, svcAuth); + result.budget = await this.flushBudget(bridge, svcAuth); + + return result; + } + + private async flushAudit( + bridge: ReturnType, + auth: Record, + ): Promise<{ drained: number; posted: number; errors: number } | null> { + try { + const resp = await bridge.flushAudit(auth); + if (resp.error) { + this.log.warn({ error: resp.error }, 'IPC flush_audit returned error'); + return null; + } + const r = resp.result as Record | undefined; + const entries: AuditEntry[] = (r?.events as AuditEntry[]) ?? []; + if (entries.length === 0) return { drained: 0, posted: 0, errors: 0 }; + + const { posted, errors } = await postAuditEvents(entries); + this.log.debug({ drained: entries.length, posted, errors }, 'audit flush'); + return { drained: entries.length, posted, errors }; + } catch (err) { + this.log.error(err, 'audit flush failed'); + return null; + } + } + + private async flushTelemetry( + bridge: ReturnType, + auth: Record, + ): Promise<{ drained: number; accepted: number; rejected: number } | null> { + try { + const resp = await bridge.flushTelemetry(auth); + if (resp.error) { + this.log.warn({ error: resp.error }, 'IPC flush_telemetry returned error'); + return null; + } + const r = resp.result as Record | undefined; + const events: TelemetryEvent[] = (r?.events as TelemetryEvent[]) ?? []; + if (events.length === 0) return { drained: 0, accepted: 0, rejected: 0 }; + + const { accepted, rejected } = await postTelemetryEvents(events); + this.log.debug({ drained: events.length, accepted, rejected }, 'telemetry flush'); + return { drained: events.length, accepted, rejected }; + } catch (err) { + this.log.error(err, 'telemetry flush failed'); + return null; + } + } + + private async flushBudget( + bridge: ReturnType, + auth: Record, + ): Promise<{ drained: number; posted: number; errors: number } | null> { + try { + const resp = await bridge.flushBudget(auth); + if (resp.error) { + this.log.warn({ error: resp.error }, 'IPC flush_budget returned error'); + return null; + } + const r = resp.result as Record | undefined; + const records: UsageRecord[] = (r?.records as UsageRecord[]) ?? []; + if (records.length === 0) return { drained: 0, posted: 0, errors: 0 }; + + const { posted, errors } = await postUsageRecords(records); + this.log.debug({ drained: records.length, posted, errors }, 'budget flush'); + return { drained: records.length, posted, errors }; + } catch (err) { + this.log.error(err, 'budget flush failed'); + return null; + } + } + + // ── Flag polling ─────────────────────────────────────────── + + /** Poll platform-service for flag updates and sync to local registry + IPC. */ + async pollAndSyncFlags(): Promise<{ updated: number } | null> { + try { + const { flags } = await pollFlags(); + let updated = 0; + + // Update local TS registry + for (const [key, value] of Object.entries(flags)) { + setFlag(key, value); + updated++; + } + + // Push to Rust IPC bridge if running + const bridge = getIpcBridge(); + if (bridge.isRunning) { + await bridge.updateFlags(flags, { userId: 'system', role: 'admin' }); + } + + this.log.debug({ updated }, 'flag poll sync'); + return { updated }; + } catch (err) { + this.log.error(err, 'flag poll failed'); + return null; + } + } +} + +export interface FlushResult { + audit: { drained: number; posted: number; errors: number } | null; + telemetry: { drained: number; accepted: number; rejected: number } | null; + budget: { drained: number; posted: number; errors: number } | null; + skipped?: boolean; +} + +// ── Singleton ── + +let _scheduler: FlushScheduler | null = null; + +export function getFlushScheduler(log?: FastifyBaseLogger): FlushScheduler { + if (!_scheduler) { + if (!log) throw new Error('FlushScheduler not initialized — provide a logger'); + _scheduler = new FlushScheduler({ log }); + } + return _scheduler; +} + +export function setFlushScheduler(scheduler: FlushScheduler | null): void { + _scheduler = scheduler; +} diff --git a/services/cowork-service/src/lib/platform-client.ts b/services/cowork-service/src/lib/platform-client.ts new file mode 100644 index 00000000..b5504f6e --- /dev/null +++ b/services/cowork-service/src/lib/platform-client.ts @@ -0,0 +1,132 @@ +/** + * REST client for platform-service endpoints. + * + * Used by the flush scheduler to forward audit events, telemetry, + * budget/usage records, and poll feature flags from platform-service. + */ + +import { config } from './config.js'; +import { PRODUCT_ID } from './product-config.js'; + +const baseUrl = () => config.PLATFORM_SERVICE_URL; + +interface FetchOptions { + method: 'GET' | 'POST'; + path: string; + body?: unknown; + headers?: Record; +} + +async function request(opts: FetchOptions): Promise { + const url = `${baseUrl()}${opts.path}`; + const res = await fetch(url, { + method: opts.method, + headers: { + 'Content-Type': 'application/json', + 'x-product-id': PRODUCT_ID, + ...opts.headers, + }, + body: opts.body ? JSON.stringify(opts.body) : undefined, + }); + + if (!res.ok) { + const text = await res.text().catch(() => ''); + throw new Error(`platform-service ${opts.method} ${opts.path} → ${res.status}: ${text.slice(0, 200)}`); + } + + return res.json() as Promise; +} + +// ── Audit ────────────────────────────────────────────────────── + +export interface AuditEntry { + userId: string; + action: string; + category?: string; + details?: Record; +} + +/** POST /audit — fire-and-forget audit write. Returns { accepted: true }. */ +export async function postAuditEvents(entries: AuditEntry[]): Promise<{ posted: number; errors: number }> { + let posted = 0; + let errors = 0; + for (const entry of entries) { + try { + await request({ method: 'POST', path: '/audit', body: entry }); + posted++; + } catch { + errors++; + } + } + return { posted, errors }; +} + +// ── Telemetry ────────────────────────────────────────────────── + +export interface TelemetryEvent { + id: string; + name: string; + timestamp: string; + properties?: Record; + [key: string]: unknown; +} + +/** POST /telemetry/events — batch ingest telemetry events. */ +export async function postTelemetryEvents( + events: TelemetryEvent[], +): Promise<{ accepted: number; rejected: number }> { + if (events.length === 0) return { accepted: 0, rejected: 0 }; + return request<{ accepted: number; rejected: number }>({ + method: 'POST', + path: '/telemetry/events', + body: { productId: PRODUCT_ID, events }, + }); +} + +// ── Usage / Budget ───────────────────────────────────────────── + +export interface UsageRecord { + userId: string; + date: string; + model?: string; + source?: string; + inputTokens?: number; + outputTokens?: number; + costUsd?: number; + requestCount?: number; +} + +/** POST /usage — upsert usage record. */ +export async function postUsageRecord(record: UsageRecord): Promise { + return request({ method: 'POST', path: '/usage', body: record }); +} + +/** Post multiple usage records (budget flush). */ +export async function postUsageRecords(records: UsageRecord[]): Promise<{ posted: number; errors: number }> { + let posted = 0; + let errors = 0; + for (const record of records) { + try { + await postUsageRecord(record); + posted++; + } catch { + errors++; + } + } + return { posted, errors }; +} + +// ── Feature Flags ────────────────────────────────────────────── + +/** GET /flags/poll — poll current flag values for this product. */ +export async function pollFlags( + platform?: string, +): Promise<{ flags: Record; productId: string }> { + const params = new URLSearchParams(); + if (platform) params.set('platform', platform); + const qs = params.toString(); + return request<{ flags: Record; productId: string }>({ + method: 'GET', + path: `/flags/poll${qs ? `?${qs}` : ''}`, + }); +} diff --git a/services/cowork-service/src/server.ts b/services/cowork-service/src/server.ts index 8fa9d165..506a7bfe 100644 --- a/services/cowork-service/src/server.ts +++ b/services/cowork-service/src/server.ts @@ -22,6 +22,7 @@ import { taskRoutes } from './modules/tasks/routes.js'; import { config } from './lib/config.js'; import { productConfig, PRODUCT_ID } from './lib/product-config.js'; import { getIpcBridge } from './lib/ipc-bridge.js'; +import { getFlushScheduler } from './lib/flush-scheduler.js'; import type { JwtPayload } from './lib/request-context.js'; const jwtSecret = new TextEncoder().encode(config.JWT_SECRET); @@ -68,8 +69,16 @@ try { app.log.warn({ err }, 'IPC bridge failed to start — running in fallback mode'); } +// Start flush scheduler (periodic drain of IPC buffers → platform-service) +const scheduler = getFlushScheduler(app.log); +if (bridge.isRunning) { + scheduler.start(); + app.log.info('flush scheduler started — audit/telemetry/budget/flags'); +} + // Graceful shutdown app.addHook('onClose', async () => { + await scheduler.finalFlush(); await bridge.shutdown(); });