"""Unit tests for ``infer_error_step_name`` and ``next_runnable_step``."""

from __future__ import annotations

import unittest
from types import SimpleNamespace

from biliup_next.app.task_engine import infer_error_step_name, next_runnable_step
from biliup_next.core.models import TaskStep


def _step(task_id: str, name: str, status: str) -> TaskStep:
    """Build a ``TaskStep`` for *task_id* named *name* in the given *status*.

    All remaining positional fields use the "empty" values the tests share
    (``None`` / ``0``).
    """
    return TaskStep(None, task_id, name, status, None, None, 0, None, None)


def _settings(publish: dict | None = None) -> dict:
    """Return the settings mapping shared by the tests.

    Only the ``publish`` section varies between tests; it defaults to ``{}``.
    """
    return {
        "comment": {"enabled": True},
        "collection": {"enabled": True},
        "paths": {},
        "publish": {} if publish is None else publish,
    }


class _SessionRepo:
    """Repository stub describing one session ("session-1") with two parts.

    ``task-1`` has the earlier ``segment_started_at`` and ``task-2`` the later
    one; every task is reported with status ``split_done``.
    """

    def get_task_context(self, task_id: str) -> SimpleNamespace:
        """Return a context placing *task_id* in ``session-1``."""
        return SimpleNamespace(task_id=task_id, session_key="session-1")

    def list_task_contexts_by_session_key(self, session_key: str) -> list[SimpleNamespace]:
        """Return the two part contexts, regardless of *session_key*."""
        return [
            SimpleNamespace(
                task_id="task-1",
                segment_started_at="2026-04-04T09:23:00+08:00",
                source_title="part-1",
            ),
            SimpleNamespace(
                task_id="task-2",
                segment_started_at="2026-04-04T09:25:00+08:00",
                source_title="part-2",
            ),
        ]

    def get_task(self, task_id: str) -> SimpleNamespace:
        """Return a task record for *task_id* whose status is ``split_done``."""
        return SimpleNamespace(id=task_id, status="split_done")


class TaskEngineTests(unittest.TestCase):
    """Behavioural checks for the task engine's step-selection helpers."""

    def test_infer_error_step_name_prefers_running_step(self) -> None:
        """With one running and one pending step, the running one is reported."""
        task = SimpleNamespace(status="running")
        steps = {
            "transcribe": _step("task-1", "transcribe", "running"),
            "song_detect": _step("task-1", "song_detect", "pending"),
        }

        self.assertEqual(infer_error_step_name(task, steps), "transcribe")

    def test_next_runnable_step_returns_none_while_a_step_is_running(self) -> None:
        """No step and no wait payload are returned while a step is running."""
        task = SimpleNamespace(id="task-1", status="running")
        steps = {
            "transcribe": _step("task-1", "transcribe", "running"),
            "song_detect": _step("task-1", "song_detect", "pending"),
        }
        state = {"settings": _settings()}

        self.assertEqual(next_runnable_step(task, steps, state), (None, None))

    def test_next_runnable_step_returns_wait_payload_for_retryable_publish(self) -> None:
        """A retryable publish failure yields a wait payload instead of a step."""
        task = SimpleNamespace(id="task-1", status="failed_retryable")
        steps = {
            # NOTE(review): the trailing positional values look like error code,
            # error message, retry count and two timestamps -- confirm against
            # the TaskStep definition. The far-future timestamp keeps the test
            # independent of the current date.
            "publish": TaskStep(
                None,
                "task-1",
                "publish",
                "failed_retryable",
                "PUBLISH_UPLOAD_FAILED",
                "upload failed",
                1,
                None,
                "2099-01-01T00:00:00+00:00",
            )
        }
        state = {"settings": _settings({"retry_schedule_minutes": [10]})}

        step_name, waiting_payload = next_runnable_step(task, steps, state)

        self.assertIsNone(step_name)
        self.assertIsNotNone(waiting_payload)
        self.assertTrue(waiting_payload["waiting_for_retry"])
        self.assertEqual(waiting_payload["step"], "publish")

    def test_next_runnable_step_blocks_non_anchor_session_publish_until_anchor_runs(self) -> None:
        """The later part of a session (task-2) is not offered the publish step."""
        task = SimpleNamespace(id="task-2", status="split_done")
        steps = {"publish": _step("task-2", "publish", "pending")}
        state = {"repo": _SessionRepo(), "settings": _settings()}

        self.assertEqual(next_runnable_step(task, steps, state), (None, None))

    def test_next_runnable_step_allows_anchor_session_publish_when_all_parts_split_done(self) -> None:
        """The earliest part (task-1) may publish once every part is split_done."""
        task = SimpleNamespace(id="task-1", status="split_done")
        steps = {"publish": _step("task-1", "publish", "pending")}
        state = {"repo": _SessionRepo(), "settings": _settings()}

        self.assertEqual(next_runnable_step(task, steps, state), ("publish", None))


if __name__ == "__main__":
    unittest.main()