From a10d4003e630359ec2d61bf797497082c42c320f Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 22:43:06 -0700 Subject: [PATCH] feat(agent-queue): fleet coordinator client library (lib/fleet-client.sh, P2-S3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New sourced library implementing the factory side of the Phase-2 `fleet` coordinator contract — curl-only + POSIX awk, reusing the Slice-4 HTTP/JSON helper patterns, no new deps. Every function is a no-op unless AQ_FLEET=1. - fleet_enabled / fleet_api (AQ_FLEET_API_CMD test seam) / _fleet_call - fleet_detect_caps (reuses detect_capabilities) -> JSON caps array - fleet_heartbeat (+ _maybe cadence): registration == first heartbeat - fleet_claim: POST /fleet/claim, parse job id/bodyMd/leaseEpoch, materialize a transient local .md (fleet-job-id + fleet-lease-epoch in frontmatter) - fleet_report: PATCH fenced stage transition {stage, leaseEpoch, checkpoint?}; returns ok / FENCED(2, stale epoch -> self-abort) / degraded(1, unreachable) - fleet_lease_renew / fleet_lease_release / fleet_renew_active (fenced) - fleet_quarantine: park a reclaimed (fenced) job in failed/ for human triage - cmd_fleet_status: register + print factory identity/caps Report payloads carry only stage/epoch/checkpoint — never prompt/bodyMd/token. --- agent-queue/lib/fleet-client.sh | 258 ++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 agent-queue/lib/fleet-client.sh diff --git a/agent-queue/lib/fleet-client.sh b/agent-queue/lib/fleet-client.sh new file mode 100644 index 0000000..be38e40 --- /dev/null +++ b/agent-queue/lib/fleet-client.sh @@ -0,0 +1,258 @@ +# shellcheck shell=bash +# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ──────────────── +# +# Sourced by agent-queue.sh. 
# Lets the single-host runner act as a "factory" that
# registers / heartbeats / claims / reports against the platform-service `fleet`
# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every function
# here is an immediate no-op and the offline git-queue path is byte-for-byte
# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err,
# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN).
#
# Contract (routes under AQ_FLEET_API, which already includes /api):
#   POST  /fleet/factories/heartbeat     {factoryId, capabilities[], health, load}
#   POST  /fleet/claim                   {factoryId, capabilities[], leaseSeconds}
#                                        -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
#   PATCH /fleet/jobs/:id                {stage, leaseEpoch, checkpoint?}  (409 = fenced)
#   POST  /fleet/jobs/:id/lease/renew    {leaseEpoch, leaseSeconds}        (409 = fenced)
#   POST  /fleet/jobs/:id/lease/release  {leaseEpoch, stage?}
# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there
# is no client-side "register" or "append event" call (register == first heartbeat).

# ── Config (env-overridable) ────────────────────────────────────────
AQ_FLEET="${AQ_FLEET:-0}"                                  # master switch (0 = offline)
AQ_FLEET_API="${AQ_FLEET_API:-http://localhost:4003/api}"  # base URL incl. /api
AQ_FLEET_TOKEN="${AQ_FLEET_TOKEN:-}"                       # bearer; never hardcode
# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header).
# Default factory id: sanitized short hostname (fallback "factory") + this shell's PID.
AQ_FACTORY_ID="${AQ_FACTORY_ID:-$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}"
AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}" # heartbeat/renew cadence
AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}"     # requested lease duration
AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}"                          # override caps (comma/space list)
AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}"                        # cwd for claimed fleet jobs
AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}"                    # test seam (stub script)
AQ_FLEET_HB_TS=0                                            # last heartbeat epoch (mutable)

# fleet_enabled — true iff the coordinator integration is switched on.
fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; }

# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
# fleet_api METHOD PATH [JSON] -> response body, then a final HTTP-code line.
#   Globals read: AQ_FLEET_API_CMD, AQ_FLEET_API, AQ_FLEET_TIMEOUT, AQ_FLEET_TOKEN,
#                 AQ_PRODUCT_ID, CURL_BIN.
#   When AQ_FLEET_API_CMD is set it is invoked as `CMD METHOD PATH JSON` instead of
#   curl, and its stdout/exit status are passed through untouched.
#   On a curl transport failure an extra "000" status line is appended so callers
#   always find a non-2xx code on the last line.
fleet_api() {
  local method=$1 path=$2 body=${3:-}
  if [[ -n "${AQ_FLEET_API_CMD:-}" ]]; then
    "$AQ_FLEET_API_CMD" "$method" "$path" "$body"
    return $?
  fi

  # Strip one trailing "/" from the base so ".../api/" + "/fleet/x" does not
  # become ".../api//fleet/x".
  local url="${AQ_FLEET_API%/}${path}"
  local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
    -H "Content-Type: application/json" -w '\n%{http_code}')
  if [[ -n "${AQ_FLEET_TOKEN:-}" ]]; then
    args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
  fi
  # AQ_PRODUCT_ID is defined by another part of the runner; tolerate it being unset.
  if [[ -n "${AQ_PRODUCT_ID:-}" ]]; then
    args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
  fi
  if [[ -n "$body" ]]; then
    args+=(--data "$body")
  fi

  local out rc
  out=$("${CURL_BIN:-curl}" "${args[@]}" "$url" 2>/dev/null)
  rc=$?
  if [[ $rc -ne 0 ]]; then
    printf '%s\n000\n' "$out"
  else
    printf '%s\n' "$out"
  fi
}

# _fleet_call METHOD PATH [JSON] -> sets globals FLEET_BODY + FLEET_CODE.
#   FLEET_CODE is the last output line of fleet_api, FLEET_BODY everything before it.
#   If the last line is not a 3-digit status (empty or malformed reply), FLEET_CODE
#   is left empty and FLEET_BODY holds the whole output, so a payload line that merely
#   starts with "2" can never satisfy a caller's `2*` success check.
_fleet_call() {
  local out
  out=$(fleet_api "$@")
  FLEET_CODE=$(printf '%s' "$out" | tail -n1)
  FLEET_BODY=$(printf '%s' "$out" | sed '$d')
  if [[ ! "$FLEET_CODE" =~ ^[0-9]{3}$ ]]; then
    FLEET_BODY=$out
    FLEET_CODE=""
  fi
}

# _fleet_json_num KEY (reads JSON on stdin) -> first numeric value for key.
#   Prints nothing when the key is absent or its value is not an integer.
_fleet_json_num() {
  grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -1 | grep -oE -- '-?[0-9]+$'
}

# _fleet_is_job JOB -> 0 if this job was claimed from the coordinator.
# Looks up fleet_job_id in the job's meta file ($STATE/JOB.meta) via _meta_val;
# a non-empty value means the job came from the coordinator.
_fleet_is_job() { [[ -n "$(_meta_val "$STATE/$1.meta" fleet_job_id)" ]]; }

# fleet_detect_caps -> JSON array of capability tokens (override or auto-detected).
#   AQ_FLEET_CAPS (comma and/or space separated) wins when non-empty; otherwise the
#   newline-separated output of detect_capabilities is used. Empty tokens are dropped.
fleet_detect_caps() {
  local toks
  if [[ -n "${AQ_FLEET_CAPS:-}" ]]; then
    # Turn both separators into newlines so the loop below sees one token per line.
    toks=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n')
  else
    toks=$(detect_capabilities)
  fi

  local out="[" sep="" t
  while IFS= read -r t; do
    [[ -n "$t" ]] || continue
    out+="${sep}\"$(_json_escape "$t")\""
    sep=","
  done <<< "$toks"
  printf '%s]' "$out"
}

# ── Heartbeat (registration == first heartbeat) ─────────────────────
# fleet_heartbeat -> 0 on a 2xx reply (and stamps AQ_FLEET_HB_TS), 1 otherwise.
#   No-op (returns 0) when the fleet integration is off.
fleet_heartbeat() {
  fleet_enabled || return 0
  local caps load body
  caps=$(fleet_detect_caps)
  load=$(active_workers 2>/dev/null || echo 0)
  # `load` is spliced into the JSON body as a bare number; anything that is not a
  # plain non-negative integer (empty, multi-line, text) would produce invalid JSON.
  [[ "$load" =~ ^[0-9]+$ ]] || load=0
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":${load}}"
  _fleet_call POST "/fleet/factories/heartbeat" "$body"
  case "$FLEET_CODE" in
    2*)
      AQ_FLEET_HB_TS=$(date +%s)
      return 0
      ;;
    *)
      err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"
      return 1
      ;;
  esac
}

# fleet_heartbeat_maybe — heartbeat only when the cadence interval has elapsed.
#   Always returns 0; a failed heartbeat is reported by fleet_heartbeat itself.
fleet_heartbeat_maybe() {
  fleet_enabled || return 0
  local now elapsed
  now=$(date +%s)
  elapsed=$(( now - ${AQ_FLEET_HB_TS:-0} ))
  if [[ $elapsed -ge "${AQ_FLEET_LEASE_RENEW_SEC:-300}" ]]; then
    fleet_heartbeat
  fi
  return 0
}

# ── Claim — pull one job and materialize it as a local inbox .md ────
# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error.
fleet_claim() {
  # Asks the coordinator for one job (POST /fleet/claim) and, when one is granted,
  # writes it to a throw-away .md file and hands that file to cmd_add.
  #   Globals read: AQ_FACTORY_ID, AQ_FLEET_LEASE_SECONDS, AQ_FLEET_CWD, TMPDIR.
  #   Returns: 0 = claimed and enqueued locally
  #            2 = fleet disabled, or the coordinator had nothing claimable
  #            1 = API error, unusable reply, or the job could not be
  #                materialized/enqueued locally
  fleet_enabled || return 2

  local caps body
  caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/claim" "$body"
  case "$FLEET_CODE" in
    2*) ;;
    *)
      err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"
      return 1
      ;;
  esac
  printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2

  local jid body_md epoch
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }
  # The id is written verbatim into front-matter lines below; a newline or other
  # control character in it would let the reply inject extra front-matter keys.
  if [[ "$jid" == *[[:cntrl:]]* ]]; then
    err "fleet: claim returned a malformed job id"
    return 1
  fi

  # Materialize a transient local job .md (same approach as from-tracker) so the
  # existing runner executes a coordinator job unchanged. fleet-job-id +
  # fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run).
  local safe tmpdir tmp add_rc=0
  safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
  # Without this check a failed mktemp would leave tmpdir empty and the job file
  # would be written to "/fleet-<id>.md".
  tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX") || {
    err "fleet: could not create a temp dir to materialize job $jid"
    return 1
  }
  tmp="$tmpdir/fleet-$safe.md"
  {
    printf '%s\n' "---"
    printf '%s\n' "cwd: $AQ_FLEET_CWD"
    printf '%s\n' "yolo: true"
    printf '%s\n' "fleet-job-id: $jid"
    printf '%s\n' "fleet-lease-epoch: ${epoch:-0}"
    printf '%s\n' "idempotency-key: fleet-$jid"
    printf '%s\n' "---"
    printf '\n'
    printf '%s\n' "$body_md"
  } > "$tmp" || add_rc=1

  if (( add_rc == 0 )); then
    cmd_add "$tmp" >/dev/null 2>&1 || add_rc=$?
  fi
  rm -rf -- "$tmpdir"

  # Do not report success for a job that never reached the local queue.
  if (( add_rc != 0 )); then
    err "fleet: claimed job $jid but could not enqueue it locally (exit $add_rc)"
    return 1
  fi

  log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
  return 0
}

# ── Report a fenced stage transition ────────────────────────────────
# fleet_report JOB STAGE [with-checkpoint] -> 0 ok, 2 FENCED (stale epoch:
# caller must self-abort), 1 degraded (coordinator unreachable: continue locally).
fleet_report() {
  # Sends PATCH /fleet/jobs/<id> with {stage, leaseEpoch[, checkpoint]} for a job
  # whose meta file ($STATE/JOB.meta) carries fleet_job_id / fleet_lease_epoch.
  #   $1 job            local job name (meta file stem)
  #   $2 stage          stage string to report
  #   $3 with_ckpt      any non-empty value: attach wip_branch/wip_commit from meta
  #   Returns 0 on 2xx (also when fleet is off or the job is not a fleet job),
  #   2 on HTTP 409/412, 1 on any other status. The outcome is appended to the
  #   meta file as fleet_reported=<stage>, fleet_fenced=1 or fleet_degraded=1.
  fleet_enabled || return 0
  local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  local ckpt=""
  if [[ -n "$with_ckpt" ]]; then
    local wb wc
    wb=$(_meta_val "$metaf" wip_branch)
    wc=$(_meta_val "$metaf" wip_commit)
    # A checkpoint is only attached when a WIP branch is recorded; the commit is optional.
    if [[ -n "$wb" ]]; then
      ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
      if [[ -n "$wc" ]]; then
        ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
      fi
      ckpt+="}"
    fi
  fi

  # payload carries ONLY {stage, leaseEpoch, checkpoint} — never bodyMd/prompt/token.
  # stage goes through _json_escape like every other string field in this payload.
  local payload
  payload="{\"stage\":\"$(_json_escape "$stage")\",\"leaseEpoch\":${epoch:-0}$ckpt}"
  _fleet_call PATCH "/fleet/jobs/$jid" "$payload"
  case "$FLEET_CODE" in
    2*)
      echo "fleet_reported=$stage" >> "$metaf"
      return 0
      ;;
    409|412)
      err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
      echo "fleet_fenced=1" >> "$metaf"
      return 2
      ;;
    *)
      err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
      echo "fleet_degraded=1" >> "$metaf"
      return 1
      ;;
  esac
}

# fleet_lease_renew JOB -> extend the lease; 0 ok, 2 fenced, 1 degraded.
#   Sends POST /fleet/jobs/<id>/lease/renew with the job's recorded leaseEpoch and
#   AQ_FLEET_LEASE_SECONDS. On HTTP 409/412 it appends fleet_fenced=1 to the meta
#   file. Returns 0 without calling out when fleet is off or the job has no
#   fleet_job_id.
fleet_lease_renew() {
  fleet_enabled || return 0
  local job=$1 metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  _fleet_call POST "/fleet/jobs/$jid/lease/renew" \
    "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  case "$FLEET_CODE" in
    2*) return 0 ;;
    409|412)
      echo "fleet_fenced=1" >> "$metaf"
      return 2
      ;;
    *) return 1 ;;
  esac
}

# fleet_lease_release JOB [stage] -> best-effort release on a terminal stage.
fleet_lease_release() {
  # Sends POST /fleet/jobs/<id>/lease/release with {leaseEpoch[, stage]} taken from
  # the job's meta file. The HTTP outcome is deliberately ignored: always returns 0.
  #   $1 job    local job name (meta file stem)
  #   $2 stage  optional stage string to include in the release body
  fleet_enabled || return 0
  local job=$1 stage=${2:-} metaf jid epoch body
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  body="{\"leaseEpoch\":${epoch:-0}"
  if [[ -n "$stage" ]]; then
    # Escaped like the other string fields this library puts into JSON bodies.
    body+=",\"stage\":\"$(_json_escape "$stage")\""
  fi
  body+="}"
  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
  return 0
}

# fleet_renew_active — renew leases for all in-flight (building/) fleet jobs.
#   Walks $BUILDING/*.md, skips jobs without a fleet_job_id, and ignores the
#   per-job renew result (output suppressed). Always returns 0.
fleet_renew_active() {
  fleet_enabled || return 0
  local f job
  for f in "$BUILDING"/*.md; do
    [[ -e "$f" ]] || continue   # unmatched glob stays literal
    job=${f##*/}
    job=${job%.md}
    if _fleet_is_job "$job"; then
      fleet_lease_renew "$job" >/dev/null 2>&1 || true
    fi
  done
  return 0
}

# fleet_quarantine JOB FILE META LOG — a fenced (reclaimed) worker must
# NOT ship: park the local result in failed/ for human triage (§9 split-brain).
#   Appends a notice to LOG, moves FILE into $FAILED/ when it exists, and appends
#   result=fenced_quarantine / fleet_quarantined=1 / ended=<epoch> to META.
fleet_quarantine() {
  local job=$1 file=$2 metaf=$3 logf=$4
  {
    echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)."
    echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))"
  } >> "$logf"
  if [[ -e "$file" ]]; then
    # Still best-effort, but no longer silent: the meta below says "quarantined"
    # whether or not the file actually moved, so a failed move must be visible.
    mv -- "$file" "$FAILED/" 2>/dev/null \
      || err "fleet: could not move $file into $FAILED/ while quarantining $job"
  fi
  {
    echo "result=fenced_quarantine"
    echo "fleet_quarantined=1"
    echo "ended=$(date +%s)"
  } >> "$metaf"
  err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}

# _fleet_stage_for RESULT -> the coordinator stage for a job result/stage.
#   shipped/testing/review map to themselves, the listed failure results map to
#   "failed", and anything else (including an empty argument) maps to "building".
_fleet_stage_for() {
  case "$1" in
    shipped) echo shipped ;;
    testing) echo testing ;;
    review) echo review ;;
    failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
      echo failed
      ;;
    *) echo building ;;
  esac
}

# fleet-status — heartbeat (register) + print this factory's identity/caps.
#   With AQ_FLEET off it only logs that the runner is in offline mode.
cmd_fleet_status() {
  ensure_dirs
  if ! fleet_enabled; then
    log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
    return 0
  fi
  log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
  log "fleet: capabilities=$(fleet_detect_caps)"
  if fleet_heartbeat; then
    log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs."
  else
    err "fleet: coordinator unreachable — would run in offline-degrade mode."
  fi
}