150 lines
6.4 KiB
Python
150 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import tempfile
|
|
import unittest
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
from biliup_next.app.control_plane_get_dispatcher import ControlPlaneGetDispatcher
|
|
from biliup_next.core.models import ActionRecord, Task, TaskContext
|
|
|
|
|
|
class FakeRepo:
    """In-memory repository double built around one task.

    Holds a single task, an optional task context and a list of action
    records, and answers the lookup methods below purely from that state.
    """

    def __init__(
        self,
        task: Task,
        context: TaskContext | None = None,
        actions: list[ActionRecord] | None = None,
    ) -> None:
        self.task = task
        self.context = context
        # A missing (or empty) ``actions`` argument becomes a fresh empty list.
        self.actions = actions or []

    def query_tasks(self, **kwargs):  # type: ignore[no-untyped-def]
        """Return the stored task as a one-item page with a total of 1.

        All keyword arguments are accepted and ignored.
        """
        page = [self.task]
        return page, 1

    def get_task(self, task_id: str) -> Task | None:
        """Return the stored task when ``task_id`` matches its id, else ``None``."""
        if task_id == self.task.id:
            return self.task
        return None

    def get_task_context(self, task_id: str) -> TaskContext | None:
        """Return the stored context when it exists and belongs to ``task_id``."""
        ctx = self.context
        if ctx and ctx.task_id == task_id:
            return ctx
        return None

    def list_task_contexts_for_task_ids(self, task_ids: list[str]) -> dict[str, TaskContext]:
        """Map the stored context by its task id when that id is in ``task_ids``."""
        ctx = self.context
        return {ctx.task_id: ctx} if ctx and ctx.task_id in task_ids else {}

    def list_steps_for_task_ids(self, task_ids: list[str]) -> dict[str, list[object]]:
        """Return an empty step list for the stored task if it was requested."""
        if self.task.id in task_ids:
            return {self.task.id: []}
        return {}

    def list_task_contexts_by_session_key(self, session_key: str) -> list[TaskContext]:
        """Return the stored context in a list when its session key matches."""
        ctx = self.context
        return [ctx] if ctx and ctx.session_key == session_key else []

    def list_steps(self, task_id: str) -> list[object]:
        """Return no steps, whatever ``task_id`` is."""
        return []

    def list_artifacts(self, task_id: str) -> list[object]:
        """Return no artifacts, whatever ``task_id`` is."""
        return []

    def list_action_records(
        self,
        task_id: str | None = None,
        limit: int = 200,
        action_name: str | None = None,
        status: str | None = None,
    ) -> list[ActionRecord]:
        """Return stored action records narrowed by the given filters.

        A filter left as ``None`` is skipped. The remaining records are
        truncated to the first ``limit`` entries of a new list.
        """
        # Filters are applied one after another, in this fixed order.
        criteria = (
            ("task_id", task_id),
            ("action_name", action_name),
            ("status", status),
        )
        matched = list(self.actions)
        for attribute, wanted in criteria:
            if wanted is not None:
                matched = [record for record in matched if getattr(record, attribute) == wanted]
        return matched[:limit]
|
|
|
|
|
|
class FakeSettingsService:
    """Settings-service double that returns fixed, canned payloads."""

    def __init__(self, root) -> None:  # type: ignore[no-untyped-def]
        # ``root`` is only stored; neither loader below reads it.
        self.root = root

    def load_redacted(self):
        """Return an object whose ``settings`` attribute is a fixed runtime mapping."""
        canned_settings = {"runtime": {"control_token": "secret"}}
        return SimpleNamespace(settings=canned_settings)

    def load(self):
        """Return an object whose ``schema`` attribute is a fixed schema mapping."""
        canned_schema = {"title": "SettingsSchema"}
        return SimpleNamespace(schema=canned_schema)
|
|
|
|
|
|
class ControlPlaneGetDispatcherTests(unittest.TestCase):
    """Tests for ControlPlaneGetDispatcher handlers, driven by in-memory fakes."""

    @staticmethod
    def _make_task(status: str) -> Task:
        """Build the fixture task shared by every test.

        ``status`` is passed as the fifth positional ``Task`` argument
        (presumably the task status field; confirm against ``Task``).
        """
        timestamp = "2026-01-01T00:00:00+00:00"
        return Task("task-1", "local_file", "/tmp/source.mp4", "task-title", status, timestamp, timestamp)

    def _dispatcher(self, tmpdir: str, repo: FakeRepo) -> ControlPlaneGetDispatcher:
        """Create a dispatcher wired to ``repo`` with stub settings and callbacks."""
        root = Path(tmpdir)

        def attention_state(payload):
            # Only a payload whose status is "running" is reported as running.
            if payload.get("status") == "running":
                return "running"
            return "stable"

        def delivery_state_label(payload):
            # A pending split comment is the only non-stable delivery label.
            split_comment = payload.get("delivery_state", {}).get("split_comment")
            if split_comment == "pending":
                return "pending_comment"
            return "stable"

        def build_scheduler_preview(state, include_stage_scan=False, limit=200):
            # Echo the requested limit back inside a one-item preview.
            return {"items": [{"limit": limit}]}

        settings = {
            "paths": {"session_dir": str(root / "session")},
            "comment": {"post_split_comment": True, "post_full_video_timeline_comment": True},
            "cleanup": {},
            "publish": {},
        }
        state = {
            "root": root,
            "repo": repo,
            "settings": settings,
            "registry": SimpleNamespace(list_manifests=lambda: [{"name": "publish.biliup_cli"}]),
            "manifests": [{"name": "publish.biliup_cli"}],
        }
        return ControlPlaneGetDispatcher(
            state,
            attention_state_fn=attention_state,
            delivery_state_label_fn=delivery_state_label,
            build_scheduler_preview_fn=build_scheduler_preview,
            settings_service_factory=FakeSettingsService,
        )

    def test_handle_settings_schema_returns_schema(self) -> None:
        # The schema handler should return the fake service's schema with 200 OK.
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = FakeRepo(self._make_task("published"))
            dispatcher = self._dispatcher(tmpdir, repo)

            payload, code = dispatcher.handle_settings_schema()

            self.assertEqual(code, HTTPStatus.OK)
            self.assertEqual(payload["title"], "SettingsSchema")

    def test_handle_history_filters_records(self) -> None:
        # Of the two stored records, only the "comment"/"ok" one matches the filters.
        with tempfile.TemporaryDirectory() as tmpdir:
            comment_ok = ActionRecord(None, "task-1", "comment", "ok", "comment ok", "{}", "2026-01-01T00:01:00+00:00")
            publish_error = ActionRecord(None, "task-1", "publish", "error", "publish failed", "{}", "2026-01-01T00:02:00+00:00")
            repo = FakeRepo(self._make_task("published"), actions=[comment_ok, publish_error])
            dispatcher = self._dispatcher(tmpdir, repo)

            payload, code = dispatcher.handle_history(limit=100, task_id="task-1", action_name="comment", status="ok")

            self.assertEqual(code, HTTPStatus.OK)
            self.assertEqual(len(payload["items"]), 1)
            self.assertEqual(payload["items"][0]["action_name"], "comment")

    def test_handle_session_returns_not_found_when_missing(self) -> None:
        # The repo has no context, so the session lookup must yield 404.
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = FakeRepo(self._make_task("published"))
            dispatcher = self._dispatcher(tmpdir, repo)

            payload, code = dispatcher.handle_session("missing-session")

            self.assertEqual(code, HTTPStatus.NOT_FOUND)
            self.assertEqual(payload["error"], "session not found")

    def test_handle_tasks_filters_attention(self) -> None:
        # The "running" fixture task should pass the attention="running" filter.
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = FakeRepo(self._make_task("running"))
            dispatcher = self._dispatcher(tmpdir, repo)
            query = {
                "limit": 10,
                "offset": 0,
                "status": None,
                "search": None,
                "sort": "updated_desc",
                "attention": "running",
                "delivery": None,
            }

            payload, code = dispatcher.handle_tasks(**query)

            self.assertEqual(code, HTTPStatus.OK)
            self.assertEqual(payload["total"], 1)
            self.assertEqual(payload["items"][0]["id"], "task-1")
|