bytelyst-devops-tools/agent-queue/agent-queue.sh
saravanakumardb1 d0e800247c feat(agent-queue): PR mode clones from local repo base (AQ_FLEET_REPO_BASE)
MVP: when AQ_FLEET_REPO_BASE/<repo> is an existing local checkout, use it as the
clone source (fast, no network) and push/PR to its GitHub origin — embedded creds
in the local origin URL are stripped (gh credential helper handles auth). Selftest
PASS (full-path bare-repo fallback unchanged).

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2026-05-31 05:36:46 -07:00

2136 lines
94 KiB
Bash
Executable File
#!/usr/bin/env bash
#
# agent-queue — a folder-based "kanban" runner for headless coding-agent CLIs.
#
# Drop a prompt .md file into queue/inbox/, and `agent-queue run` will:
#   1. pick the next file (highest priority first, then oldest), respecting
#      --max concurrency,
#   2. move it inbox/ -> building/,
#   3. launch the chosen agent CLI (devin | claude | codex | copilot), adding its
#      auto-approve flag when `yolo: true` (the default),
#   4. on agent rc=0 move building/ -> review/, then run the auto-QA verify gate:
#        verify pass -> testing/;  verify fail -> failed/;  no verify -> stays in review/
#   5. on agent failure/timeout move building/ -> failed/,
#   6. you manually `ship` testing/ -> shipped/ (the human gate),
#   7. write a per-job log + live state so `status`/`watch` can show progress.
#
# Lifecycle: inbox -> building -> review -> testing -> shipped (+ failed)
#
# Per-task config travels in YAML-ish frontmatter at the top of the .md:
#   ---
#   engine: devin         # devin | claude | codex | copilot (default: $DEFAULT_ENGINE)
#   cwd: /abs/path/repo   # where the agent runs (default: $PWD when added)
#   yolo: true            # auto-approve all tools (default: true)
#   ---
# The `# ...` annotations are documentation only: fm_get does not strip inline
# comments, so a real job file must carry the bare values.
#
# Subcommands: init | add | run | status | watch | dash | stop | logs |
#              promote | ship | reject | requeue | clean | help
#
set -uo pipefail
# ── Resolve paths ───────────────────────────────────────────────────
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
QUEUE_ROOT="${AGENT_QUEUE_ROOT:-$SCRIPT_DIR/queue}"
# Profile catalog dir (persona + capability presets). Override for tests.
PROFILES_DIR="${AGENT_QUEUE_PROFILES:-$SCRIPT_DIR/profiles}"
INBOX="$QUEUE_ROOT/inbox"
BUILDING="$QUEUE_ROOT/building"
REVIEW="$QUEUE_ROOT/review"
TESTING="$QUEUE_ROOT/testing"
SHIPPED="$QUEUE_ROOT/shipped"
FAILED="$QUEUE_ROOT/failed"
LOGS="$QUEUE_ROOT/logs"
STATE="$QUEUE_ROOT/.state"
LOCKS="$QUEUE_ROOT/locks"
# ── Config (env-overridable) ────────────────────────────────────────
MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-3}"
DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}"
POLL_SECONDS="${AGENT_QUEUE_POLL:-3}"
# A running worker is flagged "stalled" if its log has not changed in this many
# minutes (no new agent output) — surfaced in status + dash.
STALL_MIN="${AGENT_QUEUE_STALL_MIN:-10}"
# Auto-QA verify command. After an agent exits 0 the job lands in review/; if a
# verify command is set (frontmatter `verify:` overrides this default) it runs in
# the job's cwd: pass -> testing/ (QA), fail -> failed/. Empty default = jobs park
# in review/ for manual `promote`. Shipping (testing -> shipped) is always manual.
DEFAULT_VERIFY="${AGENT_QUEUE_VERIFY:-}"
# flock is used for cross-process lock hardening when available (Linux). macOS
# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run).
FLOCK_BIN="${FLOCK_BIN:-$(command -v flock || true)}"
# timeout/gtimeout give hard process-tree kills for per-job timeouts; if absent
# (stock macOS) a pure-bash watchdog is used as a best-effort fallback.
TIMEOUT_BIN="${TIMEOUT_BIN:-$(command -v timeout || command -v gtimeout || true)}"
DEVIN_BIN="${DEVIN_BIN:-$(command -v devin || echo "$HOME/.local/bin/devin")}"
CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}"
CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}"
COPILOT_BIN="${COPILOT_BIN:-$(command -v copilot || echo copilot)}"
GH_BIN="${GH_BIN:-$(command -v gh || echo gh)}" # GitHub CLI (fleet PR mode)
# ── Tracker integration (§10) — task <-> job round-trip via the items API ──
# Base URL of the platform-service items API; the items routes live under /api.
AQ_TRACKER_API="${AQ_TRACKER_API:-http://localhost:4003}"
# Bearer token (required for real calls; never hardcode). productId stamps Items.
AQ_TRACKER_TOKEN="${AQ_TRACKER_TOKEN:-}"
AQ_PRODUCT_ID="${AQ_PRODUCT_ID:-}"
# cwd a tracker-derived job runs in (Items carry no cwd); defaults to the invoking dir.
AQ_TRACKER_CWD="${AQ_TRACKER_CWD:-$PWD}"
# Auto-echo job outcomes back to the tracker on each transition (opt-in, default OFF).
AQ_TRACKER_AUTO="${AQ_TRACKER_AUTO:-0}"
# Item status the API uses for each bucket (the items API has no blocked/failed
# status, so failures map to wont_fix by default — all overridable).
AQ_TRACKER_STATUS_INPROGRESS="${AQ_TRACKER_STATUS_INPROGRESS:-in_progress}"
AQ_TRACKER_STATUS_DONE="${AQ_TRACKER_STATUS_DONE:-done}"
AQ_TRACKER_STATUS_FAILED="${AQ_TRACKER_STATUS_FAILED:-wont_fix}"
# Test seam: a stub script that replaces the real curl HTTP (see selftest.sh).
AQ_TRACKER_API_CMD="${AQ_TRACKER_API_CMD:-}"
CURL_BIN="${CURL_BIN:-$(command -v curl || echo curl)}"
# ── Colors ──────────────────────────────────────────────────────────
if [[ -t 1 ]]; then
  C_RESET=$'\033[0m'; C_DIM=$'\033[2m'; C_BOLD=$'\033[1m'
  C_BLUE=$'\033[34m'; C_GREEN=$'\033[32m'; C_RED=$'\033[31m'; C_YEL=$'\033[33m'; C_CYAN=$'\033[36m'
else
  C_RESET=""; C_DIM=""; C_BOLD=""; C_BLUE=""; C_GREEN=""; C_RED=""; C_YEL=""; C_CYAN=""
fi
log() { printf '%s[agent-queue]%s %s\n' "$C_CYAN" "$C_RESET" "$*"; }
err() { printf '%s[agent-queue]%s %s\n' "$C_RED" "$C_RESET" "$*" >&2; }
die() { err "$*"; exit 1; }
# ── Init ────────────────────────────────────────────────────────────
ensure_dirs() { mkdir -p "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED" "$FAILED" "$LOGS" "$STATE" "$LOCKS"; }
# ── Frontmatter parsing ─────────────────────────────────────────────
# fm_get <file> <key> <default>
fm_get() {
  local file=$1 key=$2 def=${3:-}
  local val
  # only scan a leading --- ... --- block
  val=$(awk -v k="$key" '
    NR==1 && $0!="---" { exit }
    NR==1 { infm=1; next }
    infm && $0=="---" { exit }
    infm {
      line=$0
      sub(/^[ \t]*/,"",line)
      if (line ~ "^" k "[ \t]*:") {
        sub("^" k "[ \t]*:[ \t]*","",line)
        gsub(/^["'\''[:space:]]+|["'\''[:space:]]+$/,"",line)
        print line; exit
      }
    }' "$file" 2>/dev/null)
  [[ -n "$val" ]] && printf '%s' "$val" || printf '%s' "$def"
}
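# Example (illustrative; hypothetical job file, results traced by hand from the
# awk above). For a job.md that begins with
#   ---
#   engine: claude
#   cwd: "/abs/path/repo"
#   ---
# the lookups resolve as:
#   fm_get job.md engine devin    # -> claude
#   fm_get job.md cwd "$PWD"      # -> /abs/path/repo  (surrounding quotes stripped)
#   fm_get job.md timeout 0       # -> 0               (key absent, default returned)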
# strip_frontmatter <file> -> prints the body (everything after a leading ---..--- block)
strip_frontmatter() {
  awk 'NR==1 && $0=="---" { infm=1; next }
       infm && $0=="---" { infm=0; next }
       { if (!infm) print }' "$1"
}
# lock_key_for <file> -> the mutual-exclusion key for a job: frontmatter `lock:`
# if set, otherwise the cwd. Jobs sharing a key never run concurrently.
lock_key_for() {
  local f=$1 k
  k=$(fm_get "$f" lock "")
  [[ -n "$k" ]] && { printf '%s' "$k"; return; }
  fm_get "$f" cwd "$PWD"
}
# _keyhash <key> -> stable filename-safe token for a lock key
_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; }
# ── Profiles (§6): persona + capability/engine/scope presets ─────────
#
# profile_get <profile-name> <profile-key> [default] -> a single-line value from
# profiles/<name>.md, else the default.
profile_get() {
  local pf="$PROFILES_DIR/$1.md"
  [[ -f "$pf" ]] || { printf '%s' "${3:-}"; return; }
  fm_get "$pf" "$2" "${3:-}"
}
# profile_persona <profile-name> -> the multi-line `persona: |` block (2-space
# indent stripped), or empty. Used to prepend a persona overlay to the job body.
profile_persona() {
  local pf="$PROFILES_DIR/$1.md"
  [[ -f "$pf" ]] || return 0
  awk '
    NR==1 && $0!="---" { exit }
    NR==1 { infm=1; next }
    infm && $0=="---" { exit }
    !infm { next }
    inpersona {
      if ($0 ~ /^[A-Za-z0-9_-]+[ \t]*:/) { inpersona=0 }
      # strip up to two leading spaces (the block indent documented above)
      else { line=$0; sub(/^ /,"",line); sub(/^ /,"",line); print line; next }
    }
    $0 ~ /^persona[ \t]*:[ \t]*\|[ \t]*$/ { inpersona=1; next }
  ' "$pf"
}
# fm_eff <file> <job-key> [default] [profile-key] -> effective value with
# precedence job > profile > built-in default (§6 resolution). The profile is the
# job's `profile:` frontmatter; `profile-key` defaults to `job-key` (e.g. `verify`
# inherits the profile's `default-verify`). Inheritable: verify, capabilities,
# engine-class, prefers-engine, allowed-scope, review-policy.
fm_eff() {
  local file=$1 jkey=$2 def=${3:-} pkey=${4:-$2} v prof pv
  v=$(fm_get "$file" "$jkey" "")
  if [[ -n "$v" ]]; then printf '%s' "$v"; return; fi
  prof=$(fm_get "$file" profile "")
  if [[ -n "$prof" ]]; then
    pv=$(profile_get "$prof" "$pkey" "")
    [[ -n "$pv" ]] && { printf '%s' "$pv"; return; }
  fi
  printf '%s' "$def"
}
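# Example of the precedence (hypothetical profile name and command): if job.md
# has `profile: reviewer` and no `verify:` key, and profiles/reviewer.md sets
# `default-verify: pnpm test`, then
#   fm_eff job.md verify "$DEFAULT_VERIFY" default-verify   # -> pnpm test
# A `verify:` line in job.md itself would win over both.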
# _mtime <file> -> file modification time in epoch seconds (BSD or GNU stat); empty if missing
_mtime() {
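  [[ -e "$1" ]] || { echo ""; return; }
  # Try GNU first: GNU `stat -f` means --file-system, so `stat -f %m f` treats %m
  # as a second file operand and prints file-system info for f to stdout before
  # failing. BSD stat rejects -c without writing anything to stdout.
  stat -c %Y "$1" 2>/dev/null || stat -f %m "$1" 2>/dev/null || echo ""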
}
# _pidstart <pid> -> the process start time as reported by ps (whitespace-normalized).
# Used as an identity token so a recycled pid is never mistaken for our worker.
_pidstart() { ps -o lstart= -p "$1" 2>/dev/null | awk '{$1=$1;print}'; }
# _pid_alive <pid> <pidstart> -> 0 if the pid is live AND (when a start time was
# recorded) its current start time still matches — defeating pid reuse.
_pid_alive() {
  local pid=$1 want=$2 cur
  [[ -n "$pid" ]] || return 1
  kill -0 "$pid" 2>/dev/null || return 1
  [[ -z "$want" ]] && return 0
  cur=$(_pidstart "$pid")
  [[ "$cur" == "$want" ]]
}
# _dur_to_secs <dur> -> seconds. Accepts 90, 90s, 45m, 2h, 1d. Invalid/empty -> 0.
_dur_to_secs() {
  local d=$1
  [[ -z "$d" || "$d" == "0" ]] && { echo 0; return; }
  if [[ "$d" =~ ^([0-9]+)([smhd]?)$ ]]; then
    # 10# forces base 10 so a leading zero (e.g. 08m) is not parsed as octal.
    local n=$((10#${BASH_REMATCH[1]})) u=${BASH_REMATCH[2]}
    case "$u" in
      ""|s) echo "$n";;
      m) echo $((n*60));;
      h) echo $((n*3600));;
      d) echo $((n*86400));;
    esac
  else
    echo 0
  fi
}
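# Examples (traced by hand from the regex and case above):
#   _dur_to_secs 90     # -> 90
#   _dur_to_secs 45m    # -> 2700
#   _dur_to_secs 2h     # -> 7200
#   _dur_to_secs 1d     # -> 86400
#   _dur_to_secs 1.5h   # -> 0   (fractions do not match, so no limit)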
# _budget_wall_secs <file> -> the HARD wall-clock ceiling in seconds parsed from
# the `budget:` manifest map, e.g. `budget: { usd: 5, tokens: 2M, wall: 4h }`.
# `wall` is the always-enforceable hard ceiling (§5/§14); `usd`/`tokens` stay
# best-effort and are NOT enforced here. Unset/invalid -> 0 (no budget kill).
_budget_wall_secs() {
  local raw w
  raw=$(fm_get "$1" budget "")
  w=$(printf '%s' "$raw" | grep -oE 'wall[[:space:]]*:[[:space:]]*[0-9]+[smhd]?' \
    | grep -oE '[0-9]+[smhd]?' | head -1)
  _dur_to_secs "${w:-0}"
}
# _effective_kill <timeout_secs> <budget_wall_secs> -> "<secs> <class>".
# The binding hard wall-clock ceiling for a single run. `budget.wall` extends
# `timeout`: when only one is set it binds; when both are set the smaller
# (earlier) ceiling fires, and <class> reflects which limit it was so the run is
# recorded as `timeout` vs `budget_exceeded`. 0/0 -> "0 timeout" (no kill armed).
_effective_kill() {
  local tmo=${1:-0} bw=${2:-0}
  if [[ "$bw" -gt 0 && ( "$tmo" -le 0 || "$bw" -lt "$tmo" ) ]]; then
    echo "$bw budget_exceeded"
  else
    echo "$tmo timeout"
  fi
}
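# Examples: the smaller non-zero ceiling binds, and a tie is labeled `timeout`.
#   _effective_kill 3600 1800   # -> "1800 budget_exceeded"
#   _effective_kill 3600 0      # -> "3600 timeout"
#   _effective_kill 0 7200      # -> "7200 budget_exceeded"
#   _effective_kill 600 600     # -> "600 timeout"
#   _effective_kill 0 0         # -> "0 timeout"   (no kill armed)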
# _meta_active <metafile> -> 0 if the job is occupying a concurrency slot.
# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
# created moments ago — the reserved-slot window between meta-write and launch).
# The <30s guard prevents a meta orphaned mid-launch (daemon killed in the gap)
# from pinning a slot forever.
_meta_active() {
  local f=$1 pid mt age
  grep -q '^ended=' "$f" && return 1
  pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
  if [[ -n "$pid" ]]; then
    local pidstart; pidstart=$(grep '^pidstart=' "$f" | head -1 | cut -d= -f2-)
    _pid_alive "$pid" "$pidstart"
    return $?
  fi
  mt=$(_mtime "$f"); age=$(( $(date +%s) - ${mt:-0} ))
  [[ "$age" -lt 30 ]]
}
# active_workers -> count of jobs occupying a concurrency slot (reservation-aware).
active_workers() {
  local n=0 f
  for f in "$STATE"/*.meta; do
    [[ -e "$f" ]] || continue
    _meta_active "$f" && n=$((n+1))
  done
  echo "$n"
}
# busy_keys -> newline list of lock keys currently held by active workers.
busy_keys() {
  local f
  for f in "$STATE"/*.meta; do
    [[ -e "$f" ]] || continue
    _meta_active "$f" && grep '^lock=' "$f" | head -1 | cut -d= -f2-
  done
}
# ── Manifest helpers (Phase 1 — §5/§6/§7/§8 single-host subset) ──────
#
# parse_list <raw> -> one token per line. Accepts a YAML-ish inline list
# ("[a, b]"), comma- or space-separated values, with surrounding quotes
# stripped. Empty tokens are dropped. Used for `capabilities`, `prefers`,
# `prefers-engine`, `deps`, etc.
parse_list() {
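  local cleaned t
  local -a toks=()
  cleaned=$(printf '%s' "${1:-}" | tr '[],' ' ')
  # read -a splits on whitespace without pathname expansion, so glob-like
  # tokens (e.g. an allowed-scope `src/**`) are never expanded against $PWD.
  read -r -a toks <<< "$cleaned"
  for t in ${toks[@]+"${toks[@]}"}; do
    t=${t#[\"\']}; t=${t%[\"\']}
    [[ -n "$t" ]] && printf '%s\n' "$t"
  done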
}
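# Example: each of these calls prints the three lines a, b, c:
#   parse_list '[a, "b", c]'
#   parse_list 'a,b,c'
#   parse_list 'a b c'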
# _body_hash <file> -> stable content hash of the frontmatter-STRIPPED body.
# Drives idempotency-key dedupe on `add`. Prefers shasum/sha1sum, falls back
# to cksum so it works on a bare macOS/Linux host with no extra tools.
_body_hash() {
  local f=$1
  if command -v shasum >/dev/null 2>&1; then
    strip_frontmatter "$f" | shasum | awk '{print $1}'
  elif command -v sha1sum >/dev/null 2>&1; then
    strip_frontmatter "$f" | sha1sum | awk '{print $1}'
  else
    strip_frontmatter "$f" | cksum | awk '{print $1"-"$2}'
  fi
}
# priority_rank <priority> -> sort rank (lower = picked first). Unknown/empty
# defaults to medium. Used by inbox_sorted for priority-then-age selection.
priority_rank() {
  case "$1" in
    critical) echo 0;; high) echo 1;; medium) echo 2;; low) echo 3;; *) echo 2;;
  esac
}
# inbox_sorted -> inbox/*.md ordered by priority (critical first) then oldest
# (filename embeds the queue timestamp, so a plain string sort = age order).
# Replaces the pure-FIFO `ls | sort`; per-lock serialization is unchanged
# (the caller still skips files whose lock key is busy).
inbox_sorted() {
  local f p r
  for f in "$INBOX"/*.md; do
    [[ -e "$f" ]] || continue
    p=$(fm_get "$f" priority medium); r=$(priority_rank "$p")
    printf '%s\t%s\n' "$r" "$f"
  done | sort -t"$(printf '\t')" -k1,1n -k2,2 | cut -f2-
}
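# Example (hypothetical file names): given
#   inbox/20250101-a.md   (priority: low)
#   inbox/20250102-b.md   (no priority, so medium)
#   inbox/20250103-c.md   (priority: critical)
# inbox_sorted prints c, then b, then a (ranks 0 < 2 < 3); files of equal rank
# fall back to filename, i.e. timestamp, order.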
# engine_available <engine> -> 0 if a runnable binary exists for the engine
# (executable path or a name resolvable on PATH). Honors the *_BIN overrides
# (so the self-test stub counts as "available").
engine_available() {
  local eng=$1 bin=""
  case "$eng" in
    devin) bin=$DEVIN_BIN;;
    claude) bin=$CLAUDE_BIN;;
    codex) bin=$CODEX_BIN;;
    copilot) bin=$COPILOT_BIN;;
    *) return 1;;
  esac
  [[ -n "$bin" ]] || return 1
  [[ -x "$bin" ]] && return 0
  command -v "$bin" >/dev/null 2>&1
}
# engine_class_engines <class> -> candidate engines (priority order) for an
# engine-class (§5 taxonomy). `review-only` has no concrete mapping yet
# (reserved). Unknown class -> empty.
engine_class_engines() {
  case "$1" in
    agentic-coder) echo "devin claude codex";;
    chat-coder) echo "copilot";;
    review-only) echo "";;
    *) echo "";;
  esac
}
# detect_capabilities -> one capability token per line describing THIS host:
#   os:<mac|linux|other>                 host OS (a required `os:any` matches any of these)
#   engine:<devin|claude|codex|copilot>  one token per available engine
#   node:<major>                         host node major version (compared via node<op>N)
#   has:<git|pnpm|docker>                tool probes (emitted only when present)
detect_capabilities() {
  case "$(uname -s 2>/dev/null)" in
    Darwin) echo "os:mac";;
    Linux) echo "os:linux";;
    *) echo "os:other";;
  esac
  local e
  for e in devin claude codex copilot; do
    engine_available "$e" && echo "engine:$e"
  done
  if command -v node >/dev/null 2>&1; then
    local nv; nv=$(node -v 2>/dev/null | sed 's/^v//' | cut -d. -f1)
    [[ "$nv" =~ ^[0-9]+$ ]] && echo "node:$nv"
  fi
  local t
  for t in git pnpm docker; do
    command -v "$t" >/dev/null 2>&1 && echo "has:$t"
  done
}
# _cap_token_ok <required-token> <available-tokens> -> 0 if satisfied (§5 grammar):
#   key:any     wildcard: the host advertises any "key:*" token (os:any matches all)
#   key<op>ver  compares the major version (text before the first ".") against the
#               host's "key:<val>" token; op is one of >= > = <= <
#   key:value   exact token match
#   key         bare presence: exact, or advertised in any "key:"/"key<op>" form
_cap_token_ok() {
  local tok=$1 avail=$2 a
  if [[ "$tok" == *:any ]]; then
    local wkey=${tok%:any}
    for a in $avail; do [[ "$a" == "$wkey":* ]] && return 0; done
    return 1
  fi
  if [[ "$tok" =~ ^([A-Za-z0-9_.:-]+)(\>=|\<=|=|\>|\<)([0-9][0-9.]*)$ ]]; then
    local key=${BASH_REMATCH[1]} op=${BASH_REMATCH[2]} want=${BASH_REMATCH[3]} hostval=""
    for a in $avail; do [[ "$a" == "$key":* ]] && { hostval=${a#*:}; break; }; done
    [[ -n "$hostval" ]] || return 1
    local hv=${hostval%%.*} wv=${want%%.*}
    [[ "$hv" =~ ^[0-9]+$ && "$wv" =~ ^[0-9]+$ ]] || return 1
    case "$op" in
      ">=") [[ "$hv" -ge "$wv" ]];;
      ">") [[ "$hv" -gt "$wv" ]];;
      "=") [[ "$hv" -eq "$wv" ]];;
      "<=") [[ "$hv" -le "$wv" ]];;
      "<") [[ "$hv" -lt "$wv" ]];;
    esac
    return $?
  fi
  if [[ "$tok" == *:* ]]; then
    for a in $avail; do [[ "$a" == "$tok" ]] && return 0; done
    return 1
  fi
  for a in $avail; do
    case "$a" in "$tok"|"$tok":*|"$tok"\>=*|"$tok"\>*|"$tok"=*|"$tok"\<=*|"$tok"\<*) return 0;; esac
  done
  return 1
}
# caps_match <required-tokens> <available-tokens> -> 0 iff EVERY required token
# is satisfied by the available set (both passed as whitespace-separated lists).
caps_match() {
  local required=$1 available=$2 r
  for r in $required; do
    _cap_token_ok "$r" "$available" || return 1
  done
  return 0
}
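# Examples, assuming a hypothetical host whose detect_capabilities output is
#   avail="os:mac engine:claude node:20 has:git"
#   caps_match "os:any node>=18 has:git" "$avail"   # -> 0 (all satisfied)
#   caps_match "node>=22" "$avail"                  # -> 1 (host node major is 20)
#   caps_match "engine:devin" "$avail"              # -> 1 (no exact token)
#   caps_match "node" "$avail"                      # -> 0 (bare presence of node:*)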
# resolve_engine <file> -> the concrete engine to run:
# explicit `engine:` always wins; else `engine-class` picks the first
# available engine honoring `prefers-engine` then the class default order;
# else the global default engine. Prints "" only when an engine-class was
# requested but no engine in it is available (caller fails with no_engine).
resolve_engine() {
  local f=$1 eng cls prefers
  eng=$(fm_get "$f" engine "")
  if [[ -n "$eng" ]]; then printf '%s' "$eng"; return 0; fi
  cls=$(fm_eff "$f" engine-class "") # inherit engine-class from the job's profile
  if [[ -z "$cls" ]]; then printf '%s' "$DEFAULT_ENGINE"; return 0; fi
  local class_engines; class_engines=$(engine_class_engines "$cls")
  prefers=$(fm_eff "$f" prefers-engine "")
  local ordered=() seen=" " p c
  if [[ -n "$prefers" ]]; then
    while IFS= read -r p; do
      [[ -n "$p" ]] || continue
      case " $class_engines " in *" $p "*) ordered+=("$p"); seen+="$p ";; esac
    done < <(parse_list "$prefers")
  fi
  for c in $class_engines; do
    case "$seen" in *" $c "*) ;; *) ordered+=("$c"); seen+="$c ";; esac
  done
  local cand
  for cand in ${ordered[@]+"${ordered[@]}"}; do
    engine_available "$cand" && { printf '%s' "$cand"; return 0; }
  done
  printf '%s' ""
}
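# Example (hypothetical host where only claude and codex are installed): a job with
#   engine-class: agentic-coder
#   prefers-engine: [codex]
# tries codex first, then the class order devin, claude, and resolves to codex.
# Without prefers-engine it resolves to claude (devin is not installed). An
# explicit `engine:` line bypasses this lookup entirely.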
# ── deps / DAG, single host (§5) ─────────────────────────────────────
# deps reference other jobs by their (author-controlled) `idempotency-key`.
#
# _key_in_dir <key> <dir> -> 0 if some .md in <dir> has idempotency-key == key.
_key_in_dir() {
  local key=$1 d=$2 ef
  for ef in "$d"/*.md; do
    [[ -e "$ef" ]] || continue
    [[ "$(fm_get "$ef" idempotency-key "")" == "$key" ]] && return 0
  done
  return 1
}
# dep_satisfied <key> <mode> -> 0 when the dep is met: a job with <key> is in
# shipped/ (default), or shipped/ OR testing/ when mode is `soft`.
dep_satisfied() {
  local key=$1 mode=$2
  _key_in_dir "$key" "$SHIPPED" && return 0
  [[ "$mode" == soft ]] && _key_in_dir "$key" "$TESTING" && return 0
  return 1
}
# deps_unmet <file> -> space-separated list of this job's UNMET dep keys (empty if
# none / no deps). `deps-mode` (hard|soft) is job-level.
deps_unmet() {
  local f=$1 keys mode k unmet=""
  keys=$(parse_list "$(fm_get "$f" deps "")" | tr '\n' ' ')
  [[ -n "${keys// /}" ]] || { printf ''; return 0; }
  mode=$(fm_get "$f" deps-mode "hard")
  for k in $keys; do
    [[ -n "$k" ]] || continue
    dep_satisfied "$k" "$mode" || unmet+="$k "
  done
  printf '%s' "${unmet% }"
}
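# Example (hypothetical keys): for a job declaring
#   deps: [schema, api]
# where the job keyed `schema` is in shipped/ and the job keyed `api` is in
# testing/, deps_unmet prints "api" under the default `deps-mode: hard`, and
# prints nothing under `deps-mode: soft` (testing/ also counts there).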
# _deps_of_key <key> -> dep keys (space-separated) of the job carrying <key>,
# scanned across inbox + active stages.
_deps_of_key() {
  local key=$1 d ef
  for d in "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
    for ef in "$d"/*.md; do
      [[ -e "$ef" ]] || continue
      [[ "$(fm_get "$ef" idempotency-key "")" == "$key" ]] || continue
      parse_list "$(fm_get "$ef" deps "")" | tr '\n' ' '
      return 0
    done
  done
}
# deps_would_cycle <new-key> <new-deps-space> -> 0 if adding a job with <new-key>
# depending on <new-deps> would create a cycle (BFS over existing key->deps edges
# back to new-key; also catches self-dependency).
deps_would_cycle() {
  local newkey=$1 newdeps=$2 visited=" " frontier next k d kd
  [[ -n "$newkey" ]] || return 1
  _in_list "$newkey" "$newdeps" && return 0
  frontier="$newdeps"
  while [[ -n "${frontier// /}" ]]; do
    next=""
    for k in $frontier; do
      [[ -n "$k" ]] || continue
      [[ "$k" == "$newkey" ]] && return 0
      case "$visited" in *" $k "*) continue;; esac
      visited+="$k "
      kd=$(_deps_of_key "$k")
      for d in $kd; do next+="$d "; done
    done
    frontier="$next"
  done
  return 1
}
# _drain_pending -> 0 if some inbox job can still make progress on its own: it is
# runnable now, or it is waiting on a retry/recovery backoff (which elapses with
# time). A job blocked ONLY by unmet deps is NOT pending while the loop is idle
# (no running job can satisfy its deps), so `--once` can drain past it.
_drain_pending() {
  local cand cj ne now; now=$(date +%s)
  for cand in "$INBOX"/*.md; do
    [[ -e "$cand" ]] || continue
    cj=$(basename "$cand"); cj=${cj%.md}
    ne=$(grep '^next_eligible=' "$STATE/$cj.meta" 2>/dev/null | tail -1 | cut -d= -f2)
    if [[ "$ne" =~ ^[0-9]+$ ]] && [[ "$ne" -gt "$now" ]]; then return 0; fi
    [[ -n "$(deps_unmet "$cand")" ]] && continue
    return 0
  done
  return 1
}
# ── allowed-scope guardrail (§6/§12) — WARN-ONLY this phase ───────────
# path_in_scope <path> <globs-space> -> 0 if <path> matches any allowed-scope glob
# (`dir/**` matches the whole subtree; `*` matches across `/`). Pure + testable.
path_in_scope() {
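  local path=$1 globs=$2 g pat
  local -a gl=()
  # Split without pathname expansion so each glob reaches the match below intact.
  read -r -a gl <<< "$globs"
  for g in ${gl[@]+"${gl[@]}"}; do
    [[ -n "$g" ]] || continue
    pat=${g//\*\*/\*}
    # shellcheck disable=SC2053
    [[ "$path" == $pat ]] && return 0
    [[ "$path" == "$g"/* ]] && return 0
  done
  return 1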
}
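# Examples (hypothetical paths), traced assuming each glob token reaches the
# comparison unexpanded, with globs="src/** docs":
#   path_in_scope src/lib/a.ts "$globs"    # -> 0 (`src/**` becomes `src/*`, and `*` crosses `/`)
#   path_in_scope docs/guide.md "$globs"   # -> 0 (a bare directory covers its subtree)
#   path_in_scope package.json "$globs"    # -> 1 (outside every glob)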
# scope_check <cwd> <base> <scope> <logf> <metaf> -> log a WARNING (non-blocking)
# for changed paths outside allowed-scope. Records scope_warning= in the meta.
scope_check() {
  local cwd=$1 base=$2 scope=$3 logf=$4 metaf=$5 changed globs p out=""
  _is_git_repo "$cwd" || return 0
  if [[ -n "$base" ]]; then
    changed=$(git -C "$cwd" diff --name-only "$base" HEAD 2>/dev/null)
  else
    changed=$(git -C "$cwd" diff --name-only HEAD 2>/dev/null)
  fi
  [[ -n "$changed" ]] || return 0
  globs=$(parse_list "$scope" | tr '\n' ' ')
  [[ -n "${globs// /}" ]] || return 0
  while IFS= read -r p; do
    [[ -n "$p" ]] || continue
    path_in_scope "$p" "$globs" || out+="$p "
  done <<< "$changed"
  if [[ -n "$out" ]]; then
    echo "WARNING: allowed-scope violation (warn-only) — changed outside [$globs]: ${out% }" >> "$logf"
    echo "scope_warning=${out% }" >> "$metaf"
  fi
}
# ── Engine driver: builds argv into AGENT_CMD[]; sets AGENT_STDIN if the ──
# prompt should be fed on stdin (claude/codex) rather than a flag. $pf is the
# frontmatter-STRIPPED body file, so a body starting with '--' is never
# misparsed as a CLI option.
build_agent_cmd() {
  local engine=$1 pf=$2 yolo=$3 exportpath=${4:-}
  AGENT_CMD=(); AGENT_STDIN=""
  case "$engine" in
    devin)
      AGENT_CMD=( "$DEVIN_BIN" -p --prompt-file "$pf" )
      [[ "$yolo" == "true" ]] && AGENT_CMD+=( --permission-mode dangerous )
      # Export the conversation (ATIF) so parse_usage can read per-step token
      # metrics — Devin does not expose usage in its stdout/local log otherwise.
      [[ -n "$exportpath" ]] && AGENT_CMD+=( --export "$exportpath" )
      ;;
    claude)
      AGENT_CMD=( "$CLAUDE_BIN" -p )
      [[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-skip-permissions )
      AGENT_STDIN="$pf"
      ;;
    codex)
      AGENT_CMD=( "$CODEX_BIN" exec )
      [[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-bypass-approvals-and-sandbox )
      AGENT_STDIN="$pf"
      ;;
    copilot)
      # Best-effort GitHub Copilot CLI mapping for the chat-coder engine-class.
      # Flags drift between CLI versions — this is the single place to edit.
      AGENT_CMD=( "$COPILOT_BIN" -p )
      [[ "$yolo" == "true" ]] && AGENT_CMD+=( --allow-all-tools )
      AGENT_STDIN="$pf"
      ;;
    *) die "unknown engine '$engine' (use: devin | claude | codex | copilot)";;
  esac
}
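# Example (traced from the cases above; paths are hypothetical):
#   build_agent_cmd claude /tmp/job.body.md true
#     AGENT_CMD=( "$CLAUDE_BIN" -p --dangerously-skip-permissions )
#     AGENT_STDIN=/tmp/job.body.md         (prompt fed on stdin)
#   build_agent_cmd devin /tmp/job.body.md true /tmp/job.devin-export.json
#     AGENT_CMD=( "$DEVIN_BIN" -p --prompt-file /tmp/job.body.md
#                 --permission-mode dangerous --export /tmp/job.devin-export.json )
#     AGENT_STDIN=""                       (prompt passed by flag)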
# ── Fleet PR mode (§PR) ──────────────────────────────────────────────
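# Prepare an isolated checkout of <repo> on branch aq/job/<jid> off <base> and
# echo its path (used as the agent cwd). On first use it clones into a cache
# (default $STATE/repos/<safe>); on reuse it fetches <base>. It then (re)creates
# aq/job/<jid> at origin/<base> (falling back to local <base>) with
# `checkout -B`; this is not a hard reset, so uncommitted leftovers in a reused
# cache are not discarded. Returns 1 on failure (caller falls back to the static
# cwd). <repo> may be owner/name, a local path, or a clone URL (ssh/https).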
_fleet_pr_prepare() {
  local repo=$1 base=${2:-main} jid=$3 logf=$4
  local reposdir="${AQ_FLEET_REPOS_DIR:-$STATE/repos}"; mkdir -p "$reposdir"
  local safe dir url br src="" origin_url=""
  safe=$(printf '%s' "$repo" | tr -c 'A-Za-z0-9._-' '_'); dir="$reposdir/$safe"
  # MVP: if AQ_FLEET_REPO_BASE/<repo> is an existing local repo, use it as the clone
  # SOURCE (fast, no network) and push/PR to its GitHub origin. Credentials embedded
  # in the local origin URL are stripped — gh's credential helper handles auth.
  if [[ -n "${AQ_FLEET_REPO_BASE:-}" && -d "$AQ_FLEET_REPO_BASE/$repo/.git" ]]; then
    src="$AQ_FLEET_REPO_BASE/$repo"
    origin_url=$(git -C "$src" remote get-url origin 2>/dev/null | sed -E 's#://[^@/]*@#://#')
  fi
  # Resolve a clone URL: local source wins; else full URLs pass through; absolute/
  # existing local paths used as-is; otherwise `owner/name` -> GitHub HTTPS.
  if [[ -n "$src" ]]; then url="$src"; else
    case "$repo" in
      *://*|git@*) url="$repo";;
      /*) url="$repo";;
      *) [[ -e "$repo" ]] && url="$repo" || url="https://github.com/$repo.git";;
    esac
  fi
  if [[ -d "$dir/.git" ]]; then
    [[ -n "$origin_url" ]] && git -C "$dir" remote set-url origin "$origin_url" >>"$logf" 2>&1 || true
    git -C "$dir" fetch --quiet origin "$base" >>"$logf" 2>&1 || { echo "PR: fetch failed for $base" >>"$logf"; return 1; }
  else
    git clone --quiet "$url" "$dir" >>"$logf" 2>&1 || { echo "PR: clone failed for $url" >>"$logf"; return 1; }
    # Repoint to the upstream GitHub origin (when cloned from a local mirror).
    [[ -n "$origin_url" ]] && git -C "$dir" remote set-url origin "$origin_url" >>"$logf" 2>&1 || true
    git -C "$dir" fetch --quiet origin "$base" >>"$logf" 2>&1 || true
  fi
  br="aq/job/$jid"
  git -C "$dir" checkout --quiet -B "$br" "origin/$base" >>"$logf" 2>&1 \
    || git -C "$dir" checkout --quiet -B "$br" "$base" >>"$logf" 2>&1 \
    || { echo "PR: cannot branch $br off $base" >>"$logf"; return 1; }
  # Ensure a commit identity exists (fall back to a bot identity if the host has
  # no global git user configured) so the PR commit never fails.
  git -C "$dir" config user.email >/dev/null 2>&1 || git -C "$dir" config user.email "agent-queue@fleet.local"
  git -C "$dir" config user.name >/dev/null 2>&1 || git -C "$dir" config user.name "agent-queue"
  echo "$dir"
}
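# Example (hypothetical repo, paths and token): with AQ_FLEET_REPO_BASE=/srv/src
# and a checkout at /srv/src/acme/web whose origin is
#   https://user:token@github.com/acme/web.git
# `_fleet_pr_prepare acme/web main 42 "$logf"` on first use clones
# /srv/src/acme/web into $STATE/repos/acme_web, repoints origin to
# https://github.com/acme/web.git (credentials stripped by the sed above),
# branches aq/job/42 off origin/main, and prints the checkout path. Without the
# local checkout (and no ./acme/web path) it clones https://github.com/acme/web.git.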
# Commit the agent's work in <dir>, push branch aq/job/<jid>, open a PR against
# <base>, and echo the PR URL. Returns 1 with no output if there is nothing to
# commit or any git/gh step fails (caller leaves pr_url empty).
_fleet_pr_open() {
  local dir=$1 base=${2:-main} jid=$3 title=$4 logf=$5
  local br="aq/job/$jid"
  git -C "$dir" add -A >>"$logf" 2>&1
  if git -C "$dir" diff --cached --quiet 2>/dev/null; then
    echo "PR: no changes to commit — skipping PR" >>"$logf"; return 1
  fi
  git -C "$dir" commit --quiet -m "$title" >>"$logf" 2>&1 || { echo "PR: commit failed" >>"$logf"; return 1; }
  git -C "$dir" push --quiet -u origin "$br" >>"$logf" 2>&1 || { echo "PR: push failed" >>"$logf"; return 1; }
  local url
  url=$( cd "$dir" && "$GH_BIN" pr create --base "$base" --head "$br" \
    --title "$title" --body "Automated by agent-queue fleet (job $jid)." 2>>"$logf" ) \
    || { echo "PR: gh pr create failed" >>"$logf"; return 1; }
  printf '%s' "$url" | tr -d '\n'
}
# ── Worker: runs one job to completion (invoked in background) ───────
run_worker() {
  local doing_file=$1
  local job; job=$(basename "$doing_file")
  job=${job%.md}
  local engine cwd yolo logf metaf
  cwd=$(fm_get "$doing_file" cwd "$PWD")
  yolo=$(fm_get "$doing_file" yolo "true")
  logf="$LOGS/$job.log"
  metaf="$STATE/$job.meta"
  # NOTE: the parent (cmd_run) creates $metaf with job/engine/cwd/started/pid.
  # The worker only ever APPENDS (ended/exit/result) to avoid a truncation race.
  # ── Capability gate (§5/§8 single-host): if the job declares `capabilities`
  # (own or inherited from its profile) this host does not satisfy, route to
  # failed/ WITHOUT launching the agent. ──
  local req_caps; req_caps=$(parse_list "$(fm_eff "$doing_file" capabilities "")" | tr '\n' ' ')
  if [[ -n "${req_caps// /}" ]]; then
    local avail; avail=$(detect_capabilities)
    if ! caps_match "$req_caps" "$avail"; then
      {
        echo "===== agent-queue job: $job ====="
        echo "CAPABILITY MISMATCH — host cannot satisfy required: $req_caps"
        echo "host advertises: $(printf '%s' "$avail" | tr '\n' ' ')"
        echo "agent NOT launched — routed to failed/ ($(date))"
      } >> "$logf"
      mv "$doing_file" "$FAILED/" 2>/dev/null
      { echo "result=capability_mismatch"; echo "ended=$(date +%s)"; } >> "$metaf"
      _auto_echo "$job"
      return 0
    fi
  fi
  # ── Engine resolution (§5): explicit `engine` wins, else `engine-class`.
  # No available engine for a requested class -> fail (no_engine), no launch. ──
  engine=$(resolve_engine "$doing_file")
  if [[ -z "$engine" ]]; then
    {
      echo "===== agent-queue job: $job ====="
      # fm_eff, not fm_get: the class may be inherited from the job's profile.
      echo "NO ENGINE — engine-class '$(fm_eff "$doing_file" engine-class "")' has no available engine on this host"
      echo "agent NOT launched — routed to failed/ ($(date))"
    } >> "$logf"
    mv "$doing_file" "$FAILED/" 2>/dev/null
    { echo "result=no_engine"; echo "ended=$(date +%s)"; } >> "$metaf"
    _auto_echo "$job"
    return 0
  fi
  {
    echo "===== agent-queue job: $job ====="
    echo "engine=$engine cwd=$cwd yolo=$yolo"
    echo "started: $(date)"
    echo "================================="
  } >> "$logf"
  if [[ ! -d "$cwd" ]]; then
    echo "FATAL: cwd does not exist: $cwd" >> "$logf"
    mv "$doing_file" "$FAILED/" 2>/dev/null
    echo "result=failed" >> "$metaf"; echo "ended=$(date +%s)" >> "$metaf"
    _auto_echo "$job"
    return 1
  fi
  local started; started=$(grep '^started=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
  # Strip our frontmatter so the agent only sees the task body.
  local bodyf="$STATE/$job.body.md"
  strip_frontmatter "$doing_file" > "$bodyf"
  # ── Persona injection (§6): prepend the profile's persona to the body fed to
  # the engine (job body unchanged on disk). Secrets are never logged. ──
  local prof; prof=$(fm_get "$doing_file" profile "")
  if [[ -n "$prof" ]]; then
    local persona; persona=$(profile_persona "$prof")
    if [[ -n "$persona" ]]; then
      { printf '%s\n\n' "$persona"; cat "$bodyf"; } > "$bodyf.tmp" && mv "$bodyf.tmp" "$bodyf"
      echo "profile: injected persona overlay from '$prof'" >> "$logf"
    fi
  fi
  # Devin conversation export path (derived from the log path so parse_usage can
  # find it). Harmless for other engines (they ignore the 4th arg).
  local devin_export="${logf%.log}.devin-export.json"
  build_agent_cmd "$engine" "$bodyf" "$yolo" "$devin_export"
  # ── Fleet PR mode (§PR): if enabled and this fleet job targets a repo, run the
  # agent in an isolated checkout on branch aq/job/<id> instead of the static
  # cwd. The PR is opened after a passing verify. WIP checkpointing is skipped
  # for PR jobs — the pushed PR branch is the durable artifact. ──
  local pr_dir="" pr_base="" pr_repo="" pr_jid=""
  if [[ "${AQ_FLEET_PR:-0}" == 1 ]] && fleet_enabled && _fleet_is_job "$job"; then
    pr_repo=$(fm_get "$doing_file" fleet-repo "")
    pr_base=$(fm_get "$doing_file" fleet-base-branch "main")
    # Branch off the stable fleet job id (not the transient local job name).
    pr_jid=$(fm_get "$doing_file" fleet-job-id "$job")
    if [[ -n "$pr_repo" ]]; then
      local _prep; _prep=$(_fleet_pr_prepare "$pr_repo" "$pr_base" "$pr_jid" "$logf")
      if [[ -n "$_prep" && -d "$_prep" ]]; then
        cwd="$_prep"; pr_dir="$_prep"
        echo "PR mode: agent cwd=$cwd on branch aq/job/$pr_jid (base $pr_base, repo $pr_repo)" >> "$logf"
      else
        echo "PR mode: prepare failed for $pr_repo — running in $cwd, no PR" >> "$logf"
      fi
    fi
  fi
  # ── WIP checkpoint setup (§25.2): on a git cwd, create/checkout aq/wip/<job>
  # so partial work survives a crash; a trap guarantees a checkpoint on EVERY
  # exit path (success, failure, timeout, SIGTERM/SIGINT). Non-git cwd: no-op.
  # Skipped in PR mode (the aq/job/<id> PR branch is the durable artifact). ──
  WIP_ACTIVE=0; WIP_BASE=""; WIP_DONE=0
  _worker_trap() {
    [[ "$WIP_DONE" == 1 ]] && return
    WIP_DONE=1
    [[ "$WIP_ACTIVE" == 1 ]] && _wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "trap-exit"
  }
  trap '_worker_trap' EXIT
  trap '_worker_trap; exit 143' INT TERM
  if [[ -z "$pr_dir" ]]; then _wip_start "$job" "$cwd" "$metaf" "$logf" || true; fi
  # ── Fleet (§7/§18): report `building` (with WIP checkpoint) to the coordinator.
  # If the lease is stale (we were reclaimed) the report is FENCED -> self-abort and
  # quarantine WITHOUT running the agent. No-op for non-fleet jobs / flag off. ──
  if fleet_enabled && _fleet_is_job "$job"; then
    fleet_report "$job" building checkpoint; local _frc=$?
    if [[ "$_frc" -eq 2 ]]; then
      fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
      return 0
    fi
  fi
_run_agent() {
if [[ -n "$AGENT_STDIN" ]]; then
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
else
( cd "$cwd" && "${AGENT_CMD[@]}" )
fi
}
local rc=0 lockkey tmo timed_out=false
lockkey=$(lock_key_for "$doing_file")
tmo=$(_dur_to_secs "$(fm_get "$doing_file" timeout "0")")
  # budget.wall is a second HARD wall-clock ceiling alongside `timeout` (§5/§14):
# whichever ceiling fires first binds, and `kill_class` records which one so
# the run is labeled `timeout` vs `budget_exceeded`.
local bwall eff kill_class _ek
bwall=$(_budget_wall_secs "$doing_file")
_ek=$(_effective_kill "$tmo" "$bwall"); eff="${_ek%% *}"; kill_class="${_ek##* }"
local tmo_flag="$STATE/$job.timedout"; rm -f "$tmo_flag"
local lf="$LOCKS/$(_keyhash "$lockkey").lock"
if [[ "$eff" -gt 0 && -n "$TIMEOUT_BIN" ]]; then
# Hard ceiling via timeout/gtimeout (kills the whole process tree).
AQ_STDIN="$AGENT_STDIN" "$TIMEOUT_BIN" -k 5 "${eff}s" bash -c '
cd "$1" || exit 97; shift
if [ -n "${AQ_STDIN:-}" ]; then exec "$@" < "$AQ_STDIN"; else exec "$@"; fi
' _ "$cwd" "${AGENT_CMD[@]}" >> "$logf" 2>&1
rc=$?
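    # GNU timeout exits 124 after TERMing the command, or 137 (128+SIGKILL) when the
    # -k 5 grace period expires and it must SIGKILL the process group. Both mean a
    # ceiling fired. Caveat: an unrelated SIGKILL (e.g. the OOM killer) also yields
    # 137 and is then labeled a timeout too.
    [[ $rc -eq 124 || $rc -eq 137 ]] && timed_out=true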
elif [[ "$eff" -gt 0 ]]; then
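    # Portable watchdog fallback (no timeout binary). After $eff seconds it drops
    # the .timedout flag, then TERMs (and 5s later KILLs) the agent subshell and
    # its DIRECT children only; deeper descendants can survive. Install coreutils
    # (gtimeout) for hard process-tree kills.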
_run_agent >> "$logf" 2>&1 &
local apid=$!
( sleep "$eff"; : > "$tmo_flag"
pkill -TERM -P "$apid" 2>/dev/null; kill -TERM "$apid" 2>/dev/null
sleep 5; pkill -KILL -P "$apid" 2>/dev/null; kill -KILL "$apid" 2>/dev/null ) &
local wpid=$!
wait "$apid" 2>/dev/null; rc=$?
kill "$wpid" 2>/dev/null; wait "$wpid" 2>/dev/null
[[ -f "$tmo_flag" ]] && timed_out=true
elif [[ -n "$FLOCK_BIN" ]]; then
# Cross-process hardening where flock exists (Linux CI). The single run-loop
# already serializes by lock key; this guards against a stray second launcher.
( "$FLOCK_BIN" -n 9 || exit 75; _run_agent ) 9>"$lf" >> "$logf" 2>&1
rc=$?
if [[ $rc -eq 75 ]]; then
echo "lock busy (key=$lockkey) — requeued to inbox" >> "$logf"
mv "$doing_file" "$INBOX/" 2>/dev/null
{ echo "ended=$(date +%s)"; echo "result=requeued"; } >> "$metaf"
return 0
fi
else
_run_agent >> "$logf" 2>&1
rc=$?
fi
rm -f "$tmo_flag"
# ── Preserve work + capture run metrics on EVERY path (§25.2/§26.1/§26.2) ──
if [[ "$WIP_ACTIVE" == 1 ]]; then
_wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "agent-exit"; WIP_DONE=1
fi
_numstat_into_meta "$cwd" "$WIP_BASE" "$metaf"
parse_usage "$engine" "$logf" >> "$metaf"
# ── allowed-scope guardrail (§6) — WARN-ONLY: flag out-of-scope changes but
# never block the job this phase. Scope may be inherited from the profile. ──
local scope; scope=$(fm_eff "$doing_file" allowed-scope "" allowed-scope)
[[ -n "$scope" ]] && scope_check "$cwd" "$WIP_BASE" "$scope" "$logf" "$metaf"
if $timed_out; then
echo "KILLED after ${eff}s (rc=$rc, limit=$kill_class): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "$kill_class" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then
# Fleet (§18): re-confirm the lease before accepting the agent's output. If the
# coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale
# epoch), the report is FENCED -> quarantine the local result, NEVER ship.
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" review checkpoint; local _rrc=$?
if [[ "$_rrc" -eq 2 ]]; then
fleet_quarantine "$job" "$doing_file" "$metaf" "$logf"
return 0
fi
# Report cost/token/effort metrics (parse_usage wrote them to the meta just
# above) + result onto the coordinator's run, and release the held lease.
fleet_report_insights "$job" review
fi
# Agent succeeded: land in review/, then run the auto-QA verify gate. The
# worker is still alive here so the concurrency slot stays held through
# verification — `ended=` is written only once we reach a resting stage.
mv "$doing_file" "$REVIEW/" 2>/dev/null
local review_file="$REVIEW/$job.md"
echo "exit=$rc" >> "$metaf"
echo "completed OK (rc=0): landed in review — $(date)" >> "$logf"
# verify is job-level, else inherited from the profile's default-verify.
local verify; verify=$(fm_eff "$review_file" verify "$DEFAULT_VERIFY" default-verify)
if [[ -z "$verify" ]]; then
_meta_end "$metaf" "review" "$started"
echo "no verify command — parked in review for manual promote: $(date)" >> "$logf"
else
echo "----- verify: $verify -----" >> "$logf"
local vrc=0
( cd "$cwd" && bash -c "$verify" ) >> "$logf" 2>&1 || vrc=$?
echo "verify_exit=$vrc" >> "$metaf"
if [[ $vrc -eq 0 ]]; then
mv "$review_file" "$TESTING/" 2>/dev/null
_meta_end "$metaf" "testing" "$started"
echo "VERIFY PASSED — promoted to testing (QA): $(date)" >> "$logf"
# PR mode (§PR): work passed verify — commit/push the job branch, open a PR,
# record the URL in the meta, and push it onto the coordinator run.
if [[ -n "$pr_dir" ]]; then
local _prtitle _prurl
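          # Title = first non-blank line of the job's own body. $bodyf may begin with
          # an injected persona (§6), so read the job file in testing/ instead.
          _prtitle=$(strip_frontmatter "$TESTING/$job.md" 2>/dev/null \
            | awk 'NF && !done { sub(/^#* */, ""); print substr($0, 1, 72); done = 1 }') || _prtitle=""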
[[ -n "$_prtitle" ]] || _prtitle="agent-queue job $job"
_prurl=$(_fleet_pr_open "$pr_dir" "$pr_base" "$pr_jid" "$_prtitle" "$logf") || _prurl=""
if [[ -n "$_prurl" ]]; then
{ echo "pr_url=$_prurl"; echo "pr_branch=aq/job/$pr_jid"; } >> "$metaf"
echo "PR opened: $_prurl" >> "$logf"
if fleet_enabled && _fleet_is_job "$job"; then fleet_report_insights "$job" testing; fi
fi
fi
# Fleet (§14): mirror local QA to the coordinator. Always report `testing`
# so the coordinator stage reflects that local verify passed. When AUTOSHIP
# is enabled, the factory's verify gate IS the test phase — advance
# testing -> shipped (closing the testing->shipped gap autonomously). Default
# off leaves the job resting at `testing` for the human review gate / ship.
if fleet_enabled && _fleet_is_job "$job"; then
fleet_report "$job" testing
if [[ "${AQ_FLEET_AUTOSHIP:-0}" == 1 ]] && fleet_report "$job" shipped; then
mv "$TESTING/$job.md" "$SHIPPED/" 2>/dev/null
_meta_end "$metaf" "shipped" "$started"
echo "FLEET AUTOSHIP — testing -> shipped: $(date)" >> "$logf"
fi
fi
else
echo "VERIFY FAILED (rc=$vrc): $(date)" >> "$logf"
# verify ran on the review_file; retry policy may requeue it.
_finish_failure "$job" "$review_file" "$metaf" "$logf" "verify_failed" "$rc" "$started"
fi
fi
else
echo "FAILED (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "crash" "$rc" "$started"
fi
# Opt-in echo of the resting/terminal outcome back to the tracker (§10).
_auto_echo "$job"
}
# ── Resilience & insights helpers (Phase 1 — single-host §25/§26) ────
#
# _in_list <item> <space-separated-list> -> 0 if item is present.
_in_list() { case " ${2:-} " in *" $1 "*) return 0;; esac; return 1; }
# _meta_end <metafile> <result> <started-epoch> -> append result/ended/duration
# (append-only; never truncates a live meta — §25 state integrity).
_meta_end() {
local mf=$1 res=$2 now; now=$(date +%s); local st=${3:-$now}
[[ "$st" =~ ^[0-9]+$ ]] || st=$now
{ echo "result=$res"; echo "ended=$now"; echo "duration_s=$(( now - st ))"; } >> "$mf"
}
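# Example (illustrative values): with now=1700000095,
#   _meta_end "$STATE/job.meta" testing 1700000000
# appends these three lines to the meta:
#   result=testing
#   ended=1700000095
#   duration_s=95
# A missing or non-numeric <started-epoch> falls back to now, so duration_s=0.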
# ── git WIP checkpointing (§25.2) ──
_is_git_repo() { git -C "$1" rev-parse --is-inside-work-tree >/dev/null 2>&1; }
_wip_branch() { printf 'aq/wip/%s' "$1"; }
# _wip_start <job> <cwd> <metaf> <logf> -> ensure/checkout the WIP branch.
# Sets globals WIP_ACTIVE (1 when a git WIP branch is in play) and WIP_BASE.
# RESUME: if aq/wip/<job> already exists (orphan/retry relaunch), check it out so
# the agent continues from the checkpoint instead of from zero. The base commit is
# persisted in a write-once sidecar so it survives meta re-creation across attempts.
_wip_start() {
local job=$1 cwd=$2 metaf=$3 logf=$4
WIP_ACTIVE=0; WIP_BASE=""
if ! _is_git_repo "$cwd"; then
echo "wip: cwd not a git repo — skipping checkpoint" >> "$logf"
return 1
fi
local br basefile base; br=$(_wip_branch "$job"); basefile="$STATE/$job.wipbase"
if git -C "$cwd" show-ref --verify --quiet "refs/heads/$br"; then
    if git -C "$cwd" checkout "$br" >/dev/null 2>&1; then
      echo "wip: resuming on $br ($(date))" >> "$logf"
    else
      echo "wip: could not checkout $br (resume) — staying on current branch" >> "$logf"
    fi
else
base=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
if ! git -C "$cwd" checkout -b "$br" >/dev/null 2>&1; then
echo "wip: could not create $br — skipping checkpoint" >> "$logf"
return 1
fi
[[ -n "$base" ]] && printf '%s\n' "$base" > "$basefile"
echo "wip: created $br from ${base:-<root>} ($(date))" >> "$logf"
fi
base=$(cat "$basefile" 2>/dev/null || echo "")
{ echo "wip_branch=$br"; echo "wip_base=$base"; } >> "$metaf"
WIP_ACTIVE=1; WIP_BASE="$base"
return 0
}
# _wip_checkpoint <job> <cwd> <metaf> <logf> <stage> -> commit any changes in cwd
# onto aq/wip/<job>. Idempotent (no commit when the tree is clean). NEVER commits
# to a non-WIP branch — protects main/protected branches (§12).
_wip_checkpoint() {
local job=$1 cwd=$2 metaf=$3 logf=$4 stage=$5
_is_git_repo "$cwd" || return 0
local br cur; br=$(_wip_branch "$job")
cur=$(git -C "$cwd" symbolic-ref --short HEAD 2>/dev/null || echo "")
if [[ "$cur" != "$br" ]]; then
git -C "$cwd" checkout "$br" >/dev/null 2>&1 \
|| { echo "wip: not on $br — skipping checkpoint to protect '$cur'" >> "$logf"; return 0; }
fi
git -C "$cwd" add -A >/dev/null 2>&1
if git -C "$cwd" diff --cached --quiet 2>/dev/null; then
return 0 # nothing to preserve
fi
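  if ! git -C "$cwd" -c user.email=agent-queue@local -c user.name=agent-queue \
      commit -q -m "aq wip: $job ($stage)" >/dev/null 2>&1; then
    # e.g. a pre-commit hook rejected it: record nothing rather than a stale HEAD.
    echo "wip: checkpoint ($stage) commit failed; changes left staged ($(date))" >> "$logf"
    return 0
  fi
  local c; c=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
  [[ -n "$c" ]] && echo "wip_commit=$c" >> "$metaf"
  echo "wip: checkpoint ($stage) -> ${c:-?} ($(date))" >> "$logf"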
}
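# Inspecting preserved work (illustrative; <job> and <cwd> are placeholders). The
# base commit is recorded in the meta as wip_base= and in $STATE/<job>.wipbase:
#   git -C <cwd> log --oneline "$(cat "$STATE/<job>.wipbase")..aq/wip/<job>"
#   git -C <cwd> diff --stat "$(cat "$STATE/<job>.wipbase")" aq/wip/<job>
# Each checkpoint is a commit titled "aq wip: <job> (<stage>)", where <stage> is
# agent-exit (normal end of the agent run) or trap-exit (the exit/signal trap).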
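# _numstat_into_meta <cwd> <base> <metaf> -> record files_changed/lines_added/
# lines_deleted for the run: base..HEAD when a WIP base is known, otherwise the
# working tree vs HEAD (tracked files only). Binary files count toward
# files_changed but add 0 lines.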
_numstat_into_meta() {
local cwd=$1 base=$2 metaf=$3 out stats
_is_git_repo "$cwd" || return 0
if [[ -n "$base" ]]; then
out=$(git -C "$cwd" diff --numstat "$base" HEAD 2>/dev/null)
else
out=$(git -C "$cwd" diff --numstat HEAD 2>/dev/null)
fi
[[ -n "$out" ]] || return 0
stats=$(printf '%s\n' "$out" | awk '
{ if ($1 ~ /^[0-9]+$/) a+=$1; if ($2 ~ /^[0-9]+$/) d+=$2; f++ }
END { printf "%d %d %d", f+0, a+0, d+0 }')
# shellcheck disable=SC2086
set -- $stats
{ echo "files_changed=$1"; echo "lines_added=$2"; echo "lines_deleted=$3"; } >> "$metaf"
}
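# Example: for this `git diff --numstat` output (tab-separated; "-" marks binary)
#   10   2   src/a.sh
#   -    -   img/logo.png
#   3    0   README.md
# the awk pass yields "3 13 2", so the meta gains files_changed=3,
# lines_added=13 and lines_deleted=2.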
# ── retry policy (§5/§11/§25.3) ──
# Parse `retry: { max: N, backoff: 5m, on: [classes] }`. Absent/empty -> no retry.
_retry_max() { local m; m=$(printf '%s' "${1:-}" | grep -oE 'max:[[:space:]]*[0-9]+' | grep -oE '[0-9]+' | head -1); echo "${m:-0}"; }
_retry_backoff_s() { local b; b=$(printf '%s' "${1:-}" | grep -oE 'backoff:[[:space:]]*[0-9]+[smhd]?' | sed -E 's/^backoff:[[:space:]]*//' | head -1); _dur_to_secs "${b:-0}"; }
_retry_on() {
local raw=${1:-} inside
inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*\[([^]]*)\].*/\1/p')
[[ -n "$inside" ]] || inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*([A-Za-z_,[:space:]-]+).*/\1/p')
parse_list "$inside" | tr '\n' ' '
}
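# Example (illustrative), for a raw `retry:` value such as
#   { max: 2, backoff: 5m, on: [crash, timeout] }
#   _retry_max       -> 2
#   _retry_backoff_s -> 300              (assuming _dur_to_secs reads 5m as minutes)
#   _retry_on        -> "crash timeout "  (assuming parse_list prints one item per line)
# The unbracketed form `on: crash, timeout` is handled by the fallback sed.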
# _class_retryable <class> <on-list> -> 0 if this failure class should retry.
# `crash` and `agent_error` are synonyms for a non-zero agent exit.
_class_retryable() {
local class=$1 on=$2
case "$class" in
crash) _in_list crash "$on" || _in_list agent_error "$on";;
*) _in_list "$class" "$on";;
esac
}
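# Examples:
#   _class_retryable crash "agent_error"  -> 0 (the synonym matches)
#   _class_retryable timeout "crash"      -> 1 (timeout is not listed)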
# _finish_failure <job> <file> <metaf> <logf> <class> <rc> <started> -> apply the
# retry policy to a failed run. `attempts` (from the meta, default 1) is the
# 1-based number of the run that just failed. While the class is in `retry.on`
# and attempts <= max, the job is requeued to inbox/ with a backoff (via
# next_eligible), so `max: N` allows N retries, N+1 runs in total. On exhaustion
# -> failed/ result=retries_exhausted; with no policy -> failed/ with the natural
# result (failed|timeout|budget_exceeded|verify_failed). The WIP branch is
# preserved either way so a retry resumes from the checkpoint.
_finish_failure() {
local job=$1 file=$2 metaf=$3 logf=$4 class=$5 rc=$6 started=$7
local raw max on attempts; raw=$(fm_get "$file" retry "")
attempts=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); attempts=${attempts:-1}
max=$(_retry_max "$raw"); on=$(_retry_on "$raw")
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on" && [[ "$attempts" -le "$max" ]]; then
local backoff now next na; backoff=$(_retry_backoff_s "$raw")
now=$(date +%s); next=$(( now + backoff )); na=$(( attempts + 1 ))
mv "$file" "$INBOX/" 2>/dev/null
{
echo "exit=$rc"; echo "attempts=$na"; echo "next_eligible=$next"
echo "retry_class=$class"; echo "result=retry_scheduled"
echo "ended=$now"; echo "duration_s=$(( now - ${started:-now} ))"
} >> "$metaf"
echo "RETRY scheduled: class=$class, attempt $attempts/$max, backoff ${backoff}s -> inbox ($(date))" >> "$logf"
return 0
fi
local result
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on"; then
result="retries_exhausted"
echo "RETRIES EXHAUSTED after $attempts attempt(s) (class=$class) -> failed/ ($(date))" >> "$logf"
else
case "$class" in
timeout) result="timeout";;
budget_exceeded) result="budget_exceeded";;
verify_failed) result="verify_failed";;
*) result="failed";;
esac
fi
echo "exit=$rc" >> "$metaf"
mv "$file" "$FAILED/" 2>/dev/null
_meta_end "$metaf" "$result" "$started"
}
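# Worked example, assuming the launcher carries `attempts` forward between runs
# and _dur_to_secs reads 5m as 300s. Policy: retry: { max: 2, backoff: 5m, on: [crash] },
# and the agent crashes every time:
#   run 1 fails: attempts=1 <= 2 -> inbox/, attempts=2, next_eligible=now+300
#   run 2 fails: attempts=2 <= 2 -> inbox/, attempts=3, next_eligible=now+300
#   run 3 fails: attempts=3 >  2 -> failed/, result=retries_exhausted
# A timeout in the same job is not in `on`, so it goes straight to failed/ with
# result=timeout on its first occurrence.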
# ── orphan recovery (§25.3) ──
# recover_orphans -> move building/ jobs whose worker is dead back to inbox/ for
# re-selection (resume-aware via the WIP branch), incrementing attempts. Idempotent:
# once moved out of building/ a job is never recovered twice. A retry-capped job
# that has exhausted crash retries goes to failed/ result=retries_exhausted instead
# of looping forever; otherwise recovery never strands work.
recover_orphans() {
local f job metaf pid pidstart
for f in "$BUILDING"/*.md; do
[[ -e "$f" ]] || continue
job=$(basename "$f"); job=${job%.md}; metaf="$STATE/$job.meta"
if [[ -f "$metaf" ]] && ! grep -q '^ended=' "$metaf"; then
pid=$(grep '^pid=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
pidstart=$(grep '^pidstart=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" && continue # a live worker still owns it
fi
local prev na raw max now; prev=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); prev=${prev:-1}
raw=$(fm_get "$f" retry ""); max=$(_retry_max "$raw"); now=$(date +%s)
if [[ "$max" -gt 0 ]] && _class_retryable crash "$(_retry_on "$raw")" && [[ "$prev" -gt "$max" ]]; then
mv "$f" "$FAILED/" 2>/dev/null
{ echo "attempts=$prev"; echo "recovered=$now"; } >> "$metaf"
      _meta_end "$metaf" "retries_exhausted" "$(_meta_val "$metaf" started)"
echo "ORPHAN: $job exhausted crash retries -> failed/ ($(date))" >> "$LOGS/$job.log"
log "↻ orphan $C_BOLD$job$C_RESET exhausted retries -> failed"
continue
fi
na=$(( prev + 1 ))
local next=""; [[ "$max" -gt 0 ]] && next=$(( now + $(_retry_backoff_s "$raw") ))
mv "$f" "$INBOX/" 2>/dev/null
{
echo "attempts=$na"; echo "recovered=$now"
[[ -n "$next" ]] && echo "next_eligible=$next"
echo "result=recovered"; echo "ended=$now"
} >> "$metaf"
echo "ORPHAN RECOVERED: $job (worker dead) -> inbox, attempt now $na ($(date))" >> "$LOGS/$job.log"
log "↻ recovered orphan $C_BOLD$job$C_RESET (attempt $na)"
done
}
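# Example (illustrative): a worker died mid-run on its first attempt and the job
# has no retry policy. The sweep moves it back to inbox/ and appends to its meta:
#   attempts=2
#   recovered=<now>
#   result=recovered
#   ended=<now>
# With a retry policy (max > 0) it also appends next_eligible=<now + backoff>;
# once the recorded attempts exceeds max (and crash is retryable) the job goes to
# failed/ as retries_exhausted instead.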
# ── token / cost capture (§26.2) ──
# parse_usage <engine> <logfile> -> emit `key=value` usage lines (model, tokens_in,
# tokens_out, tokens_cached, cost_usd, turns, tool_calls, usage_estimated) when the
# engine's output exposes them. This is the SINGLE place per-engine extraction lives.
# A wrapper of any engine may emit a machine-readable `AQ_USAGE k=v ...` line, which
# is always honored; engine-specific heuristics are best-effort (real where known,
# TODO otherwise). Never fabricate precise numbers — omit or mark usage_estimated.
parse_usage() {
local engine=$1 log=$2
[[ -f "$log" ]] || return 0
# 1) Generic, explicit usage line (preferred; emitted by any cooperating wrapper).
local line; line=$(grep -E '^AQ_USAGE ' "$log" 2>/dev/null | tail -1)
if [[ -n "$line" ]]; then
local kv
for kv in ${line#AQ_USAGE }; do
case "$kv" in
model=*|tokens_in=*|tokens_out=*|tokens_cached=*|cost_usd=*|turns=*|tool_calls=*|usage_estimated=*) echo "$kv";;
esac
done
return 0
fi
# 2) Engine-specific best-effort heuristics (real where the format is known).
local ti to
case "$engine" in
claude)
# Claude Code can surface usage as JSON-ish input_tokens/output_tokens.
ti=$(grep -oE '"input_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"output_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
codex)
# OpenAI usage object: prompt_tokens / completion_tokens.
ti=$(grep -oE '"prompt_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"completion_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
devin)
# Devin exposes usage in its ATIF conversation export (--export), not stdout.
# build_agent_cmd writes it next to the log; sum per-step token metrics.
local exp="${log%.log}.devin-export.json"
if [[ -f "$exp" ]]; then
local dti dto dtc dmodel
dti=$(grep -oE '"input_tokens"[[:space:]]*:[[:space:]]*[0-9]+' "$exp" | grep -oE '[0-9]+' | awk '{s+=$1} END{print s+0}')
dto=$(grep -oE '"output_tokens"[[:space:]]*:[[:space:]]*[0-9]+' "$exp" | grep -oE '[0-9]+' | awk '{s+=$1} END{print s+0}')
dtc=$(grep -oE '"cache_read_tokens"[[:space:]]*:[[:space:]]*[0-9]+' "$exp" | grep -oE '[0-9]+' | awk '{s+=$1} END{print s+0}')
dmodel=$(grep -oE '"model_name"[[:space:]]*:[[:space:]]*"[^"]+"' "$exp" | head -1 | sed -E 's/.*:[[:space:]]*"([^"]+)".*/\1/')
[[ -n "$dmodel" ]] && echo "model=$dmodel"
[[ "${dti:-0}" -gt 0 ]] && echo "tokens_in=$dti"
[[ "${dto:-0}" -gt 0 ]] && echo "tokens_out=$dto"
[[ "${dtc:-0}" -gt 0 ]] && echo "tokens_cached=$dtc"
# NOTE: Devin's export carries token counts but not USD cost; cost_usd is
# left unset (the dashboard shows tokens + model; cost stays blank).
fi
;;
copilot) : ;; # TODO: GitHub Copilot CLI usage format not yet documented here.
esac
return 0
}
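# Example: a wrapper around any engine can print one machine-readable line
# ("my-model" and the numbers are illustrative):
#   printf 'AQ_USAGE model=%s tokens_in=%d tokens_out=%d cost_usd=%s\n' \
#     "my-model" 1200 340 0.0123
# parse_usage then emits model=my-model, tokens_in=1200, tokens_out=340 and
# cost_usd=0.0123, one key=value per line. Keys outside the allow-list above are
# dropped, and only the LAST AQ_USAGE line in the log is used.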
# ── insights helpers (§26) ──
# _meta_val <metafile> <key> -> last value for key (append-only safe), else empty.
_meta_val() { grep "^$2=" "$1" 2>/dev/null | tail -1 | cut -d= -f2-; }
# _result_is_success <result> -> 0 if the agent run succeeded (reached a good stage).
_result_is_success() { case "$1" in review|testing|shipped) return 0;; *) return 1;; esac; }
# _insights_line <metafile> -> compact one-line metrics summary for status/dash.
_insights_line() {
local f=$1 s="" v ti to la ld
v=$(_meta_val "$f" attempts); [[ -n "$v" ]] && s+="attempt $v "
v=$(_meta_val "$f" duration_s); [[ -n "$v" ]] && s+="${v}s "
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out)
[[ -n "$ti$to" ]] && s+="tok ${ti:-0}/${to:-0} "
v=$(_meta_val "$f" cost_usd); [[ -n "$v" ]] && s+="usd=$v "
la=$(_meta_val "$f" lines_added); ld=$(_meta_val "$f" lines_deleted)
[[ -n "$la$ld" ]] && s+="+${la:-0}/-${ld:-0} "
[[ -n "$(_meta_val "$f" usage_estimated)" ]] && s+="(est) "
printf 'insights: %s' "${s:-(pending)}"
}
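# Example: a meta containing attempts=2, duration_s=95, tokens_in=1200,
# tokens_out=300, lines_added=10 and lines_deleted=2 renders as
#   insights: attempt 2 95s tok 1200/300 +10/-2
# (every segment keeps a trailing space); an empty meta renders
# "insights: (pending)".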
# ── Commands ────────────────────────────────────────────────────────
# ── Tracker integration (§10) — task <-> job round-trip ─────────────
#
# tracker_api <METHOD> <PATH> [JSON] -> emits the response body, then a final
# line with the HTTP status code. ALL HTTP goes through here (curl only). Tests
# replace it wholesale via AQ_TRACKER_API_CMD so no live service is needed.
tracker_api() {
local method=$1 path=$2 body=${3:-}
if [[ -n "$AQ_TRACKER_API_CMD" ]]; then
"$AQ_TRACKER_API_CMD" "$method" "$path" "$body"
return $?
fi
local url="${AQ_TRACKER_API}${path}"
local -a args=(-sS -m "${AQ_TRACKER_TIMEOUT:-30}" -X "$method"
-H "Content-Type: application/json" -w '\n%{http_code}')
[[ -n "$AQ_TRACKER_TOKEN" ]] && args+=(-H "Authorization: Bearer $AQ_TRACKER_TOKEN")
[[ -n "$AQ_PRODUCT_ID" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
[[ -n "$body" ]] && args+=(--data "$body")
local out rc
out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$?
if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi
}
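# Test stub for AQ_TRACKER_API_CMD (illustrative; fake-tracker.sh is a
# hypothetical name). It receives <METHOD> <PATH> [JSON] and must print the
# response body, then the HTTP status on the final line:
#   #!/usr/bin/env bash
#   case "$1 $2" in
#     "GET /api/items/"*) printf '%s\n200\n' '{"title":"Fix login","labels":["priority:high"]}';;
#     *)                  printf '{}\n204\n';;
#   esac
# After chmod +x, run with AQ_TRACKER_API_CMD=./fake-tracker.sh so no live
# service is contacted.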
# _api_call <METHOD> <PATH> [JSON] -> sets globals API_BODY + API_CODE.
_api_call() {
local out; out=$(tracker_api "$@")
API_CODE=$(printf '%s' "$out" | tail -n1)
API_BODY=$(printf '%s' "$out" | sed '$d')
}
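# _json_str <key> (reads JSON on stdin) -> the string value of the FIRST "key"
# occurrence, unescaped (\n \t \r are decoded; any other \X yields X, so \" \\ \/
# work but \uXXXX is not decoded). Not strictly top-level: an earlier nested key
# with the same name wins. Prints nothing when the value is not a string.
# Pure awk (POSIX): mac + linux safe, no jq dependency.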
_json_str() {
awk -v key="$1" '
{ json = json $0 }
END {
pat = "\"" key "\""
i = index(json, pat); if (i == 0) exit
rest = substr(json, i + length(pat))
sub(/^[ \t]*:[ \t]*/, "", rest)
if (substr(rest, 1, 1) != "\"") exit
rest = substr(rest, 2); n = length(rest)
for (j = 1; j <= n; j++) {
c = substr(rest, j, 1)
if (c == "\\") { j++; e = substr(rest, j, 1)
if (e == "n") out = out "\n"; else if (e == "t") out = out "\t"
else if (e == "r") out = out "\r"; else out = out e }
else if (c == "\"") break
else out = out c
}
printf "%s", out
}'
}
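# Examples:
#   printf '%s' '{"id":7,"title":"Fix \"login\" bug"}' | _json_str title
#     -> Fix "login" bug
#   printf '%s' '{"id":7}' | _json_str id
#     -> (prints nothing: 7 is a number, not a string)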
# _json_labels (reads JSON on stdin) -> one labels[] entry per line. Assumes a flat
# array of strings; a "]" inside a label would end the array early.
_json_labels() {
awk '
{ json = json $0 }
END {
i = index(json, "\"labels\""); if (i == 0) exit
rest = substr(json, i); lb = index(rest, "["); rb = index(rest, "]")
if (lb == 0 || rb == 0 || rb < lb) exit
arr = substr(rest, lb + 1, rb - lb - 1)
while (match(arr, /"([^"\\]|\\.)*"/)) {
tok = substr(arr, RSTART + 1, RLENGTH - 2)
gsub(/\\"/, "\"", tok); print tok
arr = substr(arr, RSTART + RLENGTH)
}
}'
}
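# Example:
#   printf '%s' '{"labels":["engine-class:fast","cap:gpu"]}' | _json_labels
#     -> engine-class:fast
#        cap:gpu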
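# _json_escape <text> -> the text as a JSON string body (no surrounding quotes).
# Escapes backslash, double quote, tab and newline only; other control characters
# (e.g. a carriage return) pass through unescaped.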
_json_escape() {
printf '%s' "$1" | awk '
{ line = $0; gsub(/\\/, "\\\\", line); gsub(/"/, "\\\"", line); gsub(/\t/, "\\t", line)
out = out (NR > 1 ? "\\n" : "") line }
END { printf "%s", out }'
}
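# Example (the input holds a real tab and a real newline):
#   _json_escape $'say "hi"\tnow\nline2'
#     -> say \"hi\"\tnow\nline2
# The backslashes in the output are literal, ready to embed between "..." in a body.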
# _tracker_status_for <result> -> the Item status for a job result/stage.
_tracker_status_for() {
case "$1" in
shipped) printf '%s' "$AQ_TRACKER_STATUS_DONE";;
failed|timeout|budget_exceeded|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
printf '%s' "$AQ_TRACKER_STATUS_FAILED";;
*) printf '%s' "$AQ_TRACKER_STATUS_INPROGRESS";;
esac
}
# _tracker_note <job> <metaf> <result> <status> -> a metrics-only summary line.
# NEVER includes prompt/body content or secrets — only run metrics (§24.5/§26).
_tracker_note() {
local jn=$1 metaf=$2 result=$3 status=$4 s attempts dur ti to cost la ld
attempts=$(_meta_val "$metaf" attempts); dur=$(_meta_val "$metaf" duration_s)
ti=$(_meta_val "$metaf" tokens_in); to=$(_meta_val "$metaf" tokens_out)
cost=$(_meta_val "$metaf" cost_usd)
la=$(_meta_val "$metaf" lines_added); ld=$(_meta_val "$metaf" lines_deleted)
s="agent-queue: job ${jn} -> ${result:-$status} (status=${status})."
[[ -n "$attempts" ]] && s+=" attempts=${attempts}."
[[ -n "$dur" ]] && s+=" duration=${dur}s."
[[ -n "$ti$to" ]] && s+=" tokens=${ti:-0}/${to:-0}."
[[ -n "$cost" ]] && s+=" cost_usd=${cost}."
[[ -n "$la$ld" ]] && s+=" diff=+${la:-0}/-${ld:-0}."
printf '%s' "$s"
}
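# Example: for a meta with attempts=1, duration_s=95, tokens_in=1200 and
# tokens_out=300 ("job42" and "in_progress" are illustrative; the latter stands in
# for whatever AQ_TRACKER_STATUS_INPROGRESS is set to):
#   _tracker_note job42 "$metaf" testing in_progress
#     -> agent-queue: job job42 -> testing (status=in_progress). attempts=1. duration=95s. tokens=1200/300.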
# from-tracker <ITEM_ID> — pull a tracker Item and materialize a job in inbox/.
# Idempotent on the derived key `tracker-<ITEM_ID>` (Slice 1 dedupe).
cmd_from_tracker() {
ensure_dirs
local item_id="${1:-}"
[[ -n "$item_id" ]] || die "usage: from-tracker <ITEM_ID>"
_api_call GET "/api/items/$item_id"
case "$API_CODE" in 2*) :;; *) die "from-tracker: items API returned HTTP ${API_CODE:-error} for item $item_id";; esac
local title desc iprio
title=$(printf '%s' "$API_BODY" | _json_str title)
desc=$(printf '%s' "$API_BODY" | _json_str description)
iprio=$(printf '%s' "$API_BODY" | _json_str priority)
[[ -n "$title$desc" ]] || die "from-tracker: item $item_id has no title/description (HTTP $API_CODE)"
# labels carry optional manifest hints: engine-class:, profile:, priority:, cap:
local engine_class="" profile="" prio="" caps_list="" label
while IFS= read -r label; do
[[ -n "$label" ]] || continue
case "$label" in
engine-class:*) engine_class=${label#engine-class:};;
profile:*) profile=${label#profile:};;
priority:*) prio=${label#priority:};;
cap:*) caps_list+="${label#cap:}, ";;
esac
done < <(printf '%s' "$API_BODY" | _json_labels)
prio=${prio:-${iprio:-medium}}
case "$prio" in critical|high|medium|low) :;; *) prio=medium;; esac
# Materialize into a .md file (so cmd_add/the queue recognize it). mktemp -d is
# the portable way to get a unique path with a fixed (.md) basename on mac+linux.
local safe_id tmpdir tmp
safe_id=$(printf '%s' "$item_id" | tr -c 'A-Za-z0-9._-' '_')
tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fromtracker.XXXXXX")
tmp="$tmpdir/tracker-$safe_id.md"
{
echo "---"
echo "cwd: $AQ_TRACKER_CWD"
echo "yolo: true"
echo "priority: $prio"
[[ -n "$engine_class" ]] && echo "engine-class: $engine_class"
[[ -n "$profile" ]] && echo "profile: $profile"
[[ -n "$caps_list" ]] && echo "capabilities: [${caps_list%, }]"
echo "tracker-item: $item_id"
echo "idempotency-key: tracker-$item_id"
echo "---"
echo
[[ -n "$title" ]] && { echo "# $title"; echo; }
printf '%s\n' "$desc"
} > "$tmp"
cmd_add "$tmp"
rm -rf "$tmpdir"
local created; created=$(grep -lE "^tracker-item:[[:space:]]*${item_id}[[:space:]]*\$" "$INBOX"/*.md 2>/dev/null | head -1)
if [[ -n "$created" ]]; then
log "from-tracker: item $item_id -> $C_BOLD$(basename "$created")$C_RESET"
else
log "from-tracker: item $item_id already queued elsewhere (deduped)"
fi
}
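# Example (illustrative values): item 42 with title "Fix login", description
# "Users get a 500", priority "high" and labels
# ["engine-class:fast", "cap:gpu", "cap:docker"] is written as tracker-42.md and
# queued by cmd_add as inbox/<stamp>__tracker-42.md:
#   ---
#   cwd: <AQ_TRACKER_CWD>
#   yolo: true
#   priority: high
#   engine-class: fast
#   capabilities: [gpu, docker]
#   tracker-item: 42
#   idempotency-key: tracker-42
#   ---
#
#   # Fix login
#
#   Users get a 500
# Re-running from-tracker on an unchanged item is then skipped as a duplicate
# (same key + same body), unless the earlier job is in failed/.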
# to-tracker <job> — one-way echo of a job's CURRENT outcome to its tracker Item
# (child -> tracker, §24.5). Idempotent via meta `tracker_echoed`; never fatal.
cmd_to_tracker() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: to-tracker <job>"
local metaf="$STATE/$job.meta"
[[ -f "$metaf" ]] || metaf=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1)
[[ -f "$metaf" ]] || die "to-tracker: no meta for job '$job'"
local jn; jn=$(basename "$metaf"); jn=${jn%.meta}
local item_id; item_id=$(_meta_val "$metaf" tracker_item)
if [[ -z "$item_id" ]]; then
log "to-tracker: $jn has no tracker-item — nothing to echo"
return 0
fi
local result status last
result=$(_meta_val "$metaf" result)
status=$(_tracker_status_for "$result")
last=$(_meta_val "$metaf" tracker_echoed)
if [[ "$last" == "$status" ]]; then
log "to-tracker: $jn already echoed status=$status to $item_id (no-op)"
return 0
fi
# 1) status transition
_api_call PATCH "/api/items/$item_id/status" "{\"status\":\"$status\"}"
case "$API_CODE" in
2*) :;;
*) err "to-tracker: status PATCH for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal; job state unchanged"; return 0;;
esac
# 2) metrics-only comment (never prompt content / secrets)
local note; note=$(_tracker_note "$jn" "$metaf" "$result" "$status")
_api_call POST "/api/items/$item_id/comments" "{\"body\":\"$(_json_escape "$note")\"}"
case "$API_CODE" in
2*) :;;
*) err "to-tracker: comment for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal";;
esac
# 3) record echoed status (idempotency)
{ echo "tracker_echoed=$status"; echo "tracker_echoed_at=$(date +%s)"; } >> "$metaf"
log "to-tracker: echoed $jn -> item $item_id (status=$status)"
}
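# For manual debugging, the two HTTP calls above are roughly equivalent to the
# following (illustrative: 42, <status> and the note text are placeholders;
# tracker_api also adds X-Product-Id when AQ_PRODUCT_ID is set):
#   curl -sS -X PATCH -H 'Content-Type: application/json' \
#     -H "Authorization: Bearer $AQ_TRACKER_TOKEN" \
#     --data '{"status":"<status>"}' "$AQ_TRACKER_API/api/items/42/status"
#   curl -sS -X POST -H 'Content-Type: application/json' \
#     -H "Authorization: Bearer $AQ_TRACKER_TOKEN" \
#     --data '{"body":"agent-queue: job ..."}' "$AQ_TRACKER_API/api/items/42/comments"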
# _auto_echo <job> — best-effort outcome echo on a transition (never fatal).
# Fleet mode (AQ_FLEET=1, fleet job): route the echo through the coordinator as a
# fenced stage report — `fleet_events` becomes the audit source of truth, so we do
# NOT also post to the tracker directly. Offline (non-fleet) jobs keep the Slice-4
# direct tracker echo (opt-in via AQ_TRACKER_AUTO).
_auto_echo() {
local job=$1
if declare -f fleet_enabled >/dev/null 2>&1 && fleet_enabled && _fleet_is_job "$job"; then
local result; result=$(_meta_val "$STATE/$job.meta" result)
fleet_report "$job" "$(_fleet_stage_for "$result")" >/dev/null 2>&1 || true
return 0
fi
[[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0
cmd_to_tracker "$job" >/dev/null 2>&1 || true
}
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
cmd_add() {
ensure_dirs
local file="" engine="" cwd="" yolo=""
while [[ $# -gt 0 ]]; do
case "$1" in
--engine) engine=$2; shift 2;;
--cwd) cwd=$2; shift 2;;
--yolo) yolo=true; shift;;
--no-yolo) yolo=false; shift;;
*) file=$1; shift;;
esac
done
[[ -n "$file" && -f "$file" ]] || die "usage: add <file.md> [--engine devin|claude|codex] [--cwd PATH] [--yolo|--no-yolo]"
# ── idempotency-key dedupe (§5) ──
  # If the file declares an idempotency-key, compare a content hash of its
  # stripped body against existing jobs in inbox/, building/, review/, testing/
  # and shipped/ (failed/ is not checked, so a failed key can be resubmitted):
# same key + same body -> no-op (skip; "duplicate")
# same key + different body, prior in inbox/ -> supersede (replace it)
# same key + different body, prior elsewhere -> reject (clear error)
local idem; idem=$(fm_get "$file" idempotency-key "")
if [[ -n "$idem" ]]; then
local newhash; newhash=$(_body_hash "$file")
local d ef ekey ehash stage
# pass 1: exact duplicate anywhere active -> no-op
for d in "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
ehash=$(_body_hash "$ef")
if [[ "$ehash" == "$newhash" ]]; then
log "duplicate, skipped (idempotency-key=$C_BOLD$idem$C_RESET matches $(basename "$ef"))"
return 0
fi
done
done
# pass 2: same key, different body, prior is past inbox -> reject
for d in "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
stage=$(basename "$d")
die "idempotency-key '$idem' already in use by $(basename "$ef") (stage: $stage) with different content — refusing. Use a new key, or requeue the existing job."
done
done
# pass 3: same key, different body, prior still queued -> supersede
for ef in "$INBOX"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
log "superseding inbox job $(basename "$ef") (same idempotency-key=$idem, changed content)"
rm -f "$ef"
done
fi
# ── dep cycle detection (§5): reject a submit that would create a cycle in the
# idempotency-key dependency graph (inbox + active stages). ──
local newdeps; newdeps=$(parse_list "$(fm_get "$file" deps "")" | tr '\n' ' ')
if [[ -n "${newdeps// /}" ]] && deps_would_cycle "$idem" "$newdeps"; then
die "dependency cycle detected: job (key '${idem:-<none>}') with deps [${newdeps% }] would create a cycle — refusing."
fi
local base; base=$(basename "$file")
local stamp; stamp=$(date +%Y%m%d-%H%M%S)
local dest="$INBOX/${stamp}__${base}"
  # If the user passed flags AND the file has no frontmatter, inject one. When the
  # file already has frontmatter it is copied verbatim and the flags are ignored.
if [[ -n "$engine$cwd$yolo" ]] && [[ "$(head -1 "$file")" != "---" ]]; then
{
echo "---"
echo "engine: ${engine:-$DEFAULT_ENGINE}"
echo "cwd: ${cwd:-$PWD}"
echo "yolo: ${yolo:-true}"
echo "---"
echo
cat "$file"
} > "$dest"
else
cp "$file" "$dest"
fi
log "queued $C_BOLD$(basename "$dest")$C_RESET (engine=$(fm_get "$dest" engine "$DEFAULT_ENGINE"), cwd=$(fm_get "$dest" cwd "$PWD"))"
}
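# Example (paths illustrative): for a plain task.md with no frontmatter,
#   agent-queue add task.md --engine claude --cwd /src/app
# writes inbox/<YYYYmmdd-HHMMSS>__task.md starting with this header:
#   ---
#   engine: claude
#   cwd: /src/app
#   yolo: true
#   ---
# followed by a blank line and the original content (yolo defaults to true when
# neither --yolo nor --no-yolo is given).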
cmd_run() {
ensure_dirs
local once=false
while [[ $# -gt 0 ]]; do
case "$1" in
--max) MAX_CONCURRENCY=$2; shift 2;;
--engine) DEFAULT_ENGINE=$2; shift 2;;
--once|--drain) once=true; shift;;
*) die "run: unknown arg '$1'";;
esac
done
# Refuse to start a second run loop against the same queue — two daemons would
# break the single-launcher invariant that per-cwd locking relies on.
local dpid=""
[[ -f "$STATE/daemon.pid" ]] && dpid=$(cat "$STATE/daemon.pid" 2>/dev/null)
if [[ -n "$dpid" ]] && kill -0 "$dpid" 2>/dev/null; then
die "a run loop is already active (pid $dpid). Use 'stop' first, or a different AGENT_QUEUE_ROOT."
fi
[[ -n "$dpid" ]] && log "clearing stale daemon.pid ($dpid)"
echo "$$" > "$STATE/daemon.pid"
trap 'rm -f "$STATE/daemon.pid"; log "run loop stopped"; exit 0' INT TERM
log "run loop started (max=$MAX_CONCURRENCY, default engine=$DEFAULT_ENGINE). Ctrl-C to stop."
# Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous
# crash/power-off before launching anything new.
recover_orphans
# Fleet (§8): register with the coordinator (registration == first heartbeat).
fleet_enabled && fleet_heartbeat
fleet_flags_warn_once # §16: warn once if ROUTE=1 + SHADOW=1 (ROUTE wins, shadow off)
while true; do
# continuously sweep for orphans (a worker that died mid-loop)
recover_orphans
# Fleet (§7/§8): heartbeat on cadence, renew leases for in-flight fleet jobs,
# and — if we have capacity — claim one coordinator job into inbox/ so the
# normal selection loop below executes it interleaved with local .md files.
if fleet_enabled; then
fleet_heartbeat_maybe
fleet_renew_active
# ROUTE flag (§16): only SOURCE work from the coordinator when route_via_service
# is on (ROUTE=1, the default). With ROUTE=0 the LOCAL inbox is authoritative
# (pre-cutover / shadow) — the coordinator is never used to source work.
if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]]; then
fleet_claim >/dev/null 2>&1 || true
fi
fi
local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare)
local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
# pick by priority (critical→low) then age, skipping files whose lock key
# is currently busy, so two jobs sharing a cwd (or `lock:` key) never run
# at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort.
# Also skip jobs still inside their retry/recovery backoff (next_eligible).
local busy; busy=$(busy_keys)
local next="" cand cand_key cand_job cand_ne now_s
now_s=$(date +%s)
while IFS= read -r cand; do
[[ -n "$cand" ]] || continue
cand_key=$(lock_key_for "$cand")
if printf '%s\n' "$busy" | grep -qxF -- "$cand_key"; then continue; fi
cand_job=$(basename "$cand"); cand_job=${cand_job%.md}
cand_ne=$(grep '^next_eligible=' "$STATE/$cand_job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
if [[ "$cand_ne" =~ ^[0-9]+$ ]] && [[ "$cand_ne" -gt "$now_s" ]]; then continue; fi
# skip jobs whose deps (§5 DAG) are unmet — blocked, re-evaluated next loop
if [[ -n "$(deps_unmet "$cand")" ]]; then continue; fi
next="$cand"; break
done < <(inbox_sorted)
[[ -z "$next" ]] && break
local job; job=$(basename "$next"); job=${job%.md}
local doing_file="$BUILDING/$(basename "$next")"
mv "$next" "$doing_file"
# Shadow compare key: the LOCAL authoritative decision is identified by its
# idempotency-key (the coordinator keys jobs the same way), falling back to
# the job name. Capture the first job launched this iteration.
[[ -z "$shadow_local" ]] && shadow_local=$(fm_get "$doing_file" idempotency-key "$job")
local w_eng w_cwd w_yolo w_key
# resolve the concrete engine now (explicit engine / engine-class) so the
# meta + status reflect what will actually run; run_worker re-resolves and
# is the authority on capability/engine gates.
w_eng=$(resolve_engine "$doing_file")
w_cwd=$(fm_get "$doing_file" cwd "$PWD")
w_yolo=$(fm_get "$doing_file" yolo "true")
w_key=$(lock_key_for "$doing_file")
# Preserve the attempt counter across requeues (retry/orphan recovery set
# it before re-queuing); a fresh job starts at 1. Read BEFORE truncating
# the meta below so the count survives re-creation (§25.4 crash-safe).
local w_attempts; w_attempts=$(grep '^attempts=' "$STATE/$job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
[[ "$w_attempts" =~ ^[0-9]+$ ]] && [[ "$w_attempts" -gt 0 ]] || w_attempts=1
# write meta BEFORE launch (no pid yet), then append the worker pid from $!.
# The manifest fields (§5) are recorded here. The active ones (priority,
# capabilities, engine-class, idempotency-key, retry, deps, ...) are acted on;
# those still marked reserved in `usage` are stored for `status`/audit but are
# otherwise inert.
{
echo "job=$job"
echo "engine=${w_eng:-<none>}"
echo "cwd=$w_cwd"
echo "yolo=$w_yolo"
echo "lock=$w_key"
echo "started=$(date +%s)"
echo "attempts=$w_attempts"
echo "priority=$(fm_get "$doing_file" priority medium)"
echo "profile=$(fm_get "$doing_file" profile "")"
echo "engine_class=$(fm_eff "$doing_file" engine-class "")"
echo "capabilities=$(fm_eff "$doing_file" capabilities "")"
echo "prefers=$(fm_get "$doing_file" prefers "")"
echo "prefers_engine=$(fm_eff "$doing_file" prefers-engine "")"
echo "allowed_scope=$(fm_eff "$doing_file" allowed-scope "" allowed-scope)"
echo "budget=$(fm_get "$doing_file" budget "")"
echo "deps=$(fm_get "$doing_file" deps "")"
echo "deps_mode=$(fm_get "$doing_file" deps-mode "")"
echo "idempotency_key=$(fm_get "$doing_file" idempotency-key "")"
echo "retry=$(fm_get "$doing_file" retry "")"
echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)"
echo "artifacts=$(fm_get "$doing_file" artifacts "")"
echo "tracker_item=$(fm_get "$doing_file" tracker-item "")"
echo "fleet_job_id=$(fm_get "$doing_file" fleet-job-id "")"
echo "fleet_lease_epoch=$(fm_get "$doing_file" fleet-lease-epoch "")"
} > "$STATE/$job.meta"
run_worker "$doing_file" &
{ echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta"
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
_auto_echo "$job" # building -> in_progress (opt-in)
sleep 1
running=$(active_workers)
done
# Shadow / dual-run (§9/§16): AFTER the local authoritative decision this
# iteration, ask the coordinator what it WOULD have assigned and record the
# divergence — WITHOUT acting on its response (shadow never claims/ships/
# quarantines or mutates real job state). Best-effort + error-swallowed so a
# coordinator hiccup can NEVER fail a real job.
if fleet_shadow_enabled && [[ -n "$shadow_local" ]]; then
{
fleet_shadow_claim
fleet_shadow_compare "$shadow_local" "${SHADOW_COORD_JOB:-}"
fleet_shadow_report "$shadow_local" "${SHADOW_COORD_JOB:-}" building
} >/dev/null 2>&1 || true
fi
if $once; then
# drain when no worker is running and nothing in inbox can still progress on
# its own (backoff jobs still count as pending; dep-blocked jobs do not).
if [[ "$(active_workers)" -eq 0 ]] && ! _drain_pending; then
log "drain complete — no runnable work, no workers running"; rm -f "$STATE/daemon.pid"; exit 0
fi
fi
sleep "$POLL_SECONDS"
done
}
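# Usage sketch for `run` (illustrative; /tmp/q2 is a hypothetical root):
#   agent-queue.sh run                           foreground loop, Ctrl-C to stop
#   agent-queue.sh run --max 3 --engine claude   up to 3 workers; --engine sets
#                                                DEFAULT_ENGINE for this loop
#   agent-queue.sh run --once                    drain: exit once no worker runs
#                                                and nothing in inbox/ can progress
#   AGENT_QUEUE_ROOT=/tmp/q2 agent-queue.sh run  a second concurrent loop needs
#                                                its own queue root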
_count() { ls -1 "$1"/*.md 2>/dev/null | wc -l | tr -d ' '; }
cmd_status() {
ensure_dirs
local ib bd rv ts sh fl
ib=$(_count "$INBOX"); bd=$(_count "$BUILDING"); rv=$(_count "$REVIEW")
ts=$(_count "$TESTING"); sh=$(_count "$SHIPPED"); fl=$(_count "$FAILED")
local running; running=$(active_workers)
echo
printf '%s AGENT QUEUE %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET"
if fleet_enabled; then
printf ' %sFLEET%s factory=%s api=%s\n' "$C_CYAN" "$C_RESET" "$AQ_FACTORY_ID" "$AQ_FLEET_API"
printf ' %sFLEET%s %s\n' "$C_CYAN" "$C_RESET" "$(fleet_flags_state)"
fi
printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \
"$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \
"$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \
"$C_GREEN" "$C_RESET" "$sh" "$C_RED" "$C_RESET" "$fl" \
"$C_BOLD" "$C_RESET" "$running" "$MAX_CONCURRENCY"
# running table
local f
local printed=false
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
local pid pidstart; pid=$(grep '^pid=' "$f" | cut -d= -f2); pidstart=$(grep '^pidstart=' "$f" | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" || continue
if ! $printed; then printf ' %sRUNNING%s\n' "$C_BOLD" "$C_RESET"; printed=true; fi
local job eng start now el last lmt age stall=""
job=$(grep '^job=' "$f" | cut -d= -f2)
eng=$(grep '^engine=' "$f" | cut -d= -f2)
start=$(grep '^started=' "$f" | cut -d= -f2)
now=$(date +%s); el=$(( now - ${start:-$now} ))
last=$(tail -n 1 "$LOGS/$job.log" 2>/dev/null | cut -c1-60)
lmt=$(_mtime "$LOGS/$job.log"); age=$(( now - ${lmt:-$now} ))
[[ "$age" -gt $(( STALL_MIN * 60 )) ]] && stall="${C_RED}⚠ stalled${C_RESET} "
printf ' %s%-26s%s %-7s %3dm%02ds pid %-6s %s%s%s%s\n' \
"$C_BOLD" "$job" "$C_RESET" "$eng" $((el/60)) $((el%60)) "$pid" "$stall" "$C_DIM" "$last" "$C_RESET"
# manifest sub-line (Phase 1): priority + profile + capabilities + tracker-item
local m_prio m_prof m_caps m_trk extra=""
m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-)
m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-)
local m_echo m_fleet m_epoch
m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-)
m_fleet=$(grep '^fleet_job_id=' "$f" | tail -1 | cut -d= -f2-)
m_epoch=$(grep '^fleet_lease_epoch=' "$f" | tail -1 | cut -d= -f2-)
[[ -n "$m_prio" ]] && extra+="prio=$m_prio "
[[ -n "$m_prof" ]] && extra+="profile=$m_prof "
[[ -n "$m_caps" ]] && extra+="caps=$m_caps "
[[ -n "$m_trk" ]] && extra+="tracker=$m_trk "
[[ -n "$m_echo" ]] && extra+="echoed=$m_echo "
[[ -n "$m_fleet" ]] && extra+="fleet=$m_fleet@e${m_epoch:-0} "
[[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET"
printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET"
done
$printed || printf ' %sno workers running%s\n' "$C_DIM" "$C_RESET"
# blocked jobs (unmet deps, §5) — waiting in inbox/, re-evaluated each loop
local bf bj unmet bprinted=false
for bf in "$INBOX"/*.md; do
[[ -e "$bf" ]] || continue
unmet=$(deps_unmet "$bf")
[[ -n "$unmet" ]] || continue
if ! $bprinted; then echo; printf ' %sBLOCKED%s\n' "$C_BOLD" "$C_RESET"; bprinted=true; fi
bj=$(basename "$bf"); bj=${bj%.md}
printf ' %s%-26s%s %sblocked (waiting on: %s)%s\n' \
"$C_BOLD" "$bj" "$C_RESET" "$C_YEL" "$unmet" "$C_RESET"
done
echo
}
cmd_watch() {
local interval="${1:-2}"
while true; do clear; cmd_status; sleep "$interval"; done
}
# recover — reclaim orphaned building/ jobs (dead worker) back to inbox/. Runs
# automatically inside `run`; exposed for operators + crash-recovery testing.
cmd_recover() { ensure_dirs; recover_orphans; log "orphan sweep complete"; }
# insights [job] — per-job metrics, or a recent-jobs table + per-engine rollup.
cmd_insights() {
ensure_dirs
local job="${1:-}"
if [[ -n "$job" ]]; then
local f="$STATE/$job.meta"
[[ -f "$f" ]] || f=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1)
[[ -f "$f" ]] || die "no job meta matching '$job'"
local jn; jn=$(basename "$f"); jn=${jn%.meta}
printf '\n%s INSIGHTS %s%s%s\n' "$C_BOLD" "$C_CYAN" "$jn" "$C_RESET"
local k val
for k in engine result attempts started ended duration_s exit verify_exit \
model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \
files_changed lines_added lines_deleted wip_branch wip_base wip_commit \
next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at \
fleet_job_id fleet_lease_epoch fleet_reported fleet_fenced fleet_degraded fleet_quarantined; do
val=$(_meta_val "$f" "$k")
[[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val"
done
echo
return 0
fi
printf '\n%s INSIGHTS — recent finished jobs%s %s%s%s\n' \
"$C_BOLD" "$C_RESET" "$C_DIM" "$QUEUE_ROOT" "$C_RESET"
printf ' %-26s %-8s %-16s %6s %10s %9s\n' "job" "engine" "result" "dur" "tok(i/o)" "cost"
local f rows=0 agg; agg=$(mktemp "${TMPDIR:-/tmp}/aq-insights.XXXXXX")
while IFS= read -r f; do
[[ -n "$f" ]] || continue
grep -q '^ended=' "$f" || continue
local jn eng res dur ti to cost est
jn=$(basename "$f"); jn=${jn%.meta}
eng=$(_meta_val "$f" engine); res=$(_meta_val "$f" result); dur=$(_meta_val "$f" duration_s)
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out); cost=$(_meta_val "$f" cost_usd)
est=$(_meta_val "$f" usage_estimated)
rows=$((rows+1))
[[ $rows -le 15 ]] && printf ' %-26.26s %-8.8s %-16.16s %5ss %10s %9s\n' \
"$jn" "${eng:-?}" "${res:-?}" "${dur:-0}" "${ti:-0}/${to:-0}" "${cost:-}"
local succ=0; _result_is_success "$res" && succ=1
printf '%s|%s|%s|%s|%s|%s|%s\n' "${eng:-?}" "${ti:-0}" "${to:-0}" "${cost:-0}" "${dur:-0}" "$succ" "$est" >> "$agg"
done < <(ls -1t "$STATE"/*.meta 2>/dev/null)
if [[ $rows -eq 0 ]]; then
printf ' %sno finished jobs yet%s\n\n' "$C_DIM" "$C_RESET"; rm -f "$agg"; return 0
fi
printf '\n%s ROLLUP BY ENGINE%s\n' "$C_BOLD" "$C_RESET"
printf ' %-8s %5s %10s %10s %10s %8s\n' "engine" "jobs" "tok_in" "tok_out" "cost" "success"
local e
for e in $(cut -d'|' -f1 "$agg" | sort -u); do
awk -F'|' -v eng="$e" '
$1==eng { jobs++; ti+=$2; to+=$3; cost+=$4; succ+=$6; if ($7!="") est=1 }
END {
rate = jobs>0 ? (succ*100.0/jobs) : 0
printf " %-8s %5d %10d %10d %9.4f%s %6.0f%%\n", eng, jobs, ti, to, cost, (est?"*":" "), rate
}' "$agg"
done
printf ' %s* = engine total includes estimated token/cost values%s\n\n' "$C_DIM" "$C_RESET"
rm -f "$agg"
}
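# Usage sketch for `insights` (illustrative; fix-login is a hypothetical job):
#   agent-queue.sh insights              newest 15 finished jobs, plus a per-engine
#                                        rollup over all finished jobs in .state/
#   agent-queue.sh insights fix-login    one job's metrics: exact <job>.meta first,
#                                        else the newest meta whose name contains it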
cmd_dash() {
command -v node >/dev/null 2>&1 || die "node not found — use 'watch' for the bash status view"
AGENT_QUEUE_ROOT="$QUEUE_ROOT" exec node "$SCRIPT_DIR/dashboard.mjs" "$@"
}
cmd_stop() {
ensure_dirs
local killed=0 f pid pidstart
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
pid=$(grep '^pid=' "$f" | cut -d= -f2); pidstart=$(grep '^pidstart=' "$f" | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" && { kill "$pid" 2>/dev/null && killed=$((killed+1)); }
done
[[ -f "$STATE/daemon.pid" ]] && kill "$(cat "$STATE/daemon.pid")" 2>/dev/null
rm -f "$STATE/daemon.pid"
log "stopped $killed running worker(s) + run loop"
}
cmd_logs() {
local job="${1:-}" follow=""
[[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f"
[[ "$job" == "-f" ]] && job="${2:-}"
[[ -n "$job" ]] || die "usage: logs <job> [-f]"
local lf="$LOGS/$job.log"
[[ -f "$lf" ]] || lf=$(ls -1t "$LOGS"/*"$job"*.log 2>/dev/null | head -1)
[[ -f "$lf" ]] || die "no log found for '$job'"
if [[ -n "$follow" ]]; then tail -f "$lf"; else cat "$lf"; fi
}
# _find_job <job> <dir...> — echo the first matching .md across the given dirs
# (exact "<job>.md" preferred, else newest fuzzy match). Empty if none found.
_find_job() {
local job=$1; shift
local d f
for d in "$@"; do
[[ -f "$d/$job.md" ]] && { printf '%s' "$d/$job.md"; return; }
done
for d in "$@"; do
f=$(ls -1t "$d"/*"$job"*.md 2>/dev/null | head -1)
[[ -f "$f" ]] && { printf '%s' "$f"; return; }
done
}
# requeue <job> — move a job back to inbox/ for a fresh run (from failed/review/testing).
cmd_requeue() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: requeue <job>"
local f; f=$(_find_job "$job" "$FAILED" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no failed/review/testing job matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$INBOX/$base"
# drop stale state so it re-runs cleanly
rm -f "$STATE/$name.meta" "$STATE/$name.body.md" "$STATE/$name.timedout"
log "requeued $C_BOLD$base$C_RESET ($from → inbox)"
}
# ship <job> — manual promotion testing/ (QA) → shipped/. The human gate.
cmd_ship() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: ship <job>"
local f; f=$(_find_job "$job" "$TESTING")
[[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)"
local base name; base=$(basename "$f"); name=${base%.md}
# Fleet (§18): a fleet job may only ship if we still hold the lease. A fenced
# report means the coordinator reclaimed it -> quarantine instead of shipping.
if fleet_enabled && _fleet_is_job "$name"; then
local _src=0
fleet_report "$name" shipped checkpoint || _src=$?
if [[ "$_src" -eq 2 ]]; then
fleet_quarantine "$name" "$f" "$STATE/$name.meta" "$LOGS/$name.log"
return 0
fi
fi
mv "$f" "$SHIPPED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta"
log "shipped $C_BOLD$base$C_RESET (testing → shipped)"
_auto_echo "$name"
if fleet_enabled && _fleet_is_job "$name"; then
fleet_lease_release "$name" shipped >/dev/null 2>&1 || true
fi
return 0
}
# promote <job> — advance one stage forward: review → testing → shipped.
cmd_promote() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: promote <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from dest result; base=$(basename "$f"); name=${base%.md}
from=$(basename "$(dirname "$f")")
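case "$from" in
review) dest="$TESTING"; result="testing";;
# testing -> shipped goes through cmd_ship so fleet jobs get the same
# lease check (§18) as a manual `ship`.
testing) cmd_ship "$name"; return;;
*) die "promote: '$base' is in '$from' — nothing to promote";;
esac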
mv "$f" "$dest/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=$result" >> "$STATE/$name.meta"
log "promoted $C_BOLD$base$C_RESET ($from → $result)"
_auto_echo "$name"
}
# reject <job> — move a review/testing job to failed/ (manual gate rejection).
cmd_reject() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: reject <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$FAILED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=rejected" >> "$STATE/$name.meta"
log "rejected $C_BOLD$base$C_RESET ($from → failed)"
}
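# Manual-gate sketch (illustrative; each line is an alternative, and fix-login
# is a hypothetical job):
#   agent-queue.sh promote fix-login   one stage forward: review/ -> testing/, or testing/ -> shipped/
#   agent-queue.sh ship fix-login      testing/ -> shipped/ only (the human gate)
#   agent-queue.sh reject fix-login    review/ or testing/ -> failed/
#   agent-queue.sh requeue fix-login   failed/, review/ or testing/ -> inbox/, dropping its meta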
# clean [--keep N] — archive finished jobs' logs+meta beyond the newest N
# (default 50) into queue/.archive/<ts>/. Unfinished jobs (no ended=) and the
# .md kanban records in shipped/, failed/, etc. are never touched.
cmd_clean() {
ensure_dirs
local keep=50
while [[ $# -gt 0 ]]; do
case "$1" in
--keep) keep=$2; shift 2;;
*) die "clean: unknown arg '$1'";;
esac
done
[[ "$keep" =~ ^[0-9]+$ ]] || die "clean: --keep must be a number"
local arch="$QUEUE_ROOT/.archive/$(date +%Y%m%d-%H%M%S)"
# finished metas (have ended=), newest-first by mtime
local metas; metas=$(grep -l '^ended=' "$STATE"/*.meta 2>/dev/null \
| while IFS= read -r m; do printf '%s %s\n' "$(_mtime "$m")" "$m"; done \
| sort -rn | cut -d' ' -f2-)
local i=0 moved=0 m name
while IFS= read -r m; do
[[ -n "$m" ]] || continue
i=$((i+1))
[[ "$i" -le "$keep" ]] && continue
name=$(basename "$m"); name=${name%.meta}
mkdir -p "$arch"
mv "$m" "$arch/" 2>/dev/null
[[ -f "$LOGS/$name.log" ]] && mv "$LOGS/$name.log" "$arch/" 2>/dev/null
[[ -f "$STATE/$name.body.md" ]] && mv "$STATE/$name.body.md" "$arch/" 2>/dev/null
moved=$((moved+1))
done <<< "$metas"
if [[ "$moved" -gt 0 ]]; then
log "archived $moved finished job(s) to $C_BOLD$arch$C_RESET (kept newest $keep)"
else
log "nothing to clean (≤$keep finished jobs)"
fi
}
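# Usage sketch for `clean` (illustrative):
#   agent-queue.sh clean              keep the 50 newest finished jobs' meta/log/body
#   agent-queue.sh clean --keep 10    move all older finished ones to .archive/<timestamp>/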
usage() {
cat <<EOF
${C_BOLD}agent-queue${C_RESET} — folder kanban runner for devin | claude | codex | copilot
${C_BOLD}USAGE${C_RESET}
agent-queue.sh <command> [args]
${C_BOLD}COMMANDS${C_RESET}
init create the queue/ folders
add <file.md> [opts] queue a prompt file into inbox/
--engine devin|claude|codex|copilot --cwd PATH --yolo | --no-yolo
run [--max N] [--engine E] [--once|--drain]
process inbox/ (foreground loop; Ctrl-C to stop)
status show kanban counts + running workers (+ insights)
watch [interval] live status (default 2s, bash)
insights [job] per-job metrics, or recent table + per-engine rollup
recover reclaim orphaned building/ jobs (dead worker) -> inbox
from-tracker <ITEM_ID> pull a tracker Item -> materialize a job in inbox/ (§10)
to-tracker <job> echo a job's outcome to its tracker Item (one-way)
fleet-status (AQ_FLEET=1) register/heartbeat with the coordinator + show identity + flags
fleet-shadow-report [N] summarize the shadow/dual-run divergence log (counts, agreement, last N)
dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop
logs <job> [-f] print (or follow) a job's log
promote <job> advance one stage (review → testing → shipped)
ship <job> manual gate: testing (QA) → shipped
reject <job> send a review/testing job to failed/
requeue <job> move a failed/review/testing job back to inbox/
clean [--keep N] archive finished logs+meta beyond newest N (default 50)
help this message
${C_BOLD}KANBAN${C_RESET} inbox → building → review → testing → shipped (+ failed; logs/ + .state/ alongside)
auto: agent rc=0 → review; verify pass → testing; verify fail → failed
manual: ship (testing → shipped)
${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
---
engine: devin # devin|claude|codex|copilot. Explicit engine always wins
cwd: /Users/you/code/repo
yolo: true
lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially
timeout: 45m # optional; 90s|45m|2h|1d. On expiry -> failed (result=timeout)
budget: { wall: 2h } # optional; wall = HARD wall-clock ceiling (extends timeout). On expiry -> failed (result=budget_exceeded). usd/tokens best-effort only
verify: pnpm -s test # optional; auto-QA gate. pass -> testing, fail -> failed
# --- Phase 1 manifest (active) ---
priority: high # critical|high|medium|low (default medium). Picked highest-first, then oldest
engine-class: agentic-coder # used only if 'engine' is unset: agentic-coder->devin,claude,codex; chat-coder->copilot
prefers-engine: [claude] # optional order hint for engine-class resolution
capabilities: [os:any, node>=20, has:git] # hard host requirements; unmet -> failed (capability_mismatch)
idempotency-key: my-task-1 # re-adding same key+body = no-op; same key+different body = reject/supersede
retry: { max: 2, backoff: 5m, on: [timeout, verify_failed, crash] } # requeue on these classes up to max, then retries_exhausted
profile: backend-engineer # inherit persona + verify/caps/engine-class/scope/review-policy (job fields override)
deps: [other-key] # block until each idempotency-key is shipped/ (or testing/ if deps-mode: soft)
deps-mode: soft # soft = a dep also counts as satisfied while in testing/
# --- reserved (parsed + shown in status, but no-op until a later phase) ---
prefers: review-policy: artifacts: tracker-item:
---
${C_BOLD}PROFILES${C_RESET} profiles/<name>.md presets persona + capabilities + default-verify + engine-class +
prefers-engine + allowed-scope + review-policy. A job's own fields always override.
Catalog: developer, backend-engineer, frontend-engineer, ux-designer, ui-designer,
qa, reviewer, docs-writer, planner(reserved).
${C_BOLD}RESILIENCE${C_RESET} crash-safe: orphaned building/ jobs (dead worker) are recovered to inbox/ on
'run' startup; git-repo cwd work is checkpointed to branch aq/wip/<job> on every
exit (resumed on retry); 'retry' requeues failures with backoff. See 'insights'.
${C_BOLD}ENV${C_RESET}
AGENT_QUEUE_ROOT (=$QUEUE_ROOT) AGENT_QUEUE_MAX (=$MAX_CONCURRENCY)
AGENT_QUEUE_ENGINE (=$DEFAULT_ENGINE) AGENT_QUEUE_VERIFY (default verify cmd)
DEVIN_BIN / CLAUDE_BIN / CODEX_BIN / COPILOT_BIN
${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs platform-service + a token)
AQ_TRACKER_API (=$AQ_TRACKER_API) AQ_TRACKER_TOKEN (bearer) AQ_PRODUCT_ID
AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF)
AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam)
label hints on an Item: engine-class:<x> profile:<x> priority:<x> cap:<token>
${C_BOLD}FLEET${C_RESET} (Phase 2 — coordinator integration; OFF by default = offline git-queue)
AQ_FLEET=1 to enable; AQ_FLEET_API (=$AQ_FLEET_API) AQ_FLEET_TOKEN (bearer) AQ_PRODUCT_ID
AQ_FACTORY_ID (=$AQ_FACTORY_ID) AQ_FLEET_LEASE_RENEW_SEC AQ_FLEET_CAPS AQ_FLEET_CWD
Three independently-toggleable flags (precedence below):
AQ_FLEET=0 pure offline — ZERO coordinator calls (master switch).
AQ_FLEET_ROUTE=1 route_via_service (default): coordinator AUTHORITATIVE for claim.
AQ_FLEET_ROUTE=0 LOCAL inbox authoritative — coordinator not used to source work.
AQ_FLEET_SHADOW=1 shadow/dual-run (needs AQ_FLEET=1 + AQ_FLEET_ROUTE=0): run the offline
path as authoritative AND query the coordinator in parallel to record
divergence, never acting on it. If ROUTE=1 + SHADOW=1, ROUTE wins (warned).
Cutover ladder: (1) ROUTE=0 SHADOW=1 observe → (2) inspect 'fleet-shadow-report' agreement
→ (3) flip ROUTE=1 once agreement is high. Rollback = ROUTE=0 (and/or AQ_FLEET=0) anytime.
EOF
}
# Fleet coordinator client (Phase 2). All functions no-op unless AQ_FLEET=1, so the
# offline git-queue path above is unchanged when the flag is off.
# shellcheck source=lib/fleet-client.sh
[[ -f "$SCRIPT_DIR/lib/fleet-client.sh" ]] && source "$SCRIPT_DIR/lib/fleet-client.sh"
main() {
local cmd="${1:-help}"; shift || true
case "$cmd" in
init) cmd_init "$@";;
fleet-status) cmd_fleet_status "$@";;
fleet-shadow-report) cmd_fleet_shadow_report "$@";;
add) cmd_add "$@";;
run) cmd_run "$@";;
status) cmd_status "$@";;
watch) cmd_watch "$@";;
insights) cmd_insights "$@";;
recover) cmd_recover "$@";;
from-tracker) cmd_from_tracker "$@";;
to-tracker) cmd_to_tracker "$@";;
dash|dashboard) cmd_dash "$@";;
stop) cmd_stop "$@";;
logs) cmd_logs "$@";;
promote) cmd_promote "$@";;
ship) cmd_ship "$@";;
reject) cmd_reject "$@";;
requeue) cmd_requeue "$@";;
clean) cmd_clean "$@";;
help|-h|--help) usage;;
*) err "unknown command: $cmd"; echo; usage; exit 1;;
esac
}
main "$@"