144 lines
5.1 KiB
Python
Executable File
144 lines
5.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Write a sanitized Hermes ops snapshot for the unified dashboard.
|
|
|
|
Run this as the Hermes instance owner (root for Vijay, uma for Bheem). It
|
|
writes booleans, counts, timestamps, and short Git metadata only. It never
|
|
copies tokens, state.db, logs, prompts, session content, or environment files.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Root of the Hermes instance being inspected (override: HERMES_HOME).
HERMES_HOME = Path(
    os.environ.get("HERMES_HOME", str(Path.home() / ".hermes"))
)

# Destination of the JSON snapshot (override: HERMES_OPS_EXPORT_PATH).
OUTPUT_PATH = Path(
    os.environ.get("HERMES_OPS_EXPORT_PATH", str(HERMES_HOME / "ops-export.json"))
)

# systemd unit names that get probed; each can be overridden per instance.
GATEWAY_SERVICE = os.environ.get(
    "HERMES_GATEWAY_SERVICE", "hermes-gateway.service"
)
DASHBOARD_SERVICE = os.environ.get(
    "HERMES_DASHBOARD_SERVICE", "hermes-root-dashboard.service"
)
BACKUP_TIMER = os.environ.get(
    "HERMES_BACKUP_TIMER", "hermes-root-backup.timer"
)

# Git checkout whose metadata and restore manifest are summarised
# (override: HERMES_BACKUP_REPO).
BACKUP_REPO = Path(
    os.environ.get(
        "HERMES_BACKUP_REPO",
        str(Path.home() / "repos" / "bytelyst_hostinger_hermes_vm"),
    )
)
|
|
|
|
|
|
def run(cmd: list[str], cwd: Path | None = None, timeout: int = 10) -> tuple[bool, str]:
    """Run *cmd* and return ``(launched, stripped_stdout)``.

    Args:
        cmd: Argument vector; executed directly, never through a shell.
        cwd: Optional working directory for the child process.
        timeout: Seconds to wait before giving up on the child.

    Returns:
        ``(True, stdout.strip())`` when the process was started and finished
        within *timeout*. The exit status is deliberately not inspected
        (``check=False``), so a command that ran but failed still yields
        ``True``. ``(False, "")`` when the process could not be started or
        timed out.
    """
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            text=True,
            capture_output=True,
            timeout=timeout,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError (not just FileNotFoundError) so that a non-executable
        # binary or a cwd that is not a directory degrades to "could not
        # run" instead of aborting the whole export.
        return False, ""
    return True, result.stdout.strip()
|
|
|
|
|
|
def probe_active(unit: str) -> dict[str, Any]:
    """Report whether systemd *unit* is active in the user or system scope.

    The user manager is asked first, then the system manager. ``run()``
    returns success whenever ``systemctl`` could be launched, whatever its
    exit status, so the fallback is keyed on the answer rather than on
    launch success; otherwise a failed ``--user`` query would hide an
    active system unit.

    Args:
        unit: Unit name, e.g. ``"hermes-gateway.service"``.

    Returns:
        ``{"active": bool, "status": str}`` where status is ``"up"`` when
        either scope answers ``active``, ``"down"`` when systemctl ran but
        neither scope did, and ``"unknown"`` when systemctl could not be run.
    """
    ran_any = False
    for scope in (["--user"], []):
        ran, out = run(["systemctl", *scope, "is-active", unit])
        ran_any = ran_any or ran
        if ran and out == "active":
            return {"active": True, "status": "up"}
    return {"active": False, "status": "down" if ran_any else "unknown"}
|
|
|
|
|
|
def probe_enabled(unit: str) -> bool:
    """Return True when systemd *unit* is enabled in the user or system scope.

    The user manager is asked first, then the system manager. ``run()``
    returns success whenever ``systemctl`` could be launched, whatever its
    exit status, so the fallback is keyed on the answer rather than on
    launch success.

    Args:
        unit: Unit name, e.g. ``"hermes-gateway.service"``.

    Returns:
        True only when one of the scopes prints exactly ``enabled``; any
        other answer, or systemctl being unavailable, yields False.
    """
    for scope in (["--user"], []):
        ran, out = run(["systemctl", *scope, "is-enabled", unit])
        if ran and out == "enabled":
            return True
    return False
|
|
|
|
|
|
def probe_timer(name: str) -> dict[str, Any]:
    """Summarise a systemd timer: activity plus next and last trigger times.

    Activity comes from ``probe_active``. The schedule is read with
    ``systemctl show``, user scope first and system scope second. The first
    scope that returns at least one non-empty property wins. ``run()``
    returns success even when systemctl exits non-zero, so falling back only
    on launch failure would never reach the system manager.

    Args:
        name: Timer unit name, e.g. ``"hermes-root-backup.timer"``.

    Returns:
        A dict with ``name``, ``active``, ``status``, ``nextRun`` and
        ``lastRun``. The two timestamps are the raw strings printed by
        systemctl, or None when no scope reported a value.
    """
    active = probe_active(name)
    show_args = [
        "show",
        name,
        "-p",
        "NextElapseUSecRealtime",
        "-p",
        "LastTriggerUSec",
        "--no-pager",
    ]

    props: dict[str, str | None] = {}
    for scope in (["--user"], []):
        ran, out = run(["systemctl", *scope, *show_args])
        if not ran:
            continue
        parsed: dict[str, str | None] = {}
        for line in out.splitlines():
            key, _, value = line.partition("=")
            # An empty value means "not set"; normalise it to None.
            parsed[key] = value or None
        if any(value is not None for value in parsed.values()):
            props = parsed
            break

    return {
        "name": name,
        "active": active["active"],
        "status": active["status"],
        "nextRun": props.get("NextElapseUSecRealtime"),
        "lastRun": props.get("LastTriggerUSec"),
    }
|
|
|
|
|
|
def probe_repo(path: Path) -> dict[str, Any]:
    """Collect short Git metadata for the checkout at *path*.

    ``run()`` does not inspect exit codes, so a git command that fails
    (directory is not a repository, repository not readable by this user)
    simply yields empty output. A resolved HEAD is therefore the signal that
    the repository is usable. Without it the repo is never reported as
    clean or "up".

    Args:
        path: Working tree to inspect; used as the cwd of each git call.

    Returns:
        A dict with ``path``, ``branch``, ``clean``, ``head``,
        ``lastCommitAt``, ``size`` (always None) and ``status``. Status is
        ``"up"`` when HEAD resolved, ``"down"`` when git ran but HEAD did
        not resolve, and ``"unknown"`` when git could not be run.
    """
    ran_head, head = run(["git", "rev-parse", "--short", "HEAD"], cwd=path)
    ran_branch, branch = run(["git", "branch", "--show-current"], cwd=path)
    ran_status, status = run(["git", "status", "--porcelain"], cwd=path)
    ran_commit, last_commit = run(["git", "log", "-1", "--format=%cI"], cwd=path)

    has_head = ran_head and bool(head)
    if has_head:
        repo_status = "up"
    elif ran_head:
        repo_status = "down"
    else:
        repo_status = "unknown"

    return {
        "path": str(path),
        "branch": branch if ran_branch and branch else None,
        # Empty porcelain output only means "clean" when git actually
        # recognised the repository; a failed git call is empty as well.
        "clean": has_head and ran_status and status == "",
        "head": head if has_head else None,
        "lastCommitAt": last_commit if ran_commit and last_commit else None,
        "size": None,
        "status": repo_status,
    }
|
|
|
|
|
|
def restore_stats(path: Path) -> dict[str, int | None]:
    """Count restored files and cron jobs recorded under *path*.

    Reads ``hermes_persistent_backup/MANIFEST.json`` (length of its
    ``files`` list) and ``hermes_persistent_backup/cron/jobs.json`` (length
    of its ``jobs`` list, or of the document itself when it is not a dict).
    Only the counts leave this function. Any failure to read or interpret a
    file yields None for that count.

    Args:
        path: Root of the backup repository checkout.

    Returns:
        ``{"restoredFileCount": int | None, "restoredCronJobs": int | None}``.
    """

    def _count(relative_parts, pick):
        # Best-effort: a missing, unreadable or malformed file means "no count".
        try:
            source = path.joinpath("hermes_persistent_backup", *relative_parts)
            items = pick(json.loads(source.read_text(encoding="utf-8")))
        except Exception:
            return None
        if isinstance(items, list):
            return len(items)
        return None

    restored_files = _count(("MANIFEST.json",), lambda doc: doc.get("files"))
    restored_cron = _count(
        ("cron", "jobs.json"),
        lambda doc: doc.get("jobs") if isinstance(doc, dict) else doc,
    )
    return {"restoredFileCount": restored_files, "restoredCronJobs": restored_cron}
|
|
|
|
|
|
def write_atomic(path: Path, payload: dict[str, Any]) -> None:
    """Serialise *payload* as JSON to *path* via a temp file and a rename.

    The JSON is written to a temporary file in the destination directory,
    made world-readable (0644), and then moved over *path*, so readers never
    see a half-written file. If anything fails before the rename completes,
    the temporary file is removed and the previous *path* is left untouched.

    Args:
        path: Final location of the JSON document; parent directories are
            created when missing.
        payload: JSON-serialisable mapping; written with sorted keys,
            two-space indent and a trailing newline.

    Raises:
        TypeError, ValueError: *payload* is not JSON-serialisable.
        OSError: the directory or file could not be created or replaced.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", encoding="utf-8", dir=path.parent, delete=False
        ) as tmp:
            tmp_path = Path(tmp.name)
            json.dump(payload, tmp, indent=2, sort_keys=True)
            tmp.write("\n")
        # Set the final mode before the rename so the published file is
        # never visible with the temp file's restrictive permissions.
        tmp_path.chmod(0o644)
        tmp_path.replace(path)
    except BaseException:
        # delete=False means nobody else removes the temp file on failure.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass
        raise
|
|
|
|
|
|
def main() -> int:
    """Build the ops snapshot and write it to ``OUTPUT_PATH``.

    The payload holds a UTC generation timestamp, the gateway and dashboard
    service state, the backup timer state, Git metadata of the backup repo,
    a boolean for whether ``google_token.json`` exists under ``HERMES_HOME``
    (the file is never read), and the restore counts.

    Returns:
        0 after the snapshot has been written.
    """
    # Local import: the timestamp is produced in-process instead of
    # spawning the external `date` binary; the output format is identical.
    from datetime import datetime, timezone

    payload: dict[str, Any] = {
        "generatedAt": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "gateway": {
            **probe_active(GATEWAY_SERVICE),
            "enabled": probe_enabled(GATEWAY_SERVICE),
        },
        "dashboard": probe_active(DASHBOARD_SERVICE),
        "backupTimer": probe_timer(BACKUP_TIMER),
        "repo": probe_repo(BACKUP_REPO),
        # Existence check only; the token content is never opened.
        "googleWorkspaceToken": (HERMES_HOME / "google_token.json").is_file(),
    }
    payload.update(restore_stats(BACKUP_REPO))
    write_atomic(OUTPUT_PATH, payload)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|