diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh
index dfd412e..cb3d8f8 100755
--- a/agent-queue/agent-queue.sh
+++ b/agent-queue/agent-queue.sh
@@ -129,18 +129,43 @@ _dur_to_secs() {
   fi
 }
 
-# busy_keys -> newline list of lock keys currently held by active workers.
-# A worker is active if its meta has no `ended=` and its pid is live (or the pid
-# has not been written yet, i.e. it was just launched and the slot is reserved).
-busy_keys() {
-  local f pid
+# _meta_active -> 0 if the job is occupying a concurrency slot.
+# Active = no `ended=` AND (pid is live, OR pid not yet written but the meta was
+# created moments ago — the reserved-slot window between meta-write and launch).
+# The <30s guard prevents a meta orphaned mid-launch (daemon killed in the gap)
+# from pinning a slot forever.
+_meta_active() {
+  local f=$1 pid mt age
+  grep -q '^ended=' "$f" && return 1
+  pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
+  if [[ -n "$pid" ]]; then
+    kill -0 "$pid" 2>/dev/null
+    return $?
+  fi
+  mt=$(_mtime "$f"); age=$(( $(date +%s) - ${mt:-0} ))
+  [[ "$age" -lt 30 ]]
+}
+
+# active_workers -> count of jobs occupying a concurrency slot (reservation-aware).
+active_workers() {
+  local n=0 f
   for f in "$STATE"/*.meta; do
     [[ -e "$f" ]] || continue
-    grep -q '^ended=' "$f" && continue
-    pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
-    if [[ -z "$pid" ]] || kill -0 "$pid" 2>/dev/null; then
-      grep '^lock=' "$f" | head -1 | cut -d= -f2-
-    fi
+    _meta_active "$f" && n=$((n+1))
+  done
+  echo "$n"
+}
+
+# busy_keys -> newline list of lock keys currently held by active workers.
+# Uses `if`, not `&&`: with `&&`, an inactive last meta would make the function
+# return non-zero, which would abort a caller running under `set -e`.
+busy_keys() {
+  local f
+  for f in "$STATE"/*.meta; do
+    [[ -e "$f" ]] || continue
+    if _meta_active "$f"; then
+      grep '^lock=' "$f" | head -1 | cut -d= -f2-
+    fi
   done
 }
 
@@ -271,18 +296,6 @@ run_worker() {
   fi
 }
 
-# count live workers by checking recorded pids
-live_workers() {
-  local n=0 f pid
-  for f in "$STATE"/*.meta; do
-    [[ -e "$f" ]] || continue
-    grep -q '^ended=' "$f" && continue
-    pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
-    [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null && n=$((n+1))
-  done
-  echo "$n"
-}
-
 # ── Commands ────────────────────────────────────────────────────────
 
 cmd_init() { ensure_dirs; log "queue initialized at $C_BOLD$QUEUE_ROOT$C_RESET"; }
@@ -336,7 +349,7 @@ cmd_run() {
   log "run loop started (max=$MAX_CONCURRENCY, default engine=$DEFAULT_ENGINE). Ctrl-C to stop."
 
   while true; do
-    local running; running=$(live_workers)
+    local running; running=$(active_workers)
     # launch jobs while we have capacity and an eligible inbox file
     while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
       # pick the oldest inbox file whose lock key is not currently busy, so two
@@ -372,11 +385,11 @@ cmd_run() {
       echo "pid=$!" >> "$STATE/$job.meta"
       log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
       sleep 1
-      running=$(live_workers)
+      running=$(active_workers)
     done
 
     if $once; then
-      [[ "$(live_workers)" -eq 0 && -z "$(ls -1 "$INBOX"/*.md 2>/dev/null)" ]] && {
+      [[ "$(active_workers)" -eq 0 && -z "$(ls -1 "$INBOX"/*.md 2>/dev/null)" ]] && {
         log "drain complete — inbox empty, no workers running"; rm -f "$STATE/daemon.pid"; exit 0; }
     fi
     sleep "$POLL_SECONDS"