Files
biliup-next/tests/test_publish_service.py

163 lines
9.1 KiB
Python

from __future__ import annotations
import json
import tempfile
import unittest
from pathlib import Path
from biliup_next.core.models import Artifact, PublishRecord, Task, TaskContext, TaskStep
from biliup_next.modules.publish.service import PublishService
class _FakePublishProvider:
    """Recording stub for a publish provider.

    Captures every ``publish`` invocation (task id, clip paths, settings) and
    answers with a fixed, pre-baked record so tests can assert on both the
    inputs the service supplied and the output it propagated.
    """

    def __init__(self) -> None:
        # One entry per publish() call: (task id, clip paths in order, settings snapshot).
        self.calls: list[tuple[str, list[str], dict[str, object]]] = []

    def publish(self, task: Task, clip_videos: list[Artifact], settings: dict[str, object]) -> PublishRecord:
        """Record the call and return a canned bilibili publish record."""
        clip_paths = [clip.path for clip in clip_videos]
        # dict(settings) snapshots the mapping so later caller-side mutation
        # cannot rewrite the recorded history.
        self.calls.append((task.id, clip_paths, dict(settings)))
        return PublishRecord(
            id=None,
            task_id=task.id,
            platform="bilibili",
            aid=None,
            bvid="BV1SESSION123",
            title=task.title,
            published_at="2026-01-01T00:00:00+00:00",
        )
class _FakeRegistry:
    """Minimal provider-registry stub holding exactly one provider.

    ``get`` ignores both lookup keys on purpose: the single injected provider
    serves every request, which keeps the tests independent of registry wiring.
    """

    def __init__(self, provider) -> None:  # noqa: ANN001
        self.provider = provider

    def get(self, provider_type: str, provider_id: str):  # noqa: ANN001
        """Return the injected provider regardless of the requested type/id."""
        return self.provider
class _FakeRepo:
    """In-memory stand-in for the persistence layer used by PublishService.

    Indexes the tasks/contexts/artifacts given at construction and records
    every write (publish records, step and task status updates) so tests can
    assert on exactly what the service persisted, in order.
    """

    def __init__(self, tasks: list[Task], contexts: list[TaskContext], artifacts: dict[str, list[Artifact]]) -> None:
        self.tasks = {item.id: item for item in tasks}
        self.contexts = {item.task_id: item for item in contexts}
        self.artifacts = artifacts
        self.publish_records: list[PublishRecord] = []
        # (task_id, step_name, status) tuples, in call order.
        self.step_updates: list[tuple[str, str, str]] = []
        # (task_id, status) tuples, in call order.
        self.task_updates: list[tuple[str, str]] = []

    def get_task(self, task_id: str) -> Task | None:
        return self.tasks.get(task_id)

    def list_artifacts(self, task_id: str) -> list[Artifact]:
        # Defensive copy: callers must not be able to mutate stored state.
        return list(self.artifacts.get(task_id, []))

    def get_task_context(self, task_id: str) -> TaskContext | None:
        return self.contexts.get(task_id)

    def list_task_contexts_by_session_key(self, session_key: str) -> list[TaskContext]:
        return [ctx for ctx in self.contexts.values() if ctx.session_key == session_key]

    def add_publish_record(self, record: PublishRecord) -> None:
        self.publish_records.append(record)

    def add_artifact(self, artifact: Artifact) -> None:
        self.artifacts.setdefault(artifact.task_id, []).append(artifact)

    def update_step_status(self, task_id: str, step_name: str, status: str, **kwargs) -> None:  # noqa: ANN001
        # Extra keyword arguments (timestamps, messages, ...) are deliberately dropped.
        self.step_updates.append((task_id, step_name, status))

    def update_task_status(self, task_id: str, status: str, updated_at: str) -> None:
        self.task_updates.append((task_id, status))
        old = self.tasks[task_id]
        # Task rows are treated as immutable: store a rebuilt copy with the
        # new status and updated_at, keeping every other field as-is.
        self.tasks[task_id] = Task(old.id, old.source_type, old.source_path, old.title, status, old.created_at, updated_at)
class PublishServiceTests(unittest.TestCase):
    """Behavioral tests for PublishService session publishing, using fakes for
    the provider registry and the repository so no real upload or DB occurs."""

    def test_anchor_task_publishes_aggregated_session_clips_and_marks_all_tasks_published(self) -> None:
        """The anchor task triggers one provider publish covering every clip in
        the session, and all tasks in the session end up published with a
        bvid.txt marker in each task directory."""
        provider = _FakePublishProvider()
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            (root / "task-1").mkdir(parents=True, exist_ok=True)
            (root / "task-2").mkdir(parents=True, exist_ok=True)
            # Two split-done tasks sharing session "session-1", each with one clip.
            task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at)
            ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at)
            artifacts = {
                "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)],
                "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)],
            }
            repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts)
            service = PublishService(_FakeRegistry(provider), repo)
            # Run against task-1; presumably the earliest part acts as the
            # session anchor — confirm against PublishService semantics.
            record = service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)})
            self.assertEqual(record.bvid, "BV1SESSION123")
            # Exactly one provider call, attributed to the anchor task,
            # carrying both session clips in part order.
            self.assertEqual(provider.calls[0][0], "task-1")
            self.assertEqual(provider.calls[0][1], [str(root / "a1.mp4"), str(root / "b1.mp4")])
            aggregate_settings = provider.calls[0][2]
            # The service must materialize aggregated song-list files and pass
            # their paths through the provider settings.
            aggregate_txt = Path(str(aggregate_settings["publish_songs_txt_path"]))
            aggregate_json = Path(str(aggregate_settings["publish_songs_json_path"]))
            self.assertTrue(aggregate_txt.exists())
            self.assertTrue(aggregate_json.exists())
            # Both session tasks are flipped to "published", each gets a
            # publish record and a bvid.txt marker file.
            self.assertIn(("task-1", "published"), repo.task_updates)
            self.assertIn(("task-2", "published"), repo.task_updates)
            self.assertEqual(len(repo.publish_records), 2)
            self.assertTrue((root / "task-1" / "bvid.txt").exists())
            self.assertTrue((root / "task-2" / "bvid.txt").exists())

    def test_non_anchor_task_reuses_existing_session_bvid_without_republishing(self) -> None:
        """When the session already has a bvid.txt (task-1 published earlier),
        running a non-anchor task must reuse that bvid and never call the
        provider again."""
        provider = _FakePublishProvider()
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            (root / "task-1").mkdir(parents=True, exist_ok=True)
            # Pre-existing marker simulates a completed anchor publish.
            (root / "task-1" / "bvid.txt").write_text("BV1SESSION123", encoding="utf-8")
            (root / "task-2").mkdir(parents=True, exist_ok=True)
            task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "published", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at)
            ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at)
            repo = _FakeRepo([task1, task2], [ctx1, ctx2], {"task-2": []})
            service = PublishService(_FakeRegistry(provider), repo)
            record = service.run("task-2", {"provider": "biliup_cli", "session_dir": str(root)})
            # The session bvid is reused verbatim and no provider call happened.
            self.assertEqual(record.bvid, "BV1SESSION123")
            self.assertEqual(provider.calls, [])
            self.assertIn(("task-2", "published"), repo.task_updates)
            self.assertTrue((root / "task-2" / "bvid.txt").exists())

    def test_session_publish_aggregates_song_lists_for_provider_metadata(self) -> None:
        """Per-task songs.txt/songs.json files are merged into session-wide
        aggregates: txt entries gain P1:/P2: part prefixes, json song arrays
        are concatenated."""
        provider = _FakePublishProvider()
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            (root / "task-1").mkdir(parents=True, exist_ok=True)
            (root / "task-2").mkdir(parents=True, exist_ok=True)
            (root / "task-1" / "songs.txt").write_text("00:00:00 Song A — Artist A\n", encoding="utf-8")
            (root / "task-2" / "songs.txt").write_text("00:00:00 Song B — Artist B\n", encoding="utf-8")
            # task-1 contributes 2 songs, task-2 contributes 1 (3 total below).
            (root / "task-1" / "songs.json").write_text('{"songs":[{"title":"Song A"},{"title":"Song A2"}]}\n', encoding="utf-8")
            (root / "task-2" / "songs.json").write_text('{"songs":[{"title":"Song B"}]}\n', encoding="utf-8")
            task1 = Task("task-1", "local_file", str(root / "task-1" / "source.mp4"), "task-1", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            task2 = Task("task-2", "local_file", str(root / "task-2" / "source.mp4"), "task-2", "split_done", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00")
            ctx1 = TaskContext(None, "task-1", "session-1", "s", None, "part-1", "2026-04-04T09:23:00+08:00", None, None, task1.created_at, task1.updated_at)
            ctx2 = TaskContext(None, "task-2", "session-1", "s", None, "part-2", "2026-04-04T09:25:00+08:00", None, None, task2.created_at, task2.updated_at)
            artifacts = {
                "task-1": [Artifact(None, "task-1", "clip_video", str(root / "a1.mp4"), "{}", task1.created_at)],
                "task-2": [Artifact(None, "task-2", "clip_video", str(root / "b1.mp4"), "{}", task2.created_at)],
            }
            repo = _FakeRepo([task1, task2], [ctx1, ctx2], artifacts)
            service = PublishService(_FakeRegistry(provider), repo)
            service.run("task-1", {"provider": "biliup_cli", "session_dir": str(root)})
            settings = provider.calls[0][2]
            aggregate_txt = Path(str(settings["publish_songs_txt_path"])).read_text(encoding="utf-8")
            aggregate_json = Path(str(settings["publish_songs_json_path"])).read_text(encoding="utf-8")
            # Text aggregate keeps each part's lines under a Pn: heading.
            self.assertIn("P1:", aggregate_txt)
            self.assertIn("Song A — Artist A", aggregate_txt)
            self.assertIn("P2:", aggregate_txt)
            self.assertIn("Song B — Artist B", aggregate_txt)
            # JSON aggregate concatenates all song entries: 2 + 1 = 3.
            self.assertEqual(len(json.loads(aggregate_json)["songs"]), 3)
if __name__ == "__main__":
    # Allow running this module directly: python test_publish_service.py
    unittest.main()