from __future__ import annotations import json import tempfile import unittest from pathlib import Path from biliup_next.core.models import Artifact, PublishRecord, Task, TaskContext, TaskStep from biliup_next.modules.publish.service import PublishService class _FakePublishProvider: def __init__(self) -> None: self.calls: list[tuple[str, list[str], dict[str, object]]] = [] def publish(self, task: Task, clip_videos: list[Artifact], settings: dict[str, object]) -> PublishRecord: self.calls.append((task.id, [artifact.path for artifact in clip_videos], dict(settings))) return PublishRecord( id=None, task_id=task.id, platform="bilibili", aid=None, bvid="BV1SESSION123", title=task.title, published_at="2026-01-01T00:00:00+00:00", ) class _FakeRegistry: def __init__(self, provider) -> None: # noqa: ANN001 self.provider = provider def get(self, provider_type: str, provider_id: str): # noqa: ANN001 return self.provider class _FakeRepo: def __init__(self, tasks: list[Task], contexts: list[TaskContext], artifacts: dict[str, list[Artifact]]) -> None: self.tasks = {task.id: task for task in tasks} self.contexts = {context.task_id: context for context in contexts} self.artifacts = artifacts self.publish_records: list[PublishRecord] = [] self.step_updates: list[tuple[str, str, str]] = [] self.task_updates: list[tuple[str, str]] = [] def get_task(self, task_id: str) -> Task | None: return self.tasks.get(task_id) def list_artifacts(self, task_id: str) -> list[Artifact]: return list(self.artifacts.get(task_id, [])) def get_task_context(self, task_id: str) -> TaskContext | None: return self.contexts.get(task_id) def list_task_contexts_by_session_key(self, session_key: str) -> list[TaskContext]: return [context for context in self.contexts.values() if context.session_key == session_key] def add_publish_record(self, record: PublishRecord) -> None: self.publish_records.append(record) def add_artifact(self, artifact: Artifact) -> None: self.artifacts.setdefault(artifact.task_id, []).append(artifact) def update_step_status(self, task_id: str, step_name: str, status: str, **kwargs) -> None: # noqa: ANN001 self.step_updates.append((task_id, step_name, status)) def update_task_status(self, task_id: str, status: str, updated_at: str) -> None: self.task_updates.append((task_id, status)) task = self.tasks[task_id] self.tasks[task_id] = Task(task.id, task.source_type, task.source_path, task.title, status, task.created_at, updated_at) class PublishServiceTests(unittest.TestCase): def test_anchor_task_publishes_aggregated_session_clips_and_marks_all_tasks_published(self) -> None: provider = _FakePublishProvider() with tempfile.TemporaryDirectory() as tmpdir: root = Path(tmpdir) (root / "task-1").mkdir(parents=True, exist_ok=True) (root / "task-2").mkdir(parents=True, exist_ok=True) task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) artifacts = { "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)], "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)], } repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts) service = PublishService(_FakeRegistry(provider), repo) record = service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)}) self.assertEqual(record.bvid, "BV1SESSION123") self.assertEqual(provider.calls[0][0], "task-1") self.assertEqual(provider.calls[0][1], [str(root / "a1.mp4"), str(root / "b1.mp4")]) aggregate_settings = provider.calls[0][2] aggregate_txt = Path(str(aggregate_settings["publish_songs_txt_path"])) aggregate_json = Path(str(aggregate_settings["publish_songs_json_path"])) self.assertTrue(aggregate_txt.exists()) self.assertTrue(aggregate_json.exists()) self.assertIn(("task-1", "published"), repo.task_updates) self.assertIn(("task-2", "published"), repo.task_updates) self.assertEqual(len(repo.publish_records), 2) self.assertTrue((root / "task-1" / "bvid.txt").exists()) self.assertTrue((root / "task-2" / "bvid.txt").exists()) def test_non_anchor_task_reuses_existing_session_bvid_without_republishing(self) -> None: provider = _FakePublishProvider() with tempfile.TemporaryDirectory() as tmpdir: root = Path(tmpdir) (root / "task-1").mkdir(parents=True, exist_ok=True) (root / "task-1" / "bvid.txt").write_text("BV1SESSION123", encoding="utf-8") (root / "task-2").mkdir(parents=True, exist_ok=True) task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "published", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) repo = _FakeRepo([task1, task2], [ctx1, ctx2], {"task-2": []}) service = PublishService(_FakeRegistry(provider), repo) record = service.run("task-2", {"provider": "biliup_cli", "session_dir": str(root)}) self.assertEqual(record.bvid, "BV1SESSION123") self.assertEqual(provider.calls, []) self.assertIn(("task-2", "published"), repo.task_updates) self.assertTrue((root / "task-2" / "bvid.txt").exists()) def test_session_publish_aggregates_song_lists_for_provider_metadata(self) -> None: provider = _FakePublishProvider() with tempfile.TemporaryDirectory() as tmpdir: root = Path(tmpdir) (root / "task-1").mkdir(parents=True, exist_ok=True) (root / "task-2").mkdir(parents=True, exist_ok=True) (root / "task-1" / "songs.txt").write_text("00:00:00 Song A — Artist A\n", encoding="utf-8") (root / "task-2" / "songs.txt").write_text("00:00:00 Song B — Artist B\n", encoding="utf-8") (root / "task-1" / "songs.json").write_text('{"songs":[{"title":"Song A"},{"title":"Song A2"}]}\n', encoding="utf-8") (root / "task-2" / "songs.json").write_text('{"songs":[{"title":"Song B"}]}\n', encoding="utf-8") task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00") ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at) ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at) artifacts = { "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)], "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)], } repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts) service = PublishService(_FakeRegistry(provider), repo) service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)}) settings = provider.calls[0][2] aggregate_txt = Path(str(settings["publish_songs_txt_path"])).read_text(encoding="utf-8") aggregate_json = Path(str(settings["publish_songs_json_path"])).read_text(encoding="utf-8") self.assertIn("P1:", aggregate_txt) self.assertIn("Song A — Artist A", aggregate_txt) self.assertIn("P2:", aggregate_txt) self.assertIn("Song B — Artist B", aggregate_txt) self.assertEqual(len(json.loads(aggregate_json)["songs"]), 3) if __name__ == "__main__": unittest.main()