#!/usr/bin/env python3 """Upload a single local file to the configured Google Drive folder.""" from __future__ import annotations import argparse from pathlib import Path import shutil import subprocess import sys import tempfile from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload TOKEN_FILE = Path("/root/.config/hermes-google-drive/user-token.json") PASSPHRASE_FILE = Path("/root/.config/hermes-google-drive/bundle-passphrase") SCOPES = ["https://www.googleapis.com/auth/drive.file"] FOLDERS = { "vijay": "1KIlSJzpf5fuaH5LYvfbLsUbOSYY23YGm", "bheem": "1Ac5cbDC0dSWas8LeeWe_9XFqCquz7kZT", } BLOCKED_NAMES = { ".env", "auth.json", ".git-credentials", "service-account.json", "oauth-client.json", "user-token.json", "bundle-passphrase", } BLOCKED_SUFFIXES = { ".pem", ".key", ".p12", ".pfx", ".db", ".db-wal", ".db-shm", } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("path", help="Local file path to upload") parser.add_argument("--target", choices=sorted(FOLDERS), required=True) parser.add_argument("--name", help="Optional Drive filename") parser.add_argument("--encrypt", action="store_true", help="GPG-encrypt the file before upload") parser.add_argument("--allow-sensitive", action="store_true", help="Allow blocked sensitive-looking filenames") return parser.parse_args() def is_blocked(path: Path) -> bool: name = path.name if name in BLOCKED_NAMES: return True if any(name.endswith(suffix) for suffix in BLOCKED_SUFFIXES): return True if ".ssh" in path.parts or ".gnupg" in path.parts: return True return False def drive_service(): if not TOKEN_FILE.exists(): raise RuntimeError(f"missing Google Drive OAuth token: {TOKEN_FILE}") creds = Credentials.from_authorized_user_file(str(TOKEN_FILE), SCOPES) if creds.expired and creds.refresh_token: creds.refresh(Request()) TOKEN_FILE.write_text(creds.to_json()) TOKEN_FILE.chmod(0o600) return build("drive", "v3", credentials=creds, cache_discovery=False) def encrypt_file(src: Path, tmpdir: Path) -> Path: if not PASSPHRASE_FILE.exists(): raise RuntimeError(f"missing passphrase file: {PASSPHRASE_FILE}") out = tmpdir / f"{src.name}.gpg" cmd = [ "gpg", "--batch", "--yes", "--pinentry-mode", "loopback", "--passphrase-file", str(PASSPHRASE_FILE), "--symmetric", "--cipher-algo", "AES256", "--output", str(out), str(src), ] proc = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if proc.returncode != 0: raise RuntimeError(f"gpg encryption failed: {proc.stdout}") return out def upload(path: Path, target: str, drive_name: str | None) -> dict: service = drive_service() metadata = {"name": drive_name or path.name, "parents": [FOLDERS[target]]} media = MediaFileUpload(str(path), mimetype="application/octet-stream", resumable=True) return ( service.files() .create(body=metadata, media_body=media, fields="id,name,size,webViewLink", supportsAllDrives=True) .execute() ) def main() -> int: args = parse_args() src = Path(args.path).expanduser().resolve() if not src.exists(): raise SystemExit(f"file not found: {src}") if not src.is_file(): raise SystemExit(f"refusing non-file path: {src}") if is_blocked(src) and not args.allow_sensitive: raise SystemExit(f"refusing sensitive-looking file without --allow-sensitive: {src.name}") upload_path = src cleanup_dir = None try: if args.encrypt: cleanup_dir = Path(tempfile.mkdtemp(prefix="drive-upload-")) upload_path = encrypt_file(src, cleanup_dir) result = upload(upload_path, args.target, args.name) size = result.get("size") or str(upload_path.stat().st_size) print(f"uploaded target={args.target} name={result.get('name')} size={size} file_id={result.get('id')}") return 0 finally: if cleanup_dir and cleanup_dir.exists(): shutil.rmtree(cleanup_dir) if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(f"upload failed: {exc}", file=sys.stderr) raise SystemExit(1)