#!/usr/bin/env python3 """Create encrypted Hermes emergency bundles and upload them to Google Drive. This script uploads only .gpg encrypted bundles. It never uploads plaintext staging directories or decrypted files. """ from __future__ import annotations import argparse import os from pathlib import Path import subprocess import sys import time from google.auth.transport.requests import Request from google.oauth2 import credentials as user_credentials from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload ROOT = Path("/root/repos/learning_ai_devops_tools") CREATE_SCRIPT = ROOT / "scripts/hermes-emergency-bundle-create.sh" KEY_FILE = Path("/root/.config/hermes-google-drive/service-account.json") USER_TOKEN_FILE = Path("/root/.config/hermes-google-drive/user-token.json") DEFAULT_OUT = Path("/root/hermes-emergency-bundles") FOLDERS = { "vijay": "1KIlSJzpf5fuaH5LYvfbLsUbOSYY23YGm", "bheem": "1Ac5cbDC0dSWas8LeeWe_9XFqCquz7kZT", } def run(cmd: list[str], env: dict[str, str] | None = None) -> str: proc = subprocess.run( cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, check=False, ) if proc.returncode != 0: raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{proc.stdout}") return proc.stdout def drive_service(auth_mode: str): scopes = ["https://www.googleapis.com/auth/drive.file"] if auth_mode == "user": if not USER_TOKEN_FILE.exists(): raise RuntimeError( f"missing OAuth user token: {USER_TOKEN_FILE}. " "Run hermes-google-drive-oauth-login.py first." ) creds = user_credentials.Credentials.from_authorized_user_file(str(USER_TOKEN_FILE), scopes=scopes) if creds.expired and creds.refresh_token: creds.refresh(Request()) USER_TOKEN_FILE.write_text(creds.to_json()) USER_TOKEN_FILE.chmod(0o600) elif auth_mode == "service-account": creds = service_account.Credentials.from_service_account_file(str(KEY_FILE), scopes=scopes) else: raise ValueError(f"unknown auth mode: {auth_mode}") return build("drive", "v3", credentials=creds, cache_discovery=False) def create_bundle(out_dir: Path) -> Path: before = set(out_dir.glob("*.tar.zst.gpg")) if out_dir.exists() else set() output = run([str(CREATE_SCRIPT), str(out_dir)]) after = set(out_dir.glob("*.tar.zst.gpg")) created = sorted(after - before, key=lambda p: p.stat().st_mtime, reverse=True) if created: return created[0] for line in output.splitlines(): marker = "Encrypted emergency bundle created:" if marker in line: return Path(line.split(marker, 1)[1].strip()) raise RuntimeError("bundle script did not report a created bundle") def upload_file(service, bundle: Path, label: str, folder_id: str) -> str: metadata = { "name": bundle.name, "parents": [folder_id], "description": f"Hermes encrypted emergency bundle for {label}; uploaded {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}", } media = MediaFileUpload(str(bundle), mimetype="application/octet-stream", resumable=True) result = ( service.files() .create( body=metadata, media_body=media, fields="id,name,webViewLink", supportsAllDrives=True, ) .execute() ) return result["id"] def cleanup_remote(service, folder_id: str, keep: int, dry_run: bool) -> list[str]: query = ( f"'{folder_id}' in parents and trashed = false " "and name contains 'hermes-emergency-bundle-' " "and name contains '.tar.zst.gpg'" ) files = ( service.files() .list( q=query, fields="files(id,name,createdTime)", orderBy="createdTime desc", pageSize=1000, supportsAllDrives=True, includeItemsFromAllDrives=True, ) .execute() .get("files", []) ) deleted = [] for item in files[keep:]: deleted.append(item["name"]) if not dry_run: service.files().delete(fileId=item["id"], supportsAllDrives=True).execute() return deleted def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--target", choices=["vijay", "bheem", "both"], default="both") parser.add_argument( "--auth-mode", choices=["user", "service-account"], default=os.environ.get("HERMES_DRIVE_AUTH_MODE", "user"), help="Use user OAuth for personal Drive; service-account only works with Shared Drives or delegated Workspace setups.", ) parser.add_argument("--out-dir", default=str(DEFAULT_OUT)) parser.add_argument("--keep", type=int, default=12, help="encrypted bundles to retain per Drive folder") parser.add_argument("--dry-run", action="store_true", help="create bundle but do not upload/delete remote files") return parser.parse_args() def main() -> int: args = parse_args() if args.auth_mode == "service-account" and not KEY_FILE.exists(): raise SystemExit(f"missing service account key: {KEY_FILE}") if not CREATE_SCRIPT.exists(): raise SystemExit(f"missing create script: {CREATE_SCRIPT}") out_dir = Path(args.out_dir) bundle = create_bundle(out_dir) service = drive_service(args.auth_mode) targets = ["vijay", "bheem"] if args.target == "both" else [args.target] for target in targets: folder_id = FOLDERS[target] if args.dry_run: print(f"DRY RUN: would upload {bundle.name} to {target} folder {folder_id}") deleted = cleanup_remote(service, folder_id, args.keep, dry_run=True) else: file_id = upload_file(service, bundle, target, folder_id) print(f"Uploaded encrypted bundle to {target}: file_id={file_id}") deleted = cleanup_remote(service, folder_id, args.keep, dry_run=False) if deleted: print(f"Retention cleanup for {target}: {len(deleted)} old encrypted bundle(s)") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(f"Drive upload FAILED: {exc}", file=sys.stderr) raise SystemExit(1)