From 2ae1af1930a861a6552c1e7696dc5d22bc1ab79a Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 12:24:45 -0700 Subject: [PATCH] feat(agent-queue): resilient lease renewal + graceful drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two runner-side reliability improvements (additive, opt-out via env where relevant): - Lease-renewal retry: a renewal lost to a transient blip (timeout/5xx/proxy) previously let the lease expire and the coordinator reclaim a still-running job, wasting the work. fleet_lease_renew now retries a TRANSIENT failure a few times with a short backoff (AQ_FLEET_RENEW_RETRIES=2, AQ_FLEET_RENEW_BACKOFF_SEC=2); a 409/412 FENCE is terminal and never retried. - Graceful drain: a new `drain` command signals a running loop (via a $STATE/draining flag) to stop taking NEW work (coordinator claim + local inbox), let in-flight jobs finish, release their leases, and exit cleanly — ideal before a deploy. Distinct from `stop` (kills workers immediately) and the `--drain`/--once startup flag (drains the queue to empty). The flag is cleared at run-loop start and on exit. bash -n clean on both files; ./selftest.sh PASS; `drain` smoke-tested. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- agent-queue/agent-queue.sh | 42 ++++++++++++++++++++++++++++++--- agent-queue/lib/fleet-client.sh | 24 ++++++++++++++----- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index bd84c4f..ac13b29 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -1691,10 +1691,17 @@ cmd_run() { # Fleet (§8): register with the coordinator (registration == first heartbeat). 
fleet_enabled && fleet_heartbeat fleet_flags_warn_once # §16: warn once if ROUTE=1 + SHADOW=1 (ROUTE wins, shadow off) + rm -f "$STATE/draining" # clear any stale drain request from a prior run while true; do # continuously sweep for orphans (a worker that died mid-loop) recover_orphans + # Graceful drain (deploy-friendly): `agent-queue drain` drops this flag; we then + # stop taking NEW work (coordinator claim + local inbox), let in-flight jobs + # finish, release their leases, and exit cleanly. Distinct from `--drain`/--once, + # which drains the queue to empty at startup. + local draining=false + [[ -f "$STATE/draining" ]] && draining=true # Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs, # and — if we have capacity — claim one coordinator job into inbox/ so the # normal selection loop below executes it interleaved with local .md files. @@ -1707,7 +1714,8 @@ cmd_run() { # §M0 RU gate: when AQ_FLEET_GATE=1, skip the (expensive) claim while the # cheap per-product queue version is unchanged and we are not mid-drain. # Gate off (default) -> fleet_gate_should_claim always true == prior behavior. - if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \ + if [[ "$draining" != true ]] && fleet_route_enabled \ + && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \ && fleet_gate_should_claim; then local _crc=0 fleet_claim >/dev/null 2>&1 || _crc=$? @@ -1716,8 +1724,9 @@ cmd_run() { fi local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare) local running; running=$(active_workers) - # launch jobs while we have capacity and an eligible inbox file - while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do + # launch jobs while we have capacity and an eligible inbox file — but NOT while + # draining (we let in-flight finish and take nothing new). 
+ while [[ "$draining" != true ]] && [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do # pick by priority (critical→low) then age, skipping files whose lock key # is currently busy, so two jobs sharing a cwd (or `lock:` key) never run # at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort. @@ -1816,6 +1825,14 @@ cmd_run() { log "drain complete — no runnable work, no workers running"; rm -f "$STATE/daemon.pid"; exit 0 fi fi + # Graceful drain requested: once every in-flight worker has finished, release any + # leftover leases and exit cleanly (a deploy can now restart us safely). + if [[ "$draining" == true ]] && [[ "$(active_workers)" -eq 0 ]]; then + fleet_release_all_active || true + rm -f "$STATE/draining" "$STATE/daemon.pid" + log "drain complete (graceful) — in-flight work finished, leases released" + exit 0 + fi sleep "$POLL_SECONDS" done } @@ -1979,6 +1996,23 @@ cmd_stop() { log "stopped $killed running worker(s) + run loop" } +# cmd_drain — request a GRACEFUL drain of a running loop: stop taking new work +# (coordinator claim + local inbox), let in-flight jobs finish, release their +# leases, then exit. Unlike `stop` (which kills workers immediately), drain never +# interrupts running work — ideal before a deploy/restart. No-op-safe if no loop +# is running (the flag is cleared on the next `run`). 
+cmd_drain() { + ensure_dirs + : > "$STATE/draining" + local dpid="" + [[ -f "$STATE/daemon.pid" ]] && dpid=$(cat "$STATE/daemon.pid" 2>/dev/null) + if [[ -n "$dpid" ]] && kill -0 "$dpid" 2>/dev/null; then + log "drain requested — run loop (pid $dpid) will stop claiming, finish in-flight work, and exit" + else + log "drain flag set — no run loop is currently active (it will drain on next start until cleared)" + fi +} + cmd_logs() { local job="${1:-}" follow="" [[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f" @@ -2138,6 +2172,7 @@ ${C_BOLD}COMMANDS${C_RESET} fleet-shadow-report [N] summarize the shadow/dual-run divergence log (counts, agreement, last N) dash [--interval N] richer live Node dashboard (recent shipped/failed too) stop kill running workers + the run loop + drain graceful stop: finish in-flight work, release leases, exit logs [-f] print (or follow) a job's log promote advance one stage (review → testing → shipped) ship manual gate: testing (QA) → shipped @@ -2229,6 +2264,7 @@ main() { to-tracker) cmd_to_tracker "$@";; dash|dashboard) cmd_dash "$@";; stop) cmd_stop "$@";; + drain) cmd_drain "$@";; logs) cmd_logs "$@";; promote) cmd_promote "$@";; ship) cmd_ship "$@";; diff --git a/agent-queue/lib/fleet-client.sh b/agent-queue/lib/fleet-client.sh index df7d3c4..3e2c168 100644 --- a/agent-queue/lib/fleet-client.sh +++ b/agent-queue/lib/fleet-client.sh @@ -316,18 +316,30 @@ fleet_report() { } # fleet_lease_renew -> extend the lease; 0 ok, 2 fenced, 1 degraded. +# A renewal lost to a transient blip (timeout / 5xx / proxy) would let the lease +# expire and the coordinator reclaim a job that is still running, wasting the work. +# So retry a TRANSIENT failure a few times with a short backoff (well within the +# lease window); a 409/412 FENCE is terminal and never retried. Tunables: +# AQ_FLEET_RENEW_RETRIES (default 2 extra attempts), AQ_FLEET_RENEW_BACKOFF_SEC (2). 
fleet_lease_renew() {
  # Extend the coordinator lease of an in-flight fleet job.
  #
  # Globals:   STATE (read) - directory holding "<job>.meta"
  #            FLEET_CODE (read) - HTTP status left behind by _fleet_call
  #            AQ_FLEET_RENEW_RETRIES     extra attempts after the first (default 2)
  #            AQ_FLEET_RENEW_BACKOFF_SEC pause between attempts, seconds (default 2)
  #            AQ_FLEET_LEASE_SECONDS     requested lease length (default 900)
  # Arguments: $1 - local job name
  # Returns:   0 renewed (also when fleet mode is off or the job carries no
  #              fleet_job_id, i.e. there is nothing to renew)
  #            2 fenced: 409/412, terminal, never retried; "fleet_fenced=1" is
  #              appended to the job's meta file
  #            1 degraded: every attempt ended in some other status
  fleet_enabled || return 0
  local job=$1 metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id)
  epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0

  local retries="${AQ_FLEET_RENEW_RETRIES:-2}"
  local backoff="${AQ_FLEET_RENEW_BACKOFF_SEC:-2}"
  local attempt=0
  [[ "$retries" =~ ^[0-9]+$ ]] || retries=2
  # Force base 10: a value such as "08" would otherwise be parsed as (invalid)
  # octal by the -ge test below, which then errors on every pass and never
  # lets the loop give up.
  retries=$((10#$retries))
  # Validate the backoff the same way as the retry count. A malformed value
  # makes sleep fail immediately, turning the retries into a tight loop against
  # the coordinator. Decimals stay allowed for sleep implementations that
  # accept them.
  [[ "$backoff" =~ ^[0-9]+([.][0-9]+)?$ ]] || backoff=2

  local payload="{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
  while :; do
    _fleet_call POST "/fleet/jobs/$jid/lease/renew" "$payload"
    case "${FLEET_CODE:-}" in
      2*) return 0 ;;
      409|412) echo "fleet_fenced=1" >> "$metaf"; return 2 ;;
    esac
    # Anything else is treated as transient: retry up to $retries extra times,
    # then report degraded so the caller can carry on.
    [[ "$attempt" -ge "$retries" ]] && return 1
    attempt=$((attempt + 1))
    sleep "$backoff"
  done
}

# fleet_lease_release [stage] -> best-effort release on a terminal stage.