317 lines
13 KiB
Python
317 lines
13 KiB
Python
|
||
|
||
import os
|
||
import time
|
||
import subprocess
|
||
import json
|
||
import re
|
||
import random
|
||
import shutil
|
||
import sys
|
||
from pathlib import Path
|
||
from watchdog.observers import Observer
|
||
from watchdog.events import FileSystemEventHandler
|
||
from logger import get_system_logger, log_exception
|
||
|
||
# ==========================================
# Module configuration
# ==========================================
SESSION_DIR = r'./session' # workspace directory watched for completed sessions
CHECK_INTERVAL = 5 # polling interval in seconds for the main keep-alive loop
BILIUP_PATH = shutil.which("biliup") or "./biliup" # biliup CLI: prefer one on PATH, else local binary
CONFIG_FILE = "upload_config.json" # path to the upload configuration file
DONE_FLAG = "split_done.flag" # marker file written by monitorSongs.py when splitting finishes
UPLOAD_FLAG = "upload_done.flag" # marker file written by this script when the upload completes

# Module-level logger for the upload subsystem
logger = get_system_logger('upload')
# ==========================================
|
||
|
||
class UploadConfig:
    """Loads and queries the upload configuration.

    The JSON config provides upload settings (tid/copyright/source/cover),
    title/description templates, per-streamer tag overrides, random quotes,
    and regex patterns for parsing recording filenames.
    """

    def __init__(self, config_path):
        # Parsed config dict is cached on self.config for the process lifetime.
        self.config_path = Path(config_path)
        self.config = self.load_config()

    def load_config(self):
        """Read the JSON config file; fall back to built-in defaults when missing or unreadable."""
        try:
            if not self.config_path.exists():
                logger.error(f"配置文件不存在: {self.config_path}")
                return self.get_default_config()

            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            logger.info(f"成功加载配置文件: {self.config_path}")
            return config
        except Exception as e:
            # Broad catch is deliberate: a malformed config must never crash the watcher.
            log_exception(logger, e, "加载配置文件失败")
            return self.get_default_config()

    def get_default_config(self):
        """Return a minimal built-in configuration used when the file cannot be loaded."""
        logger.warning("使用默认配置")
        return {
            "upload_settings": {
                "tid": 31,
                "copyright": 2,
                "source": "直播回放",
                "cover": ""
            },
            "template": {
                "title": "{streamer}_{date}",
                "description": "自动录制剪辑\n\n{songs_list}",
                "tag": "翻唱,直播切片,唱歌,音乐",
                "dynamic": ""
            },
            "streamers": {},
            "quotes": [],
            "filename_patterns": {"patterns": []}
        }

    def parse_filename(self, filename):
        """Match *filename* against the configured regex patterns.

        Returns the named groups of the first matching pattern (with 'date'
        reformatted through the pattern's ``date_format``), or a fallback dict
        that treats the whole filename as the streamer name.
        """
        patterns = self.config.get("filename_patterns", {}).get("patterns", [])
        for pattern_config in patterns:
            regex = pattern_config.get("regex")
            if not regex:
                continue
            match = re.match(regex, filename)
            if match:
                data = match.groupdict()
                date_format = pattern_config.get("date_format", "{date}")
                try:
                    formatted_date = date_format.format(**data)
                    data['date'] = formatted_date
                except KeyError:
                    # date_format referenced a group the regex did not capture; keep the raw value.
                    pass
                logger.debug(f"文件名匹配成功: {pattern_config.get('name')} -> {data}")
                return data
        # BUGFIX: log the actual filename; the original logged the literal "(unknown)"
        # in an f-string with no placeholder, making the warning useless.
        logger.warning(f"文件名未匹配任何模式: {filename}")
        return {"streamer": filename, "date": ""}

    def get_random_quote(self):
        """Pick a random quote dict from config, or empty fields when none are configured."""
        quotes = self.config.get("quotes", [])
        if not quotes:
            return {"text": "", "author": ""}
        return random.choice(quotes)
|
||
|
||
class UploadHandler(FileSystemEventHandler):
    """Watchdog handler that uploads split video batches when a done-flag appears.

    Reacts to creation/move of DONE_FLAG files under the watched session tree,
    then drives the biliup CLI: first batch via ``upload`` (captures the BV id),
    remaining batches via ``append`` in strict order, with exponential-backoff
    retries at every step.
    """
    def __init__(self, config):
        # Set of video stems currently being handled, to dedupe concurrent events.
        self.processing_sets = set()
        # UploadConfig instance providing templates, tags, and upload settings.
        self.config = config

    def on_created(self, event):
        """Trigger an upload when a DONE_FLAG file is created."""
        src_path = event.src_path
        # watchdog may deliver bytes paths on some platforms.
        if isinstance(src_path, bytes): src_path = src_path.decode('utf-8')
        if not event.is_directory and src_path.lower().endswith(DONE_FLAG):
            logger.debug(f"检测到切割完成标记: {src_path}")
            self.handle_upload(Path(src_path))

    def on_moved(self, event):
        """Trigger an upload when a DONE_FLAG file is moved into place (atomic-rename writers)."""
        dest_path = event.dest_path
        if isinstance(dest_path, bytes): dest_path = dest_path.decode('utf-8')
        if not event.is_directory and dest_path.lower().endswith(DONE_FLAG):
            logger.debug(f"检测到切割完成标记移动: {dest_path}")
            self.handle_upload(Path(dest_path))

    def _wait_exponential(self, retry_count, base_wait=300, max_wait=3600):
        """Return the exponential-backoff wait in seconds for the given retry count.

        With the defaults this yields 300, 600, 1200, 2400, ... capped at 3600s.
        (The original comment claimed 60/120/240 capped at 600 — that did not
        match the defaults and has been corrected.)
        """
        wait_time = min(base_wait * (2 ** retry_count), max_wait)
        return wait_time

    def handle_upload(self, flag_path):
        """Upload all video slices in ``<flag_dir>/split_video`` as one multi-part post.

        Idempotent per directory: skips when UPLOAD_FLAG exists or the stem is
        already being processed. On full success writes UPLOAD_FLAG and deletes
        the slices plus the original recording to reclaim disk space.
        """
        work_dir = flag_path.parent
        video_stem = work_dir.name
        upload_done = work_dir / UPLOAD_FLAG
        split_dir = work_dir / "split_video"

        # Dedupe: already uploaded, or another event for the same stem is in flight.
        if upload_done.exists() or video_stem in self.processing_sets:
            logger.debug(f"上传已完成或正在处理,跳过: {video_stem}")
            return

        logger.info("="*50)
        logger.info(f"准备上传: {video_stem}")
        logger.info("="*50)
        self.processing_sets.add(video_stem)

        try:
            # Derive streamer name / date from the directory name via configured patterns.
            parsed = self.config.parse_filename(video_stem)
            streamer = parsed.get('streamer', video_stem)
            date = parsed.get('date', '')

            # Optional song metadata produced by the splitting stage.
            songs_json = work_dir / "songs.json"
            songs_txt = work_dir / "songs.txt"
            songs_list = ""
            song_count = 0

            if songs_json.exists():
                try:
                    with open(songs_json, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    song_count = len(data.get('songs', []))
                except Exception: pass  # best-effort: a bad songs.json just means count stays 0

            if songs_txt.exists():
                songs_list = songs_txt.read_text(encoding='utf-8').strip()

            # Build the template substitution context (title/description/dynamic/tags).
            quote = self.config.get_random_quote()
            template_vars = {
                'streamer': streamer, 'date': date, 'song_count': song_count,
                'songs_list': songs_list, 'daily_quote': quote.get('text', ''),
                'quote_author': quote.get('author', '')
            }

            template = self.config.config.get('template', {})
            title = template.get('title', '{streamer}_{date}').format(**template_vars)
            description = template.get('description', '{songs_list}').format(**template_vars)
            dynamic = template.get('dynamic', '').format(**template_vars)

            # Per-streamer tag override, falling back to the shared template tag.
            streamers_config = self.config.config.get('streamers', {})
            if streamer in streamers_config:
                tags = streamers_config[streamer].get('tags', template.get('tag', ''))
            else:
                tags = template.get('tag', '翻唱,唱歌,音乐').format(**template_vars)

            # Sorted so slices are uploaded (and appended) in filename order.
            video_files = sorted([str(v) for v in split_dir.glob("*") if v.suffix.lower() in {'.mp4', '.mkv', '.mov', '.flv', '.ts'}])

            if not video_files:
                logger.error(f"切片目录 {split_dir} 内没找到视频")
                return

            upload_settings = self.config.config.get('upload_settings', {})
            tid = upload_settings.get('tid', 31)

            # 1. Refresh the biliup login session (best-effort; output ignored).
            subprocess.run([BILIUP_PATH, "renew"], shell=False, capture_output=True)

            # 2. Split the slices into batches of 5 per submission.
            BATCH_SIZE = 5
            logger.info(f"启动分批投稿 (总计 {len(video_files)} 个分片)...")

            first_batch = video_files[:BATCH_SIZE]
            remaining_batches = [video_files[i:i + BATCH_SIZE] for i in range(BATCH_SIZE, len(video_files), BATCH_SIZE)]

            upload_cmd = [
                BILIUP_PATH, "upload",
                *first_batch,
                "--title", title,
                "--tid", str(tid),
                "--tag", tags,
                "--copyright", str(upload_settings.get('copyright', 2)),
                "--source", upload_settings.get('source', '直播回放'),
                "--desc", description
            ]

            if dynamic: upload_cmd.extend(["--dynamic", dynamic])
            cover = upload_settings.get('cover', '')
            if cover and Path(cover).exists(): upload_cmd.extend(["--cover", cover])

            bvid = None
            MAX_ATTEMPTS = 5  # max tries per batch (first upload and each append)

            # ==========================
            # Phase 1: first batch upload (up to MAX_ATTEMPTS tries).
            # Must succeed AND yield a BV id, since appends are keyed on it.
            # ==========================
            logger.info(f"正在上传第一批 ({len(first_batch)}个文件)...")

            for attempt in range(1, MAX_ATTEMPTS + 1):
                logger.info(f"首批上传尝试 [{attempt}/{MAX_ATTEMPTS}]...")

                result = subprocess.run(upload_cmd, shell=False, capture_output=True, text=True, encoding='utf-8')

                if result.returncode == 0:
                    # Prefer the structured JSON field; fall back to any bare BV token.
                    bv_match = re.search(r'"bvid":"(BV[A-Za-z0-9]+)"', result.stdout)
                    if not bv_match: bv_match = re.search(r'(BV[A-Za-z0-9]+)', result.stdout)

                    if bv_match:
                        bvid = bv_match.group(1)
                        logger.info(f"第一批上传成功,BV 号: {bvid}")
                        # Persist the BV id so the session can be correlated later.
                        (work_dir / "bvid.txt").write_text(bvid, encoding='utf-8')
                        break  # success — leave the retry loop
                    else:
                        logger.warning(f"上传命令返回成功但未找到BVID (尝试 {attempt}/{MAX_ATTEMPTS})")

                # Reached only when this attempt did not break (failure or missing BVID).
                if attempt < MAX_ATTEMPTS:
                    err_msg = result.stderr.strip()[-100:] if result.stderr else "无标准错误输出"
                    wait_time = self._wait_exponential(attempt - 1)
                    logger.error(f"第一批上传失败或未获取BVID,等待 {wait_time}秒后重试。错误片段: {err_msg}")
                    time.sleep(wait_time)
                else:
                    logger.error("第一批上传已达到最大重试次数 (5次),中止本次任务。")
                    return  # abort the whole task; nothing to append onto

            # ==========================
            # Phase 2: append remaining batches (up to MAX_ATTEMPTS tries each).
            # ==========================
            if bvid:
                for idx, batch in enumerate(remaining_batches, 2):
                    # Cool-down between submissions to avoid hammering the API.
                    logger.info(f"等待 45 秒冷却时间,准备上传第 {idx} 批...")
                    time.sleep(45)

                    batch_success = False
                    for attempt in range(1, MAX_ATTEMPTS + 1):
                        logger.info(f"正在追加第 {idx} 批 ({len(batch)}个) - 尝试 [{attempt}/{MAX_ATTEMPTS}]...")

                        append_cmd = [BILIUP_PATH, "append", "--vid", bvid, *batch]
                        res = subprocess.run(append_cmd, shell=False, capture_output=True, text=True, encoding='utf-8')

                        if res.returncode == 0:
                            logger.info(f"第 {idx} 批追加成功")
                            batch_success = True
                            break  # this batch done — move on to the next

                        # Failed attempt: back off before retrying (unless out of tries).
                        if attempt < MAX_ATTEMPTS:
                            err_msg = res.stderr.strip()[-100:] if res.stderr else "无标准错误输出"
                            wait_time = self._wait_exponential(attempt - 1)
                            logger.error(f"第 {idx} 批追加失败,等待 {wait_time}秒后重试。错误片段: {err_msg}")
                            time.sleep(wait_time)

                    if not batch_success:
                        logger.error(f"第 {idx} 批追加已达到最大重试次数 (5次)。为防止顺序错乱,中止后续上传。")
                        return  # abort: appending later batches would break slice order

                # Reached only when every batch succeeded (no early return above).
                logger.info(f"所有分片上传完成: {bvid}")
                upload_done.touch()

                # Reclaim disk space: remove slices and the original recording.
                try:
                    if split_dir.exists(): shutil.rmtree(split_dir)
                    for ext in ['.mp4', '.mkv', '.mov', '.flv', '.ts']:
                        orig = work_dir / f"{video_stem}{ext}"
                        if orig.exists(): orig.unlink()
                except Exception as e:
                    logger.error(f"清理空间失败: {e}")
            else:
                # Defensive: phase 1 either breaks with a bvid or returns, so this
                # branch should be unreachable unless the regex extraction regresses.
                logger.error("逻辑错误:流程继续但无BVID,上传中止")

        except Exception as e:
            log_exception(logger, e, "上传异常")
        finally:
            # Always release the in-flight marker so future events can retry.
            self.processing_sets.discard(video_stem)
|
||
|
||
def main():
    """Entry point: catch up on pending sessions, then watch for new done-flags.

    Scans SESSION_DIR once for directories whose splitting finished while this
    script was down (DONE_FLAG present, UPLOAD_FLAG absent) and uploads them,
    then starts a recursive watchdog observer and idles until Ctrl-C.
    """
    path = Path(SESSION_DIR)
    path.mkdir(parents=True, exist_ok=True)
    logger.info("上传模块启动 (MaxRetry=5)")

    config = UploadConfig(CONFIG_FILE)
    handler = UploadHandler(config)

    # Catch-up pass for sessions completed while the watcher was offline.
    for sub_dir in path.iterdir():
        if sub_dir.is_dir():
            if (sub_dir / DONE_FLAG).exists() and not (sub_dir / UPLOAD_FLAG).exists():
                handler.handle_upload(sub_dir / DONE_FLAG)

    observer = Observer()
    observer.schedule(handler, str(path), recursive=True)
    observer.start()
    try:
        # CONSISTENCY FIX: the poll frequency is configured as CHECK_INTERVAL,
        # but the loop previously hard-coded 5 seconds. Use the constant.
        while True: time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
|
||
|
||
# Run the upload watcher only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|