diff --git a/agent-queue/README.md b/agent-queue/README.md index 3b8dbe6..6d65cbf 100644 --- a/agent-queue/README.md +++ b/agent-queue/README.md @@ -403,7 +403,11 @@ AQ_FLEET=1 AQ_FLEET_TOKEN=… AQ_PRODUCT_ID=… agent-queue.sh run # | Var | Default | Meaning | | --- | ------- | ------- | -| `AQ_FLEET` | `0` | master switch — `1` enables coordinator integration; `0`/unset = offline git-queue | +| `AQ_FLEET` | `0` | master switch — `1` enables coordinator integration; `0`/unset = offline git-queue (zero coordinator calls) | +| `AQ_FLEET_ROUTE` | `1` | `route_via_service`: `1` = coordinator is authoritative for claim (P2-S3 behavior); `0` = local inbox authoritative (coordinator not used to source work) | +| `AQ_FLEET_SHADOW` | `0` | shadow/dual-run: `1` (requires `AQ_FLEET=1` + `AQ_FLEET_ROUTE=0`) queries the coordinator in parallel and records divergence, **never acting on it** | +| `AQ_FLEET_SHADOW_FACTORY_ID` | `-shadow` | isolated id used for the read-only shadow claim (never the real factory id) | +| `AQ_FLEET_SHADOW_LOG` | `.state/fleet-shadow.log` | structured shadow-divergence log (`ts⇥localJob⇥coordJob⇥verdict`) | | `AQ_FLEET_API` | `http://localhost:4003/api` | coordinator base URL (already includes `/api`) | | `AQ_FLEET_TOKEN` | _(none)_ | bearer token — never hardcode | | `AQ_PRODUCT_ID` | _(none)_ | productId (sent as `X-Product-Id`; shared with the tracker config) | @@ -433,6 +437,53 @@ auto-shipped) and surfaced for human triage — split-brain is resolved in favor the coordinator without losing the work. `status` shows the factory id + per-job `fleet=@e`; `insights` lists the `fleet_*` fields. +### Feature flags + shadow / dual-run (Slice 4, §16/§27) + +Three explicit, independently-toggleable levels gate the coordinator — a safe, +reversible path to validate the fleet coordinator against the proven single-host +(P1) behavior **before** any real cutover: + +| Flag | Effect | +| ---- | ------ | +| `AQ_FLEET=0` | **Pure offline.** Zero coordinator calls (including shadow). Offline git-queue path is byte-for-byte unchanged. | +| `AQ_FLEET_ROUTE=1` (default) | **route_via_service** — the coordinator is *authoritative* for claim/assignment (today's P2-S3 behavior). | +| `AQ_FLEET_ROUTE=0` | **Local inbox authoritative** — the coordinator is *not* used to source work (the pre-cutover state). | +| `AQ_FLEET_SHADOW=1` | **Shadow / dual-run** (requires `AQ_FLEET=1` **and** `AQ_FLEET_ROUTE=0`): run the offline path as authoritative **and** query the coordinator in parallel, recording divergence **without acting on it**. | + +**Precedence.** Shadow is only meaningful when `ROUTE=0`. If both `AQ_FLEET_ROUTE=1` +and `AQ_FLEET_SHADOW=1` are set, **ROUTE wins** and shadow is disabled (a one-shot +warning is logged) — you never route *and* shadow at the same time. + +**Side-effect-free by construction.** Shadow **never** ships, quarantines, or +mutates real job state. `fleet_shadow_claim` asks the coordinator what it *would* +assign using an **isolated `-shadow` factoryId** + `"dryRun":true,"shadow":true`; +if a coordinator without dry-run support actually assigned, the lease is **released +immediately** so no real assignment persists. The would-be job is never +materialized, run, or shipped locally. `fleet_shadow_report` mirrors the local +stage as a shadow event (`"shadow":true`) purely to exercise reporting — the +coordinator response is logged but **never acted on** (no fence/quarantine). + +Each iteration `fleet_shadow_compare` classifies the local (authoritative) decision +vs the coordinator's would-be decision as **AGREE / DIVERGE / COORD_EMPTY / +LOCAL_EMPTY** and appends a line to the shadow log. Summarize it any time: + +```bash +agent-queue.sh fleet-shadow-report # per-verdict counts + agreement rate + recent divergences +agent-queue.sh fleet-shadow-report 25 # last 25 divergence/error events +agent-queue.sh status # surfaces the three flags' resolved state +``` + +**Cutover ladder (rollback at any step):** + +1. **Observe (zero risk):** `AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 run` — + the local path stays authoritative; the coordinator is only shadowed. +2. **Inspect agreement:** `fleet-shadow-report` — drive `AGREEMENT` toward 100%, + investigating each `DIVERGE`. +3. **Cut over:** once agreement is high, flip `AQ_FLEET_ROUTE=1` (coordinator + becomes authoritative). +4. **Rollback:** set `AQ_FLEET_ROUTE=0` (and/or `AQ_FLEET=0`) at any time — instant + return to the local/offline path, no data migration. + ## Config (env overrides) | Var | Default | Meaning | diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index 91fc63e..2642325 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -1414,6 +1414,7 @@ cmd_run() { recover_orphans # Fleet (§8): register with the coordinator (registration == first heartbeat). fleet_enabled && fleet_heartbeat + fleet_flags_warn_once # §16: warn once if ROUTE=1 + SHADOW=1 (ROUTE wins, shadow off) while true; do # continuously sweep for orphans (a worker that died mid-loop) @@ -1424,8 +1425,14 @@ cmd_run() { if fleet_enabled; then fleet_heartbeat_maybe fleet_renew_active - [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] && { fleet_claim >/dev/null 2>&1 || true; } + # ROUTE flag (§16): only SOURCE work from the coordinator when route_via_service + # is on (ROUTE=1, the default). With ROUTE=0 the LOCAL inbox is authoritative + # (pre-cutover / shadow) — the coordinator is never used to source work. + if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]]; then + fleet_claim >/dev/null 2>&1 || true + fi fi + local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare) local running; running=$(active_workers) # launch jobs while we have capacity and an eligible inbox file while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do @@ -1452,6 +1459,10 @@ cmd_run() { local job; job=$(basename "$next"); job=${job%.md} local doing_file="$BUILDING/$(basename "$next")" mv "$next" "$doing_file" + # Shadow compare key: the LOCAL authoritative decision is identified by its + # idempotency-key (the coordinator keys jobs the same way), falling back to + # the job name. Capture the first job launched this iteration. + [[ -z "$shadow_local" ]] && shadow_local=$(fm_get "$doing_file" idempotency-key "$job") local w_eng w_cwd w_yolo w_key # resolve the concrete engine now (explicit engine / engine-class) so the # meta + status reflect what will actually run; run_worker re-resolves and @@ -1503,6 +1514,19 @@ cmd_run() { running=$(active_workers) done + # Shadow / dual-run (§9/§16): AFTER the local authoritative decision this + # iteration, ask the coordinator what it WOULD have assigned and record the + # divergence — WITHOUT acting on its response (shadow never claims/ships/ + # quarantines or mutates real job state). Best-effort + error-swallowed so a + # coordinator hiccup can NEVER fail a real job. + if fleet_shadow_enabled && [[ -n "$shadow_local" ]]; then + { + fleet_shadow_claim + fleet_shadow_compare "$shadow_local" "${SHADOW_COORD_JOB:-}" + fleet_shadow_report "$shadow_local" "${SHADOW_COORD_JOB:-}" building + } >/dev/null 2>&1 || true + fi + if $once; then # drain when no worker is running and nothing in inbox can still progress on # its own (backoff jobs still count as pending; dep-blocked jobs do not). @@ -1526,6 +1550,7 @@ cmd_status() { printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" "" if fleet_enabled; then printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API" + printf ' %sFLEET%s %s\n' "$C_CYAN" "$C_RESET" "$(fleet_flags_state)" fi printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \ "$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \ @@ -1827,7 +1852,8 @@ ${C_BOLD}COMMANDS${C_RESET} recover reclaim orphaned building/ jobs (dead worker) -> inbox from-tracker pull a tracker Item -> materialize a job in inbox/ (§10) to-tracker echo a job's outcome to its tracker Item (one-way) - fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity + fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity + flags + fleet-shadow-report [N] summarize the shadow/dual-run divergence log (counts, agreement, last N) dash [--interval N] richer live Node dashboard (recent shipped/failed too) stop kill running workers + the run loop logs [-f] print (or follow) a job's log @@ -1887,8 +1913,15 @@ ${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs p ${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue) AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD - When on, 'run' registers + claims coordinator jobs (interleaved with local .md), reports - fenced stage transitions, renews leases, and quarantines a job if it is reclaimed (fenced). + Three independently-toggleable flags (precedence below): + AQ_FLEET=0 pure offline — ZERO coordinator calls (master switch). + AQ_FLEET_ROUTE=1 route_via_service (default): coordinator AUTHORITATIVE for claim. + AQ_FLEET_ROUTE=0 LOCAL inbox authoritative — coordinator not used to source work. + AQ_FLEET_SHADOW=1 shadow/dual-run (needs AQ_FLEET=1 + AQ_FLEET_ROUTE=0): run the offline + path as authoritative AND query the coordinator in parallel to record + divergence, never acting on it. If ROUTE=1 + SHADOW=1, ROUTE wins (warned). + Cutover ladder: (1) ROUTE=0 SHADOW=1 observe → (2) inspect 'fleet-shadow-report' agreement + → (3) flip ROUTE=1 once agreement is high. Rollback = ROUTE=0 (and/or AQ_FLEET=0) anytime. EOF } @@ -1902,6 +1935,7 @@ main() { case "$cmd" in init) cmd_init "$@";; fleet-status) cmd_fleet_status "$@";; + fleet-shadow-report) cmd_fleet_shadow_report "$@";; add) cmd_add "$@";; run) cmd_run "$@";; status) cmd_status "$@";; diff --git a/agent-queue/docs/GIGAFACTORY_ROADMAP.md b/agent-queue/docs/GIGAFACTORY_ROADMAP.md index 57c52e6..084adb4 100644 --- a/agent-queue/docs/GIGAFACTORY_ROADMAP.md +++ b/agent-queue/docs/GIGAFACTORY_ROADMAP.md @@ -384,7 +384,7 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until - [ ] Scheduler/router core (§7) as a pure module (fixed weights) + wired into atomic assignment. - [ ] Tracker adapter calls the module directly (not just file export). - [ ] Auth: factory enrollment + scoped rotatable tokens; secret isolation enforced (§12 subset). -- [ ] **Feature flags** (`fleet.enabled`, `fleet.route_via_service`) + **shadow/dual-run** vs P1 before cutover (§21). +- [x] **Feature flags** (`fleet.enabled`, `fleet.route_via_service`) + **shadow/dual-run** vs P1 before cutover (§21). *(agent-queue runner: `AQ_FLEET` / `AQ_FLEET_ROUTE` / `AQ_FLEET_SHADOW` with documented precedence; shadow claim/compare/report is side-effect-free (isolated `-shadow` factoryId + dryRun, never materializes/ships); `fleet-shadow-report` summarizes AGREE/DIVERGE/COORD_EMPTY/LOCAL_EMPTY + agreement; 60→68 selftest checks.)* - [x] Module test suite (repository + routes via `@bytelyst/testing`); **atomic-claim race**, crash-recovery, fencing-rejection, reaper-reclaim tests. *(PR #28 + #29: 53 fleet + 48 datastore tests, incl. true-concurrency claim.)* - [ ] Two-factory demo (e.g. mac + ubuntu) running 3 parallel jobs end-to-end. - **Exit criteria:** all boxes ✅; `pnpm --filter @lysnrai/platform-service test` green; killing a factory mid-job → another reclaims and completes **and the dead worker's late report is fenced**; concurrent claimers never double-assign; all state in Cosmos with `productId`; **flag-off rollback verified** (§21). @@ -536,8 +536,8 @@ Each metric has a **provisional SLO target** (tune with real data; tracked with Each phase ships behind controls so it can be turned off without losing work. - [ ] **Feature-flagged rollout**: gate each phase's new path behind a platform feature flag (`fleet.enabled`, `fleet.route_via_service`, `fleet.tracker_sync`); default off; enable per-product first. -- [ ] **Dual-run / shadow**: P2 coordinator runs in shadow (assign decisions logged, not executed) alongside the P0/P1 path before cutover; compare decisions. -- [ ] **Cutover is reversible**: a factory can fall back from service-claim to git-queue via flag; no schema-destructive step on the rollback path. +- [x] **Dual-run / shadow**: P2 coordinator runs in shadow (assign decisions logged, not executed) alongside the P0/P1 path before cutover; compare decisions. *(agent-queue `AQ_FLEET_SHADOW=1`: offline path stays authoritative, coordinator queried in parallel, decisions classified AGREE/DIVERGE/COORD_EMPTY/LOCAL_EMPTY into `.state/fleet-shadow.log`; strictly side-effect-free — never ships/quarantines/mutates real job state.)* +- [x] **Cutover is reversible**: a factory can fall back from service-claim to git-queue via flag; no schema-destructive step on the rollback path. *(rollback = `AQ_FLEET_ROUTE=0` and/or `AQ_FLEET=0` at any time → instant return to the local/offline path; no data migration.)* - [ ] **Data migration**: introducing Cosmos containers (P2) is **additive** — no migration of existing tracker data; backfill is read-only (link `tracker-item`, don't mutate). Container creation is idempotent (registered in `cosmos-init`). - [ ] **Backward-compat gate**: every phase re-runs Phase-0 `selftest.sh` + a corpus of legacy `.md` files (regression). - [ ] **Rollback drill**: each phase's exit includes a tested rollback (flag off → prior behavior, in-flight jobs drain or requeue cleanly). diff --git a/agent-queue/lib/fleet-client.sh b/agent-queue/lib/fleet-client.sh index be38e40..89d499f 100644 --- a/agent-queue/lib/fleet-client.sh +++ b/agent-queue/lib/fleet-client.sh @@ -31,9 +31,63 @@ AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}" # cwd for claimed fl AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}" # test seam (stub script) AQ_FLEET_HB_TS=0 # last heartbeat epoch (mutable) +# ── Slice 4: feature-flag levels (three explicit, independently-toggleable) ── +# Precedence (documented in README §Cutover): +# AQ_FLEET=0 ⇒ pure offline, ZERO coordinator calls (master switch). +# AQ_FLEET_ROUTE=1 ⇒ route_via_service: coordinator is AUTHORITATIVE for claim +# (default; preserves the P2-S3 behavior). +# AQ_FLEET_ROUTE=0 ⇒ LOCAL inbox is authoritative (coordinator not used to +# source work) — the pre-cutover state. +# AQ_FLEET_SHADOW=1 ⇒ shadow/dual-run (requires AQ_FLEET=1 AND AQ_FLEET_ROUTE=0): +# run the normal offline path as authoritative AND query the coordinator in +# parallel WITHOUT acting on its responses, purely to record divergence. +# If AQ_FLEET_ROUTE=1 AND AQ_FLEET_SHADOW=1, ROUTE WINS and shadow is disabled +# (a one-shot warning is logged) — you never shadow and route at the same time. +AQ_FLEET_ROUTE="${AQ_FLEET_ROUTE:-1}" +AQ_FLEET_SHADOW="${AQ_FLEET_SHADOW:-0}" +# Isolated factory id for the read-only shadow claim (never the real factory id). +AQ_FLEET_SHADOW_FACTORY_ID="${AQ_FLEET_SHADOW_FACTORY_ID:-${AQ_FACTORY_ID}-shadow}" +# Shadow divergence log (default resolved to $STATE/fleet-shadow.log at call time). +AQ_FLEET_SHADOW_LOG="${AQ_FLEET_SHADOW_LOG:-}" +_AQ_FLEET_SHADOW_WARNED=0 # one-shot ROUTE>SHADOW precedence warning (per process) +SHADOW_COORD_JOB="" # set by fleet_shadow_claim: would-be coordinator job id + # fleet_enabled — true iff the coordinator integration is switched on. fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; } +# fleet_route_enabled — coordinator is authoritative for claim/assignment (ROUTE=1). +fleet_route_enabled() { fleet_enabled && [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]; } + +# fleet_shadow_enabled — shadow/dual-run is active. Pure (no logging): requires +# AQ_FLEET=1 AND AQ_FLEET_ROUTE=0 AND AQ_FLEET_SHADOW=1. When ROUTE=1 this returns +# false (ROUTE wins) — the precedence warning is emitted once by fleet_flags_warn_once. +fleet_shadow_enabled() { + fleet_enabled || return 1 + [[ "${AQ_FLEET_ROUTE:-1}" == 0 ]] || return 1 + [[ "${AQ_FLEET_SHADOW:-0}" == 1 ]] +} + +# fleet_flags_warn_once — emit the ROUTE>SHADOW precedence warning at most once. +# Called from the run-loop init so an operator who sets ROUTE=1 + SHADOW=1 is told +# that shadow is suppressed. No-op unless that exact (conflicting) combo is set. +fleet_flags_warn_once() { + fleet_enabled || return 0 + if [[ "${AQ_FLEET_ROUTE:-1}" == 1 && "${AQ_FLEET_SHADOW:-0}" == 1 && "${_AQ_FLEET_SHADOW_WARNED:-0}" != 1 ]]; then + err "fleet: AQ_FLEET_ROUTE=1 and AQ_FLEET_SHADOW=1 — ROUTE wins; shadow/dual-run is DISABLED. Set AQ_FLEET_ROUTE=0 to shadow." + _AQ_FLEET_SHADOW_WARNED=1 + fi + return 0 +} + +# fleet_flags_state — one-line resolved flag summary (for `status` / `fleet-status`). +fleet_flags_state() { + local route shadow + if [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]; then route="route_via_service"; else route="local-authoritative"; fi + if fleet_shadow_enabled; then shadow="shadow=ON"; else shadow="shadow=off"; fi + printf 'AQ_FLEET=1 route=%s(AQ_FLEET_ROUTE=%s) %s(AQ_FLEET_SHADOW=%s)' \ + "$route" "${AQ_FLEET_ROUTE:-1}" "$shadow" "${AQ_FLEET_SHADOW:-0}" +} + # ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ── # fleet_api [JSON] -> response body, then a final HTTP-code line. fleet_api() { @@ -241,7 +295,112 @@ _fleet_stage_for() { esac } -# fleet-status — heartbeat (register) + print this factory's identity/caps. +# ── Slice 4: shadow / dual-run (strictly side-effect-free on real job state) ── + +# _fleet_shadow_log -> path to the structured shadow-divergence log. +_fleet_shadow_log() { printf '%s\n' "${AQ_FLEET_SHADOW_LOG:-$STATE/fleet-shadow.log}"; } + +# fleet_shadow_claim — ask the coordinator what it WOULD assign for this factory's +# capabilities, read-only. Side-effect-free on real job state, by construction: +# * uses an ISOLATED shadow factoryId (never the real one), so it can't take a +# job away from the real factory's identity; +# * sends "dryRun":true,"shadow":true — a coordinator that honors it never +# assigns (purely returns the would-be job); +# * if the coordinator DID assign anyway (no dry-run support), the lease is +# released immediately so no real assignment persists; +# * the would-be job is NEVER materialized / run / reported / shipped locally. +# Sets SHADOW_COORD_JOB to the would-be job id ("" = none). Best-effort: any error +# is recorded as SHADOW_ERROR and swallowed — shadow must NEVER fail a real job. +fleet_shadow_claim() { + SHADOW_COORD_JOB="" + fleet_shadow_enabled || return 0 + local caps body; caps=$(fleet_detect_caps) + body="{\"factoryId\":\"$(_json_escape "$AQ_FLEET_SHADOW_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900},\"dryRun\":true,\"shadow\":true}" + _fleet_call POST "/fleet/claim" "$body" + case "$FLEET_CODE" in + 2*) : ;; + *) printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "" "" "SHADOW_ERROR(claim:HTTP_${FLEET_CODE:-error})" \ + >> "$(_fleet_shadow_log)" 2>/dev/null || true + return 0 ;; + esac + printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 0 + local jid epoch + jid=$(printf '%s' "$FLEET_BODY" | _json_str id) + epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch) + SHADOW_COORD_JOB="$jid" + # Undo any REAL lease the coordinator may have created (no dry-run support) so + # the shadow probe leaves zero residue. Best-effort, response ignored. + if [[ -n "$jid" ]]; then + _fleet_call POST "/fleet/jobs/$jid/lease/release" "{\"leaseEpoch\":${epoch:-0},\"shadow\":true}" >/dev/null 2>&1 || true + fi + return 0 +} + +# fleet_shadow_compare — classify the local (authoritative) +# decision against the coordinator's would-be decision and append a structured line +# (tslocalJobcoordJobverdict) to the shadow log. Verdicts: +# AGREE | DIVERGE | COORD_EMPTY | LOCAL_EMPTY. Both-empty is a no-op (nothing to compare). +fleet_shadow_compare() { + fleet_shadow_enabled || return 0 + local lj=${1:-} cj=${2:-} verdict + if [[ -z "$lj" && -z "$cj" ]]; then return 0; fi + if [[ -n "$lj" && -z "$cj" ]]; then verdict=COORD_EMPTY + elif [[ -z "$lj" && -n "$cj" ]]; then verdict=LOCAL_EMPTY + elif [[ "$lj" == "$cj" ]]; then verdict=AGREE + else verdict=DIVERGE; fi + printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "${lj:-}" "${cj:-}" "$verdict" \ + >> "$(_fleet_shadow_log)" 2>/dev/null || true + return 0 +} + +# fleet_shadow_report [stage] — mirror a stage transition +# to the coordinator as a SHADOW event ("shadow":true,"dryRun":true) so the report +# path is EXERCISED, but the coordinator response is NEVER acted on (no fence / +# quarantine / state change) — divergence (e.g. 409) is only logged. Targets the +# would-be coordinator job id; a no-op when there is none. Best-effort + swallowed. +fleet_shadow_report() { + fleet_shadow_enabled || return 0 + local lj=${1:-} cj=${2:-} stage=${3:-building} + [[ -n "$cj" ]] || return 0 + _fleet_call PATCH "/fleet/jobs/$cj" "{\"stage\":\"$(_json_escape "$stage")\",\"shadow\":true,\"dryRun\":true}" + case "${FLEET_CODE:-}" in + 2*) : ;; + *) printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "${lj:-}" "${cj:-}" "SHADOW_REPORT_DIVERGE(HTTP_${FLEET_CODE:-error})" \ + >> "$(_fleet_shadow_log)" 2>/dev/null || true ;; + esac + return 0 +} + +# fleet-shadow-report — summarize the shadow log: per-verdict counts, agreement +# rate, and the last N divergences. Read-only; safe regardless of the flags. +cmd_fleet_shadow_report() { + ensure_dirs + local n=10 logf; logf=$(_fleet_shadow_log) + [[ "${1:-}" =~ ^[0-9]+$ ]] && n=$1 + if [[ ! -s "$logf" ]]; then + log "fleet shadow: no shadow log yet ($logf)." + log "fleet shadow: run with AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 to record divergence." + return 0 + fi + log "fleet shadow report ($logf):" + awk -F'\t' ' + { v=$4; sub(/\(.*/, "", v); c[v]++; tot++ + if (v=="AGREE") ag++ + if (v=="AGREE"||v=="DIVERGE"||v=="COORD_EMPTY"||v=="LOCAL_EMPTY") dec++ } + END { + split("AGREE DIVERGE COORD_EMPTY LOCAL_EMPTY SHADOW_ERROR SHADOW_REPORT_DIVERGE", ord, " ") + for (i=1; i<=6; i++) printf " %-22s %d\n", ord[i], c[ord[i]]+0 + printf " %-22s %d\n", "TOTAL", tot+0 + if (dec>0) printf " %-22s %d%%\n", "AGREEMENT", int(100*ag/dec) + }' "$logf" + log "last $n divergence/error events:" + grep -E "$(printf '\t')(DIVERGE|COORD_EMPTY|LOCAL_EMPTY|SHADOW_ERROR|SHADOW_REPORT_DIVERGE)" "$logf" 2>/dev/null \ + | tail -n "$n" \ + | awk -F'\t' '{ printf " ts=%s local=%s coord=%s verdict=%s\n", $1, $2, $3, $4 }' || true + return 0 +} + +# fleet-status — heartbeat (register) + print this factory's identity/caps + flags. cmd_fleet_status() { ensure_dirs if ! fleet_enabled; then @@ -249,9 +408,16 @@ cmd_fleet_status() { return 0 fi log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API" + log "fleet: flags=$(fleet_flags_state)" + fleet_flags_warn_once log "fleet: capabilities=$(fleet_detect_caps)" + if fleet_shadow_enabled; then + log "fleet: SHADOW/dual-run mode — local inbox is authoritative; coordinator queried for comparison only (never acted on)." + elif ! fleet_route_enabled; then + log "fleet: ROUTE off — local inbox is authoritative; coordinator not used to source work." + fi if fleet_heartbeat; then - log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs." + log "fleet: heartbeat OK (registered)." else err "fleet: coordinator unreachable — would run in offline-degrade mode." fi diff --git a/agent-queue/selftest.sh b/agent-queue/selftest.sh index f0f9fbe..eb6041a 100755 --- a/agent-queue/selftest.sh +++ b/agent-queue/selftest.sh @@ -826,4 +826,166 @@ else fi unset AQ_FLEET_API_CMD AQ_FLEET_CWD AQ_FSTUB_CALLS AQ_FSTUB_CLAIM_FLAG AQ_FSTUB_JOB_ID AQ_FSTUB_BODY +# ───────────────────────────────────────────────────────────────────── +# Phase 2 — Slice 4 cases (feature flags + shadow / dual-run). Reuses the +# fleet stub. SHADOW mode = AQ_FLEET=1 + AQ_FLEET_ROUTE=0 + AQ_FLEET_SHADOW=1: +# the LOCAL inbox is authoritative; the coordinator is queried in parallel and +# compared, NEVER acted on; verdicts land in the shadow log. (Check 33 already +# covers flags-off ⇒ zero coordinator calls.) +# ───────────────────────────────────────────────────────────────────── +mk_local_job() { # + printf '%s\n' '---' 'engine: devin' "cwd: $work" 'yolo: true' "idempotency-key: $3" '---' '' "# local $2" \ + > "$1/inbox/$2.md" +} +# verdict_has — exact structured-line match +verdict_has() { awk -F'\t' -v l="$2" -v c="$3" -v v="$4" '$2==l && $3==c && $4==v{f=1} END{exit f?0:1}' "$1"; } + +# 40. SHADOW AGREE: coord would-be == local key → AGREE; the real job still ships +# via the offline/local path; NO coordinator job is materialized locally; nothing +# quarantined; and the probe is read-only (dryRun + isolated -shadow factoryId). +export AGENT_QUEUE_ROOT="$tmp/queue-sh-agree"; "$AQ" init >/dev/null +slog="$tmp/sh-agree.log"; scalls="$tmp/sh-agree-calls.log"; : > "$scalls"; rm -f "$slog" +mk_local_job "$AGENT_QUEUE_ROOT" "locA" "fjob_same" +AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 \ + AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$scalls" AQ_FSTUB_JOB_ID="fjob_same" \ + AQ_FLEET_SHADOW_LOG="$slog" DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if verdict_has "$slog" fjob_same fjob_same AGREE \ + && ls "$AGENT_QUEUE_ROOT"/review/locA.md >/dev/null 2>&1 \ + && [ -z "$(ls "$AGENT_QUEUE_ROOT"/failed 2>/dev/null)" ] \ + && ! ls "$AGENT_QUEUE_ROOT"/review/*fleet-* >/dev/null 2>&1 \ + && grep -q '"dryRun":true' "$scalls" && grep -q -- '-shadow"' "$scalls"; then + pass "fleet shadow AGREE: local==coord ⇒ AGREE; real job ships offline; no coord job materialized; read-only probe" +else + cat "$slog" "$scalls" >&2; fail "shadow AGREE behaved incorrectly" +fi + +# 41. SHADOW DIVERGE: coord would-be != local key → DIVERGE; real job still ships; +# nothing quarantined. +export AGENT_QUEUE_ROOT="$tmp/queue-sh-div"; "$AQ" init >/dev/null +slog="$tmp/sh-div.log"; : > "$tmp/sh-div-calls.log"; rm -f "$slog" +mk_local_job "$AGENT_QUEUE_ROOT" "locB" "fjob_local" +AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 \ + AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$tmp/sh-div-calls.log" AQ_FSTUB_JOB_ID="fjob_remote" \ + AQ_FLEET_SHADOW_LOG="$slog" DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if verdict_has "$slog" fjob_local fjob_remote DIVERGE \ + && ls "$AGENT_QUEUE_ROOT"/review/locB.md >/dev/null 2>&1 \ + && [ -z "$(ls "$AGENT_QUEUE_ROOT"/failed 2>/dev/null)" ]; then + pass "fleet shadow DIVERGE: local!=coord ⇒ DIVERGE logged; real job still completes; nothing quarantined" +else + cat "$slog" >&2; fail "shadow DIVERGE behaved incorrectly" +fi + +# 42. SHADOW COORD_EMPTY: coordinator returns claimed:false → COORD_EMPTY; real job ships. +export AGENT_QUEUE_ROOT="$tmp/queue-sh-ce"; "$AQ" init >/dev/null +slog="$tmp/sh-ce.log"; rm -f "$slog"; ceflag="$tmp/sh-ce-flag"; : > "$ceflag" # flag present ⇒ stub claimed:false +mk_local_job "$AGENT_QUEUE_ROOT" "locC" "fjob_ce" +AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 \ + AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$tmp/sh-ce-calls.log" AQ_FSTUB_CLAIM_FLAG="$ceflag" \ + AQ_FLEET_SHADOW_LOG="$slog" DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if verdict_has "$slog" fjob_ce "" COORD_EMPTY \ + && ls "$AGENT_QUEUE_ROOT"/review/locC.md >/dev/null 2>&1; then + pass "fleet shadow COORD_EMPTY: coordinator empty ⇒ COORD_EMPTY logged; real job still completes" +else + cat "$slog" >&2; fail "shadow COORD_EMPTY behaved incorrectly" +fi + +# 43. SHADOW NON-FATAL: a coordinator 5xx during the shadow claim must NOT fail the +# real job — it still completes (review/), exit 0, and a SHADOW_ERROR is recorded. +fstub5xx="$tmp/fleet-stub-5xx.sh" +cat > "$fstub5xx" <<'STUBEOF' +#!/usr/bin/env bash +[ -n "${AQ_FSTUB_CALLS:-}" ] && printf '%s %s :: %s\n' "$1" "$2" "$3" >> "$AQ_FSTUB_CALLS" +case "$1 $2" in + "POST /fleet/factories/heartbeat") printf '%s\n200\n' '{"ok":true}' ;; + "POST /fleet/claim") printf '%s\n500\n' '{}' ;; + *) printf '%s\n200\n' '{}' ;; +esac +STUBEOF +chmod +x "$fstub5xx" +export AGENT_QUEUE_ROOT="$tmp/queue-sh-5xx"; "$AQ" init >/dev/null +slog="$tmp/sh-5xx.log"; rm -f "$slog" +mk_local_job "$AGENT_QUEUE_ROOT" "locD" "fjob_5xx" +AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 \ + AQ_FLEET_API_CMD="$fstub5xx" AQ_FSTUB_CALLS="$tmp/sh-5xx-calls.log" \ + AQ_FLEET_SHADOW_LOG="$slog" DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1; rc=$? +if [ "$rc" -eq 0 ] && ls "$AGENT_QUEUE_ROOT"/review/locD.md >/dev/null 2>&1 \ + && grep -q 'SHADOW_ERROR' "$slog" 2>/dev/null \ + && [ -z "$(ls "$AGENT_QUEUE_ROOT"/failed 2>/dev/null)" ]; then + pass "fleet shadow NON-FATAL: coordinator 5xx ⇒ real job completes (exit 0), SHADOW_ERROR noted, not quarantined" +else + echo "rc=$rc"; cat "$slog" >&2; fail "shadow non-fatal behaved incorrectly" +fi + +# 44. ROUTE precedence: AQ_FLEET_ROUTE=1 + AQ_FLEET_SHADOW=1 ⇒ ROUTE wins — the +# coordinator sources work (job materialized + run), a one-shot warning is logged, +# and NO shadow comparison happens (shadow log stays empty/absent). +export AGENT_QUEUE_ROOT="$tmp/queue-sh-prec"; export AQ_FLEET_CWD="$work"; "$AQ" init >/dev/null +slog="$tmp/sh-prec.log"; rm -f "$slog" +AQ_FLEET=1 AQ_FLEET_ROUTE=1 AQ_FLEET_SHADOW=1 \ + AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$tmp/sh-prec-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/sh-prec-claimed" \ + AQ_FSTUB_JOB_ID="fjob_p" AQ_FLEET_SHADOW_LOG="$slog" DEVIN_BIN="$stub" \ + "$AQ" run --once >/dev/null 2>"$tmp/sh-prec.err" +if grep -qi 'ROUTE wins' "$tmp/sh-prec.err" \ + && ls "$AGENT_QUEUE_ROOT"/review/*fleet-fjob_p.md >/dev/null 2>&1 \ + && [ ! -s "$slog" ]; then + pass "fleet ROUTE precedence: ROUTE=1 + SHADOW=1 ⇒ ROUTE path + warning, no shadow compare" +else + cat "$tmp/sh-prec.err" >&2; fail "ROUTE>SHADOW precedence behaved incorrectly" +fi +unset AQ_FLEET_CWD + +# 45. ROUTE=0 + AQ_FLEET=1 (no shadow): LOCAL inbox authoritative — the coordinator +# is NOT used to source work (zero /fleet/claim), and the local job still completes. +export AGENT_QUEUE_ROOT="$tmp/queue-sh-route0"; "$AQ" init >/dev/null +scalls="$tmp/sh-route0-calls.log"; : > "$scalls" +mk_local_job "$AGENT_QUEUE_ROOT" "locE" "fjob_e" +AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=0 \ + AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$scalls" AQ_FSTUB_JOB_ID="fjob_e" \ + DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if ! grep -q 'POST /fleet/claim' "$scalls" \ + && ls "$AGENT_QUEUE_ROOT"/review/locE.md >/dev/null 2>&1; then + pass "fleet ROUTE=0: local inbox authoritative — coordinator not used to source work; local job completes" +else + cat "$scalls" >&2; fail "ROUTE=0 local-authoritative behaved incorrectly" +fi + +# 46. fleet-shadow-report: summarize a seeded shadow log (counts + agreement rate). +rlog="$tmp/sh-report.log" +printf '%s\t%s\t%s\t%s\n' \ + 100 fjob_a fjob_a AGREE \ + 101 fjob_b fjob_b AGREE \ + 102 fjob_c fjob_x DIVERGE \ + 103 fjob_d '' COORD_EMPTY > "$rlog" +rout=$(AQ_FLEET_SHADOW_LOG="$rlog" "$AQ" fleet-shadow-report 2>&1) +if printf '%s\n' "$rout" | grep -qE 'AGREE +2' \ + && printf '%s\n' "$rout" | grep -qE 'DIVERGE +1' \ + && printf '%s\n' "$rout" | grep -qE 'COORD_EMPTY +1' \ + && printf '%s\n' "$rout" | grep -qE 'TOTAL +4' \ + && printf '%s\n' "$rout" | grep -qE 'AGREEMENT +50%'; then + pass "fleet-shadow-report: per-verdict counts (AGREE 2 / DIVERGE 1 / COORD_EMPTY 1), TOTAL 4, AGREEMENT 50%" +else + printf '%s\n' "$rout" >&2; fail "fleet-shadow-report summary incorrect" +fi + +# 47. fleet_shadow_report (unit): mirrors a stage transition as a SHADOW event +# (shadow:true) against the would-be coord job; the response is never acted on. +funcs2="$tmp/aq-funcs-sh.sh"; sed '/^main "\$@"/d' "$AQ" > "$funcs2" +rep_calls="$tmp/sh-rep-calls.log"; : > "$rep_calls" +if bash -c ' + set -uo pipefail + export AGENT_QUEUE_ROOT="'"$tmp"'/queue-sh-rep" AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 + export AQ_FLEET_API_CMD="'"$fstub"'" AQ_FSTUB_CALLS="'"$rep_calls"'" + source "'"$funcs2"'" + source "'"$HERE"'/lib/fleet-client.sh" + ensure_dirs + fleet_shadow_report js fjob_r building +'; then + grep -q 'PATCH /fleet/jobs/fjob_r :: .*"shadow":true' "$rep_calls" \ + && pass "fleet_shadow_report: PATCH carries shadow:true (report exercised, response ignored)" \ + || { cat "$rep_calls" >&2; fail "shadow report payload missing shadow flag"; } +else + fail "fleet_shadow_report invocation errored" +fi +unset AQ_FLEET_API_CMD AQ_FLEET_SHADOW_LOG AGENT_QUEUE_ROOT + echo "self-test PASS"