235 lines
8.9 KiB
Python
Executable File
235 lines
8.9 KiB
Python
Executable File
import json
|
||
import time
|
||
import requests
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
from pathlib import Path
|
||
from logger import get_system_logger, log_exception
|
||
|
||
# ================= Configuration =================
# Directory whose sub-folders are scanned, and the JSON file holding the login cookies.
SESSION_DIR = Path("./session")
COOKIE_FILE = Path("./cookies.json")

# Fill in the collection (season) IDs exactly as shown on the Bilibili web page.
# The script looks up the matching section ID for each of these two IDs on its own.
SEASON_ID_A = 7196643  # Collection A (videos matched by title)
SEASON_ID_B = 7196624  # Collection B (uploaded clips)

# Locate the biliup executable on PATH; fall back to the bare command name.
BILIUP_PATH = shutil.which("biliup") or "biliup"
# =================================================
|
||
|
||
# Module-wide logger, obtained from the project's logger helper under the name "collection_manager".
logger = get_system_logger("collection_manager")
|
||
|
||
class BiliCollectionClient:
    """Cookie-authenticated HTTP client for the Bilibili collection endpoints used here.

    It wraps three calls: looking up a video by BVID, resolving a collection
    (season) ID to the ID of its first section, and adding a video to a section.
    Every request method logs failures and returns ``None`` / ``False`` instead
    of raising.
    """

    def __init__(self):
        """Load cookies from ``COOKIE_FILE`` and prepare a ``requests`` session.

        Raises:
            FileNotFoundError: if ``COOKIE_FILE`` does not exist.
            ValueError: if the loaded cookies contain no ``bili_jct`` entry.
        """
        if not COOKIE_FILE.exists():
            raise FileNotFoundError(f"Cookies 文件不存在: {COOKIE_FILE}")

        with open(COOKIE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Two cookie layouts are accepted: a nested
        # {"cookie_info": {"cookies": [{"name": ..., "value": ...}, ...]}}
        # structure, or a flat {name: value} mapping.
        if "cookie_info" in data:
            cookie_entries = (data.get("cookie_info") or {}).get("cookies") or []
            self.cookies = {c["name"]: c["value"] for c in cookie_entries}
        else:
            self.cookies = data

        # bili_jct is sent as the csrf parameter on the write request.
        self.csrf = self.cookies.get("bili_jct")
        if not self.csrf:
            raise ValueError("Cookie 中缺少 bili_jct (CSRF Token)")

        self.session = requests.Session()
        self.session.cookies.update(self.cookies)

        # Headers that were confirmed to work during manual testing.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Referer": "https://member.bilibili.com/platform/upload-manager/distribution"
        })

    def get_video_info(self, bvid):
        """Look up a video's AID, CID and title by its BVID.

        Args:
            bvid: the BV identifier of the video.

        Returns:
            A dict with keys ``aid``, ``cid`` and ``title``, or ``None`` when the
            API reports a non-zero code or the request/parsing fails.
        """
        url = "https://api.bilibili.com/x/web-interface/view"
        try:
            res = self.session.get(url, params={"bvid": bvid}, timeout=10).json()
            # .get keeps an API-level error from being reported as a KeyError.
            if res.get("code") != 0:
                logger.error(f"查询视频信息失败 [{bvid}]: {res.get('message')}")
                return None

            data = res["data"]
            return {
                "aid": data["aid"],
                "cid": data["cid"],
                "title": data["title"],
            }
        except Exception as e:
            logger.error(f"获取视频信息异常: {e}")
            return None

    def resolve_section_id(self, target_season_id, page_size=50, max_pages=20):
        """Resolve a season (collection) ID to the ID of its first section.

        The season list is fetched page by page until the target is found, a
        page comes back shorter than ``page_size``, or ``max_pages`` pages have
        been read.

        Args:
            target_season_id: season ID to look for (compared with ``==``).
            page_size: number of seasons requested per page.
            max_pages: upper bound on the number of pages requested, so a
                misbehaving endpoint cannot cause an endless loop.

        Returns:
            The ``id`` of the first section of the matching season, or ``None``
            if the season is not found, has no sections, or a request fails.
        """
        url = "https://member.bilibili.com/x2/creative/web/seasons"

        try:
            for page in range(1, max_pages + 1):
                params = {"pn": page, "ps": page_size}
                res = self.session.get(url, params=params, timeout=10).json()
                if res.get("code") != 0:
                    logger.error(f"获取合集列表失败: {res.get('message')}")
                    return None

                # `or` fallbacks guard against explicit JSON nulls, which a
                # plain .get(key, default) would pass through as None.
                seasons = (res.get("data") or {}).get("seasons") or []

                for s in seasons:
                    season = s.get("season") or {}
                    if season.get("id") != target_season_id:
                        continue

                    # Target collection found.
                    title = season.get("title", "未知标题")
                    sections = (s.get("sections") or {}).get("sections") or []
                    if not sections:
                        logger.error(f"❌ 合集[{title}]({target_season_id}) 存在,但没有创建任何小节!")
                        return None

                    # The first section is used by default.
                    first_section_id = sections[0]["id"]
                    logger.info(f"✅ ID解析成功: 合集[{title}]({target_season_id}) -> 小节ID: {first_section_id}")
                    return first_section_id

                # A short (or empty) page is treated as the last one.
                if len(seasons) < page_size:
                    break

            logger.error(f"❌ 未找到 Season ID 为 {target_season_id} 的合集,请检查 ID 是否正确。")
            return None

        except Exception as e:
            logger.error(f"解析 Section ID 异常: {e}")
            return None

    def add_video_to_section(self, section_id, video_info):
        """Add one video to a collection section.

        Args:
            section_id: ID of the target section.
            video_info: dict with ``aid``, ``cid`` and ``title`` keys, as
                returned by ``get_video_info``.

        Returns:
            ``True`` when the API answers with code 0, ``False`` on any API
            error or request/parsing failure.
        """
        url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/add"

        # The csrf token must be passed as a query parameter.
        params = {"csrf": self.csrf}

        payload = {
            "sectionId": section_id,
            "episodes": [{
                "aid": video_info["aid"],
                "cid": video_info["cid"],
                "title": video_info["title"],
                "charging_pay": 0,
            }],
        }

        try:
            res = self.session.post(url, params=params, json=payload, timeout=15).json()
            if res.get("code") == 0:
                logger.info(f"🎉 成功添加: {video_info['title']}")
                return True

            logger.error(f"添加失败: {res.get('message')} (Code: {res.get('code')})")
            return False
        except Exception as e:
            logger.error(f"添加请求异常: {e}")
            return False
|
||
|
||
class CollectionWorker:
    """Adds the videos belonging to a session folder to two collection sections.

    Task A matches the folder name against recently uploaded video titles and
    adds the match to section A. Task B reads a BVID from ``bvid.txt`` inside
    the folder and adds it to section B. Each task drops a ``*.flag`` file in
    the folder on success so it is not repeated.
    """

    def __init__(self, client, section_id_a, section_id_b):
        """Store the API client and the two target section IDs.

        Args:
            client: object providing ``get_video_info`` and
                ``add_video_to_section``.
            section_id_a: section for task A; a falsy value disables task A.
            section_id_b: section for task B; a falsy value disables task B.
        """
        self.client = client
        self.section_id_a = section_id_a
        self.section_id_b = section_id_b
        # Matches ANSI escape sequences so they can be stripped from CLI output.
        self.ansi_escape = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")

    def fetch_recent_videos(self):
        """List recent uploads by running ``biliup list --max-pages 2``.

        Only output lines whose first whitespace-separated token starts with
        ``BV`` are used; the remaining tokens form the title, with one trailing
        status marker (开放浏览 / 直播回放 / 审核中 / ``-``) removed.

        Returns:
            A list of ``{"bvid": ..., "title": ...}`` dicts; empty when the
            command cannot be run.
        """
        cmd = [str(BILIUP_PATH), "list", "--max-pages", "2"]
        try:
            # errors="replace" keeps one undecodable byte from discarding the
            # whole listing.
            res = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                encoding='utf-8',
                errors='replace',
            )
        except Exception as e:
            logger.warning(f"biliup list 执行失败,跳过同名视频匹配。原因: {e}")
            return []

        if res.returncode != 0:
            # Still parse whatever was printed, but make the failure visible.
            logger.warning(f"biliup list 退出码非零 ({res.returncode}): {(res.stderr or '').strip()}")

        videos = []
        for line in self.ansi_escape.sub("", res.stdout or "").splitlines():
            parts = line.split()
            if len(parts) >= 2 and parts[0].startswith("BV"):
                raw_title = " ".join(parts[1:])
                title = re.sub(r"(开放浏览|直播回放|审核中|-)$", "", raw_title).strip()
                videos.append({"bvid": parts[0], "title": title})
        return videos

    def normalize(self, text):
        """Reduce a title to a comparable key.

        Drops every character that is not a CJK ideograph in U+4E00–U+9FA5, an
        ASCII letter or a digit, replaces 点 with 时, and lower-cases the result.
        """
        return re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9]', '', text).replace('点', '时').lower()

    def find_bvid_by_title(self, target_title, video_list):
        """Return the BVID of the video whose title matches ``target_title``.

        Titles are compared after ``normalize``. An exact match wins; otherwise
        the first video whose normalized title contains, or is contained in,
        the normalized target is used.

        Args:
            target_title: title to search for (the folder name in task A).
            video_list: iterable of dicts with ``bvid`` and ``title`` keys.

        Returns:
            The matching BVID, or ``None`` when nothing matches or the target
            normalizes to an empty string.
        """
        target_norm = self.normalize(target_title)
        # An empty string is a substring of everything, so it cannot be used
        # as a meaningful match key on either side.
        if not target_norm:
            return None

        candidates = []
        for v in video_list:
            v_norm = self.normalize(v['title'])
            if v_norm:
                candidates.append((v_norm, v['bvid']))

        # Pass 1: exact match, so a partial match earlier in the list cannot
        # shadow the real one.
        for v_norm, bvid in candidates:
            if v_norm == target_norm:
                return bvid

        # Pass 2: containment in either direction, first hit wins.
        for v_norm, bvid in candidates:
            if target_norm in v_norm or v_norm in target_norm:
                return bvid
        return None

    def process_folder(self, folder: Path, video_list):
        """Run task A and task B for one session folder.

        Args:
            folder: the session folder; its name is the title used by task A.
            video_list: recent uploads as returned by ``fetch_recent_videos``.
        """
        flag_a = folder / "collection_a_done.flag"
        flag_b = folder / "collection_b_done.flag"

        # Task A: video with the same name as the folder -> collection A.
        if self.section_id_a and not flag_a.exists():
            matched_bvid = self.find_bvid_by_title(folder.name, video_list)
            if matched_bvid:
                logger.info(f"任务A (同名): 匹配到 {matched_bvid},尝试添加...")
                info = self.client.get_video_info(matched_bvid)
                if info and self.client.add_video_to_section(self.section_id_a, info):
                    flag_a.touch()

        # Task B: uploaded clip recorded in bvid.txt -> collection B.
        if self.section_id_b and not flag_b.exists():
            bvid_file = folder / "bvid.txt"
            if bvid_file.exists():
                bvid = bvid_file.read_text(encoding='utf-8').strip()
                if not bvid:
                    # Avoid a pointless API call with an empty BVID.
                    logger.warning(f"任务B (切片): {bvid_file} 内容为空,跳过。")
                    return
                logger.info(f"任务B (切片): 读取到 {bvid},尝试添加...")
                info = self.client.get_video_info(bvid)
                if info and self.client.add_video_to_section(self.section_id_b, info):
                    flag_b.touch()
|
||
|
||
def main():
    """Resolve both collections to section IDs, then process every session folder.

    Steps: build the API client, resolve ``SEASON_ID_A`` / ``SEASON_ID_B`` to
    section IDs, and run ``CollectionWorker.process_folder`` on each
    sub-directory of ``SESSION_DIR``. The function returns early (after
    logging) when the client cannot be created, when neither section ID could
    be resolved, or when ``SESSION_DIR`` does not exist.
    """
    logger.info("启动合集管理模块 (基于成功测试版)...")

    try:
        client = BiliCollectionClient()
    except Exception as e:
        logger.error(f"客户端初始化失败: {e}")
        return

    # 1. Resolve the IDs (the most critical step).
    logger.info("正在解析合集 ID...")
    real_section_a = client.resolve_section_id(SEASON_ID_A) if SEASON_ID_A > 0 else None
    real_section_b = client.resolve_section_id(SEASON_ID_B) if SEASON_ID_B > 0 else None

    if not real_section_a and not real_section_b:
        logger.error("没有解析到任何有效的 Section ID,脚本停止。")
        return

    # 2. Set up the worker.
    worker = CollectionWorker(client, real_section_a, real_section_b)

    # 3. Scan the session folders.
    logger.info("开始扫描目录...")
    if not SESSION_DIR.exists():
        # Say so explicitly instead of reporting a "completed" scan of nothing.
        logger.warning(f"扫描目录不存在,未处理任何文件夹: {SESSION_DIR}")
        return

    # The recent-upload list is only consumed by task A, so skip the external
    # biliup call when collection A could not be resolved.
    recent_videos = worker.fetch_recent_videos() if real_section_a else []

    # Sorted for a deterministic processing order.
    for folder in sorted(SESSION_DIR.iterdir()):
        if not folder.is_dir():
            continue
        try:
            worker.process_folder(folder, recent_videos)
        except Exception as e:
            # One broken folder must not stop the remaining ones.
            logger.error(f"处理文件夹失败 [{folder.name}]: {e}")

    logger.info("扫描完成。")
|
||
|
||
# Run the collection manager only when this file is executed as a script.
if __name__ == "__main__":
    main()