import json import time import requests import shutil import random from pathlib import Path # ================= 配置区域 ================= COOKIE_FILE = Path("./cookies.json") SOURCE_SEASON_ID = 7196643 # 源合集 (大合集) TARGET_SEASON_ID = 7288568 # 目标合集 (短视频合集) MAX_DURATION_SEC = 20 * 60 # 阈值:20分钟 (1200秒) # =========================================== class BiliCollectionTransferTool: def __init__(self): self.load_cookies() self.session = requests.Session() self.session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Referer": "https://member.bilibili.com/platform/upload-manager/distribution" }) def load_cookies(self): if not COOKIE_FILE.exists(): raise FileNotFoundError(f"找不到 Cookies 文件: {COOKIE_FILE}") with open(COOKIE_FILE, "r", encoding="utf-8") as f: data = json.load(f) self.cookies = {c["name"]: c["value"] for c in data.get("cookie_info", {}).get("cookies", [])} if "cookie_info" in data else data self.csrf = self.cookies.get("bili_jct") def get_season_info(self, season_id): """获取合集的 Section ID 和 视频列表""" self.session.cookies.update(self.cookies) try: # 1. 获取 Section ID list_url = "https://member.bilibili.com/x2/creative/web/seasons" res_list = self.session.get(list_url, params={"pn": 1, "ps": 50}).json() section_id = None for s in res_list.get("data", {}).get("seasons", []): if s.get("season", {}).get("id") == season_id: sections = s.get("sections", {}).get("sections", []) if sections: section_id = sections[0]["id"] break if not section_id: return None, [] # 2. 获取该小节详细视频列表 detail_url = "https://member.bilibili.com/x2/creative/web/season/section" res_detail = self.session.get(detail_url, params={"id": section_id}).json() # 兼容性修复:确保返回的是列表而非 None episodes = res_detail.get("data", {}).get("episodes", []) if episodes is None: episodes = [] return section_id, episodes except Exception as e: print(f"❌ 获取合集 {season_id} 失败: {e}") return None, [] def get_video_duration(self, bvid): """获取视频准确时长(秒)""" url = "https://api.bilibili.com/x/web-interface/view" try: res = self.session.get(url, params={"bvid": bvid}).json() if res["code"] == 0: return res["data"]["duration"] except: pass return 999999 def run(self): # 1. 获取合集信息 src_section_id, src_episodes = self.get_season_info(SOURCE_SEASON_ID) dst_section_id, dst_episodes = self.get_season_info(TARGET_SEASON_ID) if not src_section_id or not dst_section_id: print("❌ 无法获取合集信息,请检查 ID 是否正确。") return # 修复 NoneType 报错:确保 dst_episodes 是列表 dst_bvids = {ep['bvid'] for ep in dst_episodes if ep and 'bvid' in ep} print(f"📡 源合集共有 {len(src_episodes)} 个视频,开始检查时长...") to_move = [] for idx, ep in enumerate(src_episodes): bvid = ep['bvid'] duration = self.get_video_duration(bvid) # 进度提示 if (idx + 1) % 10 == 0: print(f" 已检查 {idx + 1}/{len(src_episodes)}...") if duration < MAX_DURATION_SEC: if bvid not in dst_bvids: to_move.append({ "aid": ep["aid"], "cid": ep["cid"], "title": ep["title"], "bvid": bvid, "charging_pay": 0 }) time.sleep(0.4) if not to_move: print("✨ 未发现需要迁移的短视频。") return print(f"\n💡 共发现 {len(to_move)} 个短视频需要迁移。") # 2. 分批迁移 (每 30 个一组) batch_size = 30 for i in range(0, len(to_move), batch_size): batch = to_move[i:i+batch_size] batch_aids = [m["aid"] for m in batch] print(f"🚀 正在处理第 {i//batch_size + 1} 组迁移 ({len(batch)} 个)...") # 先加入目标合集 add_url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/add" res_add = self.session.post(add_url, params={"csrf": self.csrf}, json={ "sectionId": dst_section_id, "episodes": batch }).json() if res_add["code"] == 0: # 后从源合集移除 del_url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/delete" res_del = self.session.post(del_url, params={"csrf": self.csrf}, json={ "sectionId": src_section_id, "aids": batch_aids }).json() if res_del["code"] == 0: print(f" ✅ 成功移动 {len(batch)} 个。") else: print(f" ⚠️ 移除失败: {res_del.get('message')}") else: print(f" ❌ 加入目标合集失败: {res_add.get('message')}") time.sleep(random.uniform(3, 6)) print("\n🎉 迁移任务执行完毕。") if __name__ == "__main__": tool = BiliCollectionTransferTool() tool.run()