bytelyst-devops-tools/scripts/google-drive-upload-file.py
2026-05-27 12:19:45 +00:00

146 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""Upload a single local file to the configured Google Drive folder."""
from __future__ import annotations
import argparse
from pathlib import Path
import shutil
import subprocess
import sys
import tempfile
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
# OAuth user token loaded by drive_service(); rewritten there after a refresh.
TOKEN_FILE: Path = Path("/root/.config/hermes-google-drive/user-token.json")

# Passphrase file handed to gpg (--passphrase-file) by encrypt_file().
PASSPHRASE_FILE: Path = Path("/root/.config/hermes-google-drive/bundle-passphrase")

# OAuth scopes passed along when the stored credentials are loaded.
SCOPES: list[str] = ["https://www.googleapis.com/auth/drive.file"]

# Accepted --target values, each mapped to the Drive folder ID that upload()
# sets as the parent of the new file.
FOLDERS: dict[str, str] = {
    "vijay": "1KIlSJzpf5fuaH5LYvfbLsUbOSYY23YGm",
    "bheem": "1Ac5cbDC0dSWas8LeeWe_9XFqCquz7kZT",
}

# Exact file names that is_blocked() treats as sensitive.
BLOCKED_NAMES: set[str] = {
    ".env",
    "auth.json",
    ".git-credentials",
    "service-account.json",
    "oauth-client.json",
    "user-token.json",
    "bundle-passphrase",
}

# File-name endings that is_blocked() treats as sensitive
# (key/certificate containers and database files with their sidecars).
BLOCKED_SUFFIXES: set[str] = {
    ".pem",
    ".key",
    ".p12",
    ".pfx",
    ".db",
    ".db-wal",
    ".db-shm",
}
def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) for the uploader.

    Returns:
        Namespace with ``path`` (positional), ``target`` (required, one of the
        FOLDERS keys), ``name`` (optional), and the boolean switches
        ``encrypt`` and ``allow_sensitive``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("path", help="Local file path to upload")
    # Choices are sorted so the usage/help text lists the targets in a stable order.
    cli.add_argument("--target", required=True, choices=sorted(FOLDERS))
    cli.add_argument("--name", help="Optional Drive filename")

    # Both switches are plain on/off flags that default to False.
    switches = (
        ("--encrypt", "GPG-encrypt the file before upload"),
        ("--allow-sensitive", "Allow blocked sensitive-looking filenames"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()
def is_blocked(path: Path) -> bool:
    """Return True when *path* looks like a credential, key or database file.

    A path is blocked when its file name equals an entry of BLOCKED_NAMES,
    ends with an entry of BLOCKED_SUFFIXES, or when any path component is a
    ``.ssh`` or ``.gnupg`` directory.

    All comparisons ignore letter case, so variants such as ``SERVER.PEM`` or
    ``.ENV`` cannot slip past the guard.

    Args:
        path: Path of the file that is about to be uploaded.

    Returns:
        True if the path matches any of the sensitive patterns, else False.
    """
    name = path.name.casefold()
    if name in {blocked.casefold() for blocked in BLOCKED_NAMES}:
        return True
    # str.endswith accepts a tuple, so one call covers every blocked suffix.
    if name.endswith(tuple(suffix.casefold() for suffix in BLOCKED_SUFFIXES)):
        return True
    return any(part.casefold() in (".ssh", ".gnupg") for part in path.parts)
def drive_service():
    """Build a Drive v3 API client from the stored OAuth user token.

    If the loaded credentials report themselves as expired and carry a refresh
    token, they are refreshed and the updated token is written back to
    TOKEN_FILE with mode 0600.

    Returns:
        The client object returned by ``googleapiclient.discovery.build``.

    Raises:
        RuntimeError: If TOKEN_FILE does not exist.
    """
    token_path = TOKEN_FILE
    if not token_path.exists():
        raise RuntimeError(f"missing Google Drive OAuth token: {TOKEN_FILE}")

    credentials = Credentials.from_authorized_user_file(str(token_path), SCOPES)

    needs_refresh = credentials.expired and credentials.refresh_token
    if needs_refresh:
        credentials.refresh(Request())
        # Persist the refreshed token and keep the file private to its owner.
        token_path.write_text(credentials.to_json())
        token_path.chmod(0o600)

    return build("drive", "v3", credentials=credentials, cache_discovery=False)
def encrypt_file(src: Path, tmpdir: Path) -> Path:
    """Symmetrically encrypt *src* with gpg (AES256) into *tmpdir*.

    The passphrase is read by gpg itself from PASSPHRASE_FILE; gpg runs in
    batch mode with loopback pinentry.

    Args:
        src: File to encrypt.
        tmpdir: Directory that receives the encrypted copy.

    Returns:
        Path of the encrypted file, ``<tmpdir>/<src.name>.gpg``.

    Raises:
        RuntimeError: If PASSPHRASE_FILE is missing or gpg exits non-zero
            (the message carries gpg's combined stdout/stderr).
    """
    if not PASSPHRASE_FILE.exists():
        raise RuntimeError(f"missing passphrase file: {PASSPHRASE_FILE}")

    encrypted = tmpdir / f"{src.name}.gpg"
    gpg_options = [
        "--batch",
        "--yes",
        "--pinentry-mode", "loopback",
        "--passphrase-file", str(PASSPHRASE_FILE),
        "--symmetric",
        "--cipher-algo", "AES256",
        "--output", str(encrypted),
    ]
    # stderr is folded into stdout so a failure message carries everything gpg printed.
    completed = subprocess.run(
        ["gpg", *gpg_options, str(src)],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return encrypted
    raise RuntimeError(f"gpg encryption failed: {completed.stdout}")
def upload(path: Path, target: str, drive_name: str | None) -> dict:
    """Upload *path* into the Drive folder registered for *target*.

    Args:
        path: Local file whose bytes are uploaded.
        target: Key into FOLDERS selecting the parent folder.
        drive_name: Name to give the file on Drive; when falsy, the local
            file name is used.

    Returns:
        The API response for the created file, restricted to the
        ``id``, ``name``, ``size`` and ``webViewLink`` fields.
    """
    client = drive_service()
    remote_name = drive_name or path.name
    body = {"name": remote_name, "parents": [FOLDERS[target]]}
    payload = MediaFileUpload(
        str(path),
        mimetype="application/octet-stream",
        resumable=True,
    )
    request = client.files().create(
        body=body,
        media_body=payload,
        fields="id,name,size,webViewLink",
        supportsAllDrives=True,
    )
    return request.execute()
def main() -> int:
    """Validate the requested file, optionally encrypt it, and upload it.

    Returns:
        0 after a successful upload.

    Raises:
        SystemExit: With an explanatory message when the path does not exist,
            is not a regular file, or looks sensitive and ``--allow-sensitive``
            was not given.
    """
    args = parse_args()
    src = Path(args.path).expanduser().resolve()

    if not src.exists():
        raise SystemExit(f"file not found: {src}")
    if not src.is_file():
        raise SystemExit(f"refusing non-file path: {src}")
    if not args.allow_sensitive and is_blocked(src):
        raise SystemExit(f"refusing sensitive-looking file without --allow-sensitive: {src.name}")

    # A scratch directory is only needed when an encrypted copy has to be made.
    workdir = Path(tempfile.mkdtemp(prefix="drive-upload-")) if args.encrypt else None
    try:
        upload_path = src if workdir is None else encrypt_file(src, workdir)
        result = upload(upload_path, args.target, args.name)
        # Fall back to the local size when the response carries no size.
        size = result.get("size") or str(upload_path.stat().st_size)
        print(f"uploaded target={args.target} name={result.get('name')} size={size} file_id={result.get('id')}")
        return 0
    finally:
        # Remove the encrypted copy whether or not the upload succeeded.
        if workdir is not None and workdir.exists():
            shutil.rmtree(workdir)
if __name__ == "__main__":
    # Script entry point: exit with main()'s status, or with 1 and a one-line
    # message on stderr when main() raises an ordinary exception.
    # SystemExit raised inside main() is not an Exception subclass and
    # therefore propagates unchanged.
    try:
        status = main()
    except Exception as exc:
        print(f"upload failed: {exc}", file=sys.stderr)
        raise SystemExit(1)
    else:
        raise SystemExit(status)