import os import time import subprocess import json import shutil from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from logger import get_system_logger, get_ai_logger, log_exception # ========================================== # 接口配置 (Interface Configuration) # ========================================== SESSION_DIR = r'./session' # 监控的工作区目录 CHECK_INTERVAL = 2 # 轮询频率 CODEX_CMD = "/home/theshy/.nvm/versions/node/v22.13.0/bin/codex" # Linux 下通常直接用 codex DONE_FLAG = "transcribe_done.flag" # 监听这个标记 # 初始化日志 logger = get_system_logger('monitorSrt') # ========================================== # 定义输出数据的 JSON Schema SONG_SCHEMA = { "type": "object", "properties": { "songs": { "type": "array", "items": { "type": "object", "properties": { "start": {"type": "string"}, "end": {"type": "string"}, "title": {"type": "string"}, "artist": {"type": "string"}, "confidence": {"type": "number"}, "evidence": {"type": "string"} }, "required": ["start", "end", "title", "artist", "confidence", "evidence"], "additionalProperties": False } } }, "required": ["songs"], "additionalProperties": False } TASK_PROMPT = """你是音乐片段识别助手。当前目录下有一个字幕文件。 任务: 1. 结合字幕内容并允许联网搜索进行纠错(识别同音字、唱错等)。 2. 识别出直播中唱过的所有歌曲,给出精确的开始和结束时间。歌曲开始时间规则: - 歌曲开始时间应使用“上一句字幕的结束时间”作为 start_time。 - 这样可以尽量保留歌曲可能存在的前奏。 3. 同一首歌间隔 ≤160s 合并,>160s 分开。若连续识别出相同歌曲,且中间只有短暂对白、空白、转场或无歌词段,应合并为同一首歌. 4. 忽略纯聊天片段。 5. 无法确认的歌曲丢弃,宁缺毋滥:你的输出将直接面向最终用户。 6. 忽略短片段:如果一段演唱持续时间总和少于 15 秒,视为随口哼唱,请直接忽略,不计入列表。 7. 仔细分析每一句歌词,识别出相关歌曲后, 使用该歌曲歌词上下文对比字幕上下文,确定歌曲起始与停止时间 8.歌曲标注规则: - 可以在歌曲名称后使用括号 () 添加补充说明。 - 常见标注示例: - (片段):歌曲演唱时间较短,例如 < 60 秒 - (清唱):无伴奏演唱 - (副歌):只演唱副歌部分 - 标注应简洁,仅在确有必要时使用。 9. 通过歌曲起始和结束时间自检, 一般歌曲长度在5分钟以内, 1分钟以上, 可疑片段重新联网搜索检查. 最后请严格按照 Schema 生成 JSON 数据。""" # ========================================== class SrtHandler(FileSystemEventHandler): def on_created(self, event): if not event.is_directory: src_path = event.src_path if isinstance(src_path, bytes): src_path = src_path.decode('utf-8') if src_path.endswith(DONE_FLAG): logger.debug(f"检测到转录完成标记: {src_path}") self.process_with_codex(Path(src_path)) def on_moved(self, event): dest_path = event.dest_path if isinstance(dest_path, bytes): dest_path = dest_path.decode('utf-8') if not event.is_directory and dest_path.lower().endswith('.srt'): logger.debug(f"检测到字幕文件移动: {dest_path}") self.process_with_codex(Path(dest_path)) def process_with_codex(self, srt_path): work_dir = srt_path.parent # 避免对同一目录重复调用 if (work_dir / "songs.json").exists(): logger.info(f"songs.json 已存在,跳过: {work_dir.name}") return logger.info(f"发现新任务,准备识别歌曲: {work_dir.name}") # 创建AI日志 ai_log, ai_log_file = get_ai_logger('codex', 'songs') ai_log.info("="*50) ai_log.info("Codex 歌曲识别任务开始") ai_log.info(f"工作目录: {work_dir}") ai_log.info("="*50) # 生成临时 Schema 文件 schema_file = work_dir / "song_schema.json" with open(schema_file, "w", encoding="utf-8") as f: json.dump(SONG_SCHEMA, f, ensure_ascii=False, indent=2) # 构建命令行参数 (Linux 下必须使用列表形式) cmd = [ CODEX_CMD, "exec", TASK_PROMPT.replace('\n', ' '), "--full-auto", "--sandbox", "workspace-write", "--output-schema", "./song_schema.json", "-o", "songs.json", "--skip-git-repo-check", "--json" ] logger.info("调用 Codex...") ai_log.info(f"执行命令: {subprocess.list2cmdline(cmd)}") try: start_time = time.time() # 关键修改:shell=False + 直接传列表,解决 "File name too long" 错误 result = subprocess.run( cmd, cwd=str(work_dir), shell=False, capture_output=True, text=True, encoding='utf-8' ) elapsed = time.time() - start_time ai_log.info(f"Codex 执行完成,耗时: {elapsed:.2f}秒") # 记录输出 if result.stdout: ai_log.info("=== STDOUT ===") ai_log.info(result.stdout) if result.stderr: ai_log.warning("=== STDERR ===") ai_log.warning(result.stderr) if result.returncode == 0: logger.info(f"Codex 执行成功: {work_dir.name}") self.generate_txt_fallback(work_dir, ai_log) else: logger.error(f"Codex 失败,返回码: {result.returncode}") ai_log.error(f"Codex 失败,返回码: {result.returncode}") except Exception as e: log_exception(logger, e, "Codex 调用异常") log_exception(ai_log, e, "Codex 执行异常") ai_log.info("="*50) ai_log.info("Codex 歌曲识别任务完成") ai_log.info("="*50) def generate_txt_fallback(self, work_dir, ai_log): json_path = work_dir / "songs.json" txt_path = work_dir / "songs.txt" try: if json_path.exists(): with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) songs = data.get('songs', []) with open(txt_path, 'w', encoding='utf-8') as t: for s in songs: start_time = s['start'].split(',')[0].split('.')[0] # 兼容点号和逗号 line = f"{start_time} {s['title']} — {s['artist']}\n" t.write(line) logger.info(f"成功生成: {txt_path.name}") except Exception as e: log_exception(logger, e, "生成 txt 失败") def main(): path = Path(SESSION_DIR) path.mkdir(parents=True, exist_ok=True) logger.info("="*50) logger.info("字幕监控模块启动 (Linux 优化版)") logger.info("="*50) event_handler = SrtHandler() # 启动扫描:检查是否有 flag 但没 songs.json 的存量目录 logger.info("正在扫描存量任务...") for sub_dir in path.iterdir(): if sub_dir.is_dir(): flag = sub_dir / DONE_FLAG json_file = sub_dir / "songs.json" if flag.exists() and not json_file.exists(): logger.info(f"发现存量任务: {sub_dir.name}") event_handler.process_with_codex(flag) observer = Observer() observer.schedule(event_handler, str(path), recursive=True) observer.start() try: while True: time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": main()