From b7a9ea1b7a5b6e2dffffe43bd22a747dfdc1530a Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 21:35:06 -0700 Subject: [PATCH] =?UTF-8?q?feat(agent-queue):=20tracker=20adapter=20?= =?UTF-8?q?=E2=80=94=20task=20<->=20job=20round-trip=20(P1-S4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements §10 single-host tracker integration, closing the last Phase-1 §14 item: - tracker_api: one curl-only HTTP wrapper (base URL + bearer + productId header), overridable via AQ_TRACKER_API_CMD so tests need no live service. Emits the response body + a trailing HTTP-code line; _api_call splits into API_BODY/API_CODE. - aq from-tracker : GET the Item, map title/description -> job body, labels (engine-class:/profile:/priority:/cap:) + Item priority -> frontmatter, and stamp tracker-item + a stable idempotency-key tracker-. Materializes a .md into inbox/ via cmd_add; idempotent (Slice 1 dedupe) so a re-pull never dups. JSON parsed with POSIX awk (no jq) — mac + linux safe. - aq to-tracker : one-way echo (child -> tracker, §24.5). PATCHes the Item status (building/review/testing->in_progress, shipped->done, failures->wont_fix, all overridable) and posts a metrics-only comment (result/attempts/duration/ tokens/cost/diff — NEVER prompt content or secrets). Idempotent via meta tracker_echoed; an echo failure (e.g. HTTP 500) is logged and non-fatal — the tracker is downstream, never authoritative for execution. - Opt-in auto-echo (AQ_TRACKER_AUTO=1, default OFF): the worker echoes on each transition (building via cmd_run, review/testing/failed via run_worker, shipped via ship/promote); never blocks or fails a job. - status + insights surface tracker-item and the last echoed status. curl-only HTTP; no new runtime deps; conventional + backward-compatible. --- agent-queue/agent-queue.sh | 258 ++++++++++++++++++++++++++++++++++++- 1 file changed, 257 insertions(+), 1 deletion(-) diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index f76b508..4046dd6 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -66,6 +66,25 @@ CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}" CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}" COPILOT_BIN="${COPILOT_BIN:-$(command -v copilot || echo copilot)}" +# ── Tracker integration (§10) — task <-> job round-trip via the items API ── +# Base URL of the platform-service items API; the items routes live under /api. +AQ_TRACKER_API="${AQ_TRACKER_API:-http://localhost:4003}" +# Bearer token (required for real calls; never hardcode). productId stamps Items. +AQ_TRACKER_TOKEN="${AQ_TRACKER_TOKEN:-}" +AQ_PRODUCT_ID="${AQ_PRODUCT_ID:-}" +# cwd a tracker-derived job runs in (Items carry no cwd); defaults to the invoking dir. +AQ_TRACKER_CWD="${AQ_TRACKER_CWD:-$PWD}" +# Auto-echo job outcomes back to the tracker on each transition (opt-in, default OFF). +AQ_TRACKER_AUTO="${AQ_TRACKER_AUTO:-0}" +# Item status the API uses for each bucket (the items API has no blocked/failed +# status, so failures map to wont_fix by default — all overridable). +AQ_TRACKER_STATUS_INPROGRESS="${AQ_TRACKER_STATUS_INPROGRESS:-in_progress}" +AQ_TRACKER_STATUS_DONE="${AQ_TRACKER_STATUS_DONE:-done}" +AQ_TRACKER_STATUS_FAILED="${AQ_TRACKER_STATUS_FAILED:-wont_fix}" +# Test seam: a stub script that replaces the real curl HTTP (see selftest.sh). +AQ_TRACKER_API_CMD="${AQ_TRACKER_API_CMD:-}" +CURL_BIN="${CURL_BIN:-$(command -v curl || echo curl)}" + # ── Colors ────────────────────────────────────────────────────────── if [[ -t 1 ]]; then C_RESET=$'\033[0m'; C_DIM=$'\033[2m'; C_BOLD=$'\033[1m' @@ -613,6 +632,7 @@ run_worker() { } >> "$logf" mv "$doing_file" "$FAILED/" 2>/dev/null { echo "result=capability_mismatch"; echo "ended=$(date +%s)"; } >> "$metaf" + _auto_echo "$job" return 0 fi fi @@ -628,6 +648,7 @@ run_worker() { } >> "$logf" mv "$doing_file" "$FAILED/" 2>/dev/null { echo "result=no_engine"; echo "ended=$(date +%s)"; } >> "$metaf" + _auto_echo "$job" return 0 fi @@ -642,6 +663,7 @@ run_worker() { echo "FATAL: cwd does not exist: $cwd" >> "$logf" mv "$doing_file" "$FAILED/" 2>/dev/null echo "result=failed" >> "$metaf"; echo "ended=$(date +%s)" >> "$metaf" + _auto_echo "$job" return 1 fi @@ -773,6 +795,9 @@ run_worker() { echo "FAILED (rc=$rc): $(date)" >> "$logf" _finish_failure "$job" "$doing_file" "$metaf" "$logf" "crash" "$rc" "$started" fi + + # Opt-in echo of the resting/terminal outcome back to the tracker (§10). + _auto_echo "$job" } # ── Resilience & insights helpers (Phase 1 — single-host §25/§26) ──── @@ -1029,6 +1054,222 @@ _insights_line() { } # ── Commands ──────────────────────────────────────────────────────── +# ── Tracker integration (§10) — task <-> job round-trip ───────────── +# +# tracker_api [JSON] -> emits the response body, then a final +# line with the HTTP status code. ALL HTTP goes through here (curl only). Tests +# replace it wholesale via AQ_TRACKER_API_CMD so no live service is needed. +tracker_api() { + local method=$1 path=$2 body=${3:-} + if [[ -n "$AQ_TRACKER_API_CMD" ]]; then + "$AQ_TRACKER_API_CMD" "$method" "$path" "$body" + return $? + fi + local url="${AQ_TRACKER_API}${path}" + local -a args=(-sS -m "${AQ_TRACKER_TIMEOUT:-30}" -X "$method" + -H "Content-Type: application/json" -w '\n%{http_code}') + [[ -n "$AQ_TRACKER_TOKEN" ]] && args+=(-H "Authorization: Bearer $AQ_TRACKER_TOKEN") + [[ -n "$AQ_PRODUCT_ID" ]] && args+=(-H "X-Product-Id: $AQ_PRODUCT_ID") + [[ -n "$body" ]] && args+=(--data "$body") + local out rc + out=$("$CURL_BIN" "${args[@]}" "$url" 2>/dev/null); rc=$? + if [[ $rc -ne 0 ]]; then printf '%s\n000\n' "$out"; else printf '%s\n' "$out"; fi +} + +# _api_call [JSON] -> sets globals API_BODY + API_CODE. +_api_call() { + local out; out=$(tracker_api "$@") + API_CODE=$(printf '%s' "$out" | tail -n1) + API_BODY=$(printf '%s' "$out" | sed '$d') +} + +# _json_str (reads JSON on stdin) -> the top-level string value, unescaped. +# Pure awk (POSIX) — mac + linux safe, no jq dependency. +_json_str() { + awk -v key="$1" ' + { json = json $0 } + END { + pat = "\"" key "\"" + i = index(json, pat); if (i == 0) exit + rest = substr(json, i + length(pat)) + sub(/^[ \t]*:[ \t]*/, "", rest) + if (substr(rest, 1, 1) != "\"") exit + rest = substr(rest, 2); n = length(rest) + for (j = 1; j <= n; j++) { + c = substr(rest, j, 1) + if (c == "\\") { j++; e = substr(rest, j, 1) + if (e == "n") out = out "\n"; else if (e == "t") out = out "\t" + else if (e == "r") out = out "\r"; else out = out e } + else if (c == "\"") break + else out = out c + } + printf "%s", out + }' +} + +# _json_labels (reads JSON on stdin) -> one labels[] entry per line. +_json_labels() { + awk ' + { json = json $0 } + END { + i = index(json, "\"labels\""); if (i == 0) exit + rest = substr(json, i); lb = index(rest, "["); rb = index(rest, "]") + if (lb == 0 || rb == 0 || rb < lb) exit + arr = substr(rest, lb + 1, rb - lb - 1) + while (match(arr, /"([^"\\]|\\.)*"/)) { + tok = substr(arr, RSTART + 1, RLENGTH - 2) + gsub(/\\"/, "\"", tok); print tok + arr = substr(arr, RSTART + RLENGTH) + } + }' +} + +# _json_escape -> the text as a JSON string body (no surrounding quotes). +_json_escape() { + printf '%s' "$1" | awk ' + { line = $0; gsub(/\\/, "\\\\", line); gsub(/"/, "\\\"", line); gsub(/\t/, "\\t", line) + out = out (NR > 1 ? "\\n" : "") line } + END { printf "%s", out }' +} + +# _tracker_status_for -> the Item status for a job result/stage. +_tracker_status_for() { + case "$1" in + shipped) printf '%s' "$AQ_TRACKER_STATUS_DONE";; + failed|timeout|verify_failed|retries_exhausted|capability_mismatch|no_engine|rejected) + printf '%s' "$AQ_TRACKER_STATUS_FAILED";; + *) printf '%s' "$AQ_TRACKER_STATUS_INPROGRESS";; + esac +} + +# _tracker_note -> a metrics-only summary line. +# NEVER includes prompt/body content or secrets — only run metrics (§24.5/§26). +_tracker_note() { + local jn=$1 metaf=$2 result=$3 status=$4 s attempts dur ti to cost la ld + attempts=$(_meta_val "$metaf" attempts); dur=$(_meta_val "$metaf" duration_s) + ti=$(_meta_val "$metaf" tokens_in); to=$(_meta_val "$metaf" tokens_out) + cost=$(_meta_val "$metaf" cost_usd) + la=$(_meta_val "$metaf" lines_added); ld=$(_meta_val "$metaf" lines_deleted) + s="agent-queue: job ${jn} -> ${result:-$status} (status=${status})." + [[ -n "$attempts" ]] && s+=" attempts=${attempts}." + [[ -n "$dur" ]] && s+=" duration=${dur}s." + [[ -n "$ti$to" ]] && s+=" tokens=${ti:-0}/${to:-0}." + [[ -n "$cost" ]] && s+=" cost_usd=${cost}." + [[ -n "$la$ld" ]] && s+=" diff=+${la:-0}/-${ld:-0}." + printf '%s' "$s" +} + +# from-tracker — pull a tracker Item and materialize a job in inbox/. +# Idempotent on the derived key `tracker-` (Slice 1 dedupe). +cmd_from_tracker() { + ensure_dirs + local item_id="${1:-}" + [[ -n "$item_id" ]] || die "usage: from-tracker " + _api_call GET "/api/items/$item_id" + case "$API_CODE" in 2*) :;; *) die "from-tracker: items API returned HTTP ${API_CODE:-error} for item $item_id";; esac + + local title desc iprio + title=$(printf '%s' "$API_BODY" | _json_str title) + desc=$(printf '%s' "$API_BODY" | _json_str description) + iprio=$(printf '%s' "$API_BODY" | _json_str priority) + [[ -n "$title$desc" ]] || die "from-tracker: item $item_id has no title/description (HTTP $API_CODE)" + + # labels carry optional manifest hints: engine-class:, profile:, priority:, cap: + local engine_class="" profile="" prio="" caps_list="" label + while IFS= read -r label; do + [[ -n "$label" ]] || continue + case "$label" in + engine-class:*) engine_class=${label#engine-class:};; + profile:*) profile=${label#profile:};; + priority:*) prio=${label#priority:};; + cap:*) caps_list+="${label#cap:}, ";; + esac + done < <(printf '%s' "$API_BODY" | _json_labels) + prio=${prio:-${iprio:-medium}} + case "$prio" in critical|high|medium|low) :;; *) prio=medium;; esac + + # Materialize into a .md file (so cmd_add/the queue recognize it). mktemp -d is + # the portable way to get a unique path with a fixed (.md) basename on mac+linux. + local safe_id tmpdir tmp + safe_id=$(printf '%s' "$item_id" | tr -c 'A-Za-z0-9._-' '_') + tmpdir=$(mktemp -d "${TMPDIR:-/tmp}/aq-fromtracker.XXXXXX") + tmp="$tmpdir/tracker-$safe_id.md" + { + echo "---" + echo "cwd: $AQ_TRACKER_CWD" + echo "yolo: true" + echo "priority: $prio" + [[ -n "$engine_class" ]] && echo "engine-class: $engine_class" + [[ -n "$profile" ]] && echo "profile: $profile" + [[ -n "$caps_list" ]] && echo "capabilities: [${caps_list%, }]" + echo "tracker-item: $item_id" + echo "idempotency-key: tracker-$item_id" + echo "---" + echo + [[ -n "$title" ]] && { echo "# $title"; echo; } + printf '%s\n' "$desc" + } > "$tmp" + + cmd_add "$tmp" + rm -rf "$tmpdir" + local created; created=$(grep -lE "^tracker-item:[[:space:]]*${item_id}[[:space:]]*\$" "$INBOX"/*.md 2>/dev/null | head -1) + if [[ -n "$created" ]]; then + log "from-tracker: item $item_id -> $C_BOLD$(basename "$created")$C_RESET" + else + log "from-tracker: item $item_id already queued elsewhere (deduped)" + fi +} + +# to-tracker — one-way echo of a job's CURRENT outcome to its tracker Item +# (child -> tracker, §24.5). Idempotent via meta `tracker_echoed`; never fatal. +cmd_to_tracker() { + ensure_dirs + local job="${1:-}" + [[ -n "$job" ]] || die "usage: to-tracker " + local metaf="$STATE/$job.meta" + [[ -f "$metaf" ]] || metaf=$(ls -1t "$STATE"/*"$job"*.meta 2>/dev/null | head -1) + [[ -f "$metaf" ]] || die "to-tracker: no meta for job '$job'" + local jn; jn=$(basename "$metaf"); jn=${jn%.meta} + + local item_id; item_id=$(_meta_val "$metaf" tracker_item) + if [[ -z "$item_id" ]]; then + log "to-tracker: $jn has no tracker-item — nothing to echo" + return 0 + fi + local result status last + result=$(_meta_val "$metaf" result) + status=$(_tracker_status_for "$result") + last=$(_meta_val "$metaf" tracker_echoed) + if [[ "$last" == "$status" ]]; then + log "to-tracker: $jn already echoed status=$status to $item_id (no-op)" + return 0 + fi + + # 1) status transition + _api_call PATCH "/api/items/$item_id/status" "{\"status\":\"$status\"}" + case "$API_CODE" in + 2*) :;; + *) err "to-tracker: status PATCH for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal; job state unchanged"; return 0;; + esac + # 2) metrics-only comment (never prompt content / secrets) + local note; note=$(_tracker_note "$jn" "$metaf" "$result" "$status") + _api_call POST "/api/items/$item_id/comments" "{\"body\":\"$(_json_escape "$note")\"}" + case "$API_CODE" in + 2*) :;; + *) err "to-tracker: comment for $item_id failed (HTTP ${API_CODE:-error}) — non-fatal";; + esac + # 3) record echoed status (idempotency) + { echo "tracker_echoed=$status"; echo "tracker_echoed_at=$(date +%s)"; } >> "$metaf" + log "to-tracker: echoed $jn -> item $item_id (status=$status)" +} + +# _auto_echo — opt-in (AQ_TRACKER_AUTO=1) best-effort echo on a transition. +# Never blocks or fails the job: the tracker is downstream, not authoritative. +_auto_echo() { + [[ "$AQ_TRACKER_AUTO" == 1 ]] || return 0 + cmd_to_tracker "$1" >/dev/null 2>&1 || true +} + cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; } cmd_add() { @@ -1215,6 +1456,7 @@ cmd_run() { run_worker "$doing_file" & { echo "pid=$!"; echo "pidstart=$(_pidstart "$!")"; } >> "$STATE/$job.meta" log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)" + _auto_echo "$job" # building -> in_progress (opt-in) sleep 1 running=$(active_workers) done @@ -1269,10 +1511,12 @@ cmd_status() { local m_prio m_prof m_caps m_trk extra="" m_prio=$(grep '^priority=' "$f" | cut -d= -f2-); m_prof=$(grep '^profile=' "$f" | cut -d= -f2-) m_caps=$(grep '^capabilities=' "$f" | cut -d= -f2-); m_trk=$(grep '^tracker_item=' "$f" | cut -d= -f2-) + local m_echo; m_echo=$(grep '^tracker_echoed=' "$f" | tail -1 | cut -d= -f2-) [[ -n "$m_prio" ]] && extra+="prio=$m_prio " [[ -n "$m_prof" ]] && extra+="profile=$m_prof " [[ -n "$m_caps" ]] && extra+="caps=$m_caps " [[ -n "$m_trk" ]] && extra+="tracker=$m_trk " + [[ -n "$m_echo" ]] && extra+="echoed=$m_echo " [[ -n "$extra" ]] && printf ' %s%s%s\n' "$C_DIM" "$extra" "$C_RESET" printf ' %s%s%s\n' "$C_DIM" "$(_insights_line "$f")" "$C_RESET" done @@ -1315,7 +1559,7 @@ cmd_insights() { for k in engine result attempts started ended duration_s exit verify_exit \ model tokens_in tokens_out tokens_cached cost_usd turns tool_calls usage_estimated \ files_changed lines_added lines_deleted wip_branch wip_base wip_commit \ - next_eligible retry_class recovered; do + next_eligible retry_class recovered tracker_item tracker_echoed tracker_echoed_at; do val=$(_meta_val "$f" "$k") [[ -n "$val" ]] && printf ' %-15s %s\n' "$k" "$val" done @@ -1428,6 +1672,7 @@ cmd_ship() { mv "$f" "$SHIPPED/$base" [[ -f "$STATE/$name.meta" ]] && echo "result=shipped" >> "$STATE/$name.meta" log "shipped $C_BOLD$base$C_RESET (testing → shipped)" + _auto_echo "$name" } # promote — advance one stage forward: review → testing → shipped. @@ -1447,6 +1692,7 @@ cmd_promote() { mv "$f" "$dest/$base" [[ -f "$STATE/$name.meta" ]] && echo "result=$result" >> "$STATE/$name.meta" log "promoted $C_BOLD$base$C_RESET ($from → $result)" + _auto_echo "$name" } # reject — move a review/testing job to failed/ (manual gate rejection). @@ -1516,6 +1762,8 @@ ${C_BOLD}COMMANDS${C_RESET} watch [interval] live status (default 2s, bash) insights [job] per-job metrics, or recent table + per-engine rollup recover reclaim orphaned building/ jobs (dead worker) -> inbox + from-tracker pull a tracker Item -> materialize a job in inbox/ (§10) + to-tracker echo a job's outcome to its tracker Item (one-way) dash [--interval N] richer live Node dashboard (recent shipped/failed too) stop kill running workers + the run loop logs [-f] print (or follow) a job's log @@ -1565,6 +1813,12 @@ ${C_BOLD}ENV${C_RESET} AGENT_QUEUE_ROOT (=$QUEUE_ROOT) AGENT_QUEUE_MAX (=$MAX_CONCURRENCY) AGENT_QUEUE_ENGINE (=$DEFAULT_ENGINE) AGENT_QUEUE_VERIFY (default verify cmd) DEVIN_BIN / CLAUDE_BIN / CODEX_BIN / COPILOT_BIN + +${C_BOLD}TRACKER${C_RESET} (§10 — from-tracker / to-tracker; real use needs platform-service + a token) + AQ_TRACKER_API (=$AQ_TRACKER_API) AQ_TRACKER_TOKEN (bearer) AQ_PRODUCT_ID + AQ_TRACKER_AUTO=1 to auto-echo outcomes on each transition (default OFF) + AQ_TRACKER_CWD (cwd for tracker-derived jobs) AQ_TRACKER_API_CMD (test stub seam) + label hints on an Item: engine-class: profile: priority: cap: EOF } @@ -1578,6 +1832,8 @@ main() { watch) cmd_watch "$@";; insights) cmd_insights "$@";; recover) cmd_recover "$@";; + from-tracker) cmd_from_tracker "$@";; + to-tracker) cmd_to_tracker "$@";; dash|dashboard) cmd_dash "$@";; stop) cmd_stop "$@";; logs) cmd_logs "$@";;