diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index 4a26bfa..a583558 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -1658,8 +1658,14 @@ cmd_run() { # ROUTE flag (§16): only SOURCE work from the coordinator when route_via_service # is on (ROUTE=1, the default). With ROUTE=0 the LOCAL inbox is authoritative # (pre-cutover / shadow) — the coordinator is never used to source work. - if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]]; then - fleet_claim >/dev/null 2>&1 || true + # §M0 RU gate: when AQ_FLEET_GATE=1, skip the (expensive) claim while the + # cheap per-product queue version is unchanged and we are not mid-drain. + # Gate off (default) -> fleet_gate_should_claim always true == prior behavior. + if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \ + && fleet_gate_should_claim; then + local _crc=0 + fleet_claim >/dev/null 2>&1 || _crc=$? + fleet_gate_note_claim "$_crc" fi fi local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare) diff --git a/agent-queue/docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md b/agent-queue/docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md index 51b7c7e..1e44eef 100644 --- a/agent-queue/docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md +++ b/agent-queue/docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md @@ -26,6 +26,11 @@ > small-messages/body-from-Cosmos + token re-check + alerting (M2), and new > **Testing** and **Rollback & flags** blocks. No design element is now without > an implementation step. +> - v5 (2026-05-31): **M0 implemented + shipped** (`fleet_queue_state` + bump +> hooks + `GET /fleet/queue-state` in common_plat; `AQ_FLEET_GATE` gate-skip in +> agent-queue). Reconciled M0 to the as-built approach (gate the *claim*; keep +> `POLL_SECONDS` for local responsiveness rather than raising it globally) and +> ticked the M0 checklist. Backend vitest + gate logic verified. --- @@ -326,13 +331,21 @@ worktree is force-recreated at the next job for that repo. > numbering; all of M0–M3 sit *inside* roadmap **Phase 4**. The ticked checklist > is in §12. -### M0 — RU quick win (no new infra, fully reversible) — *do now* -- Add a per-product `queue_version`/`pending_count` doc bumped on submit/stage - change. The factory's loop does a **1-RU point-read** of that doc each tick and - only runs the expensive `listJobs`/claim when it changed. -- Raise `POLL_SECONDS` (e.g. 3 → 15–30) and add jittered backoff when idle. -- Expected: **~10–50× fewer claim queries at idle**, behavior otherwise - identical. Gate behind a flag; trivially revertible. +### M0 — RU quick win (no new infra, fully reversible) — *IMPLEMENTED* +- Per-product `fleet_queue_state` doc holds a monotonic `version`, bumped on job + create + every stage change (centralized in the repo layer, best-effort). +- The factory run loop does a **~1-RU point-read** (`GET /fleet/queue-state`) and + **skips the expensive claim** while the version is unchanged and it is not + mid-drain — rather than raising `POLL_SECONDS` globally (which would slow local, + non-fleet job pickup). A periodic safety backstop + fail-open-on-read-error + guarantee work is never stranded. +- Gated behind **`AQ_FLEET_GATE=1`** (default OFF ⇒ byte-for-byte prior behavior). +- Expected: **~10–50× fewer claim queries at idle**, local responsiveness + unchanged. +- Code: common_plat `services/platform-service/src/modules/fleet/{types,repository,routes}.ts` + + `lib/cosmos-init.ts`; `agent-queue/lib/fleet-client.sh` (`fleet_gate_*`) + the + run-loop hook in `agent-queue.sh`. Tests: fleet vitest (repo bump + endpoint) + + selftest `39b` (gate decisions). ### M1 — Stand up the broker in **shadow** - Provision Service Bus (`fleet-dispatch` topic + subscriptions) with @@ -447,12 +460,12 @@ without an implementation step. - [ ] Schema: add `targetFactoryId` to `FleetJobDoc`, `repos[]` to `FleetFactoryDoc`; register a new `fleet_queue_state` doc (`/productId`) for the M0 gate; provision the change-feed **lease container**; update the container registry / `COSMOS_AUTO_INIT`. - [ ] RBAC via managed identity: dispatcher = Service Bus **Sender**, factories = **Listener** on their own subscription; no shared keys committed. -### M0 — RU quick win (no new infra) -- [ ] Add per-product `queue_version`/`pending_count` doc; bump on submit + stage change. -- [ ] Factory loop point-reads the gate each tick; run `listJobs`/claim only when it changed. -- [ ] Raise `POLL_SECONDS` default (3 → 15–30) + jittered idle backoff. -- [ ] Flag-gate (e.g. `AQ_FLEET_GATE=1`) with a clean off-switch. -- [ ] Verify: idle claim queries drop ~10–50×; functional behavior unchanged (selftest green). +### M0 — RU quick win (no new infra) — ✅ DONE +- [x] Add per-product `fleet_queue_state` doc; bump on create + every stage change (repo layer). +- [x] Factory loop point-reads the gate each tick; run the claim only when it changed / mid-drain / safety interval. +- [x] Keep `POLL_SECONDS` for local responsiveness; gate the *claim*, with a periodic safety backstop + fail-open (instead of raising the global poll interval). +- [x] Flag-gate `AQ_FLEET_GATE=1` (default OFF) with a clean off-switch. +- [x] Tests: fleet vitest (repo bump + `GET /fleet/queue-state`) + selftest `39b` (gate decisions) green; gate logic verified standalone. ### Routing-model fix (lands with M0/M1) - [ ] Add `repo:` capability token; factories advertise local repos via heartbeat (`repos[]`). diff --git a/agent-queue/lib/fleet-client.sh b/agent-queue/lib/fleet-client.sh index c6ae2e5..4871aff 100644 --- a/agent-queue/lib/fleet-client.sh +++ b/agent-queue/lib/fleet-client.sh @@ -66,6 +66,20 @@ AQ_FLEET_SHADOW_LOG="${AQ_FLEET_SHADOW_LOG:-}" _AQ_FLEET_SHADOW_WARNED=0 # one-shot ROUTE>SHADOW precedence warning (per process) SHADOW_COORD_JOB="" # set by fleet_shadow_claim: would-be coordinator job id +# ── §M0 RU gate (docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12) ── +# When ON, the run loop point-reads a cheap per-product queue version +# (GET /fleet/queue-state, ~1 RU) and SKIPS the expensive claim while nothing has +# changed and we are not mid-drain — slashing idle Cosmos RU. Default OFF +# (opt-in): behavior is byte-for-byte unchanged unless AQ_FLEET_GATE=1, and the +# gate always FAILS OPEN (claims) on any read error so work is never stranded. +AQ_FLEET_GATE="${AQ_FLEET_GATE:-0}" +# Force a full claim at least this often even when the gate is unchanged (backstops +# a missed/raced version bump). 0 disables the periodic backstop. +AQ_FLEET_GATE_SAFETY_SEC="${AQ_FLEET_GATE_SAFETY_SEC:-300}" +AQ_FLEET_GATE_SEEN="" # last-seen queue version (mutable, per process) +AQ_FLEET_GATE_TS=0 # epoch of the last full (drained) claim attempt +AQ_FLEET_GATE_DRAINING=1 # 1 = keep claiming (last claim got a job / startup) + # fleet_enabled — true iff the coordinator integration is switched on. fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; } @@ -175,6 +189,48 @@ fleet_heartbeat_maybe() { return 0 } +# ── §M0 RU gate helpers ───────────────────────────────────────────── +# fleet_gate_enabled — true iff the cheap-poll gate is switched on. +fleet_gate_enabled() { fleet_enabled && [[ "${AQ_FLEET_GATE:-0}" == 1 ]]; } + +# fleet_queue_version — print the product's queue version (GET /fleet/queue-state); +# return non-zero on any read failure so callers can fail open. +fleet_queue_version() { + _fleet_call GET "/fleet/queue-state" + case "$FLEET_CODE" in 2*) :;; *) return 1;; esac + printf '%s' "$FLEET_BODY" | _fleet_json_num version +} + +# fleet_gate_should_claim — 0 = run the (expensive) claim this tick, 1 = skip it. +# Read-only. Fails OPEN (claim) on any uncertainty so work is never stranded. +# Always 0 when the gate is OFF, preserving the pre-gate behavior exactly. +fleet_gate_should_claim() { + fleet_gate_enabled || return 0 # gate off -> always claim + [[ "${AQ_FLEET_GATE_DRAINING:-1}" == 1 ]] && return 0 # mid-drain -> keep claiming + local now; now=$(date +%s) + if [[ "${AQ_FLEET_GATE_SAFETY_SEC:-0}" -gt 0 \ + && $(( now - ${AQ_FLEET_GATE_TS:-0} )) -ge "${AQ_FLEET_GATE_SAFETY_SEC}" ]]; then + return 0 # periodic safety backstop + fi + local v; v=$(fleet_queue_version) || return 0 # read failed -> fail open + [[ -n "$v" ]] || return 0 + [[ "$v" != "${AQ_FLEET_GATE_SEEN:-}" ]] && return 0 # changed -> claim + return 1 # unchanged + within backstop -> skip +} + +# fleet_gate_note_claim — update gate state after a claim attempt. +# rc 0 (claimed a job) -> stay draining (there may be more, keep claiming). +# rc 2 (nothing claimable) / 1 (API error) -> arm the gate: record the current +# version + timestamp and stop draining, so we skip until the version changes. +fleet_gate_note_claim() { + fleet_gate_enabled || return 0 + if [[ "${1:-1}" == 0 ]]; then AQ_FLEET_GATE_DRAINING=1; return 0; fi + AQ_FLEET_GATE_DRAINING=0 + AQ_FLEET_GATE_TS=$(date +%s) + local v; v=$(fleet_queue_version) && [[ -n "$v" ]] && AQ_FLEET_GATE_SEEN="$v" + return 0 +} + # ── Claim — pull one job and materialize it as a local inbox .md ──── # Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error. fleet_claim() { diff --git a/agent-queue/selftest.sh b/agent-queue/selftest.sh index a9386fe..536138d 100755 --- a/agent-queue/selftest.sh +++ b/agent-queue/selftest.sh @@ -781,6 +781,9 @@ case "$1 $2" in "${AQ_FSTUB_JOB_ID:-fjob_1}" "${AQ_FSTUB_BODY:-do the work}" "$repo_field" fi ;; PATCH\ /fleet/jobs/*) printf '%s\n%s\n' '{}' "${AQ_FSTUB_PATCH_CODE:-200}" ;; + "GET /fleet/queue-state") + qv=0; [ -n "${AQ_FSTUB_QVER:-}" ] && [ -f "$AQ_FSTUB_QVER" ] && qv=$(cat "$AQ_FSTUB_QVER" 2>/dev/null || echo 0) + printf '{"productId":"p","version":%s}\n200\n' "${qv:-0}" ;; *) printf '%s\n200\n' '{}' ;; esac STUBEOF @@ -965,6 +968,42 @@ else fi unset AQ_FLEET_API_CMD AQ_FLEET_CWD AQ_FSTUB_CALLS AQ_FSTUB_CLAIM_FLAG AQ_FSTUB_JOB_ID AQ_FSTUB_BODY +# 39b. §M0 RU gate (unit): with AQ_FLEET_GATE=1 the gate skips the claim while the +# queue version is unchanged and not draining, claims again when it changes or +# while draining, and FAILS OPEN on a read error. (Gate OFF == always claim, which +# is exercised by every AQ_FLEET=1 case above.) Drives the gate functions directly. +gqver="$tmp/gate-qver"; echo 5 > "$gqver" +if gout=$(bash -c ' + set -uo pipefail + export AGENT_QUEUE_ROOT="'"$tmp"'/queue-gate" AQ_FLEET=1 AQ_FLEET_GATE=1 AQ_FLEET_GATE_SAFETY_SEC=0 + export AQ_FLEET_API_CMD="'"$fstub"'" AQ_FSTUB_QVER="'"$gqver"'" + source "'"$funcs"'" + source "'"$HERE"'/lib/fleet-client.sh" + fleet_gate_should_claim; printf "start=%s\n" "$?" # draining=1 (startup) -> claim + fleet_gate_note_claim 2 # drained empty @v5 -> arm gate + fleet_gate_should_claim; printf "unchanged=%s\n" "$?" # v5==seen -> SKIP (1) + echo 6 > "'"$gqver"'" + fleet_gate_should_claim; printf "changed=%s\n" "$?" # v6!=seen -> claim (0) + fleet_gate_note_claim 0 # got a job -> draining + fleet_gate_should_claim; printf "draining=%s\n" "$?" # draining -> claim (0) + fleet_gate_note_claim 2 # re-arm gate @v6 + export AQ_FLEET_API_CMD="'"$tmp"'/no-such-cmd" # read now fails + fleet_gate_should_claim; printf "failopen=%s\n" "$?" # read error -> fail OPEN (0) +'); then + if printf '%s\n' "$gout" | grep -qx 'start=0' \ + && printf '%s\n' "$gout" | grep -qx 'unchanged=1' \ + && printf '%s\n' "$gout" | grep -qx 'changed=0' \ + && printf '%s\n' "$gout" | grep -qx 'draining=0' \ + && printf '%s\n' "$gout" | grep -qx 'failopen=0'; then + pass "fleet M0 gate: skips unchanged, claims on change/drain, fails open on read error" + else + printf '%s\n' "$gout" >&2; fail "fleet M0 gate decisions incorrect" + fi +else + fail "fleet M0 gate unit invocation errored" +fi +unset AQ_FLEET_API_CMD AQ_FSTUB_QVER + # ───────────────────────────────────────────────────────────────────── # Phase 2 — Slice 4 cases (feature flags + shadow / dual-run). Reuses the # fleet stub. SHADOW mode = AQ_FLEET=1 + AQ_FLEET_ROUTE=0 + AQ_FLEET_SHADOW=1: