From 484ed05c1f81af27e18e0e32cee32455ceb8c4e6 Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 02:23:42 -0700 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20DAG=20job=20decomposition=20?= =?UTF-8?q?=E2=80=94=20parent/child=20+=20fan-out=20(Phase=203=20Slice=202?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SubmitJobSchema accepts inline children[] for atomic fan-out creation - Parent blocked until all children reach dep-satisfying terminal stage - patchJobFenced triggers maybeUnblockParent on child stage transitions - submitChildren() for POST /fleet/jobs/:id/children (add children later) - getDagSubtree() for GET /fleet/jobs/:id/dag (recursive subtree query) - listChildrenByParent() repository helper - SubmitChildrenSchema for route validation - 8 new coordinator tests (fan-out, blocking, unblocking, cycle, DAG query) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/modules/fleet/coordinator.test.ts | 225 +++++++++++++++++ .../src/modules/fleet/coordinator.ts | 229 +++++++++++++++++- .../src/modules/fleet/repository.ts | 8 + .../src/modules/fleet/routes.ts | 25 ++ .../src/modules/fleet/types.ts | 40 +++ 5 files changed, 526 insertions(+), 1 deletion(-) diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index c6ce7732..04041c62 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -440,3 +440,228 @@ describe('fleet coordinator', () => { } }); }); + +describe('fleet coordinator — Phase 3 DAG decomposition', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('fan-out: submitting a parent with children atomically creates children + blocks parent', async () => { + const { job: parent } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'parent-1', + kind: 'composite', + children: [ + { + idempotencyKey: 'child-a', + bodyMd: '# child A', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + { + idempotencyKey: 'child-b', + bodyMd: '# child B', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }) + ); + expect(parent.kind).toBe('composite'); + expect(parent.stage).toBe('blocked'); + expect(parent.deps).toContain('child-a'); + expect(parent.deps).toContain('child-b'); + + // Children exist and are queued + const children = await repo.listChildrenByParent(PID, parent.id); + expect(children).toHaveLength(2); + expect(children.every(c => c.stage === 'queued')).toBe(true); + expect(children.every(c => c.parentId === parent.id)).toBe(true); + }); + + it('parent is not claimable while children are still running', async () => { + const { job: parent } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'parent-2', + kind: 'composite', + children: [ + { + idempotencyKey: 'child-x', + bodyMd: '# child X', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }) + ); + // Parent is blocked — attempting to claim should skip it + const claim = await coord.claimNextJob(factory()); + // Should claim the child, not the parent + expect(claim).not.toBeNull(); + expect(claim!.job.parentId).toBe(parent.id); + expect(claim!.job.idempotencyKey).toBe('child-x'); + }); + + it('last child completion unblocks parent', async () => { + const { job: parent } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'parent-3', + kind: 'composite', + children: [ + { + idempotencyKey: 'child-1', + bodyMd: '# c1', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + { + idempotencyKey: 'child-2', + bodyMd: '# c2', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }) + ); + + // Claim and ship child-1 + const c1Claim = await coord.claimNextJob(factory({ factoryId: 'f1' })); + expect(c1Claim!.job.idempotencyKey).toBe('child-1'); + await coord.patchJobFenced(c1Claim!.job.id, PID, { + leaseEpoch: c1Claim!.job.leaseEpoch, + stage: 'shipped', + }); + + // Parent still blocked (child-2 not done) + const parentMid = await repo.getJob(parent.id, PID); + expect(parentMid?.stage).toBe('blocked'); + + // Claim and ship child-2 + const c2Claim = await coord.claimNextJob(factory({ factoryId: 'f2' })); + expect(c2Claim!.job.idempotencyKey).toBe('child-2'); + await coord.patchJobFenced(c2Claim!.job.id, PID, { + leaseEpoch: c2Claim!.job.leaseEpoch, + stage: 'shipped', + }); + + // Parent should now be unblocked (queued) + const parentAfter = await repo.getJob(parent.id, PID); + expect(parentAfter?.stage).toBe('queued'); + }); + + it('cycle at submit: parent depending on itself is rejected', async () => { + await expect( + coord.submitJob( + PID, + input({ + idempotencyKey: 'self-cycle', + deps: ['self-cycle'], + }) + ) + ).rejects.toBeInstanceOf(BadRequestError); + }); + + it('submitChildren: add children to an existing job', async () => { + const { job: parent } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'parent-add', + kind: 'leaf', + }) + ); + const result = await coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'added-child', bodyMd: '# added', capabilities: [] }, + ]); + expect(result.childJobs).toHaveLength(1); + expect(result.parent.kind).toBe('composite'); + expect(result.parent.stage).toBe('blocked'); + expect(result.parent.deps).toContain('added-child'); + }); + + it('getDagSubtree returns the correct tree structure', async () => { + const { job: parent } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'dag-root', + kind: 'composite', + children: [ + { + idempotencyKey: 'dag-c1', + bodyMd: '# c1', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + { + idempotencyKey: 'dag-c2', + bodyMd: '# c2', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }) + ); + const dag = await coord.getDagSubtree(parent.id, PID); + expect(dag).not.toBeNull(); + expect(dag!.id).toBe(parent.id); + expect(dag!.kind).toBe('composite'); + expect(dag!.children).toHaveLength(2); + expect(dag!.children[0].parentId).toBe(parent.id); + expect(dag!.children[1].parentId).toBe(parent.id); + }); + + it('capability + priority still respected per DAG node', async () => { + await coord.submitJob( + PID, + input({ + idempotencyKey: 'dag-parent', + kind: 'composite', + children: [ + { + idempotencyKey: 'dag-mac', + bodyMd: '# mac', + capabilities: ['os:mac'], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + { + idempotencyKey: 'dag-linux', + bodyMd: '# linux', + capabilities: ['os:linux'], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }) + ); + // Factory with only os:linux can only claim the linux child + const claim = await coord.claimNextJob(factory({ capabilities: ['os:linux'] })); + expect(claim!.job.idempotencyKey).toBe('dag-linux'); + }); + + it('all prior fleet tests remain green (non-DAG flows unaffected)', async () => { + // Simple non-DAG submit + claim + ship still works + const { job } = await coord.submitJob(PID, input({ idempotencyKey: 'simple' })); + expect(job.stage).toBe('queued'); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + expect(claim!.job.id).toBe(job.id); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 1ad083b1..1b8c30c6 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -205,12 +205,78 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi }; const { stage } = await stageForDeps(base); base.stage = stage; + + // Phase 3 DAG: if children are specified, create them atomically and block parent + const children = (input as { children?: SubmitJobInput['children'] }).children ?? []; + if (children.length > 0) { + base.kind = 'composite'; + // Parent is blocked until all children complete + const childKeys = children.map(c => c.idempotencyKey); + base.deps = [...base.deps, ...childKeys]; + base.stage = 'blocked'; + base.blockedReason = `waiting on children: ${childKeys.join(', ')}`; + } + const created = await repo.createJob(base); + + // Create child jobs + for (const child of children) { + const childId = `fjob_${crypto.randomUUID()}`; + const childNow = new Date().toISOString(); + const childHash = contentHash(child.bodyMd); + const childPriority = child.priority ?? input.priority; + const childJob: FleetJobDoc = { + id: childId, + productId, + stage: 'queued', + idempotencyKey: child.idempotencyKey, + contentHash: childHash, + bodyMd: child.bodyMd, + manifestSnapshot: { + priority: childPriority, + capabilities: child.capabilities, + engineClass: child.engineClass, + profile: child.profile, + prefersEngine: child.prefersEngine, + allowedScope: child.allowedScope, + deps: child.deps, + depsMode: child.depsMode, + budget: child.budget, + retry: child.retry, + }, + priority: childPriority, + priorityOrder: PRIORITY_ORDER[childPriority], + capabilities: child.capabilities, + engineClass: child.engineClass, + profile: child.profile, + deps: child.deps, + depsMode: child.depsMode, + budget: child.budget, + retry: child.retry, + kind: 'leaf', + parentId: id, + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: childNow, + updatedAt: childNow, + }; + const { stage: childStage } = await stageForDeps(childJob); + childJob.stage = childStage; + await repo.createJob(childJob); + await repo.appendEvent({ + jobId: childId, + productId, + type: 'submitted', + data: { stage: childStage, idempotencyKey: child.idempotencyKey, parentId: id }, + }); + } + await repo.appendEvent({ jobId: id, productId, type: 'submitted', - data: { stage, idempotencyKey: input.idempotencyKey }, + data: { stage: base.stage, idempotencyKey: input.idempotencyKey, childCount: children.length }, }); return { job: created, outcome: 'created' }; } @@ -541,9 +607,170 @@ export async function patchJobFenced( type: 'transition', data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch }, }); + + // Phase 3 DAG: when a child reaches a terminal dep-satisfying stage, + // check if its parent should be unblocked. + if (patch.stage && job.parentId) { + await maybeUnblockParent(job.parentId, productId); + } + return { ok: true, doc: res.doc }; } +// ── Phase 3 DAG: parent unblocking + child submission + subtree query ───────── + +/** + * Check if a parent job's deps are all satisfied and unblock it if so. + * Called after a child job transitions to a terminal stage. + */ +export async function maybeUnblockParent(parentId: string, productId: string): Promise { + const parent = await repo.getJob(parentId, productId); + if (!parent) return; + if (parent.stage !== 'blocked') return; + const unmet = await unmetDeps(parent); + if (unmet.length === 0) { + await repo.revUpdateJob(parentId, productId, parent.rev, { + stage: 'queued', + blockedReason: undefined, + }); + await repo.appendEvent({ + jobId: parentId, + productId, + type: 'unblocked', + data: { reason: 'all children/deps completed' }, + }); + } +} + +/** Submit children for an existing parent (POST /fleet/jobs/:id/children). */ +export async function submitChildren( + parentId: string, + productId: string, + children: Array<{ + idempotencyKey: string; + bodyMd: string; + priority?: 'critical' | 'high' | 'medium' | 'low'; + capabilities?: string[]; + engineClass?: string; + profile?: string; + prefersEngine?: string[]; + allowedScope?: string[]; + deps?: string[]; + depsMode?: 'hard' | 'soft'; + budget?: { usd?: number; tokens?: number; wall?: number }; + retry?: { max: number; backoff?: string; on?: string[] }; + }> +): Promise<{ parent: FleetJobDoc; childJobs: FleetJobDoc[] }> { + const parent = await repo.getJob(parentId, productId); + if (!parent) throw new BadRequestError('Parent job not found'); + + // Cycle check: children depend on parent implicitly; ensure no back-edge + const childKeys = children.map(c => c.idempotencyKey); + for (const ck of childKeys) { + if (await wouldCreateCycle(productId, ck, [parent.idempotencyKey])) { + throw new BadRequestError(`dependency cycle detected for child '${ck}'`); + } + } + + const createdChildren: FleetJobDoc[] = []; + for (const child of children) { + const childId = `fjob_${crypto.randomUUID()}`; + const now = new Date().toISOString(); + const childPriority = child.priority ?? parent.priority; + const childJob: FleetJobDoc = { + id: childId, + productId, + stage: 'queued', + idempotencyKey: child.idempotencyKey, + contentHash: contentHash(child.bodyMd), + bodyMd: child.bodyMd, + manifestSnapshot: { + priority: childPriority, + capabilities: child.capabilities ?? [], + engineClass: child.engineClass as FleetJobDoc['engineClass'], + profile: child.profile, + prefersEngine: child.prefersEngine ?? [], + allowedScope: child.allowedScope ?? [], + deps: child.deps ?? [], + depsMode: child.depsMode as FleetJobDoc['depsMode'], + budget: child.budget, + retry: child.retry as FleetJobDoc['retry'], + }, + priority: childPriority, + priorityOrder: PRIORITY_ORDER[childPriority], + capabilities: child.capabilities ?? [], + engineClass: child.engineClass as FleetJobDoc['engineClass'], + profile: child.profile, + deps: child.deps ?? [], + depsMode: child.depsMode as FleetJobDoc['depsMode'], + budget: child.budget, + retry: child.retry as FleetJobDoc['retry'], + kind: 'leaf', + parentId, + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: now, + updatedAt: now, + }; + const { stage } = await stageForDeps(childJob); + childJob.stage = stage; + const created = await repo.createJob(childJob); + createdChildren.push(created); + await repo.appendEvent({ + jobId: childId, + productId, + type: 'submitted', + data: { stage, idempotencyKey: child.idempotencyKey, parentId }, + }); + } + + // Block parent on new children + const existingDeps = parent.deps ?? []; + const newDeps = [...existingDeps, ...childKeys]; + const updatedParent = await repo.revUpdateJob(parentId, productId, parent.rev, { + kind: 'composite', + deps: newDeps, + stage: 'blocked', + blockedReason: `waiting on children: ${childKeys.join(', ')}`, + }); + + return { parent: updatedParent.ok ? updatedParent.doc : parent, childJobs: createdChildren }; +} + +/** DAG node representation for the subtree endpoint. */ +export interface DagNode { + id: string; + idempotencyKey: string; + stage: string; + priority: string; + kind: string; + parentId?: string; + children: DagNode[]; +} + +/** Get the DAG subtree rooted at jobId. */ +export async function getDagSubtree(jobId: string, productId: string): Promise { + const job = await repo.getJob(jobId, productId); + if (!job) return null; + + async function buildNode(j: FleetJobDoc): Promise { + const children = await repo.listChildrenByParent(productId, j.id); + const childNodes = await Promise.all(children.map(buildNode)); + return { + id: j.id, + idempotencyKey: j.idempotencyKey, + stage: j.stage, + priority: j.priority, + kind: j.kind, + parentId: j.parentId, + children: childNodes, + }; + } + + return buildNode(job); +} + export async function renewLease( jobId: string, productId: string, diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index 905bdbe1..23b6378e 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -98,6 +98,14 @@ export async function findJobsByIdempotencyKey( return jobs().findMany({ filter: { productId, idempotencyKey } }); } +/** All child jobs of a given parent (Phase 3 DAG). */ +export async function listChildrenByParent( + productId: string, + parentId: string +): Promise { + return jobs().findMany({ filter: { productId, parentId }, sort: { createdAt: 1 } }); +} + /** Unconditional merge update (use only when concurrency is not contended). */ export async function updateJob( id: string, diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index b1405566..6e04aa1d 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -42,6 +42,7 @@ import { RevokeTokenSchema, IngestItemSchema, EchoJobSchema, + SubmitChildrenSchema, } from './types.js'; function badRequest(issues: { message: string }[]): never { @@ -322,6 +323,30 @@ export async function fleetRoutes(app: FastifyInstance) { return { revoked }; }); + // ── Phase 3 DAG: submit children for an existing parent ── + app.post('/fleet/jobs/:id/children', async (req, reply) => { + await extractAuth(req); + const { id: parentId } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = SubmitChildrenSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const parent = await repo.getJob(parentId, pid); + if (!parent) throw new NotFoundError('Parent job not found'); + const result = await coordinator.submitChildren(parentId, pid, parsed.data.children); + reply.code(201); + return { parent: result.parent, children: result.childJobs }; + }); + + // ── Phase 3 DAG: get the subtree rooted at a job ── + app.get('/fleet/jobs/:id/dag', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const dag = await coordinator.getDagSubtree(id, pid); + if (!dag) throw new NotFoundError('Job not found'); + return { dag }; + }); + // ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ── app.post('/fleet/tracker/ingest', async (req, reply) => { await extractAuth(req); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index ed644902..0114f82d 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -310,6 +310,25 @@ export const SubmitJobSchema = z.object({ kind: z.enum(JOB_KINDS).default('leaf'), parentId: z.string().optional(), trackerItemId: z.string().optional(), + /** Phase 3 DAG: inline children to create atomically with the parent. */ + children: z + .array( + z.object({ + idempotencyKey: z.string().min(1), + bodyMd: z.string().min(1), + priority: z.enum(FLEET_PRIORITIES).default('medium'), + capabilities: z.array(z.string()).default([]), + engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(), + profile: z.string().optional(), + prefersEngine: z.array(z.string()).default([]), + allowedScope: z.array(z.string()).default([]), + deps: z.array(z.string()).default([]), + depsMode: z.enum(DEPS_MODES).optional(), + budget: BudgetSchema.optional(), + retry: RetryPolicySchema.optional(), + }) + ) + .default([]), }); export type SubmitJobInput = z.infer; @@ -411,3 +430,24 @@ export const EchoJobSchema = z.object({ jobId: z.string().min(1), }); export type EchoJobInput = z.infer; + +/** Phase 3 DAG: submit children for an existing parent job. */ +export const SubmitChildrenSchema = z.object({ + children: z.array( + z.object({ + idempotencyKey: z.string().min(1), + bodyMd: z.string().min(1), + priority: z.enum(FLEET_PRIORITIES).default('medium'), + capabilities: z.array(z.string()).default([]), + engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(), + profile: z.string().optional(), + prefersEngine: z.array(z.string()).default([]), + allowedScope: z.array(z.string()).default([]), + deps: z.array(z.string()).default([]), + depsMode: z.enum(DEPS_MODES).optional(), + budget: BudgetSchema.optional(), + retry: RetryPolicySchema.optional(), + }) + ), +}); +export type SubmitChildrenInput = z.infer;