diff --git a/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md b/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md index db55a0f2..3676b052 100644 --- a/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md +++ b/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md @@ -198,6 +198,13 @@ These should be resolved before claiming the ecosystem docs are fully implementa - [x] verify that Phase 1 to Phase 3 events render in one unified stream Commits: - `3f2482b` +- [x] add first hosted timeline ingest/query API in `platform-service` + Commits: + - `TBD` + Status note: + - platform-service now persists timeline items in `timeline_items` + - `POST /api/timeline/ingest` accepts canonical ecosystem events and stores unified timeline items + - `GET /api/timeline` returns the unified timeline stream with non-admin JWT callers scoped to their own data --- diff --git a/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md b/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md index 989e0079..f1f1ede2 100644 --- a/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md +++ b/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md @@ -1,7 +1,7 @@ # Phase 4 Execution Plan > **Flow:** Shared personal timeline over the first three ecosystem flows -> **Status:** Implemented baseline +> **Status:** Service baseline implemented > **Owner:** `learning_ai_common_plat` > **Purpose:** Turn the personal timeline PRD into a reusable contract and baseline aggregator so the first three ecosystem flows can be rendered as one coherent activity stream. @@ -14,8 +14,9 @@ Phase 4 defines and implements: 1. the canonical `TimelineItem` shape 2. baseline event-to-timeline mapping rules 3. a pure aggregator that converts canonical ecosystem events into timeline items +4. the first hosted platform-service API for timeline ingest and timeline query -This phase does not add a hosted UI yet. It establishes the shared data contract first. +This phase still does not add a hosted UI. It now establishes both the shared data contract and the first persisted service surface. --- @@ -24,6 +25,10 @@ This phase does not add a hosted UI yet. It establishes the shared data contract - `packages/events/src/timeline.ts` - `packages/events/src/timeline.test.ts` - `packages/events/src/index.ts` +- `services/platform-service/src/modules/timeline/types.ts` +- `services/platform-service/src/modules/timeline/repository.ts` +- `services/platform-service/src/modules/timeline/routes.ts` +- `services/platform-service/src/modules/timeline/routes.test.ts` --- @@ -47,6 +52,9 @@ This is enough to render: ## 4. Verification - `cd learning_ai_common_plat/packages/events && pnpm exec vitest run src/timeline.test.ts` +- `cd learning_ai_common_plat && pnpm --filter @bytelyst/events build` +- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec vitest run src/modules/timeline/routes.test.ts src/server.test.ts` +- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec tsc --noEmit` Observed baseline: @@ -58,6 +66,9 @@ Observed baseline: - related event refs - actor type - correlation ID +- platform-service now exposes `POST /api/timeline/ingest` for canonical ecosystem-event ingestion +- platform-service now exposes `GET /api/timeline` for unified timeline query +- non-admin JWT callers are scoped to their own timeline while admin and API-key callers can query broader product timelines --- @@ -67,6 +78,7 @@ Observed baseline: - [x] define baseline event-to-timeline mapping rules - [x] implement pure timeline aggregation over canonical events - [x] verify that Phase 1 to Phase 3 events render in one unified stream +- [x] add first hosted timeline ingest/query API in platform-service - [ ] choose first hosted timeline UI --- @@ -74,3 +86,4 @@ Observed baseline: ## 6. Commits - `3f2482b` timeline contract and aggregator baseline +- `TBD` platform-service timeline ingest + query baseline diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index 486ac003..84d2a98a 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -66,6 +66,8 @@ const CONTAINER_DEFS: Record = { // Generic orchestration runs agent_runs: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 }, agent_run_steps: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 }, + // Cross-product personal timeline + timeline_items: { partitionKeyPath: '/productId', defaultTtl: 90 * 86400 }, // Canonical tenant model organizations: { partitionKeyPath: '/productId' }, workspaces: { partitionKeyPath: '/orgId' }, diff --git a/services/platform-service/src/modules/timeline/repository.ts b/services/platform-service/src/modules/timeline/repository.ts new file mode 100644 index 00000000..5053dc67 --- /dev/null +++ b/services/platform-service/src/modules/timeline/repository.ts @@ -0,0 +1,33 @@ +import type { FilterMap } from '@bytelyst/datastore'; +import { getCollection } from '../../lib/datastore.js'; +import type { ListTimelineQuery, TimelineItemDoc } from './types.js'; + +function collection() { + return getCollection('timeline_items', '/productId'); +} + +export async function upsertTimelineItems(items: TimelineItemDoc[]): Promise { + const results: TimelineItemDoc[] = []; + for (const item of items) { + results.push(await collection().upsert(item)); + } + return results; +} + +export async function listTimelineItems( + productId: string, + query: ListTimelineQuery +): Promise { + const filter: FilterMap = { productId }; + + if (query.userId) filter.userId = query.userId; + if (query.correlationId) filter.correlationId = query.correlationId; + if (query.artifactId) filter.artifactId = query.artifactId; + if (query.eventName) filter.eventName = query.eventName; + + return collection().findMany({ + filter, + sort: { occurredAt: -1 }, + limit: query.limit, + }); +} diff --git a/services/platform-service/src/modules/timeline/routes.test.ts b/services/platform-service/src/modules/timeline/routes.test.ts new file mode 100644 index 00000000..0765c648 --- /dev/null +++ b/services/platform-service/src/modules/timeline/routes.test.ts @@ -0,0 +1,188 @@ +import Fastify from 'fastify'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const repoMock = { + upsertTimelineItems: vi.fn(), + listTimelineItems: vi.fn(), +}; + +vi.mock('./repository.js', () => repoMock); +vi.mock('../../lib/api-key-auth.js', () => ({ + requireJwtOrApiKey: vi.fn(async req => { + const roleHeader = req.headers['x-test-role']; + const userHeader = req.headers['x-test-user']; + const productHeader = req.headers['x-test-product']; + req.jwtPayload = + typeof userHeader === 'string' + ? { + sub: userHeader, + role: typeof roleHeader === 'string' ? roleHeader : undefined, + productId: typeof productHeader === 'string' ? productHeader : 'lysnrai', + } + : undefined; + + return { + actorId: typeof userHeader === 'string' ? userHeader : 'svc_timeline', + productId: typeof productHeader === 'string' ? productHeader : 'lysnrai', + source: typeof userHeader === 'string' ? 'jwt' : 'api_key', + }; + }), +})); + +async function buildApp() { + const { timelineRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + await app.register(timelineRoutes, { prefix: '/api' }); + return app; +} + +describe('timelineRoutes', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('POST /timeline/ingest persists canonical events as timeline items', async () => { + repoMock.upsertTimelineItems.mockImplementation(async docs => docs); + const app = await buildApp(); + + const res = await app.inject({ + method: 'POST', + url: '/api/timeline/ingest', + headers: { + 'x-test-user': 'user_1', + 'x-test-product': 'lysnrai', + }, + payload: { + events: [ + { + eventId: 'evt_1', + eventName: 'capture.transcript.created', + eventVersion: 1, + occurredAt: '2026-04-04T10:00:00.000Z', + productId: 'lysnrai', + sourceSurface: 'voice', + actor: { actorType: 'user', actorId: 'user_1' }, + trace: { correlationId: 'corr_1', causationId: null, parentEventId: null }, + payload: { + artifactId: 'art_1', + durationMs: 1200, + language: 'en', + transcriptSource: 'microphone', + }, + }, + ], + }, + }); + + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.ingestedCount).toBe(1); + expect(body.items[0].title).toBe('Transcript captured'); + expect(repoMock.upsertTimelineItems).toHaveBeenCalledOnce(); + expect(repoMock.upsertTimelineItems.mock.calls[0][0][0]).toMatchObject({ + productId: 'lysnrai', + userId: 'user_1', + eventId: 'evt_1', + visibility: 'private', + }); + }); + + it('POST /timeline/ingest rejects cross-product events', async () => { + const app = await buildApp(); + + const res = await app.inject({ + method: 'POST', + url: '/api/timeline/ingest', + headers: { + 'x-test-user': 'user_1', + 'x-test-product': 'lysnrai', + }, + payload: { + events: [ + { + eventId: 'evt_2', + eventName: 'artifact.created', + eventVersion: 1, + occurredAt: '2026-04-04T10:00:00.000Z', + productId: 'notelett', + sourceSurface: 'notes', + actor: { actorType: 'user', actorId: 'user_1' }, + trace: { correlationId: null, causationId: null, parentEventId: null }, + payload: { + artifactType: 'note', + title: 'Quick note', + status: 'created', + }, + }, + ], + }, + }); + + expect(res.statusCode).toBe(400); + }); + + it('GET /timeline scopes non-admin JWT callers to their own timeline', async () => { + repoMock.listTimelineItems.mockResolvedValue([ + { + id: 'timeline_evt_1', + eventId: 'evt_1', + eventName: 'artifact.created', + productId: 'lysnrai', + userId: 'user_1', + orgId: null, + artifactId: 'art_1', + correlationId: 'corr_1', + actorType: 'user', + visibility: 'private', + occurredAt: '2026-04-04T10:00:00.000Z', + item: { + itemId: 'timeline_evt_1', + occurredAt: '2026-04-04T10:00:00.000Z', + eventName: 'artifact.created', + productId: 'lysnrai', + title: 'Note created', + summary: 'note status: created', + artifactRefs: ['art_1'], + relatedEventIds: [], + actorType: 'user', + visibility: 'private', + correlationId: 'corr_1', + }, + createdAt: '2026-04-04T10:00:00.000Z', + updatedAt: '2026-04-04T10:00:00.000Z', + }, + ]); + const app = await buildApp(); + + const res = await app.inject({ + method: 'GET', + url: '/api/timeline?limit=10', + headers: { + 'x-test-user': 'user_1', + 'x-test-product': 'lysnrai', + }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.listTimelineItems).toHaveBeenCalledWith( + 'lysnrai', + expect.objectContaining({ userId: 'user_1', limit: 10 }) + ); + expect(JSON.parse(res.body).count).toBe(1); + }); + + it('GET /timeline forbids non-admin JWT callers from querying another user', async () => { + const app = await buildApp(); + + const res = await app.inject({ + method: 'GET', + url: '/api/timeline?userId=user_2', + headers: { + 'x-test-user': 'user_1', + 'x-test-product': 'lysnrai', + }, + }); + + expect(res.statusCode).toBe(403); + }); +}); diff --git a/services/platform-service/src/modules/timeline/routes.ts b/services/platform-service/src/modules/timeline/routes.ts new file mode 100644 index 00000000..38deeabb --- /dev/null +++ b/services/platform-service/src/modules/timeline/routes.ts @@ -0,0 +1,129 @@ +import crypto from 'node:crypto'; +import type { FastifyInstance, FastifyRequest } from 'fastify'; +import { buildTimelineItem } from '@bytelyst/events'; +import { requireJwtOrApiKey } from '../../lib/api-key-auth.js'; +import { BadRequestError, ForbiddenError } from '../../lib/errors.js'; +import * as repo from './repository.js'; +import { + IngestTimelineEventsSchema, + ListTimelineQuerySchema, + type TimelineItemDoc, +} from './types.js'; + +function isAdmin(req: FastifyRequest): boolean { + const role = req.jwtPayload?.role; + return role === 'admin' || role === 'super_admin'; +} + +function resolveTimelineUserId( + req: FastifyRequest, + source: 'jwt' | 'api_key', + explicitUserId: string | null | undefined, + fallbackUserId: string +): string | undefined { + if (source === 'api_key') { + return explicitUserId ?? undefined; + } + + if (isAdmin(req)) { + return explicitUserId ?? undefined; + } + + if (explicitUserId && explicitUserId !== fallbackUserId) { + throw new ForbiddenError('Non-admin callers can only access their own timeline'); + } + + return fallbackUserId; +} + +export async function timelineRoutes(app: FastifyInstance) { + app.post('/timeline/ingest', async req => { + const access = await requireJwtOrApiKey(req, { + allowJwt: true, + apiKeyScopes: ['timeline:write'], + apiKeyTokenTypes: ['product_api', 'service_api'], + }); + + const parsed = IngestTimelineEventsSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + const requestedUserId = parsed.data.userId ?? undefined; + const resolvedUserId = resolveTimelineUserId( + req, + access.source, + requestedUserId, + access.actorId + ); + const visibilityOverride = parsed.data.visibility; + + const docs: TimelineItemDoc[] = parsed.data.events.map(event => { + if (event.productId !== access.productId) { + throw new BadRequestError( + `Event '${event.eventId}' productId must match authenticated product '${access.productId}'` + ); + } + + const item = buildTimelineItem(event); + const normalizedItem = + visibilityOverride === undefined ? item : { ...item, visibility: visibilityOverride }; + const now = new Date().toISOString(); + + return { + id: normalizedItem.itemId, + productId: access.productId, + userId: event.userId ?? resolvedUserId ?? null, + orgId: event.orgId ?? null, + eventId: event.eventId, + eventName: event.eventName, + artifactId: event.artifactId ?? null, + correlationId: event.trace.correlationId ?? null, + actorType: event.actor.actorType, + visibility: normalizedItem.visibility, + occurredAt: event.occurredAt, + item: normalizedItem, + createdAt: now, + updatedAt: now, + }; + }); + + const persisted = await repo.upsertTimelineItems(docs); + + return { + ingestedCount: persisted.length, + items: persisted.map(doc => doc.item), + eventIds: persisted.map(doc => doc.eventId), + batchId: `timeline_batch_${crypto.randomUUID()}`, + }; + }); + + app.get('/timeline', async req => { + const access = await requireJwtOrApiKey(req, { + allowJwt: true, + apiKeyScopes: ['timeline:read'], + apiKeyTokenTypes: ['product_api', 'service_api'], + }); + + const parsed = ListTimelineQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + const effectiveUserId = resolveTimelineUserId( + req, + access.source, + parsed.data.userId, + access.actorId + ); + const docs = await repo.listTimelineItems(access.productId, { + ...parsed.data, + userId: effectiveUserId, + }); + + return { + items: docs.map(doc => doc.item), + count: docs.length, + }; + }); +} diff --git a/services/platform-service/src/modules/timeline/types.ts b/services/platform-service/src/modules/timeline/types.ts new file mode 100644 index 00000000..36689a74 --- /dev/null +++ b/services/platform-service/src/modules/timeline/types.ts @@ -0,0 +1,46 @@ +import { z } from 'zod'; +import { + BaseEcosystemEventSchema, + TimelineItemSchema, + TimelineVisibilitySchema, +} from '@bytelyst/events'; + +export const TimelineItemDocSchema = z.object({ + id: z.string().min(1), + productId: z.string().min(1), + userId: z.string().min(1).nullable().optional(), + orgId: z.string().min(1).nullable().optional(), + eventId: z.string().min(1), + eventName: z.string().min(1), + artifactId: z.string().min(1).nullable().optional(), + correlationId: z.string().min(1).nullable().optional(), + actorType: z.enum(['user', 'agent', 'system', 'device']), + visibility: TimelineVisibilitySchema, + occurredAt: z.string().datetime(), + item: TimelineItemSchema, + createdAt: z.string().datetime(), + updatedAt: z.string().datetime(), +}); + +export type TimelineItemDoc = z.infer & { + _ts?: number; + _etag?: string; +}; + +export const IngestTimelineEventsSchema = z.object({ + userId: z.string().min(1).nullable().optional(), + visibility: TimelineVisibilitySchema.optional(), + events: z.array(BaseEcosystemEventSchema).min(1), +}); + +export type IngestTimelineEventsInput = z.infer; + +export const ListTimelineQuerySchema = z.object({ + userId: z.string().min(1).optional(), + correlationId: z.string().min(1).optional(), + artifactId: z.string().min(1).optional(), + eventName: z.string().min(1).optional(), + limit: z.coerce.number().int().min(1).max(100).default(50), +}); + +export type ListTimelineQuery = z.infer; diff --git a/services/platform-service/src/server.test.ts b/services/platform-service/src/server.test.ts index e5647465..5ef8aa06 100644 --- a/services/platform-service/src/server.test.ts +++ b/services/platform-service/src/server.test.ts @@ -46,6 +46,7 @@ vi.mock('./modules/products/cache.js', () => ({ loadProductCache: loadProductCac vi.mock('./modules/auth/routes.js', () => ({ authRoutes: vi.fn() })); vi.mock('./modules/audit/routes.js', () => ({ auditRoutes: vi.fn() })); vi.mock('./modules/notifications/routes.js', () => ({ notificationRoutes: vi.fn() })); +vi.mock('./modules/timeline/routes.js', () => ({ timelineRoutes: vi.fn() })); vi.mock('./modules/flags/routes.js', () => ({ flagRoutes: vi.fn() })); vi.mock('./modules/ratelimit/routes.js', () => ({ rateLimitRoutes: vi.fn() })); vi.mock('./modules/blob/routes.js', () => ({ blobRoutes: vi.fn() })); diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 964ebb3c..210acbea 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -76,6 +76,7 @@ import { runRoutes } from './modules/runs/routes.js'; import { statusRoutes } from './modules/status/routes.js'; import { deliveryRoutes } from './modules/delivery/routes.js'; import { sessionRoutes } from './modules/sessions/routes.js'; +import { timelineRoutes } from './modules/timeline/routes.js'; import { maintenanceRoutes } from './modules/maintenance/routes.js'; import { exportRoutes } from './modules/exports/routes.js'; import { ipRuleRoutes } from './modules/ip-rules/routes.js'; @@ -229,6 +230,8 @@ await app.register(statusRoutes, { prefix: '/api' }); await app.register(deliveryRoutes, { prefix: '/api' }); // Session management await app.register(sessionRoutes, { prefix: '/api' }); +// Cross-product timeline ingest + query +await app.register(timelineRoutes, { prefix: '/api' }); // Maintenance mode await app.register(maintenanceRoutes, { prefix: '/api' }); // Data exports