#!/usr/bin/env python3
"""Silent-on-success Hermes health watchdog for ByteLyst.

Designed for a Hermes no-agent cron job. It prints nothing when healthy and
prints a concise Telegram-ready alert when an actionable problem is detected.
"""

from __future__ import annotations

import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Thresholds and targets; every value can be overridden through the environment.
DISK_WARN_PERCENT = int(os.getenv("HERMES_WATCHDOG_DISK_WARN_PERCENT", "85"))
MEMORY_WARN_PERCENT = int(os.getenv("HERMES_WATCHDOG_MEMORY_WARN_PERCENT", "90"))
BACKUP_STALE_MINUTES = int(os.getenv("HERMES_WATCHDOG_BACKUP_STALE_MINUTES", "90"))
BACKUP_JOB_NAME = os.getenv("HERMES_WATCHDOG_BACKUP_JOB_NAME", "Sync Hermes persistent-data backup to GitHub")
GATEWAY_SERVICE = os.getenv("HERMES_WATCHDOG_GATEWAY_SERVICE", "hermes-gateway.service")
SYSTEMD_SCOPE = os.getenv("HERMES_WATCHDOG_SYSTEMD_SCOPE", "system")
INSTANCE_ID = os.getenv("HERMES_WATCHDOG_INSTANCE", "vijay")
TELEGRAM_CONFIG = Path(os.getenv("HERMES_WATCHDOG_TELEGRAM_CONFIG", str(Path.home() / ".config/hermes/telegram")))
WATCHDOG_LOG = Path(os.getenv("HERMES_WATCHDOG_LOG_PATH", str(Path.home() / ".hermes/logs/hermes-health-watchdog.log")))
DASHBOARD_ALERT_LOG = Path(os.getenv("HERMES_DASHBOARD_ALERT_LOG", "/var/log/hermes-dashboard-warnings.log"))
DASHBOARD_ALERT_STATE = Path(os.getenv("HERMES_DASHBOARD_ALERT_STATE", str(Path.home() / ".hermes/logs/dashboard-alerts.offset")))
ALERT_STATE = Path(os.getenv("HERMES_WATCHDOG_ALERT_STATE", str(Path.home() / ".hermes/logs/watchdog-alert-active")))
DOCKER_CONTAINERS = [
    item.strip()
    for item in os.getenv("HERMES_WATCHDOG_DOCKER_CONTAINERS", "caddy,gitea-npm-registry").split(",")
    if item.strip()
]
HERMES_HOME = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes")))

# Telegram's sendMessage rejects text longer than 4096 characters. Stay a
# little below that so a truncated alert is still accepted even if some
# characters (e.g. emoji) are counted as more than one unit.
TELEGRAM_MAX_CHARS = 4000
_TRUNCATION_SUFFIX = "\n… (truncated)"


def run(cmd: list[str], timeout: int = 20) -> subprocess.CompletedProcess[str]:
    """Run *cmd*, capturing stdout/stderr as text.

    A non-zero exit status does not raise (``check=False``); callers inspect
    ``returncode``. A missing executable or an expired *timeout* (seconds)
    still raises from ``subprocess.run``.
    """
    return subprocess.run(cmd, text=True, capture_output=True, timeout=timeout, check=False)


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def append_watchdog_log(severity: str, message: str) -> None:
    """Append one ``<timestamp> <SEVERITY> <message>`` line to ``WATCHDOG_LOG``.

    Logging is best-effort: ``main`` logs before it prints or sends an alert,
    so a log directory that cannot be created or written must not abort the
    run. Failures are reported on stderr instead of being raised.
    """
    try:
        WATCHDOG_LOG.parent.mkdir(parents=True, exist_ok=True)
        with WATCHDOG_LOG.open("a", encoding="utf-8") as fh:
            fh.write(f"{utc_now()} {severity.upper()} {message}\n")
    except OSError as exc:
        print(f"hermes watchdog: could not write log {WATCHDOG_LOG}: {exc}", file=sys.stderr)


def read_key_file(path: Path) -> dict[str, str]:
    """Parse a ``KEY=VALUE`` file into a dict.

    Blank lines, ``#`` comment lines, and entries with an empty key or value
    are skipped. One pair of matching surrounding quotes is removed from a
    value so shell-style ``KEY="value"`` files work. A missing, unreadable,
    or non-UTF-8 file yields an empty dict.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return {}

    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        key = key.strip()
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1].strip()
        if sep and key and value:
            values[key] = value
    return values


def telegram_credentials() -> tuple[str | None, str | None]:
    """Return ``(bot_token, chat_id)`` from ``TELEGRAM_CONFIG``; either may be None."""
    values = read_key_file(TELEGRAM_CONFIG)
    token = values.get("BOT_TOKEN") or values.get("TELEGRAM_BOT_TOKEN")
    chat_id = values.get("CHAT_ID") or values.get("TELEGRAM_CHAT_ID")
    return token, chat_id


def send_telegram(message: str) -> bool:
    """Send *message* through the Telegram Bot API.

    Returns True only on a 2xx response. Returns False when credentials are
    missing or the request fails for any reason; delivery is best-effort and
    never raises. Text longer than ``TELEGRAM_MAX_CHARS`` is truncated with a
    marker so an oversized alert is still delivered rather than rejected.
    """
    token, chat_id = telegram_credentials()
    if not token or not chat_id:
        return False

    if len(message) > TELEGRAM_MAX_CHARS:
        message = message[: TELEGRAM_MAX_CHARS - len(_TRUNCATION_SUFFIX)] + _TRUNCATION_SUFFIX

    data = urlencode({"chat_id": chat_id, "text": message}).encode("utf-8")
    req = Request(f"https://api.telegram.org/bot{token}/sendMessage", data=data, method="POST")
    try:
        with urlopen(req, timeout=10) as response:  # noqa: S310 - token-protected Telegram API endpoint.
            return 200 <= response.status < 300
    except Exception:
        return False


def mark_alert_active() -> None:
    """Write the current UTC timestamp to ``ALERT_STATE`` to flag an active alert."""
    ALERT_STATE.parent.mkdir(parents=True, exist_ok=True)
    ALERT_STATE.write_text(utc_now(), encoding="utf-8")


def clear_alert_active() -> bool:
    """Remove the ``ALERT_STATE`` marker.

    Returns True when a marker was removed (an alert was previously active),
    False when there was none.
    """
    try:
        ALERT_STATE.unlink()
    except FileNotFoundError:
        return False
    return True


def read_dashboard_alerts() -> list[str]:
    """Return dashboard alert lines added since the last run that target this instance.

    The byte offset already consumed is persisted in ``DASHBOARD_ALERT_STATE``.
    The log is read as bytes and decoded with replacement characters, so one
    invalid byte cannot make every later run fail before the offset advances.
    A saved offset that is negative, unparsable, or beyond the current file
    size (e.g. after log rotation) restarts from the beginning. Any I/O
    failure yields an empty list.
    """
    try:
        previous = int(DASHBOARD_ALERT_STATE.read_text(encoding="utf-8").strip() or "0")
    except (OSError, ValueError):
        previous = 0

    try:
        size = DASHBOARD_ALERT_LOG.stat().st_size
        start = previous if 0 <= previous <= size else 0
        with DASHBOARD_ALERT_LOG.open("rb") as fh:
            fh.seek(start)
            chunk = fh.read()
            offset = fh.tell()
        DASHBOARD_ALERT_STATE.parent.mkdir(parents=True, exist_ok=True)
        DASHBOARD_ALERT_STATE.write_text(str(offset), encoding="utf-8")
    except OSError:
        return []

    # bytes.splitlines() breaks only on \n, \r and \r\n, the same boundaries
    # text-mode iteration uses.
    decoded = (raw.decode("utf-8", errors="replace").strip() for raw in chunk.splitlines())
    own_tag = f"instance={INSTANCE_ID}"
    return [line for line in decoded if line and (own_tag in line or "instance=all" in line)]


def check_gateway(alerts: list[str]) -> None:
    """Append an alert unless ``systemctl is-active`` prints ``active`` for the gateway."""
    cmd = ["systemctl", "is-active", GATEWAY_SERVICE]
    if SYSTEMD_SCOPE == "user":
        cmd.insert(1, "--user")
    result = run(cmd)
    state = result.stdout.strip()
    if state != "active":
        detail = state or result.stderr.strip() or "unknown"
        alerts.append(f"gateway service `{GATEWAY_SERVICE}` is not active: `{detail}`")


def check_backup_cron(alerts: list[str]) -> None:
    """Check that the backup cron job is listed by ``hermes cron list`` and looks healthy."""
    if not BACKUP_JOB_NAME:
        return
    result = run(["hermes", "cron", "list"], timeout=30)
    out = result.stdout + result.stderr
    if result.returncode != 0:
        alerts.append(f"`hermes cron list` failed with exit {result.returncode}")
        return
    if BACKUP_JOB_NAME not in out:
        alerts.append(f"backup cron job `{BACKUP_JOB_NAME}` was not found")
        return
    # NOTE(review): this looks for the markers anywhere in the listing, not
    # specifically in the backup job's entry - confirm against the real
    # `hermes cron list` output format.
    if "Last run:" not in out or " ok" not in out:
        alerts.append("backup cron last-run status is not visibly `ok` in `hermes cron list`")

    script_path = HERMES_HOME / "scripts" / "sync_hermes_persistent_backup.py"
    if script_path.exists():
        age_minutes = (datetime.now(timezone.utc).timestamp() - script_path.stat().st_mtime) / 60
        # Script mtime is not backup freshness; keep this as a weak sanity note only.
        if age_minutes < 0:
            alerts.append("backup sync script has a future modification time")


def check_backup_repo_freshness(alerts: list[str]) -> None:
    """Alert when the newest commit in the backup repo is older than ``BACKUP_STALE_MINUTES``."""
    repo = Path(os.getenv("HERMES_WATCHDOG_BACKUP_REPO", str(HERMES_HOME / "persistent_backup_repo")))
    candidates = [repo, Path.home() / "hermes_persistent_backup", Path.home() / "hermes_persistent_backup_repo"]
    existing = next((p for p in candidates if (p / ".git").exists()), None)
    if not existing:
        # The backup cron may use its own path; cron status is the primary check.
        return

    result = run(["git", "-C", str(existing), "log", "-1", "--format=%ct"], timeout=20)
    commit_epoch = result.stdout.strip()
    if result.returncode != 0 or not commit_epoch.isdigit():
        alerts.append(f"could not inspect backup repo freshness at `{existing}`")
        return

    age_minutes = (datetime.now(timezone.utc).timestamp() - int(commit_epoch)) / 60
    if age_minutes > BACKUP_STALE_MINUTES:
        alerts.append(f"backup repo `{existing}` latest commit is stale: {age_minutes:.0f} minutes old")


def check_disk(alerts: list[str]) -> None:
    """Alert when usage of the root filesystem reaches ``DISK_WARN_PERCENT``."""
    usage = shutil.disk_usage("/")
    pct = int(round((usage.used / usage.total) * 100))
    if pct >= DISK_WARN_PERCENT:
        alerts.append(f"root disk usage is high: {pct}% used (threshold {DISK_WARN_PERCENT}%)")


def check_memory(alerts: list[str]) -> None:
    """Alert when memory use derived from ``/proc/meminfo`` reaches ``MEMORY_WARN_PERCENT``.

    Usage is ``(MemTotal - MemAvailable) / MemTotal``. A missing
    ``MemAvailable`` field or a non-positive ``MemTotal`` is reported as
    unreadable; ``MemAvailable: 0`` is a real reading and counts as 100% used.
    """
    meminfo: dict[str, int] = {}
    for line in Path("/proc/meminfo").read_text(encoding="utf-8").splitlines():
        parts = line.split()
        if len(parts) >= 2:
            meminfo[parts[0].rstrip(":")] = int(parts[1])

    total = meminfo.get("MemTotal", 0)
    if total <= 0 or "MemAvailable" not in meminfo:
        alerts.append("could not read memory pressure from /proc/meminfo")
        return

    used_pct = int(round(((total - meminfo["MemAvailable"]) / total) * 100))
    if used_pct >= MEMORY_WARN_PERCENT:
        alerts.append(f"memory pressure is high: {used_pct}% used (threshold {MEMORY_WARN_PERCENT}%)")


def check_docker_containers(alerts: list[str]) -> None:
    """Alert when any container named in ``DOCKER_CONTAINERS`` is absent from ``docker ps``."""
    if not DOCKER_CONTAINERS:
        return
    docker = shutil.which("docker")
    if not docker:
        alerts.append("docker executable not found; cannot verify critical containers")
        return

    result = run([docker, "ps", "--format", "{{.Names}}"], timeout=20)
    if result.returncode != 0:
        alerts.append(f"`docker ps` failed while checking critical containers: {result.stderr.strip() or result.stdout.strip()}")
        return

    running = set(result.stdout.splitlines())
    missing = [name for name in DOCKER_CONTAINERS if name not in running]
    if missing:
        alerts.append(f"critical Docker container(s) not running: {', '.join(missing)}")


def main() -> int:
    """Run every check, then report.

    With alerts: print them, log them, attempt Telegram delivery, and set the
    active-alert marker. Without alerts: stay silent on stdout, and send a
    one-time recovery notice if the marker was set by an earlier run.
    Always returns 0.
    """
    alerts: list[str] = []
    for check in (
        check_gateway,
        check_backup_cron,
        check_backup_repo_freshness,
        check_disk,
        check_memory,
        check_docker_containers,
    ):
        try:
            check(alerts)
        except Exception as exc:  # noqa: BLE001 - watchdog should alert, not crash silently
            alerts.append(f"{check.__name__} errored: {exc}")

    alerts.extend(f"dashboard alert: {line}" for line in read_dashboard_alerts())

    if alerts:
        header = f"ByteLyst Hermes watchdog alert ({INSTANCE_ID})"
        append_watchdog_log("WARNING", header)
        print("🚨 " + header)
        for item in alerts:
            append_watchdog_log("WARNING", item)
            print(f"- {item}")
        footer = (
            "\nSuggested first checks: `systemctl status hermes-gateway --no-pager`, "
            "`hermes cron list`, `df -h /`, `free -h`, `docker ps`."
        )
        print(footer)
        sent = send_telegram("🚨 " + header + "\n" + "\n".join(f"- {item}" for item in alerts) + footer)
        append_watchdog_log("INFO" if sent else "WARNING", "Telegram delivery succeeded" if sent else "Telegram delivery skipped or failed")
        try:
            mark_alert_active()
        except OSError as exc:
            # The alert itself is already out; without the marker only the
            # later recovery notice is lost, so record that and carry on.
            append_watchdog_log("WARNING", f"could not record active-alert state: {exc}")
        return 0

    recovered = clear_alert_active()
    if recovered:
        message = f"✅ ByteLyst Hermes watchdog recovery ({INSTANCE_ID})\nBack to healthy."
        sent = send_telegram(message)
        append_watchdog_log("INFO", "recovery: back to healthy")
        append_watchdog_log("INFO" if sent else "WARNING", "Telegram recovery delivery succeeded" if sent else "Telegram recovery delivery skipped or failed")
    else:
        append_watchdog_log("INFO", "healthy")
    return 0


if __name__ == "__main__":
    sys.exit(main())