diff --git a/agent-queue/README.md b/agent-queue/README.md index 9c3a453..3b8dbe6 100644 --- a/agent-queue/README.md +++ b/agent-queue/README.md @@ -378,6 +378,61 @@ execution**: an echo failure is logged and the job continues unchanged. With `AQ_TRACKER_AUTO=1` the worker echoes automatically on each transition; otherwise echo is manual. `status` / `insights` surface the `tracker-item` and last echoed status. +## Fleet integration (Phase 2) + +Behind the `AQ_FLEET` flag, the runner becomes a **factory** that registers, +heartbeats, claims, and reports against the platform-service `fleet` coordinator — +so coordinator jobs run alongside local `.md` files on the same host. All +coordinator logic lives in [`lib/fleet-client.sh`](lib/fleet-client.sh) (curl-only + +POSIX awk, sourced by `agent-queue.sh`); the few hook points in the runner are all +gated on `fleet_enabled`. + +> **Offline vs fleet mode.** With `AQ_FLEET` unset/`0` (the default) the runner is +> the pure offline git-queue described above — **zero** coordinator calls, behavior +> byte-for-byte unchanged. With `AQ_FLEET=1` the run loop also registers + claims +> from the coordinator, reports fenced stage transitions, renews leases, and (in +> fleet mode) routes the outcome echo through the coordinator's `fleet_events` +> instead of the direct tracker echo. The tracker echo remains the offline path. + +```bash +AQ_FLEET=1 AQ_FLEET_TOKEN=… AQ_PRODUCT_ID=… agent-queue.sh fleet-status # register + show identity +AQ_FLEET=1 AQ_FLEET_TOKEN=… AQ_PRODUCT_ID=… agent-queue.sh run # claim + execute coordinator jobs +``` + +### Config (env) + +| Var | Default | Meaning | +| --- | ------- | ------- | +| `AQ_FLEET` | `0` | master switch — `1` enables coordinator integration; `0`/unset = offline git-queue | +| `AQ_FLEET_API` | `http://localhost:4003/api` | coordinator base URL (already includes `/api`) | +| `AQ_FLEET_TOKEN` | _(none)_ | bearer token — never hardcode | +| `AQ_PRODUCT_ID` | _(none)_ | productId (sent as `X-Product-Id`; shared with the tracker config) | +| `AQ_FACTORY_ID` | `-` | stable factory identity for this process | +| `AQ_FLEET_LEASE_RENEW_SEC` | `300` | heartbeat / lease-renew cadence | +| `AQ_FLEET_CAPS` | _(auto)_ | override the auto-detected capability tokens (comma/space list) | +| `AQ_FLEET_CWD` | `$PWD` | cwd a claimed coordinator job runs in | +| `AQ_FLEET_API_CMD` | _(none)_ | test seam: a stub that replaces the curl HTTP entirely (selftest uses it) | + +### Protocol (claim / heartbeat / report / fence / renew) + +- **register / heartbeat:** `POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load}` — registration *is* the first heartbeat; re-sent on `AQ_FLEET_LEASE_RENEW_SEC` cadence. +- **claim:** `POST /fleet/claim {factoryId, capabilities[], leaseSeconds}`. A returned job (`id`, `bodyMd`, `leaseEpoch`) is materialized as a transient local `.md` (frontmatter `fleet-job-id` + `fleet-lease-epoch`) so the existing runner executes it unchanged, interleaved with local files. +- **report (fenced):** each stage transition (`building`/`review`/`testing`/`shipped`/`failed`) is `PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?}`. The coordinator writes `fleet_events` server-side. The payload carries only stage/epoch/checkpoint — **never** the prompt/`bodyMd` or token. +- **fencing (§18):** if a report/renew returns **conflict/409** (stale `leaseEpoch` → the coordinator reclaimed us), the worker **self-aborts**: it stops, does **not** ship/merge, and **quarantines** the local result to `failed/` (`result=fenced_quarantine`) for human triage. A reclaimed zombie can never corrupt coordinator state. +- **lease renew / release:** `POST /fleet/jobs/:id/lease/renew` while building (fenced); `…/lease/release` on terminal stages. +- **checkpoint:** the WIP `{wipBranch, wipCommit}` is sent with the building report so a reclaim can resume (§25). + +### Offline-degrade + quarantine (§9) + +If the coordinator is **unreachable** mid-job (5xx / connection error), the report +is treated as *degraded* (logged, `fleet_degraded=1`): the in-flight job **finishes +locally** rather than being abandoned. On the next reachable call the worker +presents its `leaseEpoch`; if the coordinator now reports it **stale** (it was +reclaimed during the outage), the local result is **quarantined** (marked, not +auto-shipped) and surfaced for human triage — split-brain is resolved in favor of +the coordinator without losing the work. `status` shows the factory id + per-job +`fleet=@e`; `insights` lists the `fleet_*` fields. + ## Config (env overrides) | Var | Default | Meaning | diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index 4046dd6..91fc63e 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -697,6 +697,17 @@ run_worker() { trap '_worker_trap; exit 143' INT TERM _wip_start "$job" "$cwd" "$metaf" "$logf" || true + # ── Fleet (§7/§18): report `building` (with WIP checkpoint) to the coordinator. + # If the lease is stale (we were reclaimed) the report is FENCED -> self-abort and + # quarantine WITHOUT running the agent. No-op for non-fleet jobs / flag off. ── + if fleet_enabled && _fleet_is_job "$job"; then + fleet_report "$job" building checkpoint; local _frc=$? + if [[ "$_frc" -eq 2 ]]; then + fleet_quarantine "$job" "$doing_file" "$metaf" "$logf" + return 0 + fi + fi + _run_agent() { if [[ -n "$AGENT_STDIN" ]]; then ( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) @@ -764,6 +775,16 @@ run_worker() { echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf" _finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started" elif [[ $rc -eq 0 ]]; then + # Fleet (§18): re-confirm the lease before accepting the agent's output. If the + # coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale + # epoch), the report is FENCED -> quarantine the local result, NEVER ship. + if fleet_enabled && _fleet_is_job "$job"; then + fleet_report "$job" review checkpoint; local _rrc=$? + if [[ "$_rrc" -eq 2 ]]; then + fleet_quarantine "$job" "$doing_file" "$metaf" "$logf" + return 0 + fi + fi # Agent succeeded: land in review/, then run the auto-QA verify gate. The # worker is still alive here so the concurrency slot stays held through # verification — `ended=` is written only once we reach a resting stage. @@ -1263,11 +1284,20 @@ cmd_to_tracker() { log "to-tracker: echoed $jn -> item $item_id (status=$status)" } -# _auto_echo — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition. -# Never blocks or fails the job: the tracker is downstream, not authoritative. +# _auto_echo — best-effort outcome echo on a transition (never fatal). +# Fleet mode (AQ_FLEET=1, fleet job): route the echo through the coordinator as a +# fenced stage report — `fleet_events` becomes the audit source of truth, so we do +# NOT also post to the tracker directly. Offline (non-fleet) jobs keep the Slice-4 +# direct tracker echo (opt-in via AQ_TRACKER_AUTO). _auto_echo() { + local job=$1 + if declare -f fleet_enabled >/dev/null 2>&1 && fleet_enabled && _fleet_is_job "$job"; then + local result; result=$(_meta_val "$STATE/$job.meta" result) + fleet_report "$job" "$(_fleet_stage_for "$result")" >/dev/null 2>&1 || true + return 0 + fi [[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0 - cmd_to_tracker "$1" >/dev/null 2>&1 || true + cmd_to_tracker "$job" >/dev/null 2>&1 || true } cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; } @@ -1382,10 +1412,20 @@ cmd_run() { # Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous # crash/power-off before launching anything new. recover_orphans + # Fleet (§8): register with the coordinator (registration == first heartbeat). + fleet_enabled && fleet_heartbeat while true; do # continuously sweep for orphans (a worker that died mid-loop) recover_orphans + # Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs, + # and — if we have capacity — claim one coordinator job into inbox/ so the + # normal selection loop below executes it interleaved with local .md files. + if fleet_enabled; then + fleet_heartbeat_maybe + fleet_renew_active + [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] && { fleet_claim >/dev/null 2>&1 || true; } + fi local running; running=$(active_workers) # launch jobs while we have capacity and an eligible inbox file while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do @@ -1452,6 +1492,8 @@ cmd_run() { echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)" echo "artifacts=$(fm_get "$doing_file" artifacts "")" echo "tracker_item=$(fm_get "$doing_file" tracker-item "")" + echo "fleet_job_id=$(fm_get "$doing_file" fleet-job-id "")" + echo "fleet_lease_epoch=$(fm_get "$doing_file" fleet-lease-epoch "")" } > "$STATE/$job.meta" run_worker "$doing_file" & { echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta" @@ -1482,6 +1524,9 @@ cmd_status() { local running; running=$(active_workers) echo printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" "" + if fleet_enabled; then + printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API" + fi printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \ "$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \ "$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \ @@ -1511,12 +1556,16 @@ cmd_status() { local m_prio m_prof m_caps m_trk extra="" m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-) m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-) - local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) + local m_echo m_fleet m_epoch + m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) + m_fleet=$(grep '^fleet_job_id=' "$f" | tail -1 | cut -d= -f2-) + m_epoch=$(grep '^fleet_lease_epoch=' "$f" | tail -1 | cut -d= -f2-) [[ -n "$m_prio" ]] && extra+="prio=$m_prio " [[ -n "$m_prof" ]] && extra+="profile=$m_prof " [[ -n "$m_caps" ]] && extra+="caps=$m_caps " [[ -n "$m_trk" ]] && extra+="tracker=$m_trk " [[ -n "$m_echo" ]] && extra+="echoed=$m_echo " + [[ -n "$m_fleet" ]] && extra+="fleet=$m_fleet@e${m_epoch:-0} " [[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET" printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET" done @@ -1559,7 +1608,8 @@ cmd_insights() { for k in engine result attempts started ended duration_s exit verify_exit \ model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \ files_changed lines_added lines_deleted wip_branch wip_base wip_commit \ - next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do + next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at \ + fleet_job_id fleet_lease_epoch fleet_reported fleet_fenced fleet_degraded fleet_quarantined; do val=$(_meta_val "$f" "$k") [[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val" done @@ -1669,10 +1719,23 @@ cmd_ship() { local f; f=$(_find_job "$job" "$TESTING") [[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)" local base name; base=$(basename "$f"); name=${base%.md} + # Fleet (§18): a fleet job may only ship if we still hold the lease. A fenced + # report means the coordinator reclaimed it -> quarantine instead of shipping. + if fleet_enabled && _fleet_is_job "$name"; then + fleet_report "$name" shipped checkpoint; local _src=$? + if [[ "$_src" -eq 2 ]]; then + fleet_quarantine "$name" "$f" "$STATE/$name.meta" "$LOGS/$name.log" + return 0 + fi + fi mv "$f" "$SHIPPED/$base" [[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta" log "shipped $C_BOLD$base$C_RESET (testing → shipped)" _auto_echo "$name" + if fleet_enabled && _fleet_is_job "$name"; then + fleet_lease_release "$name" shipped >/dev/null 2>&1 || true + fi + return 0 } # promote — advance one stage forward: review → testing → shipped. @@ -1764,6 +1827,7 @@ ${C_BOLD}COMMANDS${C_RESET} recover reclaim orphaned building/ jobs (dead worker) -> inbox from-tracker pull a tracker Item -> materialize a job in inbox/ (§10) to-tracker echo a job's outcome to its tracker Item (one-way) + fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity dash [--interval N] richer live Node dashboard (recent shipped/failed too) stop kill running workers + the run loop logs [-f] print (or follow) a job's log @@ -1819,13 +1883,25 @@ ${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs p AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF) AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam) label hints on an Item: engine-class: profile: priority: cap: + +${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue) + AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID + AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD + When on, 'run' registers + claims coordinator jobs (interleaved with local .md), reports + fenced stage transitions, renews leases, and quarantines a job if it is reclaimed (fenced). EOF } +# Fleet coordinator client (Phase 2). All functions no-op unless AQ_FLEET=1, so the +# offline git-queue path above is unchanged when the flag is off. +# shellcheck source=lib/fleet-client.sh +[[ -f "$SCRIPT_DIR/lib/fleet-client.sh" ]] && source "$SCRIPT_DIR/lib/fleet-client.sh" + main() { local cmd="${1:-help}"; shift || true case "$cmd" in init) cmd_init "$@";; + fleet-status) cmd_fleet_status "$@";; add) cmd_add "$@";; run) cmd_run "$@";; status) cmd_status "$@";; diff --git a/agent-queue/docs/GIGAFACTORY_ROADMAP.md b/agent-queue/docs/GIGAFACTORY_ROADMAP.md index 1d64068..57c52e6 100644 --- a/agent-queue/docs/GIGAFACTORY_ROADMAP.md +++ b/agent-queue/docs/GIGAFACTORY_ROADMAP.md @@ -12,7 +12,7 @@ | ----- | ----- | ------ | - | ---- | | **0** | Baseline (today) | ✅ shipped | 100% | `selftest.sh` green | | **1** | Manifest + profiles + capabilities + tracker adapter (single host) | ◐ in progress | 95% | adapter e2e + selftest | -| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ☐ not started | 0% | fleet e2e + module tests | +| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ◐ in progress | 55% | fleet e2e + module tests | | **3** | Fleet control plane in tracker-web + DAG deps + budgets + scoring router | ☐ not started | 0% | web e2e + router tests | | **4** | Message bus + autoscaling + cross-OS capability marketplace | ☐ not started | 0% | load/chaos suite | | **5** | Self-optimizing / learned routing | ☐ not started | 0% | offline eval + A/B | @@ -367,10 +367,20 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until ### Phase 2 — Coordinator as platform-service module + Cosmos + multi-factory leasing **Goal:** the service spine; ≥2 real factories executing in parallel via leases. +> **Slice progress — P2-S3 (factory-agent integration, single host):** the bash runner +> is now a coordinator **factory** behind `AQ_FLEET` — `lib/fleet-client.sh` (curl-only, +> sourced) registers via heartbeat, claims jobs into inbox (interleaved with local `.md`), +> reports **fenced** stage transitions with WIP checkpoints, renews/releases leases, and on +> a stale `leaseEpoch` (reclaimed) **self-aborts + quarantines** the local result. Coordinator +> 5xx/connection errors **degrade** (finish locally) rather than abandon work. When `AQ_FLEET` +> is off the offline git-queue path is byte-for-byte unchanged. Remaining P2: scheduler/router +> core, direct tracker→module calls, factory enrollment + scoped tokens, `fleet.*` feature +> flags + shadow/dual-run, and the two-factory parallel demo (the Phase-2 exit criteria). + - [x] Scaffold `fleet`/`orchestrator` module in `platform-service` (`types/repository/routes`, Zod, ESM, `productId`). *(PR #28)* - [x] Cosmos containers (§13) + repository layer (memory + Cosmos providers). *(PR #28; `fleet_artifacts` blob wiring still pending.)* - [x] **Atomic claim** (optimistic concurrency / `_etag`) + **lease reaper** + **fencing (`leaseEpoch`)** endpoints (§4/§8/§9) — *not* Cosmos-TTL-driven reclaim. *(common-plat PR #28 + #29; truly atomic via `updateIfMatch`.)* -- [ ] Port `agent-queue` runner to a **factory agent** API client (enroll/register/heartbeat/claim/report, fencing-aware) while keeping git-queue fallback. +- [x] Port `agent-queue` runner to a **factory agent** API client (enroll/register/heartbeat/claim/report, fencing-aware) while keeping git-queue fallback. *(P2-S3: `lib/fleet-client.sh` behind `AQ_FLEET`; registers via heartbeat, claims into inbox, reports fenced stage transitions, renews leases, quarantines on stale-epoch; offline git-queue unchanged when the flag is off.)* - [ ] Scheduler/router core (§7) as a pure module (fixed weights) + wired into atomic assignment. - [ ] Tracker adapter calls the module directly (not just file export). - [ ] Auth: factory enrollment + scoped rotatable tokens; secret isolation enforced (§12 subset). diff --git a/agent-queue/lib/fleet-client.sh b/agent-queue/lib/fleet-client.sh new file mode 100644 index 0000000..be38e40 --- /dev/null +++ b/agent-queue/lib/fleet-client.sh @@ -0,0 +1,258 @@ +# shellcheck shell=bash +# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ──────────────── +# +# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory" that +# registers / heartbeats / claims / reports against the platform-service `fleet` +# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every function +# here is an immediate no-op and the offline git-queue path is byte-for-byte +# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err, +# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN). +# +# Contract (routes under AQ_FLEET_API, which already includes /api): +# POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load} +# POST /fleet/claim {factoryId, capabilities[], leaseSeconds} +# -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}} +# PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?} (409 = fenced) +# POST /fleet/jobs/:id/lease/renew {leaseEpoch, leaseSeconds} (409 = fenced) +# POST /fleet/jobs/:id/lease/release {leaseEpoch, stage?} +# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there +# is no client-side "register" or "append event" call (register == first heartbeat). + +# ── Config (env-overridable) ──────────────────────────────────────── +AQ_FLEET="${AQ_FLEET:-0}" # master switch (0 = offline) +AQ_FLEET_API="${AQ_FLEET_API:-http://localhost:4003/api}" # base URL incl. /api +AQ_FLEET_TOKEN="${AQ_FLEET_TOKEN:-}" # bearer; never hardcode +# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header). +AQ_FACTORY_ID="${AQ_FACTORY_ID:-$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}" +AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}" # heartbeat/renew cadence +AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}" # requested lease duration +AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}" # override caps (comma/space list) +AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}" # cwd for claimed fleet jobs +AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}" # test seam (stub script) +AQ_FLEET_HB_TS=0 # last heartbeat epoch (mutable) + +# fleet_enabled — true iff the coordinator integration is switched on. +fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; } + +# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ── +# fleet_api [JSON] -> response body, then a final HTTP-code line. +fleet_api() { + local method=$1 path=$2 body=${3:-} + if [[ -n "$AQ_FLEET_API_CMD" ]]; then + "$AQ_FLEET_API_CMD" "$method" "$path" "$body" + return $? + fi + local url="${AQ_FLEET_API}${path}" + local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method" + -H "Content-Type: application/json" -w '\n%{http_code}') + [[ -n "$AQ_FLEET_TOKEN" ]] && args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN") + [[ -n "$AQ_PRODUCT_ID" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID") + [[ -n "$body" ]] && args+=(--data "$body") + local out rc + out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$? + if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi +} + +# _fleet_call [JSON] -> sets globals FLEET_BODY + FLEET_CODE. +_fleet_call() { + local out; out=$(fleet_api "$@") + FLEET_CODE=$(printf '%s' "$out" | tail -n1) + FLEET_BODY=$(printf '%s' "$out" | sed '$d') +} + +# _fleet_json_num (reads JSON on stdin) -> first numeric value for key. +_fleet_json_num() { + grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -1 | grep -oE -- '-?[0-9]+$' +} + +# _fleet_is_job -> 0 if this job was claimed from the coordinator. +_fleet_is_job() { [[ -n "$(_meta_val "$STATE/$1.meta" fleet_job_id)" ]]; } + +# fleet_detect_caps -> JSON array of capability tokens (override or auto-detected). +fleet_detect_caps() { + local toks + if [[ -n "$AQ_FLEET_CAPS" ]]; then + toks=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n') + else + toks=$(detect_capabilities) + fi + local out="[" first=1 t + while IFS= read -r t; do + [[ -n "$t" ]] || continue + [[ $first -eq 1 ]] && first=0 || out+="," + out+="\"$(_json_escape "$t")\"" + done <<< "$toks" + printf '%s]' "$out" +} + +# ── Heartbeat (registration == first heartbeat) ───────────────────── +fleet_heartbeat() { + fleet_enabled || return 0 + local caps load body + caps=$(fleet_detect_caps) + load=$(active_workers 2>/dev/null || echo 0) + body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":${load:-0}}" + _fleet_call POST "/fleet/factories/heartbeat" "$body" + case "$FLEET_CODE" in + 2*) AQ_FLEET_HB_TS=$(date +%s); return 0;; + *) err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"; return 1;; + esac +} + +# fleet_heartbeat_maybe — heartbeat only when the cadence interval has elapsed. +fleet_heartbeat_maybe() { + fleet_enabled || return 0 + local now; now=$(date +%s) + [[ $(( now - ${AQ_FLEET_HB_TS:-0} )) -ge "${AQ_FLEET_LEASE_RENEW_SEC:-300}" ]] && fleet_heartbeat + return 0 +} + +# ── Claim — pull one job and materialize it as a local inbox .md ──── +# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error. +fleet_claim() { + fleet_enabled || return 2 + local caps body; caps=$(fleet_detect_caps) + body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}" + _fleet_call POST "/fleet/claim" "$body" + case "$FLEET_CODE" in 2*) :;; *) err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"; return 1;; esac + printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2 + + local jid body_md epoch + jid=$(printf '%s' "$FLEET_BODY" | _json_str id) + body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd) + epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch) + [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; } + + # Materialize a transient local job .md (same approach as from-tracker) so the + # existing runner executes a coordinator job unchanged. fleet-job-id + + # fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run). + local safe tmpdir tmp + safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_') + tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX") + tmp="$tmpdir/fleet-$safe.md" + { + echo "---" + echo "cwd: $AQ_FLEET_CWD" + echo "yolo: true" + echo "fleet-job-id: $jid" + echo "fleet-lease-epoch: ${epoch:-0}" + echo "idempotency-key: fleet-$jid" + echo "---" + echo + printf '%s\n' "$body_md" + } > "$tmp" + cmd_add "$tmp" >/dev/null 2>&1 + rm -rf "$tmpdir" + log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})" + return 0 +} + +# ── Report a fenced stage transition ──────────────────────────────── +# fleet_report [with-checkpoint] -> 0 ok, 2 FENCED (stale epoch: +# caller must self-abort), 1 degraded (coordinator unreachable: continue locally). +fleet_report() { + fleet_enabled || return 0 + local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch + metaf="$STATE/$job.meta" + jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch) + [[ -n "$jid" ]] || return 0 + local ckpt="" + if [[ -n "$with_ckpt" ]]; then + local wb wc; wb=$(_meta_val "$metaf" wip_branch); wc=$(_meta_val "$metaf" wip_commit) + if [[ -n "$wb" ]]; then + ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\"" + [[ -n "$wc" ]] && ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\"" + ckpt+="}" + fi + fi + # payload carries ONLY {stage, leaseEpoch, checkpoint} — never bodyMd/prompt/token. + _fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$stage\",\"leaseEpoch\":${epoch:-0}$ckpt}" + case "$FLEET_CODE" in + 2*) echo "fleet_reported=$stage" >> "$metaf"; return 0;; + 409|412) err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job" + echo "fleet_fenced=1" >> "$metaf"; return 2;; + *) err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally" + echo "fleet_degraded=1" >> "$metaf"; return 1;; + esac +} + +# fleet_lease_renew -> extend the lease; 0 ok, 2 fenced, 1 degraded. +fleet_lease_renew() { + fleet_enabled || return 0 + local job=$1 metaf jid epoch + metaf="$STATE/$job.meta" + jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch) + [[ -n "$jid" ]] || return 0 + _fleet_call POST "/fleet/jobs/$jid/lease/renew" "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}" + case "$FLEET_CODE" in + 2*) return 0;; + 409|412) echo "fleet_fenced=1" >> "$metaf"; return 2;; + *) return 1;; + esac +} + +# fleet_lease_release [stage] -> best-effort release on a terminal stage. +fleet_lease_release() { + fleet_enabled || return 0 + local job=$1 stage=${2:-} metaf jid epoch body + metaf="$STATE/$job.meta" + jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch) + [[ -n "$jid" ]] || return 0 + body="{\"leaseEpoch\":${epoch:-0}" + [[ -n "$stage" ]] && body+=",\"stage\":\"$stage\"" + body+="}" + _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body" + return 0 +} + +# fleet_renew_active — renew leases for all in-flight (building/) fleet jobs. +fleet_renew_active() { + fleet_enabled || return 0 + local f job + for f in "$BUILDING"/*.md; do + [[ -e "$f" ]] || continue + job=$(basename "$f"); job=${job%.md} + _fleet_is_job "$job" && { fleet_lease_renew "$job" >/dev/null 2>&1 || true; } + done + return 0 +} + +# fleet_quarantine — a fenced (reclaimed) worker must +# NOT ship: park the local result in failed/ for human triage (§9 split-brain). +fleet_quarantine() { + local job=$1 file=$2 metaf=$3 logf=$4 + { + echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)." + echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))" + } >> "$logf" + [[ -e "$file" ]] && mv "$file" "$FAILED/" 2>/dev/null + { echo "result=fenced_quarantine"; echo "fleet_quarantined=1"; echo "ended=$(date +%s)"; } >> "$metaf" + err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage" +} + +# _fleet_stage_for -> the coordinator stage for a job result/stage. +_fleet_stage_for() { + case "$1" in + shipped) echo shipped;; + testing) echo testing;; + review) echo review;; + failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected) echo failed;; + *) echo building;; + esac +} + +# fleet-status — heartbeat (register) + print this factory's identity/caps. +cmd_fleet_status() { + ensure_dirs + if ! fleet_enabled; then + log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)." + return 0 + fi + log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API" + log "fleet: capabilities=$(fleet_detect_caps)" + if fleet_heartbeat; then + log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs." + else + err "fleet: coordinator unreachable — would run in offline-degrade mode." + fi +} diff --git a/agent-queue/selftest.sh b/agent-queue/selftest.sh index 29a663f..f0f9fbe 100755 --- a/agent-queue/selftest.sh +++ b/agent-queue/selftest.sh @@ -688,4 +688,142 @@ else fi unset AQ_TRACKER_API_CMD AQ_TRACKER_CWD AQ_STUB_CALLS AQ_STUB_ITEM AQ_STUB_CODE AQ_STUB_GET_CODE +# ───────────────────────────────────────────────────────────────────── +# Phase 2 — Slice 3 cases (fleet coordinator integration). A stub replaces +# fleet_api via AQ_FLEET_API_CMD (no live coordinator), records calls + returns +# canned JSON. The flag-off cases prove the offline path is unchanged. +# ───────────────────────────────────────────────────────────────────── +fstub="$tmp/fleet-stub.sh" +cat > "$fstub" <<'STUBEOF' +#!/usr/bin/env bash +# fleet API stub: record " :: "; canned responses by route. +[ -n "${AQ_FSTUB_CALLS:-}" ] && printf '%s %s :: %s\n' "$1" "$2" "$3" >> "$AQ_FSTUB_CALLS" +case "$1 $2" in + "POST /fleet/factories/heartbeat") printf '%s\n200\n' '{"ok":true}' ;; + "POST /fleet/claim") + if [ -n "${AQ_FSTUB_CLAIM_FLAG:-}" ] && [ -f "$AQ_FSTUB_CLAIM_FLAG" ]; then + printf '%s\n200\n' '{"claimed":false}' + else + [ -n "${AQ_FSTUB_CLAIM_FLAG:-}" ] && : > "$AQ_FSTUB_CLAIM_FLAG" + printf '{"claimed":true,"job":{"id":"%s","bodyMd":"%s","leaseEpoch":1},"lease":{"leaseEpoch":1}}\n200\n' \ + "${AQ_FSTUB_JOB_ID:-fjob_1}" "${AQ_FSTUB_BODY:-do the work}" + fi ;; + PATCH\ /fleet/jobs/*) printf '%s\n%s\n' '{}' "${AQ_FSTUB_PATCH_CODE:-200}" ;; + *) printf '%s\n200\n' '{}' ;; +esac +STUBEOF +chmod +x "$fstub" + +# 33. flag OFF (default): a recording stub is configured but AQ_FLEET is unset → +# ZERO fleet calls, and a local job runs through the offline path unchanged. +export AGENT_QUEUE_ROOT="$tmp/queue-floff" +"$AQ" init >/dev/null +export AQ_FLEET_API_CMD="$fstub"; export AQ_FSTUB_CALLS="$tmp/floff-calls.log"; : > "$AQ_FSTUB_CALLS" +printf '%s\n' '---' 'engine: devin' "cwd: $work" 'yolo: true' '---' '' '# local task' \ + > "$AGENT_QUEUE_ROOT/inbox/localjob.md" +DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if [ ! -s "$AQ_FSTUB_CALLS" ] && ls "$AGENT_QUEUE_ROOT"/review/localjob.md >/dev/null 2>&1; then + pass "fleet flag OFF: zero coordinator calls; offline job completes to review/" +else + cat "$AQ_FSTUB_CALLS" >&2; fail "flag-off made fleet calls or offline job did not complete" +fi +unset AQ_FLEET_API_CMD AQ_FSTUB_CALLS + +# 34. AQ_FLEET=1: loop start registers (heartbeat with caps) + claim executes a +# coordinator job to review/, with fleet_job_id + leaseEpoch persisted in meta. +export AGENT_QUEUE_ROOT="$tmp/queue-fl1"; export AQ_FLEET_CWD="$work" +"$AQ" init >/dev/null +export AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$tmp/fl1-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl1-claimed" \ + AQ_FSTUB_JOB_ID="fjob_1" AQ_FSTUB_BODY="FLEET-BODY-SENTINEL do work" +: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG" +AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +fmeta=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_1.meta' | head -1) +if grep -q 'POST /fleet/factories/heartbeat :: .*capabilities' "$AQ_FSTUB_CALLS" \ + && grep -q 'POST /fleet/claim' "$AQ_FSTUB_CALLS" \ + && ls "$AGENT_QUEUE_ROOT"/review/*fleet-fjob_1.md >/dev/null 2>&1 \ + && [ "$(metaval "$fmeta" fleet_job_id)" = "fjob_1" ] && [ "$(metaval "$fmeta" fleet_lease_epoch)" = "1" ]; then + pass "fleet: register(heartbeat)+claim -> coordinator job materialized + executed to review/" +else + cat "$AQ_FSTUB_CALLS" >&2; fail "fleet claim/execute did not work as expected" +fi + +# 35. report + checkpoint: PATCH /fleet/jobs/:id carries stage + leaseEpoch, and a +# checkpoint (wipBranch) on building when cwd is a git repo. +export AGENT_QUEUE_ROOT="$tmp/queue-fl2"; repo=$tmp/repo-fl2; mkrepo "$repo"; export AQ_FLEET_CWD="$repo" +"$AQ" init >/dev/null +export AQ_FSTUB_CALLS="$tmp/fl2-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl2-claimed" AQ_FSTUB_JOB_ID="fjob_2" AQ_FSTUB_BODY="work two" +: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG" +AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +if grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"building".*"leaseEpoch":1' "$AQ_FSTUB_CALLS" \ + && grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"building".*"wipBranch"' "$AQ_FSTUB_CALLS" \ + && grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"review"' "$AQ_FSTUB_CALLS"; then + pass "fleet: PATCH stage transitions carry leaseEpoch + checkpoint(wipBranch) on building" +else + cat "$AQ_FSTUB_CALLS" >&2; fail "fleet report/checkpoint payload incorrect" +fi + +# 36. FENCING: PATCH returns conflict (stale epoch) → worker self-aborts, job is +# quarantined to failed/ (NOT review/testing/shipped), fenced is recorded. +export AGENT_QUEUE_ROOT="$tmp/queue-fl3" +"$AQ" init >/dev/null +export AQ_FSTUB_CALLS="$tmp/fl3-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl3-claimed" AQ_FSTUB_JOB_ID="fjob_3" AQ_FSTUB_BODY="work three" AQ_FSTUB_PATCH_CODE=409 +: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG" +AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +fmeta3=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_3.meta' | head -1) +rcount=$(find "$AGENT_QUEUE_ROOT/review" "$AGENT_QUEUE_ROOT/testing" "$AGENT_QUEUE_ROOT/shipped" -maxdepth 1 -name '*.md' 2>/dev/null | wc -l | tr -d ' ') +if [ "$rcount" = "0" ] && ls "$AGENT_QUEUE_ROOT"/failed/*fleet-fjob_3.md >/dev/null 2>&1 \ + && [ "$(metaval "$fmeta3" result)" = "fenced_quarantine" ] && [ "$(metaval "$fmeta3" fleet_fenced)" = "1" ]; then + pass "fleet FENCING: stale-epoch PATCH -> self-abort + quarantine (never shipped)" +else + cat "$AQ_FSTUB_CALLS" >&2; fail "fleet fencing did not quarantine correctly (review/testing/shipped=$rcount)" +fi +unset AQ_FSTUB_PATCH_CODE + +# 37. lease renew (unit): fleet_lease_renew issues POST .../lease/renew with epoch. +funcs="$tmp/aq-funcs-fl.sh"; sed '/^main "\$@"/d' "$AQ" > "$funcs" +renew_calls="$tmp/renew-calls.log"; : > "$renew_calls" +if bash -c ' + set -uo pipefail + export AGENT_QUEUE_ROOT="'"$tmp"'/queue-renew" AQ_FLEET=1 + export AQ_FLEET_API_CMD="'"$fstub"'" AQ_FSTUB_CALLS="'"$renew_calls"'" + source "'"$funcs"'" # agent-queue helpers (main stripped; SCRIPT_DIR=/tmp here) + source "'"$HERE"'/lib/fleet-client.sh" # source the lib explicitly (relative source is skipped) + ensure_dirs + printf "%s\n" "job=jr" "fleet_job_id=fjob_r" "fleet_lease_epoch=7" > "$STATE/jr.meta" + fleet_lease_renew jr +'; then + grep -q 'POST /fleet/jobs/fjob_r/lease/renew :: .*"leaseEpoch":7' "$renew_calls" \ + && pass "fleet: lease renew issues POST .../lease/renew with current leaseEpoch" \ + || { cat "$renew_calls" >&2; fail "fleet lease renew payload missing/incorrect"; } +else + fail "fleet_lease_renew invocation errored" +fi + +# 38. offline-degrade: a 5xx on PATCH does NOT quarantine — the job finishes locally +# (degraded), reaching review/ with fleet_degraded recorded. +export AGENT_QUEUE_ROOT="$tmp/queue-fl4" +"$AQ" init >/dev/null +export AQ_FSTUB_CALLS="$tmp/fl4-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl4-claimed" AQ_FSTUB_JOB_ID="fjob_4" AQ_FSTUB_BODY="work four" AQ_FSTUB_PATCH_CODE=500 +: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG" +AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1 +fmeta4=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_4.meta' | head -1) +if ls "$AGENT_QUEUE_ROOT"/review/*fleet-fjob_4.md >/dev/null 2>&1 \ + && [ "$(metaval "$fmeta4" fleet_degraded)" = "1" ] \ + && [ "$(metaval "$fmeta4" result)" != "fenced_quarantine" ]; then + pass "fleet offline-degrade: coordinator 5xx -> job completes locally (degraded), not quarantined" +else + cat "$AQ_FSTUB_CALLS" >&2; fail "fleet offline-degrade behaved incorrectly" +fi +unset AQ_FSTUB_PATCH_CODE + +# 39. no-leak: the claimed bodyMd is never sent in any report payload, and the +# bearer token never appears in a recorded call (it is a header, not a body). +if ! grep -q 'FLEET-BODY-SENTINEL' "$tmp/fl1-calls.log" 2>/dev/null \ + && ! grep -q 'SENTINEL-TOKEN' "$tmp/fl1-calls.log" 2>/dev/null; then + pass "fleet no-leak: bodyMd/token never appear in coordinator report payloads" +else + fail "fleet leaked bodyMd or token into a report payload" +fi +unset AQ_FLEET_API_CMD AQ_FLEET_CWD AQ_FSTUB_CALLS AQ_FSTUB_CLAIM_FLAG AQ_FSTUB_JOB_ID AQ_FSTUB_BODY + echo "self-test PASS"