96 lines
3.7 KiB
Python
96 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import unittest
|
|
from types import SimpleNamespace
|
|
|
|
from biliup_next.app.task_policies import apply_disabled_step_fallbacks, resolve_failure
|
|
from biliup_next.core.errors import ModuleError
|
|
from biliup_next.core.models import TaskStep
|
|
|
|
|
|
class FakePolicyRepo:
|
|
def __init__(self, task, steps: list[TaskStep]) -> None: # type: ignore[no-untyped-def]
|
|
self.task = task
|
|
self.steps = steps
|
|
self.step_updates: list[tuple] = []
|
|
self.task_updates: list[tuple] = []
|
|
|
|
def get_task(self, task_id: str): # type: ignore[no-untyped-def]
|
|
return self.task if task_id == self.task.id else None
|
|
|
|
def list_steps(self, task_id: str) -> list[TaskStep]:
|
|
return list(self.steps) if task_id == self.task.id else []
|
|
|
|
def update_step_status(self, task_id: str, step_name: str, status: str, **kwargs) -> None: # type: ignore[no-untyped-def]
|
|
self.step_updates.append((task_id, step_name, status, kwargs))
|
|
|
|
def update_task_status(self, task_id: str, status: str, updated_at: str) -> None:
|
|
self.task_updates.append((task_id, status, updated_at))
|
|
|
|
|
|
class TaskPoliciesTests(unittest.TestCase):
|
|
def test_apply_disabled_step_fallbacks_marks_collection_done_when_disabled(self) -> None:
|
|
task = SimpleNamespace(id="task-1", status="commented")
|
|
repo = FakePolicyRepo(task, [])
|
|
state = {
|
|
"settings": {
|
|
"comment": {"enabled": True},
|
|
"collection": {"enabled": False},
|
|
"paths": {},
|
|
"publish": {},
|
|
}
|
|
}
|
|
|
|
changed = apply_disabled_step_fallbacks(state, task, repo)
|
|
|
|
self.assertTrue(changed)
|
|
self.assertEqual([update[1] for update in repo.step_updates], ["collection_a", "collection_b"])
|
|
self.assertEqual(repo.task_updates[-1][1], "collection_synced")
|
|
|
|
def test_resolve_failure_uses_publish_retry_schedule(self) -> None:
|
|
task = SimpleNamespace(id="task-1", status="running")
|
|
steps = [
|
|
TaskStep(None, "task-1", "publish", "running", None, None, 0, "2026-01-01T00:00:00+00:00", None),
|
|
]
|
|
repo = FakePolicyRepo(task, steps)
|
|
state = {
|
|
"settings": {
|
|
"publish": {"retry_schedule_minutes": [15, 5]},
|
|
"comment": {},
|
|
"paths": {},
|
|
}
|
|
}
|
|
exc = ModuleError(code="PUBLISH_UPLOAD_FAILED", message="upload failed", retryable=True)
|
|
|
|
failure = resolve_failure(task, repo, state, exc)
|
|
|
|
self.assertEqual(failure["step_name"], "publish")
|
|
self.assertEqual(failure["payload"]["retry_status"], "failed_retryable")
|
|
self.assertEqual(failure["payload"]["next_retry_delay_seconds"], 900)
|
|
self.assertEqual(repo.step_updates[-1][1], "publish")
|
|
self.assertEqual(repo.task_updates[-1][1], "failed_retryable")
|
|
|
|
def test_resolve_failure_uses_rate_limit_schedule_for_publish_601(self) -> None:
|
|
task = SimpleNamespace(id="task-1", status="running")
|
|
steps = [
|
|
TaskStep(None, "task-1", "publish", "running", None, None, 0, "2026-01-01T00:00:00+00:00", None),
|
|
]
|
|
repo = FakePolicyRepo(task, steps)
|
|
state = {
|
|
"settings": {
|
|
"publish": {"retry_schedule_minutes": [15, 5], "rate_limit_retry_schedule_minutes": [30, 60]},
|
|
"comment": {},
|
|
"paths": {},
|
|
}
|
|
}
|
|
exc = ModuleError(code="PUBLISH_RATE_LIMITED", message="rate limited", retryable=True)
|
|
|
|
failure = resolve_failure(task, repo, state, exc)
|
|
|
|
self.assertEqual(failure["payload"]["next_retry_delay_seconds"], 1800)
|
|
self.assertEqual(repo.task_updates[-1][1], "failed_retryable")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|