146 lines
4.5 KiB
Python
Executable File
146 lines
4.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Upload a single local file to the configured Google Drive folder."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from pathlib import Path
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2.credentials import Credentials
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.http import MediaFileUpload
|
|
|
|
|
|
# Stored OAuth user token and the passphrase file handed to gpg for --encrypt.
TOKEN_FILE = Path("/root/.config/hermes-google-drive/user-token.json")
PASSPHRASE_FILE = Path("/root/.config/hermes-google-drive/bundle-passphrase")

# OAuth scopes passed when loading the stored credentials.
SCOPES = ["https://www.googleapis.com/auth/drive.file"]

# Accepted --target values, each mapped to the Drive folder ID used as parent.
FOLDERS = {
    "vijay": "1KIlSJzpf5fuaH5LYvfbLsUbOSYY23YGm",
    "bheem": "1Ac5cbDC0dSWas8LeeWe_9XFqCquz7kZT",
}

# Exact filenames that are refused unless --allow-sensitive is given.
BLOCKED_NAMES = {
    ".env", "auth.json", ".git-credentials",
    "service-account.json", "oauth-client.json",
    "user-token.json", "bundle-passphrase",
}

# Filename endings that are refused unless --allow-sensitive is given.
BLOCKED_SUFFIXES = {
    ".pem", ".key", ".p12", ".pfx",
    ".db", ".db-wal", ".db-shm",
}
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line for the uploader.

    Returns:
        Namespace with ``path``, ``target`` (required, one of the ``FOLDERS``
        keys), ``name`` (optional), and the boolean flags ``encrypt`` and
        ``allow_sensitive``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("path", help="Local file path to upload")
    cli.add_argument("--target", required=True, choices=sorted(FOLDERS))
    cli.add_argument("--name", help="Optional Drive filename")

    # Both remaining options are plain on/off switches.
    switches = (
        ("--encrypt", "GPG-encrypt the file before upload"),
        ("--allow-sensitive", "Allow blocked sensitive-looking filenames"),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()
|
|
|
|
|
|
def is_blocked(path: Path) -> bool:
    """Return True when *path* looks like a credential or other sensitive file.

    A path is blocked when its final component equals an entry of
    ``BLOCKED_NAMES``, ends with an entry of ``BLOCKED_SUFFIXES``, or when any
    path component is ``.ssh`` or ``.gnupg``.

    All comparisons ignore case, so variants such as ``.ENV`` or
    ``Server.PEM`` are caught as well as the lowercase spellings.

    Args:
        path: Path of the file being considered for upload.

    Returns:
        True if the path matches any blocking rule, otherwise False.
    """
    name = path.name.casefold()
    if name in {blocked.casefold() for blocked in BLOCKED_NAMES}:
        return True
    # str.endswith accepts a tuple, so every suffix is checked in one call.
    if name.endswith(tuple(suffix.casefold() for suffix in BLOCKED_SUFFIXES)):
        return True
    components = {part.casefold() for part in path.parts}
    return ".ssh" in components or ".gnupg" in components
|
|
|
|
|
|
def drive_service():
    """Build a Drive v3 API client from the stored user token.

    Credentials are loaded from ``TOKEN_FILE`` for ``SCOPES``. When they are
    expired and carry a refresh token, they are refreshed and the updated
    token is written back to ``TOKEN_FILE`` with mode ``0o600``.

    Returns:
        The object returned by ``build("drive", "v3", ...)``.

    Raises:
        RuntimeError: If ``TOKEN_FILE`` does not exist.
    """
    if not TOKEN_FILE.exists():
        raise RuntimeError(f"missing Google Drive OAuth token: {TOKEN_FILE}")

    credentials = Credentials.from_authorized_user_file(str(TOKEN_FILE), SCOPES)

    refreshable = credentials.expired and credentials.refresh_token
    if refreshable:
        credentials.refresh(Request())
        # Persist the refreshed token and keep the file owner-only.
        TOKEN_FILE.write_text(credentials.to_json())
        TOKEN_FILE.chmod(0o600)

    return build("drive", "v3", credentials=credentials, cache_discovery=False)
|
|
|
|
|
|
def encrypt_file(src: Path, tmpdir: Path) -> Path:
    """Encrypt *src* symmetrically with gpg and return the ciphertext path.

    The output is written to ``tmpdir / "<src name>.gpg"`` using AES256, with
    the passphrase read by gpg from ``PASSPHRASE_FILE``.

    Args:
        src: File to encrypt.
        tmpdir: Directory that receives the encrypted copy.

    Returns:
        Path of the encrypted file inside *tmpdir*.

    Raises:
        RuntimeError: If ``PASSPHRASE_FILE`` is missing, or gpg exits with a
            non-zero status (the message carries gpg's combined output).
    """
    if not PASSPHRASE_FILE.exists():
        raise RuntimeError(f"missing passphrase file: {PASSPHRASE_FILE}")

    encrypted = tmpdir / f"{src.name}.gpg"
    gpg_options = [
        "--batch",
        "--yes",
        "--pinentry-mode", "loopback",
        "--passphrase-file", str(PASSPHRASE_FILE),
        "--symmetric",
        "--cipher-algo", "AES256",
        "--output", str(encrypted),
    ]

    # stderr is folded into stdout so a failure message shows everything gpg said.
    completed = subprocess.run(
        ["gpg", *gpg_options, str(src)],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode != 0:
        raise RuntimeError(f"gpg encryption failed: {completed.stdout}")
    return encrypted
|
|
|
|
|
|
def upload(path: Path, target: str, drive_name: str | None) -> dict:
    """Upload *path* into the Drive folder configured for *target*.

    Args:
        path: Local file to send.
        target: Key into ``FOLDERS`` selecting the parent folder.
        drive_name: Filename to use on Drive; falls back to ``path.name``
            when empty or None.

    Returns:
        The result of executing the ``files().create`` request, which asks
        for the fields ``id``, ``name``, ``size`` and ``webViewLink``.
    """
    service = drive_service()

    file_metadata = {
        "name": drive_name or path.name,
        "parents": [FOLDERS[target]],
    }
    payload = MediaFileUpload(
        str(path),
        mimetype="application/octet-stream",
        resumable=True,
    )

    request = service.files().create(
        body=file_metadata,
        media_body=payload,
        fields="id,name,size,webViewLink",
        supportsAllDrives=True,
    )
    return request.execute()
|
|
|
|
|
|
def main() -> int:
    """Run the upload described by the command line.

    Validates the source path, optionally encrypts it into a temporary
    directory, uploads the result and prints a one-line summary.

    Returns:
        0 after a successful upload.

    Raises:
        SystemExit: If the path does not exist, is not a regular file, or
            looks sensitive and ``--allow-sensitive`` was not given.
    """
    args = parse_args()
    src = Path(args.path).expanduser().resolve()

    if not src.exists():
        raise SystemExit(f"file not found: {src}")
    if not src.is_file():
        raise SystemExit(f"refusing non-file path: {src}")
    if not args.allow_sensitive and is_blocked(src):
        raise SystemExit(f"refusing sensitive-looking file without --allow-sensitive: {src.name}")

    scratch_dir = None
    try:
        if args.encrypt:
            # The encrypted copy lives in a throwaway directory removed below.
            scratch_dir = Path(tempfile.mkdtemp(prefix="drive-upload-"))
            payload = encrypt_file(src, scratch_dir)
        else:
            payload = src

        result = upload(payload, args.target, args.name)
        # Fall back to the local size when the response carries none.
        size = result.get("size") or str(payload.stat().st_size)
        print(f"uploaded target={args.target} name={result.get('name')} size={size} file_id={result.get('id')}")
        return 0
    finally:
        if scratch_dir and scratch_dir.exists():
            shutil.rmtree(scratch_dir)
|
|
|
|
|
|
if __name__ == "__main__":
    # Any ordinary exception becomes a one-line stderr message and exit code 1;
    # SystemExit raised inside main() is not an Exception and passes through.
    try:
        exit_code = main()
    except Exception as exc:
        print(f"upload failed: {exc}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)
|