Adds AQ_FLEET_GATE (default OFF): the run loop point-reads the cheap per-product queue version (GET /fleet/queue-state) and SKIPS the expensive /fleet/claim while the version is unchanged and it is not mid-drain, with a periodic safety backstop and fail-open-on-read-error so work is never stranded. Keeps POLL_SECONDS for local job responsiveness rather than raising it globally. selftest 39b covers the gate decisions; reconciles the M0 section of the dispatch redesign doc. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
543 lines
27 KiB
Bash
543 lines
27 KiB
Bash
# shellcheck shell=bash
|
|
# ── Fleet coordinator client (Phase 2, §7/§8/§9/§18) ────────────────
|
|
#
|
|
# Sourced by agent-queue.sh. Lets the single-host runner act as a "factory" that
|
|
# registers / heartbeats / claims / reports against the platform-service `fleet`
|
|
# coordinator — BEHIND the AQ_FLEET flag. When AQ_FLEET is unset/0, every function
|
|
# here is an immediate no-op and the offline git-queue path is byte-for-byte
|
|
# unchanged. curl-only + POSIX awk (reuses agent-queue.sh helpers: log/err,
|
|
# _meta_val, _json_str, _json_escape, detect_capabilities, active_workers, CURL_BIN).
|
|
#
|
|
# Contract (routes under AQ_FLEET_API, which already includes /api):
|
|
# POST /fleet/factories/heartbeat {factoryId, capabilities[], health, load}
|
|
# POST /fleet/claim {factoryId, capabilities[], leaseSeconds}
|
|
# -> {claimed, job{id,bodyMd,leaseEpoch}, lease{...}}
|
|
# PATCH /fleet/jobs/:id {stage, leaseEpoch, checkpoint?} (409 = fenced)
|
|
# POST /fleet/jobs/:id/lease/renew {leaseEpoch, leaseSeconds} (409 = fenced)
|
|
# POST /fleet/jobs/:id/lease/release {leaseEpoch, stage?}
|
|
# The coordinator owns leaseEpoch fencing + writes fleet_events server-side; there
|
|
# is no client-side "register" or "append event" call (register == first heartbeat).
|
|
|
|
# ── Config (env-overridable) ────────────────────────────────────────
|
|
AQ_FLEET="${AQ_FLEET:-0}"                                   # master switch (0 = offline)
AQ_FLEET_API="${AQ_FLEET_API:-http://localhost:4003/api}"   # base URL incl. /api

# _fleet_normalize_api <url> — print <url> as a coordinator base URL ending in /api.
# platform-service mounts the fleet routes under /api. EVERY trailing slash is
# stripped (stripping just one turned ".../api//" into ".../api//api"), then /api
# is appended unless it is already the final segment. This lets the natural
# AQ_FLEET_API=http://host:4003 form work instead of silently 404ing every call.
_fleet_normalize_api() {
  local url=$1
  while [[ "$url" == */ ]]; do url=${url%/}; done
  [[ "$url" == */api ]] || url="${url}/api"
  printf '%s\n' "$url"
}
AQ_FLEET_API="$(_fleet_normalize_api "$AQ_FLEET_API")"

AQ_FLEET_TOKEN="${AQ_FLEET_TOKEN:-}"                        # bearer; never hardcode
# AQ_PRODUCT_ID is shared with the Slice-4 tracker config (X-Product-Id header).
AQ_FACTORY_ID="${AQ_FACTORY_ID:-$( (hostname -s 2>/dev/null || hostname 2>/dev/null || echo factory) | tr -cd 'A-Za-z0-9._-')-$$}"
AQ_FLEET_LEASE_RENEW_SEC="${AQ_FLEET_LEASE_RENEW_SEC:-300}" # heartbeat/renew cadence
AQ_FLEET_LEASE_SECONDS="${AQ_FLEET_LEASE_SECONDS:-900}"     # requested lease duration
AQ_FLEET_CAPS="${AQ_FLEET_CAPS:-}"                          # override caps (comma/space list)
AQ_FLEET_CWD="${AQ_FLEET_CWD:-$PWD}"                        # cwd for claimed fleet jobs
AQ_FLEET_API_CMD="${AQ_FLEET_API_CMD:-}"                    # test seam (stub script)
AQ_FLEET_HB_TS=0                                            # last heartbeat epoch (mutable)
|
|
|
|
# ── Slice 4: feature-flag levels (three explicit, independently-toggleable) ──
|
|
# Precedence (documented in README §Cutover):
#   AQ_FLEET=0        ⇒ pure offline, ZERO coordinator calls (master switch).
#   AQ_FLEET_ROUTE=1  ⇒ route_via_service: the coordinator is AUTHORITATIVE for
#                       claim (the default; preserves the P2-S3 behavior).
#   AQ_FLEET_ROUTE=0  ⇒ the LOCAL inbox is authoritative and the coordinator is
#                       not used to source work (the pre-cutover state).
#   AQ_FLEET_SHADOW=1 ⇒ shadow/dual-run; needs AQ_FLEET=1 AND AQ_FLEET_ROUTE=0.
#                       The normal offline path runs as authoritative AND the
#                       coordinator is queried in parallel WITHOUT acting on its
#                       responses, purely to record divergence.
# With AQ_FLEET_ROUTE=1 AND AQ_FLEET_SHADOW=1, ROUTE WINS and shadow is disabled
# (a one-shot warning is logged), so routing and shadowing never run together.
#
# `: "${VAR:=default}"` assigns the default only when VAR is unset or empty,
# which is exactly what VAR="${VAR:-default}" does.
: "${AQ_FLEET_ROUTE:=1}"

# AQ_FLEET_AUTOSHIP=1 ⇒ once the factory's local verify gate passes, advance the
# coordinator job testing -> shipped (the factory's verify IS the test phase).
# The default 0 keeps the human review gate authoritative (the job rests at testing).
: "${AQ_FLEET_AUTOSHIP:=0}"

# AQ_FLEET_PR=1 ⇒ for jobs that carry a `repo`, run the agent in an isolated
# checkout on branch aq/job/<id>, then commit/push and open a PR. The PR URL is
# reported back and recorded on the run. Checkouts are cached under AQ_FLEET_REPOS_DIR.
: "${AQ_FLEET_PR:=0}"
: "${AQ_FLEET_REPOS_DIR:=}"   # empty ⇒ $STATE/repos, resolved at call time

: "${AQ_FLEET_SHADOW:=0}"
# Isolated factory id for the read-only shadow claim (never the real factory id).
: "${AQ_FLEET_SHADOW_FACTORY_ID:=${AQ_FACTORY_ID}-shadow}"
# Shadow divergence log; empty ⇒ $STATE/fleet-shadow.log, resolved at call time.
: "${AQ_FLEET_SHADOW_LOG:=}"

# Per-process mutable state, always reset on load.
_AQ_FLEET_SHADOW_WARNED=0   # one-shot ROUTE>SHADOW precedence warning (per process)
SHADOW_COORD_JOB=""         # set by fleet_shadow_claim: would-be coordinator job id
|
|
|
|
# ── §M0 RU gate (docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12) ──
|
|
# When ON, the run loop point-reads a cheap per-product queue version
# (GET /fleet/queue-state, ~1 RU) and SKIPS the expensive claim while nothing has
# changed and the runner is not mid-drain, cutting idle Cosmos RU. Opt-in: with
# AQ_FLEET_GATE unset/0, behavior is byte-for-byte unchanged. The gate always
# FAILS OPEN (claims) on any read error so work is never stranded.
: "${AQ_FLEET_GATE:=0}"
# Force a full claim at least this often even when the version is unchanged
# (backstops a missed/raced version bump); 0 disables the periodic backstop.
: "${AQ_FLEET_GATE_SAFETY_SEC:=300}"

# Per-process mutable gate state (reset on every load; not env-overridable).
AQ_FLEET_GATE_SEEN=""     # last-seen queue version
AQ_FLEET_GATE_TS=0        # epoch of the last full (drained) claim attempt
AQ_FLEET_GATE_DRAINING=1  # 1 = keep claiming (startup, or the last claim got a job)
|
|
|
|
# fleet_enabled — succeeds only when the master switch AQ_FLEET is exactly "1";
# unset or empty counts as 0 (offline).
fleet_enabled() {
  case "${AQ_FLEET:-0}" in
    1) return 0 ;;
    *) return 1 ;;
  esac
}
|
|
|
|
# fleet_route_enabled — the coordinator is the authoritative claim/assignment
# source: requires the master switch AND AQ_FLEET_ROUTE=1 (its default when unset).
fleet_route_enabled() {
  fleet_enabled || return 1
  [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]
}
|
|
|
|
# fleet_shadow_enabled — shadow/dual-run is active. Pure (never logs). It needs
# all three of AQ_FLEET=1, AQ_FLEET_ROUTE=0 and AQ_FLEET_SHADOW=1. With ROUTE=1
# it is false (ROUTE wins); the precedence warning is emitted separately, once,
# by fleet_flags_warn_once.
fleet_shadow_enabled() {
  fleet_enabled && [[ "${AQ_FLEET_ROUTE:-1}" == 0 && "${AQ_FLEET_SHADOW:-0}" == 1 ]]
}
|
|
|
|
# fleet_flags_warn_once — emit the ROUTE>SHADOW precedence warning at most once
# per process. It is called from run-loop init so an operator who sets ROUTE=1
# together with SHADOW=1 learns that shadow is suppressed. A no-op for every
# other flag combination. Always returns 0.
fleet_flags_warn_once() {
  fleet_enabled || return 0
  [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]] || return 0
  [[ "${AQ_FLEET_SHADOW:-0}" == 1 ]] || return 0
  [[ "${_AQ_FLEET_SHADOW_WARNED:-0}" == 1 ]] && return 0   # already warned
  err "fleet: AQ_FLEET_ROUTE=1 and AQ_FLEET_SHADOW=1 — ROUTE wins; shadow/dual-run is DISABLED. Set AQ_FLEET_ROUTE=0 to shadow."
  _AQ_FLEET_SHADOW_WARNED=1
  return 0
}
|
|
|
|
# fleet_flags_state — print a one-line resolved flag summary (no trailing newline)
# for `status` / `fleet-status`. The AQ_FLEET value is printed as actually set
# (unset ⇒ 0) instead of a hard-coded "1", so the summary stays truthful even
# when the master switch is off.
fleet_flags_state() {
  local route shadow
  if [[ "${AQ_FLEET_ROUTE:-1}" == 1 ]]; then route="route_via_service"; else route="local-authoritative"; fi
  if fleet_shadow_enabled; then shadow="shadow=ON"; else shadow="shadow=off"; fi
  printf 'AQ_FLEET=%s route=%s(AQ_FLEET_ROUTE=%s) %s(AQ_FLEET_SHADOW=%s)' \
    "${AQ_FLEET:-0}" "$route" "${AQ_FLEET_ROUTE:-1}" "$shadow" "${AQ_FLEET_SHADOW:-0}"
}
|
|
|
|
# ── HTTP (curl only; same output contract as the Slice-4 tracker_api) ──
|
|
# fleet_api <METHOD> <PATH> [JSON] — perform one coordinator HTTP call.
# Output: the response body, then a final line holding the HTTP status code
# (same contract as the Slice-4 tracker_api). When curl exits non-zero, an
# extra "000" line is appended so the last line is never a stale 2xx.
# When AQ_FLEET_API_CMD is set (test seam), that command is run with
# <METHOD> <PATH> <JSON> instead of curl, and its exit status is returned.
fleet_api() {
  local method=$1 path=$2 body=${3:-}
  if [[ -n "${AQ_FLEET_API_CMD:-}" ]]; then
    "$AQ_FLEET_API_CMD" "$method" "$path" "$body"
    return $?
  fi
  local url="${AQ_FLEET_API}${path}"
  local -a args=(-sS -m "${AQ_FLEET_TIMEOUT:-30}" -X "$method"
    -H "Content-Type: application/json" -w '\n%{http_code}')
  # NOTE(review): the bearer token travels in argv and is therefore visible in
  # `ps` on shared hosts; consider curl's `-H @file` form.
  [[ -n "${AQ_FLEET_TOKEN:-}" ]] && args+=(-H "Authorization: Bearer $AQ_FLEET_TOKEN")
  # AQ_PRODUCT_ID is not initialized in this file; the `:-` default keeps an
  # unset value from aborting the call under `set -u`.
  [[ -n "${AQ_PRODUCT_ID:-}" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
  [[ -n "$body" ]] && args+=(--data "$body")
  local out rc
  out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$?
  if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi
}
|
|
|
|
# _fleet_call <METHOD> <PATH> [JSON] — run fleet_api and split its output into
# two globals the caller inspects afterwards: FLEET_CODE (the last line, i.e.
# the HTTP status) and FLEET_BODY (every line before it).
_fleet_call() {
  local raw
  raw=$(fleet_api "$@")
  FLEET_BODY=$(sed '$d' <<< "$raw")
  FLEET_CODE=$(tail -n1 <<< "$raw")
}
|
|
|
|
# _fleet_json_num <key> — read JSON text on stdin and print the first integer
# (optionally negative) value stored under "<key>". Returns non-zero when no
# such numeric value exists. This is a regex scan, not a JSON parser.
_fleet_json_num() {
  local key=$1 first
  first=$(grep -oE "\"${key}\"[[:space:]]*:[[:space:]]*-?[0-9]+" | head -n 1)
  [[ -n "$first" ]] || return 1
  # Drop everything up to the last char that is neither a digit nor '-'.
  printf '%s\n' "${first##*[!0-9-]}"
}
|
|
|
|
# _fleet_is_job <job> — succeeds iff $STATE/<job>.meta records a non-empty
# fleet_job_id, i.e. the job was claimed from the coordinator.
_fleet_is_job() {
  local fid
  fid=$(_meta_val "$STATE/$1.meta" fleet_job_id)
  [[ -n "$fid" ]]
}
|
|
|
|
# fleet_detect_caps — print a JSON array of capability tokens (no trailing
# newline). It uses the AQ_FLEET_CAPS override when set (comma- and/or
# space-separated) and detect_capabilities otherwise. Empty tokens are dropped;
# each token is JSON-escaped.
fleet_detect_caps() {
  local raw tok json="" sep=""
  if [[ -n "$AQ_FLEET_CAPS" ]]; then
    raw=$(tr ', ' '\n\n' <<< "$AQ_FLEET_CAPS")
  else
    raw=$(detect_capabilities)
  fi
  while IFS= read -r tok; do
    [[ -z "$tok" ]] && continue
    json+="${sep}\"$(_json_escape "$tok")\""
    sep=","
  done <<< "$raw"
  printf '[%s]' "$json"
}
|
|
|
|
# ── Heartbeat (registration == first heartbeat) ─────────────────────
|
|
# fleet_heartbeat — POST this factory's id, capabilities, health and current
# load to /fleet/factories/heartbeat (the first heartbeat doubles as
# registration). Returns 0 on 2xx, after recording the time in AQ_FLEET_HB_TS.
# Otherwise it logs the failure and returns 1 (degraded). No-op when fleet is off.
fleet_heartbeat() {
  fleet_enabled || return 0
  local caps load body
  caps=$(fleet_detect_caps)
  load=$(active_workers 2>/dev/null || echo 0)
  # "load" is emitted as a bare JSON number. Trim surrounding whitespace (e.g.
  # BSD `wc -l` padding), then fall back to 0 for anything that is not a plain
  # non-negative integer: an empty value, an error string, or "N\n0" when
  # active_workers prints and then fails would all make the body invalid JSON.
  load="${load#"${load%%[![:space:]]*}"}"
  load="${load%"${load##*[![:space:]]}"}"
  [[ "$load" =~ ^[0-9]+$ ]] || load=0
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"health\":\"ok\",\"load\":$load}"
  _fleet_call POST "/fleet/factories/heartbeat" "$body"
  case "$FLEET_CODE" in
    2*) AQ_FLEET_HB_TS=$(date +%s); return 0 ;;
    *) err "fleet: heartbeat failed (HTTP ${FLEET_CODE:-error}) — running degraded"; return 1 ;;
  esac
}
|
|
|
|
# fleet_heartbeat_maybe — send a heartbeat only once at least
# AQ_FLEET_LEASE_RENEW_SEC (default 300) seconds have passed since the last
# successful one. Always returns 0, so a failed heartbeat never breaks the loop.
fleet_heartbeat_maybe() {
  fleet_enabled || return 0
  local now elapsed
  now=$(date +%s)
  elapsed=$(( now - ${AQ_FLEET_HB_TS:-0} ))
  if (( elapsed >= ${AQ_FLEET_LEASE_RENEW_SEC:-300} )); then
    fleet_heartbeat
  fi
  return 0
}
|
|
|
|
# ── §M0 RU gate helpers ─────────────────────────────────────────────
|
|
# fleet_gate_enabled — the cheap-poll RU gate is on: requires the fleet master
# switch AND AQ_FLEET_GATE=1 (off when unset).
fleet_gate_enabled() {
  fleet_enabled || return 1
  [[ "${AQ_FLEET_GATE:-0}" == 1 ]]
}
|
|
|
|
# fleet_queue_version — print the product's numeric queue version taken from
# GET /fleet/queue-state. Returns non-zero on a non-2xx response or when the
# body has no numeric "version", so callers can fail open.
fleet_queue_version() {
  _fleet_call GET "/fleet/queue-state"
  [[ "$FLEET_CODE" == 2* ]] || return 1
  _fleet_json_num version <<< "$FLEET_BODY"
}
|
|
|
|
# fleet_gate_should_claim — 0 = run the (expensive) claim this tick, 1 = skip it.
# It fails OPEN (claims) on any uncertainty so work is never stranded. When the
# gate is OFF it always returns 0, preserving the pre-gate behavior exactly.
#
# When the gate is ON, it point-reads the queue version FIRST on every tick and
# stores it in AQ_FLEET_GATE_PENDING. fleet_gate_note_claim later arms the gate
# with this pre-claim snapshot. Taking the snapshot BEFORE the claim closes a
# race: if the version were read only after an empty claim, a bump that landed
# while the claim was in flight would be recorded as already "seen". The gate
# would then skip the new job until the safety backstop fired, or forever when
# AQ_FLEET_GATE_SAFETY_SEC=0.
fleet_gate_should_claim() {
  fleet_gate_enabled || return 0                          # gate off -> always claim
  local v=""
  v=$(fleet_queue_version) || v=""                        # read failed -> no snapshot
  AQ_FLEET_GATE_PENDING=$v
  [[ "${AQ_FLEET_GATE_DRAINING:-1}" == 1 ]] && return 0   # mid-drain -> keep claiming
  local now; now=$(date +%s)
  if [[ "${AQ_FLEET_GATE_SAFETY_SEC:-0}" -gt 0 \
    && $(( now - ${AQ_FLEET_GATE_TS:-0} )) -ge "${AQ_FLEET_GATE_SAFETY_SEC}" ]]; then
    return 0                                              # periodic safety backstop
  fi
  [[ -n "$v" ]] || return 0                               # unreadable -> fail open
  [[ "$v" != "${AQ_FLEET_GATE_SEEN:-}" ]] && return 0     # changed -> claim
  return 1                                                # unchanged + within backstop -> skip
}

# fleet_gate_note_claim <claim_rc> — update gate state after a claim attempt.
#   rc 0 (claimed a job) -> stay draining (there may be more, keep claiming).
#   rc 2 (nothing claimable) / 1 (API error) -> arm the gate: record a queue
#   version + timestamp and stop draining, so ticks skip until the version changes.
# The recorded version is the pre-claim snapshot from fleet_gate_should_claim
# when one exists. Only when there is none (should_claim was not called first,
# or its read failed) is the version read now. The snapshot is consumed either way.
fleet_gate_note_claim() {
  fleet_gate_enabled || return 0
  local snap=${AQ_FLEET_GATE_PENDING:-}
  AQ_FLEET_GATE_PENDING=""
  if [[ "${1:-1}" == 0 ]]; then AQ_FLEET_GATE_DRAINING=1; return 0; fi
  AQ_FLEET_GATE_DRAINING=0
  AQ_FLEET_GATE_TS=$(date +%s)
  if [[ -n "$snap" ]]; then
    AQ_FLEET_GATE_SEEN=$snap
  else
    local v; v=$(fleet_queue_version) && [[ -n "$v" ]] && AQ_FLEET_GATE_SEEN="$v"
  fi
  return 0
}
|
|
|
|
# ── Claim — pull one job and materialize it as a local inbox .md ────
|
|
# fleet_claim — ask the coordinator for one job and materialize it in the local
# inbox. Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API
# error OR a claimed job that could not be materialized locally.
fleet_claim() {
  fleet_enabled || return 2
  local caps body; caps=$(fleet_detect_caps)
  body="{\"factoryId\":\"$(_json_escape "$AQ_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/claim" "$body"
  case "$FLEET_CODE" in 2*) :;; *) err "fleet: claim failed (HTTP ${FLEET_CODE:-error})"; return 1;; esac
  printf '%s' "$FLEET_BODY" | grep -q '"claimed"[[:space:]]*:[[:space:]]*true' || return 2

  local jid body_md epoch repo base_branch verify automerge=""
  jid=$(printf '%s' "$FLEET_BODY" | _json_str id)
  body_md=$(printf '%s' "$FLEET_BODY" | _json_str bodyMd)
  epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  repo=$(printf '%s' "$FLEET_BODY" | _json_str repo)
  base_branch=$(printf '%s' "$FLEET_BODY" | _json_str baseBranch)
  verify=$(printf '%s' "$FLEET_BODY" | _json_str verify)
  printf '%s' "$FLEET_BODY" | grep -q '"autoMerge"[[:space:]]*:[[:space:]]*true' && automerge=true
  [[ -n "$jid" ]] || { err "fleet: claim returned no job id"; return 1; }

  # Materialize a transient local job .md (same approach as from-tracker) so the
  # existing runner executes a coordinator job unchanged. fleet-job-id +
  # fleet-lease-epoch travel in frontmatter -> the job meta (see cmd_run).
  local safe tmpdir tmp add_rc=0
  safe=$(printf '%s' "$jid" | tr -c 'A-Za-z0-9._-' '_')
  # If mktemp failed unchecked, tmpdir would be empty and the job file would be
  # written to "/fleet-<id>.md".
  if ! tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fleet.XXXXXX"); then
    err "fleet: claimed job $jid but could not create a temp dir to materialize it"
    return 1
  fi
  tmp="$tmpdir/fleet-$safe.md"
  {
    echo "---"
    echo "cwd: $AQ_FLEET_CWD"
    echo "yolo: true"
    echo "fleet-job-id: $jid"
    echo "fleet-lease-epoch: ${epoch:-0}"
    [[ -n "$repo" ]] && echo "fleet-repo: $repo"
    [[ -n "$base_branch" ]] && echo "fleet-base-branch: $base_branch"
    # Per-repo verify command (drives the existing verify gate) + auto-merge flag.
    [[ -n "$verify" ]] && echo "verify: $verify"
    [[ -n "$automerge" ]] && echo "fleet-automerge: true"
    echo "idempotency-key: fleet-$jid"
    echo "---"
    echo
    printf '%s\n' "$body_md"
  } > "$tmp"
  # cmd_add's output stays suppressed but its status is now checked: reporting
  # "claimed" (rc 0) for a job that never reached the inbox would hide it.
  cmd_add "$tmp" >/dev/null 2>&1 || add_rc=$?
  rm -rf -- "$tmpdir"
  if (( add_rc != 0 )); then
    err "fleet: claimed job $jid but could not add it to the local inbox (cmd_add rc=$add_rc); its lease is not released here"
    return 1
  fi
  log "fleet: claimed job $C_BOLD$jid$C_RESET (leaseEpoch=${epoch:-0})"
  return 0
}
|
|
|
|
# ── Report a fenced stage transition ────────────────────────────────
|
|
# fleet_report <job> <stage> [with-checkpoint] — report a fenced stage transition
# for a coordinator job via PATCH /fleet/jobs/<id>. Returns:
#   0  ok (appends fleet_reported=<stage> to the job meta)
#   2  FENCED: HTTP 409/412, stale epoch; the caller must self-abort
#      (appends fleet_fenced=1)
#   1  degraded: any other failure; continue locally (appends fleet_degraded=1)
# Non-fleet jobs (no fleet_job_id in the meta) are a no-op returning 0.
fleet_report() {
  fleet_enabled || return 0
  local job=$1 stage=$2 with_ckpt=${3:-} metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  local ckpt=""
  if [[ -n "$with_ckpt" ]]; then
    local wb wc; wb=$(_meta_val "$metaf" wip_branch); wc=$(_meta_val "$metaf" wip_commit)
    if [[ -n "$wb" ]]; then
      ckpt=",\"checkpoint\":{\"wipBranch\":\"$(_json_escape "$wb")\""
      [[ -n "$wc" ]] && ckpt+=",\"wipCommit\":\"$(_json_escape "$wc")\""
      ckpt+="}"
    fi
  fi
  # The payload carries ONLY {stage, leaseEpoch, checkpoint}, never bodyMd/prompt/token.
  # stage is JSON-escaped (as fleet_shadow_report already does) so an unexpected
  # value cannot break or inject into the body.
  _fleet_call PATCH "/fleet/jobs/$jid" "{\"stage\":\"$(_json_escape "$stage")\",\"leaseEpoch\":${epoch:-0}$ckpt}"
  case "$FLEET_CODE" in
    2*) echo "fleet_reported=$stage" >> "$metaf"; return 0 ;;
    409|412) err "fleet: FENCED reporting stage=$stage (stale leaseEpoch=$epoch) — self-aborting $job"
      echo "fleet_fenced=1" >> "$metaf"; return 2 ;;
    *) err "fleet: report stage=$stage failed (HTTP ${FLEET_CODE:-error}) — offline-degrade, continuing locally"
      echo "fleet_degraded=1" >> "$metaf"; return 1 ;;
  esac
}
|
|
|
|
# fleet_lease_renew <job> — extend the coordinator lease of a fleet job by
# AQ_FLEET_LEASE_SECONDS. Returns 0 ok (also for non-fleet jobs),
# 2 fenced (HTTP 409/412; appends fleet_fenced=1 to the meta), 1 degraded.
fleet_lease_renew() {
  fleet_enabled || return 0
  local job=$1
  local metaf="$STATE/$job.meta"
  local jid epoch
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -z "$jid" ]] && return 0
  local payload="{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  _fleet_call POST "/fleet/jobs/$jid/lease/renew" "$payload"
  if [[ "$FLEET_CODE" == 2* ]]; then
    return 0
  elif [[ "$FLEET_CODE" == 409 || "$FLEET_CODE" == 412 ]]; then
    echo "fleet_fenced=1" >> "$metaf"
    return 2
  fi
  return 1
}
|
|
|
|
# fleet_lease_release <job> [stage] — best-effort release of a fleet job's lease
# on a terminal stage. The response is ignored and the function always returns 0.
# Non-fleet jobs are a no-op. The optional stage is JSON-escaped, consistent with
# fleet_shadow_report, so an unexpected value cannot break the body.
fleet_lease_release() {
  fleet_enabled || return 0
  local job=$1 stage=${2:-} metaf jid epoch body
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  body="{\"leaseEpoch\":${epoch:-0}"
  [[ -n "$stage" ]] && body+=",\"stage\":\"$(_json_escape "$stage")\""
  body+="}"
  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$body"
  return 0
}
|
|
|
|
# fleet_report_insights <job> [result] — send the run's cost/token/effort metrics
# (read from the job meta, where parse_usage left them) to the coordinator via
# the lease-release call, which also gives up the held lease. Only well-formed
# values are included: integers for counts, a plain decimal for cost. result,
# PR url/branch/state are added when present. Best-effort: the response is
# ignored and the function always returns 0. Non-fleet jobs are a no-op.
fleet_report_insights() {
  fleet_enabled || return 0
  local job=$1 result=${2:-}
  local metaf="$STATE/$job.meta" jid epoch
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  local insights="" v pair
  v=$(_meta_val "$metaf" model)
  [[ -n "$v" ]] && insights+=",\"model\":\"$(_json_escape "$v")\""
  for pair in tokens_in=tokensIn tokens_out=tokensOut tokens_cached=tokensCached; do
    v=$(_meta_val "$metaf" "${pair%%=*}")
    [[ "$v" =~ ^[0-9]+$ ]] && insights+=",\"${pair#*=}\":$v"
  done
  v=$(_meta_val "$metaf" cost_usd)
  [[ "$v" =~ ^[0-9]+(\.[0-9]+)?$ ]] && insights+=",\"costUsd\":$v"
  for pair in turns=turns tool_calls=toolCalls; do
    v=$(_meta_val "$metaf" "${pair%%=*}")
    [[ "$v" =~ ^[0-9]+$ ]] && insights+=",\"${pair#*=}\":$v"
  done
  v=$(_meta_val "$metaf" usage_estimated)
  [[ "$v" == "true" || "$v" == "1" ]] && insights+=",\"estimated\":true"

  local payload="{\"leaseEpoch\":${epoch:-0}"
  [[ -n "$insights" ]] && payload+=",\"insights\":{${insights#,}}"
  [[ -n "$result" ]] && payload+=",\"result\":\"$(_json_escape "$result")\""
  for pair in pr_url=prUrl pr_branch=branch pr_state=prState; do
    v=$(_meta_val "$metaf" "${pair%%=*}")
    [[ -n "$v" ]] && payload+=",\"${pair#*=}\":\"$(_json_escape "$v")\""
  done
  payload+="}"

  _fleet_call POST "/fleet/jobs/$jid/lease/release" "$payload"
  return 0
}
|
|
|
|
# fleet_renew_active — renew the lease of every in-flight fleet job, i.e. each
# $BUILDING/*.md whose meta marks it as coordinator-claimed. Renewal failures
# and output are swallowed; always returns 0.
fleet_renew_active() {
  fleet_enabled || return 0
  local path name
  for path in "$BUILDING"/*.md; do
    [[ -e "$path" ]] || continue   # unmatched glob stays literal
    name=${path##*/}
    name=${name%.md}
    if _fleet_is_job "$name"; then
      fleet_lease_renew "$name" >/dev/null 2>&1 || true
    fi
  done
  return 0
}
|
|
|
|
# fleet_quarantine <job> <file> <metaf> <logf> — a fenced (reclaimed) worker must
# NOT ship. This appends a notice to the job log, moves the job file into
# $FAILED/ for human triage (§9 split-brain), and marks the meta with
# result=fenced_quarantine. Returns err's status.
fleet_quarantine() {
  local job=$1 file=$2 metaf=$3 logf=$4
  printf '%s\n' \
    "FLEET FENCED — the coordinator reclaimed this job (stale leaseEpoch)." \
    "Quarantining the local result — NOT shipping/merging. Needs human triage. ($(date))" \
    >> "$logf"
  if [[ -e "$file" ]]; then
    mv "$file" "$FAILED/" 2>/dev/null
  fi
  printf '%s\n' "result=fenced_quarantine" "fleet_quarantined=1" "ended=$(date +%s)" >> "$metaf"
  err "fleet: quarantined $job (fenced/reclaimed) — surfaced for human triage"
}
|
|
|
|
# _fleet_stage_for <result> — map a local job result/stage to a coordinator stage.
# shipped/testing/review pass through unchanged, every failure-like result
# becomes "failed", and anything else is reported as "building".
_fleet_stage_for() {
  local r=$1
  case "$r" in
    shipped|testing|review)
      printf '%s\n' "$r" ;;
    failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
      printf '%s\n' failed ;;
    *)
      printf '%s\n' building ;;
  esac
}
|
|
|
|
# ── Slice 4: shadow / dual-run (strictly side-effect-free on real job state) ──
|
|
|
|
# _fleet_shadow_log — print the path of the structured shadow-divergence log:
# AQ_FLEET_SHADOW_LOG when non-empty, else $STATE/fleet-shadow.log.
_fleet_shadow_log() {
  local p=${AQ_FLEET_SHADOW_LOG:-}
  [[ -n "$p" ]] || p="$STATE/fleet-shadow.log"
  printf '%s\n' "$p"
}
|
|
|
|
# fleet_shadow_claim — ask the coordinator, read-only, what it WOULD assign for
# this factory's capabilities. It is built to leave real job state untouched:
#   * it claims under the ISOLATED AQ_FLEET_SHADOW_FACTORY_ID, never the real id;
#   * it sends "dryRun":true,"shadow":true so a supporting coordinator never assigns;
#   * if a job comes back claimed anyway, its lease is released at once;
#   * the would-be job is NEVER materialized / run / reported / shipped locally.
# Sets SHADOW_COORD_JOB to the would-be job id ("" = none). Best-effort: a
# non-2xx claim is logged as SHADOW_ERROR and swallowed. Always returns 0.
fleet_shadow_claim() {
  SHADOW_COORD_JOB=""
  fleet_shadow_enabled || return 0

  local caps req
  caps=$(fleet_detect_caps)
  req="{\"factoryId\":\"$(_json_escape "$AQ_FLEET_SHADOW_FACTORY_ID")\",\"capabilities\":$caps,\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900},\"dryRun\":true,\"shadow\":true}"
  _fleet_call POST "/fleet/claim" "$req"

  if [[ "$FLEET_CODE" != 2* ]]; then
    printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "<none>" "<none>" "SHADOW_ERROR(claim:HTTP_${FLEET_CODE:-error})" \
      >> "$(_fleet_shadow_log)" 2>/dev/null || true
    return 0
  fi

  grep -q '"claimed"[[:space:]]*:[[:space:]]*true' <<< "$FLEET_BODY" || return 0

  local would_be lease_epoch
  would_be=$(printf '%s' "$FLEET_BODY" | _json_str id)
  lease_epoch=$(printf '%s' "$FLEET_BODY" | _fleet_json_num leaseEpoch)
  SHADOW_COORD_JOB="$would_be"

  # Undo any REAL lease a coordinator without dry-run support may have created,
  # so the probe leaves zero residue. Best-effort; the response is ignored.
  [[ -z "$would_be" ]] && return 0
  _fleet_call POST "/fleet/jobs/$would_be/lease/release" "{\"leaseEpoch\":${lease_epoch:-0},\"shadow\":true}" >/dev/null 2>&1 || true
  return 0
}
|
|
|
|
# fleet_shadow_compare <localJobId> <coordJobId> — classify the local
# (authoritative) decision against the coordinator's would-be decision and
# append "ts<TAB>localJob<TAB>coordJob<TAB>verdict" to the shadow log. Verdicts:
# AGREE | DIVERGE | COORD_EMPTY | LOCAL_EMPTY. Nothing is logged when both ids
# are empty. Log write errors are swallowed; always returns 0.
fleet_shadow_compare() {
  fleet_shadow_enabled || return 0
  local lj=${1:-} cj=${2:-} verdict
  # Encode which sides are present: "" neither, L local only, C coord only, LC both.
  case "${lj:+L}${cj:+C}" in
    "") return 0 ;;
    L)  verdict=COORD_EMPTY ;;
    C)  verdict=LOCAL_EMPTY ;;
    *)
      if [[ "$lj" == "$cj" ]]; then verdict=AGREE; else verdict=DIVERGE; fi
      ;;
  esac
  printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "${lj:-<none>}" "${cj:-<none>}" "$verdict" \
    >> "$(_fleet_shadow_log)" 2>/dev/null || true
  return 0
}
|
|
|
|
# fleet_shadow_report <localJobId> <coordJobId> [stage] — mirror a stage
# transition (default "building") to the coordinator as a SHADOW event
# ("shadow":true,"dryRun":true). This exercises the report path, but the response
# is NEVER acted on (no fence / quarantine / state change). A non-2xx result is
# only logged as SHADOW_REPORT_DIVERGE. A no-op without a coordinator job id.
# Always returns 0.
fleet_shadow_report() {
  fleet_shadow_enabled || return 0
  local lj=${1:-} cj=${2:-} stage=${3:-building}
  [[ -z "$cj" ]] && return 0
  _fleet_call PATCH "/fleet/jobs/$cj" "{\"stage\":\"$(_json_escape "$stage")\",\"shadow\":true,\"dryRun\":true}"
  if [[ "${FLEET_CODE:-}" != 2* ]]; then
    printf '%s\t%s\t%s\t%s\n' "$(date +%s)" "${lj:-<none>}" "${cj:-<none>}" "SHADOW_REPORT_DIVERGE(HTTP_${FLEET_CODE:-error})" \
      >> "$(_fleet_shadow_log)" 2>/dev/null || true
  fi
  return 0
}
|
|
|
|
# fleet-shadow-report [N] — summarize the shadow log: per-verdict counts, the
# total, the agreement rate over decision verdicts, and the last N (default 10)
# divergence/error events. Read-only and safe whatever the flags are.
cmd_fleet_shadow_report() {
  ensure_dirs
  local limit=10 shadow_log
  shadow_log=$(_fleet_shadow_log)
  if [[ "${1:-}" =~ ^[0-9]+$ ]]; then limit=$1; fi

  if [[ ! -s "$shadow_log" ]]; then
    log "fleet shadow: no shadow log yet ($shadow_log)."
    log "fleet shadow: run with AQ_FLEET=1 AQ_FLEET_ROUTE=0 AQ_FLEET_SHADOW=1 to record divergence."
    return 0
  fi

  log "fleet shadow report ($shadow_log):"
  # Column 4 is the verdict; any "(...)" detail suffix is dropped before counting.
  awk -F'\t' '
    {
      verdict = $4
      sub(/\(.*/, "", verdict)
      count[verdict]++
      total++
      if (verdict == "AGREE") agree++
      if (verdict ~ /^(AGREE|DIVERGE|COORD_EMPTY|LOCAL_EMPTY)$/) decided++
    }
    END {
      k = split("AGREE DIVERGE COORD_EMPTY LOCAL_EMPTY SHADOW_ERROR SHADOW_REPORT_DIVERGE", order, " ")
      for (i = 1; i <= k; i++) printf " %-22s %d\n", order[i], count[order[i]] + 0
      printf " %-22s %d\n", "TOTAL", total + 0
      if (decided > 0) printf " %-22s %d%%\n", "AGREEMENT", int(100 * agree / decided)
    }' "$shadow_log"

  log "last $limit divergence/error events:"
  local tab=$'\t'
  grep -E "${tab}(DIVERGE|COORD_EMPTY|LOCAL_EMPTY|SHADOW_ERROR|SHADOW_REPORT_DIVERGE)" "$shadow_log" 2>/dev/null \
    | tail -n "$limit" \
    | awk -F'\t' '{ printf " ts=%s local=%s coord=%s verdict=%s\n", $1, $2, $3, $4 }' || true
  return 0
}
|
|
|
|
# fleet-status — print this factory's identity, resolved flags and capabilities,
# then send a heartbeat (which doubles as registration) and report whether the
# coordinator answered. With AQ_FLEET off it only reports offline mode.
cmd_fleet_status() {
  ensure_dirs
  if ! fleet_enabled; then
    log "fleet: AQ_FLEET is off — running in offline git-queue mode (no coordinator)."
    return 0
  fi

  log "fleet: factory=$C_BOLD$AQ_FACTORY_ID$C_RESET api=$AQ_FLEET_API"
  log "fleet: flags=$(fleet_flags_state)"
  fleet_flags_warn_once
  log "fleet: capabilities=$(fleet_detect_caps)"

  # Explain which side is authoritative when the coordinator is not routing.
  if fleet_shadow_enabled; then
    log "fleet: SHADOW/dual-run mode — local inbox is authoritative; coordinator queried for comparison only (never acted on)."
  elif ! fleet_route_enabled; then
    log "fleet: ROUTE off — local inbox is authoritative; coordinator not used to source work."
  fi

  if fleet_heartbeat; then
    log "fleet: heartbeat OK (registered)."
  else
    err "fleet: coordinator unreachable — would run in offline-degrade mode."
  fi
}
|