feat(timeline): add platform timeline ingest and query api

This commit is contained in:
Saravana Achu Mac 2026-04-04 00:54:07 -07:00
parent 6cb4bbaf02
commit e377351842
9 changed files with 424 additions and 2 deletions

View File

@ -198,6 +198,13 @@ These should be resolved before claiming the ecosystem docs are fully implementa
- [x] verify that Phase 1 to Phase 3 events render in one unified stream
Commits:
- `3f2482b`
- [x] add first hosted timeline ingest/query API in `platform-service`
Commits:
- `TBD`
Status note:
- platform-service now persists timeline items in `timeline_items`
- `POST /api/timeline/ingest` accepts canonical ecosystem events and stores unified timeline items
- `GET /api/timeline` returns the unified timeline stream with non-admin JWT callers scoped to their own data
---

View File

@ -1,7 +1,7 @@
# Phase 4 Execution Plan
> **Flow:** Shared personal timeline over the first three ecosystem flows
> **Status:** Implemented baseline
> **Status:** Service baseline implemented
> **Owner:** `learning_ai_common_plat`
> **Purpose:** Turn the personal timeline PRD into a reusable contract and baseline aggregator so the first three ecosystem flows can be rendered as one coherent activity stream.
@ -14,8 +14,9 @@ Phase 4 defines and implements:
1. the canonical `TimelineItem` shape
2. baseline event-to-timeline mapping rules
3. a pure aggregator that converts canonical ecosystem events into timeline items
4. the first hosted platform-service API for timeline ingest and timeline query
This phase does not add a hosted UI yet. It establishes the shared data contract first.
This phase still does not add a hosted UI. It now establishes both the shared data contract and the first persisted service surface.
---
@ -24,6 +25,10 @@ This phase does not add a hosted UI yet. It establishes the shared data contract
- `packages/events/src/timeline.ts`
- `packages/events/src/timeline.test.ts`
- `packages/events/src/index.ts`
- `services/platform-service/src/modules/timeline/types.ts`
- `services/platform-service/src/modules/timeline/repository.ts`
- `services/platform-service/src/modules/timeline/routes.ts`
- `services/platform-service/src/modules/timeline/routes.test.ts`
---
@ -47,6 +52,9 @@ This is enough to render:
## 4. Verification
- `cd learning_ai_common_plat/packages/events && pnpm exec vitest run src/timeline.test.ts`
- `cd learning_ai_common_plat && pnpm --filter @bytelyst/events build`
- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec vitest run src/modules/timeline/routes.test.ts src/server.test.ts`
- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec tsc --noEmit`
Observed baseline:
@ -58,6 +66,9 @@ Observed baseline:
- related event refs
- actor type
- correlation ID
- platform-service now exposes `POST /api/timeline/ingest` for canonical ecosystem-event ingestion
- platform-service now exposes `GET /api/timeline` for unified timeline query
- non-admin JWT callers are scoped to their own timeline while admin and API-key callers can query broader product timelines
---
@ -67,6 +78,7 @@ Observed baseline:
- [x] define baseline event-to-timeline mapping rules
- [x] implement pure timeline aggregation over canonical events
- [x] verify that Phase 1 to Phase 3 events render in one unified stream
- [x] add first hosted timeline ingest/query API in platform-service
- [ ] choose first hosted timeline UI
---
@ -74,3 +86,4 @@ Observed baseline:
## 6. Commits
- `3f2482b` timeline contract and aggregator baseline
- `TBD` platform-service timeline ingest + query baseline

View File

@ -66,6 +66,8 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
// Generic orchestration runs
agent_runs: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 },
agent_run_steps: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 },
// Cross-product personal timeline
timeline_items: { partitionKeyPath: '/productId', defaultTtl: 90 * 86400 },
// Canonical tenant model
organizations: { partitionKeyPath: '/productId' },
workspaces: { partitionKeyPath: '/orgId' },

View File

@ -0,0 +1,33 @@
import type { FilterMap } from '@bytelyst/datastore';
import { getCollection } from '../../lib/datastore.js';
import type { ListTimelineQuery, TimelineItemDoc } from './types.js';
function collection() {
return getCollection<TimelineItemDoc>('timeline_items', '/productId');
}
export async function upsertTimelineItems(items: TimelineItemDoc[]): Promise<TimelineItemDoc[]> {
const results: TimelineItemDoc[] = [];
for (const item of items) {
results.push(await collection().upsert(item));
}
return results;
}
export async function listTimelineItems(
productId: string,
query: ListTimelineQuery
): Promise<TimelineItemDoc[]> {
const filter: FilterMap = { productId };
if (query.userId) filter.userId = query.userId;
if (query.correlationId) filter.correlationId = query.correlationId;
if (query.artifactId) filter.artifactId = query.artifactId;
if (query.eventName) filter.eventName = query.eventName;
return collection().findMany({
filter,
sort: { occurredAt: -1 },
limit: query.limit,
});
}

View File

@ -0,0 +1,188 @@
import Fastify from 'fastify';
import { beforeEach, describe, expect, it, vi } from 'vitest';
const repoMock = {
upsertTimelineItems: vi.fn(),
listTimelineItems: vi.fn(),
};
vi.mock('./repository.js', () => repoMock);
vi.mock('../../lib/api-key-auth.js', () => ({
requireJwtOrApiKey: vi.fn(async req => {
const roleHeader = req.headers['x-test-role'];
const userHeader = req.headers['x-test-user'];
const productHeader = req.headers['x-test-product'];
req.jwtPayload =
typeof userHeader === 'string'
? {
sub: userHeader,
role: typeof roleHeader === 'string' ? roleHeader : undefined,
productId: typeof productHeader === 'string' ? productHeader : 'lysnrai',
}
: undefined;
return {
actorId: typeof userHeader === 'string' ? userHeader : 'svc_timeline',
productId: typeof productHeader === 'string' ? productHeader : 'lysnrai',
source: typeof userHeader === 'string' ? 'jwt' : 'api_key',
};
}),
}));
async function buildApp() {
const { timelineRoutes } = await import('./routes.js');
const app = Fastify({ logger: false });
await app.register(timelineRoutes, { prefix: '/api' });
return app;
}
describe('timelineRoutes', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('POST /timeline/ingest persists canonical events as timeline items', async () => {
repoMock.upsertTimelineItems.mockImplementation(async docs => docs);
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/timeline/ingest',
headers: {
'x-test-user': 'user_1',
'x-test-product': 'lysnrai',
},
payload: {
events: [
{
eventId: 'evt_1',
eventName: 'capture.transcript.created',
eventVersion: 1,
occurredAt: '2026-04-04T10:00:00.000Z',
productId: 'lysnrai',
sourceSurface: 'voice',
actor: { actorType: 'user', actorId: 'user_1' },
trace: { correlationId: 'corr_1', causationId: null, parentEventId: null },
payload: {
artifactId: 'art_1',
durationMs: 1200,
language: 'en',
transcriptSource: 'microphone',
},
},
],
},
});
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.ingestedCount).toBe(1);
expect(body.items[0].title).toBe('Transcript captured');
expect(repoMock.upsertTimelineItems).toHaveBeenCalledOnce();
expect(repoMock.upsertTimelineItems.mock.calls[0][0][0]).toMatchObject({
productId: 'lysnrai',
userId: 'user_1',
eventId: 'evt_1',
visibility: 'private',
});
});
it('POST /timeline/ingest rejects cross-product events', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/timeline/ingest',
headers: {
'x-test-user': 'user_1',
'x-test-product': 'lysnrai',
},
payload: {
events: [
{
eventId: 'evt_2',
eventName: 'artifact.created',
eventVersion: 1,
occurredAt: '2026-04-04T10:00:00.000Z',
productId: 'notelett',
sourceSurface: 'notes',
actor: { actorType: 'user', actorId: 'user_1' },
trace: { correlationId: null, causationId: null, parentEventId: null },
payload: {
artifactType: 'note',
title: 'Quick note',
status: 'created',
},
},
],
},
});
expect(res.statusCode).toBe(400);
});
it('GET /timeline scopes non-admin JWT callers to their own timeline', async () => {
repoMock.listTimelineItems.mockResolvedValue([
{
id: 'timeline_evt_1',
eventId: 'evt_1',
eventName: 'artifact.created',
productId: 'lysnrai',
userId: 'user_1',
orgId: null,
artifactId: 'art_1',
correlationId: 'corr_1',
actorType: 'user',
visibility: 'private',
occurredAt: '2026-04-04T10:00:00.000Z',
item: {
itemId: 'timeline_evt_1',
occurredAt: '2026-04-04T10:00:00.000Z',
eventName: 'artifact.created',
productId: 'lysnrai',
title: 'Note created',
summary: 'note status: created',
artifactRefs: ['art_1'],
relatedEventIds: [],
actorType: 'user',
visibility: 'private',
correlationId: 'corr_1',
},
createdAt: '2026-04-04T10:00:00.000Z',
updatedAt: '2026-04-04T10:00:00.000Z',
},
]);
const app = await buildApp();
const res = await app.inject({
method: 'GET',
url: '/api/timeline?limit=10',
headers: {
'x-test-user': 'user_1',
'x-test-product': 'lysnrai',
},
});
expect(res.statusCode).toBe(200);
expect(repoMock.listTimelineItems).toHaveBeenCalledWith(
'lysnrai',
expect.objectContaining({ userId: 'user_1', limit: 10 })
);
expect(JSON.parse(res.body).count).toBe(1);
});
it('GET /timeline forbids non-admin JWT callers from querying another user', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'GET',
url: '/api/timeline?userId=user_2',
headers: {
'x-test-user': 'user_1',
'x-test-product': 'lysnrai',
},
});
expect(res.statusCode).toBe(403);
});
});

View File

@ -0,0 +1,129 @@
import crypto from 'node:crypto';
import type { FastifyInstance, FastifyRequest } from 'fastify';
import { buildTimelineItem } from '@bytelyst/events';
import { requireJwtOrApiKey } from '../../lib/api-key-auth.js';
import { BadRequestError, ForbiddenError } from '../../lib/errors.js';
import * as repo from './repository.js';
import {
IngestTimelineEventsSchema,
ListTimelineQuerySchema,
type TimelineItemDoc,
} from './types.js';
function isAdmin(req: FastifyRequest): boolean {
const role = req.jwtPayload?.role;
return role === 'admin' || role === 'super_admin';
}
function resolveTimelineUserId(
req: FastifyRequest,
source: 'jwt' | 'api_key',
explicitUserId: string | null | undefined,
fallbackUserId: string
): string | undefined {
if (source === 'api_key') {
return explicitUserId ?? undefined;
}
if (isAdmin(req)) {
return explicitUserId ?? undefined;
}
if (explicitUserId && explicitUserId !== fallbackUserId) {
throw new ForbiddenError('Non-admin callers can only access their own timeline');
}
return fallbackUserId;
}
export async function timelineRoutes(app: FastifyInstance) {
app.post('/timeline/ingest', async req => {
const access = await requireJwtOrApiKey(req, {
allowJwt: true,
apiKeyScopes: ['timeline:write'],
apiKeyTokenTypes: ['product_api', 'service_api'],
});
const parsed = IngestTimelineEventsSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
const requestedUserId = parsed.data.userId ?? undefined;
const resolvedUserId = resolveTimelineUserId(
req,
access.source,
requestedUserId,
access.actorId
);
const visibilityOverride = parsed.data.visibility;
const docs: TimelineItemDoc[] = parsed.data.events.map(event => {
if (event.productId !== access.productId) {
throw new BadRequestError(
`Event '${event.eventId}' productId must match authenticated product '${access.productId}'`
);
}
const item = buildTimelineItem(event);
const normalizedItem =
visibilityOverride === undefined ? item : { ...item, visibility: visibilityOverride };
const now = new Date().toISOString();
return {
id: normalizedItem.itemId,
productId: access.productId,
userId: event.userId ?? resolvedUserId ?? null,
orgId: event.orgId ?? null,
eventId: event.eventId,
eventName: event.eventName,
artifactId: event.artifactId ?? null,
correlationId: event.trace.correlationId ?? null,
actorType: event.actor.actorType,
visibility: normalizedItem.visibility,
occurredAt: event.occurredAt,
item: normalizedItem,
createdAt: now,
updatedAt: now,
};
});
const persisted = await repo.upsertTimelineItems(docs);
return {
ingestedCount: persisted.length,
items: persisted.map(doc => doc.item),
eventIds: persisted.map(doc => doc.eventId),
batchId: `timeline_batch_${crypto.randomUUID()}`,
};
});
app.get('/timeline', async req => {
const access = await requireJwtOrApiKey(req, {
allowJwt: true,
apiKeyScopes: ['timeline:read'],
apiKeyTokenTypes: ['product_api', 'service_api'],
});
const parsed = ListTimelineQuerySchema.safeParse(req.query);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
const effectiveUserId = resolveTimelineUserId(
req,
access.source,
parsed.data.userId,
access.actorId
);
const docs = await repo.listTimelineItems(access.productId, {
...parsed.data,
userId: effectiveUserId,
});
return {
items: docs.map(doc => doc.item),
count: docs.length,
};
});
}

View File

@ -0,0 +1,46 @@
import { z } from 'zod';
import {
BaseEcosystemEventSchema,
TimelineItemSchema,
TimelineVisibilitySchema,
} from '@bytelyst/events';
export const TimelineItemDocSchema = z.object({
id: z.string().min(1),
productId: z.string().min(1),
userId: z.string().min(1).nullable().optional(),
orgId: z.string().min(1).nullable().optional(),
eventId: z.string().min(1),
eventName: z.string().min(1),
artifactId: z.string().min(1).nullable().optional(),
correlationId: z.string().min(1).nullable().optional(),
actorType: z.enum(['user', 'agent', 'system', 'device']),
visibility: TimelineVisibilitySchema,
occurredAt: z.string().datetime(),
item: TimelineItemSchema,
createdAt: z.string().datetime(),
updatedAt: z.string().datetime(),
});
export type TimelineItemDoc = z.infer<typeof TimelineItemDocSchema> & {
_ts?: number;
_etag?: string;
};
export const IngestTimelineEventsSchema = z.object({
userId: z.string().min(1).nullable().optional(),
visibility: TimelineVisibilitySchema.optional(),
events: z.array(BaseEcosystemEventSchema).min(1),
});
export type IngestTimelineEventsInput = z.infer<typeof IngestTimelineEventsSchema>;
export const ListTimelineQuerySchema = z.object({
userId: z.string().min(1).optional(),
correlationId: z.string().min(1).optional(),
artifactId: z.string().min(1).optional(),
eventName: z.string().min(1).optional(),
limit: z.coerce.number().int().min(1).max(100).default(50),
});
export type ListTimelineQuery = z.infer<typeof ListTimelineQuerySchema>;

View File

@ -46,6 +46,7 @@ vi.mock('./modules/products/cache.js', () => ({ loadProductCache: loadProductCac
vi.mock('./modules/auth/routes.js', () => ({ authRoutes: vi.fn() }));
vi.mock('./modules/audit/routes.js', () => ({ auditRoutes: vi.fn() }));
vi.mock('./modules/notifications/routes.js', () => ({ notificationRoutes: vi.fn() }));
vi.mock('./modules/timeline/routes.js', () => ({ timelineRoutes: vi.fn() }));
vi.mock('./modules/flags/routes.js', () => ({ flagRoutes: vi.fn() }));
vi.mock('./modules/ratelimit/routes.js', () => ({ rateLimitRoutes: vi.fn() }));
vi.mock('./modules/blob/routes.js', () => ({ blobRoutes: vi.fn() }));

View File

@ -76,6 +76,7 @@ import { runRoutes } from './modules/runs/routes.js';
import { statusRoutes } from './modules/status/routes.js';
import { deliveryRoutes } from './modules/delivery/routes.js';
import { sessionRoutes } from './modules/sessions/routes.js';
import { timelineRoutes } from './modules/timeline/routes.js';
import { maintenanceRoutes } from './modules/maintenance/routes.js';
import { exportRoutes } from './modules/exports/routes.js';
import { ipRuleRoutes } from './modules/ip-rules/routes.js';
@ -229,6 +230,8 @@ await app.register(statusRoutes, { prefix: '/api' });
await app.register(deliveryRoutes, { prefix: '/api' });
// Session management
await app.register(sessionRoutes, { prefix: '/api' });
// Cross-product timeline ingest + query
await app.register(timelineRoutes, { prefix: '/api' });
// Maintenance mode
await app.register(maintenanceRoutes, { prefix: '/api' });
// Data exports