#!/usr/bin/env bash
#
# coordinator-stub.sh — a STATEFUL, concurrency-safe fleet-coordinator stub for the
# two-factory demo + its selftest. It is the same "AQ_FLEET_API_CMD responder" pattern
# the existing fleet selftests use (invoked as `<METHOD> <PATH> <BODY>`, prints the
# response body then a final HTTP-code line), EXTENDED with file-backed shared state +
# an mkdir lock so >=2 competing factory processes coordinate through ONE coordinator —
# exactly modeling platform-service's claim / lease / fence / reaper contract
# (../../learning_ai_common_plat/services/platform-service/src/modules/fleet/coordinator.ts).
#
# It is curl-free + dependency-free (bash + POSIX awk/sed/grep) so the demo runs in CI
# with zero external services. Real-coordinator mode bypasses this entirely (the demo
# talks to platform-service over HTTP when AQ_FLEET_API/AQ_FLEET_TOKEN are set).
#
# Contract implemented (paths under the caller's AQ_FLEET_API base, which includes /api):
#   POST  /fleet/factories/heartbeat    -> {"ok":true} 200
#   POST  /fleet/claim                  -> {"claimed":true,"job":{id,bodyMd,leaseEpoch},"lease":{leaseEpoch}} | {"claimed":false}
#   PATCH /fleet/jobs/:id               -> 200 | 404 (unknown job id) | 409 (stale leaseEpoch => FENCED)
#   POST  /fleet/jobs/:id/lease/renew   -> 200 | 409 (fenced)
#   POST  /fleet/jobs/:id/lease/release -> 200
#   POST  /fleet/_reap                  -> {"reaped":N} 200 (DEMO-only admin: models the
#                                          coordinator reaper reclaiming a dead factory's
#                                          leases — returns its in-flight jobs to `queued`
#                                          and BUMPS the epoch so the zombie is fenced)
#
# Atomicity: every state mutation runs inside an mkdir spin-lock, so under true
# concurrency EXACTLY ONE claimer wins a given job version (no double-assign), and a
# report carrying an epoch older than the stored epoch is rejected (409) — the same
# guarantees the real rev/_etag compare-and-swap provides.
# # State (under $COORD_STATE, set by the demo): # order submit-ordered job ids (one per line) # jobs/.job key=val lines: stage, holder, epoch, body # events.log append-only audit: " job= factory= epoch=" # lock/ the mkdir lock dir # # Stages: queued -> assigned -> building -> review|testing -> shipped (terminal); # failed/dead_letter terminal. Reclaimable (active) = assigned|building|review|testing. set -uo pipefail METHOD="${1:-}"; RPATH="${2:-}"; BODY="${3:-}" : "${COORD_STATE:?coordinator-stub.sh requires COORD_STATE}" JOBS_DIR="$COORD_STATE/jobs" EVENTS="$COORD_STATE/events.log" LOCK="$COORD_STATE/lock" # ── JSON field extraction (no jq) ─────────────────────────────────────────── _str_field() { printf '%s' "$BODY" | sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -1; } _num_field() { printf '%s' "$BODY" | grep -oE "\"$1\"[[:space:]]*:[[:space:]]*-?[0-9]+" | grep -oE -- '-?[0-9]+$' | head -1; } # job id from /fleet/jobs/ or /fleet/jobs//lease/ _job_id_from_path() { printf '%s' "$RPATH" | sed -e 's#^/fleet/jobs/##' -e 's#/lease/.*$##'; } # ── lock (mkdir is atomic on POSIX filesystems) ───────────────────────────── _lock() { local n=0; until mkdir "$LOCK" 2>/dev/null; do sleep 0.02; n=$((n+1)); [ "$n" -gt 5000 ] && break; done; } _unlock() { rmdir "$LOCK" 2>/dev/null || true; } _jobfile() { printf '%s/%s.job\n' "$JOBS_DIR" "$1"; } _get() { grep -E "^$2=" "$1" 2>/dev/null | head -1 | cut -d= -f2-; } _set() { # : replace or append key=val local f=$1 k=$2 v=$3 tmp; tmp="$f.tmp.$$" if grep -qE "^$k=" "$f" 2>/dev/null; then sed "s#^$k=.*#$k=$v#" "$f" > "$tmp" && mv "$tmp" "$f" else printf '%s=%s\n' "$k" "$v" >> "$f" fi } _event() { printf '%s %s\n' "$(date +%s)" "$*" >> "$EVENTS"; } _is_active() { case "$1" in assigned|building|review|testing) return 0;; *) return 1;; esac; } _emit() { printf '%s\n%s\n' "$1" "$2"; } # case "$METHOD $RPATH" in "POST /fleet/factories/heartbeat") _emit '{"ok":true}' 200 ;; "POST /fleet/claim") 
factory=$(_str_field factoryId) _lock claimed_id="" if [ -f "$COORD_STATE/order" ]; then while IFS= read -r jid; do [ -n "$jid" ] || continue jf=$(_jobfile "$jid") [ -f "$jf" ] || continue if [ "$(_get "$jf" stage)" = "queued" ]; then claimed_id="$jid"; break; fi done < "$COORD_STATE/order" fi if [ -n "$claimed_id" ]; then jf=$(_jobfile "$claimed_id") epoch=$(( $(_get "$jf" epoch) + 1 )) _set "$jf" stage assigned; _set "$jf" holder "$factory"; _set "$jf" epoch "$epoch" body=$(_get "$jf" body) _event "CLAIM job=$claimed_id factory=$factory epoch=$epoch" _unlock _emit "{\"claimed\":true,\"job\":{\"id\":\"$claimed_id\",\"bodyMd\":\"$body\",\"leaseEpoch\":$epoch},\"lease\":{\"leaseEpoch\":$epoch}}" 200 else _unlock _emit '{"claimed":false}' 200 fi ;; PATCH\ /fleet/jobs/*) jid=$(_job_id_from_path); stage=$(_str_field stage); rep_epoch=$(_num_field leaseEpoch) jf=$(_jobfile "$jid") _lock if [ ! -f "$jf" ]; then _unlock; _emit '{}' 404 else cur_epoch=$(_get "$jf" epoch) if [ -n "$rep_epoch" ] && [ "$rep_epoch" -lt "$cur_epoch" ]; then _event "FENCE job=$jid factory=$(_get "$jf" holder) epoch=$rep_epoch fenced (zombie rejected) else [ -n "$stage" ] && _set "$jf" stage "$stage" _event "PATCH:$stage job=$jid factory=$(_get "$jf" holder) epoch=$rep_epoch" _unlock; _emit '{}' 200 fi fi ;; POST\ /fleet/jobs/*/lease/renew) jid=$(_job_id_from_path); rep_epoch=$(_num_field leaseEpoch); jf=$(_jobfile "$jid") _lock cur_epoch=$(_get "$jf" epoch 2>/dev/null) if [ -n "$rep_epoch" ] && [ -n "$cur_epoch" ] && [ "$rep_epoch" -lt "$cur_epoch" ]; then _event "RENEW_FENCE job=$jid epoch=$rep_epoch the dead worker's old epoch is now stale (fenced) _set "$jf" stage queued; _set "$jf" holder ""; _set "$jf" epoch "$epoch" _event "RECLAIM job=$jid factory=$factory epoch=$epoch" n=$((n+1)) fi done _unlock _emit "{\"reaped\":$n}" 200 ;; *) _emit '{}' 200 ;; esac