test(fleet): end-to-end lifecycle/chaos guardrails across coordinator features

Adds cross-feature integration tests that drive the coordinator the way a real
factory does, asserting the newly-wired paths COMPOSE (the per-feature unit
suites only cover them in isolation):

- retry: fail then auto-requeue then fail again then dead_letter, one claim per attempt;
- chaos: factory dies mid-build, stale reclaim fences the zombie, a late
  transition from the dead holder is rejected, survivor claims with a higher
  epoch, builds, and ships with the run result/engine recorded;
- no double-assignment under a true claim race;
- expiry reaper recovers a vanished factory's job (requeue + epoch bump + lease expired).

Test-only; deterministic and offline (memory provider, injected time, no network).

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 12:05:07 -07:00
parent 6770bbeef2
commit 9e0afc23d2

View File

@ -0,0 +1,164 @@
/**
* Fleet end-to-end / chaos lifecycle ties the coordinator pieces together in
* realistic multi-step flows (submit claim build fail retry reclaim
* dead_letter ship), the way a real factory drives them. These are the
* cross-feature regression guardrails the per-feature unit suites don't provide:
* they assert the new retry, stale-reclaim, fencing, and ship paths COMPOSE.
*
* Deterministic + offline: memory provider, injected time, no `gh`/network.
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as coord from './coordinator.js';
import type { SubmitJobInput } from './types.js';
const PID = 'lysnrai';
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
return {
idempotencyKey: 'task-1',
bodyMd: '# do the thing',
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
kind: 'leaf',
...over,
};
}
const factory = (over = {}) => ({
productId: PID,
factoryId: 'fac_1',
capabilities: [] as string[],
leaseSeconds: 900,
...over,
});
/** Event types appended to a job's stream, in order. */
async function eventTypes(jobId: string): Promise<string[]> {
return (await repo.listEvents(jobId)).map(e => e.type);
}
describe('fleet lifecycle (e2e)', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('retry path: fail → auto-requeue → fail again → dead_letter, with one claim each', async () => {
const { job } = await coord.submitJob(
PID,
input({ engine: 'devin', retry: { max: 2, on: ['timeout'] } })
);
// Attempt 1 — claimed, then times out → retryable (attempts 1 < max 2) → requeued.
const c1 = await coord.claimNextJob(factory({ capabilities: ['engine:devin'] }));
expect(c1?.job.stage).toBe('assigned');
expect(c1?.job.attempts).toBe(1);
const r1 = await coord.releaseLease(job.id, PID, c1!.job.leaseEpoch, 'failed', {
result: 'timeout',
});
expect(r1.ok).toBe(true);
expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
// Attempt 2 — claimed again, times out → attempts exhausted → dead_letter.
const c2 = await coord.claimNextJob(factory({ capabilities: ['engine:devin'] }));
expect(c2?.job.attempts).toBe(2);
await coord.releaseLease(job.id, PID, c2!.job.leaseEpoch, 'failed', { result: 'timeout' });
const finalJob = await repo.getJob(job.id, PID);
expect(finalJob?.stage).toBe('dead_letter');
const types = await eventTypes(job.id);
expect(types).toContain('retry_scheduled');
expect(types).toContain('dead_letter');
// exactly one retry was scheduled (one requeue before exhaustion)
expect(types.filter(t => t === 'retry_scheduled')).toHaveLength(1);
});
it('chaos: factory dies mid-build → stale reclaim fences it → survivor finishes → ship', async () => {
const { job } = await coord.submitJob(PID, input({ engine: 'claude' }));
// fac_A registers (heartbeat) and claims; starts building.
await coord.heartbeat({ productId: PID, factoryId: 'fac_A', capabilities: [], load: 0 });
const cA = await coord.claimNextJob(
factory({ factoryId: 'fac_A', capabilities: ['engine:claude'] })
);
const epochA = cA!.job.leaseEpoch;
await coord.patchJobFenced(job.id, PID, { leaseEpoch: epochA, stage: 'building' });
expect((await repo.getJob(job.id, PID))?.stage).toBe('building');
// fac_A goes silent: backdate its heartbeat → stale reclaim returns the job.
const facDoc = await repo.getFactory('fac_A', PID);
await repo.upsertFactory({
...facDoc!,
lastHeartbeatAt: new Date(Date.now() - 5 * 60_000).toISOString(),
});
const reclaimed = await coord.reclaimStaleFactoryLeases(Date.now());
expect(reclaimed.jobIds).toContain(job.id);
expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
// The zombie fac_A tries a late transition with its stale epoch → fenced (no effect).
const zombie = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epochA,
stage: 'shipped',
});
expect(zombie.ok).toBe(false);
expect(zombie.ok === false && zombie.reason).toBe('fenced');
expect((await repo.getJob(job.id, PID))?.stage).toBe('queued'); // unchanged
// Survivor fac_B claims (fresh, higher epoch), builds, and ships.
const cB = await coord.claimNextJob(
factory({ factoryId: 'fac_B', capabilities: ['engine:claude'] })
);
expect(cB!.job.leaseEpoch).toBeGreaterThan(epochA); // reclaim bumped the epoch
await coord.patchJobFenced(job.id, PID, { leaseEpoch: cB!.job.leaseEpoch, stage: 'building' });
const shipped = await coord.releaseLease(job.id, PID, cB!.job.leaseEpoch, 'shipped', {
result: 'shipped',
insights: { engine: 'claude' },
});
expect(shipped.ok).toBe(true);
const doneJob = await repo.getJob(job.id, PID);
expect(doneJob?.stage).toBe('shipped');
const runs = await repo.listRunsByJob(job.id);
// two attempts ran (fac_A reclaimed, fac_B shipped); the latest is shipped.
const latest = runs.reduce((a, b) => (b.attempt > a.attempt ? b : a));
expect(latest.result).toBe('shipped');
expect(latest.engine).toBe('claude');
const types = await eventTypes(job.id);
expect(types).toContain('factory_stale');
});
it('no double-assignment: two factories race for the only job; exactly one wins', async () => {
const { job } = await coord.submitJob(PID, input());
const [a, b] = await Promise.all([
coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })),
coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })),
]);
expect([a, b].filter(r => r.ok)).toHaveLength(1);
// The job is assigned to exactly one holder.
const lease = await repo.getLease(job.id);
expect(['fac_A', 'fac_B']).toContain(lease?.holderFactoryId);
});
it('expiry reaper recovers a job whose factory neither renewed nor reported', async () => {
const { job } = await coord.submitJob(PID, input());
const c = await coord.claimNextJob(factory());
// Force the lease past expiry (factory vanished without releasing).
const lease = await repo.getLease(job.id);
await repo.revUpdateLease(job.id, lease!.rev, {
expiresAt: '2000-01-01T00:00:00.000Z',
});
const res = await coord.reapExpiredLeases(new Date().toISOString());
expect(res.jobIds).toContain(job.id);
const after = await repo.getJob(job.id, PID);
expect(after?.stage).toBe('queued');
expect(after?.leaseEpoch).toBe(c!.job.leaseEpoch + 1); // fenced
expect((await repo.getLease(job.id))?.status).toBe('expired');
});
});