# Listing metadata from the file viewer: 1061 lines, 45 KiB, Python.
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from contextlib import ExitStack
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import patch
|
|
|
|
from biliup_next.app.api_server import ApiHandler
|
|
from biliup_next.core.models import ActionRecord, Artifact, Task, TaskContext, TaskStep
|
|
|
|
|
|
class FakeRepo:
    """In-memory repository stand-in holding exactly one task.

    The fake serves a single ``task`` plus optional context, steps,
    artifacts and action records.  Lookups for any other task id return
    an empty result (``None``, ``[]`` or ``{}``).

    Lists handed to the constructor are stored as-is (not copied), so
    records added through :meth:`add_action_record` are visible through
    the caller's own ``actions`` list -- including when that list was
    empty at construction time.
    """

    def __init__(
        self,
        task: Task,
        *,
        context: TaskContext | None = None,
        steps: list[TaskStep] | None = None,
        artifacts: list[Artifact] | None = None,
        actions: list[ActionRecord] | None = None,
    ) -> None:
        self.task = task
        self.context = context
        # ``x or []`` would replace a caller-supplied *empty* list with a new
        # one, so only fall back to a fresh list when nothing was passed.
        self.steps = steps if steps is not None else []
        self.artifacts = artifacts if artifacts is not None else []
        self.actions = actions if actions is not None else []

    def query_tasks(self, **kwargs):  # type: ignore[no-untyped-def]
        """Return ``([task], 1)`` regardless of the filters passed."""
        return [self.task], 1

    def list_task_contexts_for_task_ids(self, task_ids: list[str]) -> dict[str, TaskContext]:
        """Map the stored context by task id when that id is in ``task_ids``."""
        if self.context and self.context.task_id in task_ids:
            return {self.context.task_id: self.context}
        return {}

    def list_steps_for_task_ids(self, task_ids: list[str]) -> dict[str, list[TaskStep]]:
        """Map the task id to a copy of the steps when the id is in ``task_ids``."""
        if self.task.id in task_ids:
            return {self.task.id: list(self.steps)}
        return {}

    def get_task(self, task_id: str) -> Task | None:
        """Return the task when ``task_id`` matches, otherwise ``None``."""
        return self.task if task_id == self.task.id else None

    def get_task_context(self, task_id: str) -> TaskContext | None:
        """Return the context when one is set and belongs to ``task_id``."""
        return self.context if self.context and task_id == self.context.task_id else None

    def list_steps(self, task_id: str) -> list[TaskStep]:
        """Return a copy of the steps for the stored task, else ``[]``."""
        return list(self.steps) if task_id == self.task.id else []

    def list_artifacts(self, task_id: str) -> list[Artifact]:
        """Return a copy of the artifacts for the stored task, else ``[]``."""
        return list(self.artifacts) if task_id == self.task.id else []

    def list_action_records(
        self,
        task_id: str | None = None,
        limit: int = 200,
        action_name: str | None = None,
        status: str | None = None,
    ) -> list[ActionRecord]:
        """Return action records matching every given filter, cut to ``limit``.

        A filter left as ``None`` is not applied.
        """
        items = list(self.actions)
        if task_id is not None:
            items = [item for item in items if item.task_id == task_id]
        if action_name is not None:
            items = [item for item in items if item.action_name == action_name]
        if status is not None:
            items = [item for item in items if item.status == status]
        return items[:limit]

    def add_action_record(self, action: ActionRecord) -> None:
        """Append ``action`` to the stored action list."""
        self.actions.append(action)

    def list_task_contexts_by_session_key(self, session_key: str) -> list[TaskContext]:
        """Return ``[context]`` when its session key matches, else ``[]``."""
        if self.context and self.context.session_key == session_key:
            return [self.context]
        return []
|
|
|
|
|
|
class FakeSettingsService:
    """Settings-service double that records calls on the class itself.

    Calls are counted at class level rather than per instance, so a test
    that only hands over the class can still inspect them afterwards.
    Call :meth:`reset` before relying on the counters.
    """

    # Payloads passed to save_staged_from_redacted, in call order.
    save_calls: list[dict[str, object]] = []
    # Number of promote_staged invocations since the last reset.
    promote_calls: int = 0

    def __init__(self, root) -> None:  # type: ignore[no-untyped-def]
        self.root = root

    def save_staged_from_redacted(self, payload: dict[str, object]) -> None:
        """Record ``payload`` on the class-level call list."""
        recorder = type(self)
        recorder.save_calls.append(payload)

    def promote_staged(self) -> None:
        """Increment the class-level promote counter."""
        recorder = type(self)
        recorder.promote_calls += 1

    @classmethod
    def reset(cls) -> None:
        """Clear the recorded payloads and zero the promote counter."""
        cls.save_calls, cls.promote_calls = [], 0
|
|
|
|
|
|
class FakeSettingsReader:
    """Read-only settings double returning canned settings and schema."""

    def __init__(self, root, *, settings=None, schema=None) -> None:  # type: ignore[no-untyped-def]
        self.root = root
        # Falsy inputs (None, empty dict) are replaced with a fresh empty dict.
        self._settings = settings if settings else {}
        self._schema = schema if schema else {}

    def load_redacted(self):
        """Return a namespace whose ``settings`` attribute is the canned settings."""
        redacted = SimpleNamespace(settings=self._settings)
        return redacted

    def load(self):
        """Return a namespace whose ``schema`` attribute is the canned schema."""
        loaded = SimpleNamespace(schema=self._schema)
        return loaded
|
|
|
|
|
|
class ApiServerTests(unittest.TestCase):
|
|
def _state(
    self,
    tmpdir: str,
    repo: FakeRepo,
    *,
    control_token: str = "",
    ingest_service: object | None = None,
) -> dict[str, object]:
    """Build the state mapping that the tests hand to the API handler.

    ``tmpdir`` becomes the root and the parent of the ``session`` directory,
    and ``control_token`` is stored under ``settings.runtime``.  The
    ``ingest_service`` key is present only when a service is supplied.
    """
    root = Path(tmpdir)
    settings = {
        "runtime": {"control_token": control_token},
        "paths": {"session_dir": str(root / "session")},
        "comment": {"post_split_comment": True, "post_full_video_timeline_comment": True},
        "cleanup": {},
        "publish": {"retry_schedule_minutes": [10]},
    }
    state: dict[str, object] = {
        "root": root,
        "repo": repo,
        "settings": settings,
        "registry": SimpleNamespace(list_manifests=lambda: []),
        "manifests": [],
    }
    if ingest_service is None:
        return state
    state["ingest_service"] = ingest_service
    return state
|
|
|
|
def _request(
    self,
    method: str,
    path: str,
    state: dict[str, object],
    *,
    body: bytes = b"",
    headers: dict[str, str] | None = None,
) -> tuple[int, dict[str, str], object]:
    """Drive one ``do_<method>`` call on a socket-less ``ApiHandler``.

    The handler is created with ``__new__`` (its ``__init__`` is not run),
    its response hooks are replaced with in-memory capture, and
    ``ensure_initialized`` is patched to return ``state``.

    Returns ``(status, response_headers, parsed_json_or_None)``.
    """
    request_headers = {"Content-Length": str(len(body))}
    request_headers.update(headers or {})

    handler = ApiHandler.__new__(ApiHandler)
    handler.path = path
    handler.headers = request_headers
    handler.rfile = io.BytesIO(body)
    handler.wfile = io.BytesIO()

    # Status stays at OK unless the handler calls send_response.
    response_status: dict[str, int] = {"value": HTTPStatus.OK}
    response_headers: dict[str, str] = {}

    handler.send_response = lambda status, message=None: response_status.__setitem__("value", int(status))  # type: ignore[method-assign]
    handler.send_header = lambda name, value: response_headers.__setitem__(name, value)  # type: ignore[method-assign]
    handler.end_headers = lambda: None  # type: ignore[method-assign]
    handler.log_message = lambda format, *args: None  # type: ignore[method-assign]

    with patch("biliup_next.app.api_server.ensure_initialized", return_value=state):
        dispatch = getattr(handler, f"do_{method}")
        dispatch()

    raw_body = handler.wfile.getvalue().decode("utf-8")
    if raw_body:
        return response_status["value"], response_headers, json.loads(raw_body)
    return response_status["value"], response_headers, None
|
|
|
|
def test_get_tasks_returns_serialized_items(self) -> None:
    """GET /tasks answers 200 with total 1 and the task's id and split-comment state."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        repo = FakeRepo(
            Task(
                id="task-1",
                source_type="local_file",
                source_path=str(source_file),
                title="task-title",
                status="published",
                created_at="2026-01-01T00:00:00+00:00",
                updated_at="2026-01-01T00:01:00+00:00",
            )
        )
        state = self._state(tmpdir, repo)

        status_code, _headers, payload = self._request("GET", "/tasks?limit=10&offset=0", state)

        self.assertEqual(status_code, 200)
        self.assertEqual(payload["total"], 1)
        first_item = payload["items"][0]
        self.assertEqual(first_item["id"], "task-1")
        self.assertEqual(first_item["delivery_state"]["split_comment"], "pending")
|
|
|
|
def test_get_task_timeline_returns_serialized_timeline(self) -> None:
    """GET /tasks/task-1/timeline returns at least four items, task first, with a 'split=ok' action summary."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:02:00+00:00",
        )
        failed_publish = TaskStep(
            id=None,
            task_id="task-1",
            step_name="publish",
            status="failed_retryable",
            error_code="ERR_UPLOAD",
            error_message="upload failed",
            retry_count=1,
            started_at="2026-01-01T00:01:00+00:00",
            finished_at="2026-01-01T00:01:30+00:00",
        )
        bvid_artifact = Artifact(
            id=None,
            task_id="task-1",
            artifact_type="publish_bvid",
            path="/tmp/bvid.txt",
            metadata_json="{}",
            created_at="2026-01-01T00:01:40+00:00",
        )
        comment_details = {"split": {"status": "ok"}, "full": {"status": "skipped"}}
        comment_action = ActionRecord(
            id=None,
            task_id="task-1",
            action_name="comment",
            status="ok",
            summary="comment finished",
            details_json=json.dumps(comment_details),
            created_at="2026-01-01T00:01:50+00:00",
        )
        repo = FakeRepo(
            task,
            steps=[failed_publish],
            artifacts=[bvid_artifact],
            actions=[comment_action],
        )
        state = self._state(tmpdir, repo)

        status_code, _headers, payload = self._request("GET", "/tasks/task-1/timeline", state)

        self.assertEqual(status_code, 200)
        self.assertGreaterEqual(len(payload["items"]), 4)
        self.assertEqual(payload["items"][0]["kind"], "task")
        action_entry = next(entry for entry in payload["items"] if entry["kind"] == "action")
        self.assertIn("split=ok", action_entry["summary"])
|
|
|
|
def test_get_session_returns_serialized_tasks(self) -> None:
    """GET /sessions/session-1 returns the session key, full-video BVID and per-task session context."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        session_context = TaskContext(
            id=None,
            task_id="task-1",
            session_key="session-1",
            streamer="streamer",
            room_id="room-1",
            source_title="task-title",
            segment_started_at=None,
            segment_duration_seconds=None,
            full_video_bvid="BVFULL123",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        repo = FakeRepo(task, context=session_context)
        state = self._state(tmpdir, repo)

        status_code, _headers, payload = self._request("GET", "/sessions/session-1", state)

        self.assertEqual(status_code, 200)
        self.assertEqual(payload["session_key"], "session-1")
        self.assertEqual(payload["full_video_bvid"], "BVFULL123")
        first_task = payload["tasks"][0]
        self.assertEqual(first_task["session_context"]["session_key"], "session-1")
|
|
|
|
def test_get_tasks_requires_control_token_when_configured(self) -> None:
    """With a control token set, GET /tasks is 401 without the X-Biliup-Token header and 200 with it."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task), control_token="secret")
        token_header = {"X-Biliup-Token": "secret"}

        # Same endpoint twice: first anonymously, then with the token.
        denied_status, _, denied_payload = self._request("GET", "/tasks", state)
        granted_status, _, granted_payload = self._request("GET", "/tasks", state, headers=token_header)

        self.assertEqual(denied_status, 401)
        self.assertEqual(denied_payload["error"], "unauthorized")
        self.assertEqual(granted_status, 200)
        self.assertEqual(granted_payload["items"][0]["id"], "task-1")
|
|
|
|
def test_get_history_filters_action_records(self) -> None:
    """GET /history with task_id, action_name and status filters returns only the matching record."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        comment_ok = ActionRecord(None, "task-1", "comment", "ok", "comment ok", "{}", "2026-01-01T00:01:00+00:00")
        publish_error = ActionRecord(None, "task-1", "publish", "error", "publish error", "{}", "2026-01-01T00:02:00+00:00")
        repo = FakeRepo(task, actions=[comment_ok, publish_error])
        state = self._state(tmpdir, repo)
        url = "/history?task_id=task-1&action_name=comment&status=ok"

        status_code, _headers, payload = self._request("GET", url, state)

        self.assertEqual(status_code, 200)
        self.assertEqual(len(payload["items"]), 1)
        self.assertEqual(payload["items"][0]["action_name"], "comment")
|
|
|
|
def test_get_modules_returns_registry_and_discovered_manifests(self) -> None:
    """GET /modules returns the registry manifests as items plus the discovered manifests."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        # Override the empty defaults supplied by _state.
        registry = SimpleNamespace(list_manifests=lambda: [{"name": "publish.biliup_cli"}])
        discovered = [{"name": "publish.biliup_cli", "path": "plugins/publish.json"}]
        state["registry"] = registry
        state["manifests"] = discovered

        status_code, _headers, payload = self._request("GET", "/modules", state)

        self.assertEqual(status_code, 200)
        self.assertEqual(payload["items"][0]["name"], "publish.biliup_cli")
        self.assertEqual(payload["discovered_manifests"][0]["path"], "plugins/publish.json")
|
|
|
|
def test_get_scheduler_preview_returns_builder_payload(self) -> None:
    """GET /scheduler/preview returns what build_scheduler_preview produced and calls it once."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        canned_preview = {"items": [{"task_id": "task-1"}]}

        with patch(
            "biliup_next.app.api_server.build_scheduler_preview",
            return_value=canned_preview,
        ) as build_preview:
            status_code, _headers, payload = self._request("GET", "/scheduler/preview", state)

        self.assertEqual(status_code, 200)
        self.assertEqual(payload["items"][0]["task_id"], "task-1")
        build_preview.assert_called_once()
|
|
|
|
def test_get_settings_schema_returns_schema_payload(self) -> None:
    """GET /settings/schema returns the schema loaded through the patched SettingsService."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:01:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        reader = FakeSettingsReader(Path(tmpdir), schema={"title": "SettingsSchema"})

        # Constructing SettingsService inside the handler yields the fake reader.
        with patch("biliup_next.app.api_server.SettingsService", return_value=reader):
            status_code, _headers, payload = self._request("GET", "/settings/schema", state)

        self.assertEqual(status_code, 200)
        self.assertEqual(payload["title"], "SettingsSchema")
|
|
|
|
def test_put_settings_promotes_and_resets_initialized_state(self) -> None:
    """PUT /settings saves and promotes the payload, resets initialized state and re-initializes.

    The handler is wired up by hand here so that this test owns the
    ``ensure_initialized`` mock and can check its call count.
    """
    FakeSettingsService.reset()
    with tempfile.TemporaryDirectory() as tmpdir:
        state = {
            "root": Path(tmpdir),
            "settings": {"runtime": {"control_token": ""}},
        }
        request_bytes = json.dumps({"runtime": {"control_token": "abc"}}).encode("utf-8")

        with ExitStack() as stack:
            ensure_mock = stack.enter_context(
                patch("biliup_next.app.api_server.ensure_initialized", return_value=state)
            )
            reset_mock = stack.enter_context(patch("biliup_next.app.api_server.reset_initialized_state"))
            stack.enter_context(patch("biliup_next.app.api_server.SettingsService", FakeSettingsService))

            handler = ApiHandler.__new__(ApiHandler)
            handler.path = "/settings"
            handler.headers = {
                "Content-Length": str(len(request_bytes)),
                "Content-Type": "application/json",
            }
            handler.rfile = io.BytesIO(request_bytes)
            handler.wfile = io.BytesIO()

            response_status: dict[str, int] = {"value": HTTPStatus.OK}

            def record_status(status, message=None):  # type: ignore[no-untyped-def]
                response_status["value"] = int(status)

            handler.send_response = record_status  # type: ignore[method-assign]
            handler.send_header = lambda name, value: None  # type: ignore[method-assign]
            handler.end_headers = lambda: None  # type: ignore[method-assign]
            handler.log_message = lambda format, *args: None  # type: ignore[method-assign]

            handler.do_PUT()
            body = json.loads(handler.wfile.getvalue().decode("utf-8"))

        self.assertEqual(response_status["value"], 200)
        self.assertEqual(body["ok"], True)
        self.assertEqual(FakeSettingsService.save_calls[-1], {"runtime": {"control_token": "abc"}})
        self.assertEqual(FakeSettingsService.promote_calls, 1)
        reset_mock.assert_called_once()
        self.assertGreaterEqual(ensure_mock.call_count, 2)
|
|
|
|
def test_post_tasks_creates_task_from_source_path(self) -> None:
    """POST /tasks with a source_path answers 201 with the task built by the ingest service."""
    with tempfile.TemporaryDirectory() as tmpdir:
        stage_dir = Path(tmpdir) / "stage"
        source_path = str(stage_dir / "source.mp4")
        created_task = Task(
            id="task-new",
            source_type="local_file",
            source_path=source_path,
            title="new-task",
            status="created",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        ingest_service = SimpleNamespace(create_task_from_file=lambda path, settings: created_task)
        state = self._state(tmpdir, FakeRepo(created_task), ingest_service=ingest_service)
        state["settings"]["ingest"] = {"min_duration_seconds": 60}
        state["settings"]["paths"]["stage_dir"] = str(stage_dir)
        request_body = json.dumps({"source_path": source_path}).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/tasks",
            state,
            body=request_body,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 201)
        self.assertEqual(payload["id"], "task-new")
        self.assertEqual(payload["source_path"], source_path)
|
|
|
|
def test_post_tasks_creates_task_from_bilibili_url(self) -> None:
    """POST /tasks with source_type bilibili_url answers 201 with the task built from the URL."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_url = "https://www.bilibili.com/video/BV1TEST1234"
        download_target = Path(tmpdir) / "session" / "task-bv" / "task-bv.mp4"
        created_task = Task(
            id="task-bv",
            source_type="bilibili_url",
            source_path=str(download_target),
            title="video-title",
            status="created",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        ingest_service = SimpleNamespace(create_task_from_url=lambda url, settings: created_task)
        state = self._state(tmpdir, FakeRepo(created_task), ingest_service=ingest_service)
        state["settings"]["ingest"] = {"provider": "bilibili_url", "yt_dlp_cmd": "yt-dlp"}
        request_body = json.dumps({"source_type": "bilibili_url", "source_url": source_url}).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/tasks",
            state,
            body=request_body,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 201)
        self.assertEqual(payload["id"], "task-bv")
        self.assertEqual(payload["source_type"], "bilibili_url")
|
|
|
|
def test_post_run_task_action_returns_accepted_payload(self) -> None:
    """POST /tasks/task-1/actions/run answers 202 and calls run_task_action with the task id."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="created",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        action_result = {"ok": True, "task_id": "task-1"}

        with patch("biliup_next.app.api_server.run_task_action", return_value=action_result) as run_action:
            status_code, _headers, payload = self._request("POST", "/tasks/task-1/actions/run", state)

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["task_id"], "task-1")
        run_action.assert_called_once_with("task-1")
|
|
|
|
def test_post_retry_step_requires_step_name(self) -> None:
    """POST retry-step with an empty JSON object answers 400 with error 'missing step_name'."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="failed_retryable",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        empty_object = json.dumps({}).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/tasks/task-1/actions/retry-step",
            state,
            body=empty_object,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "missing step_name")
|
|
|
|
def test_post_retry_step_dispatches_to_action(self) -> None:
    """POST retry-step with a step_name answers 202 and calls retry_step_action(task id, step)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="failed_retryable",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"step_name": "publish"}).encode("utf-8")
        action_result = {"ok": True, "task_id": "task-1", "step_name": "publish"}

        with patch("biliup_next.app.api_server.retry_step_action", return_value=action_result) as retry_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/tasks/task-1/actions/retry-step",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["step_name"], "publish")
        retry_action.assert_called_once_with("task-1", "publish")
|
|
|
|
def test_post_reset_to_step_dispatches_to_action(self) -> None:
    """POST reset-to-step answers 202 and calls reset_to_step_action(task id, step)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"step_name": "split"}).encode("utf-8")
        action_result = {"ok": True, "task_id": "task-1", "step_name": "split"}

        with patch("biliup_next.app.api_server.reset_to_step_action", return_value=action_result) as reset_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/tasks/task-1/actions/reset-to-step",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["step_name"], "split")
        reset_action.assert_called_once_with("task-1", "split")
|
|
|
|
def test_post_bind_full_video_requires_bvid(self) -> None:
    """POST bind-full-video with an empty JSON object answers 400 with error 'missing full_video_bvid'."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        empty_object = json.dumps({}).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/tasks/task-1/bind-full-video",
            state,
            body=empty_object,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "missing full_video_bvid")
|
|
|
|
def test_post_bind_full_video_maps_task_not_found_to_404(self) -> None:
    """A TASK_NOT_FOUND error from bind_full_video_action is answered with HTTP 404."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"full_video_bvid": "BVFULL123"}).encode("utf-8")
        action_result = {"error": {"code": "TASK_NOT_FOUND", "message": "missing"}}

        with patch("biliup_next.app.api_server.bind_full_video_action", return_value=action_result) as bind_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/tasks/missing/bind-full-video",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 404)
        self.assertEqual(payload["error"]["code"], "TASK_NOT_FOUND")
        bind_action.assert_called_once_with("missing", "BVFULL123")
|
|
|
|
def test_post_bind_full_video_dispatches_to_action(self) -> None:
    """POST bind-full-video answers 202 and calls bind_full_video_action(task id, bvid)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"full_video_bvid": "BVFULL123"}).encode("utf-8")
        action_result = {"ok": True, "task_id": "task-1", "full_video_bvid": "BVFULL123"}

        with patch("biliup_next.app.api_server.bind_full_video_action", return_value=action_result) as bind_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/tasks/task-1/bind-full-video",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["full_video_bvid"], "BVFULL123")
        bind_action.assert_called_once_with("task-1", "BVFULL123")
|
|
|
|
def test_post_session_rebind_maps_session_not_found_to_404(self) -> None:
    """A SESSION_NOT_FOUND error from rebind_session_full_video_action is answered with HTTP 404."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"full_video_bvid": "BVFULL123"}).encode("utf-8")
        action_result = {"error": {"code": "SESSION_NOT_FOUND", "message": "missing"}}

        with patch(
            "biliup_next.app.api_server.rebind_session_full_video_action",
            return_value=action_result,
        ) as rebind_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/sessions/session-x/rebind",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 404)
        self.assertEqual(payload["error"]["code"], "SESSION_NOT_FOUND")
        rebind_action.assert_called_once_with("session-x", "BVFULL123")
|
|
|
|
def test_post_session_rebind_dispatches_to_action(self) -> None:
    """POST /sessions/session-1/rebind answers 202 and calls the rebind action with key and bvid."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"full_video_bvid": "BVFULL123"}).encode("utf-8")
        action_result = {"ok": True, "session_key": "session-1", "full_video_bvid": "BVFULL123"}

        with patch(
            "biliup_next.app.api_server.rebind_session_full_video_action",
            return_value=action_result,
        ) as rebind_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/sessions/session-1/rebind",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["session_key"], "session-1")
        rebind_action.assert_called_once_with("session-1", "BVFULL123")
|
|
|
|
def test_post_session_merge_requires_task_ids(self) -> None:
    """POST session merge with an empty JSON object answers 400 with error 'missing task_ids'."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        empty_object = json.dumps({}).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/sessions/session-1/merge",
            state,
            body=empty_object,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "missing task_ids")
|
|
|
|
def test_post_session_merge_dispatches_to_action(self) -> None:
    """POST session merge answers 202 and calls merge_session_action(session key, task ids)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"task_ids": ["task-1", "task-2"]}).encode("utf-8")
        action_result = {"ok": True, "session_key": "session-1", "merged_task_ids": ["task-1", "task-2"]}

        with patch("biliup_next.app.api_server.merge_session_action", return_value=action_result) as merge_action:
            status_code, _headers, payload = self._request(
                "POST",
                "/sessions/session-1/merge",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["merged_task_ids"], ["task-1", "task-2"])
        merge_action.assert_called_once_with("session-1", ["task-1", "task-2"])
|
|
|
|
def test_post_full_video_webhook_rejects_invalid_payload(self) -> None:
    """A JSON list posted to the full-video webhook answers 400 with error 'invalid payload'."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        # Valid JSON, but a list rather than an object.
        list_body = json.dumps(["not-a-dict"]).encode("utf-8")

        status_code, _headers, payload = self._request(
            "POST",
            "/webhooks/full-video-uploaded",
            state,
            body=list_body,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "invalid payload")
|
|
|
|
def test_post_full_video_webhook_dispatches_to_action(self) -> None:
    """A JSON object posted to the full-video webhook answers 202 and is passed to receive_full_video_webhook."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="published",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        state = self._state(tmpdir, FakeRepo(task))
        request_body = json.dumps({"session_key": "session-1", "full_video_bvid": "BVFULL123"}).encode("utf-8")
        action_result = {"ok": True, "session_key": "session-1", "full_video_bvid": "BVFULL123"}

        with patch(
            "biliup_next.app.api_server.receive_full_video_webhook",
            return_value=action_result,
        ) as receive_webhook:
            status_code, _headers, payload = self._request(
                "POST",
                "/webhooks/full-video-uploaded",
                state,
                body=request_body,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["full_video_bvid"], "BVFULL123")
        receive_webhook.assert_called_once_with({"session_key": "session-1", "full_video_bvid": "BVFULL123"})
|
|
|
|
def test_post_worker_run_once_records_action_and_returns_payload(self) -> None:
    """POST /worker/run-once answers 202 with run_once's result and records a 'worker_run_once' action."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="created",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        repo = FakeRepo(task)
        state = self._state(tmpdir, repo)
        run_result = {"worker": {"picked": 1}}

        with patch("biliup_next.app.api_server.run_once", return_value=run_result) as run_once_mock:
            status_code, _headers, payload = self._request("POST", "/worker/run-once", state)

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["worker"]["picked"], 1)
        latest_action = repo.actions[-1]
        self.assertEqual(latest_action.action_name, "worker_run_once")
        run_once_mock.assert_called_once_with()
|
|
|
|
def test_post_scheduler_run_once_records_action_and_returns_payload(self) -> None:
    """POST /scheduler/run-once answers 202 with run_once's result and records a 'scheduler_run_once' action."""
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="created",
            created_at="2026-01-01T00:00:00+00:00",
            updated_at="2026-01-01T00:00:00+00:00",
        )
        repo = FakeRepo(task)
        state = self._state(tmpdir, repo)
        run_result = {"scheduler": {"scan_count": 2}, "worker": {"picked": 0}}

        with patch("biliup_next.app.api_server.run_once", return_value=run_result) as run_once_mock:
            status_code, _headers, payload = self._request("POST", "/scheduler/run-once", state)

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["scheduler"]["scan_count"], 2)
        latest_action = repo.actions[-1]
        self.assertEqual(latest_action.action_name, "scheduler_run_once")
        run_once_mock.assert_called_once_with()
|
|
|
|
def test_post_runtime_service_maps_invalid_action_to_400(self) -> None:
    """Check that a ``ValueError`` raised by the runtime's ``act`` is answered
    with status 400 and the exception text under ``error``."""

    def reject(service, action):  # type: ignore[no-untyped-def]
        # Stand-in for the runtime's ``act`` that always refuses the request.
        raise ValueError("invalid action")

    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        seed_task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="created",
            created_at=stamp,
            updated_at=stamp,
        )
        app_state = self._state(tmpdir, FakeRepo(seed_task))
        failing_runtime = SimpleNamespace(act=reject)

        with patch("biliup_next.app.api_server.SystemdRuntime", return_value=failing_runtime):
            status_code, _, payload = self._request(
                "POST",
                "/runtime/services/worker/restartx",
                app_state,
            )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "invalid action")
|
|
|
|
def test_post_runtime_service_records_action_and_returns_payload(self) -> None:
    """Check that POST /runtime/services/worker/restart answers 202, returns
    the stub runtime's result and leaves a ``service_action`` action on the repo."""

    def echo_act(service, action):  # type: ignore[no-untyped-def]
        # Stand-in for the runtime's ``act`` that reports what it was asked to do.
        return {"service": service, "action": action, "command_ok": True}

    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        fake_repo = FakeRepo(
            Task(
                id="task-1",
                source_type="local_file",
                source_path=str(source_file),
                title="task-title",
                status="created",
                created_at=stamp,
                updated_at=stamp,
            )
        )
        app_state = self._state(tmpdir, fake_repo)
        stub_runtime = SimpleNamespace(act=echo_act)

        with patch("biliup_next.app.api_server.SystemdRuntime", return_value=stub_runtime):
            status_code, _, payload = self._request(
                "POST",
                "/runtime/services/worker/restart",
                app_state,
            )

        self.assertEqual(status_code, 202)
        self.assertEqual(payload["service"], "worker")
        self.assertEqual(payload["action"], "restart")
        self.assertEqual(fake_repo.actions[-1].action_name, "service_action")
|
|
|
|
def test_post_stage_import_requires_source_path(self) -> None:
    """Check that POST /stage/import with an empty JSON object is answered
    with status 400 and the ``missing source_path`` error."""
    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_root = Path(tmpdir)
        seed_task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(tmp_root / "session" / "task-title" / "source.mp4"),
            title="task-title",
            status="created",
            created_at=stamp,
            updated_at=stamp,
        )
        app_state = self._state(tmpdir, FakeRepo(seed_task))
        app_state["settings"]["paths"]["stage_dir"] = str(tmp_root / "stage")
        app_state["settings"]["ingest"] = {"stage_min_free_space_mb": 100}

        # An empty JSON object: the request carries no source_path at all.
        empty_json = json.dumps({}).encode("utf-8")
        status_code, _, payload = self._request(
            "POST",
            "/stage/import",
            app_state,
            body=empty_json,
            headers={"Content-Type": "application/json"},
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "missing source_path")
|
|
|
|
def test_post_stage_import_records_action_and_returns_created(self) -> None:
    """Check that POST /stage/import with a ``source_path`` answers 201,
    returns the stub importer's result and leaves a ``stage_import`` action."""

    def fake_import_file(path, dest, min_free_bytes=0):  # type: ignore[no-untyped-def]
        # Stand-in for the importer: report where the file would have landed.
        return {"imported_to": str(dest / path.name)}

    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_root = Path(tmpdir)
        fake_repo = FakeRepo(
            Task(
                id="task-1",
                source_type="local_file",
                source_path=str(tmp_root / "session" / "task-title" / "source.mp4"),
                title="task-title",
                status="created",
                created_at=stamp,
                updated_at=stamp,
            )
        )
        app_state = self._state(tmpdir, fake_repo)
        stage_dir = tmp_root / "stage"
        app_state["settings"]["paths"]["stage_dir"] = str(stage_dir)
        app_state["settings"]["ingest"] = {"stage_min_free_space_mb": 100}
        stub_importer = SimpleNamespace(import_file=fake_import_file)

        request_json = json.dumps({"source_path": str(tmp_root / "incoming.mp4")}).encode("utf-8")
        with patch("biliup_next.app.api_server.StageImporter", return_value=stub_importer):
            status_code, _, payload = self._request(
                "POST",
                "/stage/import",
                app_state,
                body=request_json,
                headers={"Content-Type": "application/json"},
            )

        self.assertEqual(status_code, 201)
        self.assertIn("incoming.mp4", payload["imported_to"])
        self.assertEqual(fake_repo.actions[-1].action_name, "stage_import")
|
|
|
|
def test_post_stage_upload_requires_multipart_content_type(self) -> None:
    """Check that POST /stage/upload sent as ``text/plain`` is answered with
    status 400 and the multipart content-type error."""
    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        source_file = Path(tmpdir) / "session" / "task-title" / "source.mp4"
        seed_task = Task(
            id="task-1",
            source_type="local_file",
            source_path=str(source_file),
            title="task-title",
            status="created",
            created_at=stamp,
            updated_at=stamp,
        )
        app_state = self._state(tmpdir, FakeRepo(seed_task))

        # Deliberately the wrong content type for an upload.
        plain_headers = {"Content-Type": "text/plain"}
        status_code, _, payload = self._request(
            "POST",
            "/stage/upload",
            app_state,
            body=b"plain",
            headers=plain_headers,
        )

        self.assertEqual(status_code, 400)
        self.assertEqual(payload["error"], "content-type must be multipart/form-data")
|
|
|
|
def test_post_stage_upload_records_action_and_returns_created(self) -> None:
    """Check that a multipart POST /stage/upload answers 201, returns the stub
    importer's result and leaves a ``stage_upload`` action on the repo."""

    def fake_import_upload(filename, fileobj, dest, min_free_bytes=0):  # type: ignore[no-untyped-def]
        # Stand-in for the importer: echo the upload name and destination.
        return {"filename": filename, "dest": str(dest)}

    stamp = "2026-01-01T00:00:00+00:00"
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_root = Path(tmpdir)
        fake_repo = FakeRepo(
            Task(
                id="task-1",
                source_type="local_file",
                source_path=str(tmp_root / "session" / "task-title" / "source.mp4"),
                title="task-title",
                status="created",
                created_at=stamp,
                updated_at=stamp,
            )
        )
        app_state = self._state(tmpdir, fake_repo)
        stage_dir = tmp_root / "stage"
        app_state["settings"]["paths"]["stage_dir"] = str(stage_dir)
        app_state["settings"]["ingest"] = {"stage_min_free_space_mb": 100}
        stub_importer = SimpleNamespace(import_upload=fake_import_upload)
        uploaded_item = SimpleNamespace(filename="incoming.mp4", file=io.BytesIO(b"fake-video-content"))

        class StubFieldStorage(dict):
            """Parsed-form stand-in that exposes exactly one field, ``file``."""

            def __contains__(self, key: object) -> bool:
                return key == "file"

            def __getitem__(self, key: str):
                if key != "file":
                    raise KeyError(key)
                return uploaded_item

        # Both collaborators are replaced for the duration of the request, so
        # the request body below is never parsed as real multipart data.
        importer_patch = patch("biliup_next.app.api_server.StageImporter", return_value=stub_importer)
        form_patch = patch("biliup_next.app.api_server.cgi.FieldStorage", return_value=StubFieldStorage())
        with importer_patch, form_patch:
            status_code, _, payload = self._request(
                "POST",
                "/stage/upload",
                app_state,
                body=b"ignored",
                headers={"Content-Type": "multipart/form-data; boundary=----biliupnexttest"},
            )

        self.assertEqual(status_code, 201)
        self.assertEqual(payload["filename"], "incoming.mp4")
        self.assertEqual(fake_repo.actions[-1].action_name, "stage_upload")
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|