# shellcheck shell=bash
# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ────────────────
#
# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory" that
# registers / heartbeats / claims / reports against the platform-service `fleet`
# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every function
# here is an immediate no-op and the offline git-queue path is byte-for-byte
# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err,
# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN).
#
# Contract (routes under AQ_FLEET_API, which already includes /api):
#   POST  /fleet/factories/heartbeat     {factoryId, capabilities[], health, load}
#   POST  /fleet/claim                   {factoryId, capabilities[], leaseSeconds}
#                                        -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
#   PATCH /fleet/jobs/:id                {stage, leaseEpoch, checkpoint?}   (409 = fenced)
#   POST  /fleet/jobs/:id/lease/renew    {leaseEpoch, leaseSeconds}         (409 = fenced)
#   POST  /fleet/jobs/:id/lease/release  {leaseEpoch, stage?}
# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there
# is no client-side "register" or "append event" call (register == first heartbeat).

# ── Config (env-overridable) ────────────────────────────────────────
AQ_FLEET="${AQ_FLEET:-0}"                                  # master switch (0 = offline)
AQ_FLEET_API="${AQ_FLEET_API:-http://localhost:4003/api}"  # base URL incl. /api
# Route paths are appended verbatim ("${AQ_FLEET_API}/fleet/..."), so drop a single
# trailing slash; otherwise ".../api/" would produce ".../api//fleet/...".
AQ_FLEET_API="${AQ_FLEET_API%/}"
AQ_FLEET_TOKEN="${AQ_FLEET_TOKEN:-}"                       # bearer; never hardcode
# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header).
# Factory identity: "<sanitized short hostname>-<pid>" unless AQ_FACTORY_ID is set.
# The hostname part falls back to "factory" both when every lookup fails AND when
# sanitizing to [A-Za-z0-9._-] leaves nothing (which would otherwise give "-<pid>").
if [[ -z "${AQ_FACTORY_ID:-}" ]]; then
  _aq_fleet_host=$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')
  AQ_FACTORY_ID="${_aq_fleet_host:-factory}-$$"
  unset _aq_fleet_host
fi
AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}"  # heartbeat/renew cadence
AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}"      # requested lease duration
AQ_FLEET_TIMEOUT="${AQ_FLEET_TIMEOUT:-30}"                   # per-request curl max time (-m), seconds
AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}"                           # override caps (comma/space list)
AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}"                         # cwd for claimed fleet jobs
AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}"                     # test seam (stub script)
AQ_FLEET_HB_TS=0                                             # last heartbeat epoch (mutable)

# ── Slice 4: feature-flag levels (three explicit, independently-toggleable) ──
# Precedence (documented in README §Cutover):
#   AQ_FLEET=0        ⇒ pure offline, ZERO coordinator calls (master switch).
#   AQ_FLEET_ROUTE=1  ⇒ route_via_service: coordinator is AUTHORITATIVE for claim
#                       (default; preserves the P2-S3 behavior).
#   AQ_FLEET_ROUTE=0  ⇒ LOCAL inbox is authoritative (coordinator not used to
#                       source work) — the pre-cutover state.
#   AQ_FLEET_SHADOW=1 ⇒ shadow/dual-run (requires AQ_FLEET=1 AND AQ_FLEET_ROUTE=0):
#                       run the normal offline path as authoritative AND query the
#                       coordinator in parallel WITHOUT acting on its responses,
#                       purely to record divergence.
# If AQ_FLEET_ROUTE=1 AND AQ_FLEET_SHADOW=1, ROUTE WINS and shadow is disabled
# (a one-shot warning is logged) — you never shadow and route at the same time.
AQ_FLEET_ROUTE="${AQ_FLEET_ROUTE:-1}"
AQ_FLEET_SHADOW="${AQ_FLEET_SHADOW:-0}"
# Isolated factory id for the read-only shadow claim (never the real factory id).
AQ_FLEET_SHADOW_FACTORY_ID="${AQ_FLEET_SHADOW_FACTORY_ID:-${AQ_FACTORY_ID}-shadow}"
# Shadow divergence log (default resolved to $STATE/fleet-shadow.log at call time).
AQ_FLEET_SHADOW_LOG="${AQ_FLEET_SHADOW_LOG:-}"
_AQ_FLEET_SHADOW_WARNED=0  # one-shot ROUTE>SHADOW precedence warning (per process)
SHADOW_COORD_JOB=""        # set by fleet_shadow_claim: would-be coordinator job id

# fleet_enabled — true iff the coordinator integration is switched on.
fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; }

# fleet_route_enabled — coordinator is authoritative for claim/assignment (ROUTE=1).
fleet_route_enabled() { fleet_enabled && [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]; }

# fleet_shadow_enabled — shadow/dual-run is active. Pure (no logging): requires
# AQ_FLEET=1 AND AQ_FLEET_ROUTE=0 AND AQ_FLEET_SHADOW=1. When ROUTE=1 this returns
# false (ROUTE wins) — the precedence warning is emitted once by fleet_flags_warn_once.
fleet_shadow_enabled() {
  fleet_enabled || return 1
  [[ "${AQ_FLEET_ROUTE:-1}" == 0 ]] || return 1
  [[ "${AQ_FLEET_SHADOW:-0}" == 1 ]]
}

# fleet_flags_warn_once — emit the ROUTE>SHADOW precedence warning at most once.
# Called from the run-loop init so an operator who sets ROUTE=1 + SHADOW=1 is told
# that shadow is suppressed. No-op unless that exact (conflicting) combo is set.
fleet_flags_warn_once() {
  fleet_enabled || return 0
  if [[ "${AQ_FLEET_ROUTE:-1}" == 1 && "${AQ_FLEET_SHADOW:-0}" == 1 \
        && "${_AQ_FLEET_SHADOW_WARNED:-0}" != 1 ]]; then
    err "fleet: AQ_FLEET_ROUTE=1 and AQ_FLEET_SHADOW=1 — ROUTE wins; shadow/dual-run is DISABLED. Set AQ_FLEET_ROUTE=0 to shadow."
    _AQ_FLEET_SHADOW_WARNED=1
  fi
  return 0
}

# fleet_flags_state — one-line resolved flag summary (for `status` / `fleet-status`).
# Prints the actual AQ_FLEET value, so the line stays truthful even when a caller
# invokes it while the master switch is off. No trailing newline.
fleet_flags_state() {
  local route shadow
  if [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]; then
    route="route_via_service"
  else
    route="local-authoritative"
  fi
  if fleet_shadow_enabled; then shadow="shadow=ON"; else shadow="shadow=off"; fi
  printf 'AQ_FLEET=%s route=%s(AQ_FLEET_ROUTE=%s) %s(AQ_FLEET_SHADOW=%s)' \
    "${AQ_FLEET:-0}" "$route" "${AQ_FLEET_ROUTE:-1}" "$shadow" "${AQ_FLEET_SHADOW:-0}"
}

# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
# fleet_api <METHOD> <PATH> [JSON] -> response body, then a final HTTP-code line.
# When AQ_FLEET_API_CMD is set it is invoked instead of curl with the same three
# arguments (test seam) and its output/status are passed through unchanged.
# A curl failure (non-zero exit, e.g. connect error or -m timeout) forces the final
# line to "000", even if curl had already written a status code via -w.
# Every optional variable is read with a default so this is safe under `set -u`
# (AQ_PRODUCT_ID in particular is not defined in this file).
fleet_api() {
  local method=$1 path=$2 body=${3:-}
  if [[ -n "${AQ_FLEET_API_CMD:-}" ]]; then
    "$AQ_FLEET_API_CMD" "$method" "$path" "$body"
    return $?
  fi
  local url="${AQ_FLEET_API}${path}"
  local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
    -H "Content-Type: application/json" -w '\n%{http_code}')
  [[ -n "${AQ_FLEET_TOKEN:-}" ]] && args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
  [[ -n "${AQ_PRODUCT_ID:-}" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
  [[ -n "$body" ]] && args+=(--data "$body")
  local out rc
  out=$("${CURL_BIN:-curl}" "${args[@]}" "$url" 2>/dev/null); rc=$?
  if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi
}

# _fleet_call <METHOD> <PATH> [JSON] -> sets globals FLEET_BODY + FLEET_CODE
# (FLEET_CODE = last output line of fleet_api, FLEET_BODY = everything before it).
_fleet_call() {
  local out
  out=$(fleet_api "$@")
  FLEET_CODE=$(printf '%s' "$out" | tail -n1)
  FLEET_BODY=$(printf '%s' "$out" | sed '$d')
}

# _fleet_json_num <key> (reads JSON on stdin) -> first integer value for key
# (optionally negative); prints nothing when the key has no integer value.
_fleet_json_num() {
  grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -1 | grep -oE -- '-?[0-9]+$'
}

# _fleet_is_job <job> -> 0 if this job was claimed from the coordinator
# (its meta carries a non-empty fleet_job_id).
_fleet_is_job() { [[ -n "$(_meta_val "$STATE/$1.meta" fleet_job_id)" ]]; }

# fleet_detect_caps -> JSON array of capability tokens (override or auto-detected).
# Tokens come from AQ_FLEET_CAPS (commas/spaces act as separators) when it is set,
# otherwise from detect_capabilities (read line by line); blank tokens are skipped.
fleet_detect_caps() {
  local toks
  if [[ -n "${AQ_FLEET_CAPS:-}" ]]; then
    toks=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n')
  else
    toks=$(detect_capabilities)
  fi
  local out="[" sep="" t
  while IFS= read -r t; do
    [[ -n "$t" ]] || continue
    out+="${sep}\"$(_json_escape "$t")\""
    sep=","
  done <<< "$toks"
  printf '%s]' "$out"
}

# ── Heartbeat (registration == first heartbeat) ─────────────────────
# fleet_heartbeat — POST {factoryId, capabilities, health, load}. Returns 0 on a
# 2xx (and records the time in AQ_FLEET_HB_TS), 1 otherwise (logged, degraded).
# No-op returning 0 when AQ_FLEET is off.
fleet_heartbeat() {
  fleet_enabled || return 0
  local caps raw load body
  caps=$(fleet_detect_caps)
  # "load" is emitted as a bare JSON number, so it must be a plain integer: take
  # the first word of active_workers' output (tolerates padded counts) and use 0
  # when the helper fails or prints anything non-numeric. The old
  # `$(active_workers || echo 0)` could yield "N<newline>0" -> invalid JSON.
  raw=$(active_workers 2>/dev/null) || raw=0
  read -r load _ <<< "$raw"
  [[ "$load" =~ ^[0-9]+$ ]] || load=0
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":$load}"
  _fleet_call POST "/fleet/factories/heartbeat" "$body"
  case "$FLEET_CODE" in
    2*) AQ_FLEET_HB_TS=$(date +%s); return 0 ;;
    *) err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"; return 1 ;;
  esac
}

# fleet_heartbeat_maybe — heartbeat only when the cadence interval has elapsed.
# Always returns 0 (a failed heartbeat is already logged by fleet_heartbeat).
fleet_heartbeat_maybe() {
  fleet_enabled || return 0
  local now elapsed
  now=$(date +%s)
  elapsed=$(( now - ${AQ_FLEET_HB_TS:-0} ))
  if (( elapsed >= ${AQ_FLEET_LEASE_RENEW_SEC:-300} )); then
    fleet_heartbeat
  fi
  return 0
}

# ── Claim — pull one job and materialize it as a local inbox .md ────
# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error.
# 1 is also returned when a claimed job cannot be materialized locally (temp dir or
# cmd_add failure). No release is sent in that case; the coordinator-side lease is
# left to lapse on its own.
fleet_claim() {
  fleet_enabled || return 2
  local caps body
  caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/claim" "$body"
  case "$FLEET_CODE" in
    2*) : ;;
    *) err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"; return 1 ;;
  esac
  # Here-string rather than `printf | grep -q`: grep -q may exit before a large
  # body is fully written, and if the caller enabled pipefail the writer's SIGPIPE
  # would turn a real claim into "nothing claimable".
  grep -q '"claimed"[[:space:]]*:[[:space:]]*true' <<< "$FLEET_BODY" || return 2

  local jid body_md epoch
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }

  # Materialize a transient local job .md (same approach as from-tracker) so the
  # existing runner executes a coordinator job unchanged. fleet-job-id +
  # fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run).
  local safe tmpdir tmp add_rc=0
  safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
  # A failed mktemp would leave tmpdir empty and put the job file at "/fleet-<id>.md".
  if ! tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX") || [[ -z "$tmpdir" ]]; then
    err "fleet: cannot create a temp dir to materialize claimed job $jid"
    return 1
  fi
  tmp="$tmpdir/fleet-$safe.md"
  {
    echo "---"
    echo "cwd: $AQ_FLEET_CWD"
    echo "yolo: true"
    echo "fleet-job-id: $jid"
    echo "fleet-lease-epoch: ${epoch:-0}"
    echo "idempotency-key: fleet-$jid"
    echo "---"
    echo
    printf '%s\n' "$body_md"
  } > "$tmp"
  cmd_add "$tmp" >/dev/null 2>&1 || add_rc=$?
  rm -rf -- "$tmpdir"
  # Do not report success for a job that never reached the local queue.
  if (( add_rc != 0 )); then
    err "fleet: claimed job $jid but could not enqueue it locally (cmd_add rc=$add_rc)"
    return 1
  fi
  log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
  return 0
}

# ── Report a fenced stage transition ────────────────────────────────
# fleet_report <job> <stage> [with-checkpoint] -> 0 ok, 2 FENCED (stale epoch:
# caller must self-abort), 1 degraded (coordinator unreachable: continue locally).
# The PATCH payload carries ONLY {stage, leaseEpoch, checkpoint} — never
# bodyMd/prompt/token — and every string in it goes through _json_escape
# (matching fleet_shadow_report), so a caller-supplied stage cannot break the JSON.
fleet_report() {
  fleet_enabled || return 0
  local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0   # not a coordinator job: nothing to report
  local ckpt=""
  if [[ -n "$with_ckpt" ]]; then
    local wb wc
    wb=$(_meta_val "$metaf" wip_branch)
    wc=$(_meta_val "$metaf" wip_commit)
    if [[ -n "$wb" ]]; then
      ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
      [[ -n "$wc" ]] && ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
      ckpt+="}"
    fi
  fi
  _fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$(_json_escape "$stage")\",\"leaseEpoch\":${epoch:-0}$ckpt}"
  case "$FLEET_CODE" in
    2*)
      echo "fleet_reported=$stage" >> "$metaf"
      return 0 ;;
    409|412)
      err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
      echo "fleet_fenced=1" >> "$metaf"
      return 2 ;;
    *)
      err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
      echo "fleet_degraded=1" >> "$metaf"
      return 1 ;;
  esac
}

# fleet_lease_renew <job> -> extend the lease; 0 ok, 2 fenced, 1 degraded.
# A fence is recorded in the job meta (fleet_fenced=1); nothing is logged here.
fleet_lease_renew() {
  fleet_enabled || return 0
  local job=$1 metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  _fleet_call POST "/fleet/jobs/$jid/lease/renew" "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  case "$FLEET_CODE" in
    2*) return 0 ;;
    409|412) echo "fleet_fenced=1" >> "$metaf"; return 2 ;;
    *) return 1 ;;
  esac
}

# fleet_lease_release <job> [stage] -> best-effort release on a terminal stage.
# The response is ignored and the function always returns 0.
fleet_lease_release() {
  fleet_enabled || return 0
  local job=$1 stage=${2:-} metaf jid epoch body
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  body="{\"leaseEpoch\":${epoch:-0}"
  # Escaped like every other string field sent to the coordinator.
  [[ -n "$stage" ]] && body+=",\"stage\":\"$(_json_escape "$stage")\""
  body+="}"
  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
  return 0
}

# fleet_renew_active — renew leases for all in-flight (building/) fleet jobs.
# Per-job renew output and failures are discarded; always returns 0.
fleet_renew_active() {
  fleet_enabled || return 0
  local f job
  for f in "$BUILDING"/*.md; do
    [[ -e "$f" ]] || continue   # unmatched glob stays literal
    job=$(basename "$f")
    job=${job%.md}
    _fleet_is_job "$job" && { fleet_lease_renew "$job" >/dev/null 2>&1 || true; }
  done
  return 0
}

# fleet_quarantine <job> <file> <metaf> <logf> — a fenced (reclaimed) worker must
# NOT ship: park the local result in failed/ for human triage (§9 split-brain).
fleet_quarantine() {
  local job=$1 file=$2 metaf=$3 logf=$4
  {
    echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)."
    echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))"
  } >> "$logf"
  [[ -e "$file" ]] && mv -- "$file" "$FAILED/" 2>/dev/null
  { echo "result=fenced_quarantine"; echo "fleet_quarantined=1"; echo "ended=$(date +%s)"; } >> "$metaf"
  err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}

# _fleet_stage_for <result-or-stage> -> the coordinator stage for a job result/stage
# (terminal failure results collapse to "failed"; anything unknown is "building").
_fleet_stage_for() {
  case "$1" in
    shipped) echo shipped ;;
    testing) echo testing ;;
    review) echo review ;;
    failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected) echo failed ;;
    *) echo building ;;
  esac
}

# ── Slice 4: shadow / dual-run (strictly side-effect-free on real job state) ──
# _fleet_shadow_log -> path to the structured shadow-divergence log.
_fleet_shadow_log() { printf '%s\n' "${AQ_FLEET_SHADOW_LOG:-$STATE/fleet-shadow.log}"; }

# fleet_shadow_claim — ask the coordinator what it WOULD assign for this factory's
# capabilities, read-only.
# Side-effect-free on real job state, by construction:
#   * uses an ISOLATED shadow factoryId (never the real one), so it can't take a
#     job away from the real factory's identity;
#   * sends "dryRun":true,"shadow":true — a coordinator that honors it never
#     assigns (purely returns the would-be job);
#   * if the coordinator DID assign anyway (no dry-run support), the lease is
#     released immediately so no real assignment persists;
#   * the would-be job is NEVER materialized / run / reported / shipped locally.
# Sets SHADOW_COORD_JOB to the would-be job id ("" = none). Best-effort: any error
# is recorded as SHADOW_ERROR and swallowed — shadow must NEVER fail a real job.
fleet_shadow_claim() {
  SHADOW_COORD_JOB=""
  fleet_shadow_enabled || return 0
  local caps body
  caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FLEET_SHADOW_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900},\"dryRun\":true,\"shadow\":true}"
  _fleet_call POST "/fleet/claim" "$body"
  if [[ "$FLEET_CODE" != 2* ]]; then
    # The { …; } 2>/dev/null group also silences a failing ">>" itself (e.g. the
    # log directory does not exist); a 2>/dev/null written after ">>" is applied
    # too late to hide that redirection error.
    { printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "" "" "SHADOW_ERROR(claim:HTTP_${FLEET_CODE:-error})" \
        >> "$(_fleet_shadow_log)"; } 2>/dev/null || true
    return 0
  fi
  # Here-string rather than `printf | grep -q` (grep -q may exit early; under a
  # caller's pipefail the writer's SIGPIPE would fail the test).
  grep -q '"claimed"[[:space:]]*:[[:space:]]*true' <<< "$FLEET_BODY" || return 0
  local jid epoch
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  SHADOW_COORD_JOB="$jid"
  # Undo any REAL lease the coordinator may have created (no dry-run support) so
  # the shadow probe leaves zero residue. Best-effort, response ignored.
  if [[ -n "$jid" ]]; then
    _fleet_call POST "/fleet/jobs/$jid/lease/release" "{\"leaseEpoch\":${epoch:-0},\"shadow\":true}" >/dev/null 2>&1 || true
  fi
  return 0
}

# fleet_shadow_compare <local-job> <coord-job> — classify the local (authoritative)
# decision against the coordinator's would-be decision and append a structured
# tab-separated line (ts<TAB>localJob<TAB>coordJob<TAB>verdict) to the shadow log.
# Verdicts: AGREE | DIVERGE | COORD_EMPTY | LOCAL_EMPTY.
# Both-empty is a no-op (nothing to compare).
fleet_shadow_compare() {
  fleet_shadow_enabled || return 0
  local lj=${1:-} cj=${2:-} verdict
  if [[ -z "$lj" && -z "$cj" ]]; then return 0; fi
  if [[ -n "$lj" && -z "$cj" ]]; then
    verdict=COORD_EMPTY
  elif [[ -z "$lj" && -n "$cj" ]]; then
    verdict=LOCAL_EMPTY
  elif [[ "$lj" == "$cj" ]]; then
    verdict=AGREE
  else
    verdict=DIVERGE
  fi
  # Grouped so that a failing ">>" (e.g. missing log dir) is silenced as well.
  { printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "$lj" "$cj" "$verdict" \
      >> "$(_fleet_shadow_log)"; } 2>/dev/null || true
  return 0
}

# fleet_shadow_report <local-job> <coord-job> [stage] — mirror a stage transition
# to the coordinator as a SHADOW event ("shadow":true,"dryRun":true) so the report
# path is EXERCISED, but the coordinator response is NEVER acted on (no fence /
# quarantine / state change) — divergence (e.g. 409) is only logged. Targets the
# would-be coordinator job id; a no-op when there is none. Best-effort + swallowed.
fleet_shadow_report() {
  fleet_shadow_enabled || return 0
  local lj=${1:-} cj=${2:-} stage=${3:-building}
  [[ -n "$cj" ]] || return 0
  _fleet_call PATCH "/fleet/jobs/$cj" "{\"stage\":\"$(_json_escape "$stage")\",\"shadow\":true,\"dryRun\":true}"
  case "${FLEET_CODE:-}" in
    2*) : ;;
    *)
      # Grouped so that a failing ">>" is silenced too (see fleet_shadow_compare).
      { printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "$lj" "$cj" "SHADOW_REPORT_DIVERGE(HTTP_${FLEET_CODE:-error})" \
          >> "$(_fleet_shadow_log)"; } 2>/dev/null || true
      ;;
  esac
  return 0
}

# fleet-shadow-report [N] — summarize the shadow log: per-verdict counts, agreement
# rate (AGREE over the four comparison verdicts; errors excluded), and the last N
# (default 10) divergence/error events. Read-only; safe regardless of the flags.
cmd_fleet_shadow_report() {
  ensure_dirs
  local n=10 logf
  logf=$(_fleet_shadow_log)
  [[ "${1:-}" =~ ^[0-9]+$ ]] && n=$1
  if [[ ! -s "$logf" ]]; then
    log "fleet shadow: no shadow log yet ($logf)."
    log "fleet shadow: run with AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 to record divergence."
    return 0
  fi
  log "fleet shadow report ($logf):"
  # Verdict = column 4 with any "(detail)" suffix stripped.
  awk -F'\t' '
    {
      v=$4; sub(/\(.*/, "", v); c[v]++; tot++
      if (v=="AGREE") ag++
      if (v=="AGREE"||v=="DIVERGE"||v=="COORD_EMPTY"||v=="LOCAL_EMPTY") dec++
    }
    END {
      split("AGREE DIVERGE COORD_EMPTY LOCAL_EMPTY SHADOW_ERROR SHADOW_REPORT_DIVERGE", ord, " ")
      for (i=1; i<=6; i++) printf " %-22s %d\n", ord[i], c[ord[i]]+0
      printf " %-22s %d\n", "TOTAL", tot+0
      if (dec>0) printf " %-22s %d%%\n", "AGREEMENT", int(100*ag/dec)
    }' "$logf"
  log "last $n divergence/error events:"
  # Filter on the verdict column only: grepping the whole line for
  # "<TAB>DIVERGE…" would also match a job id that starts with a verdict word.
  awk -F'\t' '$4 ~ /^(DIVERGE|COORD_EMPTY|LOCAL_EMPTY|SHADOW_ERROR|SHADOW_REPORT_DIVERGE)/' "$logf" 2>/dev/null \
    | tail -n "$n" \
    | awk -F'\t' '{ printf " ts=%s local=%s coord=%s verdict=%s\n", $1, $2, $3, $4 }' || true
  return 0
}

# fleet-status — heartbeat (register) + print this factory's identity/caps + flags.
cmd_fleet_status() {
  ensure_dirs
  if ! fleet_enabled; then
    log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
    return 0
  fi
  log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
  log "fleet: flags=$(fleet_flags_state)"
  fleet_flags_warn_once
  log "fleet: capabilities=$(fleet_detect_caps)"
  if fleet_shadow_enabled; then
    log "fleet: SHADOW/dual-run mode — local inbox is authoritative; coordinator queried for comparison only (never acted on)."
  elif ! fleet_route_enabled; then
    log "fleet: ROUTE off — local inbox is authoritative; coordinator not used to source work."
  fi
  if fleet_heartbeat; then
    log "fleet: heartbeat OK (registered)."
  else
    err "fleet: coordinator unreachable — would run in offline-degrade mode."
  fi
}