feat(fleet): honor job.autoMerge on ship and surface PR-merge failures

job.autoMerge was persisted but ignored — PR merging fired only when the host
set FLEET_SHIP_MERGES_PR=1, and a failed merge was silent (PR left open, no
signal). Now:

- mergeRunPrOnShip merges when EITHER the job opted in (job.autoMerge) OR the
  global flag is set (new pure, unit-tested shouldMergePrOnShip gate). Existing
  global-flag behavior is preserved.
- Merge outcomes are surfaced as job events: pr_merged on success (inline or via
  background retry) and pr_merge_failed when the inline attempt + 4 background
  retries all fail, so a stuck PR shows on the timeline instead of vanishing.

Still fully best-effort and gated (no merge attempted unless opted in), so the
real-world side effect only happens when explicitly requested.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 11:47:33 -07:00
parent 493027fbad
commit 6770bbeef2
2 changed files with 66 additions and 12 deletions

View File

@ -0,0 +1,28 @@
/**
* autoMerge gating `shouldMergePrOnShip` decides whether a shipped job's PR is
* auto-merged. Pure (job.autoMerge OR the FLEET_SHIP_MERGES_PR host flag), so we
* verify the gate without invoking the GitHub CLI.
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { shouldMergePrOnShip } from './coordinator.js';
describe('shouldMergePrOnShip', () => {
beforeEach(() => delete process.env.FLEET_SHIP_MERGES_PR);
afterEach(() => delete process.env.FLEET_SHIP_MERGES_PR);
it('merges when the job opted in via autoMerge', () => {
expect(shouldMergePrOnShip({ autoMerge: true })).toBe(true);
});
it('merges when the host enables it globally (FLEET_SHIP_MERGES_PR=1)', () => {
process.env.FLEET_SHIP_MERGES_PR = '1';
expect(shouldMergePrOnShip({ autoMerge: false })).toBe(true);
expect(shouldMergePrOnShip({ autoMerge: undefined })).toBe(true);
});
it('does NOT merge by default (no opt-in, flag off)', () => {
expect(shouldMergePrOnShip({ autoMerge: false })).toBe(false);
expect(shouldMergePrOnShip({ autoMerge: undefined })).toBe(false);
});
});

View File

@ -737,29 +737,55 @@ async function ghMergePr(prUrl: string): Promise<boolean> {
}
}
async function mergeRunPrOnShip(jobId: string, latest: FleetRunDoc | undefined): Promise<void> {
if (process.env.FLEET_SHIP_MERGES_PR !== '1') return;
/**
* Whether a shipped job's PR should be auto-merged: when the job opted in
* (`job.autoMerge`) OR the host enables it globally (`FLEET_SHIP_MERGES_PR=1`).
* Pure (env + job only) so the gating is unit-testable without invoking `gh`.
*/
export function shouldMergePrOnShip(job: Pick<FleetJobDoc, 'autoMerge'>): boolean {
return job.autoMerge === true || process.env.FLEET_SHIP_MERGES_PR === '1';
}
async function mergeRunPrOnShip(job: FleetJobDoc, latest: FleetRunDoc | undefined): Promise<void> {
if (!shouldMergePrOnShip(job)) return;
if (!latest?.prUrl || latest.prState === 'merged') return;
const { prUrl, id: runId } = latest;
const { id: jobId, productId } = job;
const recordMerged = async () => {
try {
await repo.updateRun(runId, jobId, { prState: 'merged' });
await repo.appendEvent({ jobId, productId, type: 'pr_merged', data: { prUrl } });
} catch {
/* run gone — ignore */
}
};
// Fast attempt inline (so a healthy proxy merges immediately without delaying ship).
if (await ghMergePr(prUrl)) {
await repo.updateRun(runId, jobId, { prState: 'merged' });
await recordMerged();
return;
}
// The corporate proxy intermittently 407s GitHub's API. Retry in the BACKGROUND
// with backoff so the ship response is never blocked; mark merged when one lands.
// with backoff so the ship response is never blocked; mark merged when one lands,
// and SURFACE a `pr_merge_failed` event if every attempt fails (so a stuck PR is
// visible on the job timeline instead of silently left open).
void (async () => {
for (const delayMs of [3_000, 8_000, 20_000, 45_000]) {
await new Promise(r => setTimeout(r, delayMs));
if (await ghMergePr(prUrl)) {
try {
await repo.updateRun(runId, jobId, { prState: 'merged' });
} catch {
/* run gone — ignore */
}
await recordMerged();
return;
}
}
try {
await repo.appendEvent({
jobId,
productId,
type: 'pr_merge_failed',
data: { prUrl, reason: 'gh pr merge failed after inline + 4 background retries' },
});
} catch {
/* job gone — ignore */
}
})().catch(() => {
/* detached best-effort — never throws */
});
@ -802,8 +828,8 @@ export async function patchJobFenced(
try {
// Run-level result mirrors the terminal stage (ungated).
const latest = await markLatestRunShipped(jobId);
// Optionally merge the linked PR (flag-gated, best-effort).
await mergeRunPrOnShip(jobId, latest);
// Optionally merge the linked PR (per-job autoMerge or global flag, best-effort).
await mergeRunPrOnShip(job, latest);
// Budgets (flag-gated): accrue the run's actual cost, idempotent per run.
if (isBudgetsEnabled()) {
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, `${jobId}:${job.leaseEpoch}`);
@ -1272,7 +1298,7 @@ export async function operatorAction(
if (stage === 'shipped') {
try {
const latest = await markLatestRunShipped(jobId);
await mergeRunPrOnShip(jobId, latest);
await mergeRunPrOnShip(job, latest);
} catch {
// best-effort — run bookkeeping never fails the operator action
}