diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index 4046dd6..91fc63e 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -697,6 +697,17 @@ run_worker() { trap '_worker_trap; exit 143' INT TERM _wip_start "$job" "$cwd" "$metaf" "$logf" || true + # ── Fleet (§7/§18): report `building` (with WIP checkpoint) to the coordinator. + # If the lease is stale (we were reclaimed) the report is FENCED -> self-abort and + # quarantine WITHOUT running the agent. No-op for non-fleet jobs / flag off. ── + if fleet_enabled && _fleet_is_job "$job"; then + fleet_report "$job" building checkpoint; local _frc=$? + if [[ "$_frc" -eq 2 ]]; then + fleet_quarantine "$job" "$doing_file" "$metaf" "$logf" + return 0 + fi + fi + _run_agent() { if [[ -n "$AGENT_STDIN" ]]; then ( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) @@ -764,6 +775,16 @@ run_worker() { echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf" _finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started" elif [[ $rc -eq 0 ]]; then + # Fleet (§18): re-confirm the lease before accepting the agent's output. If the + # coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale + # epoch), the report is FENCED -> quarantine the local result, NEVER ship. + if fleet_enabled && _fleet_is_job "$job"; then + fleet_report "$job" review checkpoint; local _rrc=$? + if [[ "$_rrc" -eq 2 ]]; then + fleet_quarantine "$job" "$doing_file" "$metaf" "$logf" + return 0 + fi + fi # Agent succeeded: land in review/, then run the auto-QA verify gate. The # worker is still alive here so the concurrency slot stays held through # verification — `ended=` is written only once we reach a resting stage. @@ -1263,11 +1284,20 @@ cmd_to_tracker() { log "to-tracker: echoed $jn -> item $item_id (status=$status)" } -# _auto_echo — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition. -# Never blocks or fails the job: the tracker is downstream, not authoritative. +# _auto_echo — best-effort outcome echo on a transition (never fatal). +# Fleet mode (AQ_FLEET=1, fleet job): route the echo through the coordinator as a +# fenced stage report — `fleet_events` becomes the audit source of truth, so we do +# NOT also post to the tracker directly. Offline (non-fleet) jobs keep the Slice-4 +# direct tracker echo (opt-in via AQ_TRACKER_AUTO). _auto_echo() { + local job=$1 + if declare -f fleet_enabled >/dev/null 2>&1 && fleet_enabled && _fleet_is_job "$job"; then + local result; result=$(_meta_val "$STATE/$job.meta" result) + fleet_report "$job" "$(_fleet_stage_for "$result")" >/dev/null 2>&1 || true + return 0 + fi [[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0 - cmd_to_tracker "$1" >/dev/null 2>&1 || true + cmd_to_tracker "$job" >/dev/null 2>&1 || true } cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; } @@ -1382,10 +1412,20 @@ cmd_run() { # Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous # crash/power-off before launching anything new. recover_orphans + # Fleet (§8): register with the coordinator (registration == first heartbeat). + fleet_enabled && fleet_heartbeat while true; do # continuously sweep for orphans (a worker that died mid-loop) recover_orphans + # Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs, + # and — if we have capacity — claim one coordinator job into inbox/ so the + # normal selection loop below executes it interleaved with local .md files. + if fleet_enabled; then + fleet_heartbeat_maybe + fleet_renew_active + [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] && { fleet_claim >/dev/null 2>&1 || true; } + fi local running; running=$(active_workers) # launch jobs while we have capacity and an eligible inbox file while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do @@ -1452,6 +1492,8 @@ cmd_run() { echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)" echo "artifacts=$(fm_get "$doing_file" artifacts "")" echo "tracker_item=$(fm_get "$doing_file" tracker-item "")" + echo "fleet_job_id=$(fm_get "$doing_file" fleet-job-id "")" + echo "fleet_lease_epoch=$(fm_get "$doing_file" fleet-lease-epoch "")" } > "$STATE/$job.meta" run_worker "$doing_file" & { echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta" @@ -1482,6 +1524,9 @@ cmd_status() { local running; running=$(active_workers) echo printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" "" + if fleet_enabled; then + printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API" + fi printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \ "$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \ "$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \ @@ -1511,12 +1556,16 @@ cmd_status() { local m_prio m_prof m_caps m_trk extra="" m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-) m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-) - local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) + local m_echo m_fleet m_epoch + m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) + m_fleet=$(grep '^fleet_job_id=' "$f" | tail -1 | cut -d= -f2-) + m_epoch=$(grep '^fleet_lease_epoch=' "$f" | tail -1 | cut -d= -f2-) [[ -n "$m_prio" ]] && extra+="prio=$m_prio " [[ -n "$m_prof" ]] && extra+="profile=$m_prof " [[ -n "$m_caps" ]] && extra+="caps=$m_caps " [[ -n "$m_trk" ]] && extra+="tracker=$m_trk " [[ -n "$m_echo" ]] && extra+="echoed=$m_echo " + [[ -n "$m_fleet" ]] && extra+="fleet=$m_fleet@e${m_epoch:-0} " [[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET" printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET" done @@ -1559,7 +1608,8 @@ cmd_insights() { for k in engine result attempts started ended duration_s exit verify_exit \ model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \ files_changed lines_added lines_deleted wip_branch wip_base wip_commit \ - next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do + next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at \ + fleet_job_id fleet_lease_epoch fleet_reported fleet_fenced fleet_degraded fleet_quarantined; do val=$(_meta_val "$f" "$k") [[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val" done @@ -1669,10 +1719,23 @@ cmd_ship() { local f; f=$(_find_job "$job" "$TESTING") [[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)" local base name; base=$(basename "$f"); name=${base%.md} + # Fleet (§18): a fleet job may only ship if we still hold the lease. A fenced + # report means the coordinator reclaimed it -> quarantine instead of shipping. + if fleet_enabled && _fleet_is_job "$name"; then + fleet_report "$name" shipped checkpoint; local _src=$? + if [[ "$_src" -eq 2 ]]; then + fleet_quarantine "$name" "$f" "$STATE/$name.meta" "$LOGS/$name.log" + return 0 + fi + fi mv "$f" "$SHIPPED/$base" [[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta" log "shipped $C_BOLD$base$C_RESET (testing → shipped)" _auto_echo "$name" + if fleet_enabled && _fleet_is_job "$name"; then + fleet_lease_release "$name" shipped >/dev/null 2>&1 || true + fi + return 0 } # promote — advance one stage forward: review → testing → shipped. @@ -1764,6 +1827,7 @@ ${C_BOLD}COMMANDS${C_RESET} recover reclaim orphaned building/ jobs (dead worker) -> inbox from-tracker pull a tracker Item -> materialize a job in inbox/ (§10) to-tracker echo a job's outcome to its tracker Item (one-way) + fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity dash [--interval N] richer live Node dashboard (recent shipped/failed too) stop kill running workers + the run loop logs [-f] print (or follow) a job's log @@ -1819,13 +1883,25 @@ ${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs p AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF) AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam) label hints on an Item: engine-class: profile: priority: cap: + +${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue) + AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID + AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD + When on, 'run' registers + claims coordinator jobs (interleaved with local .md), reports + fenced stage transitions, renews leases, and quarantines a job if it is reclaimed (fenced). EOF } +# Fleet coordinator client (Phase 2). All functions no-op unless AQ_FLEET=1, so the +# offline git-queue path above is unchanged when the flag is off. +# shellcheck source=lib/fleet-client.sh +[[ -f "$SCRIPT_DIR/lib/fleet-client.sh" ]] && source "$SCRIPT_DIR/lib/fleet-client.sh" + main() { local cmd="${1:-help}"; shift || true case "$cmd" in init) cmd_init "$@";; + fleet-status) cmd_fleet_status "$@";; add) cmd_add "$@";; run) cmd_run "$@";; status) cmd_status "$@";;