import os import time import subprocess import json import shutil from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from logger import get_system_logger, get_ai_logger, log_exception # ========================================== # 接口配置 (Interface Configuration) # ========================================== SESSION_DIR = r'./session' # 监控的工作区目录 CHECK_INTERVAL = 2 # 轮询频率 CODEX_CMD = "codex" # 如果报错,可以尝试改为 "codex.cmd" DONE_FLAG = "transcribe_done.flag" # 监听这个标记 # 初始化日志 logger = get_system_logger('monitorSrt') # ========================================== # 定义输出数据的 JSON Schema SONG_SCHEMA = { "type": "object", "properties": { "songs": { "type": "array", "items": { "type": "object", "properties": { "start": {"type": "string"}, "end": {"type": "string"}, "title": {"type": "string"}, "artist": {"type": "string"}, "confidence": {"type": "number"}, "evidence": {"type": "string"} }, "required": ["start", "end", "title", "artist", "confidence", "evidence"], "additionalProperties": False } } }, "required": ["songs"], "additionalProperties": False } TASK_PROMPT = """你是音乐片段识别助手。当前目录下有一个字幕文件。 任务: 1. 结合字幕内容并允许联网搜索进行纠错(识别同音字、唱错等)。 2. 识别出直播中唱过的所有歌曲,给出精确的开始和结束时间。 3. 同一首歌间隔 ≤30s 合并,>30s 分开。 4. 忽略纯聊天片段。 5. 无法确认的歌曲用 UNKNOWN 标注并在 evidence 说明。 最后请严格按照 Schema 生成 JSON 数据。""" # ========================================== class SrtHandler(FileSystemEventHandler): def on_created(self, event): # 修改:不再看 .srt,改为看 .flag if not event.is_directory and event.src_path.endswith(DONE_FLAG): logger.debug(f"检测到转录完成标记: {event.src_path}") self.process_with_codex(Path(event.src_path)) # if not event.is_directory and event.src_path.lower().endswith('.srt'): # self.process_with_codex(Path(event.src_path)) def on_moved(self, event): # 针对有些程序是先生成临时文件再重命名的情况 if not event.is_directory and event.dest_path.lower().endswith('.srt'): logger.debug(f"检测到字幕文件移动: {event.dest_path}") self.process_with_codex(Path(event.dest_path)) def process_with_codex(self, srt_path): work_dir = srt_path.parent # 避免对同一目录重复调用 if (work_dir / "songs.json").exists(): logger.info(f"songs.json 已存在,跳过: {work_dir.name}") return logger.info(f"发现新字幕,准备识别歌曲: {work_dir.name}") # 创建AI日志 ai_log, ai_log_file = get_ai_logger('codex', 'songs') ai_log.info("="*50) ai_log.info("Codex 歌曲识别任务开始") ai_log.info(f"工作目录: {work_dir}") ai_log.info("="*50) logger.debug("准备 Schema 文件...") ai_log.info("生成 JSON Schema") # 在当前目录下生成临时 Schema 文件供 Codex 参考 schema_file = work_dir / "song_schema.json" with open(schema_file, "w", encoding="utf-8") as f: json.dump(SONG_SCHEMA, f, ensure_ascii=False, indent=2) ai_log.info(f"Schema 文件: {schema_file.name}") logger.info("调用 Codex (Non-interactive mode)...") ai_log.info("开始 Codex 执行") ai_log.info(f"命令: {CODEX_CMD} exec") ai_log.info(f"任务提示: {TASK_PROMPT[:100]}...") # 构建命令行参数 # 注意:Windows 下为了防止 shell 解析错误,提示词尽量保持在一行 cmd = [ CODEX_CMD, "exec", TASK_PROMPT.replace('\n', ' '), "--full-auto", "--sandbox", "workspace-write", "--output-schema", "./song_schema.json", "-o", "songs.json", "--skip-git-repo-check", "--json" # 启用 JSON 输出以获取详细日志 ] ai_log.info(f"完整命令: {subprocess.list2cmdline(cmd)}") try: # 使用 shell=True 解决 Windows 下找不到 .cmd 脚本的问题 # 使用 subprocess.list2cmdline 将列表安全转为字符串 # process_cmd = subprocess.list2cmdline(cmd) # start_time = time.time() # result = subprocess.run( # process_cmd, # cwd=str(work_dir), # shell=False, # capture_output=True, # text=True, # encoding='utf-8' # ) # 2. 修改调用逻辑(去掉 list2cmdline) try: start_time = time.time() result = subprocess.run( cmd, # 直接传列表,不要传字符串 cwd=str(work_dir), shell=False, # 在 Linux 上,传列表时 shell 必须为 False 或不设置 capture_output=True, text=True, encoding='utf-8' ) elapsed = time.time() - start_time ai_log.info(f"Codex 执行完成,耗时: {elapsed:.2f}秒") ai_log.info(f"返回码: {result.returncode}") # 解析并记录 Codex 的 JSON 输出 if result.stdout: ai_log.info("=== Codex 执行日志 ===") for line in result.stdout.strip().split('\n'): if line.strip(): try: # 尝试解析 JSONL 格式的事件 event = json.loads(line) event_type = event.get('type', 'unknown') # 根据事件类型记录不同级别的日志 if event_type == 'error': ai_log.error(f"Codex Error: {json.dumps(event, ensure_ascii=False)}") elif event_type in ['tool_use', 'command_execution', 'file_operation']: ai_log.info(f"Codex Action: {json.dumps(event, ensure_ascii=False)}") else: ai_log.debug(f"Codex Event: {json.dumps(event, ensure_ascii=False)}") except json.JSONDecodeError: # 如果不是 JSON 格式,直接记录原始行 ai_log.info(line) if result.stderr: ai_log.warning("=== STDERR ===") for line in result.stderr.strip().split('\n'): if line.strip(): ai_log.warning(line) if result.returncode == 0: logger.info(f"Codex 执行成功: {work_dir.name}") ai_log.info("Codex 执行成功") self.generate_txt_fallback(work_dir, ai_log) else: logger.error(f"Codex 返回错误码 {result.returncode}") logger.error(f"错误详情: {result.stderr.strip() or result.stdout.strip()}") ai_log.error(f"Codex 执行失败,错误码: {result.returncode}") except Exception as e: log_exception(logger, e, "Codex 调用异常") log_exception(ai_log, e, "Codex 执行异常") ai_log.info("="*50) ai_log.info("Codex 歌曲识别任务完成") ai_log.info("="*50) logger.info(f"AI日志已保存: {ai_log_file}") def generate_txt_fallback(self, work_dir, ai_log): """解析生成的 JSON 并同步创建 B 站评论格式的 txt""" json_path = work_dir / "songs.json" txt_path = work_dir / "songs.txt" try: if json_path.exists(): with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) songs = data.get('songs', []) logger.info(f"识别到 {len(songs)} 首歌曲") ai_log.info(f"解析结果: {len(songs)} 首歌曲") with open(txt_path, 'w', encoding='utf-8') as t: for s in songs: # 将 SRT 时间格式 (00:00:00,360) 转为 B 站格式 (00:00:00) start_time = s['start'].split(',')[0] # 去掉毫秒部分 line = f"{start_time} {s['title']} — {s['artist']}\n" t.write(line) ai_log.debug(f" {s['title']} — {s['artist']} ({start_time})") logger.info(f"成功生成: {txt_path.name}") ai_log.info(f"生成 songs.txt 成功") except Exception as e: log_exception(logger, e, "生成 txt 失败") log_exception(ai_log, e, "生成 songs.txt 失败") def main(): path = Path(SESSION_DIR) if not path.exists(): path.mkdir(parents=True) logger.info("="*50) logger.info("字幕监控模块启动 (Codex 歌曲识别)") logger.info("="*50) logger.info(f"监控目录: {SESSION_DIR}") logger.info(f"Codex 命令: {CODEX_CMD}") event_handler = SrtHandler() observer = Observer() observer.schedule(event_handler, str(path), recursive=True) observer.start() logger.info("文件监控已启动") try: while True: time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: logger.info("接收到停止信号,正在关闭...") observer.stop() observer.join() logger.info("字幕监控模块已停止") if __name__ == "__main__": main()