diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index 5a71f777..2d697640 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -69,6 +69,8 @@ const CONTAINER_DEFS: Record = { organizations: { partitionKeyPath: '/productId' }, workspaces: { partitionKeyPath: '/orgId' }, org_memberships: { partitionKeyPath: '/orgId' }, + // Human review / approval queue + review_queue: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 }, // Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md) telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 }, telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 }, diff --git a/services/platform-service/src/modules/reviews/notifications.ts b/services/platform-service/src/modules/reviews/notifications.ts new file mode 100644 index 00000000..58c3995b --- /dev/null +++ b/services/platform-service/src/modules/reviews/notifications.ts @@ -0,0 +1,37 @@ +import { dispatchEmail } from '../delivery/dispatcher.js'; +import type { ReviewItemDoc } from './types.js'; + +const noopLog = { + info: (..._args: unknown[]) => {}, + error: (..._args: unknown[]) => {}, +}; + +export async function notifyReviewAssigned( + review: ReviewItemDoc, + log: { info: (...args: unknown[]) => void; error: (...args: unknown[]) => void } = noopLog +): Promise { + if (!review.assignedTo) return; + + log.info( + { reviewId: review.id, assignedTo: review.assignedTo }, + '[reviews] Review assignment notification queued' + ); + + const to = `${review.assignedTo}@bytelyst.local`; + await dispatchEmail( + { + to, + subject: `Review assigned: ${review.title}`, + html: [ + `

A review item has been assigned to you.

`, + `

${review.title}

`, + `

${review.description}

`, + review.runId ? `

Run ID: ${review.runId}

` : '', + ].join(''), + text: `Review assigned: ${review.title}\n\n${review.description}${review.runId ? `\n\nRun ID: ${review.runId}` : ''}`, + productId: review.productId, + userId: review.assignedTo, + }, + log + ); +} diff --git a/services/platform-service/src/modules/reviews/repository.test.ts b/services/platform-service/src/modules/reviews/repository.test.ts new file mode 100644 index 00000000..8f685290 --- /dev/null +++ b/services/platform-service/src/modules/reviews/repository.test.ts @@ -0,0 +1,37 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; + +describe('reviews repository', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + }); + + afterEach(() => { + _resetDatastoreProvider(); + }); + + it('creates and filters review items', async () => { + await repo.create({ + id: 'rev_1', + productId: 'lysnrai', + title: 'Approve outbound action', + description: 'Agent wants to send a customer-facing message.', + category: 'agent_action', + status: 'pending', + priority: 'high', + scope: 'org', + orgId: 'org_1', + requestedBy: 'user_1', + source: 'mcp.a2a', + actionType: 'send_message', + createdAt: '2026-03-15T00:00:00.000Z', + updatedAt: '2026-03-15T00:00:00.000Z', + }); + + const reviews = await repo.list('lysnrai', { limit: 20, status: 'pending' }); + expect(reviews).toHaveLength(1); + expect(reviews[0].category).toBe('agent_action'); + }); +}); diff --git a/services/platform-service/src/modules/reviews/repository.ts b/services/platform-service/src/modules/reviews/repository.ts new file mode 100644 index 00000000..82cbdd3e --- /dev/null +++ b/services/platform-service/src/modules/reviews/repository.ts @@ -0,0 +1,47 @@ +import { NotFoundError } from '../../lib/errors.js'; +import { getCollection } from '../../lib/datastore.js'; +import type { ListReviewsQuery, ReviewItemDoc } from './types.js'; + +function reviewCollection() { + return getCollection('review_queue', '/productId'); +} + +export async function create(doc: ReviewItemDoc): Promise { + return reviewCollection().create(doc); +} + +export async function getById(id: string, productId: string): Promise { + const review = await reviewCollection().findById(id, productId); + if (!review) throw new NotFoundError(`Review '${id}' not found`); + return review; +} + +export async function list(productId: string, query: ListReviewsQuery): Promise { + return reviewCollection().findMany({ + filter: { + productId, + ...(query.status ? { status: query.status } : {}), + ...(query.priority ? { priority: query.priority } : {}), + ...(query.scope ? { scope: query.scope } : {}), + ...(query.orgId ? { orgId: query.orgId } : {}), + ...(query.workspaceId ? { workspaceId: query.workspaceId } : {}), + ...(query.assignedTo ? { assignedTo: query.assignedTo } : {}), + ...(query.runId ? { runId: query.runId } : {}), + }, + sort: { createdAt: -1 }, + limit: query.limit, + }); +} + +export async function update( + id: string, + productId: string, + updates: Partial +): Promise { + const updated = await reviewCollection().update(id, productId, { + ...updates, + updatedAt: new Date().toISOString(), + }); + if (!updated) throw new NotFoundError(`Review '${id}' not found`); + return updated; +} diff --git a/services/platform-service/src/modules/reviews/routes.test.ts b/services/platform-service/src/modules/reviews/routes.test.ts new file mode 100644 index 00000000..10e8c15c --- /dev/null +++ b/services/platform-service/src/modules/reviews/routes.test.ts @@ -0,0 +1,88 @@ +import Fastify from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +const repoMock = { + list: vi.fn(), + create: vi.fn(), + getById: vi.fn(), + update: vi.fn(), +}; + +const notifyMock = { + notifyReviewAssigned: vi.fn(), +}; + +vi.mock('./repository.js', () => repoMock); +vi.mock('./notifications.js', () => notifyMock); + +async function buildApp(payload?: { sub: string; productId: string; role?: string }) { + const { reviewRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + if (payload) { + app.addHook('onRequest', async req => { + req.jwtPayload = payload; + }); + } + await app.register(reviewRoutes, { prefix: '/api' }); + return app; +} + +describe('reviewRoutes', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('POST /reviews creates a pending review item', async () => { + repoMock.create.mockResolvedValue({ id: 'rev_1', status: 'pending' }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/reviews', + payload: { + title: 'Approve agent escalation', + description: 'The agent wants to escalate this incident.', + category: 'agent_action', + orgId: 'org_1', + source: 'mcp.a2a', + actionType: 'escalate_incident', + }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.create).toHaveBeenCalledWith( + expect.objectContaining({ + requestedBy: 'admin_1', + status: 'pending', + }) + ); + }); + + it('POST /reviews/:id/decision records an approval decision', async () => { + repoMock.update.mockResolvedValue({ id: 'rev_1', status: 'approved' }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/reviews/rev_1/decision', + payload: { decision: 'approved', reason: 'Looks safe' }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.update).toHaveBeenCalledWith( + 'rev_1', + 'lysnrai', + expect.objectContaining({ + status: 'approved', + resolution: expect.objectContaining({ + decision: 'approved', + actedBy: 'admin_1', + }), + }) + ); + }); +}); diff --git a/services/platform-service/src/modules/reviews/routes.ts b/services/platform-service/src/modules/reviews/routes.ts new file mode 100644 index 00000000..d9cdfa1c --- /dev/null +++ b/services/platform-service/src/modules/reviews/routes.ts @@ -0,0 +1,128 @@ +import { randomUUID } from 'node:crypto'; +import type { FastifyInstance } from 'fastify'; +import { BadRequestError, ForbiddenError } from '../../lib/errors.js'; +import { + CreateReviewSchema, + ListReviewsQuerySchema, + ReviewDecisionSchema, + ReviewItemDoc, + UpdateReviewSchema, +} from './types.js'; +import * as repo from './repository.js'; +import { notifyReviewAssigned } from './notifications.js'; + +function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): { + userId: string; + productId: string; +} { + const payload = req.jwtPayload; + if (!payload?.sub) { + throw new ForbiddenError('Authentication required'); + } + if (!payload.role || !['super_admin', 'admin'].includes(payload.role)) { + throw new ForbiddenError('Admin access required'); + } + return { + userId: payload.sub, + productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai', + }; +} + +export async function reviewRoutes(app: FastifyInstance) { + app.get('/reviews', async req => { + const access = requireAdmin(req); + const parsed = ListReviewsQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + return repo.list(access.productId, parsed.data); + }); + + app.post('/reviews', async req => { + const access = requireAdmin(req); + const parsed = CreateReviewSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + if (parsed.data.scope === 'workspace' && !parsed.data.workspaceId) { + throw new BadRequestError('workspaceId is required for workspace-scoped review items'); + } + + const now = new Date().toISOString(); + const status = parsed.data.assignedTo ? 'assigned' : 'pending'; + const review: ReviewItemDoc = { + id: `rev_${randomUUID()}`, + productId: access.productId, + title: parsed.data.title, + description: parsed.data.description, + category: parsed.data.category, + status, + priority: parsed.data.priority, + scope: parsed.data.scope, + orgId: parsed.data.orgId, + workspaceId: parsed.data.workspaceId, + requestedBy: access.userId, + assignedTo: parsed.data.assignedTo, + runId: parsed.data.runId, + source: parsed.data.source, + actionType: parsed.data.actionType, + metadata: parsed.data.metadata, + dueAt: parsed.data.dueAt, + createdAt: now, + updatedAt: now, + }; + + const created = await repo.create(review); + if (created.assignedTo) { + await notifyReviewAssigned(created, req.log); + } + return created; + }); + + app.get('/reviews/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + return repo.getById(id, access.productId); + }); + + app.patch('/reviews/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const parsed = UpdateReviewSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + const updated = await repo.update(id, access.productId, { + ...parsed.data, + status: parsed.data.assignedTo && !parsed.data.status ? 'assigned' : parsed.data.status, + }); + + if (parsed.data.assignedTo) { + await notifyReviewAssigned(updated, req.log); + } + + return updated; + }); + + app.post('/reviews/:id/decision', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const parsed = ReviewDecisionSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + const decisionStatus = parsed.data.decision; + return repo.update(id, access.productId, { + status: decisionStatus, + resolution: { + decision: decisionStatus, + reason: parsed.data.reason, + actedBy: access.userId, + actedAt: new Date().toISOString(), + }, + }); + }); +} diff --git a/services/platform-service/src/modules/reviews/types.ts b/services/platform-service/src/modules/reviews/types.ts new file mode 100644 index 00000000..317e56af --- /dev/null +++ b/services/platform-service/src/modules/reviews/types.ts @@ -0,0 +1,89 @@ +import { z } from 'zod'; + +export const ReviewStatusSchema = z.enum([ + 'pending', + 'assigned', + 'approved', + 'rejected', + 'cancelled', + 'expired', +]); +export const ReviewPrioritySchema = z.enum(['low', 'normal', 'high', 'urgent']); +export const ReviewScopeSchema = z.enum(['org', 'workspace']); + +export const ReviewItemSchema = z.object({ + id: z.string().min(1), + productId: z.string().min(1), + title: z.string().min(1), + description: z.string().min(1), + category: z.string().min(1), + status: ReviewStatusSchema, + priority: ReviewPrioritySchema, + scope: ReviewScopeSchema, + orgId: z.string().min(1), + workspaceId: z.string().optional(), + requestedBy: z.string().min(1), + assignedTo: z.string().optional(), + runId: z.string().optional(), + source: z.string().min(1), + actionType: z.string().min(1), + metadata: z.record(z.unknown()).optional(), + resolution: z + .object({ + decision: z.enum(['approved', 'rejected', 'cancelled', 'expired']), + reason: z.string().optional(), + actedBy: z.string().optional(), + actedAt: z.string(), + }) + .optional(), + dueAt: z.string().optional(), + createdAt: z.string(), + updatedAt: z.string(), +}); + +export type ReviewItemDoc = z.infer & { + _ts?: number; + _etag?: string; +}; + +export const CreateReviewSchema = z.object({ + title: z.string().min(1), + description: z.string().min(1), + category: z.string().min(1), + priority: ReviewPrioritySchema.default('normal'), + scope: ReviewScopeSchema.default('org'), + orgId: z.string().min(1), + workspaceId: z.string().optional(), + assignedTo: z.string().optional(), + runId: z.string().optional(), + source: z.string().min(1), + actionType: z.string().min(1), + metadata: z.record(z.unknown()).optional(), + dueAt: z.string().optional(), +}); + +export const UpdateReviewSchema = z.object({ + status: ReviewStatusSchema.optional(), + priority: ReviewPrioritySchema.optional(), + assignedTo: z.string().optional(), + dueAt: z.string().optional(), + metadata: z.record(z.unknown()).optional(), +}); + +export const ReviewDecisionSchema = z.object({ + decision: z.enum(['approved', 'rejected', 'cancelled']), + reason: z.string().optional(), +}); + +export const ListReviewsQuerySchema = z.object({ + status: ReviewStatusSchema.optional(), + priority: ReviewPrioritySchema.optional(), + scope: ReviewScopeSchema.optional(), + orgId: z.string().optional(), + workspaceId: z.string().optional(), + assignedTo: z.string().optional(), + runId: z.string().optional(), + limit: z.coerce.number().min(1).max(100).default(20), +}); + +export type ListReviewsQuery = z.infer; diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index af76ee7f..8965a762 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -77,6 +77,7 @@ import { experimentRoutes } from './modules/experiments/routes.js'; import { abTestingRoutes } from './modules/ab-testing/routes.js'; import { analyticsRoutes } from './modules/analytics/routes.js'; import { feedbackRoutes } from './modules/feedback/routes.js'; +import { reviewRoutes } from './modules/reviews/routes.js'; import { impersonationRoutes } from './modules/impersonation/routes.js'; import { changelogRoutes } from './modules/changelog/routes.js'; import { webhookRoutes } from './modules/webhooks/routes.js'; @@ -200,6 +201,7 @@ await app.register(experimentRoutes, { prefix: '/api' }); await app.register(abTestingRoutes, { prefix: '/api' }); await app.register(analyticsRoutes, { prefix: '/api' }); await app.register(feedbackRoutes, { prefix: '/api' }); +await app.register(reviewRoutes, { prefix: '/api' }); await app.register(impersonationRoutes, { prefix: '/api' }); await app.register(changelogRoutes, { prefix: '/api' }); // Webhook subscriptions (replaces lib/webhooks.ts fire-and-forget)