feat(agent-queue): resilient lease renewal + graceful drain

Two runner-side reliability improvements (additive, opt-out via env where relevant):

- Lease-renewal retry: a renewal lost to a transient blip (timeout/5xx/proxy)
  previously let the lease expire and the coordinator reclaim a still-running job,
  wasting the work. fleet_lease_renew now retries a TRANSIENT failure a few times
  with a short backoff (AQ_FLEET_RENEW_RETRIES=2, AQ_FLEET_RENEW_BACKOFF_SEC=2);
  a 409/412 FENCE is terminal and never retried.

- Graceful drain: a new `drain` command signals a running loop (via a $STATE/draining
  flag) to stop taking NEW work (coordinator claim + local inbox), let in-flight
  jobs finish, release their leases, and exit cleanly — ideal before a deploy.
  Distinct from `stop` (kills workers immediately) and the `--drain`/`--once` startup
  flag (drains the queue to empty). The `$STATE/draining` flag is cleared at run-loop
  start and on exit.

bash -n clean on both files; ./selftest.sh PASS; `drain` smoke-tested.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 12:24:45 -07:00
parent 14308fc382
commit 2ae1af1930
2 changed files with 57 additions and 9 deletions

View File

@ -1691,10 +1691,17 @@ cmd_run() {
# Fleet (§8): register with the coordinator (registration == first heartbeat). # Fleet (§8): register with the coordinator (registration == first heartbeat).
fleet_enabled && fleet_heartbeat fleet_enabled && fleet_heartbeat
fleet_flags_warn_once # §16: warn once if ROUTE=1 + SHADOW=1 (ROUTE wins, shadow off) fleet_flags_warn_once # §16: warn once if ROUTE=1 + SHADOW=1 (ROUTE wins, shadow off)
rm -f "$STATE/draining" # clear any stale drain request from a prior run
while true; do while true; do
# continuously sweep for orphans (a worker that died mid-loop) # continuously sweep for orphans (a worker that died mid-loop)
recover_orphans recover_orphans
# Graceful drain (deploy-friendly): `agent-queue drain` drops this flag; we then
# stop taking NEW work (coordinator claim + local inbox), let in-flight jobs
# finish, release their leases, and exit cleanly. Distinct from `--drain`/--once,
# which drains the queue to empty at startup.
local draining=false
[[ -f "$STATE/draining" ]] && draining=true
# Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs, # Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs,
# and — if we have capacity — claim one coordinator job into inbox/ so the # and — if we have capacity — claim one coordinator job into inbox/ so the
# normal selection loop below executes it interleaved with local .md files. # normal selection loop below executes it interleaved with local .md files.
@ -1707,7 +1714,8 @@ cmd_run() {
# §M0 RU gate: when AQ_FLEET_GATE=1, skip the (expensive) claim while the # §M0 RU gate: when AQ_FLEET_GATE=1, skip the (expensive) claim while the
# cheap per-product queue version is unchanged and we are not mid-drain. # cheap per-product queue version is unchanged and we are not mid-drain.
# Gate off (default) -> fleet_gate_should_claim always true == prior behavior. # Gate off (default) -> fleet_gate_should_claim always true == prior behavior.
if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \ if [[ "$draining" != true ]] && fleet_route_enabled \
&& [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \
&& fleet_gate_should_claim; then && fleet_gate_should_claim; then
local _crc=0 local _crc=0
fleet_claim >/dev/null 2>&1 || _crc=$? fleet_claim >/dev/null 2>&1 || _crc=$?
@ -1716,8 +1724,9 @@ cmd_run() {
fi fi
local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare) local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare)
local running; running=$(active_workers) local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file # launch jobs while we have capacity and an eligible inbox file — but NOT while
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do # draining (we let in-flight finish and take nothing new).
while [[ "$draining" != true ]] && [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
# pick by priority (critical→low) then age, skipping files whose lock key # pick by priority (critical→low) then age, skipping files whose lock key
# is currently busy, so two jobs sharing a cwd (or `lock:` key) never run # is currently busy, so two jobs sharing a cwd (or `lock:` key) never run
# at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort. # at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort.
@ -1816,6 +1825,14 @@ cmd_run() {
log "drain complete — no runnable work, no workers running"; rm -f "$STATE/daemon.pid"; exit 0 log "drain complete — no runnable work, no workers running"; rm -f "$STATE/daemon.pid"; exit 0
fi fi
fi fi
# Graceful drain requested: once every in-flight worker has finished, release any
# leftover leases and exit cleanly (a deploy can now restart us safely).
if [[ "$draining" == true ]] && [[ "$(active_workers)" -eq 0 ]]; then
fleet_release_all_active || true
rm -f "$STATE/draining" "$STATE/daemon.pid"
log "drain complete (graceful) — in-flight work finished, leases released"
exit 0
fi
sleep "$POLL_SECONDS" sleep "$POLL_SECONDS"
done done
} }
@ -1979,6 +1996,23 @@ cmd_stop() {
log "stopped $killed running worker(s) + run loop" log "stopped $killed running worker(s) + run loop"
} }
# cmd_drain — request a GRACEFUL drain of a running loop: stop taking new work
# (coordinator claim + local inbox), let in-flight jobs finish, release their
# leases, then exit. Unlike `stop` (which kills workers immediately), drain never
# interrupts running work — ideal before a deploy/restart.
#
# Mechanism: creates/truncates the $STATE/draining flag file that the run loop
# checks on each iteration, then logs whether a live run loop was found via
# $STATE/daemon.pid.
# Arguments: none (anything passed by the dispatcher is ignored).
# No-op-safe if no loop is running: the run loop removes the flag when it
# starts, so a flag left behind here does not carry over into the next `run`.
cmd_drain() {
  ensure_dirs
  : > "$STATE/draining"
  local dpid=""
  if [[ -f "$STATE/daemon.pid" ]]; then
    # The pid file can vanish between the test and the read; treat that as "no loop"
    # instead of letting a failed substitution abort the command.
    dpid=$(cat "$STATE/daemon.pid" 2>/dev/null) || dpid=""
  fi
  # Only probe a strictly positive integer: `kill -0 0` (or a negative value) targets
  # a process group and would succeed even though no run loop exists.
  if [[ "$dpid" =~ ^[1-9][0-9]*$ ]] && kill -0 "$dpid" 2>/dev/null; then
    log "drain requested — run loop (pid $dpid) will stop claiming, finish in-flight work, and exit"
  else
    log "drain flag set — no run loop is currently active (the flag is cleared on the next run, so it will not carry over)"
  fi
}
cmd_logs() { cmd_logs() {
local job="${1:-}" follow="" local job="${1:-}" follow=""
[[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f" [[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f"
@ -2138,6 +2172,7 @@ ${C_BOLD}COMMANDS${C_RESET}
fleet-shadow-report [N] summarize the shadow/dual-run divergence log (counts, agreement, last N) fleet-shadow-report [N] summarize the shadow/dual-run divergence log (counts, agreement, last N)
dash [--interval N] richer live Node dashboard (recent shipped/failed too) dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop stop kill running workers + the run loop
drain graceful stop: finish in-flight work, release leases, exit
logs <job> [-f] print (or follow) a job's log logs <job> [-f] print (or follow) a job's log
promote <job> advance one stage (review → testing → shipped) promote <job> advance one stage (review → testing → shipped)
ship <job> manual gate: testing (QA) → shipped ship <job> manual gate: testing (QA) → shipped
@ -2229,6 +2264,7 @@ main() {
to-tracker) cmd_to_tracker "$@";; to-tracker) cmd_to_tracker "$@";;
dash|dashboard) cmd_dash "$@";; dash|dashboard) cmd_dash "$@";;
stop) cmd_stop "$@";; stop) cmd_stop "$@";;
drain) cmd_drain "$@";;
logs) cmd_logs "$@";; logs) cmd_logs "$@";;
promote) cmd_promote "$@";; promote) cmd_promote "$@";;
ship) cmd_ship "$@";; ship) cmd_ship "$@";;

View File

@ -316,18 +316,30 @@ fleet_report() {
} }
# fleet_lease_renew <job> -> extend the lease; 0 ok, 2 fenced, 1 degraded.
# A renewal lost to a transient blip (timeout / 5xx / proxy) would let the lease
# expire and the coordinator reclaim a job that is still running, wasting the work.
# So retry a TRANSIENT failure a few times with a short backoff (well within the
# lease window); a 409/412 FENCE is terminal and never retried. Tunables:
#   AQ_FLEET_RENEW_RETRIES (default 2 extra attempts), AQ_FLEET_RENEW_BACKOFF_SEC (2).
# Invalid tunable values fall back to their defaults.
#
# Arguments: $1 - job name; its metadata is read from $STATE/<job>.meta.
# Globals:   STATE, AQ_FLEET_LEASE_SECONDS (read); FLEET_CODE (read after _fleet_call).
# Returns:   0 renewed, fleet disabled, or job has no fleet_job_id (nothing to renew)
#            2 fenced (409/412); "fleet_fenced=1" is appended to the meta file
#            1 degraded: non-retryable client error, or retries exhausted
# Note: the retry sleeps run inline, so a call can block the caller for up to
# retries * backoff seconds.
fleet_lease_renew() {
  fleet_enabled || return 0
  local job=$1 metaf jid epoch
  metaf="$STATE/$job.meta"
  jid=$(_meta_val "$metaf" fleet_job_id); epoch=$(_meta_val "$metaf" fleet_lease_epoch)
  [[ -n "$jid" ]] || return 0
  local retries="${AQ_FLEET_RENEW_RETRIES:-2}" backoff="${AQ_FLEET_RENEW_BACKOFF_SEC:-2}" i=0
  [[ "$retries" =~ ^[0-9]+$ ]] || retries=2
  # Force base 10: a zero-padded value such as "08" passes the regex but is invalid
  # octal inside [[ -ge ]], which would keep the give-up check below from ever firing.
  retries=$((10#$retries))
  # A malformed backoff would make `sleep` fail and turn the retries into a tight loop.
  [[ "$backoff" =~ ^[0-9]+([.][0-9]+)?$ ]] || backoff=2
  while :; do
    _fleet_call POST "/fleet/jobs/$jid/lease/renew" "{\"leaseEpoch\":${epoch:-0},\"leaseSeconds\":${AQ_FLEET_LEASE_SECONDS:-900}}"
    case "$FLEET_CODE" in
      2*) return 0;;
      409|412) printf '%s\n' "fleet_fenced=1" >> "$metaf"; return 2;;
      408|425|429) ;;          # timeout / too-early / rate-limited: transient, fall through to retry
      4[0-9][0-9]) return 1;;  # other client errors will not succeed on an identical retry
    esac
    # transient (5xx, proxy error, or no/unknown status): retry up to $retries extra
    # times before giving up degraded
    [[ "$i" -ge "$retries" ]] && return 1
    i=$((i + 1))
    sleep "$backoff"
  done
}
# fleet_lease_release <job> [stage] -> best-effort release on a terminal stage. # fleet_lease_release <job> [stage] -> best-effort release on a terminal stage.