# shellcheck shell=bash
# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ────────────────
#
# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory"
# that registers, heartbeats, claims jobs and reports stage transitions to the
# platform-service `fleet` coordinator — all gated behind AQ_FLEET. With
# AQ_FLEET unset or 0 the fleet hooks (heartbeat, claim, report, lease calls)
# return immediately, so the offline git-queue path is unchanged.
#
# Dependencies: curl plus grep/sed/tr, and these agent-queue.sh helpers and
# globals: log/err, _meta_val, _json_str, _json_escape, detect_capabilities,
# active_workers, cmd_add, ensure_dirs, CURL_BIN, STATE, BUILDING, FAILED,
# C_BOLD/C_RESET.
#
# Coordinator contract (paths are relative to AQ_FLEET_API, which already
# ends in /api):
#   POST  /fleet/factories/heartbeat     {factoryId, capabilities[], health, load}
#   POST  /fleet/claim                   {factoryId, capabilities[], leaseSeconds}
#         -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
#   PATCH /fleet/jobs/:id                {stage, leaseEpoch, checkpoint?}  (409 = fenced)
#   POST  /fleet/jobs/:id/lease/renew    {leaseEpoch, leaseSeconds}        (409 = fenced)
#   POST  /fleet/jobs/:id/lease/release  {leaseEpoch, stage?}
# The coordinator owns leaseEpoch fencing and writes fleet_events itself; there
# is no client-side "register" or "append event" call — registering is simply
# the first heartbeat.

# ── Config (env-overridable) ────────────────────────────────────────
: "${AQ_FLEET:=0}"                             # master switch (0 = offline)
: "${AQ_FLEET_API:=http://localhost:4003/api}" # base URL, including /api
: "${AQ_FLEET_TOKEN:=}"                        # bearer token; never hardcode
# AQ_PRODUCT_ID (sent as X-Product-Id) is shared with the Slice-4 tracker config.
# Factory identity: the short hostname (reduced to [A-Za-z0-9._-]) followed by
# "-<PID of this shell>". Presumably the PID suffix keeps several runners on
# one host distinct — confirm before relying on stable IDs across restarts.
AQ_FACTORY_ID="${AQ_FACTORY_ID:-$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}"
AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}" # heartbeat/renew cadence
AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}"     # requested lease duration
AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}"                          # override caps (comma/space list)
AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}"                        # cwd for claimed fleet jobs
AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}"                    # test seam (stub script)
AQ_FLEET_HB_TS=0                                            # last heartbeat epoch (mutable)

# fleet_enabled — true iff the coordinator integration is switched on
# (AQ_FLEET is exactly "1").
fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; }

# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
# fleet_api METHOD PATH [JSON]
#   Send one request to ${AQ_FLEET_API}PATH. Prints the response body followed
#   by a final line holding the HTTP status ("000" when curl itself failed).
#   If AQ_FLEET_API_CMD is set, the request is delegated to that command with
#   the same three arguments instead (test seam) and its status is returned.
fleet_api() {
  local method=$1 path=$2 body=${3:-}
  if [[ -n "${AQ_FLEET_API_CMD:-}" ]]; then
    "$AQ_FLEET_API_CMD" "$method" "$path" "$body"
    return $?
  fi
  local url="${AQ_FLEET_API}${path}"
  local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
    -H "Content-Type: application/json" -w '\n%{http_code}')
  # ${VAR:-} keeps these optional headers safe under `set -u`: AQ_PRODUCT_ID is
  # owned by the tracker config and may never have been set at all.
  [[ -n "${AQ_FLEET_TOKEN:-}" ]] && args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
  [[ -n "${AQ_PRODUCT_ID:-}" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
  [[ -n "$body" ]] && args+=(--data "$body")
  local out rc
  out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null)
  rc=$?
  # On a curl failure append "000" so the last line never reports a status
  # from a transfer that did not complete.
  if (( rc != 0 )); then
    printf '%s\n000\n' "$out"
  else
    printf '%s\n' "$out"
  fi
}

# _fleet_call METHOD PATH [JSON]
#   Run fleet_api and split its output into globals: FLEET_CODE (last line,
#   the HTTP status) and FLEET_BODY (everything before it).
_fleet_call() {
  local out
  out=$(fleet_api "$@")
  FLEET_CODE=$(printf '%s' "$out" | tail -n1)
  FLEET_BODY=$(printf '%s' "$out" | sed '$d')
}

# _fleet_json_num KEY   (JSON on stdin)
#   Print the first integer that follows "KEY": anywhere in the input. This is
#   a flat text match with no nesting awareness. Prints nothing if KEY is absent.
_fleet_json_num() {
  grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -1 | grep -oE -- '-?[0-9]+$'
}
# _fleet_is_job JOB -> status 0 iff JOB's meta records a fleet_job_id, i.e.
# the job was claimed from the coordinator rather than queued locally.
_fleet_is_job() { [[ -n "$(_meta_val "$STATE/$1.meta" fleet_job_id)" ]]; }

# fleet_detect_caps -> print a JSON array of capability tokens. The tokens come
# from AQ_FLEET_CAPS (split on commas/spaces) when it is set, otherwise from
# detect_capabilities (one token per line). Empty tokens are skipped.
fleet_detect_caps() {
  local toks tok out="[" sep=""
  if [[ -n "$AQ_FLEET_CAPS" ]]; then
    toks=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n')
  else
    toks=$(detect_capabilities)
  fi
  while IFS= read -r tok; do
    [[ -n "$tok" ]] || continue
    out+="$sep\"$(_json_escape "$tok")\""
    sep=","
  done <<< "$toks"
  printf '%s]' "$out"
}

# ── Heartbeat (registration == first heartbeat) ─────────────────────
# fleet_heartbeat -> POST this factory's id, capabilities and current load.
#   Returns 0 on a 2xx (and records the time in AQ_FLEET_HB_TS); otherwise
#   logs a degraded warning and returns 1. It is a no-op (0) when fleet is off.
fleet_heartbeat() {
  fleet_enabled || return 0
  local caps load body
  caps=$(fleet_detect_caps)
  load=$(active_workers 2>/dev/null || echo 0)
  # "load" is emitted as a bare JSON number. Anything other than a plain
  # non-negative integer (empty, multi-line, error text) would corrupt the body.
  [[ "$load" =~ ^[0-9]+$ ]] || load=0
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":$load}"
  _fleet_call POST "/fleet/factories/heartbeat" "$body"
  case "$FLEET_CODE" in
    2*)
      AQ_FLEET_HB_TS=$(date +%s)
      return 0
      ;;
    *)
      err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"
      return 1
      ;;
  esac
}

# fleet_heartbeat_maybe — heartbeat only once AQ_FLEET_LEASE_RENEW_SEC seconds
# have passed since the last successful one. It always returns 0.
fleet_heartbeat_maybe() {
  fleet_enabled || return 0
  local now
  now=$(date +%s)
  if (( now - ${AQ_FLEET_HB_TS:-0} >= ${AQ_FLEET_LEASE_RENEW_SEC:-300} )); then
    fleet_heartbeat
  fi
  return 0
}
# ── Claim — pull one job and materialize it as a local inbox .md ────
# fleet_claim -> ask the coordinator for one job matching our capabilities and
#   hand it to cmd_add as a transient .md with fleet metadata in frontmatter.
# Returns 0 = claimed + materialized, 2 = nothing claimable (or fleet off),
#   1 = API error, or the claimed job could not be queued locally (its lease is
#   then released, best-effort).
fleet_claim() {
  fleet_enabled || return 2
  local caps body
  caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/claim" "$body"
  case "$FLEET_CODE" in
    2*) : ;;
    *)
      err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"
      return 1
      ;;
  esac
  printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2

  local jid body_md epoch
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }

  # Materialize a transient local job .md (same approach as from-tracker) so the
  # existing runner executes a coordinator job unchanged. fleet-job-id and
  # fleet-lease-epoch travel in frontmatter into the job meta (see cmd_run).
  local safe tmpdir tmp rc=1
  safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
  # Check mktemp: an empty tmpdir would turn the path into /fleet-<id>.md.
  if tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX") && [[ -n "$tmpdir" ]]; then
    tmp="$tmpdir/fleet-$safe.md"
    {
      echo "---"
      echo "cwd: $AQ_FLEET_CWD"
      echo "yolo: true"
      echo "fleet-job-id: $jid"
      echo "fleet-lease-epoch: ${epoch:-0}"
      echo "idempotency-key: fleet-$jid"
      echo "---"
      echo
      printf '%s\n' "$body_md"
    } > "$tmp" && cmd_add "$tmp" >/dev/null 2>&1
    rc=$?
    rm -rf -- "$tmpdir"
  fi

  if (( rc != 0 )); then
    # The coordinator already leased this job to us. Hand it back rather than
    # silently holding a lease nobody will work on until it expires.
    err "fleet: could not queue claimed job $jid locally — releasing its lease"
    _fleet_call POST "/fleet/jobs/$jid/lease/release" "{\"leaseEpoch\":${epoch:-0}}"
    return 1
  fi
  log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
  return 0
}
# ── Report a fenced stage transition ────────────────────────────────
# fleet_report JOB STAGE [with-checkpoint]
#   PATCH the coordinator with JOB's new STAGE under its recorded leaseEpoch.
#   Any non-empty third argument also sends a checkpoint
#   {wipBranch[, wipCommit]} from the job meta, but only when wip_branch is recorded.
#   Returns 0 ok (appends fleet_reported=STAGE to the meta);
#           2 FENCED on 409/412 — stale epoch, caller must self-abort
#             (appends fleet_fenced=1);
#           1 degraded on any other outcome — coordinator unreachable, continue
#             locally (appends fleet_degraded=1).
#   Fleet-off and non-fleet jobs (no fleet_job_id) are no-ops returning 0.
fleet_report() {
  fleet_enabled || return 0
  local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  local ckpt=""
  if [[ -n "$with_ckpt" ]]; then
    local wb wc
    wb=$(_meta_val "$metaf" wip_branch)
    wc=$(_meta_val "$metaf" wip_commit)
    if [[ -n "$wb" ]]; then
      ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
      [[ -n "$wc" ]] && ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
      ckpt+="}"
    fi
  fi
  # The payload carries ONLY {stage, leaseEpoch, checkpoint} — never bodyMd,
  # the prompt or the token. stage is escaped like every other string field, so
  # a caller-supplied value cannot break the JSON.
  _fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$(_json_escape "$stage")\",\"leaseEpoch\":${epoch:-0}$ckpt}"
  case "$FLEET_CODE" in
    2*)
      echo "fleet_reported=$stage" >> "$metaf"
      return 0
      ;;
    409|412)
      err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
      echo "fleet_fenced=1" >> "$metaf"
      return 2
      ;;
    *)
      err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
      echo "fleet_degraded=1" >> "$metaf"
      return 1
      ;;
  esac
}

# fleet_lease_renew JOB -> extend JOB's lease by AQ_FLEET_LEASE_SECONDS.
#   Returns 0 ok; 2 fenced on 409/412 (logged, fleet_fenced=1 appended to the meta);
#   1 degraded on any other outcome. Fleet-off / non-fleet jobs return 0.
fleet_lease_renew() {
  fleet_enabled || return 0
  local job=$1 metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  _fleet_call POST "/fleet/jobs/$jid/lease/renew" "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  case "$FLEET_CODE" in
    2*) return 0 ;;
    409|412)
      # Log the fence like fleet_report does: a rejected renew means another
      # factory now holds this job.
      err "fleet: FENCED renewing lease for $job (stale leaseEpoch=$epoch)"
      echo "fleet_fenced=1" >> "$metaf"
      return 2
      ;;
    *) return 1 ;;
  esac
}
# fleet_lease_release JOB [STAGE] -> best-effort release of JOB's lease,
#   optionally reporting a terminal STAGE. The HTTP outcome is ignored, and it
#   always returns 0.
fleet_lease_release() {
  fleet_enabled || return 0
  local job=$1 stage=${2:-} metaf jid epoch body
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  body="{\"leaseEpoch\":${epoch:-0}"
  # Escape stage like every other JSON string field.
  [[ -n "$stage" ]] && body+=",\"stage\":\"$(_json_escape "$stage")\""
  body+="}"
  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
  return 0
}

# fleet_renew_active — renew the lease of every fleet job whose .md is in
# $BUILDING. Renewal failures are swallowed here (best-effort), and it always returns 0.
fleet_renew_active() {
  fleet_enabled || return 0
  local f job
  for f in "$BUILDING"/*.md; do
    [[ -e "$f" ]] || continue # unmatched glob stays literal
    job=${f##*/}
    job=${job%.md}
    if _fleet_is_job "$job"; then
      fleet_lease_renew "$job" >/dev/null 2>&1 || true
    fi
  done
  return 0
}

# fleet_quarantine JOB FILE METAF LOGF — a fenced (reclaimed) worker must NOT
# ship: append a notice to LOGF, move FILE into $FAILED/ for human triage, and
# mark METAF with result=fenced_quarantine (§9 split-brain).
fleet_quarantine() {
  local job=$1 file=$2 metaf=$3 logf=$4
  {
    echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)."
    echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))"
  } >> "$logf"
  if [[ -e "$file" ]] && ! mv "$file" "$FAILED/" 2>/dev/null; then
    # The meta is still marked quarantined below so the result cannot ship.
    # Report loudly that the job file did not reach failed/.
    err "fleet: could not move $file to $FAILED/ — job file left in place"
  fi
  { echo "result=fenced_quarantine"; echo "fleet_quarantined=1"; echo "ended=$(date +%s)"; } >> "$metaf"
  err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}

# _fleet_stage_for RESULT -> map a local job result/stage to a coordinator
# stage: shipped/testing/review pass through, failure-like results become
# "failed", and anything else is reported as "building".
_fleet_stage_for() {
  case "$1" in
    shipped) echo shipped ;;
    testing) echo testing ;;
    review) echo review ;;
    failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
      echo failed ;;
    *) echo building ;;
  esac
}

# fleet-status — heartbeat (which registers the factory) and print this
# factory's identity and capabilities. Returns 0 when fleet mode is off.
cmd_fleet_status() {
  ensure_dirs
  if ! fleet_enabled; then
    log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
    return 0
  fi
  log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
  log "fleet: capabilities=$(fleet_detect_caps)"
  if fleet_heartbeat; then
    log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs."
  else
    err "fleet: coordinator unreachable — would run in offline-degrade mode."
  fi
}