from __future__ import annotations import cgi import json import mimetypes from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import parse_qs, unquote, urlparse from biliup_next.app.task_actions import reset_to_step_action from biliup_next.app.task_actions import retry_step_action from biliup_next.app.task_actions import run_task_action from biliup_next.app.bootstrap import ensure_initialized from biliup_next.app.dashboard import render_dashboard_html from biliup_next.app.retry_meta import retry_meta_for_step from biliup_next.app.scheduler import build_scheduler_preview from biliup_next.app.worker import run_once from biliup_next.core.config import SettingsService from biliup_next.core.models import ActionRecord, utc_now_iso from biliup_next.infra.log_reader import LogReader from biliup_next.infra.runtime_doctor import RuntimeDoctor from biliup_next.infra.stage_importer import StageImporter from biliup_next.infra.storage_guard import mb_to_bytes from biliup_next.infra.systemd_runtime import SystemdRuntime class ApiHandler(BaseHTTPRequestHandler): server_version = "biliup-next/0.1" def _task_payload(self, task_id: str, state: dict[str, object]) -> dict[str, object] | None: task = state["repo"].get_task(task_id) if task is None: return None payload = task.to_dict() retry_state = self._task_retry_state(task_id, state) if retry_state: payload["retry_state"] = retry_state payload["delivery_state"] = self._task_delivery_state(task_id, state) return payload def _step_payload(self, step, state: dict[str, object]) -> dict[str, object]: # type: ignore[no-untyped-def] payload = step.to_dict() retry_meta = retry_meta_for_step(step, state["settings"]) if retry_meta: payload.update(retry_meta) return payload def _task_retry_state(self, task_id: str, state: dict[str, object]) -> dict[str, object] | None: for step in state["repo"].list_steps(task_id): retry_meta = retry_meta_for_step(step, state["settings"]) if retry_meta: return {"step_name": step.step_name, **retry_meta} return None def _task_delivery_state(self, task_id: str, state: dict[str, object]) -> dict[str, object]: task = state["repo"].get_task(task_id) if task is None: return {} session_dir = Path(str(state["settings"]["paths"]["session_dir"])) / task.title source_path = Path(task.source_path) split_dir = session_dir / "split_video" legacy_comment_done = (session_dir / "comment_done.flag").exists() def comment_status(flag_name: str, *, enabled: bool) -> str: if not enabled: return "disabled" if flag_name == "comment_full_done.flag" and legacy_comment_done and not (session_dir / flag_name).exists(): return "legacy_untracked" return "done" if (session_dir / flag_name).exists() else "pending" return { "split_comment": comment_status("comment_split_done.flag", enabled=state["settings"]["comment"].get("post_split_comment", True)), "full_video_timeline_comment": comment_status( "comment_full_done.flag", enabled=state["settings"]["comment"].get("post_full_video_timeline_comment", True), ), "full_video_bvid_resolved": (session_dir / "full_video_bvid.txt").exists(), "source_video_present": source_path.exists(), "split_videos_present": split_dir.exists(), "cleanup_enabled": { "delete_source_video_after_collection_synced": state["settings"].get("cleanup", {}).get("delete_source_video_after_collection_synced", False), "delete_split_videos_after_collection_synced": state["settings"].get("cleanup", {}).get("delete_split_videos_after_collection_synced", False), }, } def _serve_asset(self, asset_name: str) -> None: root = ensure_initialized()["root"] asset_path = root / "src" / "biliup_next" / "app" / "static" / asset_name if not asset_path.exists(): self._json({"error": "asset not found"}, status=HTTPStatus.NOT_FOUND) return content_type = self._guess_content_type(asset_path) self.send_response(HTTPStatus.OK) self.send_header("Content-Type", content_type) self.end_headers() self.wfile.write(asset_path.read_bytes()) def _guess_content_type(self, path: Path) -> str: guessed, _ = mimetypes.guess_type(path.name) if guessed: if guessed.startswith("text/") or guessed in {"application/javascript", "application/json"}: return f"{guessed}; charset=utf-8" return guessed return "application/octet-stream" def _frontend_dist_dir(self) -> Path: root = ensure_initialized()["root"] return root / "frontend" / "dist" def _frontend_dist_ready(self) -> bool: dist = self._frontend_dist_dir() return (dist / "index.html").exists() def _serve_frontend_dist(self, parsed_path: str) -> bool: dist = self._frontend_dist_dir() if not (dist / "index.html").exists(): return False if parsed_path in {"/ui", "/ui/"}: self._html((dist / "index.html").read_text(encoding="utf-8")) return True if not parsed_path.startswith("/ui/"): return False relative = parsed_path.removeprefix("/ui/") asset_path = dist / relative if asset_path.exists() and asset_path.is_file(): body = asset_path.read_bytes() self.send_response(HTTPStatus.OK) self.send_header("Content-Type", self._guess_content_type(asset_path)) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) return True if "." not in Path(relative).name: self._html((dist / "index.html").read_text(encoding="utf-8")) return True self._json({"error": "frontend asset not found"}, status=HTTPStatus.NOT_FOUND) return True def do_GET(self) -> None: # noqa: N802 parsed = urlparse(self.path) if parsed.path.startswith("/ui") and self._serve_frontend_dist(parsed.path): return if not self._check_auth(parsed.path): return if parsed.path.startswith("/assets/"): self._serve_asset(parsed.path.removeprefix("/assets/")) return if parsed.path == "/": self._html(render_dashboard_html()) return if parsed.path == "/health": self._json({"ok": True}) return if parsed.path == "/settings": state = ensure_initialized() service = SettingsService(state["root"]) self._json(service.load_redacted().settings) return if parsed.path == "/settings/schema": state = ensure_initialized() service = SettingsService(state["root"]) self._json(service.load().schema) return if parsed.path == "/doctor": doctor = RuntimeDoctor(ensure_initialized()["root"]) self._json(doctor.run()) return if parsed.path == "/runtime/services": self._json(SystemdRuntime().list_services()) return if parsed.path == "/scheduler/preview": state = ensure_initialized() self._json(build_scheduler_preview(state, include_stage_scan=False, limit=200)) return if parsed.path == "/logs": query = parse_qs(parsed.query) name = query.get("name", [None])[0] if not name: self._json(LogReader().list_logs()) return lines = int(query.get("lines", ["200"])[0]) contains = query.get("contains", [None])[0] self._json(LogReader().tail(name, lines, contains)) return if parsed.path == "/history": state = ensure_initialized() query = parse_qs(parsed.query) limit = int(query.get("limit", ["100"])[0]) task_id = query.get("task_id", [None])[0] action_name = query.get("action_name", [None])[0] status = query.get("status", [None])[0] items = [ item.to_dict() for item in state["repo"].list_action_records( task_id=task_id, limit=limit, action_name=action_name, status=status, ) ] self._json({"items": items}) return if parsed.path == "/modules": state = ensure_initialized() self._json({"items": state["registry"].list_manifests(), "discovered_manifests": state["manifests"]}) return if parsed.path == "/tasks": state = ensure_initialized() query = parse_qs(parsed.query) limit = int(query.get("limit", ["100"])[0]) tasks = [self._task_payload(task.id, state) for task in state["repo"].list_tasks(limit=limit)] self._json({"items": tasks}) return if parsed.path.startswith("/tasks/"): state = ensure_initialized() parts = [unquote(p) for p in parsed.path.split("/") if p] if len(parts) == 2: task = self._task_payload(parts[1], state) if task is None: self._json({"error": "task not found"}, status=HTTPStatus.NOT_FOUND) return self._json(task) return if len(parts) == 3 and parts[2] == "steps": steps = [self._step_payload(step, state) for step in state["repo"].list_steps(parts[1])] self._json({"items": steps}) return if len(parts) == 3 and parts[2] == "artifacts": artifacts = [artifact.to_dict() for artifact in state["repo"].list_artifacts(parts[1])] self._json({"items": artifacts}) return if len(parts) == 3 and parts[2] == "history": actions = [item.to_dict() for item in state["repo"].list_action_records(parts[1], limit=100)] self._json({"items": actions}) return if len(parts) == 3 and parts[2] == "timeline": task = state["repo"].get_task(parts[1]) if task is None: self._json({"error": "task not found"}, status=HTTPStatus.NOT_FOUND) return steps = state["repo"].list_steps(parts[1]) artifacts = state["repo"].list_artifacts(parts[1]) actions = state["repo"].list_action_records(parts[1], limit=200) items: list[dict[str, object]] = [] if task.created_at: items.append({ "kind": "task", "time": task.created_at, "title": "Task Created", "summary": task.title, "status": task.status, }) if task.updated_at and task.updated_at != task.created_at: items.append({ "kind": "task", "time": task.updated_at, "title": "Task Updated", "summary": task.status, "status": task.status, }) for step in steps: if step.started_at: items.append({ "kind": "step", "time": step.started_at, "title": f"{step.step_name} started", "summary": step.status, "status": step.status, }) if step.finished_at: retry_meta = retry_meta_for_step(step, state["settings"]) retry_note = "" if retry_meta and retry_meta.get("next_retry_at"): retry_note = f" | next retry: {retry_meta['next_retry_at']}" items.append({ "kind": "step", "time": step.finished_at, "title": f"{step.step_name} finished", "summary": f"{step.error_message or step.status}{retry_note}", "status": step.status, "retry_state": retry_meta, }) for artifact in artifacts: if artifact.created_at: items.append({ "kind": "artifact", "time": artifact.created_at, "title": artifact.artifact_type, "summary": artifact.path, "status": "created", }) for action in actions: summary = action.summary try: details = json.loads(action.details_json or "{}") except json.JSONDecodeError: details = {} if action.action_name == "comment" and isinstance(details, dict): split_status = details.get("split", {}).get("status") full_status = details.get("full", {}).get("status") fragments = [] if split_status: fragments.append(f"split={split_status}") if full_status: fragments.append(f"full={full_status}") if fragments: summary = f"{summary} | {' '.join(fragments)}" if action.action_name in {"collection_a", "collection_b"} and isinstance(details, dict): cleanup = details.get("result", {}).get("cleanup") or details.get("cleanup") if isinstance(cleanup, dict): removed = cleanup.get("removed") or [] if removed: summary = f"{summary} | cleanup removed={len(removed)}" items.append({ "kind": "action", "time": action.created_at, "title": action.action_name, "summary": summary, "status": action.status, }) items.sort(key=lambda item: str(item["time"]), reverse=True) self._json({"items": items}) return self._json({"error": "not found"}, status=HTTPStatus.NOT_FOUND) def do_PUT(self) -> None: # noqa: N802 parsed = urlparse(self.path) if not self._check_auth(parsed.path): return if parsed.path != "/settings": self._json({"error": "not found"}, status=HTTPStatus.NOT_FOUND) return length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") root = ensure_initialized()["root"] service = SettingsService(root) service.save_staged_from_redacted(payload) service.promote_staged() self._json({"ok": True}) def do_POST(self) -> None: # noqa: N802 parsed = urlparse(self.path) if not self._check_auth(parsed.path): return if parsed.path != "/tasks": if parsed.path.startswith("/tasks/"): parts = [unquote(p) for p in parsed.path.split("/") if p] if len(parts) == 4 and parts[0] == "tasks" and parts[2] == "actions": task_id = parts[1] action = parts[3] if action == "run": result = run_task_action(task_id) self._json(result, status=HTTPStatus.ACCEPTED) return if action == "retry-step": length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") step_name = payload.get("step_name") if not step_name: self._json({"error": "missing step_name"}, status=HTTPStatus.BAD_REQUEST) return result = retry_step_action(task_id, step_name) self._json(result, status=HTTPStatus.ACCEPTED) return if action == "reset-to-step": length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") step_name = payload.get("step_name") if not step_name: self._json({"error": "missing step_name"}, status=HTTPStatus.BAD_REQUEST) return result = reset_to_step_action(task_id, step_name) self._json(result, status=HTTPStatus.ACCEPTED) return if parsed.path == "/worker/run-once": payload = run_once() self._record_action(None, "worker_run_once", "ok", "worker run once invoked", payload) self._json(payload, status=HTTPStatus.ACCEPTED) return if parsed.path.startswith("/runtime/services/"): parts = [unquote(p) for p in parsed.path.split("/") if p] if len(parts) == 4 and parts[0] == "runtime" and parts[1] == "services": try: payload = SystemdRuntime().act(parts[2], parts[3]) except ValueError as exc: self._json({"error": str(exc)}, status=HTTPStatus.BAD_REQUEST) return self._record_action(None, "service_action", "ok" if payload.get("command_ok") else "error", f"{parts[3]} {parts[2]}", payload) self._json(payload, status=HTTPStatus.ACCEPTED) return if parsed.path == "/stage/import": length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") source_path = payload.get("source_path") if not source_path: self._json({"error": "missing source_path"}, status=HTTPStatus.BAD_REQUEST) return state = ensure_initialized() stage_dir = Path(state["settings"]["paths"]["stage_dir"]) try: result = StageImporter().import_file(Path(source_path), stage_dir) except Exception as exc: self._json({"error": str(exc)}, status=HTTPStatus.BAD_REQUEST) return self._record_action(None, "stage_import", "ok", "imported file into stage", result) self._json(result, status=HTTPStatus.CREATED) return if parsed.path == "/stage/upload": content_type = self.headers.get("Content-Type", "") if "multipart/form-data" not in content_type: self._json({"error": "content-type must be multipart/form-data"}, status=HTTPStatus.BAD_REQUEST) return form = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={ "REQUEST_METHOD": "POST", "CONTENT_TYPE": content_type, "CONTENT_LENGTH": self.headers.get("Content-Length", "0"), }, ) file_item = form["file"] if "file" in form else None if file_item is None or not getattr(file_item, "filename", None): self._json({"error": "missing file"}, status=HTTPStatus.BAD_REQUEST) return state = ensure_initialized() stage_dir = Path(state["settings"]["paths"]["stage_dir"]) try: result = StageImporter().import_upload(file_item.filename, file_item.file, stage_dir) except Exception as exc: self._json({"error": str(exc)}, status=HTTPStatus.BAD_REQUEST) return self._record_action(None, "stage_upload", "ok", "uploaded file into stage", result) self._json(result, status=HTTPStatus.CREATED) return if parsed.path == "/scheduler/run-once": result = run_once() self._record_action(None, "scheduler_run_once", "ok", "scheduler run once completed", result.get("scheduler", {})) self._json(result, status=HTTPStatus.ACCEPTED) return self._json({"error": "not found"}, status=HTTPStatus.NOT_FOUND) return length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") source_path = payload.get("source_path") if not source_path: self._json({"error": "missing source_path"}, status=HTTPStatus.BAD_REQUEST) return state = ensure_initialized() try: task = state["ingest_service"].create_task_from_file( Path(source_path), state["settings"]["ingest"], ) except Exception as exc: # keep API small for now status = HTTPStatus.CONFLICT if exc.__class__.__name__ == "ModuleError" else HTTPStatus.INTERNAL_SERVER_ERROR payload = exc.to_dict() if hasattr(exc, "to_dict") else {"error": str(exc)} self._json(payload, status=status) return self._json(task.to_dict(), status=HTTPStatus.CREATED) def log_message(self, format: str, *args) -> None: # noqa: A003 return def _json(self, payload: object, status: HTTPStatus = HTTPStatus.OK) -> None: body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _html(self, html: str, status: HTTPStatus = HTTPStatus.OK) -> None: body = html.encode("utf-8") self.send_response(status) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _record_action(self, task_id: str | None, action_name: str, status: str, summary: str, details: dict[str, object]) -> None: state = ensure_initialized() state["repo"].add_action_record( ActionRecord( id=None, task_id=task_id, action_name=action_name, status=status, summary=summary, details_json=json.dumps(details, ensure_ascii=False), created_at=utc_now_iso(), ) ) def _check_auth(self, path: str) -> bool: if path in {"/", "/health", "/ui", "/ui/"} or path.startswith("/assets/") or path.startswith("/ui/assets/"): return True state = ensure_initialized() expected = str(state["settings"]["runtime"].get("control_token", "")).strip() if not expected: return True provided = self.headers.get("X-Biliup-Token", "").strip() if provided == expected: return True self._json({"error": "unauthorized"}, status=HTTPStatus.UNAUTHORIZED) return False def serve(host: str, port: int) -> None: ensure_initialized() server = ThreadingHTTPServer((host, port), ApiHandler) print(f"biliup-next api listening on http://{host}:{port}") server.serve_forever()