From d5d969358131349782dd4b6d3f7033a6ffc0954b Mon Sep 17 00:00:00 2001 From: theshy Date: Tue, 14 Apr 2026 16:00:17 +0800 Subject: [PATCH] feat: add session-level publish and comment flow --- README.md | 80 ++++- config/settings.json | 13 +- config/settings.schema.json | 50 ++++ config/settings.standalone.example.json | 7 +- docs/cold-start-checklist.md | 7 +- docs/config-system.md | 4 + docs/control-plane-guide.md | 25 +- docs/state-machine.md | 32 +- src/biliup_next/app/cli.py | 12 +- .../app/control_plane_post_dispatcher.py | 18 +- .../app/session_delivery_service.py | 8 +- src/biliup_next/app/task_engine.py | 53 ++++ src/biliup_next/app/task_policies.py | 10 +- src/biliup_next/app/task_runner.py | 9 + src/biliup_next/core/config.py | 2 + src/biliup_next/infra/adapters/biliup_cli.py | 139 ++++++++- src/biliup_next/infra/adapters/qwen_cli.py | 36 +++ src/biliup_next/infra/adapters/yt_dlp.py | 76 +++++ src/biliup_next/infra/runtime_doctor.py | 14 +- .../comment/providers/bilibili_top_comment.py | 123 +++++++- src/biliup_next/modules/comment/service.py | 4 +- .../modules/ingest/providers/bilibili_url.py | 128 ++++++++ src/biliup_next/modules/ingest/service.py | 83 ++++-- .../modules/publish/providers/biliup_cli.py | 171 +++++++++-- src/biliup_next/modules/publish/service.py | 162 +++++++++- .../modules/song_detect/providers/codex.py | 82 +---- .../modules/song_detect/providers/common.py | 122 ++++++++ .../modules/song_detect/providers/qwen_cli.py | 78 +++++ .../manifests/ingest_bilibili_url.json | 9 + .../manifests/song_detect_qwen_cli.json | 9 + tests/test_api_server.py | 29 ++ tests/test_bilibili_top_comment_provider.py | 256 ++++++++++++++++ tests/test_biliup_cli_publish_provider.py | 281 ++++++++++++++++++ tests/test_ingest_bilibili_url.py | 49 +++ tests/test_ingest_session_grouping.py | 61 ++++ tests/test_publish_service.py | 159 ++++++++++ tests/test_session_delivery_service.py | 53 +++- tests/test_settings_service.py | 11 +- tests/test_song_detect_providers.py | 77 +++++ tests/test_task_engine.py | 63 ++++ tests/test_task_policies.py | 20 ++ tests/test_task_runner.py | 34 +++ 42 files changed, 2478 insertions(+), 181 deletions(-) create mode 100644 src/biliup_next/infra/adapters/qwen_cli.py create mode 100644 src/biliup_next/infra/adapters/yt_dlp.py create mode 100644 src/biliup_next/modules/ingest/providers/bilibili_url.py create mode 100644 src/biliup_next/modules/song_detect/providers/common.py create mode 100644 src/biliup_next/modules/song_detect/providers/qwen_cli.py create mode 100644 src/biliup_next/plugins/manifests/ingest_bilibili_url.json create mode 100644 src/biliup_next/plugins/manifests/song_detect_qwen_cli.json create mode 100644 tests/test_bilibili_top_comment_provider.py create mode 100644 tests/test_biliup_cli_publish_provider.py create mode 100644 tests/test_ingest_bilibili_url.py create mode 100644 tests/test_ingest_session_grouping.py create mode 100644 tests/test_publish_service.py create mode 100644 tests/test_song_detect_providers.py diff --git a/README.md b/README.md index 0fe6276..82ce3bf 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,11 @@ - 基础 CLI - 隔离 workspace 运行 - `stage -> ingest -> transcribe -> song_detect -> split -> publish -> comment -> collection` 主链路 +- session 级归并与交付 + - 同主播、文件名时间相差 `3` 小时内的片段归入同一 session + - session 的 `session_key` 使用最早片段标题 + - 同一 session 的纯享版只发布一个 BV + - 同一 session 的纯享版评论与完整版评论都按 `P1/P2/P3` 聚合 ## Run @@ -81,8 +86,10 @@ http://127.0.0.1:8787/classic - 默认第 1 次重试等待 15 分钟 - 第 2-5 次各等待 5 分钟 - 评论链路支持拆分: - - 纯享版 `bvid.txt` 下默认发布“无时间轴编号歌单” - - 完整版主视频默认尝试发布“带时间轴评论” + - 纯享版 `bvid.txt` 下默认发布 session 级聚合歌单评论 + - 同一 session 只由 anchor task 发一次,内容按 `P1/P2/P3` 分组 + - 完整版主视频默认尝试发布 session 级“带时间轴评论” + - 同一 session 只由 anchor task 发一次,内容按 `P1/P2/P3` 分组 - 若找不到 `full_video_bvid.txt` 或匹配不到完整版视频,则主视频评论跳过 - 任务完成后支持可选清理: - 删除 session 中的原始完整视频 @@ -176,10 +183,13 @@ cd /home/theshy/biliup/biliup-next 评论默认分成两条: - `comment.post_split_comment = true` - - 纯享版分P视频下发布编号歌单评论 - - 不带时间轴 + - 纯享版视频下发布 session 级聚合编号歌单评论 + - 同一 session 只发一次 + - 内容按 `P1/P2/P3` 分组 - `comment.post_full_video_timeline_comment = true` - - 尝试在完整版主视频下发布带时间轴的置顶评论 + - 尝试在完整版主视频下发布 session 级带时间轴置顶评论 + - 同一 session 只发一次 + - 内容按 `P1/P2/P3` 分组 - 依赖 `full_video_bvid.txt` 或通过标题匹配解析到完整版 BV 清理默认关闭: @@ -211,6 +221,37 @@ cd /home/theshy/biliup/biliup-next 如果 webhook 先于片段 ingest 到达,`biliup-next` 会先把它持久化;后续同 `session_key` 或 `source_title` 的任务进入时会自动继承该 `BV`。 +## Session Rules + +当前默认 session 归并规则是最小启发式实现: + +- 从文件名解析 `streamer + 日期时间` +- 同一 `streamer` +- 时间相差在 `3` 小时内 +- 归到同一个 session +- `session_key` 直接使用最早片段标题 + +例如: + +- `Self-test 04月04日 09时23分` +- `Self-test 04月04日 09时25分` +- `Self-test 04月04日 09时27分` + +会归到同一个: + +```text +session_key = Self-test 04月04日 09时23分 +``` + +在同一个 session 内: + +- 只有最早片段对应的 anchor task 会真正执行纯享版上传 +- 上传时会聚合整个 session 下所有 `split_video` +- 成功后整组 task 共用同一个 `bvid.txt` +- 纯享版评论与完整版评论也都只由 anchor task 发送一次 + +推荐 webhook 也使用同一个最早标题作为 `session_key` / `source_title`,这样完整版 `BV` 能稳定命中整组任务。 + ## Security 控制台支持可选 token 保护: @@ -250,3 +291,32 @@ bash install-systemd.sh - 安装到 `/etc/systemd/system/` - `daemon-reload` - `enable --now` 启动 `worker` 和 `api` + +## Bilibili URL Ingest + +现在也支持直接给 B 站视频链接创建任务。 + +CLI: + +```bash +cd /home/theshy/biliup/biliup-next +PYTHONPATH=src python -m biliup_next.app.cli create-task \ + --source-type bilibili_url \ + "https://www.bilibili.com/video/BVxxxxxxxxxx" +``` + +API: + +```bash +curl -X POST http://127.0.0.1:8787/tasks \ + -H 'Content-Type: application/json' \ + -d '{ + "source_type": "bilibili_url", + "source_url": "https://www.bilibili.com/video/BVxxxxxxxxxx" + }' +``` + +相关配置: + +- `ingest.provider = bilibili_url` +- `ingest.yt_dlp_cmd = yt-dlp` diff --git a/config/settings.json b/config/settings.json index 2cc24aa..7b5147b 100644 --- a/config/settings.json +++ b/config/settings.json @@ -31,6 +31,8 @@ "provider": "local_file", "min_duration_seconds": 900, "ffprobe_bin": "ffprobe", + "yt_dlp_cmd": "yt-dlp", + "yt_dlp_format": "", "allowed_extensions": [ ".mp4", ".flv", @@ -50,8 +52,9 @@ "max_file_size_mb": 23 }, "song_detect": { - "provider": "codex", + "provider": "qwen_cli", "codex_cmd": "codex", + "qwen_cmd": "qwen", "poll_interval_seconds": 2 }, "split": { @@ -72,7 +75,13 @@ 5, 5 ], - "retry_backoff_seconds": 300 + "retry_backoff_seconds": 300, + "command_timeout_seconds": 1800, + "rate_limit_retry_schedule_minutes": [ + 30, + 60, + 120 + ] }, "comment": { "provider": "bilibili_top_comment", diff --git a/config/settings.schema.json b/config/settings.schema.json index f5caf92..7937713 100644 --- a/config/settings.schema.json +++ b/config/settings.schema.json @@ -145,6 +145,21 @@ "ui_order": 20, "ui_widget": "command" }, + "yt_dlp_cmd": { + "type": "string", + "default": "yt-dlp", + "title": "yt-dlp Command", + "ui_order": 25, + "ui_widget": "command", + "description": "当 ingest.provider 选择 bilibili_url 时,用于下载 B 站视频的 yt-dlp 命令或绝对路径。" + }, + "yt_dlp_format": { + "type": "string", + "default": "", + "title": "yt-dlp Format", + "ui_order": 26, + "description": "可选的 yt-dlp -f 参数。留空表示使用默认画质;测试时可设为 worst[ext=mp4]/worst。" + }, "allowed_extensions": { "type": "array", "default": [".mp4", ".flv", ".mkv", ".mov"], @@ -245,6 +260,15 @@ "ui_widget": "command", "description": "歌曲识别时实际执行的 codex 命令或绝对路径。" }, + "qwen_cmd": { + "type": "string", + "default": "qwen", + "title": "Qwen Command", + "ui_order": 15, + "ui_featured": true, + "ui_widget": "command", + "description": "歌曲识别 provider 选择 qwen_cli 时,实际执行的 qwen 命令或绝对路径。" + }, "poll_interval_seconds": { "type": "integer", "default": 2, @@ -337,6 +361,32 @@ "ui_widget": "duration_seconds", "description": "旧版统一回退秒数。仅在未配置重试时间表时作为兼容兜底。", "minimum": 0 + }, + "command_timeout_seconds": { + "type": "integer", + "default": 1800, + "title": "Command Timeout Seconds", + "ui_order": 60, + "ui_featured": true, + "ui_widget": "duration_seconds", + "description": "单次 biliup renew/upload/append 命令的超时时间。超时会写入 publish.log 并返回失败。", + "minimum": 1 + }, + "rate_limit_retry_schedule_minutes": { + "type": "array", + "default": [ + 30, + 60, + 120 + ], + "title": "Rate Limit Retry Schedule Minutes", + "ui_order": 70, + "ui_featured": true, + "description": "上传命中 B站 601 限流时,按分钟定义专用重试等待时间。", + "items": { + "type": "integer", + "minimum": 0 + } } }, "comment": { diff --git a/config/settings.standalone.example.json b/config/settings.standalone.example.json index c8b6a59..c02c24e 100644 --- a/config/settings.standalone.example.json +++ b/config/settings.standalone.example.json @@ -15,6 +15,8 @@ "provider": "local_file", "min_duration_seconds": 900, "ffprobe_bin": "ffprobe", + "yt_dlp_cmd": "yt-dlp", + "yt_dlp_format": "", "allowed_extensions": [".mp4", ".flv", ".mkv", ".mov"], "stage_min_free_space_mb": 2048, "stability_wait_seconds": 30, @@ -31,6 +33,7 @@ "song_detect": { "provider": "codex", "codex_cmd": "codex", + "qwen_cmd": "qwen", "poll_interval_seconds": 2 }, "split": { @@ -43,7 +46,9 @@ "biliup_path": "runtime/biliup", "cookie_file": "runtime/cookies.json", "retry_count": 5, - "retry_backoff_seconds": 300 + "retry_backoff_seconds": 300, + "command_timeout_seconds": 1800, + "rate_limit_retry_schedule_minutes": [30, 60, 120] }, "comment": { "provider": "bilibili_top_comment", diff --git a/docs/cold-start-checklist.md b/docs/cold-start-checklist.md index 61ffe65..a278c68 100644 --- a/docs/cold-start-checklist.md +++ b/docs/cold-start-checklist.md @@ -41,7 +41,9 @@ bash setup.sh - 编辑 `runtime/upload_config.json` - 把 `biliup` 放到 `runtime/biliup`,或在 `settings.json` 里改成系统路径 - 填写 `transcribe.groq_api_key` -- 按机器实际情况调整 `song_detect.codex_cmd` +- 按机器实际情况调整 `song_detect.provider` +- 如果用 `codex`,调整 `song_detect.codex_cmd` +- 如果用 `qwen_cli`,调整 `song_detect.qwen_cmd` - 按需要填写 `collection.season_id_a` / `collection.season_id_b` ## 5. 验收 @@ -74,6 +76,7 @@ http://127.0.0.1:8787/ - `runtime/cookies.json` - `runtime/upload_config.json` - `publish.biliup_path` -- `song_detect.codex_cmd` +- `song_detect.provider` +- `song_detect.codex_cmd` 或 `song_detect.qwen_cmd` - `transcribe.groq_api_key` - `collection.season_id_a` / `collection.season_id_b` diff --git a/docs/config-system.md b/docs/config-system.md index ec6b614..996da1f 100644 --- a/docs/config-system.md +++ b/docs/config-system.md @@ -114,7 +114,10 @@ User edits config ### ingest +- `provider` - `min_duration_seconds` +- `ffprobe_bin` +- `yt_dlp_cmd` - `allowed_extensions` ### transcribe @@ -128,6 +131,7 @@ User edits config - `provider` - `codex_cmd` +- `qwen_cmd` - `poll_interval_seconds` ### split diff --git a/docs/control-plane-guide.md b/docs/control-plane-guide.md index a5e7109..7e1a66d 100644 --- a/docs/control-plane-guide.md +++ b/docs/control-plane-guide.md @@ -232,17 +232,31 @@ http://127.0.0.1:8787/ 排查原则: - `transcribe` 失败:先看 `Groq API Key`、`ffmpeg` -- `song_detect` 失败:先看 `codex_cmd` +- `song_detect` 失败:先看 `song_detect.provider`,再看 `codex_cmd` 或 `qwen_cmd` - `publish` 失败:先看 `cookies.json`、`biliup` - `collection_*` 失败:再看任务历史和日志 评论规则补充: - `comment` - - 纯享版视频下默认发“编号歌单”,不带时间轴 - - 完整版主视频下默认才发“带时间轴评论” + - 纯享版视频下默认发 session 级聚合“编号歌单” + - 内容按 `P1/P2/P3` 分组 + - 同一 session 只由 anchor task 发一次 + - 完整版主视频下默认才发 session 级“带时间轴评论” + - 内容按 `P1/P2/P3` 分组 + - 同一 session 只由 anchor task 发一次 - 如果当前任务找不到 `full_video_bvid.txt`,也没能从最近发布列表解析出完整版 BV,主视频评论会跳过 +session 规则补充: + +- 同主播、文件名时间 `3` 小时内的任务会自动归到同一 session +- session 的 `session_key` 使用最早片段标题 +- 同一 session 内: + - 只有 anchor task 真正执行纯享版上传 + - 纯享版上传会聚合整组 `split_video` + - 整组 task 共用同一个 `bvid.txt` + - split/full 评论都只发一次 + ## Artifacts 这里显示任务当前已经产出的文件,例如: @@ -324,7 +338,7 @@ http://127.0.0.1:8787/ - `upload_config_file` - `ffprobe` - `ffmpeg` -- `codex_cmd` +- `codex_cmd` 或 `qwen_cmd` - `biliup_path` 如果某个依赖显示 `(external)`,表示它还在用系统或父项目路径,不是 `biliup-next` 自己目录内的副本。 @@ -361,6 +375,7 @@ Settings 分成两层: - `min_duration_seconds` - `groq_api_key` - `codex_cmd` +- `qwen_cmd` - `retry_count` - `season_id_a` - `season_id_b` @@ -449,7 +464,7 @@ __BILIUP_NEXT_SECRET__ 建议: -- 如果你希望纯享版评论更适合分P浏览,保持 `post_split_comment = true` +- 如果你希望纯享版评论以 session 级聚合歌单展示,保持 `post_split_comment = true` - 如果你不希望尝试给完整版主视频发时间轴评论,可以关闭 `post_full_video_timeline_comment` - 如果磁盘紧张,再开启 cleanup;默认建议先关闭,等确认流程稳定后再开 diff --git a/docs/state-machine.md b/docs/state-machine.md index 03947b8..4865938 100644 --- a/docs/state-machine.md +++ b/docs/state-machine.md @@ -88,7 +88,9 @@ 负责: - 上传纯享版视频 -- 记录 `aid/bvid` +- 同 session 多个 task 时,只由 anchor task 真正执行上传 +- 聚合同 session 的全部 `clip_video` +- 成功后把同一个 `bvid` 写回整组 task ### comment @@ -96,6 +98,9 @@ - 发布评论 - 置顶评论 +- split 评论在 session 级聚合为 `P1/P2/P3` +- full 评论在 session 级聚合为 `P1/P2/P3` +- 同一 session 的评论只由 anchor task 执行一次 ### collection_a @@ -156,6 +161,31 @@ created - `collection_b` 必须依赖 `publish` - `collection_a` 通常依赖外部完整版 BV,可独立于 `publish` +## Session Semantics + +当多个 task 属于同一个 `session_key` 时,系统会引入 session 级语义: + +- `split` 仍然保持 task 级 +- `publish` 升级为 session 级 +- `comment` 升级为 session 级 + +当前 anchor 规则: + +- 同一 session 内按 `segment_started_at` 升序排序 +- 最早那个 task 作为 anchor + +当前 session 级行为: + +- `publish` + - 只有 anchor task 执行真实上传 + - 其余 task 复用同一个纯享 `BV` +- `comment.split` + - 只有 anchor task 对纯享版视频发评论 + - 评论内容按 `P1/P2/P3` 聚合 +- `comment.full` + - 只有 anchor task 对完整版视频发评论 + - 评论内容按 `P1/P2/P3` 聚合 + ## Special Case: Collection A 合集 A 的数据来源与主上传链路不同。 diff --git a/src/biliup_next/app/cli.py b/src/biliup_next/app/cli.py index 1c8a8b8..53d9ba0 100644 --- a/src/biliup_next/app/cli.py +++ b/src/biliup_next/app/cli.py @@ -31,7 +31,8 @@ def main() -> None: worker_parser.add_argument("--interval", type=int, default=5) create_task_parser = sub.add_parser("create-task") - create_task_parser.add_argument("source_path") + create_task_parser.add_argument("source") + create_task_parser.add_argument("--source-type", choices=["local_file", "bilibili_url"], default="local_file") serve_parser = sub.add_parser("serve") serve_parser.add_argument("--host", default="127.0.0.1") @@ -95,10 +96,11 @@ def main() -> None: state = ensure_initialized() settings = dict(state["settings"]["ingest"]) settings.update(state["settings"]["paths"]) - task = state["ingest_service"].create_task_from_file( - Path(args.source_path), - settings, - ) + if args.source_type == "bilibili_url": + settings["provider"] = "bilibili_url" + task = state["ingest_service"].create_task_from_url(args.source, settings) + else: + task = state["ingest_service"].create_task_from_file(Path(args.source), settings) print(json.dumps(task.to_dict(), ensure_ascii=False, indent=2)) return diff --git a/src/biliup_next/app/control_plane_post_dispatcher.py b/src/biliup_next/app/control_plane_post_dispatcher.py index f31cf0c..5bb3b61 100644 --- a/src/biliup_next/app/control_plane_post_dispatcher.py +++ b/src/biliup_next/app/control_plane_post_dispatcher.py @@ -137,13 +137,23 @@ class ControlPlanePostDispatcher: return result, HTTPStatus.CREATED def handle_create_task(self, payload: object) -> tuple[object, HTTPStatus]: - source_path = payload.get("source_path") if isinstance(payload, dict) else None - if not source_path: - return {"error": "missing source_path"}, HTTPStatus.BAD_REQUEST + if not isinstance(payload, dict): + return {"error": "invalid payload"}, HTTPStatus.BAD_REQUEST + source_type = str(payload.get("source_type") or "local_file") try: settings = dict(self.state["settings"]["ingest"]) settings.update(self.state["settings"]["paths"]) - task = self.state["ingest_service"].create_task_from_file(Path(source_path), settings) + if source_type == "bilibili_url": + source_url = payload.get("source_url") + if not source_url: + return {"error": "missing source_url"}, HTTPStatus.BAD_REQUEST + settings["provider"] = "bilibili_url" + task = self.state["ingest_service"].create_task_from_url(str(source_url), settings) + else: + source_path = payload.get("source_path") + if not source_path: + return {"error": "missing source_path"}, HTTPStatus.BAD_REQUEST + task = self.state["ingest_service"].create_task_from_file(Path(str(source_path)), settings) except Exception as exc: status = HTTPStatus.CONFLICT if exc.__class__.__name__ == "ModuleError" else HTTPStatus.INTERNAL_SERVER_ERROR body = exc.to_dict() if hasattr(exc, "to_dict") else {"error": str(exc)} diff --git a/src/biliup_next/app/session_delivery_service.py b/src/biliup_next/app/session_delivery_service.py index 519f7f6..411a3c5 100644 --- a/src/biliup_next/app/session_delivery_service.py +++ b/src/biliup_next/app/session_delivery_service.py @@ -155,6 +155,10 @@ class SessionDeliveryService: if session_key is None and source_title is None: return {"error": {"code": "SESSION_KEY_OR_SOURCE_TITLE_REQUIRED", "message": "session_key or source_title required"}} + source_contexts = self.repo.list_task_contexts_by_source_title(source_title) if source_title else [] + if session_key is None and source_contexts: + session_key = source_contexts[0].session_key + now = utc_now_iso() self.repo.upsert_session_binding( SessionBinding( @@ -170,8 +174,8 @@ class SessionDeliveryService: ) contexts = self.repo.list_task_contexts_by_session_key(session_key) if session_key else [] - if not contexts and source_title: - contexts = self.repo.list_task_contexts_by_source_title(source_title) + if not contexts and source_contexts: + contexts = source_contexts updated_tasks: list[dict[str, object]] = [] for context in contexts: diff --git a/src/biliup_next/app/task_engine.py b/src/biliup_next/app/task_engine.py index 79f682c..54009ca 100644 --- a/src/biliup_next/app/task_engine.py +++ b/src/biliup_next/app/task_engine.py @@ -1,5 +1,7 @@ from __future__ import annotations +from datetime import datetime + from biliup_next.app.retry_meta import retry_meta_for_step @@ -90,6 +92,14 @@ def next_runnable_step(task, steps: dict[str, object], state: dict[str, object]) if task.status == "split_done": step = steps.get("publish") if step and step.status in {"pending", "failed_retryable"}: + session_publish = _session_publish_gate(task.id, state) + if session_publish is not None: + if session_publish["session_published"]: + return "publish", None + if not session_publish["is_anchor"]: + return None, None + if not session_publish["all_split_ready"]: + return None, None return "publish", None if task.status in {"published", "collection_synced"}: if state["settings"]["comment"].get("enabled", True): @@ -113,6 +123,49 @@ def next_runnable_step(task, steps: dict[str, object], state: dict[str, object]) return None, None +def _session_publish_gate(task_id: str, state: dict[str, object]) -> dict[str, object] | None: + repo = state.get("repo") + if repo is None or not hasattr(repo, "get_task_context"): + return None + context = repo.get_task_context(task_id) + if context is None or not context.session_key or context.session_key.startswith("task:"): + return None + contexts = list(repo.list_task_contexts_by_session_key(context.session_key)) + if len(contexts) <= 1: + return None + ordered = sorted( + contexts, + key=lambda item: ( + _parse_dt(item.segment_started_at), + item.source_title or item.task_id, + ), + ) + anchor_id = ordered[0].task_id + sibling_tasks = [repo.get_task(item.task_id) for item in ordered] + session_published = any( + sibling is not None and sibling.status in {"published", "commented", "collection_synced"} + for sibling in sibling_tasks + ) + all_split_ready = all( + sibling is not None and sibling.status in {"split_done", "published", "commented", "collection_synced"} + for sibling in sibling_tasks + ) + return { + "is_anchor": task_id == anchor_id, + "session_published": session_published, + "all_split_ready": all_split_ready, + } + + +def _parse_dt(value: str | None) -> datetime: + if not value: + return datetime.max + try: + return datetime.fromisoformat(value) + except ValueError: + return datetime.max + + def execute_step(state: dict[str, object], task_id: str, step_name: str) -> dict[str, object]: if step_name == "transcribe": artifact = state["transcribe_service"].run(task_id, settings_for(state, "transcribe")) diff --git a/src/biliup_next/app/task_policies.py b/src/biliup_next/app/task_policies.py index 7ed4be4..41296c2 100644 --- a/src/biliup_next/app/task_policies.py +++ b/src/biliup_next/app/task_policies.py @@ -36,7 +36,15 @@ def resolve_failure(task, repo, state: dict[str, object], exc) -> dict[str, obje next_status = "failed_retryable" if exc.retryable else "failed_manual" next_retry_delay_seconds: int | None = None if exc.retryable and step_name == "publish": - schedule = publish_retry_schedule_seconds(settings_for(state, "publish")) + publish_settings = settings_for(state, "publish") + if exc.code == "PUBLISH_RATE_LIMITED": + custom_schedule = publish_settings.get("rate_limit_retry_schedule_minutes") + if isinstance(custom_schedule, list) and custom_schedule: + schedule = [max(0, int(minutes)) * 60 for minutes in custom_schedule] + else: + schedule = publish_retry_schedule_seconds(publish_settings) + else: + schedule = publish_retry_schedule_seconds(publish_settings) if next_retry_count > len(schedule): next_status = "failed_manual" else: diff --git a/src/biliup_next/app/task_runner.py b/src/biliup_next/app/task_runner.py index 5e67e6f..5c6c0c0 100644 --- a/src/biliup_next/app/task_runner.py +++ b/src/biliup_next/app/task_runner.py @@ -86,4 +86,13 @@ def process_task(task_id: str, *, reset_step: str | None = None, include_stage_s failure = resolve_failure(task, repo, state, exc) processed.append(failure["payload"]) record_task_action(repo, task_id, failure["step_name"], "error", failure["summary"], failure["payload"]) + except Exception as exc: + unexpected = ModuleError( + code="UNHANDLED_EXCEPTION", + message=f"unexpected error: {exc}", + retryable=False, + ) + failure = resolve_failure(task, repo, state, unexpected) + processed.append(failure["payload"]) + record_task_action(repo, task_id, failure["step_name"], "error", failure["summary"], failure["payload"]) return {"processed": processed} diff --git a/src/biliup_next/core/config.py b/src/biliup_next/core/config.py index 17e3239..d1a44ec 100644 --- a/src/biliup_next/core/config.py +++ b/src/biliup_next/core/config.py @@ -160,9 +160,11 @@ class SettingsService: ("paths", "cookies_file"), ("paths", "upload_config_file"), ("ingest", "ffprobe_bin"), + ("ingest", "yt_dlp_cmd"), ("transcribe", "ffmpeg_bin"), ("split", "ffmpeg_bin"), ("song_detect", "codex_cmd"), + ("song_detect", "qwen_cmd"), ("publish", "biliup_path"), ("publish", "cookie_file"), ): diff --git a/src/biliup_next/infra/adapters/biliup_cli.py b/src/biliup_next/infra/adapters/biliup_cli.py index 532911a..941a79f 100644 --- a/src/biliup_next/infra/adapters/biliup_cli.py +++ b/src/biliup_next/infra/adapters/biliup_cli.py @@ -1,27 +1,158 @@ from __future__ import annotations import subprocess +from datetime import datetime +from pathlib import Path +import shlex from biliup_next.core.errors import ModuleError class BiliupCliAdapter: - def run(self, cmd: list[str], *, label: str) -> subprocess.CompletedProcess[str]: + def run( + self, + cmd: list[str], + *, + label: str, + timeout_seconds: int | None = None, + log_path: Path | None = None, + ) -> subprocess.CompletedProcess[str]: try: - return subprocess.run(cmd, capture_output=True, text=True, check=False) + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + timeout=timeout_seconds, + ) + self._append_log( + log_path, + label=label, + cmd=cmd, + returncode=result.returncode, + stdout=result.stdout, + stderr=result.stderr, + timeout_seconds=timeout_seconds, + ) + return result except FileNotFoundError as exc: raise ModuleError( code="BILIUP_NOT_FOUND", message=f"找不到 biliup 命令: {cmd[0]} ({label})", retryable=False, ) from exc + except subprocess.TimeoutExpired as exc: + stdout = self._stringify_output(exc.stdout) + stderr = self._stringify_output(exc.stderr) + self._append_log( + log_path, + label=label, + cmd=cmd, + returncode=None, + stdout=stdout, + stderr=stderr, + timeout_seconds=timeout_seconds, + ) + raise ModuleError( + code="BILIUP_TIMEOUT", + message=f"biliup 命令超时: {label}", + retryable=True, + details={ + "label": label, + "timeout_seconds": timeout_seconds, + "stdout": stdout[-2000:], + "stderr": stderr[-2000:], + }, + ) from exc - def run_optional(self, cmd: list[str]) -> None: + def run_optional( + self, + cmd: list[str], + *, + label: str, + timeout_seconds: int | None = None, + log_path: Path | None = None, + ) -> None: try: - subprocess.run(cmd, capture_output=True, text=True, check=False) + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + timeout=timeout_seconds, + ) + self._append_log( + log_path, + label=label, + cmd=cmd, + returncode=result.returncode, + stdout=result.stdout, + stderr=result.stderr, + timeout_seconds=timeout_seconds, + ) except FileNotFoundError as exc: raise ModuleError( code="BILIUP_NOT_FOUND", message=f"找不到 biliup 命令: {cmd[0]}", retryable=False, ) from exc + except subprocess.TimeoutExpired as exc: + stdout = self._stringify_output(exc.stdout) + stderr = self._stringify_output(exc.stderr) + self._append_log( + log_path, + label=label, + cmd=cmd, + returncode=None, + stdout=stdout, + stderr=stderr, + timeout_seconds=timeout_seconds, + ) + raise ModuleError( + code="BILIUP_TIMEOUT", + message=f"biliup 命令超时: {label}", + retryable=True, + details={ + "label": label, + "timeout_seconds": timeout_seconds, + "stdout": stdout[-2000:], + "stderr": stderr[-2000:], + }, + ) from exc + + @staticmethod + def _append_log( + log_path: Path | None, + *, + label: str, + cmd: list[str], + returncode: int | None, + stdout: str, + stderr: str, + timeout_seconds: int | None = None, + ) -> None: + if log_path is None: + return + timestamp = datetime.now().isoformat(timespec="seconds") + log_path.parent.mkdir(parents=True, exist_ok=True) + lines = [ + f"[{timestamp}] {label}", + f"cmd: {shlex.join(cmd)}", + ] + if timeout_seconds is not None: + lines.append(f"timeout_seconds: {timeout_seconds}") + lines.append(f"exit: {returncode if returncode is not None else 'timeout'}") + if stdout: + lines.extend(["stdout:", stdout.rstrip()]) + if stderr: + lines.extend(["stderr:", stderr.rstrip()]) + lines.append("") + log_path.write_text(log_path.read_text(encoding="utf-8") + "\n".join(lines), encoding="utf-8") if log_path.exists() else log_path.write_text("\n".join(lines), encoding="utf-8") + + @staticmethod + def _stringify_output(value: str | bytes | None) -> str: + if value is None: + return "" + if isinstance(value, bytes): + return value.decode("utf-8", errors="replace") + return value diff --git a/src/biliup_next/infra/adapters/qwen_cli.py b/src/biliup_next/infra/adapters/qwen_cli.py new file mode 100644 index 0000000..b965ed9 --- /dev/null +++ b/src/biliup_next/infra/adapters/qwen_cli.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path + +from biliup_next.core.errors import ModuleError + + +class QwenCliAdapter: + def run_song_detect( + self, + *, + qwen_cmd: str, + work_dir: Path, + prompt: str, + ) -> subprocess.CompletedProcess[str]: + cmd = [ + qwen_cmd, + "--yolo", + "-p", + prompt, + ] + try: + return subprocess.run( + cmd, + cwd=str(work_dir), + capture_output=True, + text=True, + check=False, + ) + except FileNotFoundError as exc: + raise ModuleError( + code="QWEN_NOT_FOUND", + message=f"找不到 qwen 命令: {qwen_cmd}", + retryable=False, + ) from exc diff --git a/src/biliup_next/infra/adapters/yt_dlp.py b/src/biliup_next/infra/adapters/yt_dlp.py new file mode 100644 index 0000000..09c5e14 --- /dev/null +++ b/src/biliup_next/infra/adapters/yt_dlp.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import json +import subprocess +from typing import Any + +from biliup_next.core.errors import ModuleError + + +class YtDlpAdapter: + def probe(self, *, yt_dlp_cmd: str, source_url: str) -> dict[str, Any]: + cmd = [ + yt_dlp_cmd, + "--dump-single-json", + "--no-warnings", + "--no-playlist", + source_url, + ] + result = self._run(cmd) + if result.returncode != 0: + raise ModuleError( + code="YT_DLP_PROBE_FAILED", + message="yt-dlp 获取视频信息失败", + retryable=True, + details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, + ) + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise ModuleError( + code="YT_DLP_PROBE_INVALID_JSON", + message="yt-dlp 返回了非法 JSON", + retryable=True, + details={"stdout": result.stdout[-2000:]}, + ) from exc + if not isinstance(payload, dict): + raise ModuleError( + code="YT_DLP_PROBE_INVALID_JSON", + message="yt-dlp 返回结果不是对象", + retryable=True, + ) + return payload + + def download( + self, + *, + yt_dlp_cmd: str, + source_url: str, + output_template: str, + format_selector: str | None = None, + ) -> subprocess.CompletedProcess[str]: + cmd = [ + yt_dlp_cmd, + "--no-warnings", + "--no-playlist", + ] + if format_selector: + cmd.extend(["-f", format_selector]) + cmd.extend([ + "--merge-output-format", + "mp4", + "-o", + output_template, + source_url, + ]) + return self._run(cmd) + + def _run(self, cmd: list[str]) -> subprocess.CompletedProcess[str]: + try: + return subprocess.run(cmd, capture_output=True, text=True, check=False) + except FileNotFoundError as exc: + raise ModuleError( + code="YT_DLP_NOT_FOUND", + message=f"找不到 yt-dlp 命令: {cmd[0]}", + retryable=False, + ) from exc diff --git a/src/biliup_next/infra/runtime_doctor.py b/src/biliup_next/infra/runtime_doctor.py index 77d2fa8..6143ed9 100644 --- a/src/biliup_next/infra/runtime_doctor.py +++ b/src/biliup_next/infra/runtime_doctor.py @@ -40,13 +40,25 @@ class RuntimeDoctor: for group, name in ( ("ingest", "ffprobe_bin"), ("transcribe", "ffmpeg_bin"), - ("song_detect", "codex_cmd"), ): value = settings[group][name] found = shutil.which(value) if "/" not in value else str((self.root_dir / value).resolve()) ok = bool(found) and (Path(found).exists() if "/" in str(found) else True) checks.append({"name": f"{group}.{name}", "ok": ok, "detail": str(found or value)}) + if str(settings.get("ingest", {}).get("provider", "local_file")) == "bilibili_url": + value = str(settings["ingest"].get("yt_dlp_cmd", "yt-dlp")) + found = shutil.which(value) if "/" not in value else str((self.root_dir / value).resolve()) + ok = bool(found) and (Path(found).exists() if "/" in str(found) else True) + checks.append({"name": "ingest.yt_dlp_cmd", "ok": ok, "detail": str(found or value)}) + + song_detect_provider = str(settings.get("song_detect", {}).get("provider", "codex")) + song_detect_cmd_name = "qwen_cmd" if song_detect_provider == "qwen_cli" else "codex_cmd" + song_detect_cmd = str(settings["song_detect"].get(song_detect_cmd_name, "")) + found = shutil.which(song_detect_cmd) if "/" not in song_detect_cmd else str((self.root_dir / song_detect_cmd).resolve()) + ok = bool(found) and (Path(found).exists() if "/" in str(found) else True) + checks.append({"name": f"song_detect.{song_detect_cmd_name}", "ok": ok, "detail": str(found or song_detect_cmd)}) + publish_biliup_path = Path(str(settings["publish"]["biliup_path"])).resolve() checks.append( { diff --git a/src/biliup_next/modules/comment/providers/bilibili_top_comment.py b/src/biliup_next/modules/comment/providers/bilibili_top_comment.py index c57e9a3..9bcf82e 100644 --- a/src/biliup_next/modules/comment/providers/bilibili_top_comment.py +++ b/src/biliup_next/modules/comment/providers/bilibili_top_comment.py @@ -2,6 +2,7 @@ from __future__ import annotations import json import time +from datetime import datetime from pathlib import Path from typing import Any @@ -39,7 +40,7 @@ class BilibiliTopCommentProvider: ) timeline_content = songs_path.read_text(encoding="utf-8").strip() - split_content = self._build_split_comment_content(songs_json_path, songs_path) + split_content, split_reason = self._build_split_comment(task, settings) if not timeline_content and not split_content: self._touch_comment_flags(session_dir, split_done=True, full_done=True) return {"status": "skipped", "reason": "comment_content_empty"} @@ -62,7 +63,9 @@ class BilibiliTopCommentProvider: if settings.get("post_split_comment", True) and not split_done: split_bvid = bvid_path.read_text(encoding="utf-8").strip() - if split_content: + if split_reason is not None: + split_result = {"status": "skipped", "reason": split_reason} + elif split_content: split_result = self._post_and_top_comment(session, csrf, split_bvid, split_content, "split") else: split_result = {"status": "skipped", "reason": "split_comment_empty"} @@ -74,8 +77,11 @@ class BilibiliTopCommentProvider: if settings.get("post_full_video_timeline_comment", True) and not full_done: full_bvid = resolve_full_video_bvid(task.title, session_dir, settings) - if full_bvid and timeline_content: - full_result = self._post_and_top_comment(session, csrf, full_bvid, timeline_content, "full") + full_content, full_reason = self._build_full_comment_content(task, settings) + if full_reason is not None: + full_result = {"status": "skipped", "reason": full_reason} + elif full_bvid and full_content: + full_result = self._post_and_top_comment(session, csrf, full_bvid, full_content, "full") else: reason = "full_video_bvid_not_found" if not full_bvid else "timeline_comment_empty" full_result = {"status": "skipped", "reason": reason} @@ -104,13 +110,18 @@ class BilibiliTopCommentProvider: error_message=f"获取{target}视频信息失败", ) aid = int(view["aid"]) - add_res = self.bilibili_api.add_reply( - session, - csrf=csrf, - aid=aid, - content=content, - error_message=f"发布{target}评论失败", - ) + try: + add_res = self.bilibili_api.add_reply( + session, + csrf=csrf, + aid=aid, + content=content, + error_message=f"发布{target}评论失败", + ) + except ModuleError as exc: + if self._is_comment_closed_error(exc): + return {"status": "skipped", "reason": "comment_disabled", "bvid": bvid, "aid": aid} + raise rpid = int(add_res["rpid"]) time.sleep(3) self.bilibili_api.top_reply( @@ -151,6 +162,80 @@ class BilibiliTopCommentProvider: return "\n".join(lines) return "" + def _build_split_comment(self, task: Task, settings: dict[str, Any]) -> tuple[str, str | None]: + repo = settings.get("__repo") + session_dir_root = Path(str(settings["session_dir"])) + if repo is None or not hasattr(repo, "get_task_context") or not hasattr(repo, "list_task_contexts_by_session_key"): + session_dir = session_dir_root / task.title + return self._build_split_comment_content(session_dir / "songs.json", session_dir / "songs.txt"), None + + context = repo.get_task_context(task.id) + if context is None or not context.session_key or context.session_key.startswith("task:"): + session_dir = session_dir_root / task.title + return self._build_split_comment_content(session_dir / "songs.json", session_dir / "songs.txt"), None + + ordered_contexts = self._ordered_session_contexts(repo, context.session_key) + if not ordered_contexts: + return "", "split_comment_empty" + anchor_task_id = ordered_contexts[0].task_id + if task.id != anchor_task_id: + return "", "session_split_comment_owned_by_anchor" + + blocks: list[str] = [] + for index, session_context in enumerate(ordered_contexts, start=1): + task_dir = session_dir_root / session_context.task_id + content = self._build_split_comment_content(task_dir / "songs.json", task_dir / "songs.txt") + if not content: + continue + blocks.append(f"P{index}:\n{content}") + if not blocks: + return "", "split_comment_empty" + return "\n\n".join(blocks), None + + def _build_full_comment_content(self, task: Task, settings: dict[str, Any]) -> tuple[str, str | None]: + repo = settings.get("__repo") + if repo is None or not hasattr(repo, "get_task_context") or not hasattr(repo, "list_task_contexts_by_session_key"): + session_dir = Path(str(settings["session_dir"])) / task.title + content = session_dir.joinpath("songs.txt").read_text(encoding="utf-8").strip() + return content, None if content else "timeline_comment_empty" + + context = repo.get_task_context(task.id) + if context is None or not context.session_key or context.session_key.startswith("task:"): + session_dir = Path(str(settings["session_dir"])) / task.title + content = session_dir.joinpath("songs.txt").read_text(encoding="utf-8").strip() + return content, None if content else "timeline_comment_empty" + + ordered_contexts = self._ordered_session_contexts(repo, context.session_key) + if not ordered_contexts: + return "", "timeline_comment_empty" + anchor_task_id = ordered_contexts[0].task_id + if task.id != anchor_task_id: + return "", "session_full_comment_owned_by_anchor" + + blocks: list[str] = [] + for index, session_context in enumerate(ordered_contexts, start=1): + task_dir = Path(str(settings["session_dir"])) / session_context.task_id + songs_path = task_dir / "songs.txt" + if not songs_path.exists(): + continue + content = songs_path.read_text(encoding="utf-8").strip() + if not content: + continue + blocks.append(f"P{index}:\n{content}") + if not blocks: + return "", "timeline_comment_empty" + return "\n\n".join(blocks), None + + def _ordered_session_contexts(self, repo, session_key: str) -> list[object]: # type: ignore[no-untyped-def] + contexts = list(repo.list_task_contexts_by_session_key(session_key)) + return sorted( + contexts, + key=lambda item: ( + self._parse_started_at(item.segment_started_at), + item.source_title or item.task_id, + ), + ) + @staticmethod def _touch_comment_flags(session_dir: Path, *, split_done: bool, full_done: bool) -> None: if split_done: @@ -159,3 +244,19 @@ class BilibiliTopCommentProvider: (session_dir / "comment_full_done.flag").touch() if split_done and full_done: (session_dir / "comment_done.flag").touch() + + @staticmethod + def _is_comment_closed_error(exc: ModuleError) -> bool: + message = exc.message or "" + details = exc.details or {} + combined = f"{message}\n{details}".lower() + return "评论功能已关闭" in message or "comment disabled" in combined or "reply not allowed" in combined + + @staticmethod + def _parse_started_at(value: str | None) -> datetime: + if not value: + return datetime.max + try: + return datetime.fromisoformat(value) + except ValueError: + return datetime.max diff --git a/src/biliup_next/modules/comment/service.py b/src/biliup_next/modules/comment/service.py index d360657..0195862 100644 --- a/src/biliup_next/modules/comment/service.py +++ b/src/biliup_next/modules/comment/service.py @@ -15,9 +15,11 @@ class CommentService: if task is None: raise RuntimeError(f"task not found: {task_id}") provider = self.registry.get("comment_provider", str(settings.get("provider", "bilibili_top_comment"))) + provider_settings = dict(settings) + provider_settings["__repo"] = self.repo started_at = utc_now_iso() self.repo.update_step_status(task_id, "comment", "running", started_at=started_at) - result = provider.comment(task, settings) + result = provider.comment(task, provider_settings) finished_at = utc_now_iso() self.repo.update_step_status(task_id, "comment", "succeeded", finished_at=finished_at) self.repo.update_task_status(task_id, "commented", finished_at) diff --git a/src/biliup_next/modules/ingest/providers/bilibili_url.py b/src/biliup_next/modules/ingest/providers/bilibili_url.py new file mode 100644 index 0000000..4a64c41 --- /dev/null +++ b/src/biliup_next/modules/ingest/providers/bilibili_url.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any + +from biliup_next.core.errors import ModuleError +from biliup_next.core.providers import ProviderManifest +from biliup_next.infra.adapters.yt_dlp import YtDlpAdapter + +FORBIDDEN_PATH_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') +URL_PREFIXES = ( + "https://www.bilibili.com/video/", + "http://www.bilibili.com/video/", + "https://b23.tv/", + "http://b23.tv/", +) + + +class BilibiliUrlIngestProvider: + def __init__(self, yt_dlp: YtDlpAdapter | None = None) -> None: + self.yt_dlp = yt_dlp or YtDlpAdapter() + + manifest = ProviderManifest( + id="bilibili_url", + name="Bilibili URL Ingest", + version="0.1.0", + provider_type="ingest_provider", + entrypoint="biliup_next.modules.ingest.providers.bilibili_url:BilibiliUrlIngestProvider", + capabilities=["ingest"], + enabled_by_default=True, + ) + + def validate_source(self, source_url: str, settings: dict[str, Any]) -> None: + if not isinstance(source_url, str) or not source_url.strip(): + raise ModuleError( + code="SOURCE_URL_MISSING", + message="缺少 source_url", + retryable=False, + ) + source_url = source_url.strip() + if not source_url.startswith(URL_PREFIXES): + raise ModuleError( + code="SOURCE_URL_NOT_SUPPORTED", + message=f"当前仅支持 B 站视频链接: {source_url}", + retryable=False, + ) + + def resolve_source(self, source_url: str, settings: dict[str, Any]) -> dict[str, Any]: + yt_dlp_cmd = str(settings.get("yt_dlp_cmd", "yt-dlp")) + info = self.yt_dlp.probe(yt_dlp_cmd=yt_dlp_cmd, source_url=source_url) + video_id = str(info.get("id") or "").strip() + title = str(info.get("title") or video_id or "bilibili-video").strip() + if not video_id: + raise ModuleError( + code="YT_DLP_METADATA_MISSING_ID", + message="yt-dlp 未返回视频 ID", + retryable=True, + ) + return { + "task_id": self._safe_name(f"{title} [{video_id}]"), + "title": title, + "video_id": video_id, + "streamer": self._clean_text(info.get("uploader") or info.get("channel")), + "segment_duration_seconds": self._coerce_float(info.get("duration")), + "source_url": source_url, + } + + def download_source( + self, + source_url: str, + task_dir: Path, + settings: dict[str, Any], + *, + task_id: str, + ) -> Path: + yt_dlp_cmd = str(settings.get("yt_dlp_cmd", "yt-dlp")) + format_selector = str(settings.get("yt_dlp_format", "")).strip() or None + task_dir.mkdir(parents=True, exist_ok=True) + output_template = str((task_dir / f"{task_id}.%(ext)s").resolve()) + result = self.yt_dlp.download( + yt_dlp_cmd=yt_dlp_cmd, + source_url=source_url, + output_template=output_template, + format_selector=format_selector, + ) + if result.returncode != 0: + raise ModuleError( + code="YT_DLP_DOWNLOAD_FAILED", + message="yt-dlp 下载视频失败", + retryable=True, + details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, + ) + candidates = [ + path + for path in sorted(task_dir.iterdir()) + if path.is_file() and path.stem == task_id and path.suffix.lower() in {".mp4", ".mkv", ".flv", ".mov"} + ] + if not candidates: + raise ModuleError( + code="YT_DLP_OUTPUT_MISSING", + message=f"下载完成但未找到目标视频文件: {task_dir}", + retryable=True, + details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, + ) + return candidates[0].resolve() + + @staticmethod + def _safe_name(value: str) -> str: + cleaned = FORBIDDEN_PATH_CHARS.sub("_", value).strip().rstrip(".") + cleaned = re.sub(r"\s+", " ", cleaned) + return cleaned[:180] or "bilibili-video" + + @staticmethod + def _clean_text(value: object) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None + + @staticmethod + def _coerce_float(value: object) -> float | None: + if value in {None, ""}: + return None + try: + return float(value) + except (TypeError, ValueError): + return None diff --git a/src/biliup_next/modules/ingest/service.py b/src/biliup_next/modules/ingest/service.py index 4a8fdf4..438e2b9 100644 --- a/src/biliup_next/modules/ingest/service.py +++ b/src/biliup_next/modules/ingest/service.py @@ -31,8 +31,12 @@ class IngestService: settings: dict[str, object], *, context_payload: dict[str, object] | None = None, + provider_id: str | None = None, + source_type: str = "local_file", + title_override: str | None = None, + source_ref: str | None = None, ) -> Task: - provider_id = str(settings.get("provider", "local_file")) + provider_id = provider_id or str(settings.get("provider", "local_file")) provider = self.registry.get("ingest_provider", provider_id) provider.validate_source(source_path, settings) source_path = source_path.resolve() @@ -59,9 +63,9 @@ class IngestService: context_payload = context_payload or {} task = Task( id=task_id, - source_type="local_file", + source_type=source_type, source_path=str(source_path), - title=source_path.stem, + title=title_override or source_path.stem, status="created", created_at=now, updated_at=now, @@ -86,7 +90,7 @@ class IngestService: task_id=task_id, artifact_type="source_video", path=str(source_path), - metadata_json=json.dumps({"provider": provider_id}), + metadata_json=json.dumps({"provider": provider_id, "source_ref": source_ref}), created_at=now, ) ) @@ -103,6 +107,41 @@ class IngestService: (source_path.parent / "full_video_bvid.txt").write_text(full_video_bvid, encoding="utf-8") return task + def create_task_from_url(self, source_url: str, settings: dict[str, object]) -> Task: + provider_id = str(settings.get("provider", "bilibili_url")) + provider = self.registry.get("ingest_provider", provider_id) + provider.validate_source(source_url, settings) + + session_dir = Path(str(settings["session_dir"])).resolve() + session_dir.mkdir(parents=True, exist_ok=True) + + resolved = provider.resolve_source(source_url, settings) + task_id = str(resolved["task_id"]) + if self.repo.get_task(task_id): + raise ModuleError( + code="TASK_ALREADY_EXISTS", + message=f"任务已存在: {task_id}", + retryable=False, + ) + + task_dir = session_dir / task_id + downloaded_path = provider.download_source(source_url, task_dir, settings, task_id=task_id) + context_payload = { + "source_title": resolved.get("title"), + "streamer": resolved.get("streamer"), + "segment_duration_seconds": resolved.get("segment_duration_seconds"), + "reference_timestamp": time.time(), + } + return self.create_task_from_file( + downloaded_path, + settings, + context_payload=context_payload, + provider_id=provider_id, + source_type="bilibili_url", + title_override=str(resolved.get("title") or task_id), + source_ref=source_url, + ) + def scan_stage(self, settings: dict[str, object]) -> dict[str, object]: stage_dir = Path(str(settings["stage_dir"])).resolve() backup_dir = Path(str(settings["backup_dir"])).resolve() @@ -315,8 +354,7 @@ class IngestService: streamer=streamer, room_id=room_id, segment_started_at=segment_started_at, - segment_duration_seconds=segment_duration, - fallback_task_id=task.id, + source_title=source_title, gap_minutes=session_gap_minutes, ) if full_video_bvid is None: @@ -413,8 +451,7 @@ class IngestService: streamer: str | None, room_id: str | None, segment_started_at: str | None, - segment_duration_seconds: float | None, - fallback_task_id: str, + source_title: str, gap_minutes: int, ) -> tuple[str | None, str | None]: if not streamer or not segment_started_at: @@ -424,27 +461,35 @@ class IngestService: except ValueError: return None, None - tolerance = timedelta(minutes=max(gap_minutes, 0)) + tolerance = timedelta(minutes=max(gap_minutes, 180)) + matched_contexts: list[TaskContext] = [] for context in self.repo.find_recent_task_contexts(streamer): if room_id and context.room_id and room_id != context.room_id: continue - candidate_end = self._context_end_time(context) - if candidate_end is None: + candidate_start = self._context_start_time(context) + if candidate_start is None: continue - if segment_start >= candidate_end and segment_start - candidate_end <= tolerance: - return context.session_key, context.full_video_bvid - date_tag = segment_start.astimezone(SHANGHAI_TZ).strftime("%Y%m%dT%H%M") - return f"{streamer}:{date_tag}", None + if abs(segment_start - candidate_start) <= tolerance: + matched_contexts.append(context) + if matched_contexts: + anchor = min( + matched_contexts, + key=lambda context: ( + self._context_start_time(context) or segment_start, + context.source_title or context.session_key, + ), + ) + return anchor.session_key, anchor.full_video_bvid + return source_title, None @staticmethod - def _context_end_time(context: TaskContext) -> datetime | None: - if not context.segment_started_at or context.segment_duration_seconds is None: + def _context_start_time(context: TaskContext) -> datetime | None: + if not context.segment_started_at: return None try: - started_at = datetime.fromisoformat(context.segment_started_at) + return datetime.fromisoformat(context.segment_started_at) except ValueError: return None - return started_at + timedelta(seconds=float(context.segment_duration_seconds)) def _find_full_video_bvid_by_session_key(self, session_key: str) -> str | None: for context in self.repo.list_task_contexts_by_session_key(session_key): diff --git a/src/biliup_next/modules/publish/providers/biliup_cli.py b/src/biliup_next/modules/publish/providers/biliup_cli.py index 8bbb9b5..4f41d09 100644 --- a/src/biliup_next/modules/publish/providers/biliup_cli.py +++ b/src/biliup_next/modules/publish/providers/biliup_cli.py @@ -31,6 +31,8 @@ class BiliupCliPublishProvider: work_dir = Path(str(settings["session_dir"])) / task.title bvid_file = work_dir / "bvid.txt" upload_done = work_dir / "upload_done.flag" + publish_log = work_dir / "publish.log" + publish_progress = work_dir / "publish_progress.json" config = self._load_upload_config(Path(str(settings["upload_config_file"]))) video_files = [artifact.path for artifact in clip_videos] @@ -45,8 +47,8 @@ class BiliupCliPublishProvider: streamer = parsed.get("streamer", task.title) date = parsed.get("date", "") - songs_txt = work_dir / "songs.txt" - songs_json = work_dir / "songs.json" + songs_txt = Path(str(settings.get("publish_songs_txt_path", work_dir / "songs.txt"))) + songs_json = Path(str(settings.get("publish_songs_json_path", work_dir / "songs.json"))) songs_list = songs_txt.read_text(encoding="utf-8").strip() if songs_txt.exists() else "" song_count = 0 if songs_json.exists(): @@ -75,13 +77,20 @@ class BiliupCliPublishProvider: biliup_path = str(settings["biliup_path"]) cookie_file = str(settings["cookie_file"]) retry_count = max(1, int(settings.get("retry_count", 5))) + command_timeout_seconds = max(1, int(settings.get("command_timeout_seconds", 1800))) - self.adapter.run_optional([biliup_path, "-u", cookie_file, "renew"]) + self.adapter.run_optional( + [biliup_path, "-u", cookie_file, "renew"], + label="刷新 biliup 登录态", + timeout_seconds=command_timeout_seconds, + log_path=publish_log, + ) first_batch = video_files[:5] remaining_batches = [video_files[i:i + 5] for i in range(5, len(video_files), 5)] existing_bvid = bvid_file.read_text(encoding="utf-8").strip() if bvid_file.exists() else "" + progress = self._load_publish_progress(publish_progress) if upload_done.exists() and existing_bvid.startswith("BV"): return PublishRecord( id=None, @@ -93,21 +102,35 @@ class BiliupCliPublishProvider: published_at=utc_now_iso(), ) - bvid = existing_bvid if existing_bvid.startswith("BV") else self._upload_first_batch( - biliup_path=biliup_path, - cookie_file=cookie_file, - first_batch=first_batch, - title=title, - tid=tid, - tags=tags, - description=description, - dynamic=dynamic, - upload_settings=upload_settings, - retry_count=retry_count, - ) - bvid_file.write_text(bvid, encoding="utf-8") + if existing_bvid.startswith("BV") and not publish_progress.exists() and remaining_batches: + progress = {"bvid": existing_bvid, "completed_append_batches": []} + self._write_publish_progress(publish_progress, progress) + if existing_bvid.startswith("BV") and publish_progress.exists(): + bvid = existing_bvid + else: + bvid = self._upload_first_batch( + biliup_path=biliup_path, + cookie_file=cookie_file, + first_batch=first_batch, + title=title, + tid=tid, + tags=tags, + description=description, + dynamic=dynamic, + upload_settings=upload_settings, + retry_count=retry_count, + command_timeout_seconds=command_timeout_seconds, + publish_log=publish_log, + ) + bvid_file.write_text(bvid, encoding="utf-8") + progress = {"bvid": bvid, "completed_append_batches": []} + self._write_publish_progress(publish_progress, progress) + + completed_append_batches = set(self._progress_batches(progress)) for batch_index, batch in enumerate(remaining_batches, start=2): + if batch_index in completed_append_batches: + continue self._append_batch( biliup_path=biliup_path, cookie_file=cookie_file, @@ -115,9 +138,16 @@ class BiliupCliPublishProvider: batch=batch, batch_index=batch_index, retry_count=retry_count, + command_timeout_seconds=command_timeout_seconds, + publish_log=publish_log, ) + completed_append_batches.add(batch_index) + progress = {"bvid": bvid, "completed_append_batches": sorted(completed_append_batches)} + self._write_publish_progress(publish_progress, progress) upload_done.touch() + if publish_progress.exists(): + publish_progress.unlink() return PublishRecord( id=None, task_id=task.id, @@ -141,6 +171,8 @@ class BiliupCliPublishProvider: dynamic: str, upload_settings: dict[str, Any], retry_count: int, + command_timeout_seconds: int, + publish_log: Path, ) -> str: upload_cmd = [ biliup_path, @@ -168,20 +200,20 @@ class BiliupCliPublishProvider: upload_cmd.extend(["--cover", cover]) for attempt in range(1, retry_count + 1): - result = self.adapter.run(upload_cmd, label=f"首批上传[{attempt}/{retry_count}]") + result = self.adapter.run( + upload_cmd, + label=f"首批上传[{attempt}/{retry_count}]", + timeout_seconds=command_timeout_seconds, + log_path=publish_log, + ) if result.returncode == 0: - match = re.search(r'"bvid":"(BV[A-Za-z0-9]+)"', result.stdout) or re.search(r"(BV[A-Za-z0-9]+)", result.stdout) + match = self._extract_bvid(result.stdout) if match: - return match.group(1) + return match if attempt < retry_count: time.sleep(self._wait_seconds(attempt - 1)) continue - raise ModuleError( - code="PUBLISH_UPLOAD_FAILED", - message="首批上传失败", - retryable=True, - details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, - ) + raise self._classify_publish_error(result, default_code="PUBLISH_UPLOAD_FAILED", default_message="首批上传失败") raise AssertionError("unreachable") def _append_batch( @@ -193,21 +225,27 @@ class BiliupCliPublishProvider: batch: list[str], batch_index: int, retry_count: int, + command_timeout_seconds: int, + publish_log: Path, ) -> None: time.sleep(45) append_cmd = [biliup_path, "-u", cookie_file, "append", "--vid", bvid, *batch] for attempt in range(1, retry_count + 1): - result = self.adapter.run(append_cmd, label=f"追加第{batch_index}批[{attempt}/{retry_count}]") + result = self.adapter.run( + append_cmd, + label=f"追加第{batch_index}批[{attempt}/{retry_count}]", + timeout_seconds=command_timeout_seconds, + log_path=publish_log, + ) if result.returncode == 0: return if attempt < retry_count: time.sleep(self._wait_seconds(attempt - 1)) continue - raise ModuleError( - code="PUBLISH_APPEND_FAILED", - message=f"追加第 {batch_index} 批失败", - retryable=True, - details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, + raise self._classify_publish_error( + result, + default_code="PUBLISH_APPEND_FAILED", + default_message=f"追加第 {batch_index} 批失败", ) @staticmethod @@ -245,3 +283,74 @@ class BiliupCliPublishProvider: if not quotes: return {"text": "", "author": ""} return random.choice(quotes) + + @staticmethod + def _classify_publish_error( + result, + *, + default_code: str, + default_message: str, + ) -> ModuleError: + stdout = result.stdout[-2000:] + stderr = result.stderr[-2000:] + combined = f"{result.stdout}\n{result.stderr}" + details = {"stdout": stdout, "stderr": stderr} + if "code: 601" in combined or "上传视频过快" in combined: + return ModuleError( + code="PUBLISH_RATE_LIMITED", + message="B站上传限流,请稍后重试", + retryable=True, + details=details, + ) + if "code: 21021" in combined or "转载来源不能为空" in combined: + return ModuleError( + code="PUBLISH_SOURCE_REQUIRED", + message="转载稿件缺少来源说明", + retryable=False, + details=details, + ) + return ModuleError( + code=default_code, + message=default_message, + retryable=True, + details=details, + ) + + @staticmethod + def _extract_bvid(output: str) -> str | None: + patterns = ( + r'"bvid":"(BV[A-Za-z0-9]+)"', + r'String\("?(BV[A-Za-z0-9]+)"?\)', + r"\b(BV[A-Za-z0-9]+)\b", + ) + for pattern in patterns: + match = re.search(pattern, output) + if match: + return match.group(1) + return None + + @staticmethod + def _load_publish_progress(path: Path) -> dict[str, Any]: + if not path.exists(): + return {} + try: + return json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return {} + + @staticmethod + def _write_publish_progress(path: Path, progress: dict[str, Any]) -> None: + path.write_text(json.dumps(progress, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + @staticmethod + def _progress_batches(progress: dict[str, Any]) -> list[int]: + raw = progress.get("completed_append_batches") + if not isinstance(raw, list): + return [] + batches: list[int] = [] + for item in raw: + try: + batches.append(int(item)) + except (TypeError, ValueError): + continue + return batches diff --git a/src/biliup_next/modules/publish/service.py b/src/biliup_next/modules/publish/service.py index 56577a6..5eee29b 100644 --- a/src/biliup_next/modules/publish/service.py +++ b/src/biliup_next/modules/publish/service.py @@ -1,9 +1,11 @@ from __future__ import annotations import json +from datetime import datetime from pathlib import Path +from typing import Any -from biliup_next.core.models import Artifact, PublishRecord, utc_now_iso +from biliup_next.core.models import Artifact, PublishRecord, TaskContext, utc_now_iso from biliup_next.core.registry import Registry from biliup_next.infra.task_repository import TaskRepository @@ -17,22 +19,75 @@ class PublishService: task = self.repo.get_task(task_id) if task is None: raise RuntimeError(f"task not found: {task_id}") - artifacts = self.repo.list_artifacts(task_id) - clip_videos = [a for a in artifacts if a.artifact_type == "clip_video"] provider = self.registry.get("publish_provider", str(settings.get("provider", "biliup_cli"))) started_at = utc_now_iso() self.repo.update_step_status(task_id, "publish", "running", started_at=started_at) - record = provider.publish(task, clip_videos, settings) + session_contexts = self._session_contexts(task_id) + if len(session_contexts) <= 1: + clip_videos = self._clip_videos_for_task(task_id) + record = provider.publish(task, clip_videos, settings) + self._persist_publish_success(task_id, task.title, record, settings) + return record + + anchor_context = session_contexts[0] + shared_bvid = self._shared_session_bvid(session_contexts, settings) + if task_id != anchor_context.task_id and shared_bvid: + record = PublishRecord( + id=None, + task_id=task_id, + platform="bilibili", + aid=None, + bvid=shared_bvid, + title=task.title, + published_at=utc_now_iso(), + ) + self._persist_publish_success(task_id, task.title, record, settings) + return record + + clip_videos = self._session_clip_videos(session_contexts) + anchor_task = self.repo.get_task(anchor_context.task_id) + if anchor_task is None: + raise RuntimeError(f"anchor task not found: {anchor_context.task_id}") + session_settings = dict(settings) + session_settings.update(self._session_publish_metadata(anchor_task, session_contexts, settings)) + record = provider.publish(anchor_task, clip_videos, session_settings) + for context in session_contexts: + session_task = self.repo.get_task(context.task_id) + if session_task is None: + continue + session_record = PublishRecord( + id=None, + task_id=context.task_id, + platform=record.platform, + aid=record.aid, + bvid=record.bvid, + title=record.title, + published_at=record.published_at, + ) + self._persist_publish_success(context.task_id, session_task.title, session_record, settings) + return PublishRecord( + id=None, + task_id=task_id, + platform=record.platform, + aid=record.aid, + bvid=record.bvid, + title=record.title, + published_at=record.published_at, + ) + + def _persist_publish_success(self, task_id: str, task_title: str, record: PublishRecord, settings: dict[str, object]) -> None: self.repo.add_publish_record(record) if record.bvid: - session_dir = Path(str(settings.get("session_dir", "session"))) / task.title - bvid_path = str((session_dir / "bvid.txt").resolve()) + session_dir = Path(str(settings.get("session_dir", "session"))) / task_title + session_dir.mkdir(parents=True, exist_ok=True) + bvid_path_obj = session_dir / "bvid.txt" + bvid_path_obj.write_text(record.bvid, encoding="utf-8") self.repo.add_artifact( Artifact( id=None, task_id=task_id, artifact_type="publish_bvid", - path=bvid_path, + path=str(bvid_path_obj.resolve()), metadata_json=json.dumps({}), created_at=utc_now_iso(), ) @@ -40,4 +95,95 @@ class PublishService: finished_at = utc_now_iso() self.repo.update_step_status(task_id, "publish", "succeeded", finished_at=finished_at) self.repo.update_task_status(task_id, "published", finished_at) - return record + + def _session_contexts(self, task_id: str) -> list[TaskContext]: + context = self.repo.get_task_context(task_id) + if context is None or not context.session_key or context.session_key.startswith("task:"): + return [context] if context is not None else [] + contexts = list(self.repo.list_task_contexts_by_session_key(context.session_key)) + return sorted( + contexts, + key=lambda item: ( + self._parse_started_at(item.segment_started_at), + item.source_title or item.task_id, + ), + ) + + def _clip_videos_for_task(self, task_id: str) -> list[Artifact]: + artifacts = self.repo.list_artifacts(task_id) + return [artifact for artifact in artifacts if artifact.artifact_type == "clip_video"] + + def _session_clip_videos(self, contexts: list[TaskContext]) -> list[Artifact]: + aggregated: list[Artifact] = [] + for context in contexts: + aggregated.extend(self._clip_videos_for_task(context.task_id)) + return aggregated + + def _shared_session_bvid(self, contexts: list[TaskContext], settings: dict[str, object]) -> str | None: + session_dir_root = Path(str(settings.get("session_dir", "session"))) + for context in contexts: + task = self.repo.get_task(context.task_id) + if task is None: + continue + bvid_path = session_dir_root / task.title / "bvid.txt" + if bvid_path.exists(): + bvid = bvid_path.read_text(encoding="utf-8").strip() + if bvid.startswith("BV"): + return bvid + return None + + def _session_publish_metadata( + self, + anchor_task, + contexts: list[TaskContext], + settings: dict[str, object], + ) -> dict[str, Any]: # type: ignore[no-untyped-def] + session_dir_root = Path(str(settings.get("session_dir", "session"))) + anchor_work_dir = (session_dir_root / anchor_task.title).resolve() + anchor_work_dir.mkdir(parents=True, exist_ok=True) + aggregate_txt_lines: list[str] = [] + aggregate_songs: list[dict[str, object]] = [] + + for index, context in enumerate(contexts, start=1): + task = self.repo.get_task(context.task_id) + if task is None: + continue + task_work_dir = (session_dir_root / task.title).resolve() + songs_txt = task_work_dir / "songs.txt" + songs_json = task_work_dir / "songs.json" + + if songs_txt.exists(): + lines = [line.strip() for line in songs_txt.read_text(encoding="utf-8").splitlines() if line.strip()] + if lines: + aggregate_txt_lines.append(f"P{index}:") + aggregate_txt_lines.extend(lines) + aggregate_txt_lines.append("") + + if songs_json.exists(): + try: + songs = json.loads(songs_json.read_text(encoding="utf-8")).get("songs", []) + except json.JSONDecodeError: + songs = [] + if isinstance(songs, list): + aggregate_songs.extend(song for song in songs if isinstance(song, dict)) + + aggregate_txt_path = anchor_work_dir / "session_split_songs.txt" + aggregate_json_path = anchor_work_dir / "session_split_songs.json" + aggregate_txt_path.write_text("\n".join(aggregate_txt_lines).strip() + "\n", encoding="utf-8") + aggregate_json_path.write_text( + json.dumps({"songs": aggregate_songs}, ensure_ascii=False, indent=2) + "\n", + encoding="utf-8", + ) + return { + "publish_songs_txt_path": str(aggregate_txt_path), + "publish_songs_json_path": str(aggregate_json_path), + } + + @staticmethod + def _parse_started_at(value: str | None) -> datetime: + if not value: + return datetime.max + try: + return datetime.fromisoformat(value) + except ValueError: + return datetime.max diff --git a/src/biliup_next/modules/song_detect/providers/codex.py b/src/biliup_next/modules/song_detect/providers/codex.py index 697d746..494b08e 100644 --- a/src/biliup_next/modules/song_detect/providers/codex.py +++ b/src/biliup_next/modules/song_detect/providers/codex.py @@ -8,51 +8,7 @@ from biliup_next.core.errors import ModuleError from biliup_next.core.models import Artifact, Task, utc_now_iso from biliup_next.core.providers import ProviderManifest from biliup_next.infra.adapters.codex_cli import CodexCliAdapter - -SONG_SCHEMA = { - "type": "object", - "properties": { - "songs": { - "type": "array", - "items": { - "type": "object", - "properties": { - "start": {"type": "string"}, - "end": {"type": "string"}, - "title": {"type": "string"}, - "artist": {"type": "string"}, - "confidence": {"type": "number"}, - "evidence": {"type": "string"}, - }, - "required": ["start", "end", "title", "artist", "confidence", "evidence"], - "additionalProperties": False, - }, - } - }, - "required": ["songs"], - "additionalProperties": False, -} - -TASK_PROMPT = """你是音乐片段识别助手。当前目录下有一个字幕文件。 -任务: -1. 结合字幕内容并允许联网搜索进行纠错(识别同音字、唱错等)。 -2. 识别出直播中唱过的所有歌曲,给出精确的开始和结束时间。歌曲开始时间规则: - - 歌曲开始时间应使用“上一句字幕的结束时间”作为 start_time。 - - 这样可以尽量保留歌曲可能存在的前奏。 -3. 同一首歌间隔 ≤160s 合并,>160s 分开。若连续识别出相同歌曲,且中间只有短暂对白、空白、转场或无歌词段,应合并为同一首歌. -4. 忽略纯聊天片段。 -5. 无法确认的歌曲丢弃,宁缺毋滥:你的输出将直接面向最终用户。 -6. 忽略短片段:如果一段演唱持续时间总和少于 15 秒,视为随口哼唱,请直接忽略,不计入列表。 -7. 仔细分析每一句歌词,识别出相关歌曲后, 使用该歌曲歌词上下文对比字幕上下文,确定歌曲起始与停止时间 -8.歌曲标注规则: - - 可以在歌曲名称后使用括号 () 添加补充说明。 - - 常见标注示例: - - (片段):歌曲演唱时间较短,例如 < 60 秒 - - (清唱):无伴奏演唱 - - (副歌):只演唱副歌部分 - - 标注应简洁,仅在确有必要时使用。 -9. 通过歌曲起始和结束时间自检, 一般歌曲长度在5分钟以内, 1分钟以上, 可疑片段重新联网搜索检查. -最后请严格按照 Schema 生成 JSON 数据。""" +from biliup_next.modules.song_detect.providers.common import TASK_PROMPT, ensure_song_outputs, write_song_schema class CodexSongDetector: @@ -71,10 +27,9 @@ class CodexSongDetector: def detect(self, task: Task, subtitle_srt: Artifact, settings: dict[str, Any]) -> tuple[Artifact, Artifact]: work_dir = Path(subtitle_srt.path).resolve().parent - schema_path = work_dir / "song_schema.json" + write_song_schema(work_dir) songs_json_path = work_dir / "songs.json" songs_txt_path = work_dir / "songs.txt" - schema_path.write_text(json.dumps(SONG_SCHEMA, ensure_ascii=False, indent=2), encoding="utf-8") codex_cmd = str(settings.get("codex_cmd", "codex")) result = self.adapter.run_song_detect( @@ -91,16 +46,13 @@ class CodexSongDetector: details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, ) - if songs_json_path.exists() and not songs_txt_path.exists(): - self._generate_txt_fallback(songs_json_path, songs_txt_path) - - if not songs_json_path.exists() or not songs_txt_path.exists(): - raise ModuleError( - code="SONG_DETECT_OUTPUT_MISSING", - message=f"未生成 songs.json/songs.txt: {work_dir}", - retryable=True, - details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, - ) + ensure_song_outputs( + songs_json_path=songs_json_path, + songs_txt_path=songs_txt_path, + stdout=result.stdout, + stderr=result.stderr, + provider_name="codex", + ) return ( Artifact( @@ -120,19 +72,3 @@ class CodexSongDetector: created_at=utc_now_iso(), ), ) - - def _generate_txt_fallback(self, songs_json_path: Path, songs_txt_path: Path) -> None: - try: - data = json.loads(songs_json_path.read_text(encoding="utf-8")) - songs = data.get("songs", []) - with songs_txt_path.open("w", encoding="utf-8") as file_handle: - for song in songs: - start_time = str(song["start"]).split(",")[0].split(".")[0] - file_handle.write(f"{start_time} {song['title']} — {song['artist']}\n") - except Exception as exc: # noqa: BLE001 - raise ModuleError( - code="SONGS_TXT_GENERATE_FAILED", - message=f"生成 songs.txt 失败: {songs_txt_path}", - retryable=False, - details={"error": str(exc)}, - ) from exc diff --git a/src/biliup_next/modules/song_detect/providers/common.py b/src/biliup_next/modules/song_detect/providers/common.py new file mode 100644 index 0000000..1e6c184 --- /dev/null +++ b/src/biliup_next/modules/song_detect/providers/common.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from biliup_next.core.errors import ModuleError + +SONG_SCHEMA = { + "type": "object", + "properties": { + "songs": { + "type": "array", + "items": { + "type": "object", + "properties": { + "start": {"type": "string"}, + "end": {"type": "string"}, + "title": {"type": "string"}, + "artist": {"type": "string"}, + "confidence": {"type": "number"}, + "evidence": {"type": "string"}, + }, + "required": ["start", "end", "title", "artist", "confidence", "evidence"], + "additionalProperties": False, + }, + } + }, + "required": ["songs"], + "additionalProperties": False, +} + +TASK_PROMPT = """你是音乐片段识别助手。当前目录下有一个字幕文件。 +任务: +1. 结合字幕内容并允许联网搜索进行纠错(识别同音字、唱错等)。 +2. 识别出直播中唱过的所有歌曲,给出精确的开始和结束时间。歌曲开始时间规则: + - 歌曲开始时间应使用“上一句字幕的结束时间”作为 start_time。 + - 这样可以尽量保留歌曲可能存在的前奏。 +3. 同一首歌间隔 ≤160s 合并,>160s 分开。若连续识别出相同歌曲,且中间只有短暂对白、空白、转场或无歌词段,应合并为同一首歌. +4. 忽略纯聊天片段。 +5. 无法确认的歌曲丢弃,宁缺毋滥:你的输出将直接面向最终用户。 +6. 忽略短片段:如果一段演唱持续时间总和少于 15 秒,视为随口哼唱,请直接忽略,不计入列表。 +7. 仔细分析每一句歌词,识别出相关歌曲后, 使用该歌曲歌词上下文对比字幕上下文,确定歌曲起始与停止时间 +8.歌曲标注规则: + - 可以在歌曲名称后使用括号 () 添加补充说明。 + - 常见标注示例: + - (片段):歌曲演唱时间较短,例如 < 60 秒 + - (清唱):无伴奏演唱 + - (副歌):只演唱副歌部分 + - 标注应简洁,仅在确有必要时使用。 +9. 通过歌曲起始和结束时间自检, 一般歌曲长度在5分钟以内, 1分钟以上, 可疑片段重新联网搜索检查. +最后请严格按照 Schema 生成 JSON 数据。""" + +QWEN_TASK_PROMPT = """你是音乐片段识别助手。当前目录下有一个字幕文件 `subtitle.srt` 和一个 JSON schema 文件 `song_schema.json`。 +任务: +1. 结合字幕内容并允许联网搜索进行纠错(识别同音字、唱错等)。 +2. 识别出直播中唱过的所有歌曲,给出精确的开始和结束时间。歌曲开始时间规则: + - 歌曲开始时间应使用“上一句字幕的结束时间”作为 start_time。 + - 这样可以尽量保留歌曲可能存在的前奏。 +3. 同一首歌间隔 ≤160s 合并,>160s 分开。若连续识别出相同歌曲,且中间只有短暂对白、空白、转场或无歌词段,应合并为同一首歌。 +4. 忽略纯聊天片段。 +5. 无法确认的歌曲丢弃,宁缺毋滥:你的输出将直接面向最终用户。 +6. 忽略短片段:如果一段演唱持续时间总和少于 15 秒,视为随口哼唱,请直接忽略,不计入列表。 +7. 仔细分析每一句歌词,识别出相关歌曲后,使用该歌曲歌词上下文对比字幕上下文,确定歌曲起始与停止时间。 +8. 歌曲名称后可以按需补充 `(片段)`、`(清唱)`、`(副歌)` 等简短标注。 +9. 通过歌曲起始和结束时间自检,一般歌曲长度在 5 分钟以内、1 分钟以上,可疑片段重新联网搜索检查。 + +输出要求: +1. 读取 `song_schema.json`,生成严格符合 schema 的 JSON。 +2. 把 JSON 保存到当前目录的 `songs.json`。 +3. 再生成一个 `songs.txt`,每行格式为 `HH:MM:SS 歌曲名 — 歌手`,其中时间取每首歌的开始时间,忽略毫秒。 +4. 不要修改其他文件。 +5. 完成后只输出简短结果说明。 +""" + + +def write_song_schema(work_dir: Path) -> Path: + schema_path = work_dir / "song_schema.json" + schema_path.write_text(json.dumps(SONG_SCHEMA, ensure_ascii=False, indent=2), encoding="utf-8") + return schema_path + + +def ensure_song_outputs( + *, + songs_json_path: Path, + songs_txt_path: Path, + stdout: str, + stderr: str, + provider_name: str, +) -> None: + if songs_json_path.exists() and not songs_txt_path.exists(): + generate_txt_fallback(songs_json_path, songs_txt_path) + + if songs_json_path.exists() and songs_txt_path.exists(): + return + + raise ModuleError( + code="SONG_DETECT_OUTPUT_MISSING", + message=f"未生成 songs.json/songs.txt: {songs_json_path.parent}", + retryable=True, + details={ + "provider": provider_name, + "stdout": stdout[-2000:], + "stderr": stderr[-2000:], + }, + ) + + +def generate_txt_fallback(songs_json_path: Path, songs_txt_path: Path) -> None: + try: + data = json.loads(songs_json_path.read_text(encoding="utf-8")) + songs = data.get("songs", []) + with songs_txt_path.open("w", encoding="utf-8") as file_handle: + for song in songs: + start_time = str(song["start"]).split(",")[0].split(".")[0] + file_handle.write(f"{start_time} {song['title']} — {song['artist']}\n") + except Exception as exc: # noqa: BLE001 + raise ModuleError( + code="SONGS_TXT_GENERATE_FAILED", + message=f"生成 songs.txt 失败: {songs_txt_path}", + retryable=False, + details={"error": str(exc)}, + ) from exc diff --git a/src/biliup_next/modules/song_detect/providers/qwen_cli.py b/src/biliup_next/modules/song_detect/providers/qwen_cli.py new file mode 100644 index 0000000..e9055fc --- /dev/null +++ b/src/biliup_next/modules/song_detect/providers/qwen_cli.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from biliup_next.core.errors import ModuleError +from biliup_next.core.models import Artifact, Task, utc_now_iso +from biliup_next.core.providers import ProviderManifest +from biliup_next.infra.adapters.qwen_cli import QwenCliAdapter +from biliup_next.modules.song_detect.providers.common import ( + QWEN_TASK_PROMPT, + ensure_song_outputs, + write_song_schema, +) + + +class QwenCliSongDetector: + def __init__(self, adapter: QwenCliAdapter | None = None) -> None: + self.adapter = adapter or QwenCliAdapter() + + manifest = ProviderManifest( + id="qwen_cli", + name="Qwen CLI Song Detector", + version="0.1.0", + provider_type="song_detector", + entrypoint="biliup_next.modules.song_detect.providers.qwen_cli:QwenCliSongDetector", + capabilities=["song_detect"], + enabled_by_default=True, + ) + + def detect(self, task: Task, subtitle_srt: Artifact, settings: dict[str, Any]) -> tuple[Artifact, Artifact]: + work_dir = Path(subtitle_srt.path).resolve().parent + write_song_schema(work_dir) + songs_json_path = work_dir / "songs.json" + songs_txt_path = work_dir / "songs.txt" + + qwen_cmd = str(settings.get("qwen_cmd", "qwen")) + result = self.adapter.run_song_detect( + qwen_cmd=qwen_cmd, + work_dir=work_dir, + prompt=QWEN_TASK_PROMPT, + ) + + if result.returncode != 0: + raise ModuleError( + code="SONG_DETECT_FAILED", + message="qwen -p 执行失败", + retryable=True, + details={"stdout": result.stdout[-2000:], "stderr": result.stderr[-2000:]}, + ) + + ensure_song_outputs( + songs_json_path=songs_json_path, + songs_txt_path=songs_txt_path, + stdout=result.stdout, + stderr=result.stderr, + provider_name="qwen_cli", + ) + + return ( + Artifact( + id=None, + task_id=task.id, + artifact_type="songs_json", + path=str(songs_json_path.resolve()), + metadata_json=json.dumps({"provider": "qwen_cli"}), + created_at=utc_now_iso(), + ), + Artifact( + id=None, + task_id=task.id, + artifact_type="songs_txt", + path=str(songs_txt_path.resolve()), + metadata_json=json.dumps({"provider": "qwen_cli"}), + created_at=utc_now_iso(), + ), + ) diff --git a/src/biliup_next/plugins/manifests/ingest_bilibili_url.json b/src/biliup_next/plugins/manifests/ingest_bilibili_url.json new file mode 100644 index 0000000..9fe1dfc --- /dev/null +++ b/src/biliup_next/plugins/manifests/ingest_bilibili_url.json @@ -0,0 +1,9 @@ +{ + "id": "bilibili_url", + "name": "Bilibili URL Ingest", + "version": "0.1.0", + "provider_type": "ingest_provider", + "entrypoint": "biliup_next.modules.ingest.providers.bilibili_url:BilibiliUrlIngestProvider", + "capabilities": ["ingest"], + "enabled_by_default": true +} diff --git a/src/biliup_next/plugins/manifests/song_detect_qwen_cli.json b/src/biliup_next/plugins/manifests/song_detect_qwen_cli.json new file mode 100644 index 0000000..be7861f --- /dev/null +++ b/src/biliup_next/plugins/manifests/song_detect_qwen_cli.json @@ -0,0 +1,9 @@ +{ + "id": "qwen_cli", + "name": "Qwen CLI Song Detector", + "version": "0.1.0", + "provider_type": "song_detector", + "entrypoint": "biliup_next.modules.song_detect.providers.qwen_cli:QwenCliSongDetector", + "capabilities": ["song_detect"], + "enabled_by_default": true +} diff --git a/tests/test_api_server.py b/tests/test_api_server.py index fd2f794..adaff26 100644 --- a/tests/test_api_server.py +++ b/tests/test_api_server.py @@ -465,6 +465,35 @@ class ApiServerTests(unittest.TestCase): self.assertEqual(body["id"], "task-new") self.assertEqual(body["source_path"], source_path) + def test_post_tasks_creates_task_from_bilibili_url(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + source_url = "https://www.bilibili.com/video/BV1TEST1234" + created_task = Task( + id="task-bv", + source_type="bilibili_url", + source_path=str(Path(tmpdir) / "session" / "task-bv" / "task-bv.mp4"), + title="video-title", + status="created", + created_at="2026-01-01T00:00:00+00:00", + updated_at="2026-01-01T00:00:00+00:00", + ) + ingest_service = SimpleNamespace(create_task_from_url=lambda url, settings: created_task) + repo = FakeRepo(created_task) + state = self._state(tmpdir, repo, ingest_service=ingest_service) + state["settings"]["ingest"] = {"provider": "bilibili_url", "yt_dlp_cmd": "yt-dlp"} + + response_status, _, body = self._request( + "POST", + "/tasks", + state, + body=json.dumps({"source_type": "bilibili_url", "source_url": source_url}).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + self.assertEqual(response_status, 201) + self.assertEqual(body["id"], "task-bv") + self.assertEqual(body["source_type"], "bilibili_url") + def test_post_run_task_action_returns_accepted_payload(self) -> None: with tempfile.TemporaryDirectory() as tmpdir: task = Task( diff --git a/tests/test_bilibili_top_comment_provider.py b/tests/test_bilibili_top_comment_provider.py new file mode 100644 index 0000000..7564943 --- /dev/null +++ b/tests/test_bilibili_top_comment_provider.py @@ -0,0 +1,256 @@ +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path + +from biliup_next.core.models import Task, utc_now_iso +from biliup_next.core.errors import ModuleError +from biliup_next.modules.comment.providers.bilibili_top_comment import BilibiliTopCommentProvider + + +class _FakeBilibiliApi: + def __init__(self) -> None: + self.reply_calls: list[dict[str, object]] = [] + + def load_cookies(self, path: Path) -> dict[str, str]: + return {"bili_jct": "csrf-token"} + + def build_session(self, *, cookies: dict[str, str], referer: str, origin: str | None = None) -> object: + return object() + + def get_video_view(self, session, bvid: str, *, error_code: str, error_message: str) -> dict[str, object]: + return {"aid": 123} + + def add_reply(self, session, *, csrf: str, aid: int, content: str, error_message: str) -> dict[str, object]: + self.reply_calls.append({"aid": aid, "content": content, "error_message": error_message}) + raise ModuleError( + code="COMMENT_POST_FAILED", + message=f"{error_message}: 当前页面评论功能已关闭", + retryable=True, + ) + + def top_reply(self, session, *, csrf: str, aid: int, rpid: int, error_message: str) -> None: + raise AssertionError("top_reply should not be called when comment is disabled") + + +class BilibiliTopCommentProviderTests(unittest.TestCase): + def test_split_comment_aggregates_session_parts_on_anchor_task(self) -> None: + api = _FakeBilibiliApi() + provider = BilibiliTopCommentProvider(bilibili_api=api) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task("task-1", "local_file", str(root / "source-1.mp4"), "task-1", "published", utc_now_iso(), utc_now_iso()) + task_dir_1 = root / "task-1" + task_dir_2 = root / "task-2" + task_dir_1.mkdir(parents=True, exist_ok=True) + task_dir_2.mkdir(parents=True, exist_ok=True) + (task_dir_1 / "songs.txt").write_text("00:00:00 Song A — Artist A\n", encoding="utf-8") + (task_dir_1 / "songs.json").write_text(json.dumps({"songs": [{"title": "Song A", "artist": "Artist A"}]}), encoding="utf-8") + (task_dir_1 / "bvid.txt").write_text("BV1SPLIT111", encoding="utf-8") + (task_dir_2 / "songs.txt").write_text("00:00:00 Song B — Artist B\n", encoding="utf-8") + (task_dir_2 / "songs.json").write_text(json.dumps({"songs": [{"title": "Song B", "artist": "Artist B"}]}), encoding="utf-8") + cookies_file = root / "cookies.json" + cookies_file.write_text("{}", encoding="utf-8") + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + mapping = { + "task-1": type("Ctx", (), {"task_id": "task-1", "session_key": "session-1", "segment_started_at": "2026-04-04T09:23:00+08:00", "source_title": "part-1"})(), + "task-2": type("Ctx", (), {"task_id": "task-2", "session_key": "session-1", "segment_started_at": "2026-04-04T09:25:00+08:00", "source_title": "part-2"})(), + } + return mapping[task_id] + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [self.get_task_context("task-1"), self.get_task_context("task-2")] + + result = provider.comment( + task, + { + "session_dir": str(root), + "cookies_file": str(cookies_file), + "post_split_comment": True, + "post_full_video_timeline_comment": False, + "__repo": _Repo(), + }, + ) + + self.assertEqual(result["status"], "ok") + self.assertEqual(result["split"]["status"], "skipped") + self.assertEqual(result["split"]["reason"], "comment_disabled") + self.assertEqual(len(api.reply_calls), 1) + self.assertIn("P1:\n1. Song A — Artist A", api.reply_calls[0]["content"]) + self.assertIn("P2:\n1. Song B — Artist B", api.reply_calls[0]["content"]) + + def test_split_comment_skips_on_non_anchor_task(self) -> None: + api = _FakeBilibiliApi() + provider = BilibiliTopCommentProvider(bilibili_api=api) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task("task-2", "local_file", str(root / "source-2.mp4"), "task-2", "published", utc_now_iso(), utc_now_iso()) + task_dir = root / "task-2" + task_dir.mkdir(parents=True, exist_ok=True) + (task_dir / "songs.txt").write_text("00:00:00 Song B — Artist B\n", encoding="utf-8") + (task_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Song B", "artist": "Artist B"}]}), encoding="utf-8") + (task_dir / "bvid.txt").write_text("BV1SPLIT222", encoding="utf-8") + cookies_file = root / "cookies.json" + cookies_file.write_text("{}", encoding="utf-8") + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + mapping = { + "task-1": type("Ctx", (), {"task_id": "task-1", "session_key": "session-1", "segment_started_at": "2026-04-04T09:23:00+08:00", "source_title": "part-1"})(), + "task-2": type("Ctx", (), {"task_id": "task-2", "session_key": "session-1", "segment_started_at": "2026-04-04T09:25:00+08:00", "source_title": "part-2"})(), + } + return mapping[task_id] + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [self.get_task_context("task-1"), self.get_task_context("task-2")] + + result = provider.comment( + task, + { + "session_dir": str(root), + "cookies_file": str(cookies_file), + "post_split_comment": True, + "post_full_video_timeline_comment": False, + "__repo": _Repo(), + }, + ) + + self.assertEqual(result["status"], "ok") + self.assertEqual(result["split"]["status"], "skipped") + self.assertEqual(result["split"]["reason"], "session_split_comment_owned_by_anchor") + self.assertEqual(api.reply_calls, []) + + def test_comment_skips_when_page_comment_is_disabled(self) -> None: + provider = BilibiliTopCommentProvider(bilibili_api=_FakeBilibiliApi()) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task( + id="task-1", + source_type="local_file", + source_path=str(root / "source.mp4"), + title="task-1", + status="published", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + work_dir = root / task.title + work_dir.mkdir(parents=True, exist_ok=True) + (work_dir / "songs.txt").write_text("00:00:00 Test Song - Tester\n", encoding="utf-8") + (work_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Test Song", "artist": "Tester"}]}), encoding="utf-8") + (work_dir / "bvid.txt").write_text("BV1COMMENT123", encoding="utf-8") + cookies_file = root / "cookies.json" + cookies_file.write_text("{}", encoding="utf-8") + + result = provider.comment( + task, + { + "session_dir": str(root), + "cookies_file": str(cookies_file), + "post_split_comment": True, + "post_full_video_timeline_comment": False, + }, + ) + + self.assertEqual(result["status"], "ok") + self.assertEqual(result["split"]["status"], "skipped") + self.assertEqual(result["split"]["reason"], "comment_disabled") + self.assertTrue((work_dir / "comment_split_done.flag").exists()) + self.assertTrue((work_dir / "comment_full_done.flag").exists()) + self.assertTrue((work_dir / "comment_done.flag").exists()) + + def test_full_comment_aggregates_session_parts_on_anchor_task(self) -> None: + api = _FakeBilibiliApi() + provider = BilibiliTopCommentProvider(bilibili_api=api) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task("task-1", "local_file", str(root / "source-1.mp4"), "task-1", "published", utc_now_iso(), utc_now_iso()) + task_dir_1 = root / "task-1" + task_dir_2 = root / "task-2" + task_dir_1.mkdir(parents=True, exist_ok=True) + task_dir_2.mkdir(parents=True, exist_ok=True) + (task_dir_1 / "songs.txt").write_text("00:00:01 Song A\n00:02:00 Song B\n", encoding="utf-8") + (task_dir_1 / "songs.json").write_text(json.dumps({"songs": [{"title": "Song A"}]}), encoding="utf-8") + (task_dir_1 / "bvid.txt").write_text("BV1SPLIT111", encoding="utf-8") + (task_dir_1 / "full_video_bvid.txt").write_text("BV1FULL111", encoding="utf-8") + (task_dir_2 / "songs.txt").write_text("00:00:03 Song C\n", encoding="utf-8") + cookies_file = root / "cookies.json" + cookies_file.write_text("{}", encoding="utf-8") + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + mapping = { + "task-1": type("Ctx", (), {"task_id": "task-1", "session_key": "session-1", "segment_started_at": "2026-04-04T09:23:00+08:00", "source_title": "part-1"})(), + "task-2": type("Ctx", (), {"task_id": "task-2", "session_key": "session-1", "segment_started_at": "2026-04-04T09:25:00+08:00", "source_title": "part-2"})(), + } + return mapping[task_id] + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [self.get_task_context("task-1"), self.get_task_context("task-2")] + + result = provider.comment( + task, + { + "session_dir": str(root), + "cookies_file": str(cookies_file), + "post_split_comment": False, + "post_full_video_timeline_comment": True, + "__repo": _Repo(), + }, + ) + + self.assertEqual(result["status"], "ok") + self.assertEqual(result["full"]["status"], "skipped") + self.assertEqual(result["full"]["reason"], "comment_disabled") + self.assertEqual(len(api.reply_calls), 1) + self.assertIn("P1:\n00:00:01 Song A\n00:02:00 Song B", api.reply_calls[0]["content"]) + self.assertIn("P2:\n00:00:03 Song C", api.reply_calls[0]["content"]) + + def test_full_comment_skips_on_non_anchor_task(self) -> None: + api = _FakeBilibiliApi() + provider = BilibiliTopCommentProvider(bilibili_api=api) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task("task-2", "local_file", str(root / "source-2.mp4"), "task-2", "published", utc_now_iso(), utc_now_iso()) + task_dir = root / "task-2" + task_dir.mkdir(parents=True, exist_ok=True) + (task_dir / "songs.txt").write_text("00:00:03 Song C\n", encoding="utf-8") + (task_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Song C"}]}), encoding="utf-8") + (task_dir / "bvid.txt").write_text("BV1SPLIT222", encoding="utf-8") + (task_dir / "full_video_bvid.txt").write_text("BV1FULL111", encoding="utf-8") + cookies_file = root / "cookies.json" + cookies_file.write_text("{}", encoding="utf-8") + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + mapping = { + "task-1": type("Ctx", (), {"task_id": "task-1", "session_key": "session-1", "segment_started_at": "2026-04-04T09:23:00+08:00", "source_title": "part-1"})(), + "task-2": type("Ctx", (), {"task_id": "task-2", "session_key": "session-1", "segment_started_at": "2026-04-04T09:25:00+08:00", "source_title": "part-2"})(), + } + return mapping[task_id] + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [self.get_task_context("task-1"), self.get_task_context("task-2")] + + result = provider.comment( + task, + { + "session_dir": str(root), + "cookies_file": str(cookies_file), + "post_split_comment": False, + "post_full_video_timeline_comment": True, + "__repo": _Repo(), + }, + ) + + self.assertEqual(result["status"], "ok") + self.assertEqual(result["full"]["status"], "skipped") + self.assertEqual(result["full"]["reason"], "session_full_comment_owned_by_anchor") + self.assertEqual(api.reply_calls, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_biliup_cli_publish_provider.py b/tests/test_biliup_cli_publish_provider.py new file mode 100644 index 0000000..5ec5588 --- /dev/null +++ b/tests/test_biliup_cli_publish_provider.py @@ -0,0 +1,281 @@ +from __future__ import annotations + +import json +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from biliup_next.core.models import Artifact, Task, utc_now_iso +from biliup_next.infra.adapters.biliup_cli import BiliupCliAdapter +from biliup_next.modules.publish.providers.biliup_cli import BiliupCliPublishProvider + + +class _FakeBiliupAdapter: + def __init__(self) -> None: + self.optional_calls: list[dict] = [] + self.run_calls: list[dict] = [] + + def run_optional(self, cmd: list[str], *, label: str, timeout_seconds: int | None = None, log_path: Path | None = None) -> None: + self.optional_calls.append( + {"cmd": cmd, "label": label, "timeout_seconds": timeout_seconds, "log_path": log_path} + ) + + def run(self, cmd: list[str], *, label: str, timeout_seconds: int | None = None, log_path: Path | None = None) -> subprocess.CompletedProcess[str]: + self.run_calls.append( + {"cmd": cmd, "label": label, "timeout_seconds": timeout_seconds, "log_path": log_path} + ) + return subprocess.CompletedProcess(cmd, 0, stdout='{"bvid":"BV1TEST12345"}', stderr="") + + +class BiliupCliAdapterTests(unittest.TestCase): + def test_run_writes_publish_log(self) -> None: + adapter = BiliupCliAdapter() + with tempfile.TemporaryDirectory() as tmpdir: + log_path = Path(tmpdir) / "publish.log" + result = adapter.run( + [sys.executable, "-c", "print('hello from biliup adapter')"], + label="adapter smoke", + timeout_seconds=5, + log_path=log_path, + ) + self.assertEqual(result.returncode, 0) + content = log_path.read_text(encoding="utf-8") + self.assertIn("adapter smoke", content) + self.assertIn("timeout_seconds: 5", content) + self.assertIn("exit: 0", content) + self.assertIn("hello from biliup adapter", content) + + +class BiliupCliPublishProviderTests(unittest.TestCase): + def test_publish_passes_timeout_and_log_path(self) -> None: + adapter = _FakeBiliupAdapter() + provider = BiliupCliPublishProvider(adapter=adapter) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task( + id="task-1", + source_type="local_file", + source_path=str(root / "source.mp4"), + title="task-1", + status="split_done", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + work_dir = root / task.title + work_dir.mkdir(parents=True, exist_ok=True) + (work_dir / "songs.txt").write_text("00:00:00 Test Song - Tester\n", encoding="utf-8") + (work_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Test Song"}]}), encoding="utf-8") + upload_config = root / "upload_config.json" + upload_config.write_text("{}", encoding="utf-8") + clip_path = work_dir / "clip-1.mp4" + clip_path.write_text("fake", encoding="utf-8") + clip = Artifact( + id=None, + task_id=task.id, + artifact_type="clip_video", + path=str(clip_path), + metadata_json="{}", + created_at=utc_now_iso(), + ) + + record = provider.publish( + task, + [clip], + { + "session_dir": str(root), + "upload_config_file": str(upload_config), + "biliup_path": "runtime/biliup", + "cookie_file": "runtime/cookies.json", + "retry_count": 2, + "command_timeout_seconds": 123, + }, + ) + + self.assertEqual(record.bvid, "BV1TEST12345") + self.assertEqual(adapter.optional_calls[0]["timeout_seconds"], 123) + self.assertEqual(adapter.optional_calls[0]["log_path"], work_dir / "publish.log") + self.assertEqual(adapter.run_calls[0]["timeout_seconds"], 123) + self.assertEqual(adapter.run_calls[0]["log_path"], work_dir / "publish.log") + self.assertTrue((work_dir / "bvid.txt").exists()) + self.assertTrue((work_dir / "upload_done.flag").exists()) + + def test_extract_bvid_supports_rust_debug_string_format(self) -> None: + provider = BiliupCliPublishProvider() + + output = 'ResponseData { code: 0, data: Some(Object {"bvid": String("BV1N5DrBQEBg")}), message: "0" }' + + self.assertEqual(provider._extract_bvid(output), "BV1N5DrBQEBg") + + def test_publish_does_not_reuse_stale_bvid_without_upload_done_flag(self) -> None: + adapter = _FakeBiliupAdapter() + adapter.run = lambda cmd, *, label, timeout_seconds=None, log_path=None: subprocess.CompletedProcess( # type: ignore[method-assign] + cmd, 0, stdout='ResponseData { code: 0, data: Some(Object {"bvid": String("BV1NEW1234567")}) }', stderr="" + ) + provider = BiliupCliPublishProvider(adapter=adapter) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task( + id="task-1", + source_type="local_file", + source_path=str(root / "source.mp4"), + title="task-1", + status="split_done", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + work_dir = root / task.title + work_dir.mkdir(parents=True, exist_ok=True) + (work_dir / "songs.txt").write_text("00:00:00 Test Song - Tester\n", encoding="utf-8") + (work_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Test Song"}]}), encoding="utf-8") + (work_dir / "bvid.txt").write_text("BVOLD1234567", encoding="utf-8") + upload_config = root / "upload_config.json" + upload_config.write_text("{}", encoding="utf-8") + clip_path = work_dir / "clip-1.mp4" + clip_path.write_text("fake", encoding="utf-8") + clip = Artifact( + id=None, + task_id=task.id, + artifact_type="clip_video", + path=str(clip_path), + metadata_json="{}", + created_at=utc_now_iso(), + ) + + record = provider.publish( + task, + [clip], + { + "session_dir": str(root), + "upload_config_file": str(upload_config), + "biliup_path": "runtime/biliup", + "cookie_file": "runtime/cookies.json", + "retry_count": 2, + "command_timeout_seconds": 123, + }, + ) + + self.assertEqual(record.bvid, "BV1NEW1234567") + self.assertEqual((work_dir / "bvid.txt").read_text(encoding="utf-8"), "BV1NEW1234567") + + def test_publish_resumes_append_when_bvid_exists_without_upload_done(self) -> None: + adapter = _FakeBiliupAdapter() + provider = BiliupCliPublishProvider(adapter=adapter) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task( + id="task-1", + source_type="local_file", + source_path=str(root / "source.mp4"), + title="task-1", + status="split_done", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + work_dir = root / task.title + work_dir.mkdir(parents=True, exist_ok=True) + (work_dir / "songs.txt").write_text("00:00:00 Test Song - Tester\n", encoding="utf-8") + (work_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Test Song"}]}), encoding="utf-8") + (work_dir / "bvid.txt").write_text("BV1RESUME1234", encoding="utf-8") + (work_dir / "publish_progress.json").write_text( + json.dumps({"bvid": "BV1RESUME1234", "completed_append_batches": []}), + encoding="utf-8", + ) + upload_config = root / "upload_config.json" + upload_config.write_text("{}", encoding="utf-8") + clips = [] + for index in range(1, 11): + clip_path = work_dir / f"clip-{index}.mp4" + clip_path.write_text("fake", encoding="utf-8") + clips.append( + Artifact( + id=None, + task_id=task.id, + artifact_type="clip_video", + path=str(clip_path), + metadata_json="{}", + created_at=utc_now_iso(), + ) + ) + + with patch("biliup_next.modules.publish.providers.biliup_cli.time.sleep", return_value=None): + record = provider.publish( + task, + clips, + { + "session_dir": str(root), + "upload_config_file": str(upload_config), + "biliup_path": "runtime/biliup", + "cookie_file": "runtime/cookies.json", + "retry_count": 2, + "command_timeout_seconds": 123, + }, + ) + + self.assertEqual(record.bvid, "BV1RESUME1234") + self.assertEqual(len(adapter.run_calls), 1) + self.assertIn("append", adapter.run_calls[0]["cmd"]) + self.assertIn("BV1RESUME1234", adapter.run_calls[0]["cmd"]) + self.assertTrue((work_dir / "upload_done.flag").exists()) + + def test_publish_creates_progress_from_existing_bvid_for_append_resume(self) -> None: + adapter = _FakeBiliupAdapter() + provider = BiliupCliPublishProvider(adapter=adapter) + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task = Task( + id="task-1", + source_type="local_file", + source_path=str(root / "source.mp4"), + title="task-1", + status="split_done", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + work_dir = root / task.title + work_dir.mkdir(parents=True, exist_ok=True) + (work_dir / "songs.txt").write_text("00:00:00 Test Song - Tester\n", encoding="utf-8") + (work_dir / "songs.json").write_text(json.dumps({"songs": [{"title": "Test Song"}]}), encoding="utf-8") + (work_dir / "bvid.txt").write_text("BV1RESUME1234", encoding="utf-8") + upload_config = root / "upload_config.json" + upload_config.write_text("{}", encoding="utf-8") + clips = [] + for index in range(1, 11): + clip_path = work_dir / f"clip-{index}.mp4" + clip_path.write_text("fake", encoding="utf-8") + clips.append( + Artifact( + id=None, + task_id=task.id, + artifact_type="clip_video", + path=str(clip_path), + metadata_json="{}", + created_at=utc_now_iso(), + ) + ) + + with patch("biliup_next.modules.publish.providers.biliup_cli.time.sleep", return_value=None): + record = provider.publish( + task, + clips, + { + "session_dir": str(root), + "upload_config_file": str(upload_config), + "biliup_path": "runtime/biliup", + "cookie_file": "runtime/cookies.json", + "retry_count": 2, + "command_timeout_seconds": 123, + }, + ) + + self.assertEqual(record.bvid, "BV1RESUME1234") + self.assertEqual(len(adapter.run_calls), 1) + self.assertIn("append", adapter.run_calls[0]["cmd"]) + self.assertFalse((work_dir / "publish_progress.json").exists()) + self.assertTrue((work_dir / "upload_done.flag").exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_ingest_bilibili_url.py b/tests/test_ingest_bilibili_url.py new file mode 100644 index 0000000..11cb184 --- /dev/null +++ b/tests/test_ingest_bilibili_url.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from biliup_next.modules.ingest.providers.bilibili_url import BilibiliUrlIngestProvider + + +class FakeYtDlpAdapter: + def probe(self, *, yt_dlp_cmd: str, source_url: str): # noqa: ANN001 + return { + "id": "BV1TEST1234", + "title": "测试视频标题", + "uploader": "测试主播", + "duration": 321.0, + } + + def download(self, *, yt_dlp_cmd: str, source_url: str, output_template: str, format_selector=None): # noqa: ANN001 + output_path = Path(output_template.replace("%(ext)s", "mp4")) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(b"fake-video") + return type("Result", (), {"returncode": 0, "stdout": "ok", "stderr": ""})() + + +class BilibiliUrlIngestProviderTests(unittest.TestCase): + def test_resolve_and_download_source(self) -> None: + provider = BilibiliUrlIngestProvider(yt_dlp=FakeYtDlpAdapter()) + settings = {"yt_dlp_cmd": "yt-dlp"} + + resolved = provider.resolve_source("https://www.bilibili.com/video/BV1TEST1234", settings) + + self.assertEqual(resolved["video_id"], "BV1TEST1234") + self.assertEqual(resolved["title"], "测试视频标题") + self.assertEqual(resolved["streamer"], "测试主播") + + with tempfile.TemporaryDirectory() as tmpdir: + downloaded = provider.download_source( + "https://www.bilibili.com/video/BV1TEST1234", + Path(tmpdir), + settings, + task_id=str(resolved["task_id"]), + ) + self.assertTrue(downloaded.exists()) + self.assertEqual(downloaded.suffix, ".mp4") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_ingest_session_grouping.py b/tests/test_ingest_session_grouping.py new file mode 100644 index 0000000..e37509d --- /dev/null +++ b/tests/test_ingest_session_grouping.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import unittest + +from biliup_next.core.models import Task, TaskContext +from biliup_next.modules.ingest.service import IngestService + + +class _FakeRepo: + def __init__(self, contexts: list[TaskContext]) -> None: + self.contexts = contexts + + def find_recent_task_contexts(self, streamer: str) -> list[TaskContext]: + return [context for context in self.contexts if context.streamer == streamer] + + +class IngestSessionGroupingTests(unittest.TestCase): + def test_infer_session_key_groups_same_streamer_within_three_hours_to_earliest_title(self) -> None: + existing_context = TaskContext( + id=None, + task_id="task-1", + session_key="王海颖唱歌录播 04月04日 21时59分 p01 王海颖唱歌录播 04月04日 21时59分", + streamer="王海颖唱歌录播", + room_id=None, + source_title="王海颖唱歌录播 04月04日 21时59分 p01 王海颖唱歌录播 04月04日 21时59分", + segment_started_at="2026-04-04T21:59:00+08:00", + segment_duration_seconds=None, + full_video_bvid="BVFULL123", + created_at="2026-04-04T14:00:00+00:00", + updated_at="2026-04-04T14:00:00+00:00", + ) + service = IngestService(registry=None, repo=_FakeRepo([existing_context])) # type: ignore[arg-type] + + session_key, inherited_bvid = service._infer_session_key( + streamer="王海颖唱歌录播", + room_id=None, + segment_started_at="2026-04-05T00:30:00+08:00", + source_title="王海颖唱歌录播 04月05日 00时30分 p02 王海颖唱歌录播 04月05日 00时30分", + gap_minutes=60, + ) + + self.assertEqual(session_key, existing_context.session_key) + self.assertEqual(inherited_bvid, "BVFULL123") + + def test_infer_session_key_uses_current_title_when_no_recent_context_matches(self) -> None: + service = IngestService(registry=None, repo=_FakeRepo([])) # type: ignore[arg-type] + + session_key, inherited_bvid = service._infer_session_key( + streamer="王海颖唱歌录播", + room_id=None, + segment_started_at="2026-04-05T00:30:00+08:00", + source_title="王海颖唱歌录播 04月05日 00时30分 p02 王海颖唱歌录播 04月05日 00时30分", + gap_minutes=60, + ) + + self.assertEqual(session_key, "王海颖唱歌录播 04月05日 00时30分 p02 王海颖唱歌录播 04月05日 00时30分") + self.assertIsNone(inherited_bvid) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_publish_service.py b/tests/test_publish_service.py new file mode 100644 index 0000000..6cdb30d --- /dev/null +++ b/tests/test_publish_service.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path + +from biliup_next.core.models import Artifact, PublishRecord, Task, TaskContext, TaskStep +from biliup_next.modules.publish.service import PublishService + + +class _FakePublishProvider: + def __init__(self) -> None: + self.calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def publish(self, task: Task, clip_videos: list[Artifact], settings: dict[str, object]) -> PublishRecord: + self.calls.append((task.id, [artifact.path for artifact in clip_videos], dict(settings))) + return PublishRecord( + id=None, + task_id=task.id, + platform="bilibili", + aid=None, + bvid="BV1SESSION123", + title=task.title, + published_at="2026-01-01T00:00:00+00:00", + ) + + +class _FakeRegistry: + def __init__(self, provider) -> None: # noqa: ANN001 + self.provider = provider + + def get(self, provider_type: str, provider_id: str): # noqa: ANN001 + return self.provider + + +class _FakeRepo: + def __init__(self, tasks: list[Task], contexts: list[TaskContext], artifacts: dict[str, list[Artifact]]) -> None: + self.tasks = {task.id: task for task in tasks} + self.contexts = {context.task_id: context for context in contexts} + self.artifacts = artifacts + self.publish_records: list[PublishRecord] = [] + self.step_updates: list[tuple[str, str, str]] = [] + self.task_updates: list[tuple[str, str]] = [] + + def get_task(self, task_id: str) -> Task | None: + return self.tasks.get(task_id) + + def list_artifacts(self, task_id: str) -> list[Artifact]: + return list(self.artifacts.get(task_id, [])) + + def get_task_context(self, task_id: str) -> TaskContext | None: + return self.contexts.get(task_id) + + def list_task_contexts_by_session_key(self, session_key: str) -> list[TaskContext]: + return [context for context in self.contexts.values() if context.session_key == session_key] + + def add_publish_record(self, record: PublishRecord) -> None: + self.publish_records.append(record) + + def add_artifact(self, artifact: Artifact) -> None: + self.artifacts.setdefault(artifact.task_id, []).append(artifact) + + def update_step_status(self, task_id: str, step_name: str, status: str, **kwargs) -> None: # noqa: ANN001 + self.step_updates.append((task_id, step_name, status)) + + def update_task_status(self, task_id: str, status: str, updated_at: str) -> None: + self.task_updates.append((task_id, status)) + task = self.tasks[task_id] + self.tasks[task_id] = Task(task.id, task.source_type, task.source_path, task.title, status, task.created_at, updated_at) + + +class PublishServiceTests(unittest.TestCase): + def test_anchor_task_publishes_aggregated_session_clips_and_marks_all_tasks_published(self) -> None: + provider = _FakePublishProvider() + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + task1 = Task("task-1", "local_file", "/tmp/a.mp4", "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + task2 = Task("task-2", "local_file", "/tmp/b.mp4", "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) + ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) + artifacts = { + "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)], + "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)], + } + repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts) + service = PublishService(_FakeRegistry(provider), repo) + + record = service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)}) + + self.assertEqual(record.bvid, "BV1SESSION123") + self.assertEqual(provider.calls[0][0], "task-1") + self.assertEqual(provider.calls[0][1], [str(root / "a1.mp4"), str(root / "b1.mp4")]) + aggregate_settings = provider.calls[0][2] + aggregate_txt = Path(str(aggregate_settings["publish_songs_txt_path"])) + aggregate_json = Path(str(aggregate_settings["publish_songs_json_path"])) + self.assertTrue(aggregate_txt.exists()) + self.assertTrue(aggregate_json.exists()) + self.assertIn(("task-1", "published"), repo.task_updates) + self.assertIn(("task-2", "published"), repo.task_updates) + self.assertEqual(len(repo.publish_records), 2) + self.assertTrue((root / "task-1" / "bvid.txt").exists()) + self.assertTrue((root / "task-2" / "bvid.txt").exists()) + + def test_non_anchor_task_reuses_existing_session_bvid_without_republishing(self) -> None: + provider = _FakePublishProvider() + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + (root / "task-1").mkdir(parents=True, exist_ok=True) + (root / "task-1" / "bvid.txt").write_text("BV1SESSION123", encoding="utf-8") + task1 = Task("task-1", "local_file", "/tmp/a.mp4", "task-1", "published", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + task2 = Task("task-2", "local_file", "/tmp/b.mp4", "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) + ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) + repo = _FakeRepo([task1, task2], [ctx1, ctx2], {"task-2": []}) + service = PublishService(_FakeRegistry(provider), repo) + + record = service.run("task-2", {"provider": "biliup_cli", "session_dir": str(root)}) + + self.assertEqual(record.bvid, "BV1SESSION123") + self.assertEqual(provider.calls, []) + self.assertIn(("task-2", "published"), repo.task_updates) + self.assertTrue((root / "task-2" / "bvid.txt").exists()) + + def test_session_publish_aggregates_song_lists_for_provider_metadata(self) -> None: + provider = _FakePublishProvider() + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + (root / "task-1").mkdir(parents=True, exist_ok=True) + (root / "task-2").mkdir(parents=True, exist_ok=True) + (root / "task-1" / "songs.txt").write_text("00:00:00 Song A — Artist A\n", encoding="utf-8") + (root / "task-2" / "songs.txt").write_text("00:00:00 Song B — Artist B\n", encoding="utf-8") + (root / "task-1" / "songs.json").write_text('{"songs":[{"title":"Song A"},{"title":"Song A2"}]}\n', encoding="utf-8") + (root / "task-2" / "songs.json").write_text('{"songs":[{"title":"Song B"}]}\n', encoding="utf-8") + task1 = Task("task-1", "local_file", "/tmp/a.mp4", "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + task2 = Task("task-2", "local_file", "/tmp/b.mp4", "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) + ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) + artifacts = { + "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)], + "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)], + } + repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts) + service = PublishService(_FakeRegistry(provider), repo) + + service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)}) + + settings = provider.calls[0][2] + aggregate_txt = Path(str(settings["publish_songs_txt_path"])).read_text(encoding="utf-8") + aggregate_json = Path(str(settings["publish_songs_json_path"])).read_text(encoding="utf-8") + self.assertIn("P1:", aggregate_txt) + self.assertIn("Song A — Artist A", aggregate_txt) + self.assertIn("P2:", aggregate_txt) + self.assertIn("Song B — Artist B", aggregate_txt) + self.assertEqual(len(json.loads(aggregate_json)["songs"]), 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_session_delivery_service.py b/tests/test_session_delivery_service.py index 5b654f3..9a3b310 100644 --- a/tests/test_session_delivery_service.py +++ b/tests/test_session_delivery_service.py @@ -11,6 +11,7 @@ from biliup_next.core.models import Task, TaskContext class FakeRepo: def __init__(self, task: Task, context: TaskContext | None = None, contexts: list[TaskContext] | None = None) -> None: self.task = task + self.tasks = {task.id: task} self.context = context self.contexts = contexts or ([] if context is None else [context]) self.task_context_upserts: list[TaskContext] = [] @@ -19,7 +20,7 @@ class FakeRepo: self.updated_session_bvid: tuple[str, str, str] | None = None def get_task(self, task_id: str) -> Task | None: - return self.task if task_id == self.task.id else None + return self.tasks.get(task_id) def get_task_context(self, task_id: str) -> TaskContext | None: return self.context if task_id == self.task.id else None @@ -78,6 +79,56 @@ class SessionDeliveryServiceTests(unittest.TestCase): self.assertTrue(persisted_path.exists()) self.assertEqual(persisted_path.read_text(encoding="utf-8"), "BVWEBHOOK123") + def test_receive_full_video_webhook_uses_source_title_to_expand_to_session(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + task = Task("task-1", "local_file", "/tmp/source.mp4", "task-title", "published", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") + anchor = TaskContext( + id=None, + task_id="task-1", + session_key="session-anchor", + streamer="streamer", + room_id="room", + source_title="anchor-title", + segment_started_at=None, + segment_duration_seconds=None, + full_video_bvid=None, + created_at="2026-01-01T00:00:00+00:00", + updated_at="2026-01-01T00:00:00+00:00", + ) + sibling = TaskContext( + id=None, + task_id="task-2", + session_key="session-anchor", + streamer="streamer", + room_id="room", + source_title="sibling-title", + segment_started_at=None, + segment_duration_seconds=None, + full_video_bvid=None, + created_at="2026-01-01T00:00:00+00:00", + updated_at="2026-01-01T00:00:00+00:00", + ) + repo = FakeRepo(task, context=anchor, contexts=[anchor, sibling]) + repo.tasks["task-2"] = Task( + "task-2", + "local_file", + "/tmp/source-2.mp4", + "task-title-2", + "published", + "2026-01-01T00:00:00+00:00", + "2026-01-01T00:00:00+00:00", + ) + state = {"repo": repo, "settings": {"paths": {"session_dir": str(Path(tmpdir) / "session")}}} + + result = SessionDeliveryService(state).receive_full_video_webhook( + {"source_title": "anchor-title", "full_video_bvid": "BVWEBHOOK123"} + ) + + self.assertEqual(result["session_key"], "session-anchor") + self.assertEqual(result["updated_count"], 2) + self.assertTrue(any(binding.session_key == "session-anchor" for binding in repo.session_binding_upserts)) + self.assertTrue(any(binding.source_title == "anchor-title" for binding in repo.session_binding_upserts)) + def test_merge_session_returns_error_when_task_ids_empty(self) -> None: task = Task("task-1", "local_file", "/tmp/source.mp4", "task-title", "created", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") repo = FakeRepo(task) diff --git a/tests/test_settings_service.py b/tests/test_settings_service.py index e6c221c..b28b26c 100644 --- a/tests/test_settings_service.py +++ b/tests/test_settings_service.py @@ -28,7 +28,9 @@ class SettingsServiceTests(unittest.TestCase): "upload_config_file": {"type": "string", "default": "runtime/upload_config.json"} }, "ingest": { - "ffprobe_bin": {"type": "string", "default": "ffprobe"} + "ffprobe_bin": {"type": "string", "default": "ffprobe"}, + "yt_dlp_cmd": {"type": "string", "default": "yt-dlp"}, + "yt_dlp_format": {"type": "string", "default": ""} }, "transcribe": { "ffmpeg_bin": {"type": "string", "default": "ffmpeg"} @@ -37,7 +39,8 @@ class SettingsServiceTests(unittest.TestCase): "ffmpeg_bin": {"type": "string", "default": "ffmpeg"} }, "song_detect": { - "codex_cmd": {"type": "string", "default": "codex"} + "codex_cmd": {"type": "string", "default": "codex"}, + "qwen_cmd": {"type": "string", "default": "qwen"} }, "publish": { "biliup_path": {"type": "string", "default": "runtime/biliup"}, @@ -59,10 +62,10 @@ class SettingsServiceTests(unittest.TestCase): "cookies_file": "runtime/cookies.json", "upload_config_file": "runtime/upload_config.json" }, - "ingest": {"ffprobe_bin": "ffprobe"}, + "ingest": {"ffprobe_bin": "ffprobe", "yt_dlp_cmd": "yt-dlp", "yt_dlp_format": ""}, "transcribe": {"ffmpeg_bin": "ffmpeg"}, "split": {"ffmpeg_bin": "ffmpeg"}, - "song_detect": {"codex_cmd": "codex"}, + "song_detect": {"codex_cmd": "codex", "qwen_cmd": "qwen"}, "publish": {"biliup_path": "runtime/biliup", "cookie_file": "runtime/cookies.json"} } """, diff --git a/tests/test_song_detect_providers.py b/tests/test_song_detect_providers.py new file mode 100644 index 0000000..5ddff7b --- /dev/null +++ b/tests/test_song_detect_providers.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path + +from biliup_next.core.models import Artifact, Task, utc_now_iso +from biliup_next.modules.song_detect.providers.qwen_cli import QwenCliSongDetector + + +class FakeQwenCliAdapter: + def __init__(self, returncode: int = 0) -> None: + self.returncode = returncode + self.last_qwen_cmd: str | None = None + + def run_song_detect(self, *, qwen_cmd: str, work_dir: Path, prompt: str): # noqa: ANN001 + self.last_qwen_cmd = qwen_cmd + songs_json_path = work_dir / "songs.json" + songs_json_path.write_text( + json.dumps( + { + "songs": [ + { + "start": "00:01:23,000", + "end": "00:03:45,000", + "title": "测试歌曲", + "artist": "测试歌手", + "confidence": 0.93, + "evidence": "歌词命中", + } + ] + }, + ensure_ascii=False, + ), + encoding="utf-8", + ) + return type("Result", (), {"returncode": self.returncode, "stdout": "ok", "stderr": ""})() + + +class SongDetectProviderTests(unittest.TestCase): + def test_qwen_cli_provider_generates_json_and_txt_artifacts(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + work_dir = Path(tmpdir) + subtitle_path = work_dir / "subtitle.srt" + subtitle_path.write_text("1\n00:00:00,000 --> 00:00:03,000\n测试字幕\n", encoding="utf-8") + provider = QwenCliSongDetector(adapter=FakeQwenCliAdapter()) + + task = Task( + id="task-1", + source_type="local_file", + source_path=str(work_dir / "video.mp4"), + title="task-1", + status="transcribed", + created_at=utc_now_iso(), + updated_at=utc_now_iso(), + ) + subtitle = Artifact( + id=None, + task_id=task.id, + artifact_type="subtitle_srt", + path=str(subtitle_path), + metadata_json=None, + created_at=utc_now_iso(), + ) + + songs_json, songs_txt = provider.detect(task, subtitle, {"qwen_cmd": "qwen"}) + + self.assertEqual(json.loads(songs_json.metadata_json)["provider"], "qwen_cli") + self.assertEqual(json.loads(songs_txt.metadata_json)["provider"], "qwen_cli") + self.assertTrue(Path(songs_json.path).exists()) + self.assertTrue(Path(songs_txt.path).exists()) + self.assertIn("测试歌曲", Path(songs_txt.path).read_text(encoding="utf-8")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_task_engine.py b/tests/test_task_engine.py index 553f921..23198d6 100644 --- a/tests/test_task_engine.py +++ b/tests/test_task_engine.py @@ -65,6 +65,69 @@ class TaskEngineTests(unittest.TestCase): self.assertTrue(waiting_payload["waiting_for_retry"]) self.assertEqual(waiting_payload["step"], "publish") + def test_next_runnable_step_blocks_non_anchor_session_publish_until_anchor_runs(self) -> None: + task = SimpleNamespace(id="task-2", status="split_done") + steps = { + "publish": TaskStep(None, "task-2", "publish", "pending", None, None, 0, None, None), + } + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + return SimpleNamespace(task_id=task_id, session_key="session-1") + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [ + SimpleNamespace(task_id="task-1", segment_started_at="2026-04-04T09:23:00+08:00", source_title="part-1"), + SimpleNamespace(task_id="task-2", segment_started_at="2026-04-04T09:25:00+08:00", source_title="part-2"), + ] + + def get_task(self, task_id): # noqa: ANN001 + status = "split_done" + return SimpleNamespace(id=task_id, status=status) + + state = { + "repo": _Repo(), + "settings": { + "comment": {"enabled": True}, + "collection": {"enabled": True}, + "paths": {}, + "publish": {}, + }, + } + + self.assertEqual(next_runnable_step(task, steps, state), (None, None)) + + def test_next_runnable_step_allows_anchor_session_publish_when_all_parts_split_done(self) -> None: + task = SimpleNamespace(id="task-1", status="split_done") + steps = { + "publish": TaskStep(None, "task-1", "publish", "pending", None, None, 0, None, None), + } + + class _Repo: + def get_task_context(self, task_id): # noqa: ANN001 + return SimpleNamespace(task_id=task_id, session_key="session-1") + + def list_task_contexts_by_session_key(self, session_key): # noqa: ANN001 + return [ + SimpleNamespace(task_id="task-1", segment_started_at="2026-04-04T09:23:00+08:00", source_title="part-1"), + SimpleNamespace(task_id="task-2", segment_started_at="2026-04-04T09:25:00+08:00", source_title="part-2"), + ] + + def get_task(self, task_id): # noqa: ANN001 + return SimpleNamespace(id=task_id, status="split_done") + + state = { + "repo": _Repo(), + "settings": { + "comment": {"enabled": True}, + "collection": {"enabled": True}, + "paths": {}, + "publish": {}, + }, + } + + self.assertEqual(next_runnable_step(task, steps, state), ("publish", None)) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_task_policies.py b/tests/test_task_policies.py index 3011063..97ce40b 100644 --- a/tests/test_task_policies.py +++ b/tests/test_task_policies.py @@ -70,6 +70,26 @@ class TaskPoliciesTests(unittest.TestCase): self.assertEqual(repo.step_updates[-1][1], "publish") self.assertEqual(repo.task_updates[-1][1], "failed_retryable") + def test_resolve_failure_uses_rate_limit_schedule_for_publish_601(self) -> None: + task = SimpleNamespace(id="task-1", status="running") + steps = [ + TaskStep(None, "task-1", "publish", "running", None, None, 0, "2026-01-01T00:00:00+00:00", None), + ] + repo = FakePolicyRepo(task, steps) + state = { + "settings": { + "publish": {"retry_schedule_minutes": [15, 5], "rate_limit_retry_schedule_minutes": [30, 60]}, + "comment": {}, + "paths": {}, + } + } + exc = ModuleError(code="PUBLISH_RATE_LIMITED", message="rate limited", retryable=True) + + failure = resolve_failure(task, repo, state, exc) + + self.assertEqual(failure["payload"]["next_retry_delay_seconds"], 1800) + self.assertEqual(repo.task_updates[-1][1], "failed_retryable") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_task_runner.py b/tests/test_task_runner.py index f69e5e2..a257f2f 100644 --- a/tests/test_task_runner.py +++ b/tests/test_task_runner.py @@ -4,6 +4,7 @@ import unittest from types import SimpleNamespace from unittest.mock import patch +from biliup_next.core.errors import ModuleError from biliup_next.app.task_runner import process_task from biliup_next.core.models import TaskStep @@ -97,6 +98,39 @@ class TaskRunnerTests(unittest.TestCase): self.assertEqual(repo.task_updates[0][1], "running") self.assertEqual(result["processed"][0]["step"], "transcribe") + def test_process_task_marks_publish_failed_retryable_on_module_error(self) -> None: + task = SimpleNamespace(id="task-1", status="split_done", updated_at="2026-01-01T00:00:00+00:00") + steps = [ + TaskStep(None, "task-1", "publish", "pending", None, None, 0, None, None), + ] + repo = FakeRunnerRepo(task, steps) + state = { + "repo": repo, + "settings": { + "ingest": {}, + "paths": {}, + "comment": {"enabled": True}, + "collection": {"enabled": True}, + "publish": {"retry_schedule_minutes": [15], "rate_limit_retry_schedule_minutes": [30]}, + }, + } + + with patch("biliup_next.app.task_runner.ensure_initialized", return_value=state), patch( + "biliup_next.app.task_runner.record_task_action" + ), patch("biliup_next.app.task_runner.apply_disabled_step_fallbacks", return_value=False), patch( + "biliup_next.app.task_runner.next_runnable_step", return_value=("publish", None) + ), patch( + "biliup_next.app.task_runner.execute_step", + side_effect=ModuleError(code="PUBLISH_RATE_LIMITED", message="rate limited", retryable=True), + ): + result = process_task("task-1") + + self.assertEqual(result["processed"][-1]["retry_status"], "failed_retryable") + self.assertEqual(result["processed"][-1]["next_retry_delay_seconds"], 1800) + self.assertEqual(repo.step_updates[-1][1], "publish") + self.assertEqual(repo.step_updates[-1][2], "failed_retryable") + self.assertEqual(repo.task_updates[-1][1], "failed_retryable") + if __name__ == "__main__": unittest.main()