diff --git a/agent-queue/demo/README.md b/agent-queue/demo/README.md
new file mode 100644
index 0000000..626fade
--- /dev/null
+++ b/agent-queue/demo/README.md
@@ -0,0 +1,78 @@
+# Two-Factory Parallel Demo (Phase-2 Exit Criteria, §14)
+
+This demo closes the final Phase-2 exit-criteria box: **≥2 factories executing jobs in
+parallel through one coordinator**, proving the concurrency guarantees end-to-end. It is a
+**harness over the existing runtime** — it does *not* change `agent-queue.sh` or
+`lib/fleet-client.sh`; it starts two real `agent-queue.sh run` daemons (distinct
+factoryIds, separate queues/cwds) that compete **only** through the coordinator, then
+observes and asserts.
+
+## The three guarantees it proves
+
+| # | Guarantee | How it's shown |
+|---|-----------|----------------|
+| **(a)** | **No double-assign** | Each of the 3 jobs is claimed/executed by exactly **one** factory. The coordinator's atomic claim (lock-guarded; only a `queued` job is claimable) means two concurrent claimers never get the same job version. |
+| **(b)** | **Fencing + reclaim** | One factory is **killed mid-job**. The reaper returns its in-flight job to `queued` with a **bumped lease epoch**; the surviving factory **reclaims and completes** it. The dead worker's late/zombie report (stale epoch) is **fenced (HTTP 409)** and never ships. |
+| **(c)** | **Parallelism** | Both factories hold an active job **simultaneously** (observed in coordinator state) — work is concurrent, not serialized. |
+
+## Run it
+
+### Stub mode (default, zero dependencies, CI-safe)
+
+```bash
+bash demo/two-factory-demo.sh
+```
+
+Drives [`coordinator-stub.sh`](coordinator-stub.sh) — a stateful, lock-guarded, file-backed
+coordinator that implements the same claim / lease / fence / reaper contract as
+platform-service, via the existing `AQ_FLEET_API_CMD` test seam. No platform-service, no
+Cosmos, no network. This is exactly what `selftest.sh` runs headlessly.
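The `AQ_FLEET_API_CMD` seam can be pictured with a small stand-in. This is a minimal sketch, assuming the calling convention that `coordinator-stub.sh` in this change follows: the responder receives method, path and body as three arguments and prints the response body followed by a final HTTP-code line. `fake_responder` and `call_responder` are hypothetical names used only for illustration; they are not part of `lib/fleet-client.sh`.

```bash
#!/usr/bin/env bash
# Hypothetical responder with the same shape as coordinator-stub.sh:
# arguments are METHOD PATH BODY, output is the body then the HTTP code.
fake_responder() {
  local method=$1 path=$2 body=${3:-}
  case "$method $path" in
    "POST /fleet/factories/heartbeat") printf '%s\n%s\n' '{"ok":true}' 200 ;;
    "POST /fleet/claim")               printf '%s\n%s\n' '{"claimed":false}' 200 ;;
    *)                                  printf '%s\n%s\n' '{}' 404 ;;
  esac
}

# Split a reply into RESP_BODY (all lines but the last) and RESP_CODE (last line).
call_responder() {
  local out
  out=$(fake_responder "$@")
  RESP_CODE=$(printf '%s\n' "$out" | tail -n1)
  RESP_BODY=$(printf '%s\n' "$out" | sed '$d')
}

call_responder POST /fleet/claim '{"factoryId":"mac-1"}'
echo "code=$RESP_CODE body=$RESP_BODY"
```

Keeping the code on its own trailing line is what lets the demo read a status with a plain `tail -n1`, without parsing JSON.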
+
+### Real-coordinator mode (against a live platform-service)
+
+```bash
+DEMO_MODE=real \
+  AQ_FLEET_API=http://localhost:4003/api \
+  AQ_FLEET_TOKEN= \
+  AQ_PRODUCT_ID= \
+  bash demo/two-factory-demo.sh
+```
+
+In real mode the demo submits via the platform-service fleet API and relies on the
+coordinator's **own lease reaper** to reclaim the killed factory's job (it waits
+`DEMO_REAP_WAIT` seconds; pair with a short `AQ_FLEET_LEASE_SECONDS` so the lease expires
+quickly). Submit endpoint is overridable via `DEMO_SUBMIT_PATH` (default `/fleet/jobs`).
+Real mode is observational/best-effort — the machine-checked assertions run in stub mode
+(and in `selftest.sh`).
+
+## Env knobs
+
+| Var | Default | Meaning |
+|-----|---------|---------|
+| `DEMO_MODE` | `stub` | `stub` or `real` (auto-set to `real` when `AQ_FLEET_API`+`AQ_FLEET_TOKEN` are set and `DEMO_MODE` ≠ `stub`) |
+| `DEMO_JOB_SLEEP` | `2` | per-job engine seconds — the window during which the victim is killed mid-job |
+| `DEMO_TIMEOUT` | `60` | max seconds to wait for the survivor to drain all 3 jobs |
+| `DEMO_POLL` | `0.2` | coordinator-state poll interval |
+| `DEMO_FACTORY_1` / `DEMO_FACTORY_2` | `mac-1` / `ubuntu-1` | factory ids (F1 is the victim) |
+| `DEMO_KEEP` | `0` | `1` keeps the temp dir (queues, logs, coordinator state) for inspection |
+| `DEMO_REAP_WAIT` / `DEMO_DRAIN_WAIT` | `20` / `30` | real-mode waits for the coordinator reaper / drain |
+
+## What to watch
+
+The demo prints a step-by-step trace and a final `RESULTS` block. The key lines:
+
+- `PARALLELISM observed: mac-1 and ubuntu-1 both holding active jobs concurrently` — guarantee (c).
+- `killed factory mac-1 (pid N)` then `reaper reclaimed mac-1's lease(s)` — the crash + reclaim.
+- `zombie report for @epoch=N was FENCED (HTTP 409)` — guarantee (b) fencing.
+- `RESULTS` shows each job's winning factory; the reclaimed job's winner is the **survivor**.
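A minimal sketch of checking a captured trace for the key lines listed above. `check_trace` is a hypothetical helper, and the sample trace is made-up text that only mimics the messages quoted in this README; `selftest.sh` in this change performs comparable `grep` checks against the real output.

```bash
#!/usr/bin/env bash
# Hypothetical helper: returns 0 when every marker line is present in the trace.
check_trace() {
  local trace=$1 missing=0 pat
  for pat in 'PARALLELISM observed: ' 'reaper reclaimed ' 'was FENCED (HTTP 409)'; do
    # -F: match the marker as a fixed string, not as a regular expression
    if ! printf '%s\n' "$trace" | grep -qF -- "$pat"; then
      echo "missing: $pat"
      missing=1
    fi
  done
  return "$missing"
}

# Illustrative trace only (job name, pid and epoch are invented).
sample="[demo] PARALLELISM observed: mac-1 and ubuntu-1 both holding active jobs concurrently
[demo] killed factory mac-1 (pid 12345)
[demo] reaper reclaimed mac-1's lease(s)
  + zombie report for demo-job-1 @epoch=1 was FENCED (HTTP 409)"

if check_trace "$sample"; then echo "trace ok"; fi
```

Fixed-string matching avoids surprises from the parentheses in `(HTTP 409)`, which a regular expression would otherwise need escaped in extended mode.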
+
+With `DEMO_KEEP=1`, inspect under the printed temp dir:
+
+- `coord/events.log` — the coordinator's audit trail: `CLAIM` / `PATCH:` / `RECLAIM` / `FENCE` events (factory + epoch on each).
+- `coord/jobs/.job` — final per-job `stage` / `holder` / `epoch`.
+- `log-mac-1.txt`, `log-ubuntu-1.txt` — each factory's run-loop log (claims, the `▶ launching`, the fenced/quarantine path on the killed worker).
+
+## Files
+
+- `two-factory-demo.sh` — the orchestrator (start factories, kill/reclaim/fence, assert).
+- `coordinator-stub.sh` — the stateful coordinator stub (claim/patch/fence/renew/release/reap, mkdir-locked).
diff --git a/agent-queue/demo/coordinator-stub.sh b/agent-queue/demo/coordinator-stub.sh
new file mode 100755
index 0000000..52beaec
--- /dev/null
+++ b/agent-queue/demo/coordinator-stub.sh
@@ -0,0 +1,151 @@
+#!/usr/bin/env bash
+#
+# coordinator-stub.sh — a STATEFUL, concurrency-safe fleet-coordinator stub for the
+# two-factory demo + its selftest. It is the same "AQ_FLEET_API_CMD responder" pattern
+# the existing fleet selftests use (invoked as ` `, prints the
+# response body then a final HTTP-code line), EXTENDED with file-backed shared state +
+# an mkdir lock so >=2 competing factory processes coordinate through ONE coordinator —
+# exactly modeling platform-service's claim / lease / fence / reaper contract
+# (../../learning_ai_common_plat/services/platform-service/src/modules/fleet/coordinator.ts).
+#
+# It is curl-free + dependency-free (bash + POSIX awk/sed/grep) so the demo runs in CI
+# with zero external services. Real-coordinator mode bypasses this entirely (the demo
+# talks to platform-service over HTTP when AQ_FLEET_API/AQ_FLEET_TOKEN are set).
+#
+# Contract implemented (paths under the caller's AQ_FLEET_API base, which includes /api):
+#   POST  /fleet/factories/heartbeat    -> {"ok":true} 200
+#   POST  /fleet/claim                  -> {"claimed":true,"job":{id,bodyMd,leaseEpoch},"lease":{leaseEpoch}} | {"claimed":false}
+#   PATCH /fleet/jobs/:id               -> 200 | 409 (stale leaseEpoch => FENCED)
+#   POST  /fleet/jobs/:id/lease/renew   -> 200 | 409 (fenced)
+#   POST  /fleet/jobs/:id/lease/release -> 200
+#   POST  /fleet/_reap                  -> {"reaped":N} 200 (DEMO-only admin: models the
+#                                          coordinator reaper reclaiming a dead factory's
+#                                          leases — returns its in-flight jobs to `queued`
+#                                          and BUMPS the epoch so the zombie is fenced)
+#
+# Atomicity: every state mutation runs inside an mkdir spin-lock, so under true
+# concurrency EXACTLY ONE claimer wins a given job version (no double-assign), and a
+# report carrying an epoch older than the stored epoch is rejected (409) — the same
+# guarantees the real rev/_etag compare-and-swap provides.
+#
+# State (under $COORD_STATE, set by the demo):
+#   order        submit-ordered job ids (one per line)
+#   jobs/.job    key=val lines: stage, holder, epoch, body
+#   events.log   append-only audit: " job= factory= epoch="
+#   lock/        the mkdir lock dir
+#
+# Stages: queued -> assigned -> building -> review|testing -> shipped (terminal);
+#         failed/dead_letter terminal. Reclaimable (active) = assigned|building|review|testing.
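The atomicity note above can be tried in isolation. This is a minimal sketch, assuming only that `mkdir` either creates the directory or fails, so one caller at a time owns the lock. `try_claim`, the temp directory layout and the claimer names are hypothetical and much simpler than the stub's per-job files.

```bash
#!/usr/bin/env bash
# Hypothetical single-job state: one "stage" file guarded by an mkdir lock.
state=$(mktemp -d)
printf 'queued\n' > "$state/stage"
lock="$state/lock"

try_claim() {  # $1 = claimer name
  # Spin until mkdir succeeds; only one process can create the directory.
  until mkdir "$lock" 2>/dev/null; do sleep 0.01; done
  # Check-and-set happens entirely inside the lock, so it cannot interleave.
  if [ "$(cat "$state/stage")" = queued ]; then
    printf 'assigned\n' > "$state/stage"
    printf '%s\n' "$1" >> "$state/winners"
  fi
  rmdir "$lock"
}

# Four concurrent claimers race for the one queued job.
for f in f1 f2 f3 f4; do try_claim "$f" & done
wait

winners=$(wc -l < "$state/winners" | tr -d ' ')
echo "winners=$winners"
rm -rf "$state"
```

Whichever claimer takes the lock first flips the stage to `assigned`; the others then see a non-`queued` stage and back off, so the winners file ends up with exactly one line.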
+
+set -uo pipefail
+
+METHOD="${1:-}"; RPATH="${2:-}"; BODY="${3:-}"
+: "${COORD_STATE:?coordinator-stub.sh requires COORD_STATE}"
+JOBS_DIR="$COORD_STATE/jobs"
+EVENTS="$COORD_STATE/events.log"
+LOCK="$COORD_STATE/lock"
+
+# ── JSON field extraction (no jq) ───────────────────────────────────────────
+_str_field() { printf '%s' "$BODY" | sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -1; }
+_num_field() { printf '%s' "$BODY" | grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | grep -oE -- '-?[0-9]+$' | head -1; }
+# job id from /fleet/jobs/ or /fleet/jobs//lease/
+_job_id_from_path() { printf '%s' "$RPATH" | sed -e 's#^/fleet/jobs/##' -e 's#/lease/.*$##'; }
+
+# ── lock (mkdir is atomic on POSIX filesystems) ─────────────────────────────
+_lock() { local n=0; until mkdir "$LOCK" 2>/dev/null; do sleep 0.02; n=$((n+1)); [ "$n" -gt 5000 ] && break; done; }
+_unlock() { rmdir "$LOCK" 2>/dev/null || true; }
+
+_jobfile() { printf '%s/%s.job\n' "$JOBS_DIR" "$1"; }
+_get() { grep -E "^$2=" "$1" 2>/dev/null | head -1 | cut -d= -f2-; }
+_set() { # : replace or append key=val
+  local f=$1 k=$2 v=$3 tmp; tmp="$f.tmp.$$"
+  if grep -qE "^$k=" "$f" 2>/dev/null; then
+    sed "s#^$k=.*#$k=$v#" "$f" > "$tmp" && mv "$tmp" "$f"
+  else
+    printf '%s=%s\n' "$k" "$v" >> "$f"
+  fi
+}
+_event() { printf '%s %s\n' "$(date +%s)" "$*" >> "$EVENTS"; }
+_is_active() { case "$1" in assigned|building|review|testing) return 0;; *) return 1;; esac; }
+
+_emit() { printf '%s\n%s\n' "$1" "$2"; } #
+
+case "$METHOD $RPATH" in
+  "POST /fleet/factories/heartbeat")
+    _emit '{"ok":true}' 200 ;;
+
+  "POST /fleet/claim")
+    factory=$(_str_field factoryId)
+    _lock
+    claimed_id=""
+    if [ -f "$COORD_STATE/order" ]; then
+      while IFS= read -r jid; do
+        [ -n "$jid" ] || continue
+        jf=$(_jobfile "$jid")
+        [ -f "$jf" ] || continue
+        if [ "$(_get "$jf" stage)" = "queued" ]; then claimed_id="$jid"; break; fi
+      done < "$COORD_STATE/order"
+    fi
+    if [ -n "$claimed_id" ]; then
+      jf=$(_jobfile "$claimed_id")
+      epoch=$(( $(_get "$jf" epoch) + 1 ))
+      _set "$jf" stage assigned; _set "$jf" holder "$factory"; _set "$jf" epoch "$epoch"
+      body=$(_get "$jf" body)
+      _event "CLAIM job=$claimed_id factory=$factory epoch=$epoch"
+      _unlock
+      _emit "{\"claimed\":true,\"job\":{\"id\":\"$claimed_id\",\"bodyMd\":\"$body\",\"leaseEpoch\":$epoch},\"lease\":{\"leaseEpoch\":$epoch}}" 200
+    else
+      _unlock
+      _emit '{"claimed":false}' 200
+    fi ;;
+
+  PATCH\ /fleet/jobs/*)
+    jid=$(_job_id_from_path); stage=$(_str_field stage); rep_epoch=$(_num_field leaseEpoch)
+    jf=$(_jobfile "$jid")
+    _lock
+    if [ ! -f "$jf" ]; then _unlock; _emit '{}' 404
+    else
+      cur_epoch=$(_get "$jf" epoch)
+      if [ -n "$rep_epoch" ] && [ "$rep_epoch" -lt "$cur_epoch" ]; then
+        _event "FENCE job=$jid factory=$(_get "$jf" holder) epoch=$rep_epoch"
+        _unlock; _emit '{}' 409   # stale epoch: fenced (zombie rejected); the empty body is assumed
+      else
+        [ -n "$stage" ] && _set "$jf" stage "$stage"
+        _event "PATCH:$stage job=$jid factory=$(_get "$jf" holder) epoch=$rep_epoch"
+        _unlock; _emit '{}' 200
+      fi
+    fi ;;
+
+  POST\ /fleet/jobs/*/lease/renew)
+    jid=$(_job_id_from_path); rep_epoch=$(_num_field leaseEpoch); jf=$(_jobfile "$jid")
+    _lock
+    cur_epoch=$(_get "$jf" epoch 2>/dev/null)
+    if [ -n "$rep_epoch" ] && [ -n "$cur_epoch" ] && [ "$rep_epoch" -lt "$cur_epoch" ]; then
+      _event "RENEW_FENCE job=$jid epoch=$rep_epoch"
+      _unlock; _emit '{}' 409   # assumed to mirror the PATCH fence path
+    else
+      _unlock; _emit '{}' 200
+    fi ;;
+
+  POST\ /fleet/jobs/*/lease/release)
+    # assumed minimal handler: the contract above only promises a 200 for release
+    _emit '{}' 200 ;;
+
+  "POST /fleet/_reap")
+    # loop head assumed from the contract above: requeue every active job held by the dead factory
+    factory=$(_str_field factoryId)
+    _lock
+    n=0
+    for jf in "$JOBS_DIR"/*.job; do
+      [ -f "$jf" ] || continue
+      if [ "$(_get "$jf" holder)" = "$factory" ] && _is_active "$(_get "$jf" stage)"; then
+        jid=$(basename "$jf" .job)
+        epoch=$(( $(_get "$jf" epoch) + 1 ))   # bump: the dead worker's old epoch is now stale (fenced)
+        _set "$jf" stage queued; _set "$jf" holder ""; _set "$jf" epoch "$epoch"
+        _event "RECLAIM job=$jid factory=$factory epoch=$epoch"
+        n=$((n+1))
+      fi
+    done
+    _unlock
+    _emit "{\"reaped\":$n}" 200 ;;
+
+  *) _emit '{}' 200 ;;
+esac
diff --git a/agent-queue/demo/two-factory-demo.sh b/agent-queue/demo/two-factory-demo.sh
new file mode 100755
index 0000000..2073e83
--- /dev/null
+++ b/agent-queue/demo/two-factory-demo.sh
@@ -0,0 +1,248 @@
+#!/usr/bin/env bash
+#
+# two-factory-demo.sh — Phase-2 EXIT-CRITERIA demo (§14): >=2 factories executing jobs
+# in PARALLEL through ONE coordinator, proving the Phase-2 guarantees end-to-end:
+#
+#   (a) NO DOUBLE-ASSIGN  — each job is claimed/executed by exactly ONE factory.
+#   (b) FENCING + RECLAIM — kill a factory MID-JOB; the reaper returns its job; the OTHER
+#                           factory reclaims + completes it; the dead worker's late/zombie
+#                           report is FENCED (409, never shipped).
+#   (c) PARALLELISM       — both factories make progress concurrently (not serialized).
+#
+# This is a DEMO HARNESS over the EXISTING runtime — it does NOT change agent-queue.sh or
+# lib/fleet-client.sh; it starts two real `agent-queue.sh run` daemons (distinct factoryIds,
+# separate queues/cwds) that compete ONLY through the coordinator, then observes + asserts.
+#
+# DUAL MODE:
+#   STUB (default / CI-safe): drives demo/coordinator-stub.sh — a stateful, lock-guarded
+#     file-backed coordinator. Zero external services. Used by selftest.sh.
+#   REAL : set AQ_FLEET_API + AQ_FLEET_TOKEN (and DEMO_MODE=real) to run against a live
+#     platform-service fleet coordinator. Submit + reaper-reclaim use its HTTP API.
+#
+# Usage:
+#   bash demo/two-factory-demo.sh                 # stub mode (default)
+#   DEMO_MODE=real AQ_FLEET_API=http://host:4003/api AQ_FLEET_TOKEN=... \
+#     AQ_PRODUCT_ID=notelett bash demo/two-factory-demo.sh
+#
+# Env knobs: DEMO_JOB_SLEEP (per-job engine seconds, default 2), DEMO_TIMEOUT (drain
+#   seconds, default 60), DEMO_POLL (poll seconds, default 0.2), DEMO_KEEP=1 (keep temp).
+#
+# Exit 0 = all three guarantees PASS; non-zero = FAIL. bash 3.2+ (no assoc arrays);
+# awk/sed/grep/pgrep only; mac+linux safe.
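The fencing rule behind guarantee (b) reduces to one comparison. This is a minimal sketch, assuming the epoch arithmetic visible in `coordinator-stub.sh`: a claim stores `epoch + 1`, the reaper bumps the epoch again when it requeues a job, and a report is rejected when its epoch is lower than the stored one. The in-memory `claim`, `reap` and `report` functions are hypothetical stand-ins for the stub's file-backed handlers.

```bash
#!/usr/bin/env bash
# Hypothetical in-memory model of one job's lease epoch.
epoch=0

claim()  { epoch=$((epoch + 1)); CLAIMED_EPOCH=$epoch; }  # claimer is handed the new epoch
reap()   { epoch=$((epoch + 1)); }                         # requeue: older epochs become stale
report() {                                                 # $1 = epoch carried by the report
  if [ "$1" -lt "$epoch" ]; then echo 409; else echo 200; fi
}

claim; victim_epoch=$CLAIMED_EPOCH          # victim claims the job at epoch 1
reap                                        # victim is killed; job requeued at epoch 2
zombie_code=$(report "$victim_epoch")       # late report from the dead worker
claim; survivor_epoch=$CLAIMED_EPOCH        # survivor reclaims at epoch 3
survivor_code=$(report "$survivor_epoch")   # survivor reports with the current epoch

echo "zombie=$zombie_code survivor=$survivor_code"   # prints: zombie=409 survivor=200
```

Because the reaper bumps the epoch before anyone reclaims, the dead worker is fenced even if its late report arrives before the survivor's claim.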
+
+set -uo pipefail
+
+HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+AQ="$HERE/../agent-queue.sh"
+STUB="$HERE/coordinator-stub.sh"
+
+DEMO_MODE="${DEMO_MODE:-stub}"
+if [ -n "${AQ_FLEET_API:-}" ] && [ -n "${AQ_FLEET_TOKEN:-}" ] && [ "${DEMO_MODE}" != "stub" ]; then DEMO_MODE=real; fi
+DEMO_JOB_SLEEP="${DEMO_JOB_SLEEP:-2}"
+DEMO_TIMEOUT="${DEMO_TIMEOUT:-60}"
+DEMO_POLL="${DEMO_POLL:-0.2}"
+F1="${DEMO_FACTORY_1:-mac-1}"       # victim (killed mid-job)
+F2="${DEMO_FACTORY_2:-ubuntu-1}"    # survivor (reclaims)
+
+c_b=$'\033[1m'; c_g=$'\033[32m'; c_r=$'\033[31m'; c_c=$'\033[36m'; c_0=$'\033[0m'
+log() { printf '%s[demo]%s %s\n' "$c_c" "$c_0" "$*"; }
+ok() { printf ' %s+%s %s\n' "$c_g" "$c_0" "$*"; }
+bad() { printf ' %s- %s%s\n' "$c_r" "$*" "$c_0" >&2; }
+
+TMP="$(mktemp -d "${TMPDIR:-/tmp}/aq-2factory.XXXXXX")"
+COORD_STATE="$TMP/coord"; export COORD_STATE
+DAEMON_PIDS=()
+
+# kill a process AND its descendants (mac+linux; pgrep -P is portable)
+kill_tree() {
+  local p=$1 c
+  for c in $(pgrep -P "$p" 2>/dev/null); do kill_tree "$c"; done
+  kill -9 "$p" 2>/dev/null || true
+}
+cleanup() {
+  local p
+  if [ "${#DAEMON_PIDS[@]}" -gt 0 ]; then
+    for p in "${DAEMON_PIDS[@]}"; do [ -n "$p" ] && kill_tree "$p"; done
+  fi
+  [ "${DEMO_KEEP:-0}" = "1" ] || rm -rf "$TMP"
+}
+trap cleanup EXIT INT TERM
+
+# In stub mode every coordinator HTTP call is routed to the stateful stub via the
+# existing AQ_FLEET_API_CMD seam; in real mode it is unset so curl talks to the service.
+if [ "$DEMO_MODE" = stub ]; then export AQ_FLEET_API_CMD="$STUB"; else unset AQ_FLEET_API_CMD 2>/dev/null || true; fi
+
+# ── coordinator primitives (mode-branched) ─────────────────────────────────
+coord_init() {
+  if [ "$DEMO_MODE" = stub ]; then mkdir -p "$COORD_STATE/jobs"; : > "$COORD_STATE/order"; : > "$COORD_STATE/events.log"; fi
+}
+coord_submit() { #
+  if [ "$DEMO_MODE" = stub ]; then
+    printf '%s\n' "stage=queued" "holder=" "epoch=0" "body=$2" > "$COORD_STATE/jobs/$1.job"
+    printf '%s\n' "$1" >> "$COORD_STATE/order"
+  else
+    curl -sS -m 30 -X POST -H "Content-Type: application/json" \
+      -H "Authorization: Bearer ${AQ_FLEET_TOKEN}" ${AQ_PRODUCT_ID:+-H "X-Product-Id: $AQ_PRODUCT_ID"} \
+      --data "{\"idempotencyKey\":\"$1\",\"bodyMd\":\"$2\",\"priority\":\"medium\"}" \
+      "${AQ_FLEET_API}${DEMO_SUBMIT_PATH:-/fleet/jobs}" >/dev/null 2>&1 || true
+  fi
+}
+coord_reap() { # : model the reaper reclaiming a dead factory's leases
+  if [ "$DEMO_MODE" = stub ]; then
+    "$STUB" POST /fleet/_reap "{\"factoryId\":\"$1\"}" >/dev/null 2>&1 || true
+  else
+    log "real mode: waiting ${DEMO_REAP_WAIT:-20}s for the coordinator reaper to reclaim $1's lease"
+    sleep "${DEMO_REAP_WAIT:-20}"
+  fi
+}
+coord_zombie_report() { # -> echoes the HTTP code (expect 409)
+  if [ "$DEMO_MODE" = stub ]; then
+    "$STUB" PATCH "/fleet/jobs/$1" "{\"stage\":\"building\",\"leaseEpoch\":$2}" | tail -n1
+  else
+    curl -sS -m 30 -o /dev/null -w '%{http_code}' -X PATCH -H "Content-Type: application/json" \
+      -H "Authorization: Bearer ${AQ_FLEET_TOKEN}" ${AQ_PRODUCT_ID:+-H "X-Product-Id: $AQ_PRODUCT_ID"} \
+      --data "{\"stage\":\"building\",\"leaseEpoch\":$2}" "${AQ_FLEET_API}/fleet/jobs/$1"
+  fi
+}
+# stub-only state readers (assertions in stub mode read authoritative coordinator state)
+jget() { grep -E "^$2=" "$COORD_STATE/jobs/$1.job" 2>/dev/null | head -1 | cut -d= -f2-; }
+# emit (one per line) the factoryId of every factory currently holding an ACTIVE job
+active_holders() {
+  local jf st ho
+  for jf in "$COORD_STATE"/jobs/*.job; do
+    [ -f "$jf" ] || continue
+    st=$(grep -E '^stage=' "$jf" | cut -d= -f2-); ho=$(grep -E '^holder=' "$jf" | cut -d= -f2-)
+    case "$st" in assigned|building|review|testing) [ -n "$ho" ] && printf '%s\n' "$ho";; esac
+  done
+}
+
+# ── engine + factory launch ─────────────────────────────────────────────────
+engine="$TMP/engine.sh"
+printf '#!/usr/bin/env bash\n# demo engine: sleep then succeed (gives a window to kill mid-job)\nsleep %s\nexit 0\n' "$DEMO_JOB_SLEEP" > "$engine"
+chmod +x "$engine"
+
+start_factory() { #
+  local fid=$1 root="$TMP/q-$1" work="$TMP/w-$1"
+  mkdir -p "$work"
+  AGENT_QUEUE_ROOT="$root" "$AQ" init >/dev/null 2>&1
+  # Each factory: own queue + cwd, AQ_FLEET=1 ROUTE=1 (coordinator authoritative),
+  # MAX=1 so it holds one job at a time, fast poll. Competes ONLY via the coordinator
+  # (AQ_FLEET_API_CMD / AQ_FLEET_API inherited from the environment above).
+  AGENT_QUEUE_ROOT="$root" AGENT_QUEUE_MAX=1 AGENT_QUEUE_POLL=1 \
+    AQ_FLEET=1 AQ_FLEET_ROUTE=1 AQ_FACTORY_ID="$fid" AQ_FLEET_CWD="$work" \
+    AQ_FLEET_API="${AQ_FLEET_API:-http://stub.local/api}" \
+    DEVIN_BIN="$engine" "$AQ" run >"$TMP/log-$1.txt" 2>&1 &
+  DAEMON_PIDS+=("$!")
+  disown 2>/dev/null || true # detach from job control so SIGKILL later prints no "Killed" notice
+  log "started factory $c_b$fid$c_0 (pid $!, queue q-$1)"
+}
+
+# ════════════════════════════════════════════════════════════════════════════
+log "Phase-2 two-factory parallel demo — mode=$c_b$DEMO_MODE$c_0 (job-sleep=${DEMO_JOB_SLEEP}s)"
+coord_init
+
+# 1) submit 3 jobs
+for n in 1 2 3; do coord_submit "demo-job-$n" "two-factory demo job $n"; done
+log "submitted 3 jobs to the coordinator"
+
+# 2) start two factories competing through the coordinator
+start_factory "$F1"
+start_factory "$F2"
+
+# 3) PARALLELISM: wait until BOTH factories simultaneously hold an active job, and the
+#    victim (F1) holds one we can kill mid-job.
+PARALLELISM_OK=0; VICTIM_JOB=""; VICTIM_EPOCH=""
+if [ "$DEMO_MODE" = stub ]; then
+  deadline=$(( $(date +%s) + 30 ))
+  while [ "$(date +%s)" -lt "$deadline" ]; do
+    holders=$(active_holders | sort -u | tr '\n' ' ')
+    if printf '%s' "$holders" | grep -qw "$F1" && printf '%s' "$holders" | grep -qw "$F2"; then
+      PARALLELISM_OK=1
+      for jf in "$COORD_STATE"/jobs/*.job; do
+        [ -f "$jf" ] || continue
+        if [ "$(grep -E '^holder=' "$jf" | cut -d= -f2-)" = "$F1" ]; then
+          case "$(grep -E '^stage=' "$jf" | cut -d= -f2-)" in
+            assigned|building|review|testing)
+              VICTIM_JOB=$(basename "$jf" .job); VICTIM_EPOCH=$(jget "$VICTIM_JOB" epoch); break;;
+          esac
+        fi
+      done
+      [ -n "$VICTIM_JOB" ] && break
+    fi
+    sleep "$DEMO_POLL"
+  done
+else
+  sleep "${DEMO_SETTLE:-5}"; PARALLELISM_OK=1; VICTIM_JOB="${DEMO_VICTIM_JOB:-demo-job-1}"; VICTIM_EPOCH="${DEMO_VICTIM_EPOCH:-1}"
+fi
+if [ "$PARALLELISM_OK" = 1 ]; then log "PARALLELISM observed: $F1 and $F2 both holding active jobs concurrently"; else log "WARN: did not observe simultaneous holders"; fi
+log "victim=$c_b$F1$c_0 holds job $c_b${VICTIM_JOB:-?}$c_0 (epoch ${VICTIM_EPOCH:-?}) — killing it mid-job"
+
+# 4) KILL the victim factory mid-job (hard crash, no graceful drain)
+victim_pid="${DAEMON_PIDS[0]}"
+kill_tree "$victim_pid"
+DAEMON_PIDS[0]=""
+log "killed factory $F1 (pid $victim_pid)"
+
+# 5) RECLAIM: the reaper returns the victim's in-flight job to the queue (epoch bumped)
+coord_reap "$F1"
+log "reaper reclaimed $F1's lease(s)"
+
+# 6) FENCE the zombie: the dead worker's LATE report (stale epoch) must be rejected (409)
+FENCE_OK=0
+if [ -n "$VICTIM_JOB" ] && [ -n "$VICTIM_EPOCH" ]; then
+  zcode=$(coord_zombie_report "$VICTIM_JOB" "$VICTIM_EPOCH")
+  if [ "$zcode" = 409 ]; then FENCE_OK=1; ok "zombie report for $VICTIM_JOB @epoch=$VICTIM_EPOCH was FENCED (HTTP 409)"; else bad "zombie report not fenced (HTTP $zcode)"; fi
+fi
+
+# 7) DRAIN: the survivor (F2) finishes everything, including the reclaimed job
+log "draining remaining work on the survivor ($F2)..."
+DONE=0
+if [ "$DEMO_MODE" = stub ]; then
+  deadline=$(( $(date +%s) + DEMO_TIMEOUT ))
+  while [ "$(date +%s)" -lt "$deadline" ]; do
+    d=0
+    for jf in "$COORD_STATE"/jobs/*.job; do
+      case "$(grep -E '^stage=' "$jf" | cut -d= -f2-)" in review|testing|shipped) d=$((d+1));; esac
+    done
+    [ "$d" -ge 3 ] && { DONE=1; break; }
+    sleep "$DEMO_POLL"
+  done
+else
+  sleep "${DEMO_DRAIN_WAIT:-30}"; DONE=1
+fi
+
+# ── ASSERT the three guarantees (stub mode reads authoritative coordinator state) ──
+echo
+log "${c_b}RESULTS${c_0}"
+PASS=1
+if [ "$DEMO_MODE" = stub ]; then
+  reviewed=0
+  for jf in "$COORD_STATE"/jobs/*.job; do
+    jid=$(basename "$jf" .job); st=$(jget "$jid" stage); ho=$(jget "$jid" holder)
+    case "$st" in
+      review|testing|shipped) reviewed=$((reviewed+1)); printf ' job %-12s -> %s (stage=%s)\n' "$jid" "$ho" "$st";;
+      *) printf ' job %-12s -> INCOMPLETE (stage=%s)\n' "$jid" "$st";;
+    esac
+  done
+  # grep -c already prints 0 when nothing matches (while exiting non-zero), so default only an empty result
+  claims=$(grep -c ' CLAIM ' "$COORD_STATE/events.log" 2>/dev/null); claims=${claims:-0}
+  distinct_claimers=$(grep ' CLAIM ' "$COORD_STATE/events.log" 2>/dev/null | sed -n 's/.*factory=\([^ ]*\).*/\1/p' | sort -u | tr '\n' ' ')
+  reclaims=$(grep -c ' RECLAIM ' "$COORD_STATE/events.log" 2>/dev/null); reclaims=${reclaims:-0}
+  fences=$(grep -c ' FENCE ' "$COORD_STATE/events.log" 2>/dev/null); fences=${fences:-0}
+  victim_winner=$(jget "${VICTIM_JOB:-_none_}" holder)
+
+  if [ "$reviewed" -eq 3 ]; then ok "(a) no double-assign: all 3 jobs executed to terminal, one winner each"; else bad "(a) only $reviewed/3 jobs reached terminal"; PASS=0; fi
+  if [ -n "$VICTIM_JOB" ] && [ "$victim_winner" = "$F2" ]; then ok " reclaimed job $VICTIM_JOB completed by survivor $F2 (not the killed $F1)"; elif [ -n "$VICTIM_JOB" ]; then bad " reclaimed job $VICTIM_JOB winner='$victim_winner' (expected $F2)"; PASS=0; fi
+  if [ "$reclaims" -ge 1 ]; then ok "(b) reclaim: $reclaims RECLAIM event(s) (reaper returned the dead factory's job)"; else bad "(b) no RECLAIM event"; PASS=0; fi
+  if [ "$FENCE_OK" = 1 ] && [ "$fences" -ge 1 ]; then ok "(b) fencing: zombie report rejected (409); $fences FENCE event(s)"; else bad "(b) zombie was not fenced (fence_ok=$FENCE_OK events=$fences)"; PASS=0; fi
+  if [ "$PARALLELISM_OK" = 1 ] && printf '%s' "$distinct_claimers" | grep -qw "$F1" && printf '%s' "$distinct_claimers" | grep -qw "$F2"; then ok "(c) parallelism: both factories claimed concurrently (claimers: ${distinct_claimers}; $claims claims)"; else bad "(c) parallelism not observed (claimers: ${distinct_claimers})"; PASS=0; fi
+else
+  if [ "$DONE" = 1 ]; then ok "real mode: drain window elapsed — inspect the coordinator + factory logs in $TMP"; fi
+  ok "real mode is best-effort/observational; the asserted guarantees are validated in stub mode (and selftest)."
+fi
+
+echo
+if [ "$PASS" = 1 ]; then
+  printf '%s[demo] PASS%s — Phase-2 exit guarantees demonstrated (no double-assign + reclaim/fence + parallelism)\n' "$c_g" "$c_0"; exit 0
+else
+  printf '%s[demo] FAIL%s\n' "$c_r" "$c_0"; exit 1
+fi
diff --git a/agent-queue/docs/GIGAFACTORY_ROADMAP.md b/agent-queue/docs/GIGAFACTORY_ROADMAP.md
index 084adb4..fcd01d7 100644
--- a/agent-queue/docs/GIGAFACTORY_ROADMAP.md
+++ b/agent-queue/docs/GIGAFACTORY_ROADMAP.md
@@ -12,7 +12,7 @@
 | ----- | ----- | ------ | - | ---- |
 | **0** | Baseline (today) | ✅ shipped | 100% | `selftest.sh` green |
 | **1** | Manifest + profiles + capabilities + tracker adapter (single host) | ◐ in progress | 95% | adapter e2e + selftest |
-| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ◐ in progress | 55% | fleet e2e + module tests |
+| **2** | Coordinator as platform-service module + Cosmos + multi-factory leasing | ◐ in progress | 80% | fleet e2e + module tests |
 | **3** | Fleet control plane in tracker-web + DAG deps + budgets + scoring router | ☐ not started | 0% | web e2e + router tests |
 | **4** | Message bus + autoscaling + cross-OS capability marketplace | ☐ not started | 0% | load/chaos suite |
 | **5** | Self-optimizing / learned routing | ☐ not started | 0% | offline eval + A/B |
@@ -386,8 +386,8 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until
 - [ ] Auth: factory enrollment + scoped rotatable tokens; secret isolation enforced (§12 subset).
 - [x] **Feature flags** (`fleet.enabled`, `fleet.route_via_service`) + **shadow/dual-run** vs P1 before cutover (§21). *(agent-queue runner: `AQ_FLEET` / `AQ_FLEET_ROUTE` / `AQ_FLEET_SHADOW` with documented precedence; shadow claim/compare/report is side-effect-free (isolated `-shadow` factoryId + dryRun, never materializes/ships); `fleet-shadow-report` summarizes AGREE/DIVERGE/COORD_EMPTY/LOCAL_EMPTY + agreement; 60→68 selftest checks.)*
 - [x] Module test suite (repository + routes via `@bytelyst/testing`); **atomic-claim race**, crash-recovery, fencing-rejection, reaper-reclaim tests. *(PR #28 + #29: 53 fleet + 48 datastore tests, incl. true-concurrency claim.)*
-- [ ] Two-factory demo (e.g. mac + ubuntu) running 3 parallel jobs end-to-end.
-- **Exit criteria:** all boxes ✅; `pnpm --filter @lysnrai/platform-service test` green; killing a factory mid-job → another reclaims and completes **and the dead worker's late report is fenced**; concurrent claimers never double-assign; all state in Cosmos with `productId`; **flag-off rollback verified** (§21).
+- [x] Two-factory demo (e.g. mac + ubuntu) running 3 parallel jobs end-to-end. *(`agent-queue/demo/two-factory-demo.sh` + `coordinator-stub.sh`: two real `run` daemons (mac-1 + ubuntu-1, separate queues/cwds) compete through one coordinator; asserts (a) no double-assign, (b) kill-mid-job → reaper reclaim → survivor completes → zombie report fenced (409), (c) concurrent parallelism. Dual-mode: CI-safe stateful stub by default, live platform-service when `AQ_FLEET_API`/`AQ_FLEET_TOKEN` set. Headless checks in `selftest.sh` → 68→71 green.)*
+- **Exit criteria:** all boxes ✅; `pnpm --filter @lysnrai/platform-service test` green; killing a factory mid-job → another reclaims and completes **and the dead worker's late report is fenced**; concurrent claimers never double-assign; all state in Cosmos with `productId`; **flag-off rollback verified** (§21). — _Runtime exit guarantees **demonstrated** by the two-factory demo (no double-assign + reclaim/fence + parallelism) and flag-off rollback verified (§21). **Remaining for 100%:** scheduler/router core wired into assignment (common-plat PR #31, open), tracker adapter direct call, and factory enrollment + scoped tokens._
 
 ### Phase 3 — Fleet control plane in tracker-web + DAG + budgets + scoring router
 **Goal:** one browser control plane; smart routing + budgets live.
diff --git a/agent-queue/selftest.sh b/agent-queue/selftest.sh
index eb6041a..a453c84 100755
--- a/agent-queue/selftest.sh
+++ b/agent-queue/selftest.sh
@@ -988,4 +988,38 @@ else
 fi
 unset AQ_FLEET_API_CMD AQ_FLEET_SHADOW_LOG AGENT_QUEUE_ROOT
 
+# ─────────────────────────────────────────────────────────────────────
+# Phase 2 — two-factory parallel demo (EXIT CRITERIA, §14). Runs the demo
+# HEADLESS in STUB mode (its own stateful coordinator stub + two real factory
+# daemons) and asserts the three exit guarantees. Self-contained: the demo owns
+# its temp dirs/daemons and cleans them up; no live service.
+# ─────────────────────────────────────────────────────────────────────
+demo_sh="$HERE/demo/two-factory-demo.sh"
+demo_out="$tmp/two-factory.out"
+if DEMO_MODE=stub AQ_FLEET_API= AQ_FLEET_TOKEN= DEMO_JOB_SLEEP=2 DEMO_TIMEOUT=45 bash "$demo_sh" >"$demo_out" 2>&1; then demo_rc=0; else demo_rc=$?; fi
+
+# 49. demo is green end-to-end in stub mode (exit 0 + overall PASS)
+if [ "$demo_rc" -eq 0 ] && grep -q '\[demo\] PASS' "$demo_out"; then
+  pass "two-factory demo: stub-mode run is green (exit 0, all guarantees PASS)"
+else
+  cat "$demo_out" >&2; fail "two-factory demo did not pass (rc=$demo_rc)"
+fi
+
+# 50. no double-assign: 3 jobs reach terminal across 2 factories (one winner each) + parallel
+if grep -q '(a) no double-assign: all 3 jobs executed to terminal' "$demo_out" \
+  && grep -q '(c) parallelism: both factories claimed concurrently' "$demo_out"; then
+  pass "two-factory demo: 3 jobs terminal across 2 factories, no double-assignment, ran in parallel"
+else
+  cat "$demo_out" >&2; fail "two-factory demo: no-double-assign / parallelism assertion missing"
+fi
+
+# 51. kill -> reaper reclaim -> survivor completes -> dead worker's zombie report fenced (409)
+if grep -q '(b) reclaim: .* RECLAIM event' "$demo_out" \
+  && grep -q 'was FENCED (HTTP 409)' "$demo_out" \
+  && grep -q '(b) fencing: zombie report rejected (409)' "$demo_out"; then
+  pass "two-factory demo: kill -> reclaim -> completed by survivor -> zombie report FENCED (409)"
+else
+  cat "$demo_out" >&2; fail "two-factory demo: kill/reclaim/fenced-zombie path did not fire"
+fi
+
 echo "self-test PASS"