Files
2026-03-21_why-manifest/archive_scripts/add_to_collection-2026-01-28-20-40-29.py

235 lines
8.9 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import requests
import re
import shutil
import subprocess
from pathlib import Path
from logger import get_system_logger, log_exception
# ================= 配置区域 =================
SESSION_DIR = Path("./session")
COOKIE_FILE = Path("./cookies.json")
# 【这里填你 B 站网页上看到的合集 ID】
# 脚本会自动根据这两个 ID 去查找对应的 Section ID (小节ID)
SEASON_ID_A = 7196643 # 合集 A (同名视频)
SEASON_ID_B = 7196624 # 合集 B (Upload切片)
# 自动寻找 biliup
BILIUP_PATH = shutil.which("biliup") or "biliup"
# ===========================================
logger = get_system_logger("collection_manager")
class BiliCollectionClient:
def __init__(self):
if not COOKIE_FILE.exists():
raise FileNotFoundError(f"Cookies 文件不存在: {COOKIE_FILE}")
with open(COOKIE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# 兼容处理 cookie 格式
if "cookie_info" in data:
self.cookies = {c["name"]: c["value"] for c in data.get("cookie_info", {}).get("cookies", [])}
else:
self.cookies = data
self.csrf = self.cookies.get("bili_jct")
if not self.csrf:
raise ValueError("Cookie 中缺少 bili_jct (CSRF Token)")
self.session = requests.Session()
self.session.cookies.update(self.cookies)
# 使用你测试成功的 Headers
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://member.bilibili.com/platform/upload-manager/distribution"
})
def get_video_info(self, bvid):
"""通过 BVID 获取 AID, CID 和 Title"""
url = "https://api.bilibili.com/x/web-interface/view"
try:
res = self.session.get(url, params={"bvid": bvid}, timeout=10).json()
if res["code"] != 0:
logger.error(f"查询视频信息失败 [{bvid}]: {res['message']}")
return None
data = res["data"]
return {
"aid": data["aid"],
"cid": data["cid"],
"title": data["title"]
}
except Exception as e:
logger.error(f"获取视频信息异常: {e}")
return None
def resolve_section_id(self, target_season_id):
"""
【关键逻辑】通过 Season ID (合集ID) 查找 Section ID (小节ID)
"""
url = "https://member.bilibili.com/x2/creative/web/seasons"
params = {"pn": 1, "ps": 50} # 获取前50个合集
try:
res = self.session.get(url, params=params, timeout=10).json()
if res.get("code") != 0:
logger.error(f"获取合集列表失败: {res.get('message')}")
return None
seasons = res.get("data", {}).get("seasons", [])
for s in seasons:
current_sid = s.get("season", {}).get("id")
# 找到目标合集
if current_sid == target_season_id:
title = s.get("season", {}).get("title", "未知标题")
sections = s.get("sections", {}).get("sections", [])
if sections:
# 默认取第一个小节
first_section_id = sections[0]["id"]
logger.info(f"✅ ID解析成功: 合集[{title}]({target_season_id}) -> 小节ID: {first_section_id}")
return first_section_id
else:
logger.error(f"❌ 合集[{title}]({target_season_id}) 存在,但没有创建任何小节!")
return None
logger.error(f"❌ 未找到 Season ID 为 {target_season_id} 的合集,请检查 ID 是否正确。")
return None
except Exception as e:
logger.error(f"解析 Section ID 异常: {e}")
return None
def add_video_to_section(self, section_id, video_info):
"""正式添加视频到合集"""
url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/add"
# 参数必须包含 csrf
params = {"csrf": self.csrf}
payload = {
"sectionId": section_id,
"episodes": [{
"aid": video_info["aid"],
"cid": video_info["cid"],
"title": video_info["title"],
"charging_pay": 0
}]
}
try:
res = self.session.post(url, params=params, json=payload, timeout=15).json()
if res["code"] == 0:
logger.info(f"🎉 成功添加: {video_info['title']}")
return True
else:
logger.error(f"添加失败: {res['message']} (Code: {res['code']})")
return False
except Exception as e:
logger.error(f"添加请求异常: {e}")
return False
class CollectionWorker:
def __init__(self, client, section_id_a, section_id_b):
self.client = client
self.section_id_a = section_id_a
self.section_id_b = section_id_b
self.ansi_escape = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
def fetch_recent_videos(self):
"""获取最近投稿"""
try:
cmd = [str(BILIUP_PATH), "list", "--max-pages", "2"]
res = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
videos = []
for line in self.ansi_escape.sub("", res.stdout).splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0].startswith("BV"):
raw_title = " ".join(parts[1:])
title = re.sub(r"(开放浏览|直播回放|审核中|-)$", "", raw_title).strip()
videos.append({"bvid": parts[0], "title": title})
return videos
except Exception:
logger.warning("biliup list 执行失败,跳过同名视频匹配。")
return []
def normalize(self, text):
return re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9]', '', text).replace('', '').lower()
def find_bvid_by_title(self, target_title, video_list):
target_norm = self.normalize(target_title)
for v in video_list:
v_norm = self.normalize(v['title'])
if target_norm in v_norm or v_norm in target_norm:
return v['bvid']
return None
def process_folder(self, folder: Path, video_list):
flag_a = folder / "collection_a_done.flag"
flag_b = folder / "collection_b_done.flag"
# 任务 A: 同名视频 -> 合集 A
if self.section_id_a and not flag_a.exists():
matched_bvid = self.find_bvid_by_title(folder.name, video_list)
if matched_bvid:
logger.info(f"任务A (同名): 匹配到 {matched_bvid},尝试添加...")
info = self.client.get_video_info(matched_bvid)
if info and self.client.add_video_to_section(self.section_id_a, info):
flag_a.touch()
# 任务 B: Upload切片 -> 合集 B
if self.section_id_b and not flag_b.exists():
bvid_file = folder / "bvid.txt"
if bvid_file.exists():
bvid = bvid_file.read_text(encoding='utf-8').strip()
logger.info(f"任务B (切片): 读取到 {bvid},尝试添加...")
info = self.client.get_video_info(bvid)
if info and self.client.add_video_to_section(self.section_id_b, info):
flag_b.touch()
def main():
logger.info("启动合集管理模块 (基于成功测试版)...")
try:
client = BiliCollectionClient()
except Exception as e:
logger.error(f"客户端初始化失败: {e}")
return
# 1. 解析 ID (这是最关键的一步)
logger.info("正在解析合集 ID...")
real_section_a = None
real_section_b = None
if SEASON_ID_A > 0:
real_section_a = client.resolve_section_id(SEASON_ID_A)
if SEASON_ID_B > 0:
real_section_b = client.resolve_section_id(SEASON_ID_B)
if not real_section_a and not real_section_b:
logger.error("没有解析到任何有效的 Section ID脚本停止。")
return
# 2. 初始化 Worker
worker = CollectionWorker(client, real_section_a, real_section_b)
# 3. 扫描逻辑
logger.info("开始扫描目录...")
recent_videos = worker.fetch_recent_videos()
if SESSION_DIR.exists():
for folder in SESSION_DIR.iterdir():
if folder.is_dir():
worker.process_folder(folder, recent_videos)
logger.info("扫描完成。")
if __name__ == "__main__":
main()