Files
biliup-next/tests/test_workspace_cleanup.py

117 lines
4.8 KiB
Python

from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from biliup_next.core.models import Task, utc_now_iso
from biliup_next.infra.workspace_cleanup import WorkspaceCleanupService
class _FakeRepo:
def __init__(self, tasks: list[Task], session_key: str | None = None) -> None:
self.tasks = {task.id: task for task in tasks}
self.session_key = session_key
self.deleted_artifacts: list[tuple[str, str]] = []
self.deleted_artifact_paths: list[tuple[str, str]] = []
def get_task(self, task_id: str) -> Task | None:
return self.tasks.get(task_id)
def get_task_context(self, task_id: str): # noqa: ANN201
if self.session_key is None or task_id not in self.tasks:
return None
return SimpleNamespace(task_id=task_id, session_key=self.session_key)
def list_task_contexts_by_session_key(self, session_key: str): # noqa: ANN201
if session_key != self.session_key:
return []
return [SimpleNamespace(task_id=task_id, session_key=session_key) for task_id in self.tasks]
def delete_artifacts(self, task_id: str, artifact_type: str) -> None:
self.deleted_artifacts.append((task_id, artifact_type))
def delete_artifact_by_path(self, task_id: str, path: str) -> None:
self.deleted_artifact_paths.append((task_id, path))
def _make_task(task_id: str, root: Path) -> Task:
now = utc_now_iso()
work_dir = root / task_id
work_dir.mkdir(parents=True)
source = work_dir / "source.mp4"
source.write_bytes(b"source")
for dirname in ("split_video", "publish_video"):
video_dir = work_dir / dirname
video_dir.mkdir()
(video_dir / "01_song.mp4").write_bytes(b"clip")
return Task(task_id, "local_file", str(source), task_id, "collection_synced", now, now)
class WorkspaceCleanupServiceTests(unittest.TestCase):
def test_cleanup_removes_source_split_and_publish_video_for_single_task(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
task = _make_task("task-1", root)
repo = _FakeRepo([task])
result = WorkspaceCleanupService(repo).cleanup_task_outputs(
task.id,
{
"delete_source_video_after_collection_synced": True,
"delete_split_videos_after_collection_synced": True,
},
)
work_dir = root / "task-1"
self.assertFalse((work_dir / "source.mp4").exists())
self.assertFalse((work_dir / "split_video").exists())
self.assertFalse((work_dir / "publish_video").exists())
self.assertEqual(result["task_ids"], ["task-1"])
self.assertEqual(repo.deleted_artifacts, [("task-1", "clip_video")])
self.assertEqual(repo.deleted_artifact_paths, [("task-1", str((work_dir / "source.mp4").resolve()))])
def test_cleanup_removes_all_tasks_in_same_session(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
task_1 = _make_task("task-1", root)
task_2 = _make_task("task-2", root)
repo = _FakeRepo([task_1, task_2], session_key="session-1")
result = WorkspaceCleanupService(repo).cleanup_task_outputs(
task_1.id,
{
"delete_source_video_after_collection_synced": True,
"delete_split_videos_after_collection_synced": True,
},
)
for task_id in ("task-1", "task-2"):
work_dir = root / task_id
self.assertFalse((work_dir / "source.mp4").exists())
self.assertFalse((work_dir / "split_video").exists())
self.assertFalse((work_dir / "publish_video").exists())
self.assertEqual(result["task_ids"], ["task-1", "task-2"])
self.assertEqual(repo.deleted_artifacts, [("task-1", "clip_video"), ("task-2", "clip_video")])
def test_cleanup_skips_missing_source_video(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
task = _make_task("task-1", root)
source = Path(task.source_path)
source.unlink()
repo = _FakeRepo([task])
result = WorkspaceCleanupService(repo).cleanup_task_outputs(
task.id,
{
"delete_source_video_after_collection_synced": True,
"delete_split_videos_after_collection_synced": False,
},
)
self.assertIn(str(source.resolve()), result["skipped"])
self.assertEqual(repo.deleted_artifact_paths, [])
if __name__ == "__main__":
unittest.main()