feat(platform-service): add human review queue

This commit is contained in:
root 2026-03-15 06:08:23 +00:00
parent 33c5a5a5ce
commit 3398d1400f
8 changed files with 430 additions and 0 deletions

View File

@ -69,6 +69,8 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
organizations: { partitionKeyPath: '/productId' },
workspaces: { partitionKeyPath: '/orgId' },
org_memberships: { partitionKeyPath: '/orgId' },
// Human review / approval queue
review_queue: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 },
// Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md)
telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 },
telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 },

View File

@ -0,0 +1,37 @@
import { dispatchEmail } from '../delivery/dispatcher.js';
import type { ReviewItemDoc } from './types.js';
const noopLog = {
info: (..._args: unknown[]) => {},
error: (..._args: unknown[]) => {},
};
export async function notifyReviewAssigned(
review: ReviewItemDoc,
log: { info: (...args: unknown[]) => void; error: (...args: unknown[]) => void } = noopLog
): Promise<void> {
if (!review.assignedTo) return;
log.info(
{ reviewId: review.id, assignedTo: review.assignedTo },
'[reviews] Review assignment notification queued'
);
const to = `${review.assignedTo}@bytelyst.local`;
await dispatchEmail(
{
to,
subject: `Review assigned: ${review.title}`,
html: [
`<p>A review item has been assigned to you.</p>`,
`<p><strong>${review.title}</strong></p>`,
`<p>${review.description}</p>`,
review.runId ? `<p>Run ID: ${review.runId}</p>` : '',
].join(''),
text: `Review assigned: ${review.title}\n\n${review.description}${review.runId ? `\n\nRun ID: ${review.runId}` : ''}`,
productId: review.productId,
userId: review.assignedTo,
},
log
);
}

View File

@ -0,0 +1,37 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
describe('reviews repository', () => {
beforeEach(() => {
setProvider(new MemoryDatastoreProvider());
});
afterEach(() => {
_resetDatastoreProvider();
});
it('creates and filters review items', async () => {
await repo.create({
id: 'rev_1',
productId: 'lysnrai',
title: 'Approve outbound action',
description: 'Agent wants to send a customer-facing message.',
category: 'agent_action',
status: 'pending',
priority: 'high',
scope: 'org',
orgId: 'org_1',
requestedBy: 'user_1',
source: 'mcp.a2a',
actionType: 'send_message',
createdAt: '2026-03-15T00:00:00.000Z',
updatedAt: '2026-03-15T00:00:00.000Z',
});
const reviews = await repo.list('lysnrai', { limit: 20, status: 'pending' });
expect(reviews).toHaveLength(1);
expect(reviews[0].category).toBe('agent_action');
});
});

View File

@ -0,0 +1,47 @@
import { NotFoundError } from '../../lib/errors.js';
import { getCollection } from '../../lib/datastore.js';
import type { ListReviewsQuery, ReviewItemDoc } from './types.js';
function reviewCollection() {
return getCollection<ReviewItemDoc>('review_queue', '/productId');
}
export async function create(doc: ReviewItemDoc): Promise<ReviewItemDoc> {
return reviewCollection().create(doc);
}
export async function getById(id: string, productId: string): Promise<ReviewItemDoc> {
const review = await reviewCollection().findById(id, productId);
if (!review) throw new NotFoundError(`Review '${id}' not found`);
return review;
}
export async function list(productId: string, query: ListReviewsQuery): Promise<ReviewItemDoc[]> {
return reviewCollection().findMany({
filter: {
productId,
...(query.status ? { status: query.status } : {}),
...(query.priority ? { priority: query.priority } : {}),
...(query.scope ? { scope: query.scope } : {}),
...(query.orgId ? { orgId: query.orgId } : {}),
...(query.workspaceId ? { workspaceId: query.workspaceId } : {}),
...(query.assignedTo ? { assignedTo: query.assignedTo } : {}),
...(query.runId ? { runId: query.runId } : {}),
},
sort: { createdAt: -1 },
limit: query.limit,
});
}
export async function update(
id: string,
productId: string,
updates: Partial<ReviewItemDoc>
): Promise<ReviewItemDoc> {
const updated = await reviewCollection().update(id, productId, {
...updates,
updatedAt: new Date().toISOString(),
});
if (!updated) throw new NotFoundError(`Review '${id}' not found`);
return updated;
}

View File

@ -0,0 +1,88 @@
import Fastify from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
const repoMock = {
list: vi.fn(),
create: vi.fn(),
getById: vi.fn(),
update: vi.fn(),
};
const notifyMock = {
notifyReviewAssigned: vi.fn(),
};
vi.mock('./repository.js', () => repoMock);
vi.mock('./notifications.js', () => notifyMock);
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
const { reviewRoutes } = await import('./routes.js');
const app = Fastify({ logger: false });
if (payload) {
app.addHook('onRequest', async req => {
req.jwtPayload = payload;
});
}
await app.register(reviewRoutes, { prefix: '/api' });
return app;
}
describe('reviewRoutes', () => {
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(() => {
vi.restoreAllMocks();
});
it('POST /reviews creates a pending review item', async () => {
repoMock.create.mockResolvedValue({ id: 'rev_1', status: 'pending' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/reviews',
payload: {
title: 'Approve agent escalation',
description: 'The agent wants to escalate this incident.',
category: 'agent_action',
orgId: 'org_1',
source: 'mcp.a2a',
actionType: 'escalate_incident',
},
});
expect(res.statusCode).toBe(200);
expect(repoMock.create).toHaveBeenCalledWith(
expect.objectContaining({
requestedBy: 'admin_1',
status: 'pending',
})
);
});
it('POST /reviews/:id/decision records an approval decision', async () => {
repoMock.update.mockResolvedValue({ id: 'rev_1', status: 'approved' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/reviews/rev_1/decision',
payload: { decision: 'approved', reason: 'Looks safe' },
});
expect(res.statusCode).toBe(200);
expect(repoMock.update).toHaveBeenCalledWith(
'rev_1',
'lysnrai',
expect.objectContaining({
status: 'approved',
resolution: expect.objectContaining({
decision: 'approved',
actedBy: 'admin_1',
}),
})
);
});
});

View File

@ -0,0 +1,128 @@
import { randomUUID } from 'node:crypto';
import type { FastifyInstance } from 'fastify';
import { BadRequestError, ForbiddenError } from '../../lib/errors.js';
import {
CreateReviewSchema,
ListReviewsQuerySchema,
ReviewDecisionSchema,
ReviewItemDoc,
UpdateReviewSchema,
} from './types.js';
import * as repo from './repository.js';
import { notifyReviewAssigned } from './notifications.js';
function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
userId: string;
productId: string;
} {
const payload = req.jwtPayload;
if (!payload?.sub) {
throw new ForbiddenError('Authentication required');
}
if (!payload.role || !['super_admin', 'admin'].includes(payload.role)) {
throw new ForbiddenError('Admin access required');
}
return {
userId: payload.sub,
productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai',
};
}
export async function reviewRoutes(app: FastifyInstance) {
app.get('/reviews', async req => {
const access = requireAdmin(req);
const parsed = ListReviewsQuerySchema.safeParse(req.query);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
return repo.list(access.productId, parsed.data);
});
app.post('/reviews', async req => {
const access = requireAdmin(req);
const parsed = CreateReviewSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
if (parsed.data.scope === 'workspace' && !parsed.data.workspaceId) {
throw new BadRequestError('workspaceId is required for workspace-scoped review items');
}
const now = new Date().toISOString();
const status = parsed.data.assignedTo ? 'assigned' : 'pending';
const review: ReviewItemDoc = {
id: `rev_${randomUUID()}`,
productId: access.productId,
title: parsed.data.title,
description: parsed.data.description,
category: parsed.data.category,
status,
priority: parsed.data.priority,
scope: parsed.data.scope,
orgId: parsed.data.orgId,
workspaceId: parsed.data.workspaceId,
requestedBy: access.userId,
assignedTo: parsed.data.assignedTo,
runId: parsed.data.runId,
source: parsed.data.source,
actionType: parsed.data.actionType,
metadata: parsed.data.metadata,
dueAt: parsed.data.dueAt,
createdAt: now,
updatedAt: now,
};
const created = await repo.create(review);
if (created.assignedTo) {
await notifyReviewAssigned(created, req.log);
}
return created;
});
app.get('/reviews/:id', async req => {
const access = requireAdmin(req);
const { id } = req.params as { id: string };
return repo.getById(id, access.productId);
});
app.patch('/reviews/:id', async req => {
const access = requireAdmin(req);
const { id } = req.params as { id: string };
const parsed = UpdateReviewSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
const updated = await repo.update(id, access.productId, {
...parsed.data,
status: parsed.data.assignedTo && !parsed.data.status ? 'assigned' : parsed.data.status,
});
if (parsed.data.assignedTo) {
await notifyReviewAssigned(updated, req.log);
}
return updated;
});
app.post('/reviews/:id/decision', async req => {
const access = requireAdmin(req);
const { id } = req.params as { id: string };
const parsed = ReviewDecisionSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
const decisionStatus = parsed.data.decision;
return repo.update(id, access.productId, {
status: decisionStatus,
resolution: {
decision: decisionStatus,
reason: parsed.data.reason,
actedBy: access.userId,
actedAt: new Date().toISOString(),
},
});
});
}

View File

@ -0,0 +1,89 @@
import { z } from 'zod';
export const ReviewStatusSchema = z.enum([
'pending',
'assigned',
'approved',
'rejected',
'cancelled',
'expired',
]);
export const ReviewPrioritySchema = z.enum(['low', 'normal', 'high', 'urgent']);
export const ReviewScopeSchema = z.enum(['org', 'workspace']);
export const ReviewItemSchema = z.object({
id: z.string().min(1),
productId: z.string().min(1),
title: z.string().min(1),
description: z.string().min(1),
category: z.string().min(1),
status: ReviewStatusSchema,
priority: ReviewPrioritySchema,
scope: ReviewScopeSchema,
orgId: z.string().min(1),
workspaceId: z.string().optional(),
requestedBy: z.string().min(1),
assignedTo: z.string().optional(),
runId: z.string().optional(),
source: z.string().min(1),
actionType: z.string().min(1),
metadata: z.record(z.unknown()).optional(),
resolution: z
.object({
decision: z.enum(['approved', 'rejected', 'cancelled', 'expired']),
reason: z.string().optional(),
actedBy: z.string().optional(),
actedAt: z.string(),
})
.optional(),
dueAt: z.string().optional(),
createdAt: z.string(),
updatedAt: z.string(),
});
export type ReviewItemDoc = z.infer<typeof ReviewItemSchema> & {
_ts?: number;
_etag?: string;
};
export const CreateReviewSchema = z.object({
title: z.string().min(1),
description: z.string().min(1),
category: z.string().min(1),
priority: ReviewPrioritySchema.default('normal'),
scope: ReviewScopeSchema.default('org'),
orgId: z.string().min(1),
workspaceId: z.string().optional(),
assignedTo: z.string().optional(),
runId: z.string().optional(),
source: z.string().min(1),
actionType: z.string().min(1),
metadata: z.record(z.unknown()).optional(),
dueAt: z.string().optional(),
});
export const UpdateReviewSchema = z.object({
status: ReviewStatusSchema.optional(),
priority: ReviewPrioritySchema.optional(),
assignedTo: z.string().optional(),
dueAt: z.string().optional(),
metadata: z.record(z.unknown()).optional(),
});
export const ReviewDecisionSchema = z.object({
decision: z.enum(['approved', 'rejected', 'cancelled']),
reason: z.string().optional(),
});
export const ListReviewsQuerySchema = z.object({
status: ReviewStatusSchema.optional(),
priority: ReviewPrioritySchema.optional(),
scope: ReviewScopeSchema.optional(),
orgId: z.string().optional(),
workspaceId: z.string().optional(),
assignedTo: z.string().optional(),
runId: z.string().optional(),
limit: z.coerce.number().min(1).max(100).default(20),
});
export type ListReviewsQuery = z.infer<typeof ListReviewsQuerySchema>;

View File

@ -77,6 +77,7 @@ import { experimentRoutes } from './modules/experiments/routes.js';
import { abTestingRoutes } from './modules/ab-testing/routes.js';
import { analyticsRoutes } from './modules/analytics/routes.js';
import { feedbackRoutes } from './modules/feedback/routes.js';
import { reviewRoutes } from './modules/reviews/routes.js';
import { impersonationRoutes } from './modules/impersonation/routes.js';
import { changelogRoutes } from './modules/changelog/routes.js';
import { webhookRoutes } from './modules/webhooks/routes.js';
@ -200,6 +201,7 @@ await app.register(experimentRoutes, { prefix: '/api' });
await app.register(abTestingRoutes, { prefix: '/api' });
await app.register(analyticsRoutes, { prefix: '/api' });
await app.register(feedbackRoutes, { prefix: '/api' });
await app.register(reviewRoutes, { prefix: '/api' });
await app.register(impersonationRoutes, { prefix: '/api' });
await app.register(changelogRoutes, { prefix: '/api' });
// Webhook subscriptions (replaces lib/webhooks.ts fire-and-forget)