From 9e0afc23d2c9360a29f81017a4c50e6c0b9767bf Mon Sep 17 00:00:00 2001
From: saravanakumardb1
Date: Mon, 1 Jun 2026 12:05:07 -0700
Subject: [PATCH] test(fleet): end-to-end lifecycle/chaos guardrails across coordinator features

Adds cross-feature integration tests that drive the coordinator the way a real
factory does, asserting the newly-wired paths COMPOSE (the per-feature unit
suites only cover them in isolation):
- retry: fail then auto-requeue then fail again then dead_letter, one claim per
  attempt;
- chaos: factory dies mid-build, stale reclaim fences the zombie, a late
  transition from the dead holder is rejected, survivor claims with a higher
  epoch, builds, and ships with the run result/engine recorded;
- no double-assignment under a true claim race;
- expiry reaper recovers a vanished factory's job (requeue + epoch bump + lease
  expired).

Test-only; deterministic and offline (memory provider, injected time, no network).

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
---
 .../src/modules/fleet/lifecycle.e2e.test.ts   | 164 ++++++++++++++++++
 1 file changed, 164 insertions(+)
 create mode 100644 services/platform-service/src/modules/fleet/lifecycle.e2e.test.ts

diff --git a/services/platform-service/src/modules/fleet/lifecycle.e2e.test.ts b/services/platform-service/src/modules/fleet/lifecycle.e2e.test.ts
new file mode 100644
index 00000000..aeb68f53
--- /dev/null
+++ b/services/platform-service/src/modules/fleet/lifecycle.e2e.test.ts
@@ -0,0 +1,164 @@
+/**
+ * Fleet end-to-end / chaos lifecycle — ties the coordinator pieces together in
+ * realistic multi-step flows (submit → claim → build → fail → retry → reclaim →
+ * dead_letter → ship), the way a real factory drives them. These are the
+ * cross-feature regression guardrails the per-feature unit suites don't provide:
+ * they assert the new retry, stale-reclaim, fencing, and ship paths COMPOSE.
+ *
+ * Deterministic + offline: memory provider, injected time, no `gh`/network.
+ */
+
+import { afterEach, beforeEach, describe, expect, it } from 'vitest';
+import { MemoryDatastoreProvider } from '@bytelyst/datastore';
+import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
+import * as repo from './repository.js';
+import * as coord from './coordinator.js';
+import type { SubmitJobInput } from './types.js';
+
+const PID = 'lysnrai';
+
+function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
+  return {
+    idempotencyKey: 'task-1',
+    bodyMd: '# do the thing',
+    priority: 'medium',
+    capabilities: [],
+    prefersEngine: [],
+    allowedScope: [],
+    deps: [],
+    kind: 'leaf',
+    ...over,
+  };
+}
+
+const factory = (over = {}) => ({
+  productId: PID,
+  factoryId: 'fac_1',
+  capabilities: [] as string[],
+  leaseSeconds: 900,
+  ...over,
+});
+
+/** Event types appended to a job's stream, in order. */
+async function eventTypes(jobId: string): Promise<string[]> {
+  return (await repo.listEvents(jobId)).map(e => e.type);
+}
+
+describe('fleet lifecycle (e2e)', () => {
+  beforeEach(() => setProvider(new MemoryDatastoreProvider()));
+  afterEach(() => _resetDatastoreProvider());
+
+  it('retry path: fail → auto-requeue → fail again → dead_letter, with one claim each', async () => {
+    const { job } = await coord.submitJob(
+      PID,
+      input({ engine: 'devin', retry: { max: 2, on: ['timeout'] } })
+    );
+
+    // Attempt 1 — claimed, then times out → retryable (attempts 1 < max 2) → requeued.
+    const c1 = await coord.claimNextJob(factory({ capabilities: ['engine:devin'] }));
+    expect(c1?.job.stage).toBe('assigned');
+    expect(c1?.job.attempts).toBe(1);
+    const r1 = await coord.releaseLease(job.id, PID, c1!.job.leaseEpoch, 'failed', {
+      result: 'timeout',
+    });
+    expect(r1.ok).toBe(true);
+    expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
+
+    // Attempt 2 — claimed again, times out → attempts exhausted → dead_letter.
+    const c2 = await coord.claimNextJob(factory({ capabilities: ['engine:devin'] }));
+    expect(c2?.job.attempts).toBe(2);
+    await coord.releaseLease(job.id, PID, c2!.job.leaseEpoch, 'failed', { result: 'timeout' });
+
+    const finalJob = await repo.getJob(job.id, PID);
+    expect(finalJob?.stage).toBe('dead_letter');
+
+    const types = await eventTypes(job.id);
+    expect(types).toContain('retry_scheduled');
+    expect(types).toContain('dead_letter');
+    // exactly one retry was scheduled (one requeue before exhaustion)
+    expect(types.filter(t => t === 'retry_scheduled')).toHaveLength(1);
+  });
+
+  it('chaos: factory dies mid-build → stale reclaim fences it → survivor finishes → ship', async () => {
+    const { job } = await coord.submitJob(PID, input({ engine: 'claude' }));
+
+    // fac_A registers (heartbeat) and claims; starts building.
+    await coord.heartbeat({ productId: PID, factoryId: 'fac_A', capabilities: [], load: 0 });
+    const cA = await coord.claimNextJob(
+      factory({ factoryId: 'fac_A', capabilities: ['engine:claude'] })
+    );
+    const epochA = cA!.job.leaseEpoch;
+    await coord.patchJobFenced(job.id, PID, { leaseEpoch: epochA, stage: 'building' });
+    expect((await repo.getJob(job.id, PID))?.stage).toBe('building');
+
+    // fac_A goes silent: backdate its heartbeat → stale reclaim returns the job.
+    const facDoc = await repo.getFactory('fac_A', PID);
+    await repo.upsertFactory({
+      ...facDoc!,
+      lastHeartbeatAt: new Date(Date.now() - 5 * 60_000).toISOString(),
+    });
+    const reclaimed = await coord.reclaimStaleFactoryLeases(Date.now());
+    expect(reclaimed.jobIds).toContain(job.id);
+    expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
+
+    // The zombie fac_A tries a late transition with its stale epoch → fenced (no effect).
+    const zombie = await coord.patchJobFenced(job.id, PID, {
+      leaseEpoch: epochA,
+      stage: 'shipped',
+    });
+    expect(zombie.ok).toBe(false);
+    expect(zombie.ok === false && zombie.reason).toBe('fenced');
+    expect((await repo.getJob(job.id, PID))?.stage).toBe('queued'); // unchanged
+
+    // Survivor fac_B claims (fresh, higher epoch), builds, and ships.
+    const cB = await coord.claimNextJob(
+      factory({ factoryId: 'fac_B', capabilities: ['engine:claude'] })
+    );
+    expect(cB!.job.leaseEpoch).toBeGreaterThan(epochA); // reclaim bumped the epoch
+    await coord.patchJobFenced(job.id, PID, { leaseEpoch: cB!.job.leaseEpoch, stage: 'building' });
+    const shipped = await coord.releaseLease(job.id, PID, cB!.job.leaseEpoch, 'shipped', {
+      result: 'shipped',
+      insights: { engine: 'claude' },
+    });
+    expect(shipped.ok).toBe(true);
+
+    const doneJob = await repo.getJob(job.id, PID);
+    expect(doneJob?.stage).toBe('shipped');
+    const runs = await repo.listRunsByJob(job.id);
+    // two attempts ran (fac_A reclaimed, fac_B shipped); the latest is shipped.
+    const latest = runs.reduce((a, b) => (b.attempt > a.attempt ? b : a));
+    expect(latest.result).toBe('shipped');
+    expect(latest.engine).toBe('claude');
+
+    const types = await eventTypes(job.id);
+    expect(types).toContain('factory_stale');
+  });
+
+  it('no double-assignment: two factories race for the only job; exactly one wins', async () => {
+    const { job } = await coord.submitJob(PID, input());
+    const [a, b] = await Promise.all([
+      coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })),
+      coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })),
+    ]);
+    expect([a, b].filter(r => r.ok)).toHaveLength(1);
+    // The job is assigned to exactly one holder.
+    const lease = await repo.getLease(job.id);
+    expect(['fac_A', 'fac_B']).toContain(lease?.holderFactoryId);
+  });
+
+  it('expiry reaper recovers a job whose factory neither renewed nor reported', async () => {
+    const { job } = await coord.submitJob(PID, input());
+    const c = await coord.claimNextJob(factory());
+    // Force the lease past expiry (factory vanished without releasing).
+    const lease = await repo.getLease(job.id);
+    await repo.revUpdateLease(job.id, lease!.rev, {
+      expiresAt: '2000-01-01T00:00:00.000Z',
+    });
+    const res = await coord.reapExpiredLeases(new Date().toISOString());
+    expect(res.jobIds).toContain(job.id);
+    const after = await repo.getJob(job.id, PID);
+    expect(after?.stage).toBe('queued');
+    expect(after?.leaseEpoch).toBe(c!.job.leaseEpoch + 1); // fenced
+    expect((await repo.getLease(job.id))?.status).toBe('expired');
+  });
+});