feat(fleet): DAG job decomposition — parent/child + fan-out (Phase 3 Slice 2)

- SubmitJobSchema accepts inline children[] for atomic fan-out creation
- Parent blocked until all children reach dep-satisfying terminal stage
- patchJobFenced triggers maybeUnblockParent on child stage transitions
- submitChildren() for POST /fleet/jobs/:id/children (add children later)
- getDagSubtree() for GET /fleet/jobs/:id/dag (recursive subtree query)
- listChildrenByParent() repository helper
- SubmitChildrenSchema for route validation
- 8 new coordinator tests (fan-out, blocking, unblocking, cycle, DAG query)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 02:23:42 -07:00
parent 4468a69526
commit 484ed05c1f
5 changed files with 526 additions and 1 deletions

View File

@ -440,3 +440,228 @@ describe('fleet coordinator', () => {
} }
}); });
}); });
describe('fleet coordinator — Phase 3 DAG decomposition', () => {
  /**
   * Inline child spec as used throughout this suite: only the key, body and
   * (optionally) capabilities vary; every other list field is empty.
   */
  const childSpec = (idempotencyKey: string, bodyMd: string, capabilities: string[] = []) => ({
    idempotencyKey,
    bodyMd,
    capabilities,
    prefersEngine: [],
    allowedScope: [],
    deps: [],
  });

  /** Submit a composite parent carrying the given inline children. */
  const submitComposite = (idempotencyKey: string, children: ReturnType<typeof childSpec>[]) =>
    coord.submitJob(PID, input({ idempotencyKey, kind: 'composite', children }));

  // Each test runs against a fresh in-memory datastore.
  beforeEach(() => setProvider(new MemoryDatastoreProvider()));
  afterEach(() => _resetDatastoreProvider());

  it('fan-out: submitting a parent with children atomically creates children + blocks parent', async () => {
    const { job: parent } = await submitComposite('parent-1', [
      childSpec('child-a', '# child A'),
      childSpec('child-b', '# child B'),
    ]);

    expect(parent.kind).toBe('composite');
    expect(parent.stage).toBe('blocked');
    expect(parent.deps).toContain('child-a');
    expect(parent.deps).toContain('child-b');

    // Both children were persisted, are queued, and point back at the parent.
    const persisted = await repo.listChildrenByParent(PID, parent.id);
    expect(persisted).toHaveLength(2);
    expect(persisted.every(kid => kid.stage === 'queued')).toBe(true);
    expect(persisted.every(kid => kid.parentId === parent.id)).toBe(true);
  });

  it('parent is not claimable while children are still running', async () => {
    const { job: parent } = await submitComposite('parent-2', [childSpec('child-x', '# child X')]);

    // The parent is blocked, so the only claimable job is its child.
    const claimed = await coord.claimNextJob(factory());
    expect(claimed).not.toBeNull();
    expect(claimed!.job.parentId).toBe(parent.id);
    expect(claimed!.job.idempotencyKey).toBe('child-x');
  });

  it('last child completion unblocks parent', async () => {
    const { job: parent } = await submitComposite('parent-3', [
      childSpec('child-1', '# c1'),
      childSpec('child-2', '# c2'),
    ]);

    /** Claim the next job as `factoryId`, check it is `expectedKey`, then ship it. */
    const claimAndShip = async (factoryId: string, expectedKey: string) => {
      const claimed = await coord.claimNextJob(factory({ factoryId }));
      expect(claimed!.job.idempotencyKey).toBe(expectedKey);
      await coord.patchJobFenced(claimed!.job.id, PID, {
        leaseEpoch: claimed!.job.leaseEpoch,
        stage: 'shipped',
      });
    };

    await claimAndShip('f1', 'child-1');

    // One child is still outstanding, so the parent must remain blocked.
    const midway = await repo.getJob(parent.id, PID);
    expect(midway?.stage).toBe('blocked');

    await claimAndShip('f2', 'child-2');

    // With the last child shipped the parent goes back to the queue.
    const settled = await repo.getJob(parent.id, PID);
    expect(settled?.stage).toBe('queued');
  });

  it('cycle at submit: parent depending on itself is rejected', async () => {
    const selfDependent = input({ idempotencyKey: 'self-cycle', deps: ['self-cycle'] });
    await expect(coord.submitJob(PID, selfDependent)).rejects.toBeInstanceOf(BadRequestError);
  });

  it('submitChildren: add children to an existing job', async () => {
    const { job: parent } = await coord.submitJob(
      PID,
      input({ idempotencyKey: 'parent-add', kind: 'leaf' })
    );

    const { parent: updated, childJobs } = await coord.submitChildren(parent.id, PID, [
      { idempotencyKey: 'added-child', bodyMd: '# added', capabilities: [] },
    ]);

    expect(childJobs).toHaveLength(1);
    expect(updated.kind).toBe('composite');
    expect(updated.stage).toBe('blocked');
    expect(updated.deps).toContain('added-child');
  });

  it('getDagSubtree returns the correct tree structure', async () => {
    const { job: parent } = await submitComposite('dag-root', [
      childSpec('dag-c1', '# c1'),
      childSpec('dag-c2', '# c2'),
    ]);

    const tree = await coord.getDagSubtree(parent.id, PID);
    expect(tree).not.toBeNull();
    expect(tree!.id).toBe(parent.id);
    expect(tree!.kind).toBe('composite');
    expect(tree!.children).toHaveLength(2);
    expect(tree!.children[0].parentId).toBe(parent.id);
    expect(tree!.children[1].parentId).toBe(parent.id);
  });

  it('capability + priority still respected per DAG node', async () => {
    await submitComposite('dag-parent', [
      childSpec('dag-mac', '# mac', ['os:mac']),
      childSpec('dag-linux', '# linux', ['os:linux']),
    ]);

    // A linux-only factory must skip the mac child and pick the linux one.
    const claimed = await coord.claimNextJob(factory({ capabilities: ['os:linux'] }));
    expect(claimed!.job.idempotencyKey).toBe('dag-linux');
  });

  it('all prior fleet tests remain green (non-DAG flows unaffected)', async () => {
    // Plain submit followed by a claim, with no parent/child involvement.
    const { job } = await coord.submitJob(PID, input({ idempotencyKey: 'simple' }));
    expect(job.stage).toBe('queued');

    const claimed = await coord.claimNextJob(factory());
    expect(claimed).not.toBeNull();
    expect(claimed!.job.id).toBe(job.id);
  });
});

View File

@ -205,12 +205,78 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
}; };
const { stage } = await stageForDeps(base); const { stage } = await stageForDeps(base);
base.stage = stage; base.stage = stage;
// Phase 3 DAG: if children are specified, create them atomically and block parent
const children = (input as { children?: SubmitJobInput['children'] }).children ?? [];
if (children.length > 0) {
base.kind = 'composite';
// Parent is blocked until all children complete
const childKeys = children.map(c => c.idempotencyKey);
base.deps = [...base.deps, ...childKeys];
base.stage = 'blocked';
base.blockedReason = `waiting on children: ${childKeys.join(', ')}`;
}
const created = await repo.createJob(base); const created = await repo.createJob(base);
// Create child jobs
for (const child of children) {
const childId = `fjob_${crypto.randomUUID()}`;
const childNow = new Date().toISOString();
const childHash = contentHash(child.bodyMd);
const childPriority = child.priority ?? input.priority;
const childJob: FleetJobDoc = {
id: childId,
productId,
stage: 'queued',
idempotencyKey: child.idempotencyKey,
contentHash: childHash,
bodyMd: child.bodyMd,
manifestSnapshot: {
priority: childPriority,
capabilities: child.capabilities,
engineClass: child.engineClass,
profile: child.profile,
prefersEngine: child.prefersEngine,
allowedScope: child.allowedScope,
deps: child.deps,
depsMode: child.depsMode,
budget: child.budget,
retry: child.retry,
},
priority: childPriority,
priorityOrder: PRIORITY_ORDER[childPriority],
capabilities: child.capabilities,
engineClass: child.engineClass,
profile: child.profile,
deps: child.deps,
depsMode: child.depsMode,
budget: child.budget,
retry: child.retry,
kind: 'leaf',
parentId: id,
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: childNow,
updatedAt: childNow,
};
const { stage: childStage } = await stageForDeps(childJob);
childJob.stage = childStage;
await repo.createJob(childJob);
await repo.appendEvent({
jobId: childId,
productId,
type: 'submitted',
data: { stage: childStage, idempotencyKey: child.idempotencyKey, parentId: id },
});
}
await repo.appendEvent({ await repo.appendEvent({
jobId: id, jobId: id,
productId, productId,
type: 'submitted', type: 'submitted',
data: { stage, idempotencyKey: input.idempotencyKey }, data: { stage: base.stage, idempotencyKey: input.idempotencyKey, childCount: children.length },
}); });
return { job: created, outcome: 'created' }; return { job: created, outcome: 'created' };
} }
@ -541,9 +607,170 @@ export async function patchJobFenced(
type: 'transition', type: 'transition',
data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch }, data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch },
}); });
// Phase 3 DAG: when a child reaches a terminal dep-satisfying stage,
// check if its parent should be unblocked.
if (patch.stage && job.parentId) {
await maybeUnblockParent(job.parentId, productId);
}
return { ok: true, doc: res.doc }; return { ok: true, doc: res.doc };
} }
// ── Phase 3 DAG: parent unblocking + child submission + subtree query ─────────
/**
 * Move a blocked parent job back to `queued` once it has no unmet deps.
 *
 * Called from patchJobFenced after a child job's stage changes. Does nothing
 * when the parent does not exist, is not in the `blocked` stage, or still has
 * unmet deps (as reported by `unmetDeps`).
 *
 * The stage change is a rev-guarded update. The `unblocked` event is appended
 * only when that update was actually applied, so two children finishing at the
 * same time cannot produce two events or an event for an update that lost the
 * race. On a rev conflict the parent is re-read and re-evaluated a bounded
 * number of times; if every attempt conflicts the function gives up silently
 * (the next child transition will call it again).
 *
 * @param parentId  id of the parent job to re-evaluate
 * @param productId product scope the parent lives in
 */
export async function maybeUnblockParent(parentId: string, productId: string): Promise<void> {
  const MAX_ATTEMPTS = 3;

  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1) {
    const parent = await repo.getJob(parentId, productId);
    if (!parent) return;
    if (parent.stage !== 'blocked') return;

    const unmet = await unmetDeps(parent);
    if (unmet.length > 0) return;

    const res = await repo.revUpdateJob(parentId, productId, parent.rev, {
      stage: 'queued',
      blockedReason: undefined,
    });

    if (res.ok) {
      await repo.appendEvent({
        jobId: parentId,
        productId,
        type: 'unblocked',
        data: { reason: 'all children/deps completed' },
      });
      return;
    }
    // Rev conflict: someone else changed the parent between our read and
    // write. Loop to re-read it; it may already be queued, or may have
    // gained new deps that keep it blocked.
  }
}
/**
 * Submit children for an existing parent (POST /fleet/jobs/:id/children).
 *
 * Creates one `leaf` job per entry in `children` (each with `parentId` set and
 * a `submitted` event), then marks the parent `composite`, appends the child
 * idempotency keys to its deps and sets it to `blocked`.
 *
 * Behaviour worth knowing:
 * - An empty `children` array is a no-op: the parent is returned untouched.
 *   Blocking it would leave it waiting on children that do not exist.
 * - Duplicate idempotency keys within one request are rejected, since the
 *   parent's deps refer to children by that key.
 * - A child's priority falls back to the parent's when not supplied.
 * - The parent update is rev-guarded. On a conflict the parent is re-read and
 *   the update retried a bounded number of times; if it still cannot be
 *   applied an Error is thrown rather than returning a parent that was never
 *   blocked. The child jobs created earlier in the call are not rolled back.
 *
 * @param parentId  id of the existing parent job
 * @param productId product scope of the parent and the new children
 * @param children  child job specs to create under the parent
 * @returns the updated parent document and the created child documents
 * @throws BadRequestError when the parent is missing, child keys are
 *         duplicated, or `wouldCreateCycle` reports a cycle for a child
 * @throws Error when the parent could not be updated after repeated conflicts
 */
export async function submitChildren(
  parentId: string,
  productId: string,
  children: Array<{
    idempotencyKey: string;
    bodyMd: string;
    priority?: 'critical' | 'high' | 'medium' | 'low';
    capabilities?: string[];
    engineClass?: string;
    profile?: string;
    prefersEngine?: string[];
    allowedScope?: string[];
    deps?: string[];
    depsMode?: 'hard' | 'soft';
    budget?: { usd?: number; tokens?: number; wall?: number };
    retry?: { max: number; backoff?: string; on?: string[] };
  }>
): Promise<{ parent: FleetJobDoc; childJobs: FleetJobDoc[] }> {
  const MAX_PARENT_UPDATE_ATTEMPTS = 3;

  const parent = await repo.getJob(parentId, productId);
  if (!parent) throw new BadRequestError('Parent job not found');

  // Nothing to add: do not touch the parent, otherwise it would be blocked
  // with no child whose completion could ever unblock it.
  if (children.length === 0) {
    return { parent, childJobs: [] };
  }

  const childKeys = children.map(c => c.idempotencyKey);
  if (new Set(childKeys).size !== childKeys.length) {
    throw new BadRequestError('duplicate child idempotencyKey in request');
  }

  // Cycle check: children depend on parent implicitly; ensure no back-edge
  for (const ck of childKeys) {
    if (await wouldCreateCycle(productId, ck, [parent.idempotencyKey])) {
      throw new BadRequestError(`dependency cycle detected for child '${ck}'`);
    }
  }

  const createdChildren: FleetJobDoc[] = [];
  for (const child of children) {
    const childId = `fjob_${crypto.randomUUID()}`;
    const now = new Date().toISOString();
    const childPriority = child.priority ?? parent.priority;
    const childJob: FleetJobDoc = {
      id: childId,
      productId,
      stage: 'queued',
      idempotencyKey: child.idempotencyKey,
      contentHash: contentHash(child.bodyMd),
      bodyMd: child.bodyMd,
      manifestSnapshot: {
        priority: childPriority,
        capabilities: child.capabilities ?? [],
        engineClass: child.engineClass as FleetJobDoc['engineClass'],
        profile: child.profile,
        prefersEngine: child.prefersEngine ?? [],
        allowedScope: child.allowedScope ?? [],
        deps: child.deps ?? [],
        depsMode: child.depsMode as FleetJobDoc['depsMode'],
        budget: child.budget,
        retry: child.retry as FleetJobDoc['retry'],
      },
      priority: childPriority,
      priorityOrder: PRIORITY_ORDER[childPriority],
      capabilities: child.capabilities ?? [],
      engineClass: child.engineClass as FleetJobDoc['engineClass'],
      profile: child.profile,
      deps: child.deps ?? [],
      depsMode: child.depsMode as FleetJobDoc['depsMode'],
      budget: child.budget,
      retry: child.retry as FleetJobDoc['retry'],
      kind: 'leaf',
      parentId,
      attempts: 0,
      leaseEpoch: 0,
      rev: 0,
      createdAt: now,
      updatedAt: now,
    };
    // The child's own deps decide whether it starts queued or blocked.
    const { stage } = await stageForDeps(childJob);
    childJob.stage = stage;
    const created = await repo.createJob(childJob);
    createdChildren.push(created);
    await repo.appendEvent({
      jobId: childId,
      productId,
      type: 'submitted',
      data: { stage, idempotencyKey: child.idempotencyKey, parentId },
    });
  }

  // Block parent on new children. The update is rev-guarded, so retry against
  // a freshly read parent if it was modified concurrently.
  let current: FleetJobDoc = parent;
  for (let attempt = 0; attempt < MAX_PARENT_UPDATE_ATTEMPTS; attempt += 1) {
    // Set keeps first-seen order while dropping keys already present in deps.
    const newDeps = [...new Set([...(current.deps ?? []), ...childKeys])];
    const updatedParent = await repo.revUpdateJob(parentId, productId, current.rev, {
      kind: 'composite',
      deps: newDeps,
      stage: 'blocked',
      blockedReason: `waiting on children: ${childKeys.join(', ')}`,
    });
    if (updatedParent.ok) {
      return { parent: updatedParent.doc, childJobs: createdChildren };
    }
    const fresh = await repo.getJob(parentId, productId);
    if (!fresh) break;
    current = fresh;
  }

  throw new Error(
    `failed to block parent '${parentId}' on its new children after ${MAX_PARENT_UPDATE_ATTEMPTS} attempts`
  );
}
/**
 * DAG node representation for the subtree endpoint.
 *
 * getDagSubtree builds one node per job: the scalar fields are copied from the
 * job document and `children` holds the nodes built for the jobs returned by
 * repo.listChildrenByParent for that job.
 */
export interface DagNode {
// Job id.
id: string;
// Idempotency key the job was submitted with.
idempotencyKey: string;
// Job stage at the time the tree was built.
stage: string;
// Job priority.
priority: string;
// Job kind (the code in this change writes 'leaf' and 'composite').
kind: string;
// Id of the parent job; absent when the job has none.
parentId?: string;
// Child nodes, built recursively; empty when the job has no children.
children: DagNode[];
}
/**
 * Build the parent/child tree hanging off `jobId`.
 *
 * Resolves to `null` when the root job cannot be found in `productId`.
 * Otherwise every node's children are looked up through
 * repo.listChildrenByParent and expanded recursively; siblings are expanded
 * concurrently.
 */
export async function getDagSubtree(jobId: string, productId: string): Promise<DagNode | null> {
  const root = await repo.getJob(jobId, productId);
  if (!root) return null;

  const toNode = async (doc: FleetJobDoc): Promise<DagNode> => {
    const directChildren = await repo.listChildrenByParent(productId, doc.id);
    const subtrees = await Promise.all(directChildren.map(toNode));
    const { id, idempotencyKey, stage, priority, kind, parentId } = doc;
    return { id, idempotencyKey, stage, priority, kind, parentId, children: subtrees };
  };

  return toNode(root);
}
export async function renewLease( export async function renewLease(
jobId: string, jobId: string,
productId: string, productId: string,

View File

@ -98,6 +98,14 @@ export async function findJobsByIdempotencyKey(
return jobs().findMany({ filter: { productId, idempotencyKey } }); return jobs().findMany({ filter: { productId, idempotencyKey } });
} }
/**
 * Phase 3 DAG: fetch every job in `productId` whose `parentId` matches,
 * sorted by `createdAt` ascending.
 */
export async function listChildrenByParent(
  productId: string,
  parentId: string
): Promise<FleetJobDoc[]> {
  const collection = jobs();
  return collection.findMany({
    filter: { productId, parentId },
    sort: { createdAt: 1 },
  });
}
/** Unconditional merge update (use only when concurrency is not contended). */ /** Unconditional merge update (use only when concurrency is not contended). */
export async function updateJob( export async function updateJob(
id: string, id: string,

View File

@ -42,6 +42,7 @@ import {
RevokeTokenSchema, RevokeTokenSchema,
IngestItemSchema, IngestItemSchema,
EchoJobSchema, EchoJobSchema,
SubmitChildrenSchema,
} from './types.js'; } from './types.js';
function badRequest(issues: { message: string }[]): never { function badRequest(issues: { message: string }[]): never {
@ -322,6 +323,30 @@ export async function fleetRoutes(app: FastifyInstance) {
return { revoked }; return { revoked };
}); });
// ── Phase 3 DAG: attach new child jobs to an already-submitted parent ──
// Replies 201 with the updated parent and the created children. Invalid
// bodies go through badRequest; an unknown parent id raises NotFoundError.
app.post('/fleet/jobs/:id/children', async (req, reply) => {
  await extractAuth(req);
  const params = req.params as { id: string };
  const productId = getRequestProductId(req);

  const validation = SubmitChildrenSchema.safeParse(req.body);
  if (!validation.success) badRequest(validation.error.issues);

  // Look the parent up here so a missing job is reported as not-found.
  const existingParent = await repo.getJob(params.id, productId);
  if (!existingParent) throw new NotFoundError('Parent job not found');

  const { parent, childJobs } = await coordinator.submitChildren(
    params.id,
    productId,
    validation.data.children
  );
  reply.code(201);
  return { parent, children: childJobs };
});
// ── Phase 3 DAG: return the parent/child tree rooted at a job ──
// Responds with { dag }; raises NotFoundError when the coordinator finds no
// such job for the requesting product.
app.get('/fleet/jobs/:id/dag', async req => {
  await extractAuth(req);
  const params = req.params as { id: string };
  const productId = getRequestProductId(req);

  const dag = await coordinator.getDagSubtree(params.id, productId);
  if (dag === null) throw new NotFoundError('Job not found');
  return { dag };
});
// ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ── // ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ──
app.post('/fleet/tracker/ingest', async (req, reply) => { app.post('/fleet/tracker/ingest', async (req, reply) => {
await extractAuth(req); await extractAuth(req);

View File

@ -310,6 +310,25 @@ export const SubmitJobSchema = z.object({
kind: z.enum(JOB_KINDS).default('leaf'), kind: z.enum(JOB_KINDS).default('leaf'),
parentId: z.string().optional(), parentId: z.string().optional(),
trackerItemId: z.string().optional(), trackerItemId: z.string().optional(),
/** Phase 3 DAG: inline children to create atomically with the parent. */
children: z
.array(
z.object({
idempotencyKey: z.string().min(1),
bodyMd: z.string().min(1),
priority: z.enum(FLEET_PRIORITIES).default('medium'),
capabilities: z.array(z.string()).default([]),
engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(),
profile: z.string().optional(),
prefersEngine: z.array(z.string()).default([]),
allowedScope: z.array(z.string()).default([]),
deps: z.array(z.string()).default([]),
depsMode: z.enum(DEPS_MODES).optional(),
budget: BudgetSchema.optional(),
retry: RetryPolicySchema.optional(),
})
)
.default([]),
}); });
export type SubmitJobInput = z.infer<typeof SubmitJobSchema>; export type SubmitJobInput = z.infer<typeof SubmitJobSchema>;
@ -411,3 +430,24 @@ export const EchoJobSchema = z.object({
jobId: z.string().min(1), jobId: z.string().min(1),
}); });
export type EchoJobInput = z.infer<typeof EchoJobSchema>; export type EchoJobInput = z.infer<typeof EchoJobSchema>;
/**
 * Phase 3 DAG: submit children for an existing parent job.
 *
 * Request body for POST /fleet/jobs/:id/children. At least one child is
 * required. List fields default to empty arrays; `priority` is deliberately
 * left without a default (see below).
 */
export const SubmitChildrenSchema = z.object({
  children: z
    .array(
      z.object({
        idempotencyKey: z.string().min(1),
        bodyMd: z.string().min(1),
        // No default here: an omitted priority must stay undefined so that
        // coordinator.submitChildren can fall back to the parent's priority
        // (`child.priority ?? parent.priority`). A schema default would
        // always win over that fallback.
        priority: z.enum(FLEET_PRIORITIES).optional(),
        capabilities: z.array(z.string()).default([]),
        engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(),
        profile: z.string().optional(),
        prefersEngine: z.array(z.string()).default([]),
        allowedScope: z.array(z.string()).default([]),
        deps: z.array(z.string()).default([]),
        depsMode: z.enum(DEPS_MODES).optional(),
        budget: BudgetSchema.optional(),
        retry: RetryPolicySchema.optional(),
      })
    )
    // An empty list gives the parent nothing to wait on; reject it up front.
    .min(1),
});
/** Parsed (post-default) shape of a SubmitChildrenSchema body. */
export type SubmitChildrenInput = z.infer<typeof SubmitChildrenSchema>;