feat(agent-queue): wire runner to fleet coordinator at minimal hook points (P2-S3)

Sources lib/fleet-client.sh and adds a few fleet_enabled-gated hooks so the offline
git-queue path is byte-for-byte unchanged when AQ_FLEET is unset/0:

- cmd_run: register at loop start; on each iteration, heartbeat (on cadence), renew leases
  for in-flight fleet jobs, and claim one coordinator job into inbox/ when there is capacity.
- meta: persist fleet_job_id + fleet_lease_epoch (from claim frontmatter).
- run_worker: report `building` (with WIP checkpoint) after WIP setup and `review`
  before accepting the agent's output — a FENCED (stale-epoch/409) report self-aborts
  and quarantines the job (it never ships); a 5xx/unreachable coordinator degrades
  instead (the job finishes locally).
- _auto_echo: for fleet jobs route the outcome echo through the coordinator
  (fleet_events) instead of the direct tracker echo; offline jobs unchanged.
- cmd_ship: fence-check before shipping a fleet job; release lease after.
- status: show factory id + per-job fleet=<id>@e<epoch>; insights lists fleet_* fields.
- dispatch + help: `fleet-status` command + a FLEET env section.
This commit is contained in:
saravanakumardb1 2026-05-29 22:43:21 -07:00
parent a10d4003e6
commit 1d84712b47

View File

@ -697,6 +697,17 @@ run_worker() {
trap '_worker_trap; exit 143' INT TERM trap '_worker_trap; exit 143' INT TERM
_wip_start "$job" "$cwd" "$metaf" "$logf" || true _wip_start "$job" "$cwd" "$metaf" "$logf" || true
# ── Fleet (§7/§18): report `building` (with WIP checkpoint) to the coordinator.
# If the lease is stale (we were reclaimed) the report is FENCED -> self-abort and
# quarantine WITHOUT running the agent. No-op for non-fleet jobs / flag off. ──
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" building checkpoint; local _frc=$?
if [[ "$_frc" -eq 2 ]]; then
fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
return 0
fi
fi
_run_agent() { _run_agent() {
if [[ -n "$AGENT_STDIN" ]]; then if [[ -n "$AGENT_STDIN" ]]; then
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) ( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
@ -764,6 +775,16 @@ run_worker() {
echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf" echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started" _finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then elif [[ $rc -eq 0 ]]; then
# Fleet (§18): re-confirm the lease before accepting the agent's output. If the
# coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale
# epoch), the report is FENCED -> quarantine the local result, NEVER ship.
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" review checkpoint; local _rrc=$?
if [[ "$_rrc" -eq 2 ]]; then
fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
return 0
fi
fi
# Agent succeeded: land in review/, then run the auto-QA verify gate. The # Agent succeeded: land in review/, then run the auto-QA verify gate. The
# worker is still alive here so the concurrency slot stays held through # worker is still alive here so the concurrency slot stays held through
# verification — `ended=` is written only once we reach a resting stage. # verification — `ended=` is written only once we reach a resting stage.
@ -1263,11 +1284,20 @@ cmd_to_tracker() {
log "to-tracker: echoed $jn -> item $item_id (status=$status)" log "to-tracker: echoed $jn -> item $item_id (status=$status)"
} }
# _auto_echo <job> — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition. # _auto_echo <job> — best-effort outcome echo on a transition (never fatal).
# Never blocks or fails the job: the tracker is downstream, not authoritative. # Fleet mode (AQ_FLEET=1, fleet job): route the echo through the coordinator as a
# fenced stage report — `fleet_events` becomes the audit source of truth, so we do
# NOT also post to the tracker directly. Offline (non-fleet) jobs keep the Slice-4
# direct tracker echo (opt-in via AQ_TRACKER_AUTO).
_auto_echo() { _auto_echo() {
local job=$1
if declare -f fleet_enabled >/dev/null 2>&1 && fleet_enabled && _fleet_is_job "$job"; then
local result; result=$(_meta_val "$STATE/$job.meta" result)
fleet_report "$job" "$(_fleet_stage_for "$result")" >/dev/null 2>&1 || true
return 0
fi
[[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0 [[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0
cmd_to_tracker "$1" >/dev/null 2>&1 || true cmd_to_tracker "$job" >/dev/null 2>&1 || true
} }
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; } cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
@ -1382,10 +1412,20 @@ cmd_run() {
# Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous # Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous
# crash/power-off before launching anything new. # crash/power-off before launching anything new.
recover_orphans recover_orphans
# Fleet (§8): register with the coordinator (registration == first heartbeat).
fleet_enabled && fleet_heartbeat
while true; do while true; do
# continuously sweep for orphans (a worker that died mid-loop) # continuously sweep for orphans (a worker that died mid-loop)
recover_orphans recover_orphans
# Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs,
# and — if we have capacity — claim one coordinator job into inbox/ so the
# normal selection loop below executes it interleaved with local .md files.
if fleet_enabled; then
fleet_heartbeat_maybe
fleet_renew_active
[[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] && { fleet_claim >/dev/null 2>&1 || true; }
fi
local running; running=$(active_workers) local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file # launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
@ -1452,6 +1492,8 @@ cmd_run() {
echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)" echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)"
echo "artifacts=$(fm_get "$doing_file" artifacts "")" echo "artifacts=$(fm_get "$doing_file" artifacts "")"
echo "tracker_item=$(fm_get "$doing_file" tracker-item "")" echo "tracker_item=$(fm_get "$doing_file" tracker-item "")"
echo "fleet_job_id=$(fm_get "$doing_file" fleet-job-id "")"
echo "fleet_lease_epoch=$(fm_get "$doing_file" fleet-lease-epoch "")"
} > "$STATE/$job.meta" } > "$STATE/$job.meta"
run_worker "$doing_file" & run_worker "$doing_file" &
{ echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta" { echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta"
@ -1482,6 +1524,9 @@ cmd_status() {
local running; running=$(active_workers) local running; running=$(active_workers)
echo echo
printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" "" printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" ""
if fleet_enabled; then
printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API"
fi
printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \ printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \
"$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \ "$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \
"$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \ "$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \
@ -1511,12 +1556,16 @@ cmd_status() {
local m_prio m_prof m_caps m_trk extra="" local m_prio m_prof m_caps m_trk extra=""
m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-) m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-)
m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-) m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-)
local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) local m_echo m_fleet m_epoch
m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-)
m_fleet=$(grep '^fleet_job_id=' "$f" | tail -1 | cut -d= -f2-)
m_epoch=$(grep '^fleet_lease_epoch=' "$f" | tail -1 | cut -d= -f2-)
[[ -n "$m_prio" ]] && extra+="prio=$m_prio " [[ -n "$m_prio" ]] && extra+="prio=$m_prio "
[[ -n "$m_prof" ]] && extra+="profile=$m_prof " [[ -n "$m_prof" ]] && extra+="profile=$m_prof "
[[ -n "$m_caps" ]] && extra+="caps=$m_caps " [[ -n "$m_caps" ]] && extra+="caps=$m_caps "
[[ -n "$m_trk" ]] && extra+="tracker=$m_trk " [[ -n "$m_trk" ]] && extra+="tracker=$m_trk "
[[ -n "$m_echo" ]] && extra+="echoed=$m_echo " [[ -n "$m_echo" ]] && extra+="echoed=$m_echo "
[[ -n "$m_fleet" ]] && extra+="fleet=$m_fleet@e${m_epoch:-0} "
[[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET" [[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET"
printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET" printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET"
done done
@ -1559,7 +1608,8 @@ cmd_insights() {
for k in engine result attempts started ended duration_s exit verify_exit \ for k in engine result attempts started ended duration_s exit verify_exit \
model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \ model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \
files_changed lines_added lines_deleted wip_branch wip_base wip_commit \ files_changed lines_added lines_deleted wip_branch wip_base wip_commit \
next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at \
fleet_job_id fleet_lease_epoch fleet_reported fleet_fenced fleet_degraded fleet_quarantined; do
val=$(_meta_val "$f" "$k") val=$(_meta_val "$f" "$k")
[[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val" [[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val"
done done
@ -1669,10 +1719,23 @@ cmd_ship() {
local f; f=$(_find_job "$job" "$TESTING") local f; f=$(_find_job "$job" "$TESTING")
[[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)" [[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)"
local base name; base=$(basename "$f"); name=${base%.md} local base name; base=$(basename "$f"); name=${base%.md}
# Fleet (§18): a fleet job may only ship if we still hold the lease. A fenced
# report means the coordinator reclaimed it -> quarantine instead of shipping.
if fleet_enabled && _fleet_is_job "$name"; then
fleet_report "$name" shipped checkpoint; local _src=$?
if [[ "$_src" -eq 2 ]]; then
fleet_quarantine "$name" "$f" "$STATE/$name.meta" "$LOGS/$name.log"
return 0
fi
fi
mv "$f" "$SHIPPED/$base" mv "$f" "$SHIPPED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta" [[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta"
log "shipped $C_BOLD$base$C_RESET (testing → shipped)" log "shipped $C_BOLD$base$C_RESET (testing → shipped)"
_auto_echo "$name" _auto_echo "$name"
if fleet_enabled && _fleet_is_job "$name"; then
fleet_lease_release "$name" shipped >/dev/null 2>&1 || true
fi
return 0
} }
# promote <job> — advance one stage forward: review → testing → shipped. # promote <job> — advance one stage forward: review → testing → shipped.
@ -1764,6 +1827,7 @@ ${C_BOLD}COMMANDS${C_RESET}
recover reclaim orphaned building/ jobs (dead worker) -> inbox recover reclaim orphaned building/ jobs (dead worker) -> inbox
from-tracker <ITEM_ID> pull a tracker Item -> materialize a job in inbox/ (§10) from-tracker <ITEM_ID> pull a tracker Item -> materialize a job in inbox/ (§10)
to-tracker <job> echo a job's outcome to its tracker Item (one-way) to-tracker <job> echo a job's outcome to its tracker Item (one-way)
fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity
dash [--interval N] richer live Node dashboard (recent shipped/failed too) dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop stop kill running workers + the run loop
logs <job> [-f] print (or follow) a job's log logs <job> [-f] print (or follow) a job's log
@ -1819,13 +1883,25 @@ ${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs p
AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF) AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF)
AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam) AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam)
label hints on an Item: engine-class:<x> profile:<x> priority:<x> cap:<token> label hints on an Item: engine-class:<x> profile:<x> priority:<x> cap:<token>
${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue)
AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID
AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD
When on, 'run' registers + claims coordinator jobs (interleaved with local .md), reports
fenced stage transitions, renews leases, and quarantines a job if it is reclaimed (fenced).
EOF EOF
} }
# Fleet coordinator client (Phase 2). All functions no-op unless AQ_FLEET=1, so the
# offline git-queue path above is unchanged when the flag is off.
# shellcheck source=lib/fleet-client.sh
[[ -f "$SCRIPT_DIR/lib/fleet-client.sh" ]] && source "$SCRIPT_DIR/lib/fleet-client.sh"
main() { main() {
local cmd="${1:-help}"; shift || true local cmd="${1:-help}"; shift || true
case "$cmd" in case "$cmd" in
init) cmd_init "$@";; init) cmd_init "$@";;
fleet-status) cmd_fleet_status "$@";;
add) cmd_add "$@";; add) cmd_add "$@";;
run) cmd_run "$@";; run) cmd_run "$@";;
status) cmd_status "$@";; status) cmd_status "$@";;