from __future__ import annotations

import shutil
from pathlib import Path

from biliup_next.infra.task_repository import TaskRepository
from biliup_next.infra.workspace_paths import resolve_task_work_dir


class WorkspaceCleanupService:
    """Delete a task's on-disk outputs and the matching repository records."""

    def __init__(self, repo: TaskRepository):
        """Store the task repository used to look up tasks and drop artifacts."""
        self.repo = repo

    def cleanup_task_outputs(self, task_id: str, settings: dict[str, object]) -> dict[str, object]:
        """Remove the outputs of ``task_id`` that ``settings`` opts in to.

        Two independent, opt-in steps are performed:

        * ``delete_source_video_after_collection_synced``: unlink the task's
          source file, but only when it exists and lies inside the task's
          work directory; otherwise the path is reported as skipped.
        * ``delete_split_videos_after_collection_synced``: remove the
          ``split_video`` directory under the work directory and drop the
          task's ``clip_video`` artifacts; a missing directory is reported
          as skipped.

        Args:
            task_id: Identifier of the task to clean up.
            settings: Flag mapping; a missing flag is treated as ``False``.

        Returns:
            ``{"removed": [...], "skipped": [...]}`` with the string paths
            that were deleted and those that were left alone.

        Raises:
            RuntimeError: If the repository has no task with ``task_id``.
        """
        task = self.repo.get_task(task_id)
        if task is None:
            raise RuntimeError(f"task not found: {task_id}")

        session_dir = resolve_task_work_dir(task)
        removed: list[str] = []
        skipped: list[str] = []

        if settings.get("delete_source_video_after_collection_synced", False):
            source_path = Path(task.source_path).resolve()
            # relative_to() is purely lexical, so both sides must be resolved;
            # otherwise a relative or symlinked work dir would make a managed
            # source file look unmanaged.
            managed_root = Path(session_dir).resolve()
            try:
                source_path.relative_to(managed_root)
            except ValueError:
                source_managed = False
            else:
                source_managed = True

            if source_path.exists() and source_managed:
                # Already resolved above, so this is the same key the original
                # computed with a second resolve() after the unlink.
                artifact_path = str(source_path)
                source_path.unlink()
                self.repo.delete_artifact_by_path(task_id, artifact_path)
                removed.append(artifact_path)
            else:
                # Never delete a source that lives outside the work directory.
                skipped.append(str(source_path))

        if settings.get("delete_split_videos_after_collection_synced", False):
            split_dir = session_dir / "split_video"
            if split_dir.exists():
                # Best effort: deletion errors are ignored and the directory
                # is still reported as removed.
                shutil.rmtree(split_dir, ignore_errors=True)
                self.repo.delete_artifacts(task_id, "clip_video")
                removed.append(str(split_dir))
            else:
                skipped.append(str(split_dir))

        return {"removed": removed, "skipped": skipped}