bytelyst-devops-tools/agent-queue/agent-queue.sh
saravanakumardb1 b7a9ea1b7a feat(agent-queue): tracker adapter — task <-> job round-trip (P1-S4)
Implements §10 single-host tracker integration, closing the last Phase-1 §14 item:

- tracker_api: one curl-only HTTP wrapper (base URL + bearer + productId header),
  overridable via AQ_TRACKER_API_CMD so tests need no live service. Emits the
  response body + a trailing HTTP-code line; _api_call splits into API_BODY/API_CODE.
- aq from-tracker <ITEM_ID>: GET the Item, map title/description -> job body,
  labels (engine-class:/profile:/priority:/cap:) + Item priority -> frontmatter,
  and stamp tracker-item + a stable idempotency-key tracker-<id>. Materializes a
  .md into inbox/ via cmd_add; idempotent (Slice 1 dedupe) so a re-pull never creates a duplicate.
  JSON parsed with POSIX awk (no jq) — mac + linux safe.
- aq to-tracker <job>: one-way echo (child -> tracker, §24.5). PATCHes the Item
  status (building/review/testing->in_progress, shipped->done, failures->wont_fix,
  all overridable) and posts a metrics-only comment (result/attempts/duration/
  tokens/cost/diff — NEVER prompt content or secrets). Idempotent via meta
  tracker_echoed; an echo failure (e.g. HTTP 500) is logged and non-fatal — the
  tracker is downstream, never authoritative for execution.
- Opt-in auto-echo (AQ_TRACKER_AUTO=1, default OFF): the worker echoes on each
  transition (building via cmd_run, review/testing/failed via run_worker, shipped
  via ship/promote); never blocks or fails a job.
- status + insights surface tracker-item and the last echoed status.
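
A minimal sketch of the body/HTTP-code split described in the tracker_api bullet, assuming the wrapper has curl append the status on its own final line (for example with `-w '\n%{http_code}'`). The function name `split_api_response` and the sample payload are hypothetical; only API_BODY and API_CODE are names taken from this commit message.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split "<body>\n<http-code>" into API_BODY / API_CODE.
# Assumes the last line of the raw response is the numeric HTTP status.
split_api_response() {
  local raw=$1
  API_CODE=${raw##*$'\n'}        # text after the last newline
  if [[ "$raw" == *$'\n'* ]]; then
    API_BODY=${raw%$'\n'*}       # everything before the last newline
  else
    API_BODY=""                  # no body, only a trailer line
  fi
  [[ "$API_CODE" =~ ^[0-9]{3}$ ]]  # non-zero when the trailer is not a status code
}

# Stubbed response (illustrative data, no live service needed).
raw=$'{"id":"42","title":"Fix login"}\n200'
if split_api_response "$raw"; then
  printf 'code=%s body=%s\n' "$API_CODE" "$API_BODY"
fi
```

Because the split only looks at the last newline, a multi-line JSON body stays intact, and a stub set through AQ_TRACKER_API_CMD only has to print a body followed by one status line.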

curl-only HTTP; no new runtime deps; conventional + backward-compatible.
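
The status mapping from the to-tracker bullet can be sketched as a single case statement. The function name `tracker_status_for` is hypothetical, and treating every other bucket (such as inbox) as unmapped is an assumption; the three AQ_TRACKER_STATUS_* variables and their defaults are the ones the script defines.

```shell
#!/usr/bin/env bash
# Defaults mirror the script's env-overridable settings.
AQ_TRACKER_STATUS_INPROGRESS="${AQ_TRACKER_STATUS_INPROGRESS:-in_progress}"
AQ_TRACKER_STATUS_DONE="${AQ_TRACKER_STATUS_DONE:-done}"
AQ_TRACKER_STATUS_FAILED="${AQ_TRACKER_STATUS_FAILED:-wont_fix}"

# Hypothetical helper: map a queue bucket to the Item status to PATCH.
# Returns 1 and prints nothing for buckets with no tracker equivalent.
tracker_status_for() {
  case "$1" in
    building|review|testing) printf '%s\n' "$AQ_TRACKER_STATUS_INPROGRESS" ;;
    shipped)                 printf '%s\n' "$AQ_TRACKER_STATUS_DONE" ;;
    failed)                  printf '%s\n' "$AQ_TRACKER_STATUS_FAILED" ;;
    *)                       return 1 ;;
  esac
}

for bucket in building shipped failed; do
  printf '%s -> %s\n' "$bucket" "$(tracker_status_for "$bucket")"
done
```

Keeping the mapping in one place means a tracker with a different status vocabulary only needs the three environment variables overridden, not a code change.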
2026-05-29 21:35:06 -07:00


#!/usr/bin/env bash
#
# agent-queue — a folder-based "kanban" runner for headless coding-agent CLIs.
#
# Drop a prompt .md file into queue/inbox/, and `agent-queue run` will:
# 1. pick the highest-priority, then oldest, file (respecting --max concurrency),
# 2. move it inbox/ -> building/,
# 3. launch the chosen agent CLI (devin | claude | codex | copilot) in --yolo mode,
# 4. on agent rc=0 move building/ -> review/, then run the auto-QA verify gate:
# verify pass -> testing/ verify fail -> failed/ (no verify -> stays in review/)
# 5. on agent failure/timeout move building/ -> failed/,
# 6. you manually `ship` testing/ -> shipped/ (the human gate),
# 7. write a per-job log + live state so `status`/`watch` can show progress.
#
# Lifecycle: inbox -> building -> review -> testing -> shipped (+ failed)
#
# Per-task config travels in YAML-ish frontmatter at the top of the .md:
# ---
# engine: devin # devin | claude | codex | copilot (default: $DEFAULT_ENGINE)
# cwd: /abs/path/repo # where the agent runs (default: $PWD when added)
# yolo: true # auto-approve all tools (default: true)
# ---
#
# Subcommands: init | add | run | status | watch | dash | stop | logs |
# promote | ship | reject | requeue | clean | help
#
set -uo pipefail
# ── Resolve paths ───────────────────────────────────────────────────
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
QUEUE_ROOT="${AGENT_QUEUE_ROOT:-$SCRIPT_DIR/queue}"
# Profile catalog dir (persona + capability presets). Override for tests.
PROFILES_DIR="${AGENT_QUEUE_PROFILES:-$SCRIPT_DIR/profiles}"
INBOX="$QUEUE_ROOT/inbox"
BUILDING="$QUEUE_ROOT/building"
REVIEW="$QUEUE_ROOT/review"
TESTING="$QUEUE_ROOT/testing"
SHIPPED="$QUEUE_ROOT/shipped"
FAILED="$QUEUE_ROOT/failed"
LOGS="$QUEUE_ROOT/logs"
STATE="$QUEUE_ROOT/.state"
LOCKS="$QUEUE_ROOT/locks"
# ── Config (env-overridable) ────────────────────────────────────────
MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-3}"
DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}"
POLL_SECONDS="${AGENT_QUEUE_POLL:-3}"
# A running worker is flagged "stalled" if its log has not changed in this many
# minutes (no new agent output) — surfaced in status + dash.
STALL_MIN="${AGENT_QUEUE_STALL_MIN:-10}"
# Auto-QA verify command. After an agent exits 0 the job lands in review/; if a
# verify command is set (frontmatter `verify:` overrides this default) it runs in
# the job's cwd: pass -> testing/ (QA), fail -> failed/. Empty default = jobs park
# in review/ for manual `promote`. Shipping (testing -> shipped) is always manual.
DEFAULT_VERIFY="${AGENT_QUEUE_VERIFY:-}"
# flock is used for cross-process lock hardening when available (Linux). macOS
# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run).
FLOCK_BIN="${FLOCK_BIN:-$(command -v flock || true)}"
# timeout/gtimeout give hard process-tree kills for per-job timeouts; if absent
# (stock macOS) a pure-bash watchdog is used as a best-effort fallback.
TIMEOUT_BIN="${TIMEOUT_BIN:-$(command -v timeout || command -v gtimeout || true)}"
DEVIN_BIN="${DEVIN_BIN:-$(command -v devin || echo "$HOME/.local/bin/devin")}"
CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}"
CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}"
COPILOT_BIN="${COPILOT_BIN:-$(command -v copilot || echo copilot)}"
# ── Tracker integration (§10) — task <-> job round-trip via the items API ──
# Base URL of the platform-service items API; the items routes live under /api.
AQ_TRACKER_API="${AQ_TRACKER_API:-http://localhost:4003}"
# Bearer token (required for real calls; never hardcode). productId stamps Items.
AQ_TRACKER_TOKEN="${AQ_TRACKER_TOKEN:-}"
AQ_PRODUCT_ID="${AQ_PRODUCT_ID:-}"
# cwd a tracker-derived job runs in (Items carry no cwd); defaults to the invoking dir.
AQ_TRACKER_CWD="${AQ_TRACKER_CWD:-$PWD}"
# Auto-echo job outcomes back to the tracker on each transition (opt-in, default OFF).
AQ_TRACKER_AUTO="${AQ_TRACKER_AUTO:-0}"
# Item status the API uses for each bucket (the items API has no blocked/failed
# status, so failures map to wont_fix by default — all overridable).
AQ_TRACKER_STATUS_INPROGRESS="${AQ_TRACKER_STATUS_INPROGRESS:-in_progress}"
AQ_TRACKER_STATUS_DONE="${AQ_TRACKER_STATUS_DONE:-done}"
AQ_TRACKER_STATUS_FAILED="${AQ_TRACKER_STATUS_FAILED:-wont_fix}"
# Test seam: a stub script that replaces the real curl HTTP (see selftest.sh).
AQ_TRACKER_API_CMD="${AQ_TRACKER_API_CMD:-}"
CURL_BIN="${CURL_BIN:-$(command -v curl || echo curl)}"
# ── Colors ──────────────────────────────────────────────────────────
if [[ -t 1 ]]; then
C_RESET=$'\033[0m'; C_DIM=$'\033[2m'; C_BOLD=$'\033[1m'
C_BLUE=$'\033[34m'; C_GREEN=$'\033[32m'; C_RED=$'\033[31m'; C_YEL=$'\033[33m'; C_CYAN=$'\033[36m'
else
C_RESET=""; C_DIM=""; C_BOLD=""; C_BLUE=""; C_GREEN=""; C_RED=""; C_YEL=""; C_CYAN=""
fi
log() { printf '%s[agent-queue]%s %s\n' "$C_CYAN" "$C_RESET" "$*"; }
err() { printf '%s[agent-queue]%s %s\n' "$C_RED" "$C_RESET" "$*" >&2; }
die() { err "$*"; exit 1; }
# ── Init ────────────────────────────────────────────────────────────
ensure_dirs() { mkdir -p "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED" "$FAILED" "$LOGS" "$STATE" "$LOCKS"; }
# ── Frontmatter parsing ─────────────────────────────────────────────
# fm_get <file> <key> <default>
fm_get() {
local file=$1 key=$2 def=${3:-}
local val
# only scan a leading --- ... --- block
val=$(awk -v k="$key" '
NR==1 && $0!="---" { exit }
NR==1 { infm=1; next }
infm && $0=="---" { exit }
infm {
line=$0
sub(/^[ \t]*/,"",line)
if (line ~ "^" k "[ \t]*:") {
sub("^" k "[ \t]*:[ \t]*","",line)
gsub(/^["'\''[:space:]]+|["'\''[:space:]]+$/,"",line)
print line; exit
}
}' "$file" 2>/dev/null)
[[ -n "$val" ]] && printf '%s' "$val" || printf '%s' "$def"
}
# strip_frontmatter <file> -> prints the body (everything after a leading ---..--- block)
strip_frontmatter() {
awk 'NR==1 && $0=="---" { infm=1; next }
infm && $0=="---" { infm=0; next }
{ if (!infm) print }' "$1"
}
# lock_key_for <file> -> the mutual-exclusion key for a job: frontmatter `lock:`
# if set, otherwise the cwd. Jobs sharing a key never run concurrently.
lock_key_for() {
local f=$1 k
k=$(fm_get "$f" lock "")
[[ -n "$k" ]] && { printf '%s' "$k"; return; }
fm_get "$f" cwd "$PWD"
}
# _keyhash <key> -> stable filename-safe token for a lock key
_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; }
# ── Profiles (§6): persona + capability/engine/scope presets ─────────
#
# profile_get <profile-name> <profile-key> [default] -> a single-line value from
# profiles/<name>.md, else the default.
profile_get() {
local pf="$PROFILES_DIR/$1.md"
[[ -f "$pf" ]] || { printf '%s' "${3:-}"; return; }
fm_get "$pf" "$2" "${3:-}"
}
# profile_persona <profile-name> -> the multi-line `persona: |` block (2-space
# indent stripped), or empty. Used to prepend a persona overlay to the job body.
profile_persona() {
local pf="$PROFILES_DIR/$1.md"
[[ -f "$pf" ]] || return 0
awk '
NR==1 && $0!="---" { exit }
NR==1 { infm=1; next }
infm && $0=="---" { exit }
!infm { next }
inpersona {
if ($0 ~ /^[A-Za-z0-9_-]+[ \t]*:/) { inpersona=0 }
else { line=$0; sub(/^ /,"",line); print line; next }
}
$0 ~ /^persona[ \t]*:[ \t]*\|[ \t]*$/ { inpersona=1; next }
' "$pf"
}
# fm_eff <file> <job-key> [default] [profile-key] -> effective value with
# precedence job > profile > built-in default (§6 resolution). The profile is the
# job's `profile:` frontmatter; `profile-key` defaults to `job-key` (e.g. `verify`
# inherits the profile's `default-verify`). Inheritable: verify, capabilities,
# engine-class, prefers-engine, allowed-scope, review-policy.
fm_eff() {
local file=$1 jkey=$2 def=${3:-} pkey=${4:-$2} v prof pv
v=$(fm_get "$file" "$jkey" "")
if [[ -n "$v" ]]; then printf '%s' "$v"; return; fi
prof=$(fm_get "$file" profile "")
if [[ -n "$prof" ]]; then
pv=$(profile_get "$prof" "$pkey" "")
[[ -n "$pv" ]] && { printf '%s' "$pv"; return; }
fi
printf '%s' "$def"
}
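# _mtime <file> -> file modification time in epoch seconds (GNU or BSD stat); empty if missing.
# GNU `stat -c` is tried first: on Linux, `stat -f %m FILE` means --file-system and
# prints filesystem info for FILE to stdout before failing, which would pollute the result.
_mtime() {
[[ -e "$1" ]] || { echo ""; return; }
stat -c %Y "$1" 2>/dev/null || stat -f %m "$1" 2>/dev/null || echo ""
}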
# _pidstart <pid> -> the process start time as reported by ps (whitespace-normalized).
# Used as an identity token so a recycled pid is never mistaken for our worker.
_pidstart() { ps -o lstart= -p "$1" 2>/dev/null | awk '{$1=$1;print}'; }
# _pid_alive <pid> <pidstart> -> 0 if the pid is live AND (when a start time was
# recorded) its current start time still matches — defeating pid reuse.
_pid_alive() {
local pid=$1 want=$2 cur
[[ -n "$pid" ]] || return 1
kill -0 "$pid" 2>/dev/null || return 1
[[ -z "$want" ]] && return 0
cur=$(_pidstart "$pid")
[[ "$cur" == "$want" ]]
}
# _dur_to_secs <dur> -> seconds. Accepts 90, 90s, 45m, 2h, 1d. Invalid/empty -> 0.
_dur_to_secs() {
local d=$1
[[ -z "$d" || "$d" == "0" ]] && { echo 0; return; }
if [[ "$d" =~ ^([0-9]+)([smhd]?)$ ]]; then
local n=${BASH_REMATCH[1]} u=${BASH_REMATCH[2]}
case "$u" in
""|s) echo "$n";;
m) echo $((n*60));;
h) echo $((n*3600));;
d) echo $((n*86400));;
esac
else
echo 0
fi
}
# _meta_active <metafile> -> 0 if the job is occupying a concurrency slot.
# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
# created moments ago — the reserved-slot window between meta-write and launch).
# The <30s guard prevents a meta orphaned mid-launch (daemon killed in the gap)
# from pinning a slot forever.
_meta_active() {
local f=$1 pid mt age
grep -q '^ended=' "$f" && return 1
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
if [[ -n "$pid" ]]; then
local pidstart; pidstart=$(grep '^pidstart=' "$f" | head -1 | cut -d= -f2-)
_pid_alive "$pid" "$pidstart"
return $?
fi
mt=$(_mtime "$f"); age=$(( $(date +%s) - ${mt:-0} ))
[[ "$age" -lt 30 ]]
}
# active_workers -> count of jobs occupying a concurrency slot (reservation-aware).
active_workers() {
local n=0 f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
_meta_active "$f" && n=$((n+1))
done
echo "$n"
}
# busy_keys -> newline list of lock keys currently held by active workers.
busy_keys() {
local f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
_meta_active "$f" && grep '^lock=' "$f" | head -1 | cut -d= -f2-
done
}
# ── Manifest helpers (Phase 1 — §5/§6/§7/§8 single-host subset) ──────
#
# parse_list <raw> -> one token per line. Accepts a YAML-ish inline list
# ("[a, b]"), comma- or space-separated values, with surrounding quotes
# stripped. Empty tokens are dropped. Used for `capabilities`, `prefers`,
# `prefers-engine`, `deps`, etc.
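parse_list() {
local cleaned t
local -a toks=()
cleaned=$(printf '%s' "${1:-}" | tr '[],' '   ')
# Split on whitespace WITHOUT pathname expansion: an unquoted `for t in $cleaned`
# would expand glob tokens (e.g. an allowed-scope entry like src/*) against $PWD.
read -r -d '' -a toks <<< "$cleaned" || true
for t in ${toks[@]+"${toks[@]}"}; do
t=${t#[\"\']}; t=${t%[\"\']}
[[ -n "$t" ]] && printf '%s\n' "$t"
done
}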
# _body_hash <file> -> stable content hash of the frontmatter-STRIPPED body.
# Drives idempotency-key dedupe on `add`. Prefers shasum/sha1sum, falls back
# to cksum so it works on a bare macOS/Linux host with no extra tools.
_body_hash() {
local f=$1
if command -v shasum >/dev/null 2>&1; then
strip_frontmatter "$f" | shasum | awk '{print $1}'
elif command -v sha1sum >/dev/null 2>&1; then
strip_frontmatter "$f" | sha1sum | awk '{print $1}'
else
strip_frontmatter "$f" | cksum | awk '{print $1"-"$2}'
fi
}
# priority_rank <priority> -> sort rank (lower = picked first). Unknown/empty
# defaults to medium. Used by inbox_sorted for priority-then-age selection.
priority_rank() {
case "$1" in
critical) echo 0;; high) echo 1;; medium) echo 2;; low) echo 3;; *) echo 2;;
esac
}
# inbox_sorted -> inbox/*.md ordered by priority (critical first) then oldest
# (filename embeds the queue timestamp, so a plain string sort = age order).
# Replaces the pure-FIFO `ls | sort`; per-lock serialization is unchanged
# (the caller still skips files whose lock key is busy).
inbox_sorted() {
local f p r
for f in "$INBOX"/*.md; do
[[ -e "$f" ]] || continue
p=$(fm_get "$f" priority medium); r=$(priority_rank "$p")
printf '%s\t%s\n' "$r" "$f"
done | sort -t"$(printf '\t')" -k1,1n -k2,2 | cut -f2-
}
# engine_available <engine> -> 0 if a runnable binary exists for the engine
# (executable path or a name resolvable on PATH). Honors the *_BIN overrides
# (so the self-test stub counts as "available").
engine_available() {
local eng=$1 bin=""
case "$eng" in
devin) bin=$DEVIN_BIN;;
claude) bin=$CLAUDE_BIN;;
codex) bin=$CODEX_BIN;;
copilot) bin=$COPILOT_BIN;;
*) return 1;;
esac
[[ -n "$bin" ]] || return 1
[[ -x "$bin" ]] && return 0
command -v "$bin" >/dev/null 2>&1
}
# engine_class_engines <class> -> candidate engines (priority order) for an
# engine-class (§5 taxonomy). `review-only` has no concrete mapping yet
# (reserved). Unknown class -> empty.
engine_class_engines() {
case "$1" in
agentic-coder) echo "devin claude codex";;
chat-coder) echo "copilot";;
review-only) echo "";;
*) echo "";;
esac
}
# detect_capabilities -> one capability token per line describing THIS host:
# os:<mac|linux|other> (os:any required-side wildcard matches this)
# engine:<devin|claude|codex|copilot> for each available engine
# node:<major> the host node major (compared by node<op>N)
# has:<git|pnpm|docker> tool probes (only emitted when present)
detect_capabilities() {
case "$(uname -s 2>/dev/null)" in
Darwin) echo "os:mac";;
Linux) echo "os:linux";;
*) echo "os:other";;
esac
local e
for e in devin claude codex copilot; do
engine_available "$e" && echo "engine:$e"
done
if command -v node >/dev/null 2>&1; then
local nv; nv=$(node -v 2>/dev/null | sed 's/^v//' | cut -d. -f1)
[[ "$nv" =~ ^[0-9]+$ ]] && echo "node:$nv"
fi
local t
for t in git pnpm docker; do
command -v "$t" >/dev/null 2>&1 && echo "has:$t"
done
}
# _cap_token_ok <required-token> <available-tokens> -> 0 if satisfied (§5 grammar):
# key:any wildcard — host advertises any "key:*" token (os:any matches all)
# key<op>ver numeric/semver-major compare against the host's "key:<val>" token
# (op in >= > = <= <)
# key:value exact token match
# key bare presence — exact, or advertised in any "key:"/"key<op>" form
_cap_token_ok() {
local tok=$1 avail=$2 a
if [[ "$tok" == *:any ]]; then
local wkey=${tok%:any}
for a in $avail; do [[ "$a" == "$wkey":* ]] && return 0; done
return 1
fi
if [[ "$tok" =~ ^([A-Za-z0-9_.:-]+)(\>=|\<=|=|\>|\<)([0-9][0-9.]*)$ ]]; then
local key=${BASH_REMATCH[1]} op=${BASH_REMATCH[2]} want=${BASH_REMATCH[3]} hostval=""
for a in $avail; do [[ "$a" == "$key":* ]] && { hostval=${a#*:}; break; }; done
[[ -n "$hostval" ]] || return 1
local hv=${hostval%%.*} wv=${want%%.*}
[[ "$hv" =~ ^[0-9]+$ && "$wv" =~ ^[0-9]+$ ]] || return 1
case "$op" in
">=") [[ "$hv" -ge "$wv" ]];;
">") [[ "$hv" -gt "$wv" ]];;
"=") [[ "$hv" -eq "$wv" ]];;
"<=") [[ "$hv" -le "$wv" ]];;
"<") [[ "$hv" -lt "$wv" ]];;
esac
return $?
fi
if [[ "$tok" == *:* ]]; then
for a in $avail; do [[ "$a" == "$tok" ]] && return 0; done
return 1
fi
for a in $avail; do
case "$a" in "$tok"|"$tok":*|"$tok"\>=*|"$tok"\>*|"$tok"=*|"$tok"\<=*|"$tok"\<*) return 0;; esac
done
return 1
}
# caps_match <required-tokens> <available-tokens> -> 0 iff EVERY required token
# is satisfied by the available set (both passed as whitespace-separated lists).
caps_match() {
local required=$1 available=$2 r
for r in $required; do
_cap_token_ok "$r" "$available" || return 1
done
return 0
}
# resolve_engine <file> -> the concrete engine to run:
# explicit `engine:` always wins; else `engine-class` picks the first
# available engine honoring `prefers-engine` then the class default order;
# else the global default engine. Prints "" only when an engine-class was
# requested but no engine in it is available (caller fails with no_engine).
resolve_engine() {
local f=$1 eng cls prefers
eng=$(fm_get "$f" engine "")
if [[ -n "$eng" ]]; then printf '%s' "$eng"; return 0; fi
cls=$(fm_eff "$f" engine-class "") # inherit engine-class from the job's profile
if [[ -z "$cls" ]]; then printf '%s' "$DEFAULT_ENGINE"; return 0; fi
local class_engines; class_engines=$(engine_class_engines "$cls")
prefers=$(fm_eff "$f" prefers-engine "")
local ordered=() seen=" " p c
if [[ -n "$prefers" ]]; then
while IFS= read -r p; do
[[ -n "$p" ]] || continue
case " $class_engines " in *" $p "*) ordered+=("$p"); seen+="$p ";; esac
done < <(parse_list "$prefers")
fi
for c in $class_engines; do
case "$seen" in *" $c "*) ;; *) ordered+=("$c"); seen+="$c ";; esac
done
local cand
for cand in ${ordered[@]+"${ordered[@]}"}; do
engine_available "$cand" && { printf '%s' "$cand"; return 0; }
done
printf '%s' ""
}
# ── deps / DAG, single host (§5) ─────────────────────────────────────
# deps reference other jobs by their (author-controlled) `idempotency-key`.
#
# _key_in_dir <key> <dir> -> 0 if some .md in <dir> has idempotency-key == key.
_key_in_dir() {
local key=$1 d=$2 ef
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
[[ "$(fm_get "$ef" idempotency-key "")" == "$key" ]] && return 0
done
return 1
}
# dep_satisfied <key> <mode> -> 0 when the dep is met: a job with <key> is in
# shipped/ (default), or shipped/ OR testing/ when mode is `soft`.
dep_satisfied() {
local key=$1 mode=$2
_key_in_dir "$key" "$SHIPPED" && return 0
[[ "$mode" == soft ]] && _key_in_dir "$key" "$TESTING" && return 0
return 1
}
# deps_unmet <file> -> space-separated list of this job's UNMET dep keys (empty if
# none / no deps). `deps-mode` (hard|soft) is job-level.
deps_unmet() {
local f=$1 keys mode k unmet=""
keys=$(parse_list "$(fm_get "$f" deps "")" | tr '\n' ' ')
[[ -n "${keys// /}" ]] || { printf ''; return 0; }
mode=$(fm_get "$f" deps-mode "hard")
for k in $keys; do
[[ -n "$k" ]] || continue
dep_satisfied "$k" "$mode" || unmet+="$k "
done
printf '%s' "${unmet% }"
}
# _deps_of_key <key> -> dep keys (space-separated) of the job carrying <key>,
# scanned across inbox, the active stages, and shipped/ (failed/ is not scanned).
_deps_of_key() {
local key=$1 d ef
for d in "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
[[ "$(fm_get "$ef" idempotency-key "")" == "$key" ]] || continue
parse_list "$(fm_get "$ef" deps "")" | tr '\n' ' '
return 0
done
done
}
# deps_would_cycle <new-key> <new-deps-space> -> 0 if adding a job with <new-key>
# depending on <new-deps> would create a cycle (BFS over existing key->deps edges
# back to new-key; also catches self-dependency).
deps_would_cycle() {
local newkey=$1 newdeps=$2 visited=" " frontier next k d kd
[[ -n "$newkey" ]] || return 1
_in_list "$newkey" "$newdeps" && return 0
frontier="$newdeps"
while [[ -n "${frontier// /}" ]]; do
next=""
for k in $frontier; do
[[ -n "$k" ]] || continue
[[ "$k" == "$newkey" ]] && return 0
case "$visited" in *" $k "*) continue;; esac
visited+="$k "
kd=$(_deps_of_key "$k")
for d in $kd; do next+="$d "; done
done
frontier="$next"
done
return 1
}
# _drain_pending -> 0 if some inbox job can still make progress on its own: it is
# runnable now, or it is waiting on a retry/recovery backoff (which elapses with
# time). A job blocked ONLY by unmet deps is NOT pending while the loop is idle
# (no running job can satisfy its deps), so `--once` can drain past it.
_drain_pending() {
local cand cj ne now; now=$(date +%s)
for cand in "$INBOX"/*.md; do
[[ -e "$cand" ]] || continue
cj=$(basename "$cand"); cj=${cj%.md}
ne=$(grep '^next_eligible=' "$STATE/$cj.meta" 2>/dev/null | tail -1 | cut -d= -f2)
if [[ "$ne" =~ ^[0-9]+$ ]] && [[ "$ne" -gt "$now" ]]; then return 0; fi
[[ -n "$(deps_unmet "$cand")" ]] && continue
return 0
done
return 1
}
# ── allowed-scope guardrail (§6/§12) — WARN-ONLY this phase ───────────
# path_in_scope <path> <globs-space> -> 0 if <path> matches any allowed-scope glob
# (`dir/**` matches the whole subtree; `*` matches across `/`). Pure + testable.
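path_in_scope() {
local path=$1 g pat
local -a globs=()
# Split without pathname expansion: an unquoted `for g in $2` would expand a
# scope glob such as src/* against the current directory before matching.
read -r -d '' -a globs <<< "$2" || true
for g in ${globs[@]+"${globs[@]}"}; do
[[ -n "$g" ]] || continue
pat=${g//\*\*/\*}
# shellcheck disable=SC2053
[[ "$path" == $pat ]] && return 0
[[ "$path" == "$g"/* ]] && return 0
done
return 1
}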
# scope_check <cwd> <base> <scope> <logf> <metaf> -> log a WARNING (non-blocking)
# for changed paths outside allowed-scope. Records scope_warning= in the meta.
scope_check() {
local cwd=$1 base=$2 scope=$3 logf=$4 metaf=$5 changed globs p out=""
_is_git_repo "$cwd" || return 0
if [[ -n "$base" ]]; then
changed=$(git -C "$cwd" diff --name-only "$base" HEAD 2>/dev/null)
else
changed=$(git -C "$cwd" diff --name-only HEAD 2>/dev/null)
fi
[[ -n "$changed" ]] || return 0
globs=$(parse_list "$scope" | tr '\n' ' ')
[[ -n "${globs// /}" ]] || return 0
while IFS= read -r p; do
[[ -n "$p" ]] || continue
path_in_scope "$p" "$globs" || out+="$p "
done <<< "$changed"
if [[ -n "$out" ]]; then
echo "WARNING: allowed-scope violation (warn-only) — changed outside [$globs]: ${out% }" >> "$logf"
echo "scope_warning=${out% }" >> "$metaf"
fi
}
# ── Engine driver: builds argv into AGENT_CMD[]; sets AGENT_STDIN if the ──
# prompt should be fed on stdin (claude/codex) rather than a flag. $pf is the
# frontmatter-STRIPPED body file, so a body starting with '--' is never
# misparsed as a CLI option.
build_agent_cmd() {
local engine=$1 pf=$2 yolo=$3
AGENT_CMD=(); AGENT_STDIN=""
case "$engine" in
devin)
AGENT_CMD=( "$DEVIN_BIN" -p --prompt-file "$pf" )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --permission-mode dangerous )
;;
claude)
AGENT_CMD=( "$CLAUDE_BIN" -p )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-skip-permissions )
AGENT_STDIN="$pf"
;;
codex)
AGENT_CMD=( "$CODEX_BIN" exec )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --dangerously-bypass-approvals-and-sandbox )
AGENT_STDIN="$pf"
;;
copilot)
# Best-effort GitHub Copilot CLI mapping for the chat-coder engine-class.
# Flags drift between CLI versions — this is the single place to edit.
AGENT_CMD=( "$COPILOT_BIN" -p )
[[ "$yolo" == "true" ]] && AGENT_CMD+=( --allow-all-tools )
AGENT_STDIN="$pf"
;;
*) die "unknown engine '$engine' (use: devin | claude | codex | copilot)";;
esac
}
# ── Worker: runs one job to completion (invoked in background) ───────
run_worker() {
local doing_file=$1
local job; job=$(basename "$doing_file")
job=${job%.md}
local engine cwd yolo logf metaf
cwd=$(fm_get "$doing_file" cwd "$PWD")
yolo=$(fm_get "$doing_file" yolo "true")
logf="$LOGS/$job.log"
metaf="$STATE/$job.meta"
# NOTE: the parent (cmd_run) creates $metaf with job/engine/cwd/started/pid.
# The worker only ever APPENDS (ended/exit/result) to avoid a truncation race.
# ── Capability gate (§5/§8 single-host): if the job declares `capabilities`
# (own or inherited from its profile) this host does not satisfy, route to
# failed/ WITHOUT launching the agent. ──
local req_caps; req_caps=$(parse_list "$(fm_eff "$doing_file" capabilities "")" | tr '\n' ' ')
if [[ -n "${req_caps// /}" ]]; then
local avail; avail=$(detect_capabilities)
if ! caps_match "$req_caps" "$avail"; then
{
echo "===== agent-queue job: $job ====="
echo "CAPABILITY MISMATCH — host cannot satisfy required: $req_caps"
echo "host advertises: $(printf '%s' "$avail" | tr '\n' ' ')"
echo "agent NOT launched — routed to failed/ ($(date))"
} >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
{ echo "result=capability_mismatch"; echo "ended=$(date +%s)"; } >> "$metaf"
_auto_echo "$job"
return 0
fi
fi
# ── Engine resolution (§5): explicit `engine` wins, else `engine-class`.
# No available engine for a requested class -> fail (no_engine), no launch. ──
engine=$(resolve_engine "$doing_file")
if [[ -z "$engine" ]]; then
{
echo "===== agent-queue job: $job ====="
echo "NO ENGINE — engine-class '$(fm_get "$doing_file" engine-class "")' has no available engine on this host"
echo "agent NOT launched — routed to failed/ ($(date))"
} >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
{ echo "result=no_engine"; echo "ended=$(date +%s)"; } >> "$metaf"
_auto_echo "$job"
return 0
fi
{
echo "===== agent-queue job: $job ====="
echo "engine=$engine cwd=$cwd yolo=$yolo"
echo "started: $(date)"
echo "================================="
} >> "$logf"
if [[ ! -d "$cwd" ]]; then
echo "FATAL: cwd does not exist: $cwd" >> "$logf"
mv "$doing_file" "$FAILED/" 2>/dev/null
echo "result=failed" >> "$metaf"; echo "ended=$(date +%s)" >> "$metaf"
_auto_echo "$job"
return 1
fi
local started; started=$(grep '^started=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
# Strip our frontmatter so the agent only sees the task body.
local bodyf="$STATE/$job.body.md"
strip_frontmatter "$doing_file" > "$bodyf"
# ── Persona injection (§6): prepend the profile's persona to the body fed to
# the engine (job body unchanged on disk). Secrets are never logged. ──
local prof; prof=$(fm_get "$doing_file" profile "")
if [[ -n "$prof" ]]; then
local persona; persona=$(profile_persona "$prof")
if [[ -n "$persona" ]]; then
{ printf '%s\n\n' "$persona"; cat "$bodyf"; } > "$bodyf.tmp" && mv "$bodyf.tmp" "$bodyf"
echo "profile: injected persona overlay from '$prof'" >> "$logf"
fi
fi
build_agent_cmd "$engine" "$bodyf" "$yolo"
# ── WIP checkpoint setup (§25.2): on a git cwd, create/checkout aq/wip/<job>
# so partial work survives a crash; a trap guarantees a checkpoint on EVERY
# exit path (success, failure, timeout, SIGTERM/SIGINT). Non-git cwd: no-op. ──
WIP_ACTIVE=0; WIP_BASE=""; WIP_DONE=0
_worker_trap() {
[[ "$WIP_DONE" == 1 ]] && return
WIP_DONE=1
[[ "$WIP_ACTIVE" == 1 ]] && _wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "trap-exit"
}
trap '_worker_trap' EXIT
trap '_worker_trap; exit 143' INT TERM
_wip_start "$job" "$cwd" "$metaf" "$logf" || true
_run_agent() {
if [[ -n "$AGENT_STDIN" ]]; then
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
else
( cd "$cwd" && "${AGENT_CMD[@]}" )
fi
}
local rc=0 lockkey tmo timed_out=false
lockkey=$(lock_key_for "$doing_file")
tmo=$(_dur_to_secs "$(fm_get "$doing_file" timeout "0")")
local tmo_flag="$STATE/$job.timedout"; rm -f "$tmo_flag"
local lf="$LOCKS/$(_keyhash "$lockkey").lock"
if [[ "$tmo" -gt 0 && -n "$TIMEOUT_BIN" ]]; then
# Hard timeout via timeout/gtimeout (kills the whole process tree).
AQ_STDIN="$AGENT_STDIN" "$TIMEOUT_BIN" -k 5 "${tmo}s" bash -c '
cd "$1" || exit 97; shift
if [ -n "${AQ_STDIN:-}" ]; then exec "$@" < "$AQ_STDIN"; else exec "$@"; fi
' _ "$cwd" "${AGENT_CMD[@]}" >> "$logf" 2>&1
rc=$?
[[ $rc -eq 124 ]] && timed_out=true
elif [[ "$tmo" -gt 0 ]]; then
# Portable watchdog fallback (no timeout binary). Flags the timeout and
# signals the worker; install coreutils (gtimeout) for hard tree kills.
_run_agent >> "$logf" 2>&1 &
local apid=$!
( sleep "$tmo"; : > "$tmo_flag"
pkill -TERM -P "$apid" 2>/dev/null; kill -TERM "$apid" 2>/dev/null
sleep 5; pkill -KILL -P "$apid" 2>/dev/null; kill -KILL "$apid" 2>/dev/null ) &
local wpid=$!
wait "$apid" 2>/dev/null; rc=$?
kill "$wpid" 2>/dev/null; wait "$wpid" 2>/dev/null
[[ -f "$tmo_flag" ]] && timed_out=true
elif [[ -n "$FLOCK_BIN" ]]; then
# Cross-process hardening where flock exists (Linux CI). The single run-loop
# already serializes by lock key; this guards against a stray second launcher.
# Reached only for jobs without a `timeout:` (the timeout branches above do not take the flock).
( "$FLOCK_BIN" -n 9 || exit 75; _run_agent ) 9>"$lf" >> "$logf" 2>&1
rc=$?
if [[ $rc -eq 75 ]]; then
echo "lock busy (key=$lockkey) — requeued to inbox" >> "$logf"
mv "$doing_file" "$INBOX/" 2>/dev/null
{ echo "ended=$(date +%s)"; echo "result=requeued"; } >> "$metaf"
return 0
fi
else
_run_agent >> "$logf" 2>&1
rc=$?
fi
rm -f "$tmo_flag"
# ── Preserve work + capture run metrics on EVERY path (§25.2/§26.1/§26.2) ──
if [[ "$WIP_ACTIVE" == 1 ]]; then
_wip_checkpoint "$job" "$cwd" "$metaf" "$logf" "agent-exit"; WIP_DONE=1
fi
_numstat_into_meta "$cwd" "$WIP_BASE" "$metaf"
parse_usage "$engine" "$logf" >> "$metaf"
# ── allowed-scope guardrail (§6) — WARN-ONLY: flag out-of-scope changes but
# never block the job this phase. Scope may be inherited from the profile. ──
local scope; scope=$(fm_eff "$doing_file" allowed-scope "" allowed-scope)
[[ -n "$scope" ]] && scope_check "$cwd" "$WIP_BASE" "$scope" "$logf" "$metaf"
if $timed_out; then
echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then
# Agent succeeded: land in review/, then run the auto-QA verify gate. The
# worker is still alive here so the concurrency slot stays held through
# verification — `ended=` is written only once we reach a resting stage.
mv "$doing_file" "$REVIEW/" 2>/dev/null
local review_file="$REVIEW/$job.md"
echo "exit=$rc" >> "$metaf"
echo "completed OK (rc=0): landed in review — $(date)" >> "$logf"
# verify is job-level, else inherited from the profile's default-verify.
local verify; verify=$(fm_eff "$review_file" verify "$DEFAULT_VERIFY" default-verify)
if [[ -z "$verify" ]]; then
_meta_end "$metaf" "review" "$started"
echo "no verify command — parked in review for manual promote: $(date)" >> "$logf"
else
echo "----- verify: $verify -----" >> "$logf"
local vrc=0
( cd "$cwd" && bash -c "$verify" ) >> "$logf" 2>&1 || vrc=$?
echo "verify_exit=$vrc" >> "$metaf"
if [[ $vrc -eq 0 ]]; then
mv "$review_file" "$TESTING/" 2>/dev/null
_meta_end "$metaf" "testing" "$started"
echo "VERIFY PASSED — promoted to testing (QA): $(date)" >> "$logf"
else
echo "VERIFY FAILED (rc=$vrc): $(date)" >> "$logf"
# verify ran on the review_file; retry policy may requeue it.
_finish_failure "$job" "$review_file" "$metaf" "$logf" "verify_failed" "$rc" "$started"
fi
fi
else
echo "FAILED (rc=$rc): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "crash" "$rc" "$started"
fi
# Opt-in echo of the resting/terminal outcome back to the tracker (§10).
_auto_echo "$job"
}
# ── Resilience & insights helpers (Phase 1 — single-host §25/§26) ────
#
# _in_list <item> <space-separated-list> -> 0 if item is present.
_in_list() { case " ${2:-} " in *" $1 "*) return 0;; esac; return 1; }
# _meta_end <metafile> <result> <started-epoch> -> append result/ended/duration
# (append-only; never truncates a live meta — §25 state integrity).
_meta_end() {
local mf=$1 res=$2 now; now=$(date +%s); local st=${3:-$now}
[[ "$st" =~ ^[0-9]+$ ]] || st=$now
{ echo "result=$res"; echo "ended=$now"; echo "duration_s=$(( now - st ))"; } >> "$mf"
}
# ── git WIP checkpointing (§25.2) ──
_is_git_repo() { git -C "$1" rev-parse --is-inside-work-tree >/dev/null 2>&1; }
_wip_branch() { printf 'aq/wip/%s' "$1"; }
# _wip_start <job> <cwd> <metaf> <logf> -> ensure/checkout the WIP branch.
# Sets globals WIP_ACTIVE (1 when a git WIP branch is in play) and WIP_BASE.
# RESUME: if aq/wip/<job> already exists (orphan/retry relaunch), check it out so
# the agent continues from the checkpoint instead of from zero. The base commit is
# persisted in a write-once sidecar so it survives meta re-creation across attempts.
_wip_start() {
local job=$1 cwd=$2 metaf=$3 logf=$4
WIP_ACTIVE=0; WIP_BASE=""
if ! _is_git_repo "$cwd"; then
echo "wip: cwd not a git repo — skipping checkpoint" >> "$logf"
return 1
fi
local br basefile base; br=$(_wip_branch "$job"); basefile="$STATE/$job.wipbase"
if git -C "$cwd" show-ref --verify --quiet "refs/heads/$br"; then
git -C "$cwd" checkout "$br" >/dev/null 2>&1 \
|| echo "wip: could not checkout $br (resume) — staying on current branch" >> "$logf"
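# Log "resuming" only when the checkout above actually landed on the WIP branch.
if [[ "$(git -C "$cwd" symbolic-ref --short HEAD 2>/dev/null)" == "$br" ]]; then
echo "wip: resuming on $br ($(date))" >> "$logf"
fi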
else
base=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
if ! git -C "$cwd" checkout -b "$br" >/dev/null 2>&1; then
echo "wip: could not create $br — skipping checkpoint" >> "$logf"
return 1
fi
[[ -n "$base" ]] && printf '%s\n' "$base" > "$basefile"
echo "wip: created $br from ${base:-<root>} ($(date))" >> "$logf"
fi
base=$(cat "$basefile" 2>/dev/null || echo "")
{ echo "wip_branch=$br"; echo "wip_base=$base"; } >> "$metaf"
WIP_ACTIVE=1; WIP_BASE="$base"
return 0
}
# _wip_checkpoint <job> <cwd> <metaf> <logf> <stage> -> commit any changes in cwd
# onto aq/wip/<job>. Idempotent (no commit when the tree is clean). NEVER commits
# to a non-WIP branch — protects main/protected branches (§12).
_wip_checkpoint() {
local job=$1 cwd=$2 metaf=$3 logf=$4 stage=$5
_is_git_repo "$cwd" || return 0
local br cur; br=$(_wip_branch "$job")
cur=$(git -C "$cwd" symbolic-ref --short HEAD 2>/dev/null || echo "")
if [[ "$cur" != "$br" ]]; then
git -C "$cwd" checkout "$br" >/dev/null 2>&1 \
|| { echo "wip: not on $br — skipping checkpoint to protect '$cur'" >> "$logf"; return 0; }
fi
git -C "$cwd" add -A >/dev/null 2>&1
if git -C "$cwd" diff --cached --quiet 2>/dev/null; then
return 0 # nothing to preserve
fi
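# A failed commit (e.g. a rejecting pre-commit hook) must not be recorded as a checkpoint.
if ! git -C "$cwd" -c user.email=agent-queue@local -c user.name=agent-queue \
commit -q -m "aq wip: $job ($stage)" >/dev/null 2>&1; then
echo "wip: checkpoint commit ($stage) failed; changes left staged ($(date))" >> "$logf"
return 0
fi
local c; c=$(git -C "$cwd" rev-parse --short HEAD 2>/dev/null || echo "")
[[ -n "$c" ]] && echo "wip_commit=$c" >> "$metaf"
echo "wip: checkpoint ($stage) -> ${c:-?} ($(date))" >> "$logf"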
}
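# _numstat_into_meta <cwd> <base> <metaf> -> record files_changed/lines_added/
# lines_deleted for the run (base..HEAD on the WIP branch, or the working tree
# against HEAD when no base is known). Binary files count toward files_changed
# but contribute 0 lines.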
_numstat_into_meta() {
local cwd=$1 base=$2 metaf=$3 out stats
_is_git_repo "$cwd" || return 0
if [[ -n "$base" ]]; then
out=$(git -C "$cwd" diff --numstat "$base" HEAD 2>/dev/null)
else
out=$(git -C "$cwd" diff --numstat HEAD 2>/dev/null)
fi
[[ -n "$out" ]] || return 0
stats=$(printf '%s\n' "$out" | awk '
{ if ($1 ~ /^[0-9]+$/) a+=$1; if ($2 ~ /^[0-9]+$/) d+=$2; f++ }
END { printf "%d %d %d", f+0, a+0, d+0 }')
# shellcheck disable=SC2086
set -- $stats
{ echo "files_changed=$1"; echo "lines_added=$2"; echo "lines_deleted=$3"; } >> "$metaf"
}
# ── retry policy (§5/§11/§25.3) ──
# Parse `retry: { max: N, backoff: 5m, on: [classes] }`. Absent/empty -> no retry.
_retry_max() { local m; m=$(printf '%s' "${1:-}" | grep -oE 'max:[[:space:]]*[0-9]+' | grep -oE '[0-9]+' | head -1); echo "${m:-0}"; }
_retry_backoff_s() { local b; b=$(printf '%s' "${1:-}" | grep -oE 'backoff:[[:space:]]*[0-9]+[smhd]?' | sed -E 's/^backoff:[[:space:]]*//' | head -1); _dur_to_secs "${b:-0}"; }
_retry_on() {
local raw=${1:-} inside
inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*\[([^]]*)\].*/\1/p')
[[ -n "$inside" ]] || inside=$(printf '%s' "$raw" | sed -nE 's/.*on:[[:space:]]*([A-Za-z_,[:space:]-]+).*/\1/p')
parse_list "$inside" | tr '\n' ' '
}
# _class_retryable <class> <on-list> -> 0 if this failure class should retry.
# `crash` and `agent_error` are synonyms for a non-zero agent exit.
_class_retryable() {
local class=$1 on=$2
case "$class" in
crash) _in_list crash "$on" || _in_list agent_error "$on";;
*) _in_list "$class" "$on";;
esac
}
# _finish_failure <job> <file> <metaf> <logf> <class> <rc> <started> -> apply the
# retry policy to a failed run. `max` counts retries, not runs: while the class is
# in `retry.on` and attempts <= max, the job is requeued to inbox with a backoff
# (next_eligible), so it can run up to max+1 times. On exhaustion -> failed/ with
# result=retries_exhausted; with no matching policy -> failed/ with the natural
# result (failed|timeout|verify_failed). The WIP branch is preserved either way so
# a retry resumes from the checkpoint.
_finish_failure() {
local job=$1 file=$2 metaf=$3 logf=$4 class=$5 rc=$6 started=$7
local raw max on attempts; raw=$(fm_get "$file" retry "")
attempts=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); attempts=${attempts:-1}
max=$(_retry_max "$raw"); on=$(_retry_on "$raw")
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on" && [[ "$attempts" -le "$max" ]]; then
local backoff now next na; backoff=$(_retry_backoff_s "$raw")
now=$(date +%s); next=$(( now + backoff )); na=$(( attempts + 1 ))
mv "$file" "$INBOX/" 2>/dev/null
{
echo "exit=$rc"; echo "attempts=$na"; echo "next_eligible=$next"
echo "retry_class=$class"; echo "result=retry_scheduled"
echo "ended=$now"; echo "duration_s=$(( now - ${started:-now} ))"
} >> "$metaf"
echo "RETRY scheduled: class=$class, attempt $attempts/$max, backoff ${backoff}s -> inbox ($(date))" >> "$logf"
return 0
fi
local result
if [[ "$max" -gt 0 ]] && _class_retryable "$class" "$on"; then
result="retries_exhausted"
echo "RETRIES EXHAUSTED after $attempts attempt(s) (class=$class) -> failed/ ($(date))" >> "$logf"
else
case "$class" in
timeout) result="timeout";;
verify_failed) result="verify_failed";;
*) result="failed";;
esac
fi
echo "exit=$rc" >> "$metaf"
mv "$file" "$FAILED/" 2>/dev/null
_meta_end "$metaf" "$result" "$started"
}
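# Worked example of the retry policy above (a sketch with illustrative values; it
# assumes _dur_to_secs maps 5m to 300 and parse_list splits on commas, both of
# which are defined elsewhere in this script):
#   retry: { max: 2, backoff: 5m, on: [crash, timeout] }
#     _retry_max       -> 2
#     _retry_backoff_s -> 300
#     _retry_on        -> "crash timeout "
#   attempt 1 fails (crash) -> requeued, attempts=2, next_eligible=now+300
#   attempt 2 fails (crash) -> requeued, attempts=3, next_eligible=now+300
#   attempt 3 fails (crash) -> failed/ result=retries_exhausted
#   a verify_failed at any attempt -> failed/ result=verify_failed (class not in `on`)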
# ── orphan recovery (§25.3) ──
# recover_orphans -> move building/ jobs whose worker is dead back to inbox/ for
# re-selection (resume-aware via the WIP branch), incrementing attempts. Idempotent:
# once moved out of building/ a job is never recovered twice. A retry-capped job
# that has exhausted crash retries goes to failed/ result=retries_exhausted instead
# of looping forever; otherwise recovery never strands work.
recover_orphans() {
local f job metaf pid pidstart
for f in "$BUILDING"/*.md; do
[[ -e "$f" ]] || continue
job=$(basename "$f"); job=${job%.md}; metaf="$STATE/$job.meta"
if [[ -f "$metaf" ]] && ! grep -q '^ended=' "$metaf"; then
pid=$(grep '^pid=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2)
pidstart=$(grep '^pidstart=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" && continue # a live worker still owns it
fi
local prev na raw max now; prev=$(grep '^attempts=' "$metaf" 2>/dev/null | tail -1 | cut -d= -f2); prev=${prev:-1}
raw=$(fm_get "$f" retry ""); max=$(_retry_max "$raw"); now=$(date +%s)
if [[ "$max" -gt 0 ]] && _class_retryable crash "$(_retry_on "$raw")" && [[ "$prev" -gt "$max" ]]; then
mv "$f" "$FAILED/" 2>/dev/null
{ echo "attempts=$prev"; echo "recovered=$now"; } >> "$metaf"
_meta_end "$metaf" "retries_exhausted" "$(_meta_val "$metaf" started)"
echo "ORPHAN: $job exhausted crash retries -> failed/ ($(date))" >> "$LOGS/$job.log"
log "↻ orphan $C_BOLD$job$C_RESET exhausted retries -> failed"
continue
fi
na=$(( prev + 1 ))
local next=""; [[ "$max" -gt 0 ]] && next=$(( now + $(_retry_backoff_s "$raw") ))
mv "$f" "$INBOX/" 2>/dev/null
{
echo "attempts=$na"; echo "recovered=$now"
[[ -n "$next" ]] && echo "next_eligible=$next"
echo "result=recovered"; echo "ended=$now"
} >> "$metaf"
echo "ORPHAN RECOVERED: $job (worker dead) -> inbox, attempt now $na ($(date))" >> "$LOGS/$job.log"
log "↻ recovered orphan $C_BOLD$job$C_RESET (attempt $na)"
done
}
# ── token / cost capture (§26.2) ──
# parse_usage <engine> <logfile> -> emit `key=value` usage lines (model, tokens_in,
# tokens_out, tokens_cached, cost_usd, turns, tool_calls, usage_estimated) when the
# engine's output exposes them. This is the SINGLE place per-engine extraction lives.
# A wrapper of any engine may emit a machine-readable `AQ_USAGE k=v ...` line, which
# is always honored; engine-specific heuristics are best-effort (real where known,
# TODO otherwise). Never fabricate precise numbers — omit or mark usage_estimated.
parse_usage() {
local engine=$1 log=$2
[[ -f "$log" ]] || return 0
# 1) Generic, explicit usage line (preferred; emitted by any cooperating wrapper).
local line; line=$(grep -E '^AQ_USAGE ' "$log" 2>/dev/null | tail -1)
if [[ -n "$line" ]]; then
local kv
for kv in ${line#AQ_USAGE }; do
case "$kv" in
model=*|tokens_in=*|tokens_out=*|tokens_cached=*|cost_usd=*|turns=*|tool_calls=*|usage_estimated=*) echo "$kv";;
esac
done
return 0
fi
# 2) Engine-specific best-effort heuristics (real where the format is known).
local ti to
case "$engine" in
claude)
# Claude Code can surface usage as JSON-ish input_tokens/output_tokens.
ti=$(grep -oE '"input_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"output_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
codex)
# OpenAI usage object: prompt_tokens / completion_tokens.
ti=$(grep -oE '"prompt_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
to=$(grep -oE '"completion_tokens"[": ]+[0-9]+' "$log" 2>/dev/null | grep -oE '[0-9]+' | tail -1)
[[ -n "$ti" ]] && echo "tokens_in=$ti"
[[ -n "$to" ]] && echo "tokens_out=$to"
;;
devin) : ;; # TODO: Devin session metrics are exposed via API, not the local log.
copilot) : ;; # TODO: GitHub Copilot CLI usage format not yet documented here.
esac
return 0
}
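# Example of the explicit usage line (illustrative values, not real measurements;
# the model name is a placeholder). A cooperating wrapper could print this at the
# start of a line in the job log:
#   AQ_USAGE model=example-model tokens_in=1200 tokens_out=340 cost_usd=0.0123 usage_estimated=1
# parse_usage then emits each recognized k=v pair on its own line. The last
# AQ_USAGE line in the log wins. Pairs are split on whitespace, so a value cannot
# contain spaces, and unrecognized keys are dropped.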
# ── insights helpers (§26) ──
# _meta_val <metafile> <key> -> last value for key (append-only safe), else empty.
_meta_val() { grep "^$2=" "$1" 2>/dev/null | tail -1 | cut -d= -f2-; }
# _result_is_success <result> -> 0 if the agent run succeeded (reached a good stage).
_result_is_success() { case "$1" in review|testing|shipped) return 0;; *) return 1;; esac; }
# _insights_line <metafile> -> compact one-line metrics summary for status/dash.
_insights_line() {
local f=$1 s="" v ti to la ld
v=$(_meta_val "$f" attempts); [[ -n "$v" ]] && s+="attempt $v "
v=$(_meta_val "$f" duration_s); [[ -n "$v" ]] && s+="${v}s "
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out)
[[ -n "$ti$to" ]] && s+="tok ${ti:-0}/${to:-0} "
v=$(_meta_val "$f" cost_usd); [[ -n "$v" ]] && s+="usd=$v "
la=$(_meta_val "$f" lines_added); ld=$(_meta_val "$f" lines_deleted)
[[ -n "$la$ld" ]] && s+="+${la:-0}/-${ld:-0} "
[[ -n "$(_meta_val "$f" usage_estimated)" ]] && s+="(est) "
printf 'insights: %s' "${s:-(pending)}"
}
# ── Commands ────────────────────────────────────────────────────────
# ── Tracker integration (§10) — task <-> job round-trip ─────────────
#
# tracker_api <METHOD> <PATH> [JSON] -> emits the response body, then a final
# line with the HTTP status code. ALL HTTP goes through here (curl only). Tests
# replace it wholesale via AQ_TRACKER_API_CMD so no live service is needed.
tracker_api() {
local method=$1 path=$2 body=${3:-}
if [[ -n "$AQ_TRACKER_API_CMD" ]]; then
"$AQ_TRACKER_API_CMD" "$method" "$path" "$body"
return $?
fi
local url="${AQ_TRACKER_API}${path}"
local -a args=(-sS -m "${AQ_TRACKER_TIMEOUT:-30}" -X "$method"
-H "Content-Type: application/json" -w '\n%{http_code}')
[[ -n "$AQ_TRACKER_TOKEN" ]] && args+=(-H "Authorization: Bearer $AQ_TRACKER_TOKEN")
[[ -n "$AQ_PRODUCT_ID" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID")
[[ -n "$body" ]] && args+=(--data "$body")
local out rc
out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$?
if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi
}
# _api_call <METHOD> <PATH> [JSON] -> sets globals API_BODY + API_CODE.
_api_call() {
local out; out=$(tracker_api "$@")
API_CODE=$(printf '%s' "$out" | tail -n1)
API_BODY=$(printf '%s' "$out" | sed '$d')
}
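# Sketch of a test double for AQ_TRACKER_API_CMD (the file name and payload are
# hypothetical; the only contract taken from tracker_api is: args METHOD PATH
# [JSON], stdout = response body followed by a final line with the HTTP status):
#   #!/usr/bin/env bash
#   # fake-tracker.sh
#   case "$1 $2" in
#     "GET /api/items/42")
#       printf '%s\n200\n' '{"title":"Fix login","description":"Users cannot log in.","labels":["priority:high"]}';;
#     *) printf '%s\n404\n' '{}';;
#   esac
# Usage, assuming `aq` is how this script is invoked:
#   AQ_TRACKER_API_CMD=./fake-tracker.sh aq from-tracker 42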
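# _json_str <key> (reads JSON on stdin) -> the string value of the FIRST "key"
# found in the document, unescaped. Not structure-aware: the first textual match
# is used even if it is nested. \uXXXX escapes are not decoded.
# Pure awk (POSIX): mac + linux safe, no jq dependency.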
_json_str() {
awk -v key="$1" '
{ json = json $0 }
END {
pat = "\"" key "\""
i = index(json, pat); if (i == 0) exit
rest = substr(json, i + length(pat))
sub(/^[ \t]*:[ \t]*/, "", rest)
if (substr(rest, 1, 1) != "\"") exit
rest = substr(rest, 2); n = length(rest)
for (j = 1; j <= n; j++) {
c = substr(rest, j, 1)
if (c == "\\") { j++; e = substr(rest, j, 1)
if (e == "n") out = out "\n"; else if (e == "t") out = out "\t"
else if (e == "r") out = out "\r"; else out = out e }
else if (c == "\"") break
else out = out c
}
printf "%s", out
}'
}
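# _json_labels (reads JSON on stdin) -> one labels[] entry per line.
# Not structure-aware: takes the first [...] after the first "labels", so a label
# containing `]` cuts the list short.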
_json_labels() {
awk '
{ json = json $0 }
END {
i = index(json, "\"labels\""); if (i == 0) exit
rest = substr(json, i); lb = index(rest, "["); rb = index(rest, "]")
if (lb == 0 || rb == 0 || rb < lb) exit
arr = substr(rest, lb + 1, rb - lb - 1)
while (match(arr, /"([^"\\]|\\.)*"/)) {
tok = substr(arr, RSTART + 1, RLENGTH - 2)
gsub(/\\"/, "\"", tok); print tok
arr = substr(arr, RSTART + RLENGTH)
}
}'
}
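# _json_escape <text> -> the text as a JSON string body (no surrounding quotes).
# Escapes backslash, double quote, tab and newline only; other control characters
# (e.g. CR) pass through unescaped.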
_json_escape() {
printf '%s' "$1" | awk '
{ line = $0; gsub(/\\/, "\\\\", line); gsub(/"/, "\\\"", line); gsub(/\t/, "\\t", line)
out = out (NR > 1 ? "\\n" : "") line }
END { printf "%s", out }'
}
# _tracker_status_for <result> -> the Item status for a job result/stage.
_tracker_status_for() {
case "$1" in
shipped) printf '%s' "$AQ_TRACKER_STATUS_DONE";;
failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
printf '%s' "$AQ_TRACKER_STATUS_FAILED";;
*) printf '%s' "$AQ_TRACKER_STATUS_INPROGRESS";;
esac
}
# _tracker_note <job> <metaf> <result> <status> -> a metrics-only summary line.
# NEVER includes prompt/body content or secrets — only run metrics (§24.5/§26).
_tracker_note() {
local jn=$1 metaf=$2 result=$3 status=$4 s attempts dur ti to cost la ld
attempts=$(_meta_val "$metaf" attempts); dur=$(_meta_val "$metaf" duration_s)
ti=$(_meta_val "$metaf" tokens_in); to=$(_meta_val "$metaf" tokens_out)
cost=$(_meta_val "$metaf" cost_usd)
la=$(_meta_val "$metaf" lines_added); ld=$(_meta_val "$metaf" lines_deleted)
s="agent-queue: job ${jn} -> ${result:-$status} (status=${status})."
[[ -n "$attempts" ]] && s+=" attempts=${attempts}."
[[ -n "$dur" ]] && s+=" duration=${dur}s."
[[ -n "$ti$to" ]] && s+=" tokens=${ti:-0}/${to:-0}."
[[ -n "$cost" ]] && s+=" cost_usd=${cost}."
[[ -n "$la$ld" ]] && s+=" diff=+${la:-0}/-${ld:-0}."
printf '%s' "$s"
}
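# Example output of _tracker_note (illustrative values and a hypothetical job
# name; assumes AQ_TRACKER_STATUS_INPROGRESS is in_progress):
#   agent-queue: job 20260529-213506__tracker-42 -> testing (status=in_progress). attempts=1. duration=95s. tokens=1200/340. cost_usd=0.0123. diff=+12/-3.
# Fields whose meta value is absent are simply left out of the line.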
# from-tracker <ITEM_ID> — pull a tracker Item and materialize a job in inbox/.
# Idempotent on the derived key `tracker-<ITEM_ID>` (Slice 1 dedupe).
cmd_from_tracker() {
ensure_dirs
local item_id="${1:-}"
[[ -n "$item_id" ]] || die "usage: from-tracker <ITEM_ID>"
_api_call GET "/api/items/$item_id"
case "$API_CODE" in 2*) :;; *) die "from-tracker: items API returned HTTP ${API_CODE:-error} for item $item_id";; esac
local title desc iprio
title=$(printf '%s' "$API_BODY" | _json_str title)
desc=$(printf '%s' "$API_BODY" | _json_str description)
iprio=$(printf '%s' "$API_BODY" | _json_str priority)
[[ -n "$title$desc" ]] || die "from-tracker: item $item_id has no title/description (HTTP $API_CODE)"
# labels carry optional manifest hints: engine-class:, profile:, priority:, cap:
local engine_class="" profile="" prio="" caps_list="" label
while IFS= read -r label; do
[[ -n "$label" ]] || continue
case "$label" in
engine-class:*) engine_class=${label#engine-class:};;
profile:*) profile=${label#profile:};;
priority:*) prio=${label#priority:};;
cap:*) caps_list+="${label#cap:}, ";;
esac
done < <(printf '%s' "$API_BODY" | _json_labels)
prio=${prio:-${iprio:-medium}}
case "$prio" in critical|high|medium|low) :;; *) prio=medium;; esac
# Materialize into a .md file (so cmd_add/the queue recognize it). mktemp -d is
# the portable way to get a unique path with a fixed (.md) basename on mac+linux.
local safe_id tmpdir tmp
safe_id=$(printf '%s' "$item_id" | tr -c 'A-Za-z0-9._-' '_')
tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fromtracker.XXXXXX")
tmp="$tmpdir/tracker-$safe_id.md"
{
echo "---"
echo "cwd: $AQ_TRACKER_CWD"
echo "yolo: true"
echo "priority: $prio"
[[ -n "$engine_class" ]] && echo "engine-class: $engine_class"
[[ -n "$profile" ]] && echo "profile: $profile"
[[ -n "$caps_list" ]] && echo "capabilities: [${caps_list%, }]"
echo "tracker-item: $item_id"
echo "idempotency-key: tracker-$item_id"
echo "---"
echo
[[ -n "$title" ]] && { echo "# $title"; echo; }
printf '%s\n' "$desc"
} > "$tmp"
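# Run cmd_add in a subshell so a `die` (e.g. an idempotency reject) still lets us
# remove the temp dir before propagating the failure.
local add_rc=0
( cmd_add "$tmp" ) || add_rc=$?
rm -rf "$tmpdir"
[[ "$add_rc" -eq 0 ]] || exit "$add_rc"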
local created; created=$(grep -lE "^tracker-item:[[:space:]]*${item_id}[[:space:]]*\$" "$INBOX"/*.md 2>/dev/null | head -1)
if [[ -n "$created" ]]; then
log "from-tracker: item $item_id -> $C_BOLD$(basename "$created")$C_RESET"
else
log "from-tracker: item $item_id already queued elsewhere (deduped)"
fi
}
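# Example of the file cmd_from_tracker materializes (a sketch; the item and its
# label values are hypothetical). For item 42 with title "Fix login", description
# "Users cannot log in." and labels
#   ["engine-class:coder", "cap:git", "cap:node", "priority:high"]
# the job written to inbox/ would read:
#   ---
#   cwd: <value of AQ_TRACKER_CWD>
#   yolo: true
#   priority: high
#   engine-class: coder
#   capabilities: [git, node]
#   tracker-item: 42
#   idempotency-key: tracker-42
#   ---
#
#   # Fix login
#
#   Users cannot log in.
# A label priority overrides the Item's own priority field; anything outside
# critical|high|medium|low falls back to medium.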
# to-tracker <job> — one-way echo of a job's CURRENT outcome to its tracker Item
# (child -> tracker, §24.5). Idempotent via meta `tracker_echoed`; never fatal.
cmd_to_tracker() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: to-tracker <job>"
local metaf="$STATE/$job.meta"
[[ -f "$metaf" ]] || metaf=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1)
[[ -f "$metaf" ]] || die "to-tracker: no meta for job '$job'"
local jn; jn=$(basename "$metaf"); jn=${jn%.meta}
local item_id; item_id=$(_meta_val "$metaf" tracker_item)
if [[ -z "$item_id" ]]; then
log "to-tracker: $jn has no tracker-item — nothing to echo"
return 0
fi
local result status last
result=$(_meta_val "$metaf" result)
status=$(_tracker_status_for "$result")
last=$(_meta_val "$metaf" tracker_echoed)
if [[ "$last" == "$status" ]]; then
log "to-tracker: $jn already echoed status=$status to $item_id (no-op)"
return 0
fi
# 1) status transition
_api_call PATCH "/api/items/$item_id/status" "{\"status\":\"$status\"}"
case "$API_CODE" in
2*) :;;
*) err "to-tracker: status PATCH for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal; job state unchanged"; return 0;;
esac
# 2) metrics-only comment (never prompt content / secrets)
local note; note=$(_tracker_note "$jn" "$metaf" "$result" "$status")
_api_call POST "/api/items/$item_id/comments" "{\"body\":\"$(_json_escape "$note")\"}"
case "$API_CODE" in
2*) :;;
*) err "to-tracker: comment for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal";;
esac
# 3) record echoed status (idempotency)
{ echo "tracker_echoed=$status"; echo "tracker_echoed_at=$(date +%s)"; } >> "$metaf"
log "to-tracker: echoed $jn -> item $item_id (status=$status)"
}
# _auto_echo <job> — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition.
# Never blocks or fails the job: the tracker is downstream, not authoritative.
_auto_echo() {
[[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0
# Subshell: a `die` inside cmd_to_tracker must not exit the worker or the run loop.
( cmd_to_tracker "$1" ) >/dev/null 2>&1 || true
}
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
cmd_add() {
ensure_dirs
local file="" engine="" cwd="" yolo=""
while [[ $# -gt 0 ]]; do
case "$1" in
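--engine) [[ $# -ge 2 ]] || die "add: --engine needs a value"; engine=$2; shift 2;;
--cwd) [[ $# -ge 2 ]] || die "add: --cwd needs a value"; cwd=$2; shift 2;;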
--yolo) yolo=true; shift;;
--no-yolo) yolo=false; shift;;
*) file=$1; shift;;
esac
done
[[ -n "$file" && -f "$file" ]] || die "usage: add <file.md> [--engine devin|claude|codex] [--cwd PATH] [--yolo|--no-yolo]"
# ── idempotency-key dedupe (§5) ──
# If the file declares an idempotency-key, compare a content hash of its
# stripped body against existing jobs in any active stage:
# same key + same body -> no-op (skip; "duplicate")
# same key + different body, prior in inbox/ -> supersede (replace it)
# same key + different body, prior elsewhere -> reject (clear error)
local idem; idem=$(fm_get "$file" idempotency-key "")
if [[ -n "$idem" ]]; then
local newhash; newhash=$(_body_hash "$file")
local d ef ekey ehash stage
# pass 1: exact duplicate anywhere active -> no-op
for d in "$INBOX" "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
ehash=$(_body_hash "$ef")
if [[ "$ehash" == "$newhash" ]]; then
log "duplicate, skipped (idempotency-key=$C_BOLD$idem$C_RESET matches $(basename "$ef"))"
return 0
fi
done
done
# pass 2: same key, different body, prior is past inbox -> reject
for d in "$BUILDING" "$REVIEW" "$TESTING" "$SHIPPED"; do
for ef in "$d"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
stage=$(basename "$d")
die "idempotency-key '$idem' already in use by $(basename "$ef") (stage: $stage) with different content — refusing. Use a new key, or requeue the existing job."
done
done
# pass 3: same key, different body, prior still queued -> supersede
for ef in "$INBOX"/*.md; do
[[ -e "$ef" ]] || continue
ekey=$(fm_get "$ef" idempotency-key "")
[[ "$ekey" == "$idem" ]] || continue
log "superseding inbox job $(basename "$ef") (same idempotency-key=$idem, changed content)"
rm -f "$ef"
done
fi
# ── dep cycle detection (§5): reject a submit that would create a cycle in the
# idempotency-key dependency graph (inbox + active stages). ──
local newdeps; newdeps=$(parse_list "$(fm_get "$file" deps "")" | tr '\n' ' ')
if [[ -n "${newdeps// /}" ]] && deps_would_cycle "$idem" "$newdeps"; then
die "dependency cycle detected: job (key '${idem:-<none>}') with deps [${newdeps% }] would create a cycle — refusing."
fi
local base; base=$(basename "$file")
local stamp; stamp=$(date +%Y%m%d-%H%M%S)
local dest="$INBOX/${stamp}__${base}"
# If user passed flags AND the file has no frontmatter, inject one.
if [[ -n "$engine$cwd$yolo" ]] && [[ "$(head -1 "$file")" != "---" ]]; then
{
echo "---"
echo "engine: ${engine:-$DEFAULT_ENGINE}"
echo "cwd: ${cwd:-$PWD}"
echo "yolo: ${yolo:-true}"
echo "---"
echo
cat "$file"
} > "$dest"
else
cp "$file" "$dest"
fi
log "queued $C_BOLD$(basename "$dest")$C_RESET (engine=$(fm_get "$dest" engine "$DEFAULT_ENGINE"), cwd=$(fm_get "$dest" cwd "$PWD"))"
}
cmd_run() {
ensure_dirs
local once=false
while [[ $# -gt 0 ]]; do
case "$1" in
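--max) [[ "${2:-}" =~ ^[0-9]+$ ]] || die "run: --max needs a non-negative integer"; MAX_CONCURRENCY=$2; shift 2;;
--engine) [[ $# -ge 2 ]] || die "run: --engine needs a value"; DEFAULT_ENGINE=$2; shift 2;;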
--once|--drain) once=true; shift;;
*) die "run: unknown arg '$1'";;
esac
done
# Refuse to start a second run loop against the same queue — two daemons would
# break the single-launcher invariant that per-cwd locking relies on.
local dpid=""
[[ -f "$STATE/daemon.pid" ]] && dpid=$(cat "$STATE/daemon.pid" 2>/dev/null)
if [[ -n "$dpid" ]] && kill -0 "$dpid" 2>/dev/null; then
die "a run loop is already active (pid $dpid). Use 'stop' first, or a different AGENT_QUEUE_ROOT."
fi
[[ -n "$dpid" ]] && log "clearing stale daemon.pid ($dpid)"
echo "$$" > "$STATE/daemon.pid"
trap 'rm -f "$STATE/daemon.pid"; log "run loop stopped"; exit 0' INT TERM
log "run loop started (max=$MAX_CONCURRENCY, default engine=$DEFAULT_ENGINE). Ctrl-C to stop."
# Crash recovery (§25.3): reclaim jobs orphaned in building/ by a previous
# crash/power-off before launching anything new.
recover_orphans
while true; do
# continuously sweep for orphans (a worker that died mid-loop)
recover_orphans
local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
# pick by priority (critical→low) then age, skipping files whose lock key
# is currently busy, so two jobs sharing a cwd (or `lock:` key) never run
# at once regardless of --max. inbox_sorted replaces the old pure-FIFO sort.
# Also skip jobs still inside their retry/recovery backoff (next_eligible).
local busy; busy=$(busy_keys)
local next="" cand cand_key cand_job cand_ne now_s
now_s=$(date +%s)
while IFS= read -r cand; do
[[ -n "$cand" ]] || continue
cand_key=$(lock_key_for "$cand")
if printf '%s\n' "$busy" | grep -qxF -- "$cand_key"; then continue; fi
cand_job=$(basename "$cand"); cand_job=${cand_job%.md}
cand_ne=$(grep '^next_eligible=' "$STATE/$cand_job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
if [[ "$cand_ne" =~ ^[0-9]+$ ]] && [[ "$cand_ne" -gt "$now_s" ]]; then continue; fi
# skip jobs whose deps (§5 DAG) are unmet — blocked, re-evaluated next loop
if [[ -n "$(deps_unmet "$cand")" ]]; then continue; fi
next="$cand"; break
done < <(inbox_sorted)
[[ -z "$next" ]] && break
local job; job=$(basename "$next"); job=${job%.md}
local doing_file="$BUILDING/$(basename "$next")"
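# If the move fails, leave the job queued rather than launching a worker on a missing file.
mv "$next" "$doing_file" || { err "run: could not move $(basename "$next") to building/"; break; }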
local w_eng w_cwd w_yolo w_key
# resolve the concrete engine now (explicit engine / engine-class) so the
# meta + status reflect what will actually run; run_worker re-resolves and
# is the authority on capability/engine gates.
w_eng=$(resolve_engine "$doing_file")
w_cwd=$(fm_get "$doing_file" cwd "$PWD")
w_yolo=$(fm_get "$doing_file" yolo "true")
w_key=$(lock_key_for "$doing_file")
# Preserve the attempt counter across requeues (retry/orphan recovery set
# it before re-queuing); a fresh job starts at 1. Read BEFORE truncating
# the meta below so the count survives re-creation (§25.4 crash-safe).
local w_attempts; w_attempts=$(grep '^attempts=' "$STATE/$job.meta" 2>/dev/null | tail -1 | cut -d= -f2)
[[ "$w_attempts" =~ ^[0-9]+$ ]] && [[ "$w_attempts" -gt 0 ]] || w_attempts=1
# write meta BEFORE launch (no pid yet), then append the worker pid from $!.
# Manifest fields (§5) are recorded here for `status`/audit. Enforcement reads
# the job file itself (e.g. retry in _finish_failure, deps via deps_unmet);
# tracker_item is read back from this meta by to-tracker.
{
echo "job=$job"
echo "engine=${w_eng:-<none>}"
echo "cwd=$w_cwd"
echo "yolo=$w_yolo"
echo "lock=$w_key"
echo "started=$(date +%s)"
echo "attempts=$w_attempts"
echo "priority=$(fm_get "$doing_file" priority medium)"
echo "profile=$(fm_get "$doing_file" profile "")"
echo "engine_class=$(fm_eff "$doing_file" engine-class "")"
echo "capabilities=$(fm_eff "$doing_file" capabilities "")"
echo "prefers=$(fm_get "$doing_file" prefers "")"
echo "prefers_engine=$(fm_eff "$doing_file" prefers-engine "")"
echo "allowed_scope=$(fm_eff "$doing_file" allowed-scope "" allowed-scope)"
echo "budget=$(fm_get "$doing_file" budget "")"
echo "deps=$(fm_get "$doing_file" deps "")"
echo "deps_mode=$(fm_get "$doing_file" deps-mode "")"
echo "idempotency_key=$(fm_get "$doing_file" idempotency-key "")"
echo "retry=$(fm_get "$doing_file" retry "")"
echo "review_policy=$(fm_eff "$doing_file" review-policy "" review-policy)"
echo "artifacts=$(fm_get "$doing_file" artifacts "")"
echo "tracker_item=$(fm_get "$doing_file" tracker-item "")"
} > "$STATE/$job.meta"
run_worker "$doing_file" &
{ echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta"
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
_auto_echo "$job" # building -> in_progress (opt-in)
sleep 1
running=$(active_workers)
done
if $once; then
# drain when no worker is running and nothing in inbox can still progress on
# its own (backoff jobs still count as pending; dep-blocked jobs do not).
if [[ "$(active_workers)" -eq 0 ]] && ! _drain_pending; then
log "drain complete — no runnable work, no workers running"; rm -f "$STATE/daemon.pid"; exit 0
fi
fi
sleep "$POLL_SECONDS"
done
}
_count() { ls -1 "$1"/*.md 2>/dev/null | wc -l | tr -d ' '; }
cmd_status() {
ensure_dirs
local ib bd rv ts sh fl
ib=$(_count "$INBOX"); bd=$(_count "$BUILDING"); rv=$(_count "$REVIEW")
ts=$(_count "$TESTING"); sh=$(_count "$SHIPPED"); fl=$(_count "$FAILED")
local running; running=$(active_workers)
echo
printf '%s AGENT QUEUE %s %s\n' "$C_BOLD" "$C_DIM$QUEUE_ROOT$C_RESET" ""
printf ' %sinbox%s %-3s %sbuilding%s %-3s %sreview%s %-3s %stesting%s %-3s %sshipped%s %-3s %sfailed%s %-3s %srunning%s %s/%s\n\n' \
"$C_BLUE" "$C_RESET" "$ib" "$C_YEL" "$C_RESET" "$bd" \
"$C_CYAN" "$C_RESET" "$rv" "$C_CYAN" "$C_RESET" "$ts" \
"$C_GREEN" "$C_RESET" "$sh" "$C_RED" "$C_RESET" "$fl" \
"$C_BOLD" "$C_RESET" "$running" "$MAX_CONCURRENCY"
# running table
local f
local printed=false
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
local pid pidstart; pid=$(grep '^pid=' "$f" | cut -d= -f2); pidstart=$(grep '^pidstart=' "$f" | cut -d= -f2-)
_pid_alive "$pid" "$pidstart" || continue
if ! $printed; then printf ' %sRUNNING%s\n' "$C_BOLD" "$C_RESET"; printed=true; fi
local job eng start now el last lmt age stall=""
job=$(grep '^job=' "$f" | cut -d= -f2)
eng=$(grep '^engine=' "$f" | cut -d= -f2)
start=$(grep '^started=' "$f" | cut -d= -f2)
now=$(date +%s); el=$(( now - ${start:-$now} ))
last=$(tail -n 1 "$LOGS/$job.log" 2>/dev/null | cut -c1-60)
lmt=$(_mtime "$LOGS/$job.log"); age=$(( now - ${lmt:-$now} ))
[[ "$age" -gt $(( STALL_MIN * 60 )) ]] && stall="${C_RED}⚠ stalled${C_RESET} "
printf ' %s%-26s%s %-7s %3dm%02ds pid %-6s %s%s%s%s\n' \
"$C_BOLD" "$job" "$C_RESET" "$eng" $((el/60)) $((el%60)) "$pid" "$stall" "$C_DIM" "$last" "$C_RESET"
# manifest sub-line (Phase 1): priority + profile + capabilities + tracker-item
local m_prio m_prof m_caps m_trk extra=""
m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-)
m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-)
local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-)
[[ -n "$m_prio" ]] && extra+="prio=$m_prio "
[[ -n "$m_prof" ]] && extra+="profile=$m_prof "
[[ -n "$m_caps" ]] && extra+="caps=$m_caps "
[[ -n "$m_trk" ]] && extra+="tracker=$m_trk "
[[ -n "$m_echo" ]] && extra+="echoed=$m_echo "
[[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET"
printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET"
done
$printed || printf ' %sno workers running%s\n' "$C_DIM" "$C_RESET"
# blocked jobs (unmet deps, §5) — waiting in inbox/, re-evaluated each loop
local bf bj unmet bprinted=false
for bf in "$INBOX"/*.md; do
[[ -e "$bf" ]] || continue
unmet=$(deps_unmet "$bf")
[[ -n "$unmet" ]] || continue
if ! $bprinted; then echo; printf ' %sBLOCKED%s\n' "$C_BOLD" "$C_RESET"; bprinted=true; fi
bj=$(basename "$bf"); bj=${bj%.md}
printf ' %s%-26s%s %sblocked (waiting on: %s)%s\n' \
"$C_BOLD" "$bj" "$C_RESET" "$C_YEL" "$unmet" "$C_RESET"
done
echo
}
cmd_watch() {
local interval="${1:-2}"
while true; do clear; cmd_status; sleep "$interval"; done
}
# recover — reclaim orphaned building/ jobs (dead worker) back to inbox/. Runs
# automatically inside `run`; exposed for operators + crash-recovery testing.
cmd_recover() { ensure_dirs; recover_orphans; log "orphan sweep complete"; }
# insights [job] — per-job metrics, or a recent-jobs table + per-engine rollup.
cmd_insights() {
ensure_dirs
local job="${1:-}"
if [[ -n "$job" ]]; then
local f="$STATE/$job.meta"
[[ -f "$f" ]] || f=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1)
[[ -f "$f" ]] || die "no job meta matching '$job'"
local jn; jn=$(basename "$f"); jn=${jn%.meta}
printf '\n%s INSIGHTS %s%s%s\n' "$C_BOLD" "$C_CYAN" "$jn" "$C_RESET"
local k val
for k in engine result attempts started ended duration_s exit verify_exit \
model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \
files_changed lines_added lines_deleted wip_branch wip_base wip_commit \
next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do
val=$(_meta_val "$f" "$k")
[[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val"
done
echo
return 0
fi
printf '\n%s INSIGHTS — recent finished jobs%s %s%s%s\n' \
"$C_BOLD" "$C_RESET" "$C_DIM" "$QUEUE_ROOT" "$C_RESET"
printf ' %-26s %-8s %-16s %6s %10s %9s\n' "job" "engine" "result" "dur" "tok(i/o)" "cost"
local f rows=0 agg; agg=$(mktemp "${TMPDIR:-/tmp}/aq-insights.XXXXXX")
while IFS= read -r f; do
[[ -n "$f" ]] || continue
grep -q '^ended=' "$f" || continue
local jn eng res dur ti to cost est
jn=$(basename "$f"); jn=${jn%.meta}
eng=$(_meta_val "$f" engine); res=$(_meta_val "$f" result); dur=$(_meta_val "$f" duration_s)
ti=$(_meta_val "$f" tokens_in); to=$(_meta_val "$f" tokens_out); cost=$(_meta_val "$f" cost_usd)
est=$(_meta_val "$f" usage_estimated)
rows=$((rows+1))
[[ $rows -le 15 ]] && printf ' %-26.26s %-8.8s %-16.16s %5ss %10s %9s\n' \
"$jn" "${eng:-?}" "${res:-?}" "${dur:-0}" "${ti:-0}/${to:-0}" "${cost:-}"
local succ=0; _result_is_success "$res" && succ=1
printf '%s|%s|%s|%s|%s|%s|%s\n' "${eng:-?}" "${ti:-0}" "${to:-0}" "${cost:-0}" "${dur:-0}" "$succ" "$est" >> "$agg"
done < <(ls -1t "$STATE"/*.meta 2>/dev/null)
if [[ $rows -eq 0 ]]; then
printf ' %sno finished jobs yet%s\n\n' "$C_DIM" "$C_RESET"; rm -f "$agg"; return 0
fi
printf '\n%s ROLLUP BY ENGINE%s\n' "$C_BOLD" "$C_RESET"
printf ' %-8s %5s %10s %10s %10s %8s\n' "engine" "jobs" "tok_in" "tok_out" "cost" "success"
local e
for e in $(cut -d'|' -f1 "$agg" | sort -u); do
awk -F'|' -v eng="$e" '
$1==eng { jobs++; ti+=$2; to+=$3; cost+=$4; succ+=$6; if ($7!="") est=1 }
END {
rate = jobs>0 ? (succ*100.0/jobs) : 0
printf " %-8s %5d %10d %10d %9.4f%s %6.0f%%\n", eng, jobs, ti, to, cost, (est?"*":" "), rate
}' "$agg"
done
printf ' %s* total includes estimated token/cost values%s\n\n' "$C_DIM" "$C_RESET"
rm -f "$agg"
}
# dash [args]: hand off to the Node dashboard (dashboard.mjs in $SCRIPT_DIR); requires node.
cmd_dash() {
command -v node >/dev/null 2>&1 || die "node not found — use 'watch' for the bash status view"
AGENT_QUEUE_ROOT="$QUEUE_ROOT" exec node "$SCRIPT_DIR/dashboard.mjs" "$@"
}
# stop: kill every live worker recorded in an unfinished .meta, then the run loop (daemon.pid).
cmd_stop() {
ensure_dirs
local killed=0 f pid pidstart
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
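# .meta keys can repeat (e.g. result= is appended on ship/promote), so read only the latest pid/pidstart
pid=$(grep '^pid=' "$f" | tail -1 | cut -d= -f2); pidstart=$(grep '^pidstart=' "$f" | tail -1 | cut -d= -f2-)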
_pid_alive "$pid" "$pidstart" && { kill "$pid" 2>/dev/null && killed=$((killed+1)); }
done
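# tolerate a stale daemon.pid: a failed kill must not stop the cleanup below (matters if errexit is on)
[[ -f "$STATE/daemon.pid" ]] && kill "$(cat "$STATE/daemon.pid")" 2>/dev/null || true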
rm -f "$STATE/daemon.pid"
log "stopped $killed running worker(s) + run loop"
}
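# logs <job> [-f]: print a job's log, or follow it with -f (accepted before or after the job name).
cmd_logs() {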
local job="${1:-}" follow=""
[[ "${2:-}" == "-f" || "$job" == "-f" ]] && follow="-f"
[[ "$job" == "-f" ]] && job="${2:-}"
[[ -n "$job" ]] || die "usage: logs <job> [-f]"
local lf="$LOGS/$job.log"
[[ -f "$lf" ]] || lf=$(ls -1t "$LOGS"/*"$job"*.log 2>/dev/null | head -1)
[[ -f "$lf" ]] || die "no log found for '$job'"
if [[ -n "$follow" ]]; then tail -f "$lf"; else cat "$lf"; fi
}
# _find_job <job> <dir...> — echo the first matching .md across the given dirs
# (exact "<job>.md" preferred, else newest fuzzy match). Empty if none found.
_find_job() {
local job=$1; shift
local d f
for d in "$@"; do
[[ -f "$d/$job.md" ]] && { printf '%s' "$d/$job.md"; return; }
done
for d in "$@"; do
f=$(ls -1t "$d"/*"$job"*.md 2>/dev/null | head -1)
[[ -f "$f" ]] && { printf '%s' "$f"; return; }
done
}
# requeue <job> — move a job back to inbox/ for a fresh run (from failed/review/testing).
cmd_requeue() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: requeue <job>"
local f; f=$(_find_job "$job" "$FAILED" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no failed/review/testing job matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$INBOX/$base"
# drop stale state so it re-runs cleanly
rm -f "$STATE/$name.meta" "$STATE/$name.body.md" "$STATE/$name.timedout"
log "requeued $C_BOLD$base$C_RESET ($from → inbox)"
}
# ship <job> — manual promotion testing/ (QA) → shipped/. The human gate.
cmd_ship() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: ship <job>"
local f; f=$(_find_job "$job" "$TESTING")
[[ -n "$f" ]] || die "no job in testing/ matching '$job' (only QA-passed jobs can ship)"
local base name; base=$(basename "$f"); name=${base%.md}
mv "$f" "$SHIPPED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta"
log "shipped $C_BOLD$base$C_RESET (testing → shipped)"
_auto_echo "$name"
}
# promote <job> — advance one stage forward: review → testing → shipped.
cmd_promote() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: promote <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from dest result; base=$(basename "$f"); name=${base%.md}
from=$(basename "$(dirname "$f")")
case "$from" in
review) dest="$TESTING"; result="testing";;
testing) dest="$SHIPPED"; result="shipped";;
*) die "promote: '$base' is in '$from' — nothing to promote";;
esac
mv "$f" "$dest/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=$result" >> "$STATE/$name.meta"
log "promoted $C_BOLD$base$C_RESET ($from → $result)"
_auto_echo "$name"
}
# reject <job> — move a review/testing job to failed/ (manual gate rejection).
cmd_reject() {
ensure_dirs
local job="${1:-}"
[[ -n "$job" ]] || die "usage: reject <job>"
local f; f=$(_find_job "$job" "$REVIEW" "$TESTING")
[[ -n "$f" ]] || die "no job in review/ or testing/ matching '$job'"
local base name from; base=$(basename "$f"); name=${base%.md}; from=$(basename "$(dirname "$f")")
mv "$f" "$FAILED/$base"
[[ -f "$STATE/$name.meta" ]] && echo "result=rejected" >> "$STATE/$name.meta"
log "rejected $C_BOLD$base$C_RESET ($from → failed)"
}
# clean [--keep N] — archive finished jobs' logs+meta beyond the newest N
# (default 50) into queue/.archive/<ts>/. Running jobs and the shipped/failed .md
# kanban records are left untouched.
cmd_clean() {
ensure_dirs
local keep=50
while [[ $# -gt 0 ]]; do
case "$1" in
--keep) [[ $# -ge 2 ]] || die "clean: --keep needs a value"; keep=$2; shift 2;;
*) die "clean: unknown arg '$1'";;
esac
done
[[ "$keep" =~ ^[0-9]+$ ]] || die "clean: --keep must be a number"
keep=$((10#$keep))  # force base 10 so a value such as 08 is not parsed as octal
local arch="$QUEUE_ROOT/.archive/$(date +%Y%m%d-%H%M%S)"
# finished metas (have ended=), newest-first by mtime; tab-separated so paths containing spaces survive
local metas; metas=$(grep -l '^ended=' "$STATE"/*.meta 2>/dev/null \
| while IFS= read -r m; do printf '%s\t%s\n' "$(_mtime "$m")" "$m"; done \
| sort -rn | cut -f2-)
local i=0 moved=0 m name
while IFS= read -r m; do
[[ -n "$m" ]] || continue
i=$((i+1))
[[ "$i" -le "$keep" ]] && continue
name=$(basename "$m"); name=${name%.meta}
mkdir -p "$arch"
mv "$m" "$arch/" 2>/dev/null
[[ -f "$LOGS/$name.log" ]] && mv "$LOGS/$name.log" "$arch/" 2>/dev/null
[[ -f "$STATE/$name.body.md" ]] && mv "$STATE/$name.body.md" "$arch/" 2>/dev/null
moved=$((moved+1))
done <<< "$metas"
if [[ "$moved" -gt 0 ]]; then
log "archived $moved finished job(s) to $C_BOLD$arch$C_RESET (kept newest $keep)"
else
log "nothing to clean (≤$keep finished jobs)"
fi
}
usage() {
cat <<EOF
${C_BOLD}agent-queue${C_RESET} — folder kanban runner for devin | claude | codex
${C_BOLD}USAGE${C_RESET}
agent-queue.sh <command> [args]
${C_BOLD}COMMANDS${C_RESET}
init create the queue/ folders
add <file.md> [opts] queue a prompt file into inbox/
--engine devin|claude|codex --cwd PATH --yolo | --no-yolo
run [--max N] [--engine E] [--once]
process inbox/ (foreground loop; Ctrl-C to stop)
status show kanban counts + running workers (+ insights)
watch [interval] live status (default 2s, bash)
insights [job] per-job metrics, or recent table + per-engine rollup
recover reclaim orphaned building/ jobs (dead worker) -> inbox
from-tracker <ITEM_ID> pull a tracker Item -> materialize a job in inbox/ (§10)
to-tracker <job> echo a job's outcome to its tracker Item (one-way)
dash [--interval N] richer live Node dashboard (recent shipped/failed too)
stop kill running workers + the run loop
logs <job> [-f] print (or follow) a job's log
promote <job> advance one stage (review → testing → shipped)
ship <job> manual gate: testing (QA) → shipped
reject <job> send a review/testing job to failed/
requeue <job> move a failed/review/testing job back to inbox/
clean [--keep N] archive finished logs+meta beyond newest N (default 50)
help this message
${C_BOLD}KANBAN${C_RESET} inbox → building → review → testing → shipped (+ failed; logs/ + .state/ alongside)
auto: agent rc=0 → review; verify pass → testing; verify fail → failed
manual: promote (review → testing → shipped); ship (testing → shipped); reject (review/testing → failed)
${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
---
engine: devin # devin|claude|codex|copilot. Explicit engine always wins
cwd: /Users/you/code/repo
yolo: true
lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially
timeout: 45m # optional; 90s|45m|2h|1d. On expiry -> failed (result=timeout)
verify: pnpm -s test # optional; auto-QA gate. pass -> testing, fail -> failed
# --- Phase 1 manifest (active) ---
priority: high # critical|high|medium|low (default medium). Picked highest-first, then oldest
engine-class: agentic-coder # used only if `engine` unset: agentic-coder->devin,claude,codex; chat-coder->copilot
prefers-engine: [claude] # optional order hint for engine-class resolution
capabilities: [os:any, node>=20, has:git] # hard host requirements; unmet -> failed (capability_mismatch)
idempotency-key: my-task-1 # re-adding same key+body = no-op; same key+different body = reject/supersede
retry: { max: 2, backoff: 5m, on: [timeout, verify_failed, crash] } # requeue on these classes up to max, then retries_exhausted
profile: backend-engineer # inherit persona + verify/caps/engine-class/scope/review-policy (job fields override)
deps: [other-key] # block until each idempotency-key is in shipped/ (or testing/ if deps-mode: soft)
deps-mode: soft # soft = a dep also counts as satisfied while in testing/
# --- reserved (parsed + shown in status, but no-op until a later phase) ---
prefers: budget: review-policy: artifacts: tracker-item:
---
${C_BOLD}PROFILES${C_RESET} profiles/<name>.md presets persona + capabilities + default-verify + engine-class +
prefers-engine + allowed-scope + review-policy. A job's own fields always override.
Catalog: developer, backend-engineer, frontend-engineer, ux-designer, ui-designer,
qa, reviewer, docs-writer, planner (reserved).
${C_BOLD}RESILIENCE${C_RESET} crash-safe: orphaned building/ jobs (dead worker) are recovered to inbox/ on
'run' startup; git-repo cwd work is checkpointed to branch aq/wip/<job> on every
exit (resumed on retry); the 'retry:' frontmatter requeues failures with backoff. See 'insights'.
${C_BOLD}ENV${C_RESET}
AGENT_QUEUE_ROOT (=$QUEUE_ROOT) AGENT_QUEUE_MAX (=$MAX_CONCURRENCY)
AGENT_QUEUE_ENGINE (=$DEFAULT_ENGINE) AGENT_QUEUE_VERIFY (default verify cmd)
DEVIN_BIN / CLAUDE_BIN / CODEX_BIN / COPILOT_BIN
${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs platform-service + a token)
AQ_TRACKER_API (=$AQ_TRACKER_API) AQ_TRACKER_TOKEN (bearer) AQ_PRODUCT_ID
AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF)
AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam)
label hints on an Item: engine-class:<x> profile:<x> priority:<x> cap:<token>
EOF
}
main() {
local cmd="${1:-help}"; shift || true
case "$cmd" in
init) cmd_init "$@";;
add) cmd_add "$@";;
run) cmd_run "$@";;
status) cmd_status "$@";;
watch) cmd_watch "$@";;
insights) cmd_insights "$@";;
recover) cmd_recover "$@";;
from-tracker) cmd_from_tracker "$@";;
to-tracker) cmd_to_tracker "$@";;
dash|dashboard) cmd_dash "$@";;
stop) cmd_stop "$@";;
logs) cmd_logs "$@";;
promote) cmd_promote "$@";;
ship) cmd_ship "$@";;
reject) cmd_reject "$@";;
requeue) cmd_requeue "$@";;
clean) cmd_clean "$@";;
help|-h|--help) usage;;
*) err "unknown command: $cmd"; echo; usage; exit 1;;
esac
}
main "$@"