bytelyst-devops-tools/scripts/hermes-emergency-bundle-upload-drive.py

178 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""Create encrypted Hermes emergency bundles and upload them to Google Drive.
This script uploads only .gpg encrypted bundles. It never uploads plaintext
staging directories or decrypted files.
"""
from __future__ import annotations
import argparse
import os
from pathlib import Path
import subprocess
import sys
import time
from google.auth.transport.requests import Request
from google.oauth2 import credentials as user_credentials
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
ROOT = Path("/root/repos/learning_ai_devops_tools")
CREATE_SCRIPT = ROOT / "scripts/hermes-emergency-bundle-create.sh"
KEY_FILE = Path("/root/.config/hermes-google-drive/service-account.json")
USER_TOKEN_FILE = Path("/root/.config/hermes-google-drive/user-token.json")
DEFAULT_OUT = Path("/root/hermes-emergency-bundles")
FOLDERS = {
"vijay": "1KIlSJzpf5fuaH5LYvfbLsUbOSYY23YGm",
"bheem": "1Ac5cbDC0dSWas8LeeWe_9XFqCquz7kZT",
}
def run(cmd: list[str], env: dict[str, str] | None = None) -> str:
proc = subprocess.run(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
check=False,
)
if proc.returncode != 0:
raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{proc.stdout}")
return proc.stdout
def drive_service(auth_mode: str):
scopes = ["https://www.googleapis.com/auth/drive.file"]
if auth_mode == "user":
if not USER_TOKEN_FILE.exists():
raise RuntimeError(
f"missing OAuth user token: {USER_TOKEN_FILE}. "
"Run hermes-google-drive-oauth-login.py first."
)
creds = user_credentials.Credentials.from_authorized_user_file(str(USER_TOKEN_FILE), scopes=scopes)
if creds.expired and creds.refresh_token:
creds.refresh(Request())
USER_TOKEN_FILE.write_text(creds.to_json())
USER_TOKEN_FILE.chmod(0o600)
elif auth_mode == "service-account":
creds = service_account.Credentials.from_service_account_file(str(KEY_FILE), scopes=scopes)
else:
raise ValueError(f"unknown auth mode: {auth_mode}")
return build("drive", "v3", credentials=creds, cache_discovery=False)
def create_bundle(out_dir: Path) -> Path:
before = set(out_dir.glob("*.tar.zst.gpg")) if out_dir.exists() else set()
output = run([str(CREATE_SCRIPT), str(out_dir)])
after = set(out_dir.glob("*.tar.zst.gpg"))
created = sorted(after - before, key=lambda p: p.stat().st_mtime, reverse=True)
if created:
return created[0]
for line in output.splitlines():
marker = "Encrypted emergency bundle created:"
if marker in line:
return Path(line.split(marker, 1)[1].strip())
raise RuntimeError("bundle script did not report a created bundle")
def upload_file(service, bundle: Path, label: str, folder_id: str) -> str:
metadata = {
"name": bundle.name,
"parents": [folder_id],
"description": f"Hermes encrypted emergency bundle for {label}; uploaded {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}",
}
media = MediaFileUpload(str(bundle), mimetype="application/octet-stream", resumable=True)
result = (
service.files()
.create(
body=metadata,
media_body=media,
fields="id,name,webViewLink",
supportsAllDrives=True,
)
.execute()
)
return result["id"]
def cleanup_remote(service, folder_id: str, keep: int, dry_run: bool) -> list[str]:
query = (
f"'{folder_id}' in parents and trashed = false "
"and name contains 'hermes-emergency-bundle-' "
"and name contains '.tar.zst.gpg'"
)
files = (
service.files()
.list(
q=query,
fields="files(id,name,createdTime)",
orderBy="createdTime desc",
pageSize=1000,
supportsAllDrives=True,
includeItemsFromAllDrives=True,
)
.execute()
.get("files", [])
)
deleted = []
for item in files[keep:]:
deleted.append(item["name"])
if not dry_run:
service.files().delete(fileId=item["id"], supportsAllDrives=True).execute()
return deleted
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--target", choices=["vijay", "bheem", "both"], default="both")
parser.add_argument(
"--auth-mode",
choices=["user", "service-account"],
default=os.environ.get("HERMES_DRIVE_AUTH_MODE", "user"),
help="Use user OAuth for personal Drive; service-account only works with Shared Drives or delegated Workspace setups.",
)
parser.add_argument("--out-dir", default=str(DEFAULT_OUT))
parser.add_argument("--keep", type=int, default=12, help="encrypted bundles to retain per Drive folder")
parser.add_argument("--dry-run", action="store_true", help="create bundle but do not upload/delete remote files")
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.auth_mode == "service-account" and not KEY_FILE.exists():
raise SystemExit(f"missing service account key: {KEY_FILE}")
if not CREATE_SCRIPT.exists():
raise SystemExit(f"missing create script: {CREATE_SCRIPT}")
out_dir = Path(args.out_dir)
bundle = create_bundle(out_dir)
service = drive_service(args.auth_mode)
targets = ["vijay", "bheem"] if args.target == "both" else [args.target]
for target in targets:
folder_id = FOLDERS[target]
if args.dry_run:
print(f"DRY RUN: would upload {bundle.name} to {target} folder {folder_id}")
deleted = cleanup_remote(service, folder_id, args.keep, dry_run=True)
else:
file_id = upload_file(service, bundle, target, folder_id)
print(f"Uploaded encrypted bundle to {target}: file_id={file_id}")
deleted = cleanup_remote(service, folder_id, args.keep, dry_run=False)
if deleted:
print(f"Retention cleanup for {target}: {len(deleted)} old encrypted bundle(s)")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"Drive upload FAILED: {exc}", file=sys.stderr)
raise SystemExit(1)