fix(agent-queue): reserve concurrency slot before backgrounding worker

Replace live_workers with reservation-aware active_workers + shared _meta_active:
a job counts toward --max the moment its meta is written (before the worker is
backgrounded), so --max can never be exceeded. A <30s guard prevents a meta
orphaned mid-launch from pinning a slot. busy_keys now shares _meta_active.
This commit is contained in:
saravanakumardb1 2026-05-28 22:17:36 -07:00
parent 79331d591f
commit 11935d0539

View File

@ -129,18 +129,39 @@ _dur_to_secs() {
fi
}
# busy_keys -> newline list of lock keys currently held by active workers.
# A worker is active if its meta has no `ended=` and its pid is live (or the pid
# has not been written yet, i.e. it was just launched and the slot is reserved).
busy_keys() {
local f pid
# _meta_active <metafile> -> 0 if the job is occupying a concurrency slot.
# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
# created moments ago — the reserved-slot window between meta-write and launch).
# The <30s guard prevents a meta orphaned mid-launch (daemon killed in the gap)
# from pinning a slot forever.
_meta_active() {
local f=$1 pid mt age
grep -q '^ended=' "$f" && return 1
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
if [[ -n "$pid" ]]; then
kill -0 "$pid" 2>/dev/null
return $?
fi
mt=$(_mtime "$f"); age=$(( $(date +%s) - ${mt:-0} ))
[[ "$age" -lt 30 ]]
}
# active_workers -> count of jobs occupying a concurrency slot (reservation-aware).
active_workers() {
local n=0 f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
if [[ -z "$pid" ]] || kill -0 "$pid" 2>/dev/null; then
grep '^lock=' "$f" | head -1 | cut -d= -f2-
fi
_meta_active "$f" && n=$((n+1))
done
echo "$n"
}
# busy_keys -> newline list of lock keys currently held by active workers.
busy_keys() {
local f
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
_meta_active "$f" && grep '^lock=' "$f" | head -1 | cut -d= -f2-
done
}
@ -271,18 +292,6 @@ run_worker() {
fi
}
# count live workers by checking recorded pids
live_workers() {
local n=0 f pid
for f in "$STATE"/*.meta; do
[[ -e "$f" ]] || continue
grep -q '^ended=' "$f" && continue
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
[[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null && n=$((n+1))
done
echo "$n"
}
# ── Commands ────────────────────────────────────────────────────────
cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
@ -336,7 +345,7 @@ cmd_run() {
log "run loop started (max=$MAX_CONCURRENCY, default engine=$DEFAULT_ENGINE). Ctrl-C to stop."
while true; do
local running; running=$(live_workers)
local running; running=$(active_workers)
# launch jobs while we have capacity and an eligible inbox file
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
# pick the oldest inbox file whose lock key is not currently busy, so two
@ -372,11 +381,11 @@ cmd_run() {
echo "pid=$!" >> "$STATE/$job.meta"
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
sleep 1
running=$(live_workers)
running=$(active_workers)
done
if $once; then
[[ "$(live_workers)" -eq 0 && -z "$(ls -1 "$INBOX"/*.md 2>/dev/null)" ]] && {
[[ "$(active_workers)" -eq 0 && -z "$(ls -1 "$INBOX"/*.md 2>/dev/null)" ]] && {
log "drain complete — inbox empty, no workers running"; rm -f "$STATE/daemon.pid"; exit 0; }
fi
sleep "$POLL_SECONDS"