feat(agent-queue): M0 RU gate — skip the claim when the queue is unchanged

Adds AQ_FLEET_GATE (default OFF): the run loop point-reads the cheap per-product
queue version (GET /fleet/queue-state) and SKIPS the expensive /fleet/claim while
the version is unchanged and it is not mid-drain, with a periodic safety backstop
and fail-open-on-read-error so work is never stranded. Keeps POLL_SECONDS for
local job responsiveness rather than raising it globally. Selftest 39b covers the
gate decisions; also reconciles the M0 section of the dispatch redesign doc.
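
The decision order described above can be sketched as a small pure function. This is an illustrative reduction only: `gate_decision` and its positional arguments are hypothetical and do not appear in the commit, which keeps this state in `AQ_FLEET_GATE_*` variables and reads the version over HTTP. An empty current version stands in for a failed queue-state read.

```shell
# Hypothetical helper, not part of the commit: prints "claim" or "skip".
# Args: draining(0|1) now_epoch last_claim_epoch safety_sec seen_version current_version
# Note: plain sh has no `local`, so these variables are global in this sketch.
gate_decision() {
  draining=$1; now=$2; last_ts=$3; safety=$4; seen=$5; current=$6
  if [ "$draining" = 1 ]; then
    echo claim                      # mid-drain: keep claiming
  elif [ "$safety" -gt 0 ] && [ $(( now - last_ts )) -ge "$safety" ]; then
    echo claim                      # periodic safety backstop elapsed
  elif [ -z "$current" ]; then
    echo claim                      # version read failed: fail open
  elif [ "$current" != "$seen" ]; then
    echo claim                      # queue version changed
  else
    echo skip                       # unchanged and within the backstop window
  fi
}

gate_decision 1 1000 900 300 5 5    # claim (draining)
gate_decision 0 1000 900 300 5 5    # skip  (unchanged, 100s < 300s)
gate_decision 0 1000 900 300 5 6    # claim (version changed)
gate_decision 0 1000 600 300 5 5    # claim (backstop: 400s >= 300s)
gate_decision 0 1000 900 300 5 ""   # claim (read failed)
```

Every uncertain branch resolves to "claim", so a gate that misbehaves can only cost extra claim queries and never leaves queued work unclaimed.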

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
saravanakumardb1 2026-05-31 23:19:01 -07:00
parent 29afe59604
commit 41d8067724
4 changed files with 129 additions and 15 deletions


@@ -1658,8 +1658,14 @@ cmd_run() {
# ROUTE flag (§16): only SOURCE work from the coordinator when route_via_service
# is on (ROUTE=1, the default). With ROUTE=0 the LOCAL inbox is authoritative
# (pre-cutover / shadow) — the coordinator is never used to source work.
if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]]; then
fleet_claim >/dev/null 2>&1 || true
# §M0 RU gate: when AQ_FLEET_GATE=1, skip the (expensive) claim while the
# cheap per-product queue version is unchanged and we are not mid-drain.
# Gate off (default) -> fleet_gate_should_claim always true == prior behavior.
if fleet_route_enabled && [[ "$(active_workers)" -lt "$MAX_CONCURRENCY" ]] \
&& fleet_gate_should_claim; then
local _crc=0
fleet_claim >/dev/null 2>&1 || _crc=$?
fleet_gate_note_claim "$_crc"
fi
fi
local shadow_local="" # the local (authoritative) job picked this iteration (for shadow compare)


@@ -26,6 +26,11 @@
> small-messages/body-from-Cosmos + token re-check + alerting (M2), and new
> **Testing** and **Rollback & flags** blocks. No design element is now without
> an implementation step.
> - v5 (2026-05-31): **M0 implemented + shipped** (`fleet_queue_state` + bump
> hooks + `GET /fleet/queue-state` in common_plat; `AQ_FLEET_GATE` gate-skip in
> agent-queue). Reconciled M0 to the as-built approach (gate the *claim*; keep
> `POLL_SECONDS` for local responsiveness rather than raising it globally) and
> ticked the M0 checklist. Backend vitest + gate logic verified.
---
@@ -326,13 +331,21 @@ worktree is force-recreated at the next job for that repo.
> numbering; all of M0–M3 sit *inside* roadmap **Phase 4**. The ticked checklist
> is in §12.
### M0 — RU quick win (no new infra, fully reversible) — *do now*
- Add a per-product `queue_version`/`pending_count` doc bumped on submit/stage
change. The factory's loop does a **1-RU point-read** of that doc each tick and
only runs the expensive `listJobs`/claim when it changed.
- Raise `POLL_SECONDS` (e.g. 3 → 15–30) and add jittered backoff when idle.
- Expected: **~10–50× fewer claim queries at idle**, behavior otherwise
identical. Gate behind a flag; trivially revertible.
### M0 — RU quick win (no new infra, fully reversible) — *IMPLEMENTED*
- Per-product `fleet_queue_state` doc holds a monotonic `version`, bumped on job
create + every stage change (centralized in the repo layer, best-effort).
- The factory run loop does a **~1-RU point-read** (`GET /fleet/queue-state`) and
**skips the expensive claim** while the version is unchanged and it is not
mid-drain — rather than raising `POLL_SECONDS` globally (which would slow local,
non-fleet job pickup). A periodic safety backstop + fail-open-on-read-error
guarantee work is never stranded.
- Gated behind **`AQ_FLEET_GATE=1`** (default OFF ⇒ byte-for-byte prior behavior).
- Expected: **~10–50× fewer claim queries at idle**, local responsiveness
unchanged.
- Code: common_plat `services/platform-service/src/modules/fleet/{types,repository,routes}.ts`
+ `lib/cosmos-init.ts`; `agent-queue/lib/fleet-client.sh` (`fleet_gate_*`) + the
run-loop hook in `agent-queue.sh`. Tests: fleet vitest (repo bump + endpoint) +
selftest `39b` (gate decisions).
### M1 — Stand up the broker in **shadow**
- Provision Service Bus (`fleet-dispatch` topic + subscriptions) with
@@ -447,12 +460,12 @@ without an implementation step.
- [ ] Schema: add `targetFactoryId` to `FleetJobDoc`, `repos[]` to `FleetFactoryDoc`; register a new `fleet_queue_state` doc (`/productId`) for the M0 gate; provision the change-feed **lease container**; update the container registry / `COSMOS_AUTO_INIT`.
- [ ] RBAC via managed identity: dispatcher = Service Bus **Sender**, factories = **Listener** on their own subscription; no shared keys committed.
### M0 — RU quick win (no new infra)
- [ ] Add per-product `queue_version`/`pending_count` doc; bump on submit + stage change.
- [ ] Factory loop point-reads the gate each tick; run `listJobs`/claim only when it changed.
- [ ] Raise `POLL_SECONDS` default (3 → 15–30) + jittered idle backoff.
- [ ] Flag-gate (e.g. `AQ_FLEET_GATE=1`) with a clean off-switch.
- [ ] Verify: idle claim queries drop ~10–50×; functional behavior unchanged (selftest green).
### M0 — RU quick win (no new infra) — ✅ DONE
- [x] Add per-product `fleet_queue_state` doc; bump on create + every stage change (repo layer).
- [x] Factory loop point-reads the gate each tick; run the claim only when it changed / mid-drain / safety interval.
- [x] Keep `POLL_SECONDS` for local responsiveness; gate the *claim*, with a periodic safety backstop + fail-open (instead of raising the global poll interval).
- [x] Flag-gate `AQ_FLEET_GATE=1` (default OFF) with a clean off-switch.
- [x] Tests: fleet vitest (repo bump + `GET /fleet/queue-state`) + selftest `39b` (gate decisions) green; gate logic verified standalone.
### Routing-model fix (lands with M0/M1)
- [ ] Add `repo:<name>` capability token; factories advertise local repos via heartbeat (`repos[]`).


@@ -66,6 +66,20 @@ AQ_FLEET_SHADOW_LOG="${AQ_FLEET_SHADOW_LOG:-}"
_AQ_FLEET_SHADOW_WARNED=0 # one-shot ROUTE>SHADOW precedence warning (per process)
SHADOW_COORD_JOB="" # set by fleet_shadow_claim: would-be coordinator job id
# ── §M0 RU gate (docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12) ──
# When ON, the run loop point-reads a cheap per-product queue version
# (GET /fleet/queue-state, ~1 RU) and SKIPS the expensive claim while nothing has
# changed and we are not mid-drain — slashing idle Cosmos RU. Default OFF
# (opt-in): behavior is byte-for-byte unchanged unless AQ_FLEET_GATE=1, and the
# gate always FAILS OPEN (claims) on any read error so work is never stranded.
AQ_FLEET_GATE="${AQ_FLEET_GATE:-0}"
# Force a full claim at least this often even when the gate is unchanged (backstops
# a missed/raced version bump). 0 disables the periodic backstop.
AQ_FLEET_GATE_SAFETY_SEC="${AQ_FLEET_GATE_SAFETY_SEC:-300}"
AQ_FLEET_GATE_SEEN="" # last-seen queue version (mutable, per process)
AQ_FLEET_GATE_TS=0 # epoch of the last full (drained) claim attempt
AQ_FLEET_GATE_DRAINING=1 # 1 = keep claiming (last claim got a job / startup)
# fleet_enabled — true iff the coordinator integration is switched on.
fleet_enabled() { [[ "${AQ_FLEET:-0}" == 1 ]]; }
@@ -175,6 +189,48 @@ fleet_heartbeat_maybe() {
return 0
}
# ── §M0 RU gate helpers ─────────────────────────────────────────────
# fleet_gate_enabled — true iff the cheap-poll gate is switched on.
fleet_gate_enabled() { fleet_enabled && [[ "${AQ_FLEET_GATE:-0}" == 1 ]]; }
# fleet_queue_version — print the product's queue version (GET /fleet/queue-state);
# return non-zero on any read failure so callers can fail open.
fleet_queue_version() {
_fleet_call GET "/fleet/queue-state"
case "$FLEET_CODE" in 2*) :;; *) return 1;; esac
printf '%s' "$FLEET_BODY" | _fleet_json_num version
}
# fleet_gate_should_claim — 0 = run the (expensive) claim this tick, 1 = skip it.
# Read-only. Fails OPEN (claim) on any uncertainty so work is never stranded.
# Always 0 when the gate is OFF, preserving the pre-gate behavior exactly.
fleet_gate_should_claim() {
fleet_gate_enabled || return 0 # gate off -> always claim
[[ "${AQ_FLEET_GATE_DRAINING:-1}" == 1 ]] && return 0 # mid-drain -> keep claiming
local now; now=$(date +%s)
if [[ "${AQ_FLEET_GATE_SAFETY_SEC:-0}" -gt 0 \
&& $(( now - ${AQ_FLEET_GATE_TS:-0} )) -ge "${AQ_FLEET_GATE_SAFETY_SEC}" ]]; then
return 0 # periodic safety backstop
fi
local v; v=$(fleet_queue_version) || return 0 # read failed -> fail open
[[ -n "$v" ]] || return 0
[[ "$v" != "${AQ_FLEET_GATE_SEEN:-}" ]] && return 0 # changed -> claim
return 1 # unchanged + within backstop -> skip
}
# fleet_gate_note_claim <claim_rc> — update gate state after a claim attempt.
# rc 0 (claimed a job) -> stay draining (there may be more, keep claiming).
# rc 2 (nothing claimable) / 1 (API error) -> arm the gate: record the current
# version + timestamp and stop draining, so we skip until the version changes.
fleet_gate_note_claim() {
fleet_gate_enabled || return 0
if [[ "${1:-1}" == 0 ]]; then AQ_FLEET_GATE_DRAINING=1; return 0; fi
AQ_FLEET_GATE_DRAINING=0
AQ_FLEET_GATE_TS=$(date +%s)
local v; v=$(fleet_queue_version) && [[ -n "$v" ]] && AQ_FLEET_GATE_SEEN="$v"
return 0
}
# ── Claim — pull one job and materialize it as a local inbox .md ────
# Returns 0 = claimed + materialized, 2 = nothing claimable, 1 = API error.
fleet_claim() {


@@ -781,6 +781,9 @@ case "$1 $2" in
"${AQ_FSTUB_JOB_ID:-fjob_1}" "${AQ_FSTUB_BODY:-do the work}" "$repo_field"
fi ;;
PATCH\ /fleet/jobs/*) printf '%s\n%s\n' '{}' "${AQ_FSTUB_PATCH_CODE:-200}" ;;
"GET /fleet/queue-state")
qv=0; [ -n "${AQ_FSTUB_QVER:-}" ] && [ -f "$AQ_FSTUB_QVER" ] && qv=$(cat "$AQ_FSTUB_QVER" 2>/dev/null || echo 0)
printf '{"productId":"p","version":%s}\n200\n' "${qv:-0}" ;;
*) printf '%s\n200\n' '{}' ;;
esac
STUBEOF
@@ -965,6 +968,42 @@ else
fi
unset AQ_FLEET_API_CMD AQ_FLEET_CWD AQ_FSTUB_CALLS AQ_FSTUB_CLAIM_FLAG AQ_FSTUB_JOB_ID AQ_FSTUB_BODY
# 39b. §M0 RU gate (unit): with AQ_FLEET_GATE=1 the gate skips the claim while the
# queue version is unchanged and not draining, claims again when it changes or
# while draining, and FAILS OPEN on a read error. (Gate OFF == always claim, which
# is exercised by every AQ_FLEET=1 case above.) Drives the gate functions directly.
gqver="$tmp/gate-qver"; echo 5 > "$gqver"
if gout=$(bash -c '
set -uo pipefail
export AGENT_QUEUE_ROOT="'"$tmp"'/queue-gate" AQ_FLEET=1 AQ_FLEET_GATE=1 AQ_FLEET_GATE_SAFETY_SEC=0
export AQ_FLEET_API_CMD="'"$fstub"'" AQ_FSTUB_QVER="'"$gqver"'"
source "'"$funcs"'"
source "'"$HERE"'/lib/fleet-client.sh"
fleet_gate_should_claim; printf "start=%s\n" "$?" # draining=1 (startup) -> claim
fleet_gate_note_claim 2 # drained empty @v5 -> arm gate
fleet_gate_should_claim; printf "unchanged=%s\n" "$?" # v5==seen -> SKIP (1)
echo 6 > "'"$gqver"'"
fleet_gate_should_claim; printf "changed=%s\n" "$?" # v6!=seen -> claim (0)
fleet_gate_note_claim 0 # got a job -> draining
fleet_gate_should_claim; printf "draining=%s\n" "$?" # draining -> claim (0)
fleet_gate_note_claim 2 # re-arm gate @v6
export AQ_FLEET_API_CMD="'"$tmp"'/no-such-cmd" # read now fails
fleet_gate_should_claim; printf "failopen=%s\n" "$?" # read error -> fail OPEN (0)
'); then
if printf '%s\n' "$gout" | grep -qx 'start=0' \
&& printf '%s\n' "$gout" | grep -qx 'unchanged=1' \
&& printf '%s\n' "$gout" | grep -qx 'changed=0' \
&& printf '%s\n' "$gout" | grep -qx 'draining=0' \
&& printf '%s\n' "$gout" | grep -qx 'failopen=0'; then
pass "fleet M0 gate: skips unchanged, claims on change/drain, fails open on read error"
else
printf '%s\n' "$gout" >&2; fail "fleet M0 gate decisions incorrect"
fi
else
fail "fleet M0 gate unit invocation errored"
fi
unset AQ_FLEET_API_CMD AQ_FSTUB_QVER
# ─────────────────────────────────────────────────────────────────────
# Phase 2 — Slice 4 cases (feature flags + shadow / dual-run). Reuses the
# fleet stub. SHADOW mode = AQ_FLEET=1 + AQ_FLEET_ROUTE=0 + AQ_FLEET_SHADOW=1: