feat(agent-queue): per-cwd locking so two agents never share a repo
Serialize jobs by lock key (frontmatter 'lock:' override, default cwd) via the single run-loop's pre-launch eligibility check; the oldest non-busy job is picked regardless of --max. Adds a flock-based worker guard where flock exists (Linux); macOS relies on the single-daemon model. Records lock= in job meta.
This commit is contained in:
parent
9b49c28af5
commit
f14e6c2336
@ -29,12 +29,17 @@ DONE="$QUEUE_ROOT/done"
|
||||
FAILED="$QUEUE_ROOT/failed"
|
||||
LOGS="$QUEUE_ROOT/logs"
|
||||
STATE="$QUEUE_ROOT/.state"
|
||||
LOCKS="$QUEUE_ROOT/locks"
|
||||
|
||||
# ── Config (env-overridable) ────────────────────────────────────────
|
||||
MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-2}"
|
||||
DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}"
|
||||
POLL_SECONDS="${AGENT_QUEUE_POLL:-3}"
|
||||
|
||||
# flock is used for cross-process lock hardening when available (Linux). macOS
|
||||
# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run).
|
||||
FLOCK_BIN="${FLOCK_BIN:-$(command -v flock || true)}"
|
||||
|
||||
DEVIN_BIN="${DEVIN_BIN:-$(command -v devin || echo "$HOME/.local/bin/devin")}"
|
||||
CLAUDE_BIN="${CLAUDE_BIN:-$(command -v claude || echo claude)}"
|
||||
CODEX_BIN="${CODEX_BIN:-$(command -v codex || echo codex)}"
|
||||
@ -52,7 +57,7 @@ err() { printf '%s[agent-queue]%s %s\n' "$C_RED" "$C_RESET" "$*" >&2; }
|
||||
die() { err "$*"; exit 1; }
|
||||
|
||||
# ── Init ────────────────────────────────────────────────────────────
|
||||
ensure_dirs() { mkdir -p "$INBOX" "$DOING" "$DONE" "$FAILED" "$LOGS" "$STATE"; }
|
||||
ensure_dirs() { mkdir -p "$INBOX" "$DOING" "$DONE" "$FAILED" "$LOGS" "$STATE" "$LOCKS"; }
|
||||
|
||||
# ── Frontmatter parsing ─────────────────────────────────────────────
|
||||
# fm_get <file> <key> <default>
|
||||
@ -83,6 +88,33 @@ strip_frontmatter() {
|
||||
{ if (!infm) print }' "$1"
|
||||
}
|
||||
|
||||
# lock_key_for <file> -> the mutual-exclusion key for a job: frontmatter `lock:`
|
||||
# if set, otherwise the cwd. Jobs sharing a key never run concurrently.
|
||||
lock_key_for() {
|
||||
local f=$1 k
|
||||
k=$(fm_get "$f" lock "")
|
||||
[[ -n "$k" ]] && { printf '%s' "$k"; return; }
|
||||
fm_get "$f" cwd "$PWD"
|
||||
}
|
||||
|
||||
# _keyhash <key> -> stable filename-safe token for a lock key
|
||||
_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; }
|
||||
|
||||
# busy_keys -> newline list of lock keys currently held by active workers.
|
||||
# A worker is active if its meta has no `ended=` and its pid is live (or the pid
|
||||
# has not been written yet, i.e. it was just launched and the slot is reserved).
|
||||
busy_keys() {
|
||||
local f pid
|
||||
for f in "$STATE"/*.meta; do
|
||||
[[ -e "$f" ]] || continue
|
||||
grep -q '^ended=' "$f" && continue
|
||||
pid=$(grep '^pid=' "$f" | head -1 | cut -d= -f2)
|
||||
if [[ -z "$pid" ]] || kill -0 "$pid" 2>/dev/null; then
|
||||
grep '^lock=' "$f" | head -1 | cut -d= -f2-
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# ── Engine driver: builds argv into AGENT_CMD[]; sets AGENT_STDIN if the ──
|
||||
# prompt should be fed on stdin (claude/codex) rather than a flag. $pf is the
|
||||
# frontmatter-STRIPPED body file, so a body starting with '--' is never
|
||||
@ -141,13 +173,33 @@ run_worker() {
|
||||
local bodyf="$STATE/$job.body.md"
|
||||
strip_frontmatter "$doing_file" > "$bodyf"
|
||||
build_agent_cmd "$engine" "$bodyf" "$yolo"
|
||||
local rc
|
||||
if [[ -n "$AGENT_STDIN" ]]; then
|
||||
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" ) >> "$logf" 2>&1
|
||||
|
||||
_run_agent() {
|
||||
if [[ -n "$AGENT_STDIN" ]]; then
|
||||
( cd "$cwd" && "${AGENT_CMD[@]}" < "$AGENT_STDIN" )
|
||||
else
|
||||
( cd "$cwd" && "${AGENT_CMD[@]}" )
|
||||
fi
|
||||
}
|
||||
|
||||
local rc lockkey
|
||||
lockkey=$(lock_key_for "$doing_file")
|
||||
if [[ -n "$FLOCK_BIN" ]]; then
|
||||
# Cross-process hardening where flock exists (Linux CI). The single run-loop
|
||||
# already serializes by lock key; this guards against a stray second launcher.
|
||||
local lf="$LOCKS/$(_keyhash "$lockkey").lock"
|
||||
( "$FLOCK_BIN" -n 9 || exit 75; _run_agent ) 9>"$lf" >> "$logf" 2>&1
|
||||
rc=$?
|
||||
if [[ $rc -eq 75 ]]; then
|
||||
echo "lock busy (key=$lockkey) — requeued to inbox" >> "$logf"
|
||||
mv "$doing_file" "$INBOX/" 2>/dev/null
|
||||
{ echo "ended=$(date +%s)"; echo "result=requeued"; } >> "$metaf"
|
||||
return 0
|
||||
fi
|
||||
else
|
||||
( cd "$cwd" && "${AGENT_CMD[@]}" ) >> "$logf" 2>&1
|
||||
_run_agent >> "$logf" 2>&1
|
||||
rc=$?
|
||||
fi
|
||||
rc=$?
|
||||
|
||||
echo "ended=$(date +%s)" >> "$metaf"
|
||||
echo "exit=$rc" >> "$metaf"
|
||||
@ -228,28 +280,40 @@ cmd_run() {
|
||||
|
||||
while true; do
|
||||
local running; running=$(live_workers)
|
||||
# launch jobs while we have capacity and inbox files
|
||||
# launch jobs while we have capacity and an eligible inbox file
|
||||
while [[ "$running" -lt "$MAX_CONCURRENCY" ]]; do
|
||||
local next; next=$(ls -1 "$INBOX"/*.md 2>/dev/null | sort | head -1)
|
||||
# pick the oldest inbox file whose lock key is not currently busy, so two
|
||||
# jobs sharing a cwd (or `lock:` key) never run at once, regardless of --max.
|
||||
local busy; busy=$(busy_keys)
|
||||
local next="" cand cand_key
|
||||
while IFS= read -r cand; do
|
||||
[[ -n "$cand" ]] || continue
|
||||
cand_key=$(lock_key_for "$cand")
|
||||
if printf '%s\n' "$busy" | grep -qxF -- "$cand_key"; then continue; fi
|
||||
next="$cand"; break
|
||||
done < <(ls -1 "$INBOX"/*.md 2>/dev/null | sort)
|
||||
[[ -z "$next" ]] && break
|
||||
|
||||
local job; job=$(basename "$next"); job=${job%.md}
|
||||
local doing_file="$DOING/$(basename "$next")"
|
||||
mv "$next" "$doing_file"
|
||||
local w_eng w_cwd w_yolo
|
||||
local w_eng w_cwd w_yolo w_key
|
||||
w_eng=$(fm_get "$doing_file" engine "$DEFAULT_ENGINE")
|
||||
w_cwd=$(fm_get "$doing_file" cwd "$PWD")
|
||||
w_yolo=$(fm_get "$doing_file" yolo "true")
|
||||
w_key=$(lock_key_for "$doing_file")
|
||||
# write meta BEFORE launch (no pid yet), then append the worker pid from $!
|
||||
{
|
||||
echo "job=$job"
|
||||
echo "engine=$w_eng"
|
||||
echo "cwd=$w_cwd"
|
||||
echo "yolo=$w_yolo"
|
||||
echo "lock=$w_key"
|
||||
echo "started=$(date +%s)"
|
||||
} > "$STATE/$job.meta"
|
||||
run_worker "$doing_file" &
|
||||
echo "pid=$!" >> "$STATE/$job.meta"
|
||||
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng)"
|
||||
log "▶ launching $C_BOLD$job$C_RESET (engine=$w_eng, lock=$w_key)"
|
||||
sleep 1
|
||||
running=$(live_workers)
|
||||
done
|
||||
@ -360,6 +424,7 @@ ${C_BOLD}TASK FRONTMATTER${C_RESET} (top of each .md)
|
||||
engine: devin
|
||||
cwd: /Users/you/code/repo
|
||||
yolo: true
|
||||
lock: my-repo # optional; defaults to cwd. Jobs sharing a key run serially
|
||||
---
|
||||
|
||||
${C_BOLD}ENV${C_RESET}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user