bytelyst-devops-tools/agent-queue/demo/two-factory-demo.sh
saravanakumardb1 0cde7def6a feat(agent-queue): two-factory parallel demo — Phase 2 exit criteria (§14)
Close the final Phase-2 exit-criteria box: >=2 factories executing jobs in parallel
through one coordinator, proving the concurrency guarantees end-to-end. This is a
DEMO HARNESS over the existing runtime — agent-queue.sh and lib/fleet-client.sh are
unchanged (read + called, not modified).

demo/two-factory-demo.sh: starts two real `agent-queue.sh run` daemons (mac-1 +
ubuntu-1, separate queues/cwds) that compete ONLY through the coordinator, then
asserts: (a) no double-assign — each of 3 jobs executed by exactly one factory;
(b) fencing + reclaim — kill a factory mid-job, the reaper returns its job, the
survivor reclaims + completes it, and the dead worker's late/zombie report (stale
leaseEpoch) is FENCED (HTTP 409, never shipped); (c) parallelism — both factories
hold active jobs concurrently. Dual-mode: CI-safe stateful stub by default; live
platform-service when AQ_FLEET_API/AQ_FLEET_TOKEN set.

demo/coordinator-stub.sh: stateful, mkdir-lock-guarded, file-backed coordinator
implementing claim/lease/fence/renew/release + reaper-reclaim via the existing
AQ_FLEET_API_CMD seam — the selftest stub pattern extended with shared state so
>=2 processes coordinate through one coordinator.

demo/README.md: stub + real invocations, env knobs, what each guarantee proves,
what-to-watch guide.

selftest.sh: +3 headless stub-mode checks (existing 68 unchanged byte-for-byte ->
71 total green).

docs/GIGAFACTORY_ROADMAP.md: tick the §14 two-factory-demo box; annotate Phase-2
exit criteria; bump §0 Phase 2 to 80% (remaining: scheduler-core wiring [common-plat
PR #31], tracker-direct call, factory enrollment).

bash 3.2 + awk/sed/grep/pgrep only; mac+linux safe; no new runtime deps.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2026-05-30 01:53:36 -07:00

249 lines
12 KiB
Bash
Executable File

#!/usr/bin/env bash
#
# two-factory-demo.sh — Phase-2 EXIT-CRITERIA demo (§14): >=2 factories executing jobs
# in PARALLEL through ONE coordinator, proving the Phase-2 guarantees end-to-end:
#
# (a) NO DOUBLE-ASSIGN — each job is claimed/executed by exactly ONE factory.
# (b) FENCING + RECLAIM — kill a factory MID-JOB; the reaper returns its job; the OTHER
# factory reclaims + completes it; the dead worker's late/zombie
# report is FENCED (409, never shipped).
# (c) PARALLELISM — both factories make progress concurrently (not serialized).
#
# This is a DEMO HARNESS over the EXISTING runtime — it does NOT change agent-queue.sh or
# lib/fleet-client.sh; it starts two real `agent-queue.sh run` daemons (distinct factoryIds,
# separate queues/cwds) that compete ONLY through the coordinator, then observes + asserts.
#
# DUAL MODE:
# STUB (default / CI-safe): drives demo/coordinator-stub.sh — a stateful, lock-guarded
# file-backed coordinator. Zero external services. Used by selftest.sh.
# REAL : set AQ_FLEET_API + AQ_FLEET_TOKEN (and DEMO_MODE=real) to run against a live
# platform-service fleet coordinator. Submit + reaper-reclaim use its HTTP API.
#
# Usage:
# bash demo/two-factory-demo.sh # stub mode (default)
# DEMO_MODE=real AQ_FLEET_API=http://host:4003/api AQ_FLEET_TOKEN=... \
# AQ_PRODUCT_ID=notelett bash demo/two-factory-demo.sh
#
# Env knobs: DEMO_JOB_SLEEP (per-job engine seconds, default 2), DEMO_TIMEOUT (drain
# seconds, default 60), DEMO_POLL (poll seconds, default 0.2), DEMO_KEEP=1 (keep temp).
#
# Exit 0 = all three guarantees PASS; non-zero = FAIL. bash 3.2+ (no assoc arrays);
# awk/sed/grep/pgrep only; mac+linux safe.
set -uo pipefail
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
AQ="$HERE/../agent-queue.sh"
STUB="$HERE/coordinator-stub.sh"
DEMO_MODE="${DEMO_MODE:-stub}"
if [ -n "${AQ_FLEET_API:-}" ] && [ -n "${AQ_FLEET_TOKEN:-}" ] && [ "${DEMO_MODE}" != "stub" ]; then DEMO_MODE=real; fi
DEMO_JOB_SLEEP="${DEMO_JOB_SLEEP:-2}"
DEMO_TIMEOUT="${DEMO_TIMEOUT:-60}"
DEMO_POLL="${DEMO_POLL:-0.2}"
F1="${DEMO_FACTORY_1:-mac-1}" # victim (killed mid-job)
F2="${DEMO_FACTORY_2:-ubuntu-1}" # survivor (reclaims)
c_b=$'\033[1m'; c_g=$'\033[32m'; c_r=$'\033[31m'; c_c=$'\033[36m'; c_0=$'\033[0m'
log() { printf '%s[demo]%s %s\n' "$c_c" "$c_0" "$*"; }
ok() { printf ' %s+%s %s\n' "$c_g" "$c_0" "$*"; }
bad() { printf ' %s- %s%s\n' "$c_r" "$*" "$c_0" >&2; }
TMP="$(mktemp -d "${TMPDIR:-/tmp}/aq-2factory.XXXXXX")"
COORD_STATE="$TMP/coord"; export COORD_STATE
DAEMON_PIDS=()
# kill a process AND its descendants (mac+linux; pgrep -P is portable)
kill_tree() {
local p=$1 c
for c in $(pgrep -P "$p" 2>/dev/null); do kill_tree "$c"; done
kill -9 "$p" 2>/dev/null || true
}
cleanup() {
local p
if [ "${#DAEMON_PIDS[@]}" -gt 0 ]; then
for p in "${DAEMON_PIDS[@]}"; do [ -n "$p" ] && kill_tree "$p"; done
fi
[ "${DEMO_KEEP:-0}" = "1" ] || rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
# In stub mode every coordinator HTTP call is routed to the stateful stub via the
# existing AQ_FLEET_API_CMD seam; in real mode it is unset so curl talks to the service.
if [ "$DEMO_MODE" = stub ]; then export AQ_FLEET_API_CMD="$STUB"; else unset AQ_FLEET_API_CMD 2>/dev/null || true; fi
# ── coordinator primitives (mode-branched) ─────────────────────────────────
coord_init() {
if [ "$DEMO_MODE" = stub ]; then mkdir -p "$COORD_STATE/jobs"; : > "$COORD_STATE/order"; : > "$COORD_STATE/events.log"; fi
}
coord_submit() { # <jobid> <bodyMd>
if [ "$DEMO_MODE" = stub ]; then
printf '%s\n' "stage=queued" "holder=" "epoch=0" "body=$2" > "$COORD_STATE/jobs/$1.job"
printf '%s\n' "$1" >> "$COORD_STATE/order"
else
curl -sS -m 30 -X POST -H "Content-Type: application/json" \
-H "Authorization: Bearer ${AQ_FLEET_TOKEN}" ${AQ_PRODUCT_ID:+-H "X-Product-Id: $AQ_PRODUCT_ID"} \
--data "{\"idempotencyKey\":\"$1\",\"bodyMd\":\"$2\",\"priority\":\"medium\"}" \
"${AQ_FLEET_API}${DEMO_SUBMIT_PATH:-/fleet/jobs}" >/dev/null 2>&1 || true
fi
}
coord_reap() { # <factoryId> : model the reaper reclaiming a dead factory's leases
if [ "$DEMO_MODE" = stub ]; then
"$STUB" POST /fleet/_reap "{\"factoryId\":\"$1\"}" >/dev/null 2>&1 || true
else
log "real mode: waiting ${DEMO_REAP_WAIT:-20}s for the coordinator reaper to reclaim $1's lease"
sleep "${DEMO_REAP_WAIT:-20}"
fi
}
coord_zombie_report() { # <jobid> <staleEpoch> -> echoes the HTTP code (expect 409)
if [ "$DEMO_MODE" = stub ]; then
"$STUB" PATCH "/fleet/jobs/$1" "{\"stage\":\"building\",\"leaseEpoch\":$2}" | tail -n1
else
curl -sS -m 30 -o /dev/null -w '%{http_code}' -X PATCH -H "Content-Type: application/json" \
-H "Authorization: Bearer ${AQ_FLEET_TOKEN}" ${AQ_PRODUCT_ID:+-H "X-Product-Id: $AQ_PRODUCT_ID"} \
--data "{\"stage\":\"building\",\"leaseEpoch\":$2}" "${AQ_FLEET_API}/fleet/jobs/$1"
fi
}
# stub-only state readers (assertions in stub mode read authoritative coordinator state)
jget() { grep -E "^$2=" "$COORD_STATE/jobs/$1.job" 2>/dev/null | head -1 | cut -d= -f2-; }
# emit (one per line) the factoryId of every factory currently holding an ACTIVE job
active_holders() {
local jf st ho
for jf in "$COORD_STATE"/jobs/*.job; do
[ -f "$jf" ] || continue
st=$(grep -E '^stage=' "$jf" | cut -d= -f2-); ho=$(grep -E '^holder=' "$jf" | cut -d= -f2-)
case "$st" in assigned|building|review|testing) [ -n "$ho" ] && printf '%s\n' "$ho";; esac
done
}
# ── engine + factory launch ─────────────────────────────────────────────────
engine="$TMP/engine.sh"
printf '#!/usr/bin/env bash\n# demo engine: sleep then succeed (gives a window to kill mid-job)\nsleep %s\nexit 0\n' "$DEMO_JOB_SLEEP" > "$engine"
chmod +x "$engine"
start_factory() { # <factoryId>
local fid=$1 root="$TMP/q-$1" work="$TMP/w-$1"
mkdir -p "$work"
AGENT_QUEUE_ROOT="$root" "$AQ" init >/dev/null 2>&1
# Each factory: own queue + cwd, AQ_FLEET=1 ROUTE=1 (coordinator authoritative),
# MAX=1 so it holds one job at a time, fast poll. Competes ONLY via the coordinator
# (AQ_FLEET_API_CMD / AQ_FLEET_API inherited from the environment above).
AGENT_QUEUE_ROOT="$root" AGENT_QUEUE_MAX=1 AGENT_QUEUE_POLL=1 \
AQ_FLEET=1 AQ_FLEET_ROUTE=1 AQ_FACTORY_ID="$fid" AQ_FLEET_CWD="$work" \
AQ_FLEET_API="${AQ_FLEET_API:-http://stub.local/api}" \
DEVIN_BIN="$engine" "$AQ" run >"$TMP/log-$1.txt" 2>&1 &
DAEMON_PIDS+=("$!")
disown 2>/dev/null || true # detach from job control so SIGKILL later prints no "Killed" notice
log "started factory $c_b$fid$c_0 (pid $!, queue q-$1)"
}
# ════════════════════════════════════════════════════════════════════════════
log "Phase-2 two-factory parallel demo — mode=$c_b$DEMO_MODE$c_0 (job-sleep=${DEMO_JOB_SLEEP}s)"
coord_init
# 1) submit 3 jobs
for n in 1 2 3; do coord_submit "demo-job-$n" "two-factory demo job $n"; done
log "submitted 3 jobs to the coordinator"
# 2) start two factories competing through the coordinator
start_factory "$F1"
start_factory "$F2"
# 3) PARALLELISM: wait until BOTH factories simultaneously hold an active job, and the
# victim (F1) holds one we can kill mid-job.
PARALLELISM_OK=0; VICTIM_JOB=""; VICTIM_EPOCH=""
if [ "$DEMO_MODE" = stub ]; then
deadline=$(( $(date +%s) + 30 ))
while [ "$(date +%s)" -lt "$deadline" ]; do
holders=$(active_holders | sort -u | tr '\n' ' ')
if printf '%s' "$holders" | grep -qw "$F1" && printf '%s' "$holders" | grep -qw "$F2"; then
PARALLELISM_OK=1
for jf in "$COORD_STATE"/jobs/*.job; do
[ -f "$jf" ] || continue
if [ "$(grep -E '^holder=' "$jf" | cut -d= -f2-)" = "$F1" ]; then
case "$(grep -E '^stage=' "$jf" | cut -d= -f2-)" in
assigned|building|review|testing)
VICTIM_JOB=$(basename "$jf" .job); VICTIM_EPOCH=$(jget "$VICTIM_JOB" epoch); break;;
esac
fi
done
[ -n "$VICTIM_JOB" ] && break
fi
sleep "$DEMO_POLL"
done
else
sleep "${DEMO_SETTLE:-5}"; PARALLELISM_OK=1; VICTIM_JOB="${DEMO_VICTIM_JOB:-demo-job-1}"; VICTIM_EPOCH="${DEMO_VICTIM_EPOCH:-1}"
fi
if [ "$PARALLELISM_OK" = 1 ]; then log "PARALLELISM observed: $F1 and $F2 both holding active jobs concurrently"; else log "WARN: did not observe simultaneous holders"; fi
log "victim=$c_b$F1$c_0 holds job $c_b${VICTIM_JOB:-?}$c_0 (epoch ${VICTIM_EPOCH:-?}) — killing it mid-job"
# 4) KILL the victim factory mid-job (hard crash, no graceful drain)
victim_pid="${DAEMON_PIDS[0]}"
kill_tree "$victim_pid"
DAEMON_PIDS[0]=""
log "killed factory $F1 (pid $victim_pid)"
# 5) RECLAIM: the reaper returns the victim's in-flight job to the queue (epoch bumped)
coord_reap "$F1"
log "reaper reclaimed $F1's lease(s)"
# 6) FENCE the zombie: the dead worker's LATE report (stale epoch) must be rejected (409)
FENCE_OK=0
if [ -n "$VICTIM_JOB" ] && [ -n "$VICTIM_EPOCH" ]; then
zcode=$(coord_zombie_report "$VICTIM_JOB" "$VICTIM_EPOCH")
if [ "$zcode" = 409 ]; then FENCE_OK=1; ok "zombie report for $VICTIM_JOB @epoch=$VICTIM_EPOCH was FENCED (HTTP 409)"; else bad "zombie report not fenced (HTTP $zcode)"; fi
fi
# 7) DRAIN: the survivor (F2) finishes everything, including the reclaimed job
log "draining remaining work on the survivor ($F2)..."
DONE=0
if [ "$DEMO_MODE" = stub ]; then
deadline=$(( $(date +%s) + DEMO_TIMEOUT ))
while [ "$(date +%s)" -lt "$deadline" ]; do
d=0
for jf in "$COORD_STATE"/jobs/*.job; do
case "$(grep -E '^stage=' "$jf" | cut -d= -f2-)" in review|testing|shipped) d=$((d+1));; esac
done
[ "$d" -ge 3 ] && { DONE=1; break; }
sleep "$DEMO_POLL"
done
else
sleep "${DEMO_DRAIN_WAIT:-30}"; DONE=1
fi
# ── ASSERT the three guarantees (stub mode reads authoritative coordinator state) ──
echo
log "${c_b}RESULTS${c_0}"
PASS=1
if [ "$DEMO_MODE" = stub ]; then
reviewed=0
for jf in "$COORD_STATE"/jobs/*.job; do
jid=$(basename "$jf" .job); st=$(jget "$jid" stage); ho=$(jget "$jid" holder)
case "$st" in
review|testing|shipped) reviewed=$((reviewed+1)); printf ' job %-12s -> %s (stage=%s)\n' "$jid" "$ho" "$st";;
*) printf ' job %-12s -> INCOMPLETE (stage=%s)\n' "$jid" "$st";;
esac
done
claims=$(grep -c ' CLAIM ' "$COORD_STATE/events.log" 2>/dev/null || echo 0)
distinct_claimers=$(grep ' CLAIM ' "$COORD_STATE/events.log" 2>/dev/null | sed -n 's/.*factory=\([^ ]*\).*/\1/p' | sort -u | tr '\n' ' ')
reclaims=$(grep -c ' RECLAIM ' "$COORD_STATE/events.log" 2>/dev/null || echo 0)
fences=$(grep -c ' FENCE ' "$COORD_STATE/events.log" 2>/dev/null || echo 0)
victim_winner=$(jget "${VICTIM_JOB:-_none_}" holder)
if [ "$reviewed" -eq 3 ]; then ok "(a) no double-assign: all 3 jobs executed to terminal, one winner each"; else bad "(a) only $reviewed/3 jobs reached terminal"; PASS=0; fi
if [ -n "$VICTIM_JOB" ] && [ "$victim_winner" = "$F2" ]; then ok " reclaimed job $VICTIM_JOB completed by survivor $F2 (not the killed $F1)"; elif [ -n "$VICTIM_JOB" ]; then bad " reclaimed job $VICTIM_JOB winner='$victim_winner' (expected $F2)"; PASS=0; fi
if [ "$reclaims" -ge 1 ]; then ok "(b) reclaim: $reclaims RECLAIM event(s) (reaper returned the dead factory's job)"; else bad "(b) no RECLAIM event"; PASS=0; fi
if [ "$FENCE_OK" = 1 ] && [ "$fences" -ge 1 ]; then ok "(b) fencing: zombie report rejected (409); $fences FENCE event(s)"; else bad "(b) zombie was not fenced (fence_ok=$FENCE_OK events=$fences)"; PASS=0; fi
if [ "$PARALLELISM_OK" = 1 ] && printf '%s' "$distinct_claimers" | grep -qw "$F1" && printf '%s' "$distinct_claimers" | grep -qw "$F2"; then ok "(c) parallelism: both factories claimed concurrently (claimers: ${distinct_claimers}; $claims claims)"; else bad "(c) parallelism not observed (claimers: ${distinct_claimers})"; PASS=0; fi
else
if [ "$DONE" = 1 ]; then ok "real mode: drain window elapsed — inspect the coordinator + factory logs in $TMP"; fi
ok "real mode is best-effort/observational; the asserted guarantees are validated in stub mode (and selftest)."
fi
echo
if [ "$PASS" = 1 ]; then
printf '%s[demo] PASS%s — Phase-2 exit guarantees demonstrated (no double-assign + reclaim/fence + parallelism)\n' "$c_g" "$c_0"; exit 0
else
printf '%s[demo] FAIL%s\n' "$c_r" "$c_0"; exit 1
fi