Files
2026-03-21_why-manifest/archive_scripts/temp_fromA_2_B.py

147 lines
5.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import requests
import shutil
import random
from pathlib import Path
# ================= Configuration =================
# JSON file the login cookies are loaded from.
COOKIE_FILE: Path = Path("./cookies.json")
# Id of the source collection (the large one) that videos are taken from.
SOURCE_SEASON_ID: int = 7196643
# Id of the target collection (the short-video one) that videos are moved to.
TARGET_SEASON_ID: int = 7288568
# Duration threshold in seconds (20 minutes = 1200 s); videos strictly
# shorter than this are the ones that get moved.
MAX_DURATION_SEC: int = 20 * 60
# =================================================
class BiliCollectionTransferTool:
    """Move short videos from one Bilibili creator collection to another.

    The tool reads login cookies from ``COOKIE_FILE``, looks up the first
    section of ``SOURCE_SEASON_ID`` and ``TARGET_SEASON_ID``, and moves every
    source episode shorter than ``MAX_DURATION_SEC`` into the target section
    (add to target first, then delete from source).
    """

    # Seconds to wait on any single HTTP request, so that a stalled
    # connection cannot hang the script forever.
    REQUEST_TIMEOUT = 10
    # Duration reported when a lookup fails. It is far above any sensible
    # threshold, so a video whose length is unknown is never moved.
    UNKNOWN_DURATION = 999999

    def __init__(self):
        """Load cookies from disk and prepare the HTTP session.

        Raises:
            FileNotFoundError: if ``COOKIE_FILE`` does not exist.
        """
        self.load_cookies()
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Referer": "https://member.bilibili.com/platform/upload-manager/distribution"
        })

    def load_cookies(self):
        """Read ``COOKIE_FILE`` into ``self.cookies`` and ``self.csrf``.

        Two JSON layouts are accepted: an object with a ``cookie_info.cookies``
        list of ``{"name": ..., "value": ...}`` entries, or a flat
        ``{name: value}`` object. ``self.csrf`` is the ``bili_jct`` cookie, or
        ``None`` when that cookie is absent.

        Raises:
            FileNotFoundError: if ``COOKIE_FILE`` does not exist.
        """
        if not COOKIE_FILE.exists():
            raise FileNotFoundError(f"找不到 Cookies 文件: {COOKIE_FILE}")
        with open(COOKIE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "cookie_info" in data:
            # Tolerate "cookie_info": null / "cookies": null in the file.
            entries = (data.get("cookie_info") or {}).get("cookies") or []
            self.cookies = {c["name"]: c["value"] for c in entries}
        else:
            self.cookies = data
        self.csrf = self.cookies.get("bili_jct")

    def get_season_info(self, season_id):
        """Return ``(section_id, episodes)`` for the collection ``season_id``.

        Only the first section of the collection is used, and only the first
        page of the season list is searched (``pn=1, ps=50`` — presumably
        page number / page size, so collections beyond the first 50 would
        not be found).

        Returns:
            ``(section_id, episodes)`` on success, where ``episodes`` is
            always a list (possibly empty). ``(None, [])`` when the
            collection or its section cannot be found or any request fails.
        """
        self.session.cookies.update(self.cookies)
        try:
            # 1. Find the section id of the requested collection.
            list_url = "https://member.bilibili.com/x2/creative/web/seasons"
            res_list = self.session.get(
                list_url,
                params={"pn": 1, "ps": 50},
                timeout=self.REQUEST_TIMEOUT,
            ).json()
            if res_list.get("code", 0) != 0:
                # Surface the API's own message (e.g. an expired login)
                # instead of silently reporting "not found".
                print(f"❌ 获取合集 {season_id} 失败: {res_list.get('message')}")
                return None, []
            section_id = None
            for s in (res_list.get("data") or {}).get("seasons") or []:
                if (s.get("season") or {}).get("id") == season_id:
                    sections = (s.get("sections") or {}).get("sections") or []
                    if sections:
                        section_id = sections[0]["id"]
                    break
            if not section_id:
                return None, []

            # 2. Fetch the episode list of that section.
            detail_url = "https://member.bilibili.com/x2/creative/web/season/section"
            res_detail = self.session.get(
                detail_url,
                params={"id": section_id},
                timeout=self.REQUEST_TIMEOUT,
            ).json()
            if res_detail.get("code", 0) != 0:
                print(f"❌ 获取合集 {season_id} 失败: {res_detail.get('message')}")
                return None, []
            # The API may send null for "data" or "episodes"; always hand
            # back a list so callers can iterate without checking.
            episodes = (res_detail.get("data") or {}).get("episodes") or []
            return section_id, episodes
        except Exception as e:
            print(f"❌ 获取合集 {season_id} 失败: {e}")
            return None, []

    def get_video_duration(self, bvid):
        """Return the duration of video ``bvid`` in seconds.

        Any failure (network error, non-JSON body, non-zero API code,
        unexpected payload shape) yields ``UNKNOWN_DURATION`` so that the
        caller treats the video as "too long" and leaves it where it is.
        """
        url = "https://api.bilibili.com/x/web-interface/view"
        try:
            res = self.session.get(
                url, params={"bvid": bvid}, timeout=self.REQUEST_TIMEOUT
            ).json()
            if res["code"] == 0:
                return res["data"]["duration"]
        except Exception as e:
            # Best effort: report why the lookup failed, then fall through
            # to the sentinel. KeyboardInterrupt is deliberately not caught.
            print(f"⚠️ 获取视频 {bvid} 时长失败: {e}")
        return self.UNKNOWN_DURATION

    def _post_json(self, url, payload):
        """POST ``payload`` as JSON with the CSRF token; return the JSON reply.

        Transport or decoding errors are converted into a dict shaped like an
        API error (``{"code": -1, "message": ...}``) so callers handle every
        failure through the same ``code`` check.
        """
        try:
            body = self.session.post(
                url,
                params={"csrf": self.csrf},
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            ).json()
        except Exception as e:
            return {"code": -1, "message": str(e)}
        if not isinstance(body, dict):
            return {"code": -1, "message": f"unexpected response: {body!r}"}
        return body

    def run(self):
        """Find short videos in the source collection and move them over.

        Steps: resolve both collections, look up the duration of every
        source episode, then move the short ones in batches of 30 (add to the
        target section, then delete from the source section). Progress and
        errors are printed; nothing is returned.
        """
        if not self.csrf:
            # Both write endpoints are called with the csrf parameter; fail
            # before spending time on per-video duration lookups.
            print("❌ Cookies 中缺少 bili_jct,无法执行迁移。")
            return

        # 1. Resolve both collections.
        src_section_id, src_episodes = self.get_season_info(SOURCE_SEASON_ID)
        dst_section_id, dst_episodes = self.get_season_info(TARGET_SEASON_ID)
        if not src_section_id or not dst_section_id:
            print("❌ 无法获取合集信息,请检查 ID 是否正确。")
            return

        # Videos already present in the target are not queued again.
        dst_bvids = {ep['bvid'] for ep in dst_episodes if ep and 'bvid' in ep}
        print(f"📡 源合集共有 {len(src_episodes)} 个视频,开始检查时长...")
        to_move = []
        for idx, ep in enumerate(src_episodes):
            bvid = ep['bvid']
            duration = self.get_video_duration(bvid)
            # Progress report every 10 videos.
            if (idx + 1) % 10 == 0:
                print(f" 已检查 {idx + 1}/{len(src_episodes)}...")
            if duration < MAX_DURATION_SEC and bvid not in dst_bvids:
                to_move.append({
                    "aid": ep["aid"],
                    "cid": ep["cid"],
                    "title": ep["title"],
                    "bvid": bvid,
                    "charging_pay": 0
                })
            # Pause between lookups to keep the request rate low.
            time.sleep(0.4)

        if not to_move:
            print("✨ 未发现需要迁移的短视频。")
            return
        print(f"\n💡 共发现 {len(to_move)} 个短视频需要迁移。")

        # 2. Move in batches of 30.
        add_url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/add"
        del_url = "https://member.bilibili.com/x2/creative/web/season/section/episodes/delete"
        batch_size = 30
        total = len(to_move)
        for i in range(0, total, batch_size):
            batch = to_move[i:i + batch_size]
            batch_aids = [m["aid"] for m in batch]
            print(f"🚀 正在处理第 {i//batch_size + 1} 组迁移 ({len(batch)} 个)...")
            # Add to the target first, so a failure never leaves a video in
            # neither collection.
            res_add = self._post_json(add_url, {
                "sectionId": dst_section_id,
                "episodes": batch
            })
            if res_add.get("code") == 0:
                # Only then remove from the source.
                res_del = self._post_json(del_url, {
                    "sectionId": src_section_id,
                    "aids": batch_aids
                })
                if res_del.get("code") == 0:
                    print(f" ✅ 成功移动 {len(batch)} 个。")
                else:
                    print(f" ⚠️ 移除失败: {res_del.get('message')}")
            else:
                print(f" ❌ 加入目标合集失败: {res_add.get('message')}")
            # Random pause between batches; pointless after the last one.
            if i + batch_size < total:
                time.sleep(random.uniform(3, 6))
        print("\n🎉 迁移任务执行完毕。")
# Script entry point: build the tool (which loads the cookies file and raises
# FileNotFoundError if it is missing) and run the migration once.
if __name__ == "__main__":
    tool = BiliCollectionTransferTool()
    tool.run()