From f14e6c2336c07407ecec7563390cbf2b86d9b205 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Thu, 28 May 2026 22:10:30 -0700 Subject: [PATCH] feat(agent-queue): per-cwd locking so two agents never share a repo Serialize jobs by lock key (frontmatter 'lock:' override, default cwd) via the single run-loop's pre-launch eligibility check; the oldest non-busy job is picked regardless of --max. Adds a flock-based worker guard where flock exists (Linux); macOS relies on the single-daemon model. Records lock= in job meta. --- agent-queue/agent-queue.sh | 85 +++++++++++++++++++++++++++++++++----- 1 file changed, 75 insertions(+), 10 deletions(-) diff --git a/agent-queue/agent-queue.sh b/agent-queue/agent-queue.sh index 7af9675..20d8de3 100755 --- a/agent-queue/agent-queue.sh +++ b/agent-queue/agent-queue.sh @@ -29,12 +29,17 @@ DONE="$QUEUE_ROOT/done" FAILED="$QUEUE_ROOT/failed" LOGS="$QUEUE_ROOT/logs" STATE="$QUEUE_ROOT/.state" +LOCKS="$QUEUE_ROOT/locks" # ── Config (env-overridable) ──────────────────────────────────────── MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-2}" DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}" POLL_SECONDS="${AGENT_QUEUE_POLL:-3}" +# flock is used for cross-process lock hardening when available (Linux). macOS +# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run). +FLOCK_BIN="${FLOCK_BIN:-$(command -v flock || true)}" + DEVIN_BIN="${DEVIN_BIN:-$(command -v devin || echo "$HOME/.local/bin/devin")}" CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}" CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}" @@ -52,7 +57,7 @@ err() { printf '%s[agent-queue]%s %s\n' "$C_RED" "$C_RESET" "$*" >&2; } die() { err "$*"; exit 1; } # ── Init ──────────────────────────────────────────────────────────── -ensure_dirs() { mkdir -p "$INBOX" "$DOING" "$DONE" "$FAILED" "$LOGS" "$STATE"; } +ensure_dirs() { mkdir -p "$INBOX" "$DOING" "$DONE" "$FAILED" "$LOGS" "$STATE" "$LOCKS"; } # ── Frontmatter parsing ───────────────────────────────────────────── # fm_get @@ -83,6 +88,33 @@ strip_frontmatter() { { if (!infm) print }' "$1" } +# lock_key_for -> the mutual-exclusion key for a job: frontmatter `lock:` +# if set, otherwise the cwd. Jobs sharing a key never run concurrently. +lock_key_for() { + local f=$1 k + k=$(fm_get "$f" lock "") + [[ -n "$k" ]] && { printf '%s' "$k"; return; } + fm_get "$f" cwd "$PWD" +} + +# _keyhash -> stable filename-safe token for a lock key +_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; } + +# busy_keys -> newline list of lock keys currently held by active workers. +# A worker is active if its meta has no `ended=` and its pid is live (or the pid +# has not been written yet, i.e. it was just launched and the slot is reserved). +busy_keys() { + local f pid + for f in "$STATE"/*.meta; do + [[ -e "$f" ]] || continue + grep -q '^ended=' "$f" && continue + pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2) + if [[ -z "$pid" ]] || kill -0 "$pid" 2>/dev/null; then + grep '^lock=' "$f" | head -1 | cut -d= -f2- + fi + done +} + # ── Engine driver: builds argv into AGENT_CMD[]; sets AGENT_STDIN if the ── # prompt should be fed on stdin (claude/codex) rather than a flag. $pf is the # frontmatter-STRIPPED body file, so a body starting with '--' is never @@ -141,13 +173,33 @@ run_worker() { local bodyf="$STATE/$job.body.md" strip_frontmatter "$doing_file" > "$bodyf" build_agent_cmd "$engine" "$bodyf" "$yolo" - local rc - if [[ -n "$AGENT_STDIN" ]]; then - ( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) >> "$logf" 2>&1 + + _run_agent() { + if [[ -n "$AGENT_STDIN" ]]; then + ( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) + else + ( cd "$cwd" && "${AGENT_CMD[@]}" ) + fi + } + + local rc lockkey + lockkey=$(lock_key_for "$doing_file") + if [[ -n "$FLOCK_BIN" ]]; then + # Cross-process hardening where flock exists (Linux CI). The single run-loop + # already serializes by lock key; this guards against a stray second launcher. + local lf="$LOCKS/$(_keyhash "$lockkey").lock" + ( "$FLOCK_BIN" -n 9 || exit 75; _run_agent ) 9>"$lf" >> "$logf" 2>&1 + rc=$? + if [[ $rc -eq 75 ]]; then + echo "lock busy (key=$lockkey) — requeued to inbox" >> "$logf" + mv "$doing_file" "$INBOX/" 2>/dev/null + { echo "ended=$(date +%s)"; echo "result=requeued"; } >> "$metaf" + return 0 + fi else - ( cd "$cwd" && "${AGENT_CMD[@]}" ) >> "$logf" 2>&1 + _run_agent >> "$logf" 2>&1 + rc=$? fi - rc=$? echo "ended=$(date +%s)" >> "$metaf" echo "exit=$rc" >> "$metaf" @@ -228,28 +280,40 @@ cmd_run() { while true; do local running; running=$(live_workers) - # launch jobs while we have capacity and inbox files + # launch jobs while we have capacity and an eligible inbox file while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do - local next; next=$(ls -1 "$INBOX"/*.md 2>/dev/null | sort | head -1) + # pick the oldest inbox file whose lock key is not currently busy, so two + # jobs sharing a cwd (or `lock:` key) never run at once, regardless of --max. + local busy; busy=$(busy_keys) + local next="" cand cand_key + while IFS= read -r cand; do + [[ -n "$cand" ]] || continue + cand_key=$(lock_key_for "$cand") + if printf '%s\n' "$busy" | grep -qxF -- "$cand_key"; then continue; fi + next="$cand"; break + done < <(ls -1 "$INBOX"/*.md 2>/dev/null | sort) [[ -z "$next" ]] && break + local job; job=$(basename "$next"); job=${job%.md} local doing_file="$DOING/$(basename "$next")" mv "$next" "$doing_file" - local w_eng w_cwd w_yolo + local w_eng w_cwd w_yolo w_key w_eng=$(fm_get "$doing_file" engine "$DEFAULT_ENGINE") w_cwd=$(fm_get "$doing_file" cwd "$PWD") w_yolo=$(fm_get "$doing_file" yolo "true") + w_key=$(lock_key_for "$doing_file") # write meta BEFORE launch (no pid yet), then append the worker pid from $! { echo "job=$job" echo "engine=$w_eng" echo "cwd=$w_cwd" echo "yolo=$w_yolo" + echo "lock=$w_key" echo "started=$(date +%s)" } > "$STATE/$job.meta" run_worker "$doing_file" & echo "pid=$!" >> "$STATE/$job.meta" - log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng)" + log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)" sleep 1 running=$(live_workers) done @@ -360,6 +424,7 @@ ${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md) engine: devin cwd: /Users/you/code/repo yolo: true + lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially --- ${C_BOLD}ENV${C_RESET}