Google Drive 上传文件
Google Drive 上传文件
第一步:启用 Google Drive API 并获取凭证
-
创建一个项目(或选择已有项目) 必须得搞一个项目,才能启用 Google Drive API。 有个地方让你选 , 选 ”Google Drive API“
3.导航到 API & Services > Library,启用 Google Drive API 在 搜索框 搜索 “Google Drive API”, 进去,点击 Enable(启用)
4.然后进入 Credentials > Create Credentials > OAuth client ID • 应用类型选:Desktop App • 下载 JSON 文件,保存为 credentials.json
一定要选择 Desktop App(桌面应用)
第二步:安装依赖包
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
第三步:运行代码
需要注意:
- 文件目录的获取
你的 链接长这样 https://drive.google.com/drive/u/0/folders/1A2b3C4d5E6fG7h8I9j0k1l2m3n4o5p
那么 1A2b3C4d5E6fG7h8I9j0k1l2m3n4o5p 这个就是你的文件目录
- 每次上传得重新授权, 代码
authenticate部分
service = authenticate() # 不可复用,全局定义之后会导致ssl错误
- 首次运行代码, 会弹出授权页面, 允许授权, 授权之后会在本地生成一个
token.json文件, 下次运行代码, 就不需要授权了
以下是多线程上传代码.
from concurrent.futures import ThreadPoolExecutor, as_completed
import glob
import os
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from pathlib import Path
from tqdm import tqdm
# 设置权限范围
SCOPES = ["https://www.googleapis.com/auth/drive"]
folder_id = "1A2b3C4d5E6fG7h8I9j0k1l2m3n4o5p" # 替换为目标文件夹ID
failed_map = {}
def authenticate():
"""进行身份验证并返回服务对象"""
creds = None
# 检查是否存在token.json文件
if os.path.exists("token.json"):
creds = Credentials.from_authorized_user_file("token.json", SCOPES)
# 如果没有有效的凭据,让用户登录
if not creds or not creds.valid:
flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
creds = flow.run_local_server(port=0)
# 保存凭据供下次使用
with open("token.json", "w") as token:
token.write(creds.to_json())
return build("drive", "v3", credentials=creds)
def upload_file_to_folder(file_path):
"""上传文件到指定文件夹"""
service = authenticate() # 不可复用!
# file_name = os.path.basename(file_path)
file_name = Path(file_path).absolute().__str__()
print(f"准备上传: {file_name}")
# 创建文件元数据
try:
file_metadata = {"name": file_name, "parents": [folder_id]}
# 创建媒体文件上传对象
media = MediaFileUpload(file_path, mimetype="application/gzip", resumable=True)
# 上传文件
file = (
service.files()
.create(body=file_metadata, media_body=media, fields="id")
.execute()
)
if media and media._fd:
media._fd.close()
if file is None:
error_time = failed_map.get(file_path)
if not error_time:
failed_map[file_path] = 1
elif error_time >= 3:
print("上传失败,请检查文件")
return
else:
failed_map[file_path] += 1
print(f"上传失败: {file_path},重新上传")
upload_file_to_folder(file_path)
else:
print(f"文件已上传,ID: {file}")
except Exception as e:
print(f"Error: {e}")
def main():
files = glob.glob(f"./logs/**/*.gz", recursive=True)
with ThreadPoolExecutor(max_workers=4) as executor:
# list(tqdm(executor.map(save_from_file, files), total=len(files)))
futures = {executor.submit(upload_file_to_folder, f): f for f in files}
with tqdm(total=len(files)) as bar:
for future in as_completed(futures):
try:
future.result() # 获取结果(会抛出异常)
except Exception as e:
f = futures[future]
print(f"\nError processing {f}: {e}")
finally:
bar.update(1)
if __name__ == "__main__":
main()
可后台运行
nohup python upload-file-to-google.py > nohup.out 2>&1 &
tail -f nohup.out