bytelyst-devops-tools/agent-queue/agent-queue.sh
saravanakumardb1 1758bc1ab1 feat(agent-queue): single-host crash recovery, WIP checkpoint/resume, retry + insights (P1-S3)
Implements the single-host bash equivalents of roadmap §25 (durability/crash
recovery) and §26 (execution insights), plus §11 retry/dead-letter stand-in.

Resilience (A1-A4):
- recover_orphans + `recover` command: building/ jobs with a dead worker (dead
  pid, pidstart reuse-guard) are moved back to inbox/ with attempts incremented,
  on `run` startup and each loop. Idempotent (folder location is the guard).
- WIP checkpointing: for a git cwd, _wip_start creates/checks out aq/wip/<job>
  and _wip_checkpoint commits changes on every exit path via an EXIT/INT/TERM
  trap; never commits to main/current branch; non-git cwd skipped. RESUME: a
  relaunch whose aq/wip/<job> exists checks it out first (continue from
  checkpoint). wip_base persisted in a write-once sidecar.
- retry policy (now functional): retry { max, backoff, on } requeues failures
  whose class (timeout|verify_failed|crash) is in `on`, honoring backoff via
  next_eligible (selection skips until eligible), up to max attempts; exhaustion
  -> failed/ result=retries_exhausted with the WIP branch + full log preserved.
- state integrity: all meta writes stay append-only; attempts/next_eligible/wip_*
  are re-derivable; recovery is crash-safe.

Insights (B1-B6):
- per-run metrics into meta: duration_s, exit, result, attempts, and (git cwd)
  files_changed/lines_added/lines_deleted from numstat wip_base..HEAD.
- parse_usage(engine, log) adapter: generic AQ_USAGE line + Claude/Codex token
  heuristics; Devin/Copilot TODO; usage_estimated flag; never fabricates numbers.
- status insights sub-line; new `insights [job]` command (per-job metrics or a
  recent table + per-engine token/cost/success/duration rollup).
- privacy: only metrics are recorded, never prompt content or secrets.

Backward-compatible: legacy .md and non-git cwd behave exactly as before.
2026-05-29 18:43:21 -07:00

1370 lines
58 KiB
Bash
Executable File

#!/usr/bin/env bash
#
# agent-queue — a folder-based "kanban" runner for headless coding-agent CLIs.
#
# Drop a prompt .md file into queue/inbox/, and `agent-queue run` will:
# 1. pick the next file by priority, then age (respecting --max concurrency),
# 2. move it inbox/ -> building/,
# 3. launch the chosen agent CLI (devin | claude | codex | copilot), auto-approving
#    tools unless the job sets `yolo: false`,
# 4. on agent rc=0 move building/ -> review/, then run the auto-QA verify gate:
#    verify pass -> testing/, verify fail -> failed/ (no verify -> stays in review/),
# 5. on agent failure/timeout move building/ -> failed/ (a `retry:` policy can
#    requeue failures, timeouts and verify failures to inbox/ instead),
# 6. you manually `ship` testing/ -> shipped/ (the human gate),
# 7. write a per-job log + live state so `status`/`watch` can show progress.
#
# Lifecycle: inbox -> building -> review -> testing -> shipped (+ failed)
#
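# Per-task config travels in YAML-ish frontmatter at the top of the .md.
# The trailing "# ..." notes below are annotations for this header only:
# fm_get does not strip inline comments, so leave them out of real job files.
# ---
# engine: devin # devin | claude | codex | copilot (default: $DEFAULT_ENGINE)
# cwd: /abs/path/repo # where the agent runs (default: $PWD when added)
# yolo: true # auto-approve all tools (default: true)
# ---
# Other keys read by this script: engine-class, prefers-engine, capabilities,
# priority, lock, timeout, verify, retry.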
#
# Subcommands: init | add | run | recover | status | insights | watch | dash |
# stop | logs | promote | ship | reject | requeue | clean | help
#
set -uo pipefail
# ── Resolve paths ───────────────────────────────────────────────────
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
QUEUE_ROOT="${AGENT_QUEUE_ROOT:-$SCRIPT_DIR/queue}"
INBOX="$QUEUE_ROOT/inbox"
BUILDING="$QUEUE_ROOT/building"
REVIEW="$QUEUE_ROOT/review"
TESTING="$QUEUE_ROOT/testing"
SHIPPED="$QUEUE_ROOT/shipped"
FAILED="$QUEUE_ROOT/failed"
LOGS="$QUEUE_ROOT/logs"
STATE="$QUEUE_ROOT/.state"
LOCKS="$QUEUE_ROOT/locks"
# ── Config (env-overridable) ────────────────────────────────────────
MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-3}"
DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}"
POLL_SECONDS="${AGENT_QUEUE_POLL:-3}"
# A running worker is flagged "stalled" if its log has not changed in this many
# minutes (no new agent output) — surfaced in status + dash.
STALL_MIN="${AGENT_QUEUE_STALL_MIN:-10}"
# Auto-QA verify command. After an agent exits 0 the job lands in review/; if a
# verify command is set (frontmatter `verify:` overrides this default) it runs in
# the job's cwd: pass -> testing/ (QA), fail -> failed/. Empty default = jobs park
# in review/ for manual `promote`. Shipping (testing -> shipped) is always manual.
DEFAULT_VERIFY="${AGENT_QUEUE_VERIFY:-}"
# flock is used for cross-process lock hardening when available (Linux). macOS
# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run).
FLOCK_BIN="${FLOCK_BIN:-$(command -v flock || true)}"
# timeout/gtimeout give hard process-tree kills for per-job timeouts; if absent
# (stock macOS) a pure-bash watchdog is used as a best-effort fallback.
TIMEOUT_BIN="${TIMEOUT_BIN:-$(command -v timeout || command -v gtimeout || true)}"
DEVIN_BIN="${DEVIN_BIN:-$(command -v devin || echo "$HOME/.local/bin/devin")}"
CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}"
CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}"
COPILOT_BIN="${COPILOT_BIN:-$(command -v copilot || echo copilot)}"
# ── Colors ──────────────────────────────────────────────────────────
if [[ -t 1 ]]; then
C_RESET=$'\033[0m'; C_DIM=$'\033[2m'; C_BOLD=$'\033[1m'
C_BLUE=$'\033[34m'; C_GREEN=$'\033[32m'; C_RED=$'\033[31m'; C_YEL=$'\033[33m'; C_CYAN=$'\033[36m'
else
C_RESET=""; C_DIM=""; C_BOLD=""; C_BLUE=""; C_GREEN=""; C_RED=""; C_YEL=""; C_CYAN=""
fi
log() { printf '%s[agent-queue]%s %s\n' "$C_CYAN" "$C_RESET" "$*"; }
err() { printf '%s[agent-queue]%s %s\n' "$C_RED" "$C_RESET" "$*" >&2; }
die() { err "$*"; exit 1; }
# ── Init ────────────────────────────────────────────────────────────
ensure_dirs() { mkdir -p "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED" "$FAILED" "$LOGS" "$STATE" "$LOCKS"; }
# ── Frontmatter parsing ─────────────────────────────────────────────
# fm_get <file> <key> <default>
fm_get() {
local file=$1 key=$2 def=${3:-}
local val
# only scan a leading --- ... --- block
val=$(awk -v k="$key" '
NR==1 && $0!="---" { exit }
NR==1 { infm=1; next }
infm && $0=="---" { exit }
infm {
line=$0
sub(/^[ \t]*/,"",line)
if (line ~ "^" k "[ \t]*:") {
sub("^" k "[ \t]*:[ \t]*","",line)
gsub(/^["'\''[:space:]]+|["'\''[:space:]]+$/,"",line)
print line; exit
}
}' "$file" 2>/dev/null)
[[ -n "$val" ]] && printf '%s' "$val" || printf '%s' "$def"
}
# strip_frontmatter <file> -> prints the body (everything after a leading ---..--- block)
strip_frontmatter() {
awk 'NR==1 && $0=="---" { infm=1; next }
infm && $0=="---" { infm=0; next }
{ if (!infm) print }' "$1"
}
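# Illustrative sketch (not part of the original script): a sample job file using
# frontmatter keys that this script reads via fm_get. example_write_job is a
# hypothetical helper and every value below is a placeholder; nothing calls it.
example_write_job() {
  # $1 = destination path for the sample job file
  cat > "$1" <<'EOF'
---
engine: claude
cwd: /abs/path/repo
priority: high
timeout: 45m
verify: make test
retry: { max: 2, backoff: 5m, on: [timeout, crash] }
---
Describe the task for the agent here.
EOF
}
# Usage: example_write_job /tmp/sample-job.md, then inspect the file. The body
# after the second --- is what strip_frontmatter passes on to the agent.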
# lock_key_for <file> -> the mutual-exclusion key for a job: frontmatter `lock:`
# if set, otherwise the cwd. Jobs sharing a key never run concurrently.
lock_key_for() {
local f=$1 k
k=$(fm_get "$f" lock "")
[[ -n "$k" ]] && { printf '%s' "$k"; return; }
fm_get "$f" cwd "$PWD"
}
# _keyhash <key> -> stable filename-safe token for a lock key
_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; }
# _mtime <file> -> file modification time in epoch seconds (BSD or GNU stat); empty if missing
_mtime() {
[[ -e "$1" ]] || { echo ""; return; }
stat -f %m "$1" 2>/dev/null || stat -c %Y "$1" 2>/dev/null || echo ""
}
# _pidstart <pid> -> the process start time as reported by ps (whitespace-normalized).
# Used as an identity token so a recycled pid is never mistaken for our worker.
_pidstart() { ps -o lstart= -p "$1" 2>/dev/null | awk '{$1=$1;print}'; }
# _pid_alive <pid> <pidstart> -> 0 if the pid is live AND (when a start time was
# recorded) its current start time still matches — defeating pid reuse.
_pid_alive() {
local pid=$1 want=$2 cur
[[ -n "$pid" ]] || return 1
kill -0 "$pid" 2>/dev/null || return 1
[[ -z "$want" ]] && return 0
cur=$(_pidstart "$pid")
[[ "$cur" == "$want" ]]
}
# _dur_to_secs <dur> -> seconds. Accepts 90, 90s, 45m, 2h, 1d. Invalid/empty -> 0.
_dur_to_secs() {
local d=$1
[[ -z "$d" || "$d" == "0" ]] && { echo 0; return; }
if [[ "$d" =~ ^([0-9]+)([smhd]?)$ ]]; then
local n=${BASH_REMATCH[1]} u=${BASH_REMATCH[2]}
case "$u" in
""|s) echo "$n";;
m) echo $((n*60));;
h) echo $((n*3600));;
d) echo $((n*86400));;
esac
else
echo 0
fi
}
# _meta_active <metafile> -> 0 if the job is occupying a concurrency slot.
# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
# created moments ago — the reserved-slot window between meta-write and launch).
# The <30s guard prevents a meta orphaned mid-launch (daemon killed in the gap)
# from pinning a slot forever.
_meta_active() {
local f=$1 pid mt age
grep -q '^ended=' "$f" && return 1
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
if [[ -n "$pid" ]]; then
local pidstart; pidstart=$(grep '^pidstart=' "$f" | head -1 | cut -d= -f2-)
_pid_alive "$pid" "$pidstart"
return $?
fi
mt=$(_mtime "$f"); age=$(( $(date +%s) - ${mt:-0} ))
[[ "$age" -lt 30 ]]
}
# active_workers -> count of jobs occupying a concurrency slot (reservation-aware).
active_workers() {
local n=0 f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
_meta_active "$f" && n=$((n+1))
done
echo "$n"
}
# busy_keys -> newline list of lock keys currently held by active workers.
busy_keys() {
local f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
_meta_active "$f" && grep '^lock=' "$f" | head -1 | cut -d= -f2-
done
}
# ── Manifest helpers (Phase 1 — §5/§6/§7/§8 single-host subset) ──────
#
# parse_list <raw> -> one token per line. Accepts a YAML-ish inline list
# ("[a, b]"), comma- or space-separated values, with surrounding quotes
# stripped. Empty tokens are dropped. Used for `capabilities`, `prefers`,
# `prefers-engine`, `deps`, etc.
parse_list() {
local cleaned t
cleaned=$(printf '%s' "${1:-}" | tr '[],' ' ')
for t in $cleaned; do
t=${t#[\"\']}; t=${t%[\"\']}
[[ -n "$t" ]] && printf '%s\n' "$t"
done
}
# _body_hash <file> -> stable content hash of the frontmatter-STRIPPED body.
# Drives idempotency-key dedupe on `add`. Prefers shasum/sha1sum, falls back
# to cksum so it works on a bare macOS/Linux host with no extra tools.
_body_hash() {
local f=$1
if command -v shasum >/dev/null 2>&1; then
strip_frontmatter "$f" | shasum | awk '{print $1}'
elif command -v sha1sum >/dev/null 2>&1; then
strip_frontmatter "$f" | sha1sum | awk '{print $1}'
else
strip_frontmatter "$f" | cksum | awk '{print $1"-"$2}'
fi
}
# priority_rank <priority> -> sort rank (lower = picked first). Unknown/empty
# defaults to medium. Used by inbox_sorted for priority-then-age selection.
priority_rank() {
case "$1" in
critical) echo 0;; high) echo 1;; medium) echo 2;; low) echo 3;; *) echo 2;;
esac
}
# inbox_sorted -> inbox/*.md ordered by priority (critical first) then oldest
# (filename embeds the queue timestamp, so a plain string sort = age order).
# Replaces the pure-FIFO `ls | sort`; per-lock serialization is unchanged
# (the caller still skips files whose lock key is busy).
inbox_sorted() {
local f p r
for f in "$INBOX"/*.md; do
[[ -e "$f" ]] || continue
p=$(fm_get "$f" priority medium); r=$(priority_rank "$p")
printf '%s\t%s\n' "$r" "$f"
done | sort -t"$(printf '\t')" -k1,1n -k2,2 | cut -f2-
}
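# Illustrative sketch (not part of the original script): the decorate/sort/cut
# pattern used by inbox_sorted, run on made-up data. example_priority_order is a
# hypothetical helper that reads "<priority> <filename>" lines on stdin.
example_priority_order() {
  local prio name rank
  while read -r prio name; do
    case "$prio" in
      critical) rank=0 ;; high) rank=1 ;; low) rank=3 ;; *) rank=2 ;;
    esac
    # Prefix each name with its numeric rank so sort can order on it.
    printf '%s\t%s\n' "$rank" "$name"
  done | sort -t "$(printf '\t')" -k1,1n -k2,2 | cut -f2-
}
# Usage: printf 'low a.md\nhigh c.md\nhigh b.md\n' | example_priority_order
# prints b.md, c.md, a.md (one per line): rank first, then name order.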
# engine_available <engine> -> 0 if a runnable binary exists for the engine
# (executable path or a name resolvable on PATH). Honors the *_BIN overrides
# (so the self-test stub counts as "available").
engine_available() {
local eng=$1 bin=""
case "$eng" in
devin) bin=$DEVIN_BIN;;
claude) bin=$CLAUDE_BIN;;
codex) bin=$CODEX_BIN;;
copilot) bin=$COPILOT_BIN;;
*) return 1;;
esac
[[ -n "$bin" ]] || return 1
[[ -x "$bin" ]] && return 0
command -v "$bin" >/dev/null 2>&1
}
# engine_class_engines <class> -> candidate engines (priority order) for an
# engine-class (§5 taxonomy). `review-only` has no concrete mapping yet
# (reserved). Unknown class -> empty.
engine_class_engines() {
case "$1" in
agentic-coder) echo "devin claude codex";;
chat-coder) echo "copilot";;
review-only) echo "";;
*) echo "";;
esac
}
# detect_capabilities -> one capability token per line describing THIS host:
# os:<mac|linux|other> (os:any required-side wildcard matches this)
# engine:<devin|claude|codex|copilot> for each available engine
# node:<major> the host node major (compared by node<op>N)
# has:<git|pnpm|docker> tool probes (only emitted when present)
detect_capabilities() {
case "$(uname -s 2>/dev/null)" in
Darwin) echo "os:mac";;
Linux) echo "os:linux";;
*) echo "os:other";;
esac
local e
for e in devin claude codex copilot; do
engine_available "$e" && echo "engine:$e"
done
if command -v node >/dev/null 2>&1; then
local nv; nv=$(node -v 2>/dev/null | sed 's/^v//' | cut -d. -f1)
[[ "$nv" =~ ^[0-9]+$ ]] && echo "node:$nv"
fi
local t
for t in git pnpm docker; do
command -v "$t" >/dev/null 2>&1 && echo "has:$t"
done
}
# _cap_token_ok <required-token> <available-tokens> -> 0 if satisfied (§5 grammar):
# key:any wildcard — host advertises any "key:*" token (os:any matches all)
# key<op>ver numeric/semver-major compare against the host's "key:<val>" token
# (op in >= > = <= <)
# key:value exact token match
# key bare presence — exact, or advertised in any "key:"/"key<op>" form
_cap_token_ok() {
local tok=$1 avail=$2 a
if [[ "$tok" == *:any ]]; then
local wkey=${tok%:any}
for a in $avail; do [[ "$a" == "$wkey":* ]] && return 0; done
return 1
fi
if [[ "$tok" =~ ^([A-Za-z0-9_.:-]+)(\>=|\<=|=|\>|\<)([0-9][0-9.]*)$ ]]; then
local key=${BASH_REMATCH[1]} op=${BASH_REMATCH[2]} want=${BASH_REMATCH[3]} hostval=""
for a in $avail; do [[ "$a" == "$key":* ]] && { hostval=${a#*:}; break; }; done
[[ -n "$hostval" ]] || return 1
local hv=${hostval%%.*} wv=${want%%.*}
[[ "$hv" =~ ^[0-9]+$ && "$wv" =~ ^[0-9]+$ ]] || return 1
case "$op" in
">=") [[ "$hv" -ge "$wv" ]];;
">") [[ "$hv" -gt "$wv" ]];;
"=") [[ "$hv" -eq "$wv" ]];;
"<=") [[ "$hv" -le "$wv" ]];;
"<") [[ "$hv" -lt "$wv" ]];;
esac
return $?
fi
if [[ "$tok" == *:* ]]; then
for a in $avail; do [[ "$a" == "$tok" ]] && return 0; done
return 1
fi
for a in $avail; do
case "$a" in "$tok"|"$tok":*|"$tok"\>=*|"$tok"\>*|"$tok"=*|"$tok"\<=*|"$tok"\<*) return 0;; esac
done
return 1
}
# caps_match <required-tokens> <available-tokens> -> 0 iff EVERY required token
# is satisfied by the available set (both passed as whitespace-separated lists).
caps_match() {
local required=$1 available=$2 r
for r in $required; do
_cap_token_ok "$r" "$available" || return 1
done
return 0
}
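# Illustrative sketch (not part of the original script): how a versioned
# capability token such as node>=18 splits into key, operator and version, using
# the same pattern as _cap_token_ok (which then compares only the major part).
# example_split_cap is a hypothetical helper; it prints nothing for tokens
# without an operator, such as os:mac or has:git.
example_split_cap() {
  printf '%s\n' "$1" |
    sed -nE 's/^([A-Za-z0-9_.:-]+)(>=|<=|=|>|<)([0-9][0-9.]*)$/\1 \2 \3/p'
}
# Usage: example_split_cap 'node>=18' prints: node >= 18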
# resolve_engine <file> -> the concrete engine to run:
# explicit `engine:` always wins; else `engine-class` picks the first
# available engine honoring `prefers-engine` then the class default order;
# else the global default engine. Prints "" only when an engine-class was
# requested but no engine in it is available (caller fails with no_engine).
resolve_engine() {
local f=$1 eng cls prefers
eng=$(fm_get "$f" engine "")
if [[ -n "$eng" ]]; then printf '%s' "$eng"; return 0; fi
cls=$(fm_get "$f" engine-class "")
if [[ -z "$cls" ]]; then printf '%s' "$DEFAULT_ENGINE"; return 0; fi
local class_engines; class_engines=$(engine_class_engines "$cls")
prefers=$(fm_get "$f" prefers-engine "")
local ordered=() seen=" " p c
if [[ -n "$prefers" ]]; then
while IFS= read -r p; do
[[ -n "$p" ]] || continue
case " $class_engines " in *" $p "*) ordered+=("$p"); seen+="$p ";; esac
done < <(parse_list "$prefers")
fi
for c in $class_engines; do
case "$seen" in *" $c "*) ;; *) ordered+=("$c"); seen+="$c ";; esac
done
local cand
for cand in ${ordered[@]+"${ordered[@]}"}; do
engine_available "$cand" && { printf '%s' "$cand"; return 0; }
done
printf '%s' ""
}
# ── Engine driver: builds argv into AGENT_CMD[]; sets AGENT_STDIN if the ──
# prompt should be fed on stdin (claude/codex) rather than a flag. $pf is the
# frontmatter-STRIPPED body file, so a body starting with '--' is never
# misparsed as a CLI option.
build_agent_cmd() {
local engine=$1 pf=$2 yolo=$3
AGENT_CMD=(); AGENT_STDIN=""
case "$engine" in
devin)
AGENT_CMD=( "$DEVIN_BIN" -p --prompt-file "$pf" )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --permission-mode dangerous )
;;
claude)
AGENT_CMD=( "$CLAUDE_BIN" -p )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-skip-permissions )
AGENT_STDIN="$pf"
;;
codex)
AGENT_CMD=( "$CODEX_BIN" exec )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-bypass-approvals-and-sandbox )
AGENT_STDIN="$pf"
;;
copilot)
# Best-effort GitHub Copilot CLI mapping for the chat-coder engine-class.
# Flags drift between CLI versions — this is the single place to edit.
AGENT_CMD=( "$COPILOT_BIN" -p )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --allow-all-tools )
AGENT_STDIN="$pf"
;;
*) die "unknown engine '$engine' (use: devin | claude | codex | copilot)";;
esac
}
# ── Worker: runs one job to completion (invoked in background) ───────
run_worker() {
local doing_file=$1
local job; job=$(basename "$doing_file")
job=${job%.md}
local engine cwd yolo logf metaf
cwd=$(fm_get "$doing_file" cwd "$PWD")
yolo=$(fm_get "$doing_file" yolo "true")
logf="$LOGS/$job.log"
metaf="$STATE/$job.meta"
# NOTE: the parent (cmd_run) creates $metaf with job/engine/cwd/started/pid.
# The worker only ever APPENDS (ended/exit/result) to avoid a truncation race.
# ── Capability gate (§5/§8 single-host): if the job declares `capabilities`
# this host does not satisfy, route to failed/ WITHOUT launching the agent. ──
local req_caps; req_caps=$(parse_list "$(fm_get "$doing_file" capabilities "")" | tr '\n' ' ')
if [[ -n "${req_caps// /}" ]]; then
local avail; avail=$(detect_capabilities)
if ! caps_match "$req_caps" "$avail"; then
{
echo "===== agent-queue job: $job ====="
echo "CAPABILITY MISMATCH — host cannot satisfy required: $req_caps"
echo "host advertises: $(printf '%s' "$avail" | tr '\n' ' ')"
echo "agent NOT launched — routed to failed/ ($(date))"
} >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
{ echo "result=capability_mismatch"; echo "ended=$(date +%s)"; } >> "$metaf"
return 0
fi
fi
# ── Engine resolution (§5): explicit `engine` wins, else `engine-class`.
# No available engine for a requested class -> fail (no_engine), no launch. ──
engine=$(resolve_engine "$doing_file")
if [[ -z "$engine" ]]; then
{
echo "===== agent-queue job: $job ====="
echo "NO ENGINE — engine-class '$(fm_get "$doing_file" engine-class "")' has no available engine on this host"
echo "agent NOT launched — routed to failed/ ($(date))"
} >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
{ echo "result=no_engine"; echo "ended=$(date +%s)"; } >> "$metaf"
return 0
fi
{
echo "===== agent-queue job: $job ====="
echo "engine=$engine cwd=$cwd yolo=$yolo"
echo "started: $(date)"
echo "================================="
} >> "$logf"
if [[ ! -d "$cwd" ]]; then
echo "FATAL: cwd does not exist: $cwd" >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
echo "result=failed" >> "$metaf"; echo "ended=$(date +%s)" >> "$metaf"
return 1
fi
local started; started=$(grep '^started=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
# Strip our frontmatter so the agent only sees the task body.
local bodyf="$STATE/$job.body.md"
strip_frontmatter "$doing_file" > "$bodyf"
build_agent_cmd "$engine" "$bodyf" "$yolo"
# ── WIP checkpoint setup (§25.2): on a git cwd, create/checkout aq/wip/<job>
# so partial work survives a crash; a trap guarantees a checkpoint on EVERY
# exit path (success, failure, timeout, SIGTERM/SIGINT). Non-git cwd: no-op. ──
WIP_ACTIVE=0; WIP_BASE=""; WIP_DONE=0
_worker_trap() {
[[ "$WIP_DONE" == 1 ]] && return
WIP_DONE=1
[[ "$WIP_ACTIVE" == 1 ]] && _wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "trap-exit"
}
trap '_worker_trap' EXIT
trap '_worker_trap; exit 143' INT TERM
_wip_start "$job" "$cwd" "$metaf" "$logf" || true
_run_agent() {
if [[ -n "$AGENT_STDIN" ]]; then
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
else
( cd "$cwd" && "${AGENT_CMD[@]}" )
fi
}
local rc=0 lockkey tmo timed_out=false
lockkey=$(lock_key_for "$doing_file")
tmo=$(_dur_to_secs "$(fm_get "$doing_file" timeout "0")")
local tmo_flag="$STATE/$job.timedout"; rm -f "$tmo_flag"
local lf="$LOCKS/$(_keyhash "$lockkey").lock"
if [[ "$tmo" -gt 0 && -n "$TIMEOUT_BIN" ]]; then
# Hard timeout via timeout/gtimeout (kills the whole process tree).
AQ_STDIN="$AGENT_STDIN" "$TIMEOUT_BIN" -k 5 "${tmo}s" bash -c '
cd "$1" || exit 97; shift
if [ -n "${AQ_STDIN:-}" ]; then exec "$@" < "$AQ_STDIN"; else exec "$@"; fi
' _ "$cwd" "${AGENT_CMD[@]}" >> "$logf" 2>&1
rc=$?
[[ $rc -eq 124 ]] && timed_out=true
elif [[ "$tmo" -gt 0 ]]; then
# Portable watchdog fallback (no timeout binary). Flags the timeout and
# signals the worker; install coreutils (gtimeout) for hard tree kills.
_run_agent >> "$logf" 2>&1 &
local apid=$!
( sleep "$tmo"; : > "$tmo_flag"
pkill -TERM -P "$apid" 2>/dev/null; kill -TERM "$apid" 2>/dev/null
sleep 5; pkill -KILL -P "$apid" 2>/dev/null; kill -KILL "$apid" 2>/dev/null ) &
local wpid=$!
wait "$apid" 2>/dev/null; rc=$?
kill "$wpid" 2>/dev/null; wait "$wpid" 2>/dev/null
[[ -f "$tmo_flag" ]] && timed_out=true
elif [[ -n "$FLOCK_BIN" ]]; then
# Cross-process hardening where flock exists (Linux CI). The single run-loop
# already serializes by lock key; this guards against a stray second launcher.
# Only reached when the job has no timeout: the timeout branches above skip flock.
( "$FLOCK_BIN" -n 9 || exit 75; _run_agent ) 9>"$lf" >> "$logf" 2>&1
rc=$?
if [[ $rc -eq 75 ]]; then
echo "lock busy (key=$lockkey) — requeued to inbox" >> "$logf"
mv "$doing_file" "$INBOX/" 2>/dev/null
{ echo "ended=$(date +%s)"; echo "result=requeued"; } >> "$metaf"
return 0
fi
else
_run_agent >> "$logf" 2>&1
rc=$?
fi
rm -f "$tmo_flag"
# ── Preserve work + capture run metrics on EVERY path (§25.2/§26.1/§26.2) ──
if [[ "$WIP_ACTIVE" == 1 ]]; then
_wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "agent-exit"; WIP_DONE=1
fi
_numstat_into_meta "$cwd" "$WIP_BASE" "$metaf"
parse_usage "$engine" "$logf" >> "$metaf"
if $timed_out; then
echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then
# Agent succeeded: land in review/, then run the auto-QA verify gate. The
# worker is still alive here so the concurrency slot stays held through
# verification — `ended=` is written only once we reach a resting stage.
mv "$doing_file" "$REVIEW/" 2>/dev/null
local review_file="$REVIEW/$job.md"
echo "exit=$rc" >> "$metaf"
echo "completed OK (rc=0): landed in review — $(date)" >> "$logf"
local verify; verify=$(fm_get "$review_file" verify "$DEFAULT_VERIFY")
if [[ -z "$verify" ]]; then
_meta_end "$metaf" "review" "$started"
echo "no verify command — parked in review for manual promote: $(date)" >> "$logf"
else
echo "----- verify: $verify -----" >> "$logf"
local vrc=0
( cd "$cwd" && bash -c "$verify" ) >> "$logf" 2>&1 || vrc=$?
echo "verify_exit=$vrc" >> "$metaf"
if [[ $vrc -eq 0 ]]; then
mv "$review_file" "$TESTING/" 2>/dev/null
_meta_end "$metaf" "testing" "$started"
echo "VERIFY PASSED — promoted to testing (QA): $(date)" >> "$logf"
else
echo "VERIFY FAILED (rc=$vrc): $(date)" >> "$logf"
# verify ran on the review_file; retry policy may requeue it.
_finish_failure "$job" "$review_file" "$metaf" "$logf" "verify_failed" "$rc" "$started"
fi
fi
else
echo "FAILED (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "crash" "$rc" "$started"
fi
}
# ── Resilience & insights helpers (Phase 1 — single-host §25/§26) ────
#
# _in_list <item> <space-separated-list> -> 0 if item is present.
_in_list() { case " ${2:-} " in *" $1 "*) return 0;; esac; return 1; }
# _meta_end <metafile> <result> <started-epoch> -> append result/ended/duration
# (append-only; never truncates a live meta — §25 state integrity).
_meta_end() {
local mf=$1 res=$2 now; now=$(date +%s); local st=${3:-$now}
[[ "$st" =~ ^[0-9]+$ ]] || st=$now
{ echo "result=$res"; echo "ended=$now"; echo "duration_s=$(( now - st ))"; } >> "$mf"
}
# ── git WIP checkpointing (§25.2) ──
_is_git_repo() { git -C "$1" rev-parse --is-inside-work-tree >/dev/null 2>&1; }
_wip_branch() { printf 'aq/wip/%s' "$1"; }
# _wip_start <job> <cwd> <metaf> <logf> -> ensure/checkout the WIP branch.
# Sets globals WIP_ACTIVE (1 when a git WIP branch is in play) and WIP_BASE.
# RESUME: if aq/wip/<job> already exists (orphan/retry relaunch), check it out so
# the agent continues from the checkpoint instead of from zero. The base commit is
# persisted in a write-once sidecar so it survives meta re-creation across attempts.
_wip_start() {
local job=$1 cwd=$2 metaf=$3 logf=$4
WIP_ACTIVE=0; WIP_BASE=""
if ! _is_git_repo "$cwd"; then
echo "wip: cwd not a git repo — skipping checkpoint" >> "$logf"
return 1
fi
local br basefile base; br=$(_wip_branch "$job"); basefile="$STATE/$job.wipbase"
if git -C "$cwd" show-ref --verify --quiet "refs/heads/$br"; then
git -C "$cwd" checkout "$br" >/dev/null 2>&1 \
|| echo "wip: could not checkout $br (resume) — staying on current branch" >> "$logf"
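# Log the resume only if the checkout above actually landed on the WIP branch.
[[ "$(git -C "$cwd" symbolic-ref --short HEAD 2>/dev/null)" == "$br" ]] \
&& echo "wip: resuming on $br ($(date))" >> "$logf"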
else
base=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
if ! git -C "$cwd" checkout -b "$br" >/dev/null 2>&1; then
echo "wip: could not create $br — skipping checkpoint" >> "$logf"
return 1
fi
[[ -n "$base" ]] && printf '%s\n' "$base" > "$basefile"
echo "wip: created $br from ${base:-<root>} ($(date))" >> "$logf"
fi
base=$(cat "$basefile" 2>/dev/null || echo "")
{ echo "wip_branch=$br"; echo "wip_base=$base"; } >> "$metaf"
WIP_ACTIVE=1; WIP_BASE="$base"
return 0
}
# _wip_checkpoint <job> <cwd> <metaf> <logf> <stage> -> commit any changes in cwd
# onto aq/wip/<job>. Idempotent (no commit when the tree is clean). NEVER commits
# to a non-WIP branch — protects main/protected branches (§12).
_wip_checkpoint() {
local job=$1 cwd=$2 metaf=$3 logf=$4 stage=$5
_is_git_repo "$cwd" || return 0
local br cur; br=$(_wip_branch "$job")
cur=$(git -C "$cwd" symbolic-ref --short HEAD 2>/dev/null || echo "")
if [[ "$cur" != "$br" ]]; then
git -C "$cwd" checkout "$br" >/dev/null 2>&1 \
|| { echo "wip: not on $br — skipping checkpoint to protect '$cur'" >> "$logf"; return 0; }
fi
git -C "$cwd" add -A >/dev/null 2>&1
if git -C "$cwd" diff --cached --quiet 2>/dev/null; then
return 0 # nothing to preserve
fi
git -C "$cwd" -c user.email=agent-queue@local -c user.name=agent-queue \
commit -q -m "aq wip: $job ($stage)" >/dev/null 2>&1
local c; c=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
[[ -n "$c" ]] && echo "wip_commit=$c" >> "$metaf"
echo "wip: checkpoint ($stage) -> ${c:-?} ($(date))" >> "$logf"
}
# _numstat_into_meta <cwd> <base> <metaf> -> record files_changed/lines_added/
# lines_deleted for the run (base..HEAD on the WIP branch; binary files count 0).
_numstat_into_meta() {
local cwd=$1 base=$2 metaf=$3 out stats
_is_git_repo "$cwd" || return 0
if [[ -n "$base" ]]; then
out=$(git -C "$cwd" diff --numstat "$base" HEAD 2>/dev/null)
else
out=$(git -C "$cwd" diff --numstat HEAD 2>/dev/null)
fi
[[ -n "$out" ]] || return 0
stats=$(printf '%s\n' "$out" | awk '
{ if ($1 ~ /^[0-9]+$/) a+=$1; if ($2 ~ /^[0-9]+$/) d+=$2; f++ }
END { printf "%d %d %d", f+0, a+0, d+0 }')
# shellcheck disable=SC2086
set -- $stats
{ echo "files_changed=$1"; echo "lines_added=$2"; echo "lines_deleted=$3"; } >> "$metaf"
}
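# Illustrative sketch (not part of the original script): the totals computed by
# _numstat_into_meta, shown on hand-written `git diff --numstat` output. Binary
# files report "-" for both counts, so they add a file but no lines.
# example_numstat_totals is a hypothetical helper that reads numstat on stdin.
example_numstat_totals() {
  awk '
    { if ($1 ~ /^[0-9]+$/) added += $1
      if ($2 ~ /^[0-9]+$/) deleted += $2
      files++ }
    END { printf "%d %d %d\n", files + 0, added + 0, deleted + 0 }'
}
# Usage: printf '3\t1\ta.sh\n-\t-\tlogo.png\n' | example_numstat_totals
# prints: 2 3 1   (files_changed, lines_added, lines_deleted)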
# ── retry policy (§5/§11/§25.3) ──
# Parse `retry: { max: N, backoff: 5m, on: [classes] }`. Absent/empty -> no retry.
_retry_max() { local m; m=$(printf '%s' "${1:-}" | grep -oE 'max:[[:space:]]*[0-9]+' | grep -oE '[0-9]+' | head -1); echo "${m:-0}"; }
_retry_backoff_s() { local b; b=$(printf '%s' "${1:-}" | grep -oE 'backoff:[[:space:]]*[0-9]+[smhd]?' | sed -E 's/^backoff:[[:space:]]*//' | head -1); _dur_to_secs "${b:-0}"; }
_retry_on() {
local raw=${1:-} inside
inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*\[([^]]*)\].*/\1/p')
[[ -n "$inside" ]] || inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*([A-Za-z_,[:space:]-]+).*/\1/p')
parse_list "$inside" | tr '\n' ' '
}
# _class_retryable <class> <on-list> -> 0 if this failure class should retry.
# `crash` and `agent_error` are synonyms for a non-zero agent exit.
_class_retryable() {
local class=$1 on=$2
case "$class" in
crash) _in_list crash "$on" || _in_list agent_error "$on";;
*) _in_list "$class" "$on";;
esac
}
# _finish_failure <job> <file> <metaf> <logf> <class> <rc> <started> -> apply the
# retry policy to a failed run. Retries (requeue to inbox with backoff via
# next_eligible) while the class is in `retry.on` and attempts <= max; on
# exhaustion -> failed/ result=retries_exhausted; with no policy -> failed/ with
# the natural result (failed|timeout|verify_failed). The WIP branch is preserved
# either way so a retry resumes from the checkpoint.
_finish_failure() {
local job=$1 file=$2 metaf=$3 logf=$4 class=$5 rc=$6 started=$7
local raw max on attempts; raw=$(fm_get "$file" retry "")
attempts=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); attempts=${attempts:-1}
max=$(_retry_max "$raw"); on=$(_retry_on "$raw")
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on" && [[ "$attempts" -le "$max" ]]; then
local backoff now next na; backoff=$(_retry_backoff_s "$raw")
now=$(date +%s); next=$(( now + backoff )); na=$(( attempts + 1 ))
mv "$file" "$INBOX/" 2>/dev/null
{
echo "exit=$rc"; echo "attempts=$na"; echo "next_eligible=$next"
echo "retry_class=$class"; echo "result=retry_scheduled"
echo "ended=$now"; echo "duration_s=$(( now - ${started:-now} ))"
} >> "$metaf"
echo "RETRY scheduled: class=$class, attempt $attempts/$max, backoff ${backoff}s -> inbox ($(date))" >> "$logf"
return 0
fi
local result
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on"; then
result="retries_exhausted"
echo "RETRIES EXHAUSTED after $attempts attempt(s) (class=$class) -> failed/ ($(date))" >> "$logf"
else
case "$class" in
timeout) result="timeout";;
verify_failed) result="verify_failed";;
*) result="failed";;
esac
fi
echo "exit=$rc" >> "$metaf"
mv "$file" "$FAILED/" 2>/dev/null
_meta_end "$metaf" "$result" "$started"
}
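# Illustrative sketch (not part of the original script): one way a selector could
# honor the next_eligible epoch that _finish_failure appends to the meta file.
# This is an assumption about the shape of that check, not the script's actual
# selection code; example_is_eligible is a hypothetical helper.
example_is_eligible() {
  # $1 = next_eligible epoch (may be empty), $2 = "now" epoch (defaults to date +%s)
  local next="${1:-0}" now="${2:-$(date +%s)}"
  [ "$now" -ge "$next" ]
}
# Usage: example_is_eligible 1700000300 1700000000 returns non-zero (still
# backing off); example_is_eligible "" 1700000000 returns 0 (no backoff recorded).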
# ── orphan recovery (§25.3) ──
# recover_orphans -> move building/ jobs whose worker is dead back to inbox/ for
# re-selection (resume-aware via the WIP branch), incrementing attempts. Idempotent:
# once moved out of building/ a job is never recovered twice. A retry-capped job
# that has exhausted crash retries goes to failed/ result=retries_exhausted instead
# of looping forever; otherwise recovery never strands work.
recover_orphans() {
local f job metaf pid pidstart
for f in "$BUILDING"/*.md; do
[[ -e "$f" ]] || continue
job=$(basename "$f"); job=${job%.md}; metaf="$STATE/$job.meta"
if [[ -f "$metaf" ]] && ! grep -q '^ended=' "$metaf"; then
pid=$(grep '^pid=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
pidstart=$(grep '^pidstart=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" && continue # a live worker still owns it
fi
local prev na raw max now; prev=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); prev=${prev:-1}
raw=$(fm_get "$f" retry ""); max=$(_retry_max "$raw"); now=$(date +%s)
if [[ "$max" -gt 0 ]] && _class_retryable crash "$(_retry_on "$raw")" && [[ "$prev" -gt "$max" ]]; then
mv "$f" "$FAILED/" 2>/dev/null
{ echo "attempts=$prev"; echo "recovered=$now"; } >> "$metaf"
_meta_end "$metaf" "retries_exhausted" "$(grep '^started=' "$metaf" | tail -1 | cut -d= -f2)"
echo "ORPHAN: $job exhausted crash retries -> failed/ ($(date))" >> "$LOGS/$job.log"
log "↻ orphan $C_BOLD$job$C_RESET exhausted retries -> failed"
continue
fi
na=$(( prev + 1 ))
local next=""; [[ "$max" -gt 0 ]] && next=$(( now + $(_retry_backoff_s "$raw") ))
mv "$f" "$INBOX/" 2>/dev/null
{
echo "attempts=$na"; echo "recovered=$now"
[[ -n "$next" ]] && echo "next_eligible=$next"
echo "result=recovered"; echo "ended=$now"
} >> "$metaf"
echo "ORPHAN RECOVERED: $job (worker dead) -> inbox, attempt now $na ($(date))" >> "$LOGS/$job.log"
log "↻ recovered orphan $C_BOLD$job$C_RESET (attempt $na)"
done
}
# ── token / cost capture (§26.2) ──
# parse_usage <engine> <logfile> -> emit `key=value` usage lines (model, tokens_in,
# tokens_out, tokens_cached, cost_usd, turns, tool_calls, usage_estimated) when the
# engine's output exposes them. This is the SINGLE place per-engine extraction lives.
# A wrapper of any engine may emit a machine-readable `AQ_USAGE k=v ...` line, which
# is always honored; engine-specific heuristics are best-effort (real where known,
# TODO otherwise). Never fabricate precise numbers — omit or mark usage_estimated.
parse_usage() {
local engine=$1 log=$2
[[ -f "$log" ]] || return 0
# 1) Generic, explicit usage line (preferred; emitted by any cooperating wrapper).
local line; line=$(grep -E '^AQ_USAGE ' "$log" 2>/dev/null | tail -1)
if [[ -n "$line" ]]; then
local kv
for kv in ${line#AQ_USAGE }; do
case "$kv" in
model=*|tokens_in=*|tokens_out=*|tokens_cached=*|cost_usd=*|turns=*|tool_calls=*|usage_estimated=*) echo "$kv";;
esac
done
return 0
fi
# 2) Engine-specific best-effort heuristics (real where the format is known).
local ti to
case "$engine" in
claude)
# Claude Code can surface usage as JSON-ish input_tokens/output_tokens.
ti=$(grep -oE '"input_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"output_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
codex)
# OpenAI usage object: prompt_tokens / completion_tokens.
ti=$(grep -oE '"prompt_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"completion_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
devin) : ;; # TODO: Devin session metrics are exposed via API, not the local log.
copilot) : ;; # TODO: GitHub Copilot CLI usage format not yet documented here.
esac
return 0
}
# ── insights helpers (§26) ──
# _meta_val <metafile> <key> -> last value for key (append-only safe), else empty.
_meta_val() { grep "^$2=" "$1" 2>/dev/null | tail -1 | cut -d= -f2-; }
# _result_is_success <result> -> 0 if the agent run succeeded (reached a good stage).
_result_is_success() { case "$1" in review|testing|shipped) return 0;; *) return 1;; esac; }
# _insights_line <metafile> -> compact one-line metrics summary for status/dash.
_insights_line() {
local f=$1 s="" v ti to la ld
v=$(_meta_val "$f" attempts); [[ -n "$v" ]] && s+="attempt $v "
v=$(_meta_val "$f" duration_s); [[ -n "$v" ]] && s+="${v}s "
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out)
[[ -n "$ti$to" ]] && s+="tok ${ti:-0}/${to:-0} "
v=$(_meta_val "$f" cost_usd); [[ -n "$v" ]] && s+="usd=$v "
la=$(_meta_val "$f" lines_added); ld=$(_meta_val "$f" lines_deleted)
[[ -n "$la$ld" ]] && s+="+${la:-0}/-${ld:-0} "
[[ -n "$(_meta_val "$f" usage_estimated)" ]] && s+="(est) "
printf 'insights: %s' "${s:-(pending)}"
}
# ── Commands ────────────────────────────────────────────────────────
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
cmd_add() {
ensure_dirs
local file="" engine="" cwd="" yolo=""
while [[ $# -gt 0 ]]; do
case "$1" in
--engine) engine=$2; shift 2;;
--cwd) cwd=$2; shift 2;;
--yolo) yolo=true; shift;;
--no-yolo) yolo=false; shift;;
*) file=$1; shift;;
esac
done
[[ -n "$file" && -f "$file" ]] || die "usage: add <file.md> [--engine devin|claude|codex] [--cwd PATH] [--yolo|--no-yolo]"
# ── idempotency-key dedupe (§5) ──
# If the file declares an idempotency-key, compare a content hash of its
# stripped body against existing jobs in any active stage:
# same key + same body -> no-op (skip; "duplicate")
# same key + different body, prior in inbox/ -> supersede (replace it)
# same key + different body, prior elsewhere -> reject (clear error)
local idem; idem=$(fm_get "$file" idempotency-key "")
if [[ -n "$idem" ]]; then
local newhash; newhash=$(_body_hash "$file")
local d ef ekey ehash stage
# pass 1: exact duplicate anywhere active -> no-op
for d in "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
ehash=$(_body_hash "$ef")
if [[ "$ehash" == "$newhash" ]]; then
log "duplicate, skipped (idempotency-key=$C_BOLD$idem$C_RESET matches $(basename "$ef"))"
return 0
fi
done
done
# pass 2: same key, different body, prior is past inbox -> reject
for d in "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
stage=$(basename "$d")
die "idempotency-key '$idem' already in use by $(basename "$ef") (stage: $stage) with different content — refusing. Use a new key, or requeue the existing job."
done
done
# pass 3: same key, different body, prior still queued -> supersede
for ef in "$INBOX"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
log "superseding inbox job $(basename "$ef") (same idempotency-key=$idem, changed content)"
rm -f "$ef"
done
fi
local base; base=$(basename "$file")
local stamp; stamp=$(date +%Y%m%d-%H%M%S)
local dest="$INBOX/${stamp}__${base}"
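# If the user passed flags AND the file has no frontmatter, inject one.
# When the file already has frontmatter it is copied verbatim and the flags
# are ignored (the file's own values win).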
if [[ -n "$engine$cwd$yolo" ]] && [[ "$(head -1 "$file")" != "---" ]]; then
{
echo "---"
echo "engine: ${engine:-$DEFAULT_ENGINE}"
echo "cwd: ${cwd:-$PWD}"
echo "yolo: ${yolo:-true}"
echo "---"
echo
cat "$file"
} > "$dest"
else
cp "$file" "$dest"
fi
log "queued $C_BOLD$(basename "$dest")$C_RESET (engine=$(fm_get "$dest" engine "$DEFAULT_ENGINE"), cwd=$(fm_get "$dest" cwd "$PWD"))"
}
cmd_run() {
ensure_dirs
local once=false
while [[ $# -gt 0 ]]; do
case "$1" in
--max) MAX_CONCURRENCY=${2:-}; [[ "$MAX_CONCURRENCY" =~ ^[0-9]+$ ]] || die "run: --max must be a number"; shift 2;;
--engine) DEFAULT_ENGINE=$2; shift 2;;
--once|--drain) once=true; shift;;
*) die "run: unknown arg '$1'";;
esac
done
# Refuse to start a second run loop against the same queue — two daemons would
# break the single-launcher invariant that per-cwd locking relies on.
local dpid=""
[[ -f "$STATE/daemon.pid" ]] && dpid=$(cat "$STATE/daemon.pid" 2>/dev/null)
if [[ -n "$dpid" ]] && kill -0 "$dpid" 2>/dev/null; then
die "a run loop is already active (pid $dpid). Use 'stop' first, or a different AGENT_QUEUE_ROOT."
fi
[[ -n "$dpid" ]] && log "clearing stale daemon.pid ($dpid)"
echo "$$" > "$STATE/daemon.pid"
trap 'rm -f "$STATE/daemon.pid"; log "run loop stopped"; exit 0' INT TERM
log "run loop started (max=$MAX_CONCURRENCY, default engine=$DEFAULT_ENGINE). Ctrl-C to stop."
# Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous
# crash/power-off before launching anything new.
recover_orphans
while true; do
# continuously sweep for orphans (a worker that died mid-loop)
recover_orphans
local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
# pick by priority (critical→low) then age, skipping files whose lock key
# is currently busy, so two jobs sharing a cwd (or `lock:` key) never run
# at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort.
# Also skip jobs still inside their retry/recovery backoff (next_eligible).
local busy; busy=$(busy_keys)
local next="" cand cand_key cand_job cand_ne now_s
now_s=$(date +%s)
while IFS= read -r cand; do
[[ -n "$cand" ]] || continue
cand_key=$(lock_key_for "$cand")
if printf '%s\n' "$busy" | grep -qxF -- "$cand_key"; then continue; fi
cand_job=$(basename "$cand"); cand_job=${cand_job%.md}
cand_ne=$(grep '^next_eligible=' "$STATE/$cand_job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
if [[ "$cand_ne" =~ ^[0-9]+$ ]] && [[ "$cand_ne" -gt "$now_s" ]]; then continue; fi
next="$cand"; break
done < <(inbox_sorted)
[[ -z "$next" ]] && break
local job; job=$(basename "$next"); job=${job%.md}
local doing_file="$BUILDING/$(basename "$next")"
mv "$next" "$doing_file"
local w_eng w_cwd w_yolo w_key
# resolve the concrete engine now (explicit engine / engine-class) so the
# meta + status reflect what will actually run; run_worker re-resolves and
# is the authority on capability/engine gates.
w_eng=$(resolve_engine "$doing_file")
w_cwd=$(fm_get "$doing_file" cwd "$PWD")
w_yolo=$(fm_get "$doing_file" yolo "true")
w_key=$(lock_key_for "$doing_file")
# Preserve the attempt counter across requeues (retry/orphan recovery set
# it before re-queuing); a fresh job starts at 1. Read BEFORE truncating
# the meta below so the count survives re-creation (§25.4 crash-safe).
local w_attempts; w_attempts=$(grep '^attempts=' "$STATE/$job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
[[ "$w_attempts" =~ ^[0-9]+$ ]] && [[ "$w_attempts" -gt 0 ]] || w_attempts=1
# write meta BEFORE launch (no pid yet), then append the worker pid from $!.
# The new manifest fields (§5) are recorded here; only priority,
# capabilities, engine-class, idempotency-key and retry are functional this
# phase; the rest are stored for `status`/audit but otherwise inert.
{
echo "job=$job"
echo "engine=${w_eng:-<none>}"
echo "cwd=$w_cwd"
echo "yolo=$w_yolo"
echo "lock=$w_key"
echo "started=$(date +%s)"
echo "attempts=$w_attempts"
echo "priority=$(fm_get "$doing_file" priority medium)"
echo "profile=$(fm_get "$doing_file" profile "")"
echo "engine_class=$(fm_get "$doing_file" engine-class "")"
echo "capabilities=$(fm_get "$doing_file" capabilities "")"
echo "prefers=$(fm_get "$doing_file" prefers "")"
echo "prefers_engine=$(fm_get "$doing_file" prefers-engine "")"
echo "budget=$(fm_get "$doing_file" budget "")"
echo "deps=$(fm_get "$doing_file" deps "")"
echo "deps_mode=$(fm_get "$doing_file" deps-mode "")"
echo "idempotency_key=$(fm_get "$doing_file" idempotency-key "")"
echo "retry=$(fm_get "$doing_file" retry "")"
echo "review_policy=$(fm_get "$doing_file" review-policy "")"
echo "artifacts=$(fm_get "$doing_file" artifacts "")"
echo "tracker_item=$(fm_get "$doing_file" tracker-item "")"
} > "$STATE/$job.meta"
run_worker "$doing_file" &
{ echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta"
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
sleep 1
running=$(active_workers)
done
if $once; then
[[ "$(active_workers)" -eq 0 && -z "$(ls -1 "$INBOX"/*.md 2>/dev/null)" ]] && {
log "drain complete — inbox empty, no workers running"; rm -f "$STATE/daemon.pid"; exit 0; }
fi
sleep "$POLL_SECONDS"
done
}
_count() { ls -1 "$1"/*.md 2>/dev/null | wc -l | tr -d ' '; }
cmd_status() {
ensure_dirs
local ib bd rv ts sh fl
ib=$(_count "$INBOX"); bd=$(_count "$BUILDING"); rv=$(_count "$REVIEW")
ts=$(_count "$TESTING"); sh=$(_count "$SHIPPED"); fl=$(_count "$FAILED")
local running; running=$(active_workers)
echo
printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" ""
printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \
"$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \
"$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \
"$C_GREEN" "$C_RESET" "$sh" "$C_RED" "$C_RESET" "$fl" \
"$C_BOLD" "$C_RESET" "$running" "$MAX_CONCURRENCY"
# running table
local f
local printed=false
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
local pid pidstart; pid=$(_meta_val "$f" pid); pidstart=$(_meta_val "$f" pidstart)
_pid_alive "$pid" "$pidstart" || continue
if ! $printed; then printf ' %sRUNNING%s\n' "$C_BOLD" "$C_RESET"; printed=true; fi
local job eng start now el last lmt age stall=""
job=$(grep '^job=' "$f" | cut -d= -f2)
eng=$(grep '^engine=' "$f" | cut -d= -f2)
start=$(grep '^started=' "$f" | cut -d= -f2)
now=$(date +%s); el=$(( now - ${start:-$now} ))
last=$(tail -n 1 "$LOGS/$job.log" 2>/dev/null | cut -c1-60)
lmt=$(_mtime "$LOGS/$job.log"); age=$(( now - ${lmt:-$now} ))
[[ "$age" -gt $(( STALL_MIN * 60 )) ]] && stall="${C_RED}⚠ stalled${C_RESET} "
printf ' %s%-26s%s %-7s %3dm%02ds pid %-6s %s%s%s%s\n' \
"$C_BOLD" "$job" "$C_RESET" "$eng" $((el/60)) $((el%60)) "$pid" "$stall" "$C_DIM" "$last" "$C_RESET"
# manifest sub-line (Phase 1): priority + profile + capabilities + tracker-item
local m_prio m_prof m_caps m_trk extra=""
m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-)
m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-)
[[ -n "$m_prio" ]] && extra+="prio=$m_prio "
[[ -n "$m_prof" ]] && extra+="profile=$m_prof "
[[ -n "$m_caps" ]] && extra+="caps=$m_caps "
[[ -n "$m_trk" ]] && extra+="tracker=$m_trk "
[[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET"
printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET"
done
$printed || printf ' %sno workers running%s\n' "$C_DIM" "$C_RESET"
echo
}
cmd_watch() {
local interval="${1:-2}"
while true; do clear; cmd_status; sleep "$interval"; done
}
# recover — reclaim orphaned building/ jobs (dead worker) back to inbox/. Runs
# automatically inside `run`; exposed for operators + crash-recovery testing.
cmd_recover() { ensure_dirs; recover_orphans; log "orphan sweep complete"; }
# insights [job] — per-job metrics, or a recent-jobs table + per-engine rollup.
cmd_insights() {
ensure_dirs
local job="${1:-}"
if [[ -n "$job" ]]; then
local f="$STATE/$job.meta"
[[ -f "$f" ]] || f=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1)
[[ -f "$f" ]] || die "no job meta matching '$job'"
local jn; jn=$(basename "$f"); jn=${jn%.meta}
printf '\n%s INSIGHTS %s%s%s\n' "$C_BOLD" "$C_CYAN" "$jn" "$C_RESET"
local k val
for k in engine result attempts started ended duration_s exit verify_exit \
model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \
files_changed lines_added lines_deleted wip_branch wip_base wip_commit \
next_eligible retry_class recovered; do
val=$(_meta_val "$f" "$k")
[[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val"
done
echo
return 0
fi
printf '\n%s INSIGHTS — recent finished jobs%s %s%s%s\n' \
"$C_BOLD" "$C_RESET" "$C_DIM" "$QUEUE_ROOT" "$C_RESET"
printf ' %-26s %-8s %-16s %6s %10s %9s\n' "job" "engine" "result" "dur" "tok(i/o)" "cost"
local f rows=0 agg; agg=$(mktemp "${TMPDIR:-/tmp}/aq-insights.XXXXXX")
while IFS= read -r f; do
[[ -n "$f" ]] || continue
grep -q '^ended=' "$f" || continue
local jn eng res dur ti to cost est
jn=$(basename "$f"); jn=${jn%.meta}
eng=$(_meta_val "$f" engine); res=$(_meta_val "$f" result); dur=$(_meta_val "$f" duration_s)
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out); cost=$(_meta_val "$f" cost_usd)
est=$(_meta_val "$f" usage_estimated)
rows=$((rows+1))
[[ $rows -le 15 ]] && printf ' %-26.26s %-8.8s %-16.16s %5ss %10s %9s\n' \
"$jn" "${eng:-?}" "${res:-?}" "${dur:-0}" "${ti:-0}/${to:-0}" "${cost:-}"
local succ=0; _result_is_success "$res" && succ=1
printf '%s|%s|%s|%s|%s|%s|%s\n' "${eng:-?}" "${ti:-0}" "${to:-0}" "${cost:-0}" "${dur:-0}" "$succ" "$est" >> "$agg"
done < <(ls -1t "$STATE"/*.meta 2>/dev/null)
if [[ $rows -eq 0 ]]; then
printf ' %sno finished jobs yet%s\n\n' "$C_DIM" "$C_RESET"; rm -f "$agg"; return 0
fi
printf '\n%s ROLLUP BY ENGINE%s\n' "$C_BOLD" "$C_RESET"
printf ' %-8s %5s %10s %10s %10s %8s\n' "engine" "jobs" "tok_in" "tok_out" "cost" "success"
local e
for e in $(cut -d'|' -f1 "$agg" | sort -u); do
awk -F'|' -v eng="$e" '
$1==eng { jobs++; ti+=$2; to+=$3; cost+=$4; succ+=$6; if ($7!="") est=1 }
END {
rate = jobs>0 ? (succ*100.0/jobs) : 0
printf " %-8s %5d %10d %10d %9.4f%s %6.0f%%\n", eng, jobs, ti, to, cost, (est?"*":" "), rate
}' "$agg"
done
printf ' %s* total includes estimated token/cost values%s\n\n' "$C_DIM" "$C_RESET"
rm -f "$agg"
}
cmd_dash() {
command -v node >/dev/null 2>&1 || die "node not found — use 'watch' for the bash status view"
AGENT_QUEUE_ROOT="$QUEUE_ROOT" exec node "$SCRIPT_DIR/dashboard.mjs" "$@"
}
cmd_stop() {
ensure_dirs
local killed=0 f pid pidstart
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
pid=$(_meta_val "$f" pid); pidstart=$(_meta_val "$f" pidstart)
_pid_alive "$pid" "$pidstart" && { kill "$pid" 2>/dev/null && killed=$((killed+1)); }
done
[[ -f "$STATE/daemon.pid" ]] && kill "$(cat "$STATE/daemon.pid")" 2>/dev/null
rm -f "$STATE/daemon.pid"
log "stopped $killed running worker(s) + run loop"
}
cmd_logs() {
local job="${1:-}" follow=""
[[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f"
[[ "$job" == "-f" ]] && job="${2:-}"
[[ -n "$job" ]] || die "usage: logs <job> [-f]"
local lf="$LOGS/$job.log"
[[ -f "$lf" ]] || lf=$(ls -1t "$LOGS"/*"$job"*.log 2>/dev/null | head -1)
[[ -f "$lf" ]] || die "no log found for '$job'"
if [[ -n "$follow" ]]; then tail -f "$lf"; else cat "$lf"; fi
}
# _find_job <job> <dir...> — echo the first matching .md across the given dirs
# (exact "<job>.md" preferred, else newest fuzzy match). Empty if none found.
_find_job() {
local job=$1; shift
local d f
for d in "$@"; do
[[ -f "$d/$job.md" ]] && { printf '%s' "$d/$job.md"; return; }
done
for d in "$@"; do
f=$(ls -1t "$d"/*"$job"*.md 2>/dev/null | head -1)
[[ -f "$f" ]] && { printf '%s' "$f"; return; }
done
}
# requeue <job> — move a job back to inbox/ for a fresh run (from failed/review/testing).
cmd_requeue() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: requeue <job>"
local f; f=$(_find_job "$job" "$FAILED" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no failed/review/testing job matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$INBOX/$base"
# drop stale state so it re-runs cleanly
rm -f "$STATE/$name.meta" "$STATE/$name.body.md" "$STATE/$name.timedout"
log "requeued $C_BOLD$base$C_RESET ($from → inbox)"
}
# ship <job> — manual promotion testing/ (QA) → shipped/. The human gate.
cmd_ship() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: ship <job>"
local f; f=$(_find_job "$job" "$TESTING")
[[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)"
local base name; base=$(basename "$f"); name=${base%.md}
mv "$f" "$SHIPPED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta"
log "shipped $C_BOLD$base$C_RESET (testing → shipped)"
}
# promote <job> — advance one stage forward: review → testing → shipped.
cmd_promote() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: promote <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from dest result; base=$(basename "$f"); name=${base%.md}
from=$(basename "$(dirname "$f")")
case "$from" in
review) dest="$TESTING"; result="testing";;
testing) dest="$SHIPPED"; result="shipped";;
*) die "promote: '$base' is in '$from' — nothing to promote";;
esac
mv "$f" "$dest/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=$result" >> "$STATE/$name.meta"
log "promoted $C_BOLD$base$C_RESET ($from → $result)"
}
# reject <job> — move a review/testing job to failed/ (manual gate rejection).
cmd_reject() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: reject <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$FAILED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=rejected" >> "$STATE/$name.meta"
log "rejected $C_BOLD$base$C_RESET ($from → failed)"
}
# clean [--keep N] — archive finished jobs' logs+meta beyond the newest N
# (default 50) into $QUEUE_ROOT/.archive/<ts>/. Running jobs and the shipped/failed
# .md kanban records are left untouched.
cmd_clean() {
ensure_dirs
local keep=50
while [[ $# -gt 0 ]]; do
case "$1" in
--keep) keep=$2; shift 2;;
*) die "clean: unknown arg '$1'";;
esac
done
[[ "$keep" =~ ^[0-9]+$ ]] || die "clean: --keep must be a number"
local arch="$QUEUE_ROOT/.archive/$(date +%Y%m%d-%H%M%S)"
# finished metas (have ended=), newest-first by mtime. The mtime prefix is
# stripped with cut (not awk '{print $2}') so paths containing spaces survive.
local metas; metas=$(grep -l '^ended=' "$STATE"/*.meta 2>/dev/null \
| while IFS= read -r m; do printf '%s %s\n' "$(_mtime "$m")" "$m"; done \
| sort -rn | cut -d' ' -f2-)
local i=0 moved=0 m name
while IFS= read -r m; do
[[ -n "$m" ]] || continue
i=$((i+1))
[[ "$i" -le "$keep" ]] && continue
name=$(basename "$m"); name=${name%.meta}
mkdir -p "$arch"
mv "$m" "$arch/" 2>/dev/null
[[ -f "$LOGS/$name.log" ]] && mv "$LOGS/$name.log" "$arch/" 2>/dev/null
[[ -f "$STATE/$name.body.md" ]] && mv "$STATE/$name.body.md" "$arch/" 2>/dev/null
moved=$((moved+1))
done <<< "$metas"
if [[ "$moved" -gt 0 ]]; then
log "archived $moved finished job(s) to $C_BOLD$arch$C_RESET (kept newest $keep)"
else
log "nothing to clean (≤$keep finished jobs)"
fi
}
usage() {
cat <<EOF
${C_BOLD}agent-queue${C_RESET} — folder kanban runner for devin | claude | codex
${C_BOLD}USAGE${C_RESET}
agent-queue.sh <command> [args]
${C_BOLD}COMMANDS${C_RESET}
init create the queue/ folders
add <file.md> [opts] queue a prompt file into inbox/
--engine devin|claude|codex --cwd PATH --yolo | --no-yolo
run [--max N] [--engine E] [--once|--drain]
process inbox/ (foreground loop; Ctrl-C to stop)
status show kanban counts + running workers (+ insights)
watch [interval] live status (default 2s, bash)
insights [job] per-job metrics, or recent table + per-engine rollup
recover reclaim orphaned building/ jobs (dead worker) -> inbox
dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop
logs <job> [-f] print (or follow) a job's log
promote <job> advance one stage (review → testing → shipped)
ship <job> manual gate: testing (QA) → shipped
reject <job> send a review/testing job to failed/
requeue <job> move a failed/review/testing job back to inbox/
clean [--keep N] archive finished logs+meta beyond newest N (default 50)
help this message
${C_BOLD}KANBAN${C_RESET} inbox → building → review → testing → shipped (+ failed; logs/ + .state/ alongside)
auto: agent rc=0 → review; verify pass → testing; verify fail → failed
manual: ship (testing → shipped)
${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
---
engine: devin # devin|claude|codex|copilot. Explicit engine always wins
cwd: /Users/you/code/repo
yolo: true
lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially
timeout: 45m # optional; 90s|45m|2h|1d. On expiry -> failed (result=timeout)
verify: pnpm -s test # optional; auto-QA gate. pass -> testing, fail -> failed
# --- Phase 1 manifest (active) ---
priority: high # critical|high|medium|low (default medium). Picked highest-first, then oldest
engine-class: agentic-coder # used only if `engine` unset: agentic-coder->devin,claude,codex; chat-coder->copilot
prefers-engine: [claude] # optional order hint for engine-class resolution
capabilities: [os:any, node>=20, has:git] # hard host requirements; unmet -> failed (capability_mismatch)
idempotency-key: my-task-1 # re-adding same key+body = no-op; same key+different body = reject/supersede
retry: { max: 2, backoff: 5m, on: [timeout, verify_failed, crash] } # requeue on these classes up to max, then retries_exhausted
# --- reserved (parsed + shown in status, but no-op until a later phase) ---
profile: prefers: budget: deps: deps-mode: review-policy: artifacts: tracker-item:
---
${C_BOLD}RESILIENCE${C_RESET} crash-safe: orphaned building/ jobs (dead worker) are recovered to inbox/ on
'run' startup and on every poll (or on demand via 'recover'); git-repo cwd work is
checkpointed to branch aq/wip/<job> on every exit (resumed on retry); 'retry'
requeues failures with backoff. See 'insights'.
${C_BOLD}ENV${C_RESET}
AGENT_QUEUE_ROOT (=$QUEUE_ROOT) AGENT_QUEUE_MAX (=$MAX_CONCURRENCY)
AGENT_QUEUE_ENGINE (=$DEFAULT_ENGINE) AGENT_QUEUE_VERIFY (default verify cmd)
DEVIN_BIN / CLAUDE_BIN / CODEX_BIN / COPILOT_BIN
EOF
}
main() {
local cmd="${1:-help}"; shift || true
case "$cmd" in
init) cmd_init "$@";;
add) cmd_add "$@";;
run) cmd_run "$@";;
status) cmd_status "$@";;
watch) cmd_watch "$@";;
insights) cmd_insights "$@";;
recover) cmd_recover "$@";;
dash|dashboard) cmd_dash "$@";;
stop) cmd_stop "$@";;
logs) cmd_logs "$@";;
promote) cmd_promote "$@";;
ship) cmd_ship "$@";;
reject) cmd_reject "$@";;
requeue) cmd_requeue "$@";;
clean) cmd_clean "$@";;
help|-h|--help) usage;;
*) err "unknown command: $cmd"; echo; usage; exit 1;;
esac
}
main "$@"