from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path
from typing import Any

from biliup_next.core.models import Artifact, PublishRecord, TaskContext, utc_now_iso
from biliup_next.core.registry import Registry
from biliup_next.infra.task_repository import TaskRepository


class PublishService:
    """Publish clip videos for a task; multi-segment sessions are aggregated onto their anchor task."""

    def __init__(self, registry: Registry, repo: TaskRepository):
        self.registry = registry
        self.repo = repo

    def run(self, task_id: str, settings: dict[str, object]) -> PublishRecord:
        task = self.repo.get_task(task_id)
        if task is None:
            raise RuntimeError(f"task not found: {task_id}")
        provider = self.registry.get("publish_provider", str(settings.get("provider", "biliup_cli")))
        started_at = utc_now_iso()
        self.repo.update_step_status(task_id, "publish", "running", started_at=started_at)

        session_contexts = self._session_contexts(task_id)
        if len(session_contexts) <= 1:
            # Standalone task: publish its own clips directly.
            clip_videos = self._clip_videos_for_task(task_id)
            record = provider.publish(task, clip_videos, settings)
            self._persist_publish_success(task_id, task.title, record, settings)
            return record

        anchor_context = session_contexts[0]
        shared_bvid = self._shared_session_bvid(session_contexts, settings)
        if task_id != anchor_context.task_id and shared_bvid:
            # The session was already published via another task; reuse the shared bvid.
            record = PublishRecord(
                id=None,
                task_id=task_id,
                platform="bilibili",
                aid=None,
                bvid=shared_bvid,
                title=task.title,
                published_at=utc_now_iso(),
            )
            self._persist_publish_success(task_id, task.title, record, settings)
            return record

        # Anchor task (or no prior publish): aggregate the whole session into one publish call.
        clip_videos = self._session_clip_videos(session_contexts)
        anchor_task = self.repo.get_task(anchor_context.task_id)
        if anchor_task is None:
            raise RuntimeError(f"anchor task not found: {anchor_context.task_id}")
        session_settings = dict(settings)
        session_settings.update(self._session_publish_metadata(anchor_task, session_contexts, settings))
        record = provider.publish(anchor_task, clip_videos, session_settings)

        # Record the shared publish result on every task in the session.
        for context in session_contexts:
            session_task = self.repo.get_task(context.task_id)
            if session_task is None:
                continue
            session_record = PublishRecord(
                id=None,
                task_id=context.task_id,
                platform=record.platform,
                aid=record.aid,
                bvid=record.bvid,
                title=record.title,
                published_at=record.published_at,
            )
            self._persist_publish_success(context.task_id, session_task.title, session_record, settings)

        return PublishRecord(
            id=None,
            task_id=task_id,
            platform=record.platform,
            aid=record.aid,
            bvid=record.bvid,
            title=record.title,
            published_at=record.published_at,
        )

    def _persist_publish_success(
        self,
        task_id: str,
        task_title: str,
        record: PublishRecord,
        settings: dict[str, object],
    ) -> None:
        self.repo.add_publish_record(record)
        if record.bvid:
            # Persist the bvid in the task's session directory so reruns can reuse it.
            session_dir = Path(str(settings.get("session_dir", "session"))) / task_title
            session_dir.mkdir(parents=True, exist_ok=True)
            bvid_path_obj = session_dir / "bvid.txt"
            bvid_path_obj.write_text(record.bvid, encoding="utf-8")
            self.repo.add_artifact(
                Artifact(
                    id=None,
                    task_id=task_id,
                    artifact_type="publish_bvid",
                    path=str(bvid_path_obj.resolve()),
                    metadata_json=json.dumps({}),
                    created_at=utc_now_iso(),
                )
            )
        finished_at = utc_now_iso()
        self.repo.update_step_status(task_id, "publish", "succeeded", finished_at=finished_at)
        self.repo.update_task_status(task_id, "published", finished_at)

    def _session_contexts(self, task_id: str) -> list[TaskContext]:
        context = self.repo.get_task_context(task_id)
        if context is None or not context.session_key or context.session_key.startswith("task:"):
            return [context] if context is not None else []
        contexts = list(self.repo.list_task_contexts_by_session_key(context.session_key))
        return sorted(
            contexts,
            key=lambda item: (
                self._parse_started_at(item.segment_started_at),
                item.source_title or item.task_id,
            ),
        )

    def _clip_videos_for_task(self, task_id: str) -> list[Artifact]:
        artifacts = self.repo.list_artifacts(task_id)
        return [artifact for artifact in artifacts if artifact.artifact_type == "clip_video"]

    def _session_clip_videos(self, contexts: list[TaskContext]) -> list[Artifact]:
        aggregated: list[Artifact] = []
        for context in contexts:
            aggregated.extend(self._clip_videos_for_task(context.task_id))
        return aggregated

    def _shared_session_bvid(self, contexts: list[TaskContext], settings: dict[str, object]) -> str | None:
        session_dir_root = Path(str(settings.get("session_dir", "session")))
        for context in contexts:
            task = self.repo.get_task(context.task_id)
            if task is None:
                continue
            bvid_path = session_dir_root / task.title / "bvid.txt"
            if bvid_path.exists():
                bvid = bvid_path.read_text(encoding="utf-8").strip()
                if bvid.startswith("BV"):
                    return bvid
        return None

    def _session_publish_metadata(
        self,
        anchor_task,
        contexts: list[TaskContext],
        settings: dict[str, object],
    ) -> dict[str, Any]:  # type: ignore[no-untyped-def]
        session_dir_root = Path(str(settings.get("session_dir", "session")))
        anchor_work_dir = (session_dir_root / anchor_task.title).resolve()
        anchor_work_dir.mkdir(parents=True, exist_ok=True)

        aggregate_txt_lines: list[str] = []
        aggregate_songs: list[dict[str, object]] = []
        for index, context in enumerate(contexts, start=1):
            task = self.repo.get_task(context.task_id)
            if task is None:
                continue
            task_work_dir = (session_dir_root / task.title).resolve()
            songs_txt = task_work_dir / "songs.txt"
            songs_json = task_work_dir / "songs.json"
            if songs_txt.exists():
                lines = [
                    line.strip()
                    for line in songs_txt.read_text(encoding="utf-8").splitlines()
                    if line.strip()
                ]
                if lines:
                    # Prefix each segment's song list with its part number (P1, P2, ...).
                    aggregate_txt_lines.append(f"P{index}:")
                    aggregate_txt_lines.extend(lines)
                    aggregate_txt_lines.append("")
            if songs_json.exists():
                try:
                    songs = json.loads(songs_json.read_text(encoding="utf-8")).get("songs", [])
                except json.JSONDecodeError:
                    songs = []
                if isinstance(songs, list):
                    aggregate_songs.extend(song for song in songs if isinstance(song, dict))

        aggregate_txt_path = anchor_work_dir / "session_split_songs.txt"
        aggregate_json_path = anchor_work_dir / "session_split_songs.json"
        aggregate_txt_path.write_text("\n".join(aggregate_txt_lines).strip() + "\n", encoding="utf-8")
        aggregate_json_path.write_text(
            json.dumps({"songs": aggregate_songs}, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        return {
            "publish_songs_txt_path": str(aggregate_txt_path),
            "publish_songs_json_path": str(aggregate_json_path),
        }

    @staticmethod
    def _parse_started_at(value: str | None) -> datetime:
        # Missing or unparsable timestamps sort after every valid one.
        if not value:
            return datetime.max
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return datetime.max
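

# Usage sketch (illustrative only, not part of the service): wiring PublishService
# with an already-constructed Registry and TaskRepository. How those two dependencies
# are built is project-specific and not shown in this module, so this helper simply
# takes them as parameters; the "provider" and "session_dir" keys mirror the defaults
# read in run() and _persist_publish_success() above.
def publish_single_task(registry: Registry, repo: TaskRepository, task_id: str) -> PublishRecord:
    service = PublishService(registry, repo)
    settings: dict[str, object] = {"provider": "biliup_cli", "session_dir": "session"}
    return service.run(task_id, settings)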