| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725 |
- from __future__ import annotations
- import time
- import numpy as np
- import soundfile as sf
- import resampy
- import asyncio
- import os
- import queue
- import json
- import hashlib
- import re
- from queue import Queue
- from io import BytesIO
- from threading import Thread, Event
- from enum import Enum
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from basereal import BaseReal
- from logger import logger
- class State(Enum):
- RUNNING = 0
- PAUSE = 1
- class Qwen3TTS:
- def __init__(self, opt, parent: BaseReal):
- self.opt = opt
- self.parent = parent
-
- self.fps = opt.fps # 20 ms per frame
- self.sample_rate = 16000
- self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
- self.input_stream = BytesIO()
-
- self.msgqueue = Queue()
- self.high_priority_queue = Queue()
- self.image_description_queue = Queue() # 超优先级队列,用于图像描述
- self.is_playing_image_description = False # 标记是否正在播放图像描述
- self.state = State.RUNNING
- self.interrupted_messages = []
- self.current_msg = None
- self.current_msg_progress = 0
- self.interrupt_flag = Event()
-
- # VoxCPM2 配置(替代 Qwen3-TTS API)
- self.voxcpm2_model_path = getattr(opt, 'VOXCPM2_MODEL_PATH', 'VoxCPM2')
- self.ref_audio_path = getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav')
- self.ref_text = getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。')
- self.cfg_value = getattr(opt, 'CFG_VALUE', 2.5)
- self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 8)
-
- # 加载 VoxCPM2 模型
- self.voxcpm2_model = None
- self._load_voxcpm2_model()
-
- # 预生成音频配置
- self.pre_gen_dir = "./data/pre_generated_tts"
- self.audio_map = self._load_audio_map()
- self.use_pre_gen = len(self.audio_map) > 0
-
- # Image_Analysis 音频目录配置(修改为 wav/wav 子目录)
- self.image_analysis_dir = "/mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav"
- self.played_audio_files = set() # 已播放的音频文件集合
-
- # 预生成音频循环播放配置
- self.pre_gen_index = 0 # 当前播放索引
- self.is_playing_pre_gen_loop = False # 是否在循环播放预生成音频
-
- logger.info(f"Qwen3TTS 初始化完成:")
- logger.info(f" VoxCPM2 模型路径:{self.voxcpm2_model_path}")
- logger.info(f" 参考音频:{self.ref_audio_path}")
- logger.info(f" 参考文本:{self.ref_text}")
- logger.info(f" CFG 值:{self.cfg_value}")
- logger.info(f" 推理步数:{self.inference_timesteps}")
- logger.info(f" 预生成音频:{len(self.audio_map)} 条")
- logger.info(f" 是否使用预生成:{self.use_pre_gen}")
- logger.info(f" Image_Analysis 目录:{self.image_analysis_dir}")
- logger.info(f" 预生成循环播放:启用")
-
- def _load_voxcpm2_model(self):
- """加载 VoxCPM2 模型(单例模式)"""
- try:
- import torch
- from voxcpm import VoxCPM
-
- # 禁用 TorchDynamo,避免 scaled_dot_product_attention 兼容性错误
- torch._dynamo.config.suppress_errors = True
- torch.compiler.disable()
-
- logger.info(f"🔄 加载 VoxCPM2 模型: {self.voxcpm2_model_path}")
- logger.info(" 禁用 TorchDynamo 以避免兼容性问题")
-
- # 清理显存
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- logger.info(f" 清理显存完成")
-
- # 加载模型
- self.voxcpm2_model = VoxCPM.from_pretrained(
- self.voxcpm2_model_path,
- load_denoiser=False,
- optimize=False
- )
-
- logger.info("✅ VoxCPM2 模型加载成功")
-
- except Exception as e:
- logger.error(f"❌ VoxCPM2 模型加载失败: {e}")
- logger.error(" 请检查模型路径是否正确,以及 voxcpm 包是否已安装")
- raise
-
- def reset_interrupt_flag(self):
- """重置打断标志"""
- self.interrupt_flag.clear()
-
- def set_interrupt_flag(self):
- """设置打断标志"""
- self.interrupt_flag.set()
-
- def flush_talk(self):
- """停止当前播放并清空待处理的消息队列"""
- with self.msgqueue.mutex:
- remaining_msgs = list(self.msgqueue.queue)
- if remaining_msgs:
- self.interrupted_messages.extend(remaining_msgs)
- self.msgqueue.queue.clear()
-
- if self.current_msg:
- self.interrupted_messages.append(self.current_msg)
-
- self.current_msg = None
- self.state = State.PAUSE
-
- if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
- self.input_stream.seek(0)
- self.input_stream.truncate()
-
- def resume_interrupted(self):
- """恢复播放被中断的消息"""
- if self.interrupted_messages:
- with self.msgqueue.mutex:
- for msg in self.interrupted_messages:
- self.msgqueue.put(msg)
- self.interrupted_messages.clear()
- self.state = State.RUNNING
- return True
- return False
-
- def _load_audio_map(self) -> dict:
- """加载预生成音频映射表"""
- map_path = os.path.join(self.pre_gen_dir, "audio_map.json")
- if os.path.exists(map_path):
- try:
- with open(map_path, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- logger.warning(f"加载预生成音频映射表失败: {e}")
- return {}
-
- def _get_pre_generated_audio(self, text: str):
- """获取预生成的音频数据,如果不存在返回 None"""
- if not self.use_pre_gen:
- return None
-
- text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16]
- audio_path = os.path.join(self.pre_gen_dir, f"{text_hash}.wav")
-
- if not os.path.exists(audio_path):
- return None
-
- try:
- # 读取音频文件
- stream, sample_rate = sf.read(audio_path, dtype='float32')
-
- # 转为单声道
- if stream.ndim > 1:
- stream = stream[:, 0]
-
- # 重采样到 16kHz
- if sample_rate != self.sample_rate and stream.shape[0] > 0:
- stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
-
- return stream
- except Exception as e:
- logger.warning(f"读取预生成音频失败:{e}")
- return None
-
- def _get_newest_image_analysis_audio(self):
- """获取最新的未播放的图片分析音频文件"""
- if not os.path.exists(self.image_analysis_dir):
- return None
-
- # 获取所有 wav 文件并按修改时间排序
- try:
- audio_files = []
- all_files = os.listdir(self.image_analysis_dir)
- logger.info(f"🔍 Image_Analysis 目录共有 {len(all_files)} 个文件")
- logger.info(f" 已播放文件数:{len(self.played_audio_files)}")
-
- for f in all_files:
- if f.endswith('.wav'):
- file_path = os.path.join(self.image_analysis_dir, f)
-
- # 【修改】尝试从文件名中提取时间戳(格式:hifi_clone_YYYYMMDD_HHMMSS.wav)
- match = re.search(r'hifi_clone_(\d{8}_\d{6})', f)
- if match:
- # 从文件名解析时间
- time_str = match.group(1)
- file_timestamp = time.mktime(time.strptime(time_str, "%Y%m%d_%H%M%S"))
- logger.info(f" 📁 {f} - 文件名时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}")
- else:
- # 如果文件名没有时间戳,使用文件修改时间
- file_timestamp = os.path.getmtime(file_path)
- logger.info(f" 📁 {f} - 修改时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}")
-
- is_played = f in self.played_audio_files
- if not is_played:
- audio_files.append((file_path, file_timestamp, f))
-
- if not audio_files:
- logger.info(" ⚠️ 没有未播放的音频文件")
- return None
-
- # 按时间戳排序,最新的在前
- audio_files.sort(key=lambda x: x[1], reverse=True)
-
- newest_file = audio_files[0][2]
- logger.info(f" ✅ 发现最新未播放文件:{newest_file}")
-
- # 返回最新的文件
- return audio_files[0]
- except Exception as e:
- logger.error(f"获取图片分析音频失败:{e}", exc_info=True)
- return None
-
- def _get_next_pre_gen_audio(self):
- """获取下一条预生成音频(循环播放)"""
- if not self.use_pre_gen or len(self.audio_map) == 0:
- return None
-
- try:
- # 获取所有预生成音频文件
- audio_files = sorted([
- f for f in os.listdir(self.pre_gen_dir)
- if f.endswith('.wav')
- ])
-
- if not audio_files:
- return None
-
- # 循环索引
- audio_file = audio_files[self.pre_gen_index % len(audio_files)]
- self.pre_gen_index += 1
-
- # 读取音频
- audio_path = os.path.join(self.pre_gen_dir, audio_file)
- stream, sample_rate = sf.read(audio_path, dtype='float32')
-
- # 转为单声道
- if stream.ndim > 1:
- stream = stream[:, 0]
-
- # 重采样到 16kHz
- if sample_rate != self.sample_rate and stream.shape[0] > 0:
- stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
-
- return stream
- except Exception as e:
- logger.error(f"获取预生成音频失败:{e}")
- return None
-
- def _play_image_analysis_audio(self):
- """播放图片分析音频(次高优先级)"""
- result = self._get_newest_image_analysis_audio()
-
- if result is None:
- return False
-
- file_path, file_timestamp, filename = result
-
- try:
- read_start = time.time() # 【时间戳】开始读取文件
- logger.info(f"🎵 播放图片分析音频:{filename}")
- logger.info(f" 音频文件路径:{file_path}")
- logger.info(f" 音频修改时间:{file_timestamp}")
-
- # 读取音频文件
- stream, sample_rate = sf.read(file_path, dtype='float32')
- read_end = time.time() # 【时间戳】读取完成
-
- logger.info(f" 音频原始信息:sample_rate={sample_rate}, shape={stream.shape}, duration={stream.shape[0]/sample_rate:.2f}s")
- logger.info(f" ⏱️ 文件读取耗时:{(read_end - read_start)*1000:.1f}毫秒")
-
- # 转为单声道
- if stream.ndim > 1:
- stream = stream[:, 0]
- logger.info(f" 转换为单声道")
-
- # 重采样到 16kHz
- resample_start = time.time() # 【时间戳】开始重采样
- if sample_rate != self.sample_rate and stream.shape[0] > 0:
- stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
- logger.info(f" 重采样到 16kHz")
- resample_end = time.time() # 【时间戳】重采样完成
- logger.info(f" ⏱️ 重采样耗时:{(resample_end - resample_start)*1000:.1f}毫秒")
-
- logger.info(f" 处理后音频信息:shape={stream.shape}, chunks={stream.shape[0]//self.chunk}")
-
- # 播放音频流
- eventpoint = {'status': 'start', 'text': '图片描述'}
- logger.info(f"▶️ 开始调用 _play_audio_stream")
- completed = self._play_audio_stream(stream, f"[图片描述] {filename}", eventpoint)
- logger.info(f"◀️ _play_audio_stream 返回,completed={completed}")
-
- # 标记为已播放
- self.played_audio_files.add(filename)
-
- if completed:
- logger.info(f"✅ 图片分析音频播放完成:{filename}")
- else:
- logger.info(f"⚠️ 图片分析音频被中断:{filename}")
- return completed
-
- except Exception as e:
- logger.error(f"播放图片分析音频失败:{e}", exc_info=True)
- return False
-
- def put_msg_txt(self, msg: str, datainfo: dict = {}):
- """将文本消息放入队列"""
- if len(msg) > 0:
- if len(msg) > 100:
- sentences = re.split(r'([。!?.!?])', msg)
- parts = []
- for i in range(0, len(sentences) - 1, 2):
- sentence = sentences[i]
- punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
- if sentence.strip():
- parts.append(sentence.strip() + punctuation)
-
- if len(parts) > 1:
- for part in parts:
- if part.strip():
- self.msgqueue.put((part, datainfo))
- return
-
- self.msgqueue.put((msg, datainfo))
-
- def put_high_priority_msg(self, msg: str, datainfo: dict = {}):
- """添加高优先级消息"""
- if len(msg) > 0:
- if len(msg) > 100:
- sentences = re.split(r'([。!?.!?])', msg)
- parts = []
- for i in range(0, len(sentences) - 1, 2):
- sentence = sentences[i]
- punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
- if sentence.strip():
- parts.append(sentence.strip() + punctuation)
-
- if len(parts) > 1:
- for part in parts:
- if part.strip():
- self.high_priority_queue.put((part, datainfo))
- return
-
- self.high_priority_queue.put((msg, datainfo))
-
- def put_image_description(self, msg: str, datainfo: dict = {}):
- """添加图像描述消息,具有最高优先级,会立即中断当前播放"""
- if len(msg) > 0:
- logger.info(f"收到图像描述消息,长度:{len(msg)}字")
- # 图像描述通常是短句,不需要分割
- self.image_description_queue.put((msg, datainfo))
-
- def render(self, quit_event):
- """启动 TTS 处理线程"""
- process_thread = Thread(target=self.process_tts, args=(quit_event,))
- process_thread.start()
-
- def process_tts(self, quit_event):
- """处理 TTS 消息队列"""
- # 标记是否正在播放预生成音频,用于支持中断后恢复
- was_playing_pre_gen = False
-
- while not quit_event.is_set():
- try:
- msg = None
-
- # ★★★★★ 优先级 1: 用户问答(最高优先级,随时中断任何播放)
- if not self.high_priority_queue.empty():
- msg = self.high_priority_queue.get_nowait()
- logger.info("★★★★★ 处理用户问答(最高优先级) ★★★★★")
- self.state = State.RUNNING
-
- # 播放用户问答
- if msg and len(msg) >= 2 and isinstance(msg[0], str) and msg[0].strip():
- self.txt_to_audio(msg)
-
- # 用户问答播放完成后,如果有预生成音频,继续播放
- if was_playing_pre_gen:
- logger.info("用户问答播放完成,恢复播放预生成音频")
- continue
-
- # ★★★ 优先级 2: Image_Analysis 新音频(次高优先级,可中断预生成音频)
- check_start_time = time.time() # 【时间戳】开始检查的时间
- newest_audio = self._get_newest_image_analysis_audio()
- if newest_audio is not None:
- file_path, file_timestamp, filename = newest_audio
- detect_time = time.time() # 【时间戳】检测到文件的时间
-
- # 【新增】等待文件写入完成
- # 检查文件大小是否稳定(500ms内没有变化)
- logger.info(f"⏳ 等待文件写入完成...")
- wait_start = time.time()
- last_size = 0
- stable_count = 0
- max_wait = 10 # 最多等待10秒
-
- while (time.time() - wait_start) < max_wait:
- try:
- current_size = os.path.getsize(file_path)
- if current_size == last_size and current_size > 0:
- stable_count += 1
- if stable_count >= 3: # 连续3次大小不变,认为文件写入完成
- wait_end = time.time()
- logger.info(f"✅ 文件写入完成,等待耗时:{(wait_end - wait_start)*1000:.0f}ms,文件大小:{current_size} bytes")
- break
- else:
- stable_count = 0
- last_size = current_size
- time.sleep(0.15) # 每150ms检查一次
- except:
- time.sleep(0.15)
-
- time_since_creation = detect_time - file_timestamp # 【计算】文件生成到检测到的时间差
-
- logger.info("★★★ 发现新的图片分析音频,立即播放 ★★★")
- logger.info(f" 文件名:{filename}")
- logger.info(f" 文件创建时间(从文件名):{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(file_timestamp))}")
- logger.info(f" 检测到文件时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(detect_time))}.{int(detect_time % 1 * 1000):03d}")
- logger.info(f" ⏱️ 文件生成到检测耗时:{time_since_creation:.3f}秒")
- logger.info(f" 检查开始时间:{time.strftime('%H:%M:%S', time.localtime(check_start_time))}.{int(check_start_time % 1 * 1000):03d}")
- logger.info(f" ⏱️ 检查耗时:{(detect_time - check_start_time)*1000:.1f}毫秒")
-
- was_playing_pre_gen = True # 标记之前正在播放预生成音频
- play_start_time = time.time() # 【时间戳】开始播放准备的时间
- self._play_image_analysis_audio()
- play_end_time = time.time() # 【时间戳】播放完成的时间
-
- logger.info("=" * 80)
- logger.info("⏱️⏱️⏱️ 【图片分析音频时间统计】 ⏱️⏱️⏱️")
- logger.info(f" 文件创建时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}")
- logger.info(f" 开始播放时间:{time.strftime('%H:%M:%S', time.localtime(play_start_time))}.{int(play_start_time % 1 * 1000):03d}")
- logger.info(f" 播放完成时间:{time.strftime('%H:%M:%S', time.localtime(play_end_time))}.{int(play_end_time % 1 * 1000):03d}")
- logger.info(f" 📊 从文件生成到开始播放:{play_start_time - file_timestamp:.3f}秒")
- logger.info(f" 📊 播放处理耗时(读取+转换):{play_start_time - detect_time:.3f}秒")
- logger.info(f" 📊 实际播放时长:{play_end_time - play_start_time:.3f}秒")
- logger.info(f" 📊 总耗时(生成到播放完):{play_end_time - file_timestamp:.3f}秒")
- logger.info("=" * 80)
-
- logger.info("图片描述播放完成,将继续播放预生成音频")
- continue
-
- # ★ 优先级 3: 循环播放预生成音频(基础优先级,默认状态)
- pre_gen_stream = self._get_next_pre_gen_audio()
- if pre_gen_stream is not None:
- logger.info(f"★ 循环播放预生成音频 #{self.pre_gen_index}")
- eventpoint = {'status': 'start', 'text': '预生成介绍'}
- was_playing_pre_gen = True
- self._play_audio_stream(pre_gen_stream, "[预生成介绍]", eventpoint)
- continue
-
- # 没有预生成音频时,重置标记并等待普通消息
- was_playing_pre_gen = False
- try:
- msg = self.msgqueue.get(block=True, timeout=0.5)
- except queue.Empty:
- continue
-
- if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
- continue
-
- self.current_msg = msg
- self.current_msg_progress = 0
-
- # 处理普通消息
- if self.state == State.RUNNING:
- self.txt_to_audio(msg)
-
- self.current_msg = None
- self.current_msg_progress = 0
-
- except Exception as e:
- logger.error(f"process_tts 错误:{e}")
- continue
-
- logger.info('qwen3tts thread stop')
-
- def _play_audio_stream(self, stream: np.ndarray, text: str, textevent: dict) -> bool:
- """播放音频流,返回是否完整播放(带速度控制)"""
- streamlen = stream.shape[0]
- idx = 0
- total_chunks = streamlen // self.chunk
- chunk_count = 0
-
- logger.info(f" 📦 音频流总帧数:{streamlen}, chunk 大小:{self.chunk}, 预计发送 {total_chunks} 个 chunk")
- logger.info(f" ⏱️ 预计播放时长:{total_chunks * 0.02:.2f}秒")
-
- start_time = time.time()
- expected_frame_time = 0.0 # 期望的帧时间戳
- frame_duration = 0.02 # 20ms per frame
-
- while streamlen >= self.chunk and self.state == State.RUNNING:
- if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
- logger.info(" ⚠️ 播放过程中发现高优先级消息,中断播放")
- return False
-
- eventpoint = {}
- streamlen -= self.chunk
- chunk_count += 1
- if idx == 0:
- eventpoint = {'status': 'start', 'text': text}
- eventpoint.update(**textevent)
- logger.info(f" ▶️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (START)")
- elif streamlen < self.chunk:
- eventpoint = {'status': 'end', 'text': text}
- eventpoint.update(**textevent)
- logger.info(f" ◀️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (END)")
- else:
- # 中间的 chunk,每 10 个记录一次
- if chunk_count % 10 == 1:
- logger.info(f" ➡️ 播放第 {chunk_count}/{total_chunks} 个 chunk")
-
- if self.state != State.RUNNING:
- logger.info(f" ⚠️ 状态不是 RUNNING,停止播放")
- return False
-
- if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
- logger.info(" ⚠️ 发送音频帧前发现高优先级消息,中断播放")
- return False
-
- # 发送音频帧
- self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
- idx += self.chunk
-
- # 速度控制:等待到正确的播放时间点
- expected_frame_time += frame_duration
- elapsed = time.time() - start_time
- delay = expected_frame_time - elapsed
-
- if delay > 0.005: # 如果延迟超过 5ms,则等待
- # 计算需要等待的时间
- sleep_time = min(delay, 0.04) # 最多等待 40ms
- time.sleep(sleep_time)
- elif delay < -0.1: # 如果落后超过 100ms,跳过一些帧追赶
- logger.warning(f" ⚠️ 播放落后 {abs(delay):.3f}s,跳过一些帧以追赶进度")
- # 不重置 expected_frame_time,让下一帧继续按正常节奏发送
-
- actual_duration = time.time() - start_time
- logger.info(f" ✅ 所有 {chunk_count} 个 chunk 发送完成,实际耗时:{actual_duration:.2f}s (预期:{chunk_count * 0.02:.2f}s)")
- return True
-
- def txt_to_audio(self, msg: tuple[str, dict]):
- """将文本转换为音频(优先使用预生成音频)"""
- text, textevent = msg
- t = time.time()
-
- if self.state != State.RUNNING:
- return
-
- if not text.strip():
- return
-
- self.reset_interrupt_flag()
-
- if not self.high_priority_queue.empty():
- logger.info("发现高优先级消息,跳过当前普通消息处理")
- return
-
- try:
- # 首先尝试使用预生成音频
- pre_gen_stream = self._get_pre_generated_audio(text)
-
- if pre_gen_stream is not None:
- # 使用预生成音频
- logger.info(f"使用预生成音频: {text[:30]}...")
-
- if self.state != State.RUNNING:
- return
-
- if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
- return
-
- self._play_audio_stream(pre_gen_stream, text, textevent)
- logger.info(f'-------预生成音频播放完成,耗时:{time.time()-t:.4f}s')
- return
-
- # 没有预生成音频,调用 API 生成
- logger.info(f"调用 API 生成音频: {text[:30]}...")
-
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- task = loop.create_task(self.__generate_audio(text))
-
- while not task.done():
- if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
- task.cancel()
- logger.info("TTS 请求被高优先级消息中断")
- break
- loop.run_until_complete(asyncio.sleep(0.05))
-
- try:
- loop.run_until_complete(task)
- except asyncio.CancelledError:
- logger.info("TTS 请求被中断")
- self.input_stream.seek(0)
- self.input_stream.truncate()
- return
-
- logger.info(f'-------qwen3 tts time:{time.time()-t:.4f}s')
-
- if self.input_stream.getbuffer().nbytes <= 0:
- logger.error('qwen3 tts err!!!!!')
- return
-
- self.input_stream.seek(0)
- stream = self.__create_bytes_stream(self.input_stream)
-
- if self.state != State.RUNNING:
- self.input_stream.seek(0)
- self.input_stream.truncate()
- return
-
- if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
- self.input_stream.seek(0)
- self.input_stream.truncate()
- return
-
- self._play_audio_stream(stream, text, textevent)
-
- self.input_stream.seek(0)
- self.input_stream.truncate()
-
- except Exception as e:
- logger.error(f"qwen3tts txt_to_audio error: {e}")
-
- async def __generate_audio(self, text: str):
- """使用 VoxCPM2 本地模型生成音频"""
- try:
- import torch
- import numpy as np
-
- logger.info(f"🎤 VoxCPM2 开始生成音频: {text[:50]}...")
-
- # 检查模型是否加载成功
- if self.voxcpm2_model is None:
- raise Exception("VoxCPM2 模型未加载")
-
- # 检查参考音频是否存在
- if not os.path.exists(self.ref_audio_path):
- raise Exception(f"参考音频文件不存在:{self.ref_audio_path}")
-
- # 使用 VoxCPM2 生成音频(纯克隆模式)
- generate_start = time.time()
- wav = self.voxcpm2_model.generate(
- text=text,
- reference_wav_path=self.ref_audio_path,
- retry_badcase=False, # 禁用 Badcase 重试,提升速度
- inference_timesteps=self.inference_timesteps, # 推理步数
- cfg_value=self.cfg_value # CFG 值
- )
- generate_time = time.time() - generate_start
-
- logger.info(f"✅ VoxCPM2 音频生成完成,耗时:{generate_time:.2f}秒")
- logger.info(f" 音频采样点数:{wav.shape[0]}")
-
- # VoxCPM2 输出是 48kHz,需要重采样到 16kHz
- if len(wav) > 0:
- # 重采样到 16kHz
- resample_start = time.time()
- wav_16k = resampy.resample(x=wav, sr_orig=48000, sr_new=self.sample_rate)
- resample_time = time.time() - resample_start
- logger.info(f" 重采样到 16kHz 完成,耗时:{resample_time:.2f}秒")
-
- # 转换为 float32
- wav_16k = wav_16k.astype(np.float32)
-
- # 写入输入流
- # 先将音频保存为 WAV 格式,然后读取到 BytesIO
- import io
- wav_bytes = io.BytesIO()
- sf.write(wav_bytes, wav_16k, self.sample_rate, format='WAV')
- wav_bytes.seek(0)
-
- # 读取到 input_stream
- wav_data, _ = sf.read(wav_bytes, dtype='float32')
- self.input_stream.write(wav_data.tobytes())
-
- logger.info(f" 音频数据写入 input_stream,大小:{self.input_stream.getbuffer().nbytes} bytes")
- else:
- logger.error("❌ VoxCPM2 生成的音频为空")
- raise Exception("生成的音频为空")
-
- except Exception as e:
- logger.error(f"❌ VoxCPM2 音频生成失败:{e}")
- raise
- def __create_bytes_stream(self, byte_stream):
- """将字节流转换为音频流"""
- stream, sample_rate = sf.read(byte_stream)
- logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
- stream = stream.astype(np.float32)
- if stream.ndim > 1:
- logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
- stream = stream[:, 0]
-
- if sample_rate != self.sample_rate and stream.shape[0] > 0:
- logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
- stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
- return stream
|