qwen3tts.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725
  1. from __future__ import annotations
  2. import time
  3. import numpy as np
  4. import soundfile as sf
  5. import resampy
  6. import asyncio
  7. import os
  8. import queue
  9. import json
  10. import hashlib
  11. import re
  12. from queue import Queue
  13. from io import BytesIO
  14. from threading import Thread, Event
  15. from enum import Enum
  16. from typing import TYPE_CHECKING
  17. if TYPE_CHECKING:
  18. from basereal import BaseReal
  19. from logger import logger
  20. class State(Enum):
  21. RUNNING = 0
  22. PAUSE = 1
  23. class Qwen3TTS:
  24. def __init__(self, opt, parent: BaseReal):
  25. self.opt = opt
  26. self.parent = parent
  27. self.fps = opt.fps # 20 ms per frame
  28. self.sample_rate = 16000
  29. self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
  30. self.input_stream = BytesIO()
  31. self.msgqueue = Queue()
  32. self.high_priority_queue = Queue()
  33. self.image_description_queue = Queue() # 超优先级队列,用于图像描述
  34. self.is_playing_image_description = False # 标记是否正在播放图像描述
  35. self.state = State.RUNNING
  36. self.interrupted_messages = []
  37. self.current_msg = None
  38. self.current_msg_progress = 0
  39. self.interrupt_flag = Event()
  40. # VoxCPM2 配置(替代 Qwen3-TTS API)
  41. self.voxcpm2_model_path = getattr(opt, 'VOXCPM2_MODEL_PATH', 'VoxCPM2')
  42. self.ref_audio_path = getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav')
  43. self.ref_text = getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。')
  44. self.cfg_value = getattr(opt, 'CFG_VALUE', 2.5)
  45. self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 8)
  46. # 加载 VoxCPM2 模型
  47. self.voxcpm2_model = None
  48. self._load_voxcpm2_model()
  49. # 预生成音频配置
  50. self.pre_gen_dir = "./data/pre_generated_tts"
  51. self.audio_map = self._load_audio_map()
  52. self.use_pre_gen = len(self.audio_map) > 0
  53. # Image_Analysis 音频目录配置(修改为 wav/wav 子目录)
  54. self.image_analysis_dir = "/mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav"
  55. self.played_audio_files = set() # 已播放的音频文件集合
  56. # 预生成音频循环播放配置
  57. self.pre_gen_index = 0 # 当前播放索引
  58. self.is_playing_pre_gen_loop = False # 是否在循环播放预生成音频
  59. logger.info(f"Qwen3TTS 初始化完成:")
  60. logger.info(f" VoxCPM2 模型路径:{self.voxcpm2_model_path}")
  61. logger.info(f" 参考音频:{self.ref_audio_path}")
  62. logger.info(f" 参考文本:{self.ref_text}")
  63. logger.info(f" CFG 值:{self.cfg_value}")
  64. logger.info(f" 推理步数:{self.inference_timesteps}")
  65. logger.info(f" 预生成音频:{len(self.audio_map)} 条")
  66. logger.info(f" 是否使用预生成:{self.use_pre_gen}")
  67. logger.info(f" Image_Analysis 目录:{self.image_analysis_dir}")
  68. logger.info(f" 预生成循环播放:启用")
  69. def _load_voxcpm2_model(self):
  70. """加载 VoxCPM2 模型(单例模式)"""
  71. try:
  72. import torch
  73. from voxcpm import VoxCPM
  74. # 禁用 TorchDynamo,避免 scaled_dot_product_attention 兼容性错误
  75. torch._dynamo.config.suppress_errors = True
  76. torch.compiler.disable()
  77. logger.info(f"🔄 加载 VoxCPM2 模型: {self.voxcpm2_model_path}")
  78. logger.info(" 禁用 TorchDynamo 以避免兼容性问题")
  79. # 清理显存
  80. if torch.cuda.is_available():
  81. torch.cuda.empty_cache()
  82. logger.info(f" 清理显存完成")
  83. # 加载模型
  84. self.voxcpm2_model = VoxCPM.from_pretrained(
  85. self.voxcpm2_model_path,
  86. load_denoiser=False,
  87. optimize=False
  88. )
  89. logger.info("✅ VoxCPM2 模型加载成功")
  90. except Exception as e:
  91. logger.error(f"❌ VoxCPM2 模型加载失败: {e}")
  92. logger.error(" 请检查模型路径是否正确,以及 voxcpm 包是否已安装")
  93. raise
  94. def reset_interrupt_flag(self):
  95. """重置打断标志"""
  96. self.interrupt_flag.clear()
  97. def set_interrupt_flag(self):
  98. """设置打断标志"""
  99. self.interrupt_flag.set()
  100. def flush_talk(self):
  101. """停止当前播放并清空待处理的消息队列"""
  102. with self.msgqueue.mutex:
  103. remaining_msgs = list(self.msgqueue.queue)
  104. if remaining_msgs:
  105. self.interrupted_messages.extend(remaining_msgs)
  106. self.msgqueue.queue.clear()
  107. if self.current_msg:
  108. self.interrupted_messages.append(self.current_msg)
  109. self.current_msg = None
  110. self.state = State.PAUSE
  111. if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
  112. self.input_stream.seek(0)
  113. self.input_stream.truncate()
  114. def resume_interrupted(self):
  115. """恢复播放被中断的消息"""
  116. if self.interrupted_messages:
  117. with self.msgqueue.mutex:
  118. for msg in self.interrupted_messages:
  119. self.msgqueue.put(msg)
  120. self.interrupted_messages.clear()
  121. self.state = State.RUNNING
  122. return True
  123. return False
  124. def _load_audio_map(self) -> dict:
  125. """加载预生成音频映射表"""
  126. map_path = os.path.join(self.pre_gen_dir, "audio_map.json")
  127. if os.path.exists(map_path):
  128. try:
  129. with open(map_path, 'r', encoding='utf-8') as f:
  130. return json.load(f)
  131. except Exception as e:
  132. logger.warning(f"加载预生成音频映射表失败: {e}")
  133. return {}
  134. def _get_pre_generated_audio(self, text: str):
  135. """获取预生成的音频数据,如果不存在返回 None"""
  136. if not self.use_pre_gen:
  137. return None
  138. text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16]
  139. audio_path = os.path.join(self.pre_gen_dir, f"{text_hash}.wav")
  140. if not os.path.exists(audio_path):
  141. return None
  142. try:
  143. # 读取音频文件
  144. stream, sample_rate = sf.read(audio_path, dtype='float32')
  145. # 转为单声道
  146. if stream.ndim > 1:
  147. stream = stream[:, 0]
  148. # 重采样到 16kHz
  149. if sample_rate != self.sample_rate and stream.shape[0] > 0:
  150. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  151. return stream
  152. except Exception as e:
  153. logger.warning(f"读取预生成音频失败:{e}")
  154. return None
  155. def _get_newest_image_analysis_audio(self):
  156. """获取最新的未播放的图片分析音频文件"""
  157. if not os.path.exists(self.image_analysis_dir):
  158. return None
  159. # 获取所有 wav 文件并按修改时间排序
  160. try:
  161. audio_files = []
  162. all_files = os.listdir(self.image_analysis_dir)
  163. logger.info(f"🔍 Image_Analysis 目录共有 {len(all_files)} 个文件")
  164. logger.info(f" 已播放文件数:{len(self.played_audio_files)}")
  165. for f in all_files:
  166. if f.endswith('.wav'):
  167. file_path = os.path.join(self.image_analysis_dir, f)
  168. # 【修改】尝试从文件名中提取时间戳(格式:hifi_clone_YYYYMMDD_HHMMSS.wav)
  169. match = re.search(r'hifi_clone_(\d{8}_\d{6})', f)
  170. if match:
  171. # 从文件名解析时间
  172. time_str = match.group(1)
  173. file_timestamp = time.mktime(time.strptime(time_str, "%Y%m%d_%H%M%S"))
  174. logger.info(f" 📁 {f} - 文件名时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}")
  175. else:
  176. # 如果文件名没有时间戳,使用文件修改时间
  177. file_timestamp = os.path.getmtime(file_path)
  178. logger.info(f" 📁 {f} - 修改时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}")
  179. is_played = f in self.played_audio_files
  180. if not is_played:
  181. audio_files.append((file_path, file_timestamp, f))
  182. if not audio_files:
  183. logger.info(" ⚠️ 没有未播放的音频文件")
  184. return None
  185. # 按时间戳排序,最新的在前
  186. audio_files.sort(key=lambda x: x[1], reverse=True)
  187. newest_file = audio_files[0][2]
  188. logger.info(f" ✅ 发现最新未播放文件:{newest_file}")
  189. # 返回最新的文件
  190. return audio_files[0]
  191. except Exception as e:
  192. logger.error(f"获取图片分析音频失败:{e}", exc_info=True)
  193. return None
  194. def _get_next_pre_gen_audio(self):
  195. """获取下一条预生成音频(循环播放)"""
  196. if not self.use_pre_gen or len(self.audio_map) == 0:
  197. return None
  198. try:
  199. # 获取所有预生成音频文件
  200. audio_files = sorted([
  201. f for f in os.listdir(self.pre_gen_dir)
  202. if f.endswith('.wav')
  203. ])
  204. if not audio_files:
  205. return None
  206. # 循环索引
  207. audio_file = audio_files[self.pre_gen_index % len(audio_files)]
  208. self.pre_gen_index += 1
  209. # 读取音频
  210. audio_path = os.path.join(self.pre_gen_dir, audio_file)
  211. stream, sample_rate = sf.read(audio_path, dtype='float32')
  212. # 转为单声道
  213. if stream.ndim > 1:
  214. stream = stream[:, 0]
  215. # 重采样到 16kHz
  216. if sample_rate != self.sample_rate and stream.shape[0] > 0:
  217. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  218. return stream
  219. except Exception as e:
  220. logger.error(f"获取预生成音频失败:{e}")
  221. return None
  222. def _play_image_analysis_audio(self):
  223. """播放图片分析音频(次高优先级)"""
  224. result = self._get_newest_image_analysis_audio()
  225. if result is None:
  226. return False
  227. file_path, file_timestamp, filename = result
  228. try:
  229. read_start = time.time() # 【时间戳】开始读取文件
  230. logger.info(f"🎵 播放图片分析音频:{filename}")
  231. logger.info(f" 音频文件路径:{file_path}")
  232. logger.info(f" 音频修改时间:{file_timestamp}")
  233. # 读取音频文件
  234. stream, sample_rate = sf.read(file_path, dtype='float32')
  235. read_end = time.time() # 【时间戳】读取完成
  236. logger.info(f" 音频原始信息:sample_rate={sample_rate}, shape={stream.shape}, duration={stream.shape[0]/sample_rate:.2f}s")
  237. logger.info(f" ⏱️ 文件读取耗时:{(read_end - read_start)*1000:.1f}毫秒")
  238. # 转为单声道
  239. if stream.ndim > 1:
  240. stream = stream[:, 0]
  241. logger.info(f" 转换为单声道")
  242. # 重采样到 16kHz
  243. resample_start = time.time() # 【时间戳】开始重采样
  244. if sample_rate != self.sample_rate and stream.shape[0] > 0:
  245. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  246. logger.info(f" 重采样到 16kHz")
  247. resample_end = time.time() # 【时间戳】重采样完成
  248. logger.info(f" ⏱️ 重采样耗时:{(resample_end - resample_start)*1000:.1f}毫秒")
  249. logger.info(f" 处理后音频信息:shape={stream.shape}, chunks={stream.shape[0]//self.chunk}")
  250. # 播放音频流
  251. eventpoint = {'status': 'start', 'text': '图片描述'}
  252. logger.info(f"▶️ 开始调用 _play_audio_stream")
  253. completed = self._play_audio_stream(stream, f"[图片描述] {filename}", eventpoint)
  254. logger.info(f"◀️ _play_audio_stream 返回,completed={completed}")
  255. # 标记为已播放
  256. self.played_audio_files.add(filename)
  257. if completed:
  258. logger.info(f"✅ 图片分析音频播放完成:{filename}")
  259. else:
  260. logger.info(f"⚠️ 图片分析音频被中断:{filename}")
  261. return completed
  262. except Exception as e:
  263. logger.error(f"播放图片分析音频失败:{e}", exc_info=True)
  264. return False
  265. def put_msg_txt(self, msg: str, datainfo: dict = {}):
  266. """将文本消息放入队列"""
  267. if len(msg) > 0:
  268. if len(msg) > 100:
  269. sentences = re.split(r'([。!?.!?])', msg)
  270. parts = []
  271. for i in range(0, len(sentences) - 1, 2):
  272. sentence = sentences[i]
  273. punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
  274. if sentence.strip():
  275. parts.append(sentence.strip() + punctuation)
  276. if len(parts) > 1:
  277. for part in parts:
  278. if part.strip():
  279. self.msgqueue.put((part, datainfo))
  280. return
  281. self.msgqueue.put((msg, datainfo))
  282. def put_high_priority_msg(self, msg: str, datainfo: dict = {}):
  283. """添加高优先级消息"""
  284. if len(msg) > 0:
  285. if len(msg) > 100:
  286. sentences = re.split(r'([。!?.!?])', msg)
  287. parts = []
  288. for i in range(0, len(sentences) - 1, 2):
  289. sentence = sentences[i]
  290. punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
  291. if sentence.strip():
  292. parts.append(sentence.strip() + punctuation)
  293. if len(parts) > 1:
  294. for part in parts:
  295. if part.strip():
  296. self.high_priority_queue.put((part, datainfo))
  297. return
  298. self.high_priority_queue.put((msg, datainfo))
  299. def put_image_description(self, msg: str, datainfo: dict = {}):
  300. """添加图像描述消息,具有最高优先级,会立即中断当前播放"""
  301. if len(msg) > 0:
  302. logger.info(f"收到图像描述消息,长度:{len(msg)}字")
  303. # 图像描述通常是短句,不需要分割
  304. self.image_description_queue.put((msg, datainfo))
  305. def render(self, quit_event):
  306. """启动 TTS 处理线程"""
  307. process_thread = Thread(target=self.process_tts, args=(quit_event,))
  308. process_thread.start()
  309. def process_tts(self, quit_event):
  310. """处理 TTS 消息队列"""
  311. # 标记是否正在播放预生成音频,用于支持中断后恢复
  312. was_playing_pre_gen = False
  313. while not quit_event.is_set():
  314. try:
  315. msg = None
  316. # ★★★★★ 优先级 1: 用户问答(最高优先级,随时中断任何播放)
  317. if not self.high_priority_queue.empty():
  318. msg = self.high_priority_queue.get_nowait()
  319. logger.info("★★★★★ 处理用户问答(最高优先级) ★★★★★")
  320. self.state = State.RUNNING
  321. # 播放用户问答
  322. if msg and len(msg) >= 2 and isinstance(msg[0], str) and msg[0].strip():
  323. self.txt_to_audio(msg)
  324. # 用户问答播放完成后,如果有预生成音频,继续播放
  325. if was_playing_pre_gen:
  326. logger.info("用户问答播放完成,恢复播放预生成音频")
  327. continue
  328. # ★★★ 优先级 2: Image_Analysis 新音频(次高优先级,可中断预生成音频)
  329. check_start_time = time.time() # 【时间戳】开始检查的时间
  330. newest_audio = self._get_newest_image_analysis_audio()
  331. if newest_audio is not None:
  332. file_path, file_timestamp, filename = newest_audio
  333. detect_time = time.time() # 【时间戳】检测到文件的时间
  334. # 【新增】等待文件写入完成
  335. # 检查文件大小是否稳定(500ms内没有变化)
  336. logger.info(f"⏳ 等待文件写入完成...")
  337. wait_start = time.time()
  338. last_size = 0
  339. stable_count = 0
  340. max_wait = 10 # 最多等待10秒
  341. while (time.time() - wait_start) < max_wait:
  342. try:
  343. current_size = os.path.getsize(file_path)
  344. if current_size == last_size and current_size > 0:
  345. stable_count += 1
  346. if stable_count >= 3: # 连续3次大小不变,认为文件写入完成
  347. wait_end = time.time()
  348. logger.info(f"✅ 文件写入完成,等待耗时:{(wait_end - wait_start)*1000:.0f}ms,文件大小:{current_size} bytes")
  349. break
  350. else:
  351. stable_count = 0
  352. last_size = current_size
  353. time.sleep(0.15) # 每150ms检查一次
  354. except:
  355. time.sleep(0.15)
  356. time_since_creation = detect_time - file_timestamp # 【计算】文件生成到检测到的时间差
  357. logger.info("★★★ 发现新的图片分析音频,立即播放 ★★★")
  358. logger.info(f" 文件名:{filename}")
  359. logger.info(f" 文件创建时间(从文件名):{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(file_timestamp))}")
  360. logger.info(f" 检测到文件时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(detect_time))}.{int(detect_time % 1 * 1000):03d}")
  361. logger.info(f" ⏱️ 文件生成到检测耗时:{time_since_creation:.3f}秒")
  362. logger.info(f" 检查开始时间:{time.strftime('%H:%M:%S', time.localtime(check_start_time))}.{int(check_start_time % 1 * 1000):03d}")
  363. logger.info(f" ⏱️ 检查耗时:{(detect_time - check_start_time)*1000:.1f}毫秒")
  364. was_playing_pre_gen = True # 标记之前正在播放预生成音频
  365. play_start_time = time.time() # 【时间戳】开始播放准备的时间
  366. self._play_image_analysis_audio()
  367. play_end_time = time.time() # 【时间戳】播放完成的时间
  368. logger.info("=" * 80)
  369. logger.info("⏱️⏱️⏱️ 【图片分析音频时间统计】 ⏱️⏱️⏱️")
  370. logger.info(f" 文件创建时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}")
  371. logger.info(f" 开始播放时间:{time.strftime('%H:%M:%S', time.localtime(play_start_time))}.{int(play_start_time % 1 * 1000):03d}")
  372. logger.info(f" 播放完成时间:{time.strftime('%H:%M:%S', time.localtime(play_end_time))}.{int(play_end_time % 1 * 1000):03d}")
  373. logger.info(f" 📊 从文件生成到开始播放:{play_start_time - file_timestamp:.3f}秒")
  374. logger.info(f" 📊 播放处理耗时(读取+转换):{play_start_time - detect_time:.3f}秒")
  375. logger.info(f" 📊 实际播放时长:{play_end_time - play_start_time:.3f}秒")
  376. logger.info(f" 📊 总耗时(生成到播放完):{play_end_time - file_timestamp:.3f}秒")
  377. logger.info("=" * 80)
  378. logger.info("图片描述播放完成,将继续播放预生成音频")
  379. continue
  380. # ★ 优先级 3: 循环播放预生成音频(基础优先级,默认状态)
  381. pre_gen_stream = self._get_next_pre_gen_audio()
  382. if pre_gen_stream is not None:
  383. logger.info(f"★ 循环播放预生成音频 #{self.pre_gen_index}")
  384. eventpoint = {'status': 'start', 'text': '预生成介绍'}
  385. was_playing_pre_gen = True
  386. self._play_audio_stream(pre_gen_stream, "[预生成介绍]", eventpoint)
  387. continue
  388. # 没有预生成音频时,重置标记并等待普通消息
  389. was_playing_pre_gen = False
  390. try:
  391. msg = self.msgqueue.get(block=True, timeout=0.5)
  392. except queue.Empty:
  393. continue
  394. if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
  395. continue
  396. self.current_msg = msg
  397. self.current_msg_progress = 0
  398. # 处理普通消息
  399. if self.state == State.RUNNING:
  400. self.txt_to_audio(msg)
  401. self.current_msg = None
  402. self.current_msg_progress = 0
  403. except Exception as e:
  404. logger.error(f"process_tts 错误:{e}")
  405. continue
  406. logger.info('qwen3tts thread stop')
  407. def _play_audio_stream(self, stream: np.ndarray, text: str, textevent: dict) -> bool:
  408. """播放音频流,返回是否完整播放(带速度控制)"""
  409. streamlen = stream.shape[0]
  410. idx = 0
  411. total_chunks = streamlen // self.chunk
  412. chunk_count = 0
  413. logger.info(f" 📦 音频流总帧数:{streamlen}, chunk 大小:{self.chunk}, 预计发送 {total_chunks} 个 chunk")
  414. logger.info(f" ⏱️ 预计播放时长:{total_chunks * 0.02:.2f}秒")
  415. start_time = time.time()
  416. expected_frame_time = 0.0 # 期望的帧时间戳
  417. frame_duration = 0.02 # 20ms per frame
  418. while streamlen >= self.chunk and self.state == State.RUNNING:
  419. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  420. logger.info(" ⚠️ 播放过程中发现高优先级消息,中断播放")
  421. return False
  422. eventpoint = {}
  423. streamlen -= self.chunk
  424. chunk_count += 1
  425. if idx == 0:
  426. eventpoint = {'status': 'start', 'text': text}
  427. eventpoint.update(**textevent)
  428. logger.info(f" ▶️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (START)")
  429. elif streamlen < self.chunk:
  430. eventpoint = {'status': 'end', 'text': text}
  431. eventpoint.update(**textevent)
  432. logger.info(f" ◀️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (END)")
  433. else:
  434. # 中间的 chunk,每 10 个记录一次
  435. if chunk_count % 10 == 1:
  436. logger.info(f" ➡️ 播放第 {chunk_count}/{total_chunks} 个 chunk")
  437. if self.state != State.RUNNING:
  438. logger.info(f" ⚠️ 状态不是 RUNNING,停止播放")
  439. return False
  440. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  441. logger.info(" ⚠️ 发送音频帧前发现高优先级消息,中断播放")
  442. return False
  443. # 发送音频帧
  444. self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
  445. idx += self.chunk
  446. # 速度控制:等待到正确的播放时间点
  447. expected_frame_time += frame_duration
  448. elapsed = time.time() - start_time
  449. delay = expected_frame_time - elapsed
  450. if delay > 0.005: # 如果延迟超过 5ms,则等待
  451. # 计算需要等待的时间
  452. sleep_time = min(delay, 0.04) # 最多等待 40ms
  453. time.sleep(sleep_time)
  454. elif delay < -0.1: # 如果落后超过 100ms,跳过一些帧追赶
  455. logger.warning(f" ⚠️ 播放落后 {abs(delay):.3f}s,跳过一些帧以追赶进度")
  456. # 不重置 expected_frame_time,让下一帧继续按正常节奏发送
  457. actual_duration = time.time() - start_time
  458. logger.info(f" ✅ 所有 {chunk_count} 个 chunk 发送完成,实际耗时:{actual_duration:.2f}s (预期:{chunk_count * 0.02:.2f}s)")
  459. return True
  460. def txt_to_audio(self, msg: tuple[str, dict]):
  461. """将文本转换为音频(优先使用预生成音频)"""
  462. text, textevent = msg
  463. t = time.time()
  464. if self.state != State.RUNNING:
  465. return
  466. if not text.strip():
  467. return
  468. self.reset_interrupt_flag()
  469. if not self.high_priority_queue.empty():
  470. logger.info("发现高优先级消息,跳过当前普通消息处理")
  471. return
  472. try:
  473. # 首先尝试使用预生成音频
  474. pre_gen_stream = self._get_pre_generated_audio(text)
  475. if pre_gen_stream is not None:
  476. # 使用预生成音频
  477. logger.info(f"使用预生成音频: {text[:30]}...")
  478. if self.state != State.RUNNING:
  479. return
  480. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  481. return
  482. self._play_audio_stream(pre_gen_stream, text, textevent)
  483. logger.info(f'-------预生成音频播放完成,耗时:{time.time()-t:.4f}s')
  484. return
  485. # 没有预生成音频,调用 API 生成
  486. logger.info(f"调用 API 生成音频: {text[:30]}...")
  487. loop = asyncio.new_event_loop()
  488. asyncio.set_event_loop(loop)
  489. task = loop.create_task(self.__generate_audio(text))
  490. while not task.done():
  491. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  492. task.cancel()
  493. logger.info("TTS 请求被高优先级消息中断")
  494. break
  495. loop.run_until_complete(asyncio.sleep(0.05))
  496. try:
  497. loop.run_until_complete(task)
  498. except asyncio.CancelledError:
  499. logger.info("TTS 请求被中断")
  500. self.input_stream.seek(0)
  501. self.input_stream.truncate()
  502. return
  503. logger.info(f'-------qwen3 tts time:{time.time()-t:.4f}s')
  504. if self.input_stream.getbuffer().nbytes <= 0:
  505. logger.error('qwen3 tts err!!!!!')
  506. return
  507. self.input_stream.seek(0)
  508. stream = self.__create_bytes_stream(self.input_stream)
  509. if self.state != State.RUNNING:
  510. self.input_stream.seek(0)
  511. self.input_stream.truncate()
  512. return
  513. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  514. self.input_stream.seek(0)
  515. self.input_stream.truncate()
  516. return
  517. self._play_audio_stream(stream, text, textevent)
  518. self.input_stream.seek(0)
  519. self.input_stream.truncate()
  520. except Exception as e:
  521. logger.error(f"qwen3tts txt_to_audio error: {e}")
  522. async def __generate_audio(self, text: str):
  523. """使用 VoxCPM2 本地模型生成音频"""
  524. try:
  525. import torch
  526. import numpy as np
  527. logger.info(f"🎤 VoxCPM2 开始生成音频: {text[:50]}...")
  528. # 检查模型是否加载成功
  529. if self.voxcpm2_model is None:
  530. raise Exception("VoxCPM2 模型未加载")
  531. # 检查参考音频是否存在
  532. if not os.path.exists(self.ref_audio_path):
  533. raise Exception(f"参考音频文件不存在:{self.ref_audio_path}")
  534. # 使用 VoxCPM2 生成音频(纯克隆模式)
  535. generate_start = time.time()
  536. wav = self.voxcpm2_model.generate(
  537. text=text,
  538. reference_wav_path=self.ref_audio_path,
  539. retry_badcase=False, # 禁用 Badcase 重试,提升速度
  540. inference_timesteps=self.inference_timesteps, # 推理步数
  541. cfg_value=self.cfg_value # CFG 值
  542. )
  543. generate_time = time.time() - generate_start
  544. logger.info(f"✅ VoxCPM2 音频生成完成,耗时:{generate_time:.2f}秒")
  545. logger.info(f" 音频采样点数:{wav.shape[0]}")
  546. # VoxCPM2 输出是 48kHz,需要重采样到 16kHz
  547. if len(wav) > 0:
  548. # 重采样到 16kHz
  549. resample_start = time.time()
  550. wav_16k = resampy.resample(x=wav, sr_orig=48000, sr_new=self.sample_rate)
  551. resample_time = time.time() - resample_start
  552. logger.info(f" 重采样到 16kHz 完成,耗时:{resample_time:.2f}秒")
  553. # 转换为 float32
  554. wav_16k = wav_16k.astype(np.float32)
  555. # 写入输入流
  556. # 先将音频保存为 WAV 格式,然后读取到 BytesIO
  557. import io
  558. wav_bytes = io.BytesIO()
  559. sf.write(wav_bytes, wav_16k, self.sample_rate, format='WAV')
  560. wav_bytes.seek(0)
  561. # 读取到 input_stream
  562. wav_data, _ = sf.read(wav_bytes, dtype='float32')
  563. self.input_stream.write(wav_data.tobytes())
  564. logger.info(f" 音频数据写入 input_stream,大小:{self.input_stream.getbuffer().nbytes} bytes")
  565. else:
  566. logger.error("❌ VoxCPM2 生成的音频为空")
  567. raise Exception("生成的音频为空")
  568. except Exception as e:
  569. logger.error(f"❌ VoxCPM2 音频生成失败:{e}")
  570. raise
  571. def __create_bytes_stream(self, byte_stream):
  572. """将字节流转换为音频流"""
  573. stream, sample_rate = sf.read(byte_stream)
  574. logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
  575. stream = stream.astype(np.float32)
  576. if stream.ndim > 1:
  577. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  578. stream = stream[:, 0]
  579. if sample_rate != self.sample_rate and stream.shape[0] > 0:
  580. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  581. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  582. return stream