Merge PR #5: Phase 2 Slice 3 — factory-agent integration

Wires agent-queue.sh to the fleet coordinator behind AQ_FLEET=1 (offline path
unchanged when off). Fencing-aware (stale leaseEpoch -> self-abort + quarantine),
offline-degrade, tracker echo via fleet_events. selftest 60/60 (53 prior + 7 new);
token env-sourced; no bodyMd/token leak (asserted).
saravanakumardb1 2026-05-29 22:50:56 -07:00
commit 51e4c5f271
5 changed files with 544 additions and 7 deletions

@@ -378,6 +378,61 @@ execution**: an echo failure is logged and the job continues unchanged. With
`AQ_TRACKER_AUTO=1` the worker echoes automatically on each transition; otherwise
echo is manual. `status` / `insights` surface the `tracker-item` and last echoed status.
## Fleet integration (Phase 2)
Behind the `AQ_FLEET` flag, the runner becomes a **factory** that registers,
heartbeats, claims, and reports against the platform-service `fleet` coordinator —
so coordinator jobs run alongside local `.md` files on the same host. All
coordinator logic lives in [`lib/fleet-client.sh`](lib/fleet-client.sh) (curl-only +
POSIX awk, sourced by `agent-queue.sh`); the few hook points in the runner are all
gated on `fleet_enabled`.
> **Offline vs fleet mode.** With `AQ_FLEET` unset/`0` (the default) the runner is
> the pure offline git-queue described above — **zero** coordinator calls, behavior
> byte-for-byte unchanged. With `AQ_FLEET=1` the run loop also registers with and claims
> from the coordinator, reports fenced stage transitions, renews leases, and routes the
> outcome echo through the coordinator's `fleet_events` instead of the direct tracker
> echo. The direct tracker echo remains the path for offline (non-fleet) jobs.
```bash
AQ_FLEET=1 AQ_FLEET_TOKEN=… AQ_PRODUCT_ID=… agent-queue.sh fleet-status # register + show identity
AQ_FLEET=1 AQ_FLEET_TOKEN=… AQ_PRODUCT_ID=… agent-queue.sh run # claim + execute coordinator jobs
```
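Before running either command, a wrapper could check that the fleet variables are present. The sketch below is illustrative only: `fleet_preflight` is a hypothetical helper, not part of `agent-queue.sh`, and treating `AQ_FLEET_TOKEN` and `AQ_PRODUCT_ID` as mandatory is an assumption (the client itself simply omits a header when its variable is empty).

```bash
#!/usr/bin/env bash
# Hypothetical preflight (NOT part of agent-queue.sh): report the mode the runner
# would start in, and which fleet variables are missing.
# Assumption: a real deployment wants both the token and the product id set.
fleet_preflight() {
  local missing=()
  # Flag off (or unset) means the pure offline git-queue: nothing else to check.
  [ "${AQ_FLEET:-0}" = 1 ] || { echo "offline"; return 0; }
  [ -n "${AQ_FLEET_TOKEN:-}" ] || missing+=(AQ_FLEET_TOKEN)
  [ -n "${AQ_PRODUCT_ID:-}" ]  || missing+=(AQ_PRODUCT_ID)
  if [ "${#missing[@]}" -gt 0 ]; then
    echo "missing: ${missing[*]}"
    return 1
  fi
  # Default base URL taken from the config table (already includes /api).
  echo "fleet: api=${AQ_FLEET_API:-http://localhost:4003/api}"
}
```

Calling `fleet_preflight && agent-queue.sh run` would then refuse to start fleet mode with an incomplete environment, while leaving the offline default untouched.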
### Config (env)
| Var | Default | Meaning |
| --- | ------- | ------- |
| `AQ_FLEET` | `0` | master switch — `1` enables coordinator integration; `0`/unset = offline git-queue |
| `AQ_FLEET_API` | `http://localhost:4003/api` | coordinator base URL (already includes `/api`) |
| `AQ_FLEET_TOKEN` | _(none)_ | bearer token — never hardcode |
| `AQ_PRODUCT_ID` | _(none)_ | productId (sent as `X-Product-Id`; shared with the tracker config) |
| `AQ_FACTORY_ID` | `<hostname>-<pid>` | stable factory identity for this process |
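| `AQ_FLEET_LEASE_RENEW_SEC` | `300` | heartbeat / lease-renew cadence, in seconds |
| `AQ_FLEET_LEASE_SECONDS` | `900` | lease duration requested on claim and renew, in seconds |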
| `AQ_FLEET_CAPS` | _(auto)_ | override the auto-detected capability tokens (comma/space list) |
| `AQ_FLEET_CWD` | `$PWD` | cwd a claimed coordinator job runs in |
| `AQ_FLEET_API_CMD` | _(none)_ | test seam: a stub that replaces the curl HTTP entirely (selftest uses it) |
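To try fleet mode without a live coordinator, the `AQ_FLEET_API_CMD` seam can point at a small stub. The sketch below shows the output contract such a stub has to honor (response body, then the HTTP code on a final line) and how that output splits back into body and code. `fleet_stub`, `split_response`, `STUB_PATCH_CODE`, `RESP_CODE`, and `RESP_BODY` are hypothetical names used only for illustration; the canned responses are assumptions modeled on the selftest stub.

```bash
#!/usr/bin/env bash
# Hypothetical stand-in for a script passed via AQ_FLEET_API_CMD.
# It is invoked as: <stub> <METHOD> <PATH> [JSON]
# Contract: print the response body, then the HTTP status code on the last line.
fleet_stub() {
  local method=$1 path=$2
  case "$method $path" in
    "POST /fleet/factories/heartbeat") printf '%s\n200\n' '{"ok":true}' ;;
    "POST /fleet/claim")               printf '%s\n200\n' '{"claimed":false}' ;;
    "PATCH /fleet/jobs/"*)             printf '%s\n%s\n' '{}' "${STUB_PATCH_CODE:-200}" ;;
    *)                                 printf '%s\n404\n' '{}' ;;
  esac
}

# Split "<body>\n<code>" into two globals: the last line is the status code,
# everything before it is the body.
split_response() {
  local out=$1
  RESP_CODE=$(printf '%s' "$out" | tail -n1)
  RESP_BODY=$(printf '%s' "$out" | sed '$d')
}

split_response "$(fleet_stub POST /fleet/claim)"
echo "code=$RESP_CODE body=$RESP_BODY"   # prints: code=200 body={"claimed":false}
```

In practice the body of `fleet_stub` would live in its own executable file, since `AQ_FLEET_API_CMD` is run as a command rather than called as a shell function.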
### Protocol (claim / heartbeat / report / fence / renew)
- **register / heartbeat:** `POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load}` — registration *is* the first heartbeat; re-sent on `AQ_FLEET_LEASE_RENEW_SEC` cadence.
- **claim:** `POST /fleet/claim {factoryId, capabilities[], leaseSeconds}`. A returned job (`id`, `bodyMd`, `leaseEpoch`) is materialized as a transient local `.md` (frontmatter `fleet-job-id` + `fleet-lease-epoch`) so the existing runner executes it unchanged, interleaved with local files.
- **report (fenced):** each stage transition (`building`/`review`/`testing`/`shipped`/`failed`) is `PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?}`. The coordinator writes `fleet_events` server-side. The payload carries only stage/epoch/checkpoint — **never** the prompt/`bodyMd` or token.
- **fencing (§18):** if a report/renew returns **409 Conflict** (the client treats `412` the same way), the `leaseEpoch` is stale because the coordinator reclaimed the job. The worker then **self-aborts**: it stops, does **not** ship/merge, and **quarantines** the local result to `failed/` (`result=fenced_quarantine`) for human triage, so a reclaimed zombie can never corrupt coordinator state.
- **lease renew / release:** `POST /fleet/jobs/:id/lease/renew` while building (fenced); `…/lease/release` on terminal stages.
- **checkpoint:** the WIP `{wipBranch, wipCommit}` is sent with the building report so a reclaim can resume (§25).
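The fenced report body described in the bullets above can be sketched as follows. This is a simplified illustration, not the shipped `fleet_report`: `json_escape_min` and `build_report_payload` are hypothetical helpers, and the escaper handles only backslashes and double quotes, which is an assumption that suffices for typical branch names and commit hashes.

```bash
#!/usr/bin/env bash
# Minimal JSON string escaper for this sketch: backslash and double quote only.
json_escape_min() {
  local s=$1
  s=${s//\\/\\\\}
  s=${s//\"/\\\"}
  printf '%s' "$s"
}

# build_report_payload <stage> <leaseEpoch> [wipBranch] [wipCommit]
# Emits only stage, leaseEpoch and (optionally) the WIP checkpoint.
# The prompt/bodyMd and the bearer token are never part of this payload.
build_report_payload() {
  local stage=$1 epoch=${2:-0} branch=${3:-} commit=${4:-} ckpt=""
  if [ -n "$branch" ]; then
    ckpt=",\"checkpoint\":{\"wipBranch\":\"$(json_escape_min "$branch")\""
    if [ -n "$commit" ]; then
      ckpt+=",\"wipCommit\":\"$(json_escape_min "$commit")\""
    fi
    ckpt+="}"
  fi
  printf '{"stage":"%s","leaseEpoch":%s%s}' "$stage" "$epoch" "$ckpt"
}

build_report_payload building 1 wip/fjob_2 abc123; echo
# prints: {"stage":"building","leaseEpoch":1,"checkpoint":{"wipBranch":"wip/fjob_2","wipCommit":"abc123"}}
```

Keeping the payload to these three fields is what lets a test grep recorded calls for a body sentinel and expect no match.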
### Offline-degrade + quarantine (§9)
If the coordinator is **unreachable** mid-job (5xx / connection error), the report
is treated as *degraded* (logged, `fleet_degraded=1`): the in-flight job **finishes
locally** rather than being abandoned. On the next reachable call the worker
presents its `leaseEpoch`; if the coordinator now reports it **stale** (it was
reclaimed during the outage), the local result is **quarantined** (marked, not
auto-shipped) and surfaced for human triage — split-brain is resolved in favor of
the coordinator without losing the work. `status` shows the factory id + per-job
`fleet=<id>@e<epoch>`; `insights` lists the `fleet_*` fields.
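The degrade-then-quarantine rule can be illustrated by walking the HTTP codes a single job sees over its lifetime. This is a sketch under stated assumptions: `job_disposition` is a hypothetical function, and the mapping (2xx accepted, 409/412 fenced, anything else degraded) follows the return codes documented for `fleet_report`.

```bash
#!/usr/bin/env bash
# Hypothetical: given the coordinator status codes a job observed, in order,
# print its final disposition.
#   quarantine = a stale leaseEpoch was seen; the coordinator wins, never ship
#   degraded   = at least one call failed (5xx / 000) but the lease was never fenced
#   ok         = every report was accepted
job_disposition() {
  local code degraded=0
  for code in "$@"; do
    case "$code" in
      2??)     ;;                              # report accepted
      409|412) echo "quarantine"; return 0 ;;  # fenced: stop here
      *)       degraded=1 ;;                   # unreachable: keep working locally
    esac
  done
  if [ "$degraded" -eq 1 ]; then echo "degraded"; else echo "ok"; fi
}

job_disposition 200 500 000 409   # prints: quarantine
job_disposition 200 503 200       # prints: degraded
```

The first call models an outage followed by a reconnect to a reclaimed lease; the second models an outage the job simply rode out.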
## Config (env overrides)
| Var | Default | Meaning |

@@ -697,6 +697,17 @@ run_worker() {
trap '_worker_trap; exit 143' INT TERM
_wip_start "$job" "$cwd" "$metaf" "$logf" || true
# ── Fleet (§7/§18): report `building` (with WIP checkpoint) to the coordinator.
# If the lease is stale (we were reclaimed) the report is FENCED -> self-abort and
# quarantine WITHOUT running the agent. No-op for non-fleet jobs / flag off. ──
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" building checkpoint; local _frc=$?
if [[ "$_frc" -eq 2 ]]; then
fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
return 0
fi
fi
_run_agent() {
if [[ -n "$AGENT_STDIN" ]]; then
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
@@ -764,6 +775,16 @@ run_worker() {
echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then
# Fleet (§18): re-confirm the lease before accepting the agent's output. If the
# coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale
# epoch), the report is FENCED -> quarantine the local result, NEVER ship.
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" review checkpoint; local _rrc=$?
if [[ "$_rrc" -eq 2 ]]; then
fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
return 0
fi
fi
# Agent succeeded: land in review/, then run the auto-QA verify gate. The
# worker is still alive here so the concurrency slot stays held through
# verification — `ended=` is written only once we reach a resting stage.
@@ -1263,11 +1284,20 @@ cmd_to_tracker() {
log "to-tracker: echoed $jn -> item $item_id (status=$status)"
}
# _auto_echo <job> — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition.
# Never blocks or fails the job: the tracker is downstream, not authoritative.
# _auto_echo <job> — best-effort outcome echo on a transition (never fatal).
# Fleet mode (AQ_FLEET=1, fleet job): route the echo through the coordinator as a
# fenced stage report — `fleet_events` becomes the audit source of truth, so we do
# NOT also post to the tracker directly. Offline (non-fleet) jobs keep the Slice-4
# direct tracker echo (opt-in via AQ_TRACKER_AUTO).
_auto_echo() {
local job=$1
if declare -f fleet_enabled >/dev/null 2>&1 && fleet_enabled && _fleet_is_job "$job"; then
local result; result=$(_meta_val "$STATE/$job.meta" result)
fleet_report "$job" "$(_fleet_stage_for "$result")" >/dev/null 2>&1 || true
return 0
fi
[[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0
cmd_to_tracker "$1" >/dev/null 2>&1 || true
cmd_to_tracker "$job" >/dev/null 2>&1 || true
}
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
@@ -1382,10 +1412,20 @@ cmd_run() {
# Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous
# crash/power-off before launching anything new.
recover_orphans
# Fleet (§8): register with the coordinator (registration == first heartbeat).
fleet_enabled && fleet_heartbeat
while true; do
# continuously sweep for orphans (a worker that died mid-loop)
recover_orphans
# Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs,
# and — if we have capacity — claim one coordinator job into inbox/ so the
# normal selection loop below executes it interleaved with local .md files.
if fleet_enabled; then
fleet_heartbeat_maybe
fleet_renew_active
[[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] && { fleet_claim >/dev/null 2>&1 || true; }
fi
local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
@@ -1452,6 +1492,8 @@ cmd_run() {
echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)"
echo "artifacts=$(fm_get "$doing_file" artifacts "")"
echo "tracker_item=$(fm_get "$doing_file" tracker-item "")"
echo "fleet_job_id=$(fm_get "$doing_file" fleet-job-id "")"
echo "fleet_lease_epoch=$(fm_get "$doing_file" fleet-lease-epoch "")"
} > "$STATE/$job.meta"
run_worker "$doing_file" &
{ echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta"
@@ -1482,6 +1524,9 @@ cmd_status() {
local running; running=$(active_workers)
echo
printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" ""
if fleet_enabled; then
printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API"
fi
printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \
"$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \
"$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \
@@ -1511,12 +1556,16 @@ cmd_status() {
local m_prio m_prof m_caps m_trk extra=""
m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-)
m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-)
local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-)
local m_echo m_fleet m_epoch
m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-)
m_fleet=$(grep '^fleet_job_id=' "$f" | tail -1 | cut -d= -f2-)
m_epoch=$(grep '^fleet_lease_epoch=' "$f" | tail -1 | cut -d= -f2-)
[[ -n "$m_prio" ]] && extra+="prio=$m_prio "
[[ -n "$m_prof" ]] && extra+="profile=$m_prof "
[[ -n "$m_caps" ]] && extra+="caps=$m_caps "
[[ -n "$m_trk" ]] && extra+="tracker=$m_trk "
[[ -n "$m_echo" ]] && extra+="echoed=$m_echo "
[[ -n "$m_fleet" ]] && extra+="fleet=$m_fleet@e${m_epoch:-0} "
[[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET"
printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET"
done
@@ -1559,7 +1608,8 @@ cmd_insights() {
for k in engine result attempts started ended duration_s exit verify_exit \
model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \
files_changed lines_added lines_deleted wip_branch wip_base wip_commit \
next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do
next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at \
fleet_job_id fleet_lease_epoch fleet_reported fleet_fenced fleet_degraded fleet_quarantined; do
val=$(_meta_val "$f" "$k")
[[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val"
done
@@ -1669,10 +1719,23 @@ cmd_ship() {
local f; f=$(_find_job "$job" "$TESTING")
[[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)"
local base name; base=$(basename "$f"); name=${base%.md}
# Fleet (§18): a fleet job may only ship if we still hold the lease. A fenced
# report means the coordinator reclaimed it -> quarantine instead of shipping.
if fleet_enabled && _fleet_is_job "$name"; then
fleet_report "$name" shipped checkpoint; local _src=$?
if [[ "$_src" -eq 2 ]]; then
fleet_quarantine "$name" "$f" "$STATE/$name.meta" "$LOGS/$name.log"
return 0
fi
fi
mv "$f" "$SHIPPED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta"
log "shipped $C_BOLD$base$C_RESET (testing → shipped)"
_auto_echo "$name"
if fleet_enabled && _fleet_is_job "$name"; then
fleet_lease_release "$name" shipped >/dev/null 2>&1 || true
fi
return 0
}
# promote <job> — advance one stage forward: review → testing → shipped.
@@ -1764,6 +1827,7 @@ ${C_BOLD}COMMANDS${C_RESET}
recover reclaim orphaned building/ jobs (dead worker) -> inbox
from-tracker <ITEM_ID> pull a tracker Item -> materialize a job in inbox/ (§10)
to-tracker <job> echo a job's outcome to its tracker Item (one-way)
fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity
dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop
logs <job> [-f] print (or follow) a job's log
@@ -1819,13 +1883,25 @@ ${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs p
AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF)
AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam)
label hints on an Item: engine-class:<x> profile:<x> priority:<x> cap:<token>
${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue)
AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID
AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD
When on, 'run' registers + claims coordinator jobs (interleaved with local .md), reports
fenced stage transitions, renews leases, and quarantines a job if it is reclaimed (fenced).
EOF
}
# Fleet coordinator client (Phase 2). All functions no-op unless AQ_FLEET=1, so the
# offline git-queue path above is unchanged when the flag is off.
# shellcheck source=lib/fleet-client.sh
[[ -f "$SCRIPT_DIR/lib/fleet-client.sh" ]] && source "$SCRIPT_DIR/lib/fleet-client.sh"
main() {
local cmd="${1:-help}"; shift || true
case "$cmd" in
init) cmd_init "$@";;
fleet-status) cmd_fleet_status "$@";;
add) cmd_add "$@";;
run) cmd_run "$@";;
status) cmd_status "$@";;

@@ -12,7 +12,7 @@
| ----- | ----- | ------ | - | ---- |
| **0** | Baseline (today) | ✅ shipped | 100% | `selftest.sh` green |
| **1** | Manifest + profiles + capabilities + tracker adapter (single host) | ◐ in progress | 95% | adapter e2e + selftest |
| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ☐ not started | 0% | fleet e2e + module tests |
| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ◐ in progress | 55% | fleet e2e + module tests |
| **3** | Fleet control plane in tracker-web + DAG deps + budgets + scoring router | ☐ not started | 0% | web e2e + router tests |
| **4** | Message bus + autoscaling + cross-OS capability marketplace | ☐ not started | 0% | load/chaos suite |
| **5** | Self-optimizing / learned routing | ☐ not started | 0% | offline eval + A/B |
@@ -367,10 +367,20 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until
### Phase 2 — Coordinator as platform-service module + Cosmos + multi-factory leasing
**Goal:** the service spine; ≥2 real factories executing in parallel via leases.
> **Slice progress — P2-S3 (factory-agent integration, single host):** the bash runner
> is now a coordinator **factory** behind `AQ_FLEET`: `lib/fleet-client.sh` (curl-only,
> sourced) registers via heartbeat, claims jobs into inbox (interleaved with local `.md`),
> reports **fenced** stage transitions with WIP checkpoints, renews/releases leases, and on
> a stale `leaseEpoch` (reclaimed) **self-aborts + quarantines** the local result. Coordinator
> 5xx/connection errors **degrade** (finish locally) rather than abandon work. When `AQ_FLEET`
> is off the offline git-queue path is byte-for-byte unchanged. Remaining P2: scheduler/router
> core, direct tracker→module calls, factory enrollment + scoped tokens, `fleet.*` feature
> flags + shadow/dual-run, and the two-factory parallel demo (the Phase-2 exit criteria).
- [x] Scaffold `fleet`/`orchestrator` module in `platform-service` (`types/repository/routes`, Zod, ESM, `productId`). *(PR #28)*
- [x] Cosmos containers (§13) + repository layer (memory + Cosmos providers). *(PR #28; `fleet_artifacts` blob wiring still pending.)*
- [x] **Atomic claim** (optimistic concurrency / `_etag`) + **lease reaper** + **fencing (`leaseEpoch`)** endpoints (§4/§8/§9) — *not* Cosmos-TTL-driven reclaim. *(common-plat PR #28 + #29; truly atomic via `updateIfMatch`.)*
- [ ] Port `agent-queue` runner to a **factory agent** API client (enroll/register/heartbeat/claim/report, fencing-aware) while keeping git-queue fallback.
- [x] Port `agent-queue` runner to a **factory agent** API client (enroll/register/heartbeat/claim/report, fencing-aware) while keeping git-queue fallback. *(P2-S3: `lib/fleet-client.sh` behind `AQ_FLEET`; registers via heartbeat, claims into inbox, reports fenced stage transitions, renews leases, quarantines on stale-epoch; offline git-queue unchanged when the flag is off.)*
- [ ] Scheduler/router core (§7) as a pure module (fixed weights) + wired into atomic assignment.
- [ ] Tracker adapter calls the module directly (not just file export).
- [ ] Auth: factory enrollment + scoped rotatable tokens; secret isolation enforced (§12 subset).

@@ -0,0 +1,258 @@
# shellcheck shell=bash
# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ────────────────
#
# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory" that
# registers / heartbeats / claims / reports against the platform-service `fleet`
# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every function
# here is an immediate no-op and the offline git-queue path is byte-for-byte
# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err,
# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN).
#
# Contract (routes under AQ_FLEET_API, which already includes /api):
# POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load}
# POST /fleet/claim {factoryId, capabilities[], leaseSeconds}
# -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
# PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?} (409 = fenced)
# POST /fleet/jobs/:id/lease/renew {leaseEpoch, leaseSeconds} (409 = fenced)
# POST /fleet/jobs/:id/lease/release {leaseEpoch, stage?}
# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there
# is no client-side "register" or "append event" call (register == first heartbeat).
# ── Config (env-overridable) ────────────────────────────────────────
AQ_FLEET="${AQ_FLEET:-0}" # master switch (0 = offline)
AQ_FLEET_API="${AQ_FLEET_API:-http://localhost:4003/api}" # base URL incl. /api
AQ_FLEET_TOKEN="${AQ_FLEET_TOKEN:-}" # bearer; never hardcode
# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header).
AQ_FACTORY_ID="${AQ_FACTORY_ID:-$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}"
AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}" # heartbeat/renew cadence
AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}" # requested lease duration
AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}" # override caps (comma/space list)
AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}" # cwd for claimed fleet jobs
AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}" # test seam (stub script)
AQ_FLEET_HB_TS=0 # last heartbeat epoch (mutable)
# fleet_enabled — true iff the coordinator integration is switched on.
fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; }
# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
# fleet_api <METHOD> <PATH> [JSON] -> response body, then a final HTTP-code line.
fleet_api() {
local method=$1 path=$2 body=${3:-}
if [[ -n "$AQ_FLEET_API_CMD" ]]; then
"$AQ_FLEET_API_CMD" "$method" "$path" "$body"
return $?
fi
local url="${AQ_FLEET_API}${path}"
local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
-H "Content-Type: application/json" -w '\n%{http_code}')
[[ -n "$AQ_FLEET_TOKEN" ]] && args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
[[ -n "$AQ_PRODUCT_ID" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
[[ -n "$body" ]] && args+=(--data "$body")
local out rc
out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$?
if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi
}
# _fleet_call <METHOD> <PATH> [JSON] -> sets globals FLEET_BODY + FLEET_CODE.
_fleet_call() {
local out; out=$(fleet_api "$@")
FLEET_CODE=$(printf '%s' "$out" | tail -n1)
FLEET_BODY=$(printf '%s' "$out" | sed '$d')
}
# _fleet_json_num <key> (reads JSON on stdin) -> first numeric value for key.
_fleet_json_num() {
grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -1 | grep -oE -- '-?[0-9]+$'
}
# _fleet_is_job <job> -> 0 if this job was claimed from the coordinator.
_fleet_is_job() { [[ -n "$(_meta_val "$STATE/$1.meta" fleet_job_id)" ]]; }
# fleet_detect_caps -> JSON array of capability tokens (override or auto-detected).
fleet_detect_caps() {
local toks
if [[ -n "$AQ_FLEET_CAPS" ]]; then
toks=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n')
else
toks=$(detect_capabilities)
fi
local out="[" first=1 t
while IFS= read -r t; do
[[ -n "$t" ]] || continue
[[ $first -eq 1 ]] && first=0 || out+=","
out+="\"$(_json_escape "$t")\""
done <<< "$toks"
printf '%s]' "$out"
}
# ── Heartbeat (registration == first heartbeat) ─────────────────────
fleet_heartbeat() {
fleet_enabled || return 0
local caps load body
caps=$(fleet_detect_caps)
load=$(active_workers 2>/dev/null || echo 0)
body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":${load:-0}}"
_fleet_call POST "/fleet/factories/heartbeat" "$body"
case "$FLEET_CODE" in
2*) AQ_FLEET_HB_TS=$(date +%s); return 0;;
*) err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"; return 1;;
esac
}
# fleet_heartbeat_maybe — heartbeat only when the cadence interval has elapsed.
fleet_heartbeat_maybe() {
fleet_enabled || return 0
local now; now=$(date +%s)
[[ $(( now - ${AQ_FLEET_HB_TS:-0} )) -ge "${AQ_FLEET_LEASE_RENEW_SEC:-300}" ]] && fleet_heartbeat
return 0
}
# ── Claim — pull one job and materialize it as a local inbox .md ────
# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error.
fleet_claim() {
fleet_enabled || return 2
local caps body; caps=$(fleet_detect_caps)
body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
_fleet_call POST "/fleet/claim" "$body"
case "$FLEET_CODE" in 2*) :;; *) err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"; return 1;; esac
printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2
local jid body_md epoch
jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
[[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }
# Materialize a transient local job .md (same approach as from-tracker) so the
# existing runner executes a coordinator job unchanged. fleet-job-id +
# fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run).
local safe tmpdir tmp
safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX")
tmp="$tmpdir/fleet-$safe.md"
{
echo "---"
echo "cwd: $AQ_FLEET_CWD"
echo "yolo: true"
echo "fleet-job-id: $jid"
echo "fleet-lease-epoch: ${epoch:-0}"
echo "idempotency-key: fleet-$jid"
echo "---"
echo
printf '%s\n' "$body_md"
} > "$tmp"
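# Do not report a claim as successful if the job never reached inbox/.
if ! cmd_add "$tmp" >/dev/null 2>&1; then
rm -rf "$tmpdir"
err "fleet: claimed job $jid but could not add it to inbox/"
return 1
fi
rm -rf "$tmpdir"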
log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
return 0
}
# ── Report a fenced stage transition ────────────────────────────────
# fleet_report <job> <stage> [with-checkpoint] -> 0 ok, 2 FENCED (stale epoch:
# caller must self-abort), 1 degraded (coordinator unreachable: continue locally).
fleet_report() {
fleet_enabled || return 0
local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
metaf="$STATE/$job.meta"
jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
[[ -n "$jid" ]] || return 0
local ckpt=""
if [[ -n "$with_ckpt" ]]; then
local wb wc; wb=$(_meta_val "$metaf" wip_branch); wc=$(_meta_val "$metaf" wip_commit)
if [[ -n "$wb" ]]; then
ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
[[ -n "$wc" ]] && ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
ckpt+="}"
fi
fi
# payload carries ONLY {stage, leaseEpoch, checkpoint} — never bodyMd/prompt/token.
_fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$stage\",\"leaseEpoch\":${epoch:-0}$ckpt}"
case "$FLEET_CODE" in
2*) echo "fleet_reported=$stage" >> "$metaf"; return 0;;
409|412) err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
echo "fleet_fenced=1" >> "$metaf"; return 2;;
*) err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
echo "fleet_degraded=1" >> "$metaf"; return 1;;
esac
}
# fleet_lease_renew <job> -> extend the lease; 0 ok, 2 fenced, 1 degraded.
fleet_lease_renew() {
fleet_enabled || return 0
local job=$1 metaf jid epoch
metaf="$STATE/$job.meta"
jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
[[ -n "$jid" ]] || return 0
_fleet_call POST "/fleet/jobs/$jid/lease/renew" "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
case "$FLEET_CODE" in
2*) return 0;;
409|412) echo "fleet_fenced=1" >> "$metaf"; return 2;;
*) return 1;;
esac
}
# fleet_lease_release <job> [stage] -> best-effort release on a terminal stage.
fleet_lease_release() {
fleet_enabled || return 0
local job=$1 stage=${2:-} metaf jid epoch body
metaf="$STATE/$job.meta"
jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
[[ -n "$jid" ]] || return 0
body="{\"leaseEpoch\":${epoch:-0}"
[[ -n "$stage" ]] && body+=",\"stage\":\"$stage\""
body+="}"
_fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
return 0
}
# fleet_renew_active — renew leases for all in-flight (building/) fleet jobs.
fleet_renew_active() {
fleet_enabled || return 0
local f job
for f in "$BUILDING"/*.md; do
[[ -e "$f" ]] || continue
job=$(basename "$f"); job=${job%.md}
_fleet_is_job "$job" && { fleet_lease_renew "$job" >/dev/null 2>&1 || true; }
done
return 0
}
# fleet_quarantine <job> <file> <metaf> <logf> — a fenced (reclaimed) worker must
# NOT ship: park the local result in failed/ for human triage (§9 split-brain).
fleet_quarantine() {
local job=$1 file=$2 metaf=$3 logf=$4
{
echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)."
echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))"
} >> "$logf"
[[ -e "$file" ]] && mv "$file" "$FAILED/" 2>/dev/null
{ echo "result=fenced_quarantine"; echo "fleet_quarantined=1"; echo "ended=$(date +%s)"; } >> "$metaf"
err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}
# _fleet_stage_for <result> -> the coordinator stage for a job result/stage.
_fleet_stage_for() {
case "$1" in
shipped) echo shipped;;
testing) echo testing;;
review) echo review;;
failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected) echo failed;;
*) echo building;;
esac
}
# fleet-status — heartbeat (register) + print this factory's identity/caps.
cmd_fleet_status() {
ensure_dirs
if ! fleet_enabled; then
log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
return 0
fi
log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
log "fleet: capabilities=$(fleet_detect_caps)"
if fleet_heartbeat; then
log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs."
else
err "fleet: coordinator unreachable — would run in offline-degrade mode."
fi
}

@@ -688,4 +688,142 @@ else
fi
unset AQ_TRACKER_API_CMD AQ_TRACKER_CWD AQ_STUB_CALLS AQ_STUB_ITEM AQ_STUB_CODE AQ_STUB_GET_CODE
# ─────────────────────────────────────────────────────────────────────
# Phase 2 — Slice 3 cases (fleet coordinator integration). A stub replaces
# fleet_api via AQ_FLEET_API_CMD (no live coordinator), records calls + returns
# canned JSON. The flag-off cases prove the offline path is unchanged.
# ─────────────────────────────────────────────────────────────────────
fstub="$tmp/fleet-stub.sh"
cat > "$fstub" <<'STUBEOF'
#!/usr/bin/env bash
# fleet API stub: record "<method> <path> :: <body>"; canned responses by route.
[ -n "${AQ_FSTUB_CALLS:-}" ] && printf '%s %s :: %s\n' "$1" "$2" "$3" >> "$AQ_FSTUB_CALLS"
case "$1 $2" in
"POST /fleet/factories/heartbeat") printf '%s\n200\n' '{"ok":true}' ;;
"POST /fleet/claim")
if [ -n "${AQ_FSTUB_CLAIM_FLAG:-}" ] && [ -f "$AQ_FSTUB_CLAIM_FLAG" ]; then
printf '%s\n200\n' '{"claimed":false}'
else
[ -n "${AQ_FSTUB_CLAIM_FLAG:-}" ] && : > "$AQ_FSTUB_CLAIM_FLAG"
printf '{"claimed":true,"job":{"id":"%s","bodyMd":"%s","leaseEpoch":1},"lease":{"leaseEpoch":1}}\n200\n' \
"${AQ_FSTUB_JOB_ID:-fjob_1}" "${AQ_FSTUB_BODY:-do the work}"
fi ;;
PATCH\ /fleet/jobs/*) printf '%s\n%s\n' '{}' "${AQ_FSTUB_PATCH_CODE:-200}" ;;
*) printf '%s\n200\n' '{}' ;;
esac
STUBEOF
chmod +x "$fstub"
# 33. flag OFF (default): a recording stub is configured but AQ_FLEET is unset →
# ZERO fleet calls, and a local job runs through the offline path unchanged.
export AGENT_QUEUE_ROOT="$tmp/queue-floff"
"$AQ" init >/dev/null
export AQ_FLEET_API_CMD="$fstub"; export AQ_FSTUB_CALLS="$tmp/floff-calls.log"; : > "$AQ_FSTUB_CALLS"
printf '%s\n' '---' 'engine: devin' "cwd: $work" 'yolo: true' '---' '' '# local task' \
> "$AGENT_QUEUE_ROOT/inbox/localjob.md"
DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1
if [ ! -s "$AQ_FSTUB_CALLS" ] && ls "$AGENT_QUEUE_ROOT"/review/localjob.md >/dev/null 2>&1; then
pass "fleet flag OFF: zero coordinator calls; offline job completes to review/"
else
cat "$AQ_FSTUB_CALLS" >&2; fail "flag-off made fleet calls or offline job did not complete"
fi
unset AQ_FLEET_API_CMD AQ_FSTUB_CALLS
# 34. AQ_FLEET=1: loop start registers (heartbeat with caps) + claim executes a
# coordinator job to review/, with fleet_job_id + leaseEpoch persisted in meta.
export AGENT_QUEUE_ROOT="$tmp/queue-fl1"; export AQ_FLEET_CWD="$work"
"$AQ" init >/dev/null
export AQ_FLEET_API_CMD="$fstub" AQ_FSTUB_CALLS="$tmp/fl1-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl1-claimed" \
AQ_FSTUB_JOB_ID="fjob_1" AQ_FSTUB_BODY="FLEET-BODY-SENTINEL do work"
: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG"
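# Set a sentinel token so the no-leak check in case 39 has a real value to look for.
AQ_FLEET=1 AQ_FLEET_TOKEN="SENTINEL-TOKEN" AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1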
fmeta=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_1.meta' | head -1)
if grep -q 'POST /fleet/factories/heartbeat :: .*capabilities' "$AQ_FSTUB_CALLS" \
&& grep -q 'POST /fleet/claim' "$AQ_FSTUB_CALLS" \
&& ls "$AGENT_QUEUE_ROOT"/review/*fleet-fjob_1.md >/dev/null 2>&1 \
&& [ "$(metaval "$fmeta" fleet_job_id)" = "fjob_1" ] && [ "$(metaval "$fmeta" fleet_lease_epoch)" = "1" ]; then
pass "fleet: register(heartbeat)+claim -> coordinator job materialized + executed to review/"
else
cat "$AQ_FSTUB_CALLS" >&2; fail "fleet claim/execute did not work as expected"
fi
# 35. report + checkpoint: PATCH /fleet/jobs/:id carries stage + leaseEpoch, and a
# checkpoint (wipBranch) on building when cwd is a git repo.
export AGENT_QUEUE_ROOT="$tmp/queue-fl2"; repo=$tmp/repo-fl2; mkrepo "$repo"; export AQ_FLEET_CWD="$repo"
"$AQ" init >/dev/null
export AQ_FSTUB_CALLS="$tmp/fl2-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl2-claimed" AQ_FSTUB_JOB_ID="fjob_2" AQ_FSTUB_BODY="work two"
: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG"
AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1
if grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"building".*"leaseEpoch":1' "$AQ_FSTUB_CALLS" \
&& grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"building".*"wipBranch"' "$AQ_FSTUB_CALLS" \
&& grep -q 'PATCH /fleet/jobs/fjob_2 :: .*"stage":"review"' "$AQ_FSTUB_CALLS"; then
pass "fleet: PATCH stage transitions carry leaseEpoch + checkpoint(wipBranch) on building"
else
cat "$AQ_FSTUB_CALLS" >&2; fail "fleet report/checkpoint payload incorrect"
fi
# 36. FENCING: PATCH returns conflict (stale epoch) → worker self-aborts, job is
# quarantined to failed/ (NOT review/testing/shipped), fenced is recorded.
export AGENT_QUEUE_ROOT="$tmp/queue-fl3"
"$AQ" init >/dev/null
export AQ_FSTUB_CALLS="$tmp/fl3-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl3-claimed" AQ_FSTUB_JOB_ID="fjob_3" AQ_FSTUB_BODY="work three" AQ_FSTUB_PATCH_CODE=409
: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG"
AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1
fmeta3=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_3.meta' | head -1)
rcount=$(find "$AGENT_QUEUE_ROOT/review" "$AGENT_QUEUE_ROOT/testing" "$AGENT_QUEUE_ROOT/shipped" -maxdepth 1 -name '*.md' 2>/dev/null | wc -l | tr -d ' ')
if [ "$rcount" = "0" ] && ls "$AGENT_QUEUE_ROOT"/failed/*fleet-fjob_3.md >/dev/null 2>&1 \
&& [ "$(metaval "$fmeta3" result)" = "fenced_quarantine" ] && [ "$(metaval "$fmeta3" fleet_fenced)" = "1" ]; then
pass "fleet FENCING: stale-epoch PATCH -> self-abort + quarantine (never shipped)"
else
cat "$AQ_FSTUB_CALLS" >&2; fail "fleet fencing did not quarantine correctly (review/testing/shipped=$rcount)"
fi
unset AQ_FSTUB_PATCH_CODE
# 37. lease renew (unit): fleet_lease_renew issues POST .../lease/renew with epoch.
funcs="$tmp/aq-funcs-fl.sh"; sed '/^main "\$@"/d' "$AQ" > "$funcs"
renew_calls="$tmp/renew-calls.log"; : > "$renew_calls"
if bash -c '
set -uo pipefail
export AGENT_QUEUE_ROOT="'"$tmp"'/queue-renew" AQ_FLEET=1
export AQ_FLEET_API_CMD="'"$fstub"'" AQ_FSTUB_CALLS="'"$renew_calls"'"
source "'"$funcs"'" # agent-queue helpers (main stripped; SCRIPT_DIR=/tmp here)
source "'"$HERE"'/lib/fleet-client.sh" # source the lib explicitly (relative source is skipped)
ensure_dirs
printf "%s\n" "job=jr" "fleet_job_id=fjob_r" "fleet_lease_epoch=7" > "$STATE/jr.meta"
fleet_lease_renew jr
'; then
grep -q 'POST /fleet/jobs/fjob_r/lease/renew :: .*"leaseEpoch":7' "$renew_calls" \
&& pass "fleet: lease renew issues POST .../lease/renew with current leaseEpoch" \
|| { cat "$renew_calls" >&2; fail "fleet lease renew payload missing/incorrect"; }
else
fail "fleet_lease_renew invocation errored"
fi
# 38. offline-degrade: a 5xx on PATCH does NOT quarantine — the job finishes locally
# (degraded), reaching review/ with fleet_degraded recorded.
export AGENT_QUEUE_ROOT="$tmp/queue-fl4"
"$AQ" init >/dev/null
export AQ_FSTUB_CALLS="$tmp/fl4-calls.log" AQ_FSTUB_CLAIM_FLAG="$tmp/fl4-claimed" AQ_FSTUB_JOB_ID="fjob_4" AQ_FSTUB_BODY="work four" AQ_FSTUB_PATCH_CODE=500
: > "$AQ_FSTUB_CALLS"; rm -f "$AQ_FSTUB_CLAIM_FLAG"
AQ_FLEET=1 AGENT_QUEUE_POLL=1 DEVIN_BIN="$stub" "$AQ" run --once >/dev/null 2>&1
fmeta4=$(find "$AGENT_QUEUE_ROOT/.state" -name '*fleet-fjob_4.meta' | head -1)
if ls "$AGENT_QUEUE_ROOT"/review/*fleet-fjob_4.md >/dev/null 2>&1 \
&& [ "$(metaval "$fmeta4" fleet_degraded)" = "1" ] \
&& [ "$(metaval "$fmeta4" result)" != "fenced_quarantine" ]; then
pass "fleet offline-degrade: coordinator 5xx -> job completes locally (degraded), not quarantined"
else
cat "$AQ_FSTUB_CALLS" >&2; fail "fleet offline-degrade behaved incorrectly"
fi
unset AQ_FSTUB_PATCH_CODE
# 39. no-leak: the claimed bodyMd is never sent in any report payload, and the
# bearer token never appears in a recorded call (it is a header, not a body).
if ! grep -q 'FLEET-BODY-SENTINEL' "$tmp/fl1-calls.log" 2>/dev/null \
&& ! grep -q 'SENTINEL-TOKEN' "$tmp/fl1-calls.log" 2>/dev/null; then
pass "fleet no-leak: bodyMd/token never appear in coordinator report payloads"
else
fail "fleet leaked bodyMd or token into a report payload"
fi
unset AQ_FLEET_API_CMD AQ_FLEET_CWD AQ_FSTUB_CALLS AQ_FSTUB_CLAIM_FLAG AQ_FSTUB_JOB_ID AQ_FSTUB_BODY
echo "self-test PASS"