feat(agent-queue): flag stalled workers in status + dash

Mark a running worker '⚠ stalled' when its log has not changed for more than
AGENT_QUEUE_STALL_MIN minutes (default 10), using log mtime as the freshness
signal. Implemented in both the bash status table and the Node dashboard.
This commit is contained in:
saravanakumardb1 2026-05-28 22:15:26 -07:00
parent 3b71f0117a
commit 79331d591f
2 changed files with 28 additions and 4 deletions

View File

@ -35,6 +35,9 @@ LOCKS="$QUEUE_ROOT/locks"
MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-2}" MAX_CONCURRENCY="${AGENT_QUEUE_MAX:-2}"
DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}" DEFAULT_ENGINE="${AGENT_QUEUE_ENGINE:-devin}"
POLL_SECONDS="${AGENT_QUEUE_POLL:-3}" POLL_SECONDS="${AGENT_QUEUE_POLL:-3}"
# A running worker is flagged "stalled" if its log has not changed in this many
# minutes (no new agent output) — surfaced in status + dash.
STALL_MIN="${AGENT_QUEUE_STALL_MIN:-10}"
# flock is used for cross-process lock hardening when available (Linux). macOS # flock is used for cross-process lock hardening when available (Linux). macOS
# has no flock; mutual exclusion there relies on the single run-loop (see cmd_run). # has no flock; mutual exclusion there relies on the single run-loop (see cmd_run).
@ -103,6 +106,12 @@ lock_key_for() {
# _keyhash <key> -> stable filename-safe token for a lock key # _keyhash <key> -> stable filename-safe token for a lock key
_keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; } _keyhash() { printf '%s' "$1" | cksum | awk '{print $1}'; }
# _mtime <file> -> file modification time in epoch seconds (BSD or GNU stat); empty if missing
_mtime() {
[[ -e "$1" ]] || { echo ""; return; }
stat -f %m "$1" 2>/dev/null || stat -c %Y "$1" 2>/dev/null || echo ""
}
# _dur_to_secs <dur> -> seconds. Accepts 90, 90s, 45m, 2h, 1d. Invalid/empty -> 0. # _dur_to_secs <dur> -> seconds. Accepts 90, 90s, 45m, 2h, 1d. Invalid/empty -> 0.
_dur_to_secs() { _dur_to_secs() {
local d=$1 local d=$1
@ -397,14 +406,16 @@ cmd_status() {
local pid; pid=$(grep '^pid=' "$f" | cut -d= -f2) local pid; pid=$(grep '^pid=' "$f" | cut -d= -f2)
[[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null || continue [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null || continue
if ! $printed; then printf ' %sRUNNING%s\n' "$C_BOLD" "$C_RESET"; printed=true; fi if ! $printed; then printf ' %sRUNNING%s\n' "$C_BOLD" "$C_RESET"; printed=true; fi
local job eng start now el last local job eng start now el last lmt age stall=""
job=$(grep '^job=' "$f" | cut -d= -f2) job=$(grep '^job=' "$f" | cut -d= -f2)
eng=$(grep '^engine=' "$f" | cut -d= -f2) eng=$(grep '^engine=' "$f" | cut -d= -f2)
start=$(grep '^started=' "$f" | cut -d= -f2) start=$(grep '^started=' "$f" | cut -d= -f2)
now=$(date +%s); el=$(( now - ${start:-$now} )) now=$(date +%s); el=$(( now - ${start:-$now} ))
last=$(tail -n 1 "$LOGS/$job.log" 2>/dev/null | cut -c1-60) last=$(tail -n 1 "$LOGS/$job.log" 2>/dev/null | cut -c1-60)
printf ' %s%-26s%s %-7s %3dm%02ds pid %-6s %s%s%s\n' \ lmt=$(_mtime "$LOGS/$job.log"); age=$(( now - ${lmt:-$now} ))
"$C_BOLD" "$job" "$C_RESET" "$eng" $((el/60)) $((el%60)) "$pid" "$C_DIM" "$last" "$C_RESET" [[ "$age" -gt $(( STALL_MIN * 60 )) ]] && stall="${C_RED}⚠ stalled${C_RESET} "
printf ' %s%-26s%s %-7s %3dm%02ds pid %-6s %s%s%s%s\n' \
"$C_BOLD" "$job" "$C_RESET" "$eng" $((el/60)) $((el%60)) "$pid" "$stall" "$C_DIM" "$last" "$C_RESET"
done done
$printed || printf ' %sno workers running%s\n' "$C_DIM" "$C_RESET" $printed || printf ' %sno workers running%s\n' "$C_DIM" "$C_RESET"
echo echo

View File

@ -23,6 +23,8 @@ const getArg = (flag, def) => {
}; };
const ROOT = path.resolve(getArg('--root', process.env.AGENT_QUEUE_ROOT || path.join(__dirname, 'queue'))); const ROOT = path.resolve(getArg('--root', process.env.AGENT_QUEUE_ROOT || path.join(__dirname, 'queue')));
const INTERVAL = Math.max(1, parseInt(getArg('--interval', '2'), 10)) * 1000; const INTERVAL = Math.max(1, parseInt(getArg('--interval', '2'), 10)) * 1000;
// A running worker is flagged stalled if its log has not changed in this many minutes.
const STALL_MIN = Math.max(1, parseInt(process.env.AGENT_QUEUE_STALL_MIN || '10', 10));
const DIRS = { const DIRS = {
inbox: path.join(ROOT, 'inbox'), inbox: path.join(ROOT, 'inbox'),
@ -72,6 +74,14 @@ const lastLogLine = (job) => {
} catch { return ''; } } catch { return ''; }
}; };
// seconds since a job's log was last modified (no new agent output); null if no log
const logAgeSec = (job) => {
try {
const mt = fs.statSync(path.join(DIRS.logs, `${job}.log`)).mtimeMs;
return Math.max(0, Math.floor((Date.now() - mt) / 1000));
} catch { return null; }
};
const fmtElapsed = (startSec) => { const fmtElapsed = (startSec) => {
if (!startSec) return ' -- '; if (!startSec) return ' -- ';
const s = Math.max(0, Math.floor(Date.now() / 1000) - Number(startSec)); const s = Math.max(0, Math.floor(Date.now() / 1000) - Number(startSec));
@ -128,11 +138,14 @@ function render() {
for (const m of running) { for (const m of running) {
const eng = m.engine || '?'; const eng = m.engine || '?';
const engC = ENGINE_COLOR[eng] || 'gray'; const engC = ENGINE_COLOR[eng] || 'gray';
const age = logAgeSec(m.job);
const stalled = age !== null && age > STALL_MIN * 60;
const line = const line =
` ${c('bold', trunc(m.job || '?', 30).padEnd(30))} ` + ` ${c('bold', trunc(m.job || '?', 30).padEnd(30))} ` +
`${c(engC, eng.padEnd(7))} ` + `${c(engC, eng.padEnd(7))} ` +
`${fmtElapsed(m.started).padStart(7)} ` + `${fmtElapsed(m.started).padStart(7)} ` +
`${c('gray', 'pid ' + (m.pid || '?'))}`; `${c('gray', 'pid ' + (m.pid || '?'))}` +
`${stalled ? ' ' + c('red', '⚠ stalled') : ''}`;
out.push(line); out.push(line);
out.push(` ${c('dim', trunc(shortPath(m.cwd || ''), 70))}`); out.push(` ${c('dim', trunc(shortPath(m.cwd || ''), 70))}`);
const last = lastLogLine(m.job); const last = lastLogLine(m.job);