feat(agent-queue): fleet coordinator client library (lib/fleet-client.sh, P2-S3)
New sourced library implementing the factory side of the Phase-2 `fleet`
coordinator contract — curl-only + POSIX awk, reusing the Slice-4 HTTP/JSON
helper patterns, no new deps. Every coordinator-facing entry point (heartbeat / claim / report / lease) returns immediately unless AQ_FLEET=1.
- fleet_enabled / fleet_api (AQ_FLEET_API_CMD test seam) / _fleet_call
- fleet_detect_caps (reuses detect_capabilities) -> JSON caps array
- fleet_heartbeat (+ _maybe cadence): registration == first heartbeat
- fleet_claim: POST /fleet/claim, parse job id/bodyMd/leaseEpoch, materialize a
transient local .md (fleet-job-id + fleet-lease-epoch in frontmatter)
- fleet_report: PATCH fenced stage transition {stage, leaseEpoch, checkpoint?};
returns ok / FENCED(2, stale epoch -> self-abort) / degraded(1, unreachable)
- fleet_lease_renew / fleet_lease_release / fleet_renew_active (fenced)
- fleet_quarantine: park a reclaimed (fenced) job in failed/ for human triage
- cmd_fleet_status: register + print factory identity/caps
Report payloads carry only stage/epoch/checkpoint — never prompt/bodyMd/token.
This commit is contained in:
parent
10395983e7
commit
a10d4003e6
258
agent-queue/lib/fleet-client.sh
Normal file
258
agent-queue/lib/fleet-client.sh
Normal file
@ -0,0 +1,258 @@
|
||||
# shellcheck shell=bash
|
||||
# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ────────────────
|
||||
#
|
||||
# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory" that
|
||||
# registers / heartbeats / claims / reports against the platform-service `fleet`
|
||||
# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every gated
# entry point here (heartbeat / claim / report / lease) returns immediately
# without contacting the coordinator, and the offline git-queue path is byte-for-byte
|
||||
# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err,
|
||||
# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN).
|
||||
#
|
||||
# Contract (routes under AQ_FLEET_API, which already includes /api):
|
||||
# POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load}
|
||||
# POST /fleet/claim {factoryId, capabilities[], leaseSeconds}
|
||||
# -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
|
||||
# PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?} (409 = fenced)
|
||||
# POST /fleet/jobs/:id/lease/renew {leaseEpoch, leaseSeconds} (409 = fenced)
|
||||
# POST /fleet/jobs/:id/lease/release {leaseEpoch, stage?}
|
||||
# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there
|
||||
# is no client-side "register" or "append event" call (register == first heartbeat).
|
||||
|
||||
# ── Config (env-overridable) ────────────────────────────────────────
# Every knob keeps a non-empty value already present in the environment and
# only falls back to the default shown here when it is unset or empty.
: "${AQ_FLEET:=0}"                               # master switch (0 = offline)
: "${AQ_FLEET_API:=http://localhost:4003/api}"   # base URL incl. /api
: "${AQ_FLEET_TOKEN:=}"                          # bearer; never hardcode
# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header).
# Default factory id: short hostname (restricted to A-Za-z0-9._-) plus this shell's PID.
: "${AQ_FACTORY_ID:=$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}"
: "${AQ_FLEET_LEASE_RENEW_SEC:=300}"             # heartbeat/renew cadence
: "${AQ_FLEET_LEASE_SECONDS:=900}"               # requested lease duration
: "${AQ_FLEET_CAPS:=}"                           # override caps (comma/space list)
: "${AQ_FLEET_CWD:=$PWD}"                        # cwd for claimed fleet jobs
: "${AQ_FLEET_API_CMD:=}"                        # test seam (stub script)
AQ_FLEET_HB_TS=0                                 # last heartbeat epoch (mutable)
|
||||
|
||||
# fleet_enabled — true iff the coordinator integration is switched on.
|
||||
fleet_enabled() {
  # Only the literal value 1 switches the integration on; unset counts as 0.
  case "${AQ_FLEET:-0}" in
    1) return 0 ;;
    *) return 1 ;;
  esac
}
|
||||
|
||||
# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
# fleet_api <METHOD> <PATH> [JSON] -> response body, then a final HTTP-code line.
#
# Globals read:
#   AQ_FLEET_API_CMD  when non-empty, run it as `<cmd> METHOD PATH JSON` instead
#                     of curl and return its exit status (test seam)
#   AQ_FLEET_API      base URL that PATH is appended to
#   AQ_FLEET_TIMEOUT  curl -m value (default 30)
#   AQ_FLEET_TOKEN    when non-empty, sent as "Authorization: Bearer …"
#   AQ_PRODUCT_ID     when non-empty, sent as "X-Product-Id: …"
#   CURL_BIN          curl executable (falls back to `curl` when unset/empty)
# Output: whatever curl printed (body + "\n<http_code>" from -w); when curl exits
#   non-zero an extra "000" line is appended so the final line is always 000.
# Returns: 0 on the curl path; the stub's exit status on the test-seam path.
#
# Optional globals are read with ${VAR:-} so the function also works in callers
# that run with `set -u`, and curl's status is captured with `|| rc=$?` so a
# transport failure under `set -e` still reaches the 000 fallback.
# NOTE(review): the bearer token is passed on curl's argv and is therefore
# visible in process listings — consider feeding headers via stdin if that matters.
fleet_api() {
  local method=$1 path=$2 body=${3:-}
  if [[ -n "${AQ_FLEET_API_CMD:-}" ]]; then
    "$AQ_FLEET_API_CMD" "$method" "$path" "$body"
    return $?
  fi
  local url="${AQ_FLEET_API}${path}"
  local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
    -H "Content-Type: application/json" -w '\n%{http_code}')
  if [[ -n "${AQ_FLEET_TOKEN:-}" ]]; then
    args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
  fi
  if [[ -n "${AQ_PRODUCT_ID:-}" ]]; then
    args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
  fi
  if [[ -n "$body" ]]; then
    args+=(--data "$body")
  fi
  local out rc=0
  # curl's stderr is discarded; failure is signalled solely by the 000 line.
  out=$("${CURL_BIN:-curl}" "${args[@]}" "$url" 2>/dev/null) || rc=$?
  if (( rc != 0 )); then
    printf '%s\n000\n' "$out"
  else
    printf '%s\n' "$out"
  fi
}
|
||||
|
||||
# _fleet_call <METHOD> <PATH> [JSON] -> sets globals FLEET_BODY + FLEET_CODE.
# Runs fleet_api with the same arguments and splits its output: the final line
# becomes FLEET_CODE, everything before it (minus trailing newlines) FLEET_BODY.
_fleet_call() {
  local raw
  raw=$(fleet_api "$@")
  if [[ "$raw" == *$'\n'* ]]; then
    FLEET_CODE=${raw##*$'\n'}
    FLEET_BODY=${raw%$'\n'*}
    # Drop blank lines left between the body and the code line.
    while [[ "$FLEET_BODY" == *$'\n' ]]; do
      FLEET_BODY=${FLEET_BODY%$'\n'}
    done
  else
    # A single line (or nothing at all) is all code and no body.
    FLEET_CODE=$raw
    FLEET_BODY=""
  fi
  return 0
}
|
||||
|
||||
# _fleet_json_num <key> (reads JSON on stdin) -> first numeric value for key.
# Prints the (optionally negative) integer that follows the first `"key":` seen;
# prints nothing and returns non-zero when there is no such match.
_fleet_json_num() {
  local hit
  hit=$(grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -n 1)
  [[ -n "$hit" ]] || return 1
  # Keep only the trailing sign+digits of the matched `"key": N` fragment.
  printf '%s\n' "${hit##*[!0-9-]}"
}
|
||||
|
||||
# _fleet_is_job <job> -> 0 if this job was claimed from the coordinator.
|
||||
_fleet_is_job() {
  # A job counts as a fleet job when its meta file carries a non-empty fleet_job_id.
  local fleet_id
  fleet_id=$(_meta_val "$STATE/$1.meta" fleet_job_id)
  [[ -n "$fleet_id" ]]
}
|
||||
|
||||
# fleet_detect_caps -> JSON array of capability tokens (override or auto-detected).
# A non-empty AQ_FLEET_CAPS (comma/space separated) wins; otherwise the tokens
# come from detect_capabilities, one per line. Empty tokens are skipped and each
# token is passed through _json_escape. Output has no trailing newline.
fleet_detect_caps() {
  local raw
  if [[ -z "$AQ_FLEET_CAPS" ]]; then
    raw=$(detect_capabilities)
  else
    raw=$(printf '%s' "$AQ_FLEET_CAPS" | tr ', ' '\n\n')
  fi
  local json="" sep="" tok
  while IFS= read -r tok; do
    if [[ -z "$tok" ]]; then
      continue
    fi
    json+="${sep}\"$(_json_escape "$tok")\""
    sep=","
  done <<< "$raw"
  printf '%s]' "[$json"
}
|
||||
|
||||
# ── Heartbeat (registration == first heartbeat) ─────────────────────
# fleet_heartbeat -> POST /fleet/factories/heartbeat with
#   {factoryId, capabilities, health:"ok", load}.
# Returns 0 when the integration is off or the coordinator answered 2xx (and
# then stamps AQ_FLEET_HB_TS with the current epoch); 1 on any other code.
fleet_heartbeat() {
  fleet_enabled || return 0
  local caps load body
  caps=$(fleet_detect_caps)
  load=$(active_workers 2>/dev/null || echo 0)
  # `load` is spliced into the JSON unquoted, so it must be a bare integer:
  # keep the first line, drop whitespace, and fall back to 0 for anything else.
  load=${load%%$'\n'*}
  load=${load//[[:space:]]/}
  [[ "$load" =~ ^[0-9]+$ ]] || load=0
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":$load}"
  _fleet_call POST "/fleet/factories/heartbeat" "$body"
  case "$FLEET_CODE" in
    2*)
      AQ_FLEET_HB_TS=$(date +%s)
      return 0
      ;;
    *)
      err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"
      return 1
      ;;
  esac
}
|
||||
|
||||
# fleet_heartbeat_maybe — heartbeat only when the cadence interval has elapsed.
# Compares the seconds since AQ_FLEET_HB_TS against AQ_FLEET_LEASE_RENEW_SEC
# (default 300). Always returns 0, whether or not a heartbeat was sent or succeeded.
fleet_heartbeat_maybe() {
  if ! fleet_enabled; then
    return 0
  fi
  local now elapsed interval
  now=$(date +%s)
  elapsed=$(( now - ${AQ_FLEET_HB_TS:-0} ))
  interval=${AQ_FLEET_LEASE_RENEW_SEC:-300}
  if [[ "$elapsed" -ge "$interval" ]]; then
    fleet_heartbeat
  fi
  return 0
}
|
||||
|
||||
# ── Claim — pull one job and materialize it as a local inbox .md ────
# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error
# (also 1 when the claimed job could not be materialized/enqueued locally).
#
# POSTs {factoryId, capabilities, leaseSeconds} to /fleet/claim, pulls id /
# bodyMd / leaseEpoch out of the response, writes a temporary job .md and hands
# it to cmd_add. The temp directory is removed before returning.
fleet_claim() {
  fleet_enabled || return 2
  local caps body
  caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/claim" "$body"
  case "$FLEET_CODE" in
    2*) ;;
    *)
      err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"
      return 1
      ;;
  esac
  printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2

  local jid body_md epoch
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }

  # Materialize a transient local job .md (same approach as from-tracker) so the
  # existing runner executes a coordinator job unchanged. fleet-job-id +
  # fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run).
  local safe tmpdir tmp
  safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
  # Without a temp dir the path below would collapse to "/fleet-….md"; bail out.
  if ! tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX") || [[ -z "$tmpdir" ]]; then
    err "fleet: claimed job $jid but could not create a temp dir — not enqueued locally"
    return 1
  fi
  tmp="$tmpdir/fleet-$safe.md"
  {
    echo "---"
    echo "cwd: $AQ_FLEET_CWD"
    echo "yolo: true"
    echo "fleet-job-id: $jid"
    echo "fleet-lease-epoch: ${epoch:-0}"
    echo "idempotency-key: fleet-$jid"
    echo "---"
    echo
    printf '%s\n' "$body_md"
  } > "$tmp"

  # A failed enqueue must not be reported as a successful claim: the caller
  # would otherwise believe a job is waiting locally when nothing was added.
  local add_rc=0
  cmd_add "$tmp" >/dev/null 2>&1 || add_rc=$?
  rm -rf -- "$tmpdir"
  if (( add_rc != 0 )); then
    err "fleet: claimed job $jid but local enqueue failed (cmd_add exit $add_rc) — not enqueued locally"
    return 1
  fi
  log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
  return 0
}
|
||||
|
||||
# ── Report a fenced stage transition ────────────────────────────────
# fleet_report <job> <stage> [with-checkpoint] -> 0 ok, 2 FENCED (stale epoch:
# caller must self-abort), 1 degraded (coordinator unreachable: continue locally).
#
# Reads fleet_job_id / fleet_lease_epoch from $STATE/<job>.meta; a job without a
# fleet_job_id (or a disabled integration) is a silent success. When the third
# argument is non-empty and the meta has wip_branch, a checkpoint object
# (wipBranch, plus wipCommit if present) is attached. The outcome is appended to
# the meta file as fleet_reported=<stage>, fleet_fenced=1 or fleet_degraded=1.
# HTTP 409 and 412 are both treated as fenced.
fleet_report() {
  fleet_enabled || return 0
  local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  local ckpt=""
  if [[ -n "$with_ckpt" ]]; then
    local wb wc
    wb=$(_meta_val "$metaf" wip_branch)
    wc=$(_meta_val "$metaf" wip_commit)
    if [[ -n "$wb" ]]; then
      ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
      if [[ -n "$wc" ]]; then
        ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
      fi
      ckpt+="}"
    fi
  fi

  # payload carries ONLY {stage, leaseEpoch, checkpoint} — never bodyMd/prompt/token.
  # stage is escaped like every other string field so a quote or backslash in it
  # cannot break the JSON document.
  local stage_json
  stage_json=$(_json_escape "$stage")
  _fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$stage_json\",\"leaseEpoch\":${epoch:-0}$ckpt}"
  case "$FLEET_CODE" in
    2*)
      echo "fleet_reported=$stage" >> "$metaf"
      return 0
      ;;
    409|412)
      err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
      echo "fleet_fenced=1" >> "$metaf"
      return 2
      ;;
    *)
      err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
      echo "fleet_degraded=1" >> "$metaf"
      return 1
      ;;
  esac
}
|
||||
|
||||
# fleet_lease_renew <job> -> extend the lease; 0 ok, 2 fenced, 1 degraded.
# Does nothing (returns 0) when the integration is off or the job's meta file
# has no fleet_job_id. A 409/412 answer appends fleet_fenced=1 to the meta file.
fleet_lease_renew() {
  if ! fleet_enabled; then
    return 0
  fi
  local job=$1
  local meta_path="$STATE/$job.meta"
  local fleet_id lease_epoch payload
  fleet_id=$(_meta_val "$meta_path" fleet_job_id)
  lease_epoch=$(_meta_val "$meta_path" fleet_lease_epoch)
  if [[ -z "$fleet_id" ]]; then
    return 0
  fi
  printf -v payload '{"leaseEpoch":%s,"leaseSeconds":%s}' \
    "${lease_epoch:-0}" "${AQ_FLEET_LEASE_SECONDS:-900}"
  _fleet_call POST "/fleet/jobs/$fleet_id/lease/renew" "$payload"
  if [[ "$FLEET_CODE" == 2* ]]; then
    return 0
  fi
  if [[ "$FLEET_CODE" == 409 || "$FLEET_CODE" == 412 ]]; then
    echo "fleet_fenced=1" >> "$meta_path"
    return 2
  fi
  return 1
}
|
||||
|
||||
# fleet_lease_release <job> [stage] -> best-effort release on a terminal stage.
# POSTs {leaseEpoch[, stage]} to /fleet/jobs/<id>/lease/release using the
# fleet_job_id / fleet_lease_epoch from $STATE/<job>.meta. Always returns 0:
# the response code is deliberately not inspected. No request is made when the
# integration is off or the job has no fleet_job_id.
fleet_lease_release() {
  fleet_enabled || return 0
  local job=$1 stage=${2:-} metaf jid epoch body
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  body="{\"leaseEpoch\":${epoch:-0}"
  if [[ -n "$stage" ]]; then
    # Escape stage like every other JSON string so quotes/backslashes stay valid.
    body+=",\"stage\":\"$(_json_escape "$stage")\""
  fi
  body+="}"
  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
  return 0
}
|
||||
|
||||
# fleet_renew_active — renew leases for all in-flight (building/) fleet jobs.
# Walks $BUILDING/*.md, and for every job that _fleet_is_job accepts calls
# fleet_lease_renew with its output and status discarded. Always returns 0.
fleet_renew_active() {
  fleet_enabled || return 0
  local path name
  for path in "$BUILDING"/*.md; do
    # An unmatched glob stays literal; skip it.
    if [[ ! -e "$path" ]]; then
      continue
    fi
    name=${path##*/}
    name=${name%.md}
    if _fleet_is_job "$name"; then
      fleet_lease_renew "$name" >/dev/null 2>&1 || true
    fi
  done
  return 0
}
|
||||
|
||||
# fleet_quarantine <job> <file> <metaf> <logf> — a fenced (reclaimed) worker must
# NOT ship: park the local result in failed/ for human triage (§9 split-brain).
#
# Appends a FLEET FENCED notice to <logf>, moves <file> (if it exists) into
# $FAILED/, appends result=fenced_quarantine / fleet_quarantined=1 / ended=<epoch>
# to <metaf>, and reports via err. A failed move (or an empty FAILED, which
# would otherwise target "/") is reported through err instead of being ignored;
# the meta/log bookkeeping still happens so the job stays flagged for triage.
fleet_quarantine() {
  local job=$1 file=$2 metaf=$3 logf=$4
  {
    echo "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)."
    echo "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))"
  } >> "$logf"
  if [[ -e "$file" ]]; then
    if [[ -z "${FAILED:-}" ]] || ! mv -- "$file" "$FAILED/" 2>/dev/null; then
      err "fleet: could not move $file into failed/ (${FAILED:-unset}) — job file left in place"
    fi
  fi
  {
    echo "result=fenced_quarantine"
    echo "fleet_quarantined=1"
    echo "ended=$(date +%s)"
  } >> "$metaf"
  err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}
|
||||
|
||||
# _fleet_stage_for <result> -> the coordinator stage for a job result/stage.
# shipped/testing/review map to themselves, the listed failure results collapse
# to "failed", and anything else is reported as "building".
_fleet_stage_for() {
  local outcome=$1
  case "$outcome" in
    shipped | testing | review)
      printf '%s\n' "$outcome"
      ;;
    failed | timeout | verify_failed | retries_exhausted | capability_mismatch | no_engine | rejected)
      printf '%s\n' failed
      ;;
    *)
      printf '%s\n' building
      ;;
  esac
}
|
||||
|
||||
# fleet-status — heartbeat (register) + print this factory's identity/caps.
# With the integration off it only logs that fact and returns 0. Otherwise it
# logs the factory id, API base and capability array, sends one heartbeat, and
# logs whether the coordinator answered.
cmd_fleet_status() {
  ensure_dirs
  fleet_enabled || {
    log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
    return 0
  }
  log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
  local caps_json
  caps_json=$(fleet_detect_caps)
  log "fleet: capabilities=$caps_json"
  if ! fleet_heartbeat; then
    err "fleet: coordinator unreachable — would run in offline-degrade mode."
  else
    log "fleet: heartbeat OK (registered). Use 'run' to start claiming jobs."
  fi
}
|
||||
Loading…
Reference in New Issue
Block a user