from __future__ import annotations import unittest from types import SimpleNamespace from biliup_next.app.task_policies import apply_disabled_step_fallbacks, resolve_failure from biliup_next.core.errors import ModuleError from biliup_next.core.models import TaskStep class FakePolicyRepo: def __init__(self, task, steps: list[TaskStep]) -> None: # type: ignore[no-untyped-def] self.task = task self.steps = steps self.step_updates: list[tuple] = [] self.task_updates: list[tuple] = [] def get_task(self, task_id: str): # type: ignore[no-untyped-def] return self.task if task_id == self.task.id else None def list_steps(self, task_id: str) -> list[TaskStep]: return list(self.steps) if task_id == self.task.id else [] def update_step_status(self, task_id: str, step_name: str, status: str, **kwargs) -> None: # type: ignore[no-untyped-def] self.step_updates.append((task_id, step_name, status, kwargs)) def update_task_status(self, task_id: str, status: str, updated_at: str) -> None: self.task_updates.append((task_id, status, updated_at)) class TaskPoliciesTests(unittest.TestCase): def test_apply_disabled_step_fallbacks_marks_collection_done_when_disabled(self) -> None: task = SimpleNamespace(id="task-1", status="commented") repo = FakePolicyRepo(task, []) state = { "settings": { "comment": {"enabled": True}, "collection": {"enabled": False}, "paths": {}, "publish": {}, } } changed = apply_disabled_step_fallbacks(state, task, repo) self.assertTrue(changed) self.assertEqual([update[1] for update in repo.step_updates], ["collection_a", "collection_b"]) self.assertEqual(repo.task_updates[-1][1], "collection_synced") def test_resolve_failure_uses_publish_retry_schedule(self) -> None: task = SimpleNamespace(id="task-1", status="running") steps = [ TaskStep(None, "task-1", "publish", "running", None, None, 0, "2026-01-01T00:00:00+00:00", None), ] repo = FakePolicyRepo(task, steps) state = { "settings": { "publish": {"retry_schedule_minutes": [15, 5]}, "comment": {}, "paths": {}, } } exc = ModuleError(code="PUBLISH_UPLOAD_FAILED", message="upload failed", retryable=True) failure = resolve_failure(task, repo, state, exc) self.assertEqual(failure["step_name"], "publish") self.assertEqual(failure["payload"]["retry_status"], "failed_retryable") self.assertEqual(failure["payload"]["next_retry_delay_seconds"], 900) self.assertEqual(repo.step_updates[-1][1], "publish") self.assertEqual(repo.task_updates[-1][1], "failed_retryable") if __name__ == "__main__": unittest.main()