feat(agent-queue): enforce budget.wall as a hard wall-clock ceiling

Parse the wall ceiling from the budget manifest map (budget: { wall: <dur> })
and arm it alongside the per-run timeout. Whichever ceiling fires first binds;
the kill is recorded as result=timeout or result=budget_exceeded accordingly.
budget.wall extends timeout: a job with only a budget.wall (no timeout) is now
hard-killed at the ceiling. budget_exceeded is a terminal, non-retryable class
by default and maps to the failed tracker status.

Adds _budget_wall_secs + _effective_kill helpers (pure, unit-tested) and live
selftest coverage; usd/tokens remain best-effort and are not enforced here.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 19:21:49 -07:00
parent f1fe66fd4d
commit 7f77e9abc7
3 changed files with 81 additions and 14 deletions

View File

@ -224,6 +224,32 @@ _dur_to_secs() {
fi fi
} }
# _budget_wall_secs <file> -> the HARD wall-clock ceiling in seconds parsed from
# the `budget:` manifest map, e.g. `budget: { usd: 5, tokens: 2M, wall: 4h }`.
# `wall` is the always-enforceable hard ceiling (§5/§14); `usd`/`tokens` stay
# best-effort and are NOT enforced here. Unset/invalid -> 0 (no budget kill).
#
# Arguments: $1 - path to the task file whose frontmatter is read via fm_get.
# Outputs:   whatever _dur_to_secs prints for the parsed duration ("0" is
#            passed when no valid `wall` entry is found).
#
# Matching is anchored on BOTH sides so that a malformed manifest can never arm
# a bogus ceiling:
#   * key boundary   - `wall` must start the string or follow `{`, `,` or
#                      whitespace, so `firewall:` / `max_wall:` are not `wall`.
#   * value boundary - the duration must be followed by `,`, `}`, `#` or the end
#                      of the string, so `1.5h` / `4H` / `10x` are rejected
#                      outright instead of being truncated to `1` / `4` / `10`.
_budget_wall_secs() {
  local raw dur=0
  local wall_re='(^|[{,[:space:]])wall[[:space:]]*:[[:space:]]*([0-9]+[smhd]?)[[:space:]]*([,}#]|$)'
  raw=$(fm_get "$1" budget "")
  # BASH_REMATCH[1] is the key-boundary group; [2] is the duration itself.
  if [[ "$raw" =~ $wall_re ]]; then
    dur=${BASH_REMATCH[2]}
  fi
  _dur_to_secs "$dur"
}
# _effective_kill <timeout_secs> <budget_wall_secs> -> "<secs> <class>".
# Picks the single wall-clock ceiling that binds for one run and names the
# limit it came from:
#   * default           -> "<timeout_secs> timeout"
#   * budget wall wins  -> "<budget_wall_secs> budget_exceeded", which happens
#                          when the wall is set (> 0) and either no timeout is
#                          set (<= 0) or the wall is strictly earlier.
# A tie is reported as `timeout`; with neither limit set the result is
# "0 timeout", i.e. nothing to arm. Missing/empty arguments count as 0.
_effective_kill() {
  local timeout_secs=${1:-0} wall_secs=${2:-0}
  # Start from the timeout and override only when the budget wall binds.
  local secs=$timeout_secs class=timeout
  if (( wall_secs > 0 )); then
    if (( timeout_secs <= 0 )) || (( wall_secs < timeout_secs )); then
      secs=$wall_secs
      class=budget_exceeded
    fi
  fi
  printf '%s %s\n' "$secs" "$class"
}
# _meta_active <metafile> -> 0 if the job is occupying a concurrency slot. # _meta_active <metafile> -> 0 if the job is occupying a concurrency slot.
# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was # Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
# created moments ago — the reserved-slot window between meta-write and launch). # created moments ago — the reserved-slot window between meta-write and launch).
@ -719,23 +745,29 @@ run_worker() {
local rc=0 lockkey tmo timed_out=false local rc=0 lockkey tmo timed_out=false
lockkey=$(lock_key_for "$doing_file") lockkey=$(lock_key_for "$doing_file")
tmo=$(_dur_to_secs "$(fm_get "$doing_file" timeout "0")") tmo=$(_dur_to_secs "$(fm_get "$doing_file" timeout "0")")
# budget.wall is a HARD wall-clock ceiling that extends `timeout` (§5/§14):
# whichever ceiling fires first binds, and `kill_class` records which one so
# the run is labeled `timeout` vs `budget_exceeded`.
local bwall eff kill_class _ek
bwall=$(_budget_wall_secs "$doing_file")
_ek=$(_effective_kill "$tmo" "$bwall"); eff="${_ek%% *}"; kill_class="${_ek##* }"
local tmo_flag="$STATE/$job.timedout"; rm -f "$tmo_flag" local tmo_flag="$STATE/$job.timedout"; rm -f "$tmo_flag"
local lf="$LOCKS/$(_keyhash "$lockkey").lock" local lf="$LOCKS/$(_keyhash "$lockkey").lock"
if [[ "$tmo" -gt 0 && -n "$TIMEOUT_BIN" ]]; then if [[ "$eff" -gt 0 && -n "$TIMEOUT_BIN" ]]; then
# Hard timeout via timeout/gtimeout (kills the whole process tree). # Hard ceiling via timeout/gtimeout (kills the whole process tree).
AQ_STDIN="$AGENT_STDIN" "$TIMEOUT_BIN" -k 5 "${tmo}s" bash -c ' AQ_STDIN="$AGENT_STDIN" "$TIMEOUT_BIN" -k 5 "${eff}s" bash -c '
cd "$1" || exit 97; shift cd "$1" || exit 97; shift
if [ -n "${AQ_STDIN:-}" ]; then exec "$@" < "$AQ_STDIN"; else exec "$@"; fi if [ -n "${AQ_STDIN:-}" ]; then exec "$@" < "$AQ_STDIN"; else exec "$@"; fi
' _ "$cwd" "${AGENT_CMD[@]}" >> "$logf" 2>&1 ' _ "$cwd" "${AGENT_CMD[@]}" >> "$logf" 2>&1
rc=$? rc=$?
[[ $rc -eq 124 ]] && timed_out=true [[ $rc -eq 124 ]] && timed_out=true
elif [[ "$tmo" -gt 0 ]]; then elif [[ "$eff" -gt 0 ]]; then
# Portable watchdog fallback (no timeout binary). Flags the timeout and # Portable watchdog fallback (no timeout binary). Flags the kill and
# signals the worker; install coreutils (gtimeout) for hard tree kills. # signals the worker; install coreutils (gtimeout) for hard tree kills.
_run_agent >> "$logf" 2>&1 & _run_agent >> "$logf" 2>&1 &
local apid=$! local apid=$!
( sleep "$tmo"; : > "$tmo_flag" ( sleep "$eff"; : > "$tmo_flag"
pkill -TERM -P "$apid" 2>/dev/null; kill -TERM "$apid" 2>/dev/null pkill -TERM -P "$apid" 2>/dev/null; kill -TERM "$apid" 2>/dev/null
sleep 5; pkill -KILL -P "$apid" 2>/dev/null; kill -KILL "$apid" 2>/dev/null ) & sleep 5; pkill -KILL -P "$apid" 2>/dev/null; kill -KILL "$apid" 2>/dev/null ) &
local wpid=$! local wpid=$!
@ -772,8 +804,8 @@ run_worker() {
[[ -n "$scope" ]] && scope_check "$cwd" "$WIP_BASE" "$scope" "$logf" "$metaf" [[ -n "$scope" ]] && scope_check "$cwd" "$WIP_BASE" "$scope" "$logf" "$metaf"
if $timed_out; then if $timed_out; then
echo "TIMED OUT after ${tmo}s (rc=$rc): $(date)" >> "$logf" echo "KILLED after ${eff}s (rc=$rc, limit=$kill_class): $(date)" >> "$logf"
_finish_failure "$job" "$doing_file" "$metaf" "$logf" "timeout" "$rc" "$started" _finish_failure "$job" "$doing_file" "$metaf" "$logf" "$kill_class" "$rc" "$started"
elif [[ $rc -eq 0 ]]; then elif [[ $rc -eq 0 ]]; then
# Fleet (§18): re-confirm the lease before accepting the agent's output. If the # Fleet (§18): re-confirm the lease before accepting the agent's output. If the
# coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale # coordinator reclaimed us mid-run (offline-degrade then reconnect to a stale
@ -936,7 +968,7 @@ _class_retryable() {
# retry policy to a failed run. Retries (requeue to inbox with backoff via # retry policy to a failed run. Retries (requeue to inbox with backoff via
# next_eligible) while the class is in `retry.on` and attempts <= max; on # next_eligible) while the class is in `retry.on` and attempts <= max; on
# exhaustion -> failed/ result=retries_exhausted; with no policy -> failed/ with # exhaustion -> failed/ result=retries_exhausted; with no policy -> failed/ with
# the natural result (failed|timeout|verify_failed). The WIP branch is preserved # the natural result (failed|timeout|budget_exceeded|verify_failed). The WIP branch is preserved
# either way so a retry resumes from the checkpoint. # either way so a retry resumes from the checkpoint.
_finish_failure() { _finish_failure() {
local job=$1 file=$2 metaf=$3 logf=$4 class=$5 rc=$6 started=$7 local job=$1 file=$2 metaf=$3 logf=$4 class=$5 rc=$6 started=$7
@ -962,6 +994,7 @@ _finish_failure() {
else else
case "$class" in case "$class" in
timeout) result="timeout";; timeout) result="timeout";;
budget_exceeded) result="budget_exceeded";;
verify_failed) result="verify_failed";; verify_failed) result="verify_failed";;
*) result="failed";; *) result="failed";;
esac esac
@ -1157,7 +1190,7 @@ _json_escape() {
_tracker_status_for() { _tracker_status_for() {
case "$1" in case "$1" in
shipped) printf '%s' "$AQ_TRACKER_STATUS_DONE";; shipped) printf '%s' "$AQ_TRACKER_STATUS_DONE";;
failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected) failed|timeout|budget_exceeded|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected)
printf '%s' "$AQ_TRACKER_STATUS_FAILED";; printf '%s' "$AQ_TRACKER_STATUS_FAILED";;
*) printf '%s' "$AQ_TRACKER_STATUS_INPROGRESS";; *) printf '%s' "$AQ_TRACKER_STATUS_INPROGRESS";;
esac esac
@ -1875,6 +1908,7 @@ ${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
yolo: true yolo: true
lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially
timeout: 45m # optional; 90s|45m|2h|1d. On expiry -> failed (result=timeout) timeout: 45m # optional; 90s|45m|2h|1d. On expiry -> failed (result=timeout)
budget: { wall: 2h } # optional; wall = HARD wall-clock ceiling (extends timeout). On expiry -> failed (result=budget_exceeded). usd/tokens best-effort only
verify: pnpm -s test # optional; auto-QA gate. pass -> testing, fail -> failed verify: pnpm -s test # optional; auto-QA gate. pass -> testing, fail -> failed
# --- Phase 1 manifest (active) --- # --- Phase 1 manifest (active) ---
priority: high # critical|high|medium|low (default medium). Picked highest-first, then oldest priority: high # critical|high|medium|low (default medium). Picked highest-first, then oldest
@ -1887,7 +1921,7 @@ ${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
deps: [other-key] # block until each idempotency-key is shipped/ (or testing/ if deps-mode: soft) deps: [other-key] # block until each idempotency-key is shipped/ (or testing/ if deps-mode: soft)
deps-mode: soft # soft = a dep also counts as satisfied while in testing/ deps-mode: soft # soft = a dep also counts as satisfied while in testing/
# --- reserved (parsed + shown in status, but no-op until a later phase) --- # --- reserved (parsed + shown in status, but no-op until a later phase) ---
prefers: budget: review-policy: artifacts: tracker-item: prefers: review-policy: artifacts: tracker-item:
--- ---
${C_BOLD}PROFILES${C_RESET} profiles/<name>.md presets persona + capabilities + default-verify + engine-class + ${C_BOLD}PROFILES${C_RESET} profiles/<name>.md presets persona + capabilities + default-verify + engine-class +

View File

@ -349,14 +349,14 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until
> >
> **Slice progress — P1-S2 (profiles + deps/DAG, single host):** the `profiles/` catalog + resolution (`fm_eff` inheritance with job>profile>default precedence, persona injection), the warn-only `allowed-scope` guardrail (`scope_check`/`path_in_scope`), and single-host `deps` (block-with-reason in selection, `status` surfacing, submit-time cycle detection) are **done** — see §5/§6. > **Slice progress — P1-S2 (profiles + deps/DAG, single host):** the `profiles/` catalog + resolution (`fm_eff` inheritance with job>profile>default precedence, persona injection), the warn-only `allowed-scope` guardrail (`scope_check`/`path_in_scope`), and single-host `deps` (block-with-reason in selection, `status` surfacing, submit-time cycle detection) are **done** — see §5/§6.
> >
> **Slice progress — P1-S4 (tracker adapter, single host):** the task ↔ job round-trip is **done** (§10) — `aq from-tracker` materializes a job from a tracker Item (idempotent on `tracker-<id>`, label→manifest mapping), `aq to-tracker` echoes status + a metrics-only comment one-way (idempotent via `tracker_echoed`, never fatal), and opt-in `AQ_TRACKER_AUTO` auto-echoes on transitions. All HTTP is curl-only through one wrapper (test seam `AQ_TRACKER_API_CMD`). **This closes the Phase-1 §14 tracker-adapter item.** Remaining P1 extras: `budget.wall` (P1-S3 left it) and Node-`dash` surfacing of the new fields. > **Slice progress — P1-S4 (tracker adapter, single host):** the task ↔ job round-trip is **done** (§10) — `aq from-tracker` materializes a job from a tracker Item (idempotent on `tracker-<id>`, label→manifest mapping), `aq to-tracker` echoes status + a metrics-only comment one-way (idempotent via `tracker_echoed`, never fatal), and opt-in `AQ_TRACKER_AUTO` auto-echoes on transitions. All HTTP is curl-only through one wrapper (test seam `AQ_TRACKER_API_CMD`). **This closes the Phase-1 §14 tracker-adapter item.** Remaining P1 extras: Node-`dash` surfacing of the new fields. *(`budget.wall` now enforced — see §11 retry/budget line below.)*
- [x] Extend `agent-queue.sh` frontmatter parsing for all new manifest fields (§5), defaulted + backward-compatible. *(P1-S1)* - [x] Extend `agent-queue.sh` frontmatter parsing for all new manifest fields (§5), defaulted + backward-compatible. *(P1-S1)*
- [x] Add `profiles/` directory + profile resolution (persona injection, default verify/caps/scope) (§6). *(P1-S2)* - [x] Add `profiles/` directory + profile resolution (persona injection, default verify/caps/scope) (§6). *(P1-S2)*
- [x] Local capability detection + a job/factory capability match check before launch (§8 subset). *(P1-S1: `detect_capabilities` + `caps_match`; mismatch ⇒ `failed/` `result=capability_mismatch`, agent never launched.)* - [x] Local capability detection + a job/factory capability match check before launch (§8 subset). *(P1-S1: `detect_capabilities` + `caps_match`; mismatch ⇒ `failed/` `result=capability_mismatch`, agent never launched.)*
- [x] `priority` ordering in the inbox pick (replace pure FIFO with priority-then-age). *(P1-S1: `inbox_sorted`; per-lock serialization preserved.)* - [x] `priority` ordering in the inbox pick (replace pure FIFO with priority-then-age). *(P1-S1: `inbox_sorted`; per-lock serialization preserved.)*
- [x] `deps` (DAG) blocking on a single host; `idempotency-key` dedupe on `add`. *(P1-S1 idempotency dedupe + P1-S2 `deps` blocking/cycle detection.)* - [x] `deps` (DAG) blocking on a single host; `idempotency-key` dedupe on `add`. *(P1-S1 idempotency dedupe + P1-S2 `deps` blocking/cycle detection.)*
- [ ] `retry` with backoff into `failed`/requeue; `budget.wall` enforced (extends `timeout`). *(P1-S3: `retry` with backoff + `retries_exhausted` DONE; `budget.wall` still pending.)* - [x] `retry` with backoff into `failed`/requeue; `budget.wall` enforced (extends `timeout`). *(P1-S3: `retry` with backoff + `retries_exhausted` DONE. `budget.wall` DONE: parsed from `budget: { wall: <dur> }`, armed as a HARD wall-clock ceiling alongside `timeout` (whichever fires first binds), expiry → `failed` result=`budget_exceeded`, non-retryable by default.)*
- [x] `allowed-scope` guardrail (warn-only this phase) + post-run diff report. *(P1-S2: `scope_check` WARN-only + `scope_warning=`.)* - [x] `allowed-scope` guardrail (warn-only this phase) + post-run diff report. *(P1-S2: `scope_check` WARN-only + `scope_warning=`.)*
- [x] **Tracker adapter** `aq from-tracker <ITEM>` + `aq to-tracker` event poster (§10 P1). *(P1-S4: curl-only `tracker_api`; from-tracker materializes a job (idempotent), to-tracker echoes status+metrics one-way; opt-in `AQ_TRACKER_AUTO`. A standalone background poller is deferred to P2.)* - [x] **Tracker adapter** `aq from-tracker <ITEM>` + `aq to-tracker` event poster (§10 P1). *(P1-S4: curl-only `tracker_api`; from-tracker materializes a job (idempotent), to-tracker echoes status+metrics one-way; opt-in `AQ_TRACKER_AUTO`. A standalone background poller is deferred to P2.)*
- [ ] Dashboard shows profile + priority + capability tags + tracker-item link. *(P1-S1: `status` shows priority/profile/caps/tracker-item; P1-S4: status/insights also show last echoed tracker status; Node `dash` surfacing pending.)* - [ ] Dashboard shows profile + priority + capability tags + tracker-item link. *(P1-S1: `status` shows priority/profile/caps/tracker-item; P1-S4: status/insights also show last echoed tracker status; Node `dash` surfacing pending.)*

View File

@ -380,7 +380,40 @@ DEVIN_BIN="$crashstub" "$AQ" run --once >/dev/null 2>&1
|| fail "retry(crash) should not retry when crash not in on (result=$(metaval "$AGENT_QUEUE_ROOT/.state/nocrashjob.meta" result) attempts=$(metaval "$AGENT_QUEUE_ROOT/.state/nocrashjob.meta" attempts))" || fail "retry(crash) should not retry when crash not in on (result=$(metaval "$AGENT_QUEUE_ROOT/.state/nocrashjob.meta" result) attempts=$(metaval "$AGENT_QUEUE_ROOT/.state/nocrashjob.meta" attempts))"
unset AGENT_QUEUE_POLL unset AGENT_QUEUE_POLL
# 17. insights parse: a stub log with a usage line → parse_usage records tokens/ # 16b. budget.wall — unit + live enforcement.
# unit: _budget_wall_secs parses `budget: { wall: <dur> }`; _effective_kill
# picks the binding ceiling + class (budget.wall extends timeout).
# live: a long-running stub under budget.wall:2s is hard-killed -> failed/
# result=budget_exceeded (NOT timeout), and is non-retryable by default.
# Unit half of 16b. A copy of the script with its `main "$@"` line deleted is
# sourced in a child bash so the helpers can be called directly (presumably the
# deleted line is the script's only entry-point call - confirm if the script
# grows other top-level actions).
bfuncs="$tmp/aq-funcs-budget.sh"; sed '/^main "\$@"/d' "$AQ" > "$bfuncs"
bwfile="$tmp/budget-task.md"
printf '%s\n' '---' 'engine: devin' 'budget: { usd: 5, tokens: 2M, wall: 4h }' '---' '' '# b' > "$bwfile"
# Paths are handed to the child as positional arguments rather than spliced
# into the program text, so a `"`, `$` or backtick in $tmp cannot alter the
# code being run. They are copied to locals and the positional list is cleared
# before `source`, so the sourced script sees no arguments (as before).
# Each check exits with its own code (1-6); that code is surfaced on failure.
budget_unit_rc=0
bash -c 'set -uo pipefail
  _aq_t_funcs=$1 _aq_t_task=$2; set --
  source "$_aq_t_funcs"
  [ "$(_budget_wall_secs "$_aq_t_task")" = "14400" ] || exit 1
  [ "$(_effective_kill 0 0)" = "0 timeout" ] || exit 2
  [ "$(_effective_kill 600 0)" = "600 timeout" ] || exit 3
  [ "$(_effective_kill 0 300)" = "300 budget_exceeded" ] || exit 4
  [ "$(_effective_kill 600 300)" = "300 budget_exceeded" ] || exit 5
  [ "$(_effective_kill 300 600)" = "300 timeout" ] || exit 6
' _ "$bfuncs" "$bwfile" || budget_unit_rc=$?
if [ "$budget_unit_rc" -eq 0 ]; then
  pass "budget.wall: _budget_wall_secs parses wall=4h; _effective_kill binds earliest ceiling + class"
else
  fail "budget.wall unit logic wrong (parse/effective-kill, rc=$budget_unit_rc)"
fi
# Live half of 16b. The stub engine sleeps for 30s, far past the 2s budget.wall
# ceiling in the job below, so the run can only end by being killed.
sleepstub="$tmp/sleep-engine"
printf '#!/usr/bin/env bash\nsleep 30\nexit 0\n' > "$sleepstub"; chmod +x "$sleepstub"
export AGENT_QUEUE_ROOT="$tmp/queue-budget"
"$AQ" init >/dev/null
# The job declares a retry policy limited to `crash`; the assertion below
# expects the job in failed/ with result=budget_exceeded.
printf '%s\n' '---' 'engine: devin' "cwd: $work" 'yolo: true' \
  'budget: { wall: 2s }' 'retry: { max: 2, backoff: 1s, on: [crash] }' '---' '' '# budget-wall task' \
  > "$AGENT_QUEUE_ROOT/inbox/budgetjob.md"
DEVIN_BIN="$sleepstub" "$AQ" run --once >/dev/null 2>&1
# Read the recorded result once; `|| true` keeps a missing/unreadable meta from
# aborting the selftest so it is reported through `fail` instead.
budget_result=$(metaval "$AGENT_QUEUE_ROOT/.state/budgetjob.meta" result) || true
# `[ -f ]` tests for the file directly instead of relying on `ls` exit status.
if [ "$budget_result" = "budget_exceeded" ] \
  && [ -f "$AGENT_QUEUE_ROOT/failed/budgetjob.md" ]; then
  pass "budget.wall: long run hard-killed at wall ceiling -> failed/ (result=budget_exceeded, non-retryable)"
else
  fail "budget.wall live enforcement wrong (result=$budget_result)"
fi
unset AGENT_QUEUE_ROOT
# cost into meta; `insights <job>` prints them; a no-usage log doesn't crash. # cost into meta; `insights <job>` prints them; a no-usage log doesn't crash.
export AGENT_QUEUE_ROOT="$tmp/queue-usage" export AGENT_QUEUE_ROOT="$tmp/queue-usage"
usagestub="$tmp/usage-engine" usagestub="$tmp/usage-engine"