""" VoxCPM2 API 调用模式 TTS 通过 HTTP API 调用独立的 VoxCPM2 模型服务,避免在数字人进程中加载模型 """ from __future__ import annotations import time import os import re import hashlib import numpy as np import soundfile as sf import resampy import requests from io import BytesIO from queue import Queue from threading import Thread, Event from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: from basereal import BaseReal from logger import logger class State(Enum): RUNNING = 0 PAUSE = 1 class VoxCPM2APITTS: """通过 API 调用 VoxCPM2 语音合成服务""" def __init__(self, opt, parent: BaseReal): self.opt = opt self.parent = parent # API 配置(中性模式:降低 CFG,不使用 prompt) self.api_url = getattr(opt, 'VOXCPM2_API_URL', 'http://183.252.196.135:6003') self.ref_audio_path = getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav') self.ref_text = getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。') # 中性模式:降低 CFG 值,减少情绪模仿 self.cfg_value = getattr(opt, 'CFG_VALUE', 1.5) # 从 2.0 降低到 1.5 self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 10) # 音频参数 self.fps = opt.fps self.sample_rate = 16000 self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms) # 消息队列 self.msgqueue = Queue() self.high_priority_queue = Queue() self.image_description_queue = Queue() self.is_playing_image_description = False self.state = State.RUNNING self.interrupted_messages = [] self.current_msg = None self.current_msg_progress = 0 self.interrupt_flag = Event() # 预生成音频配置(已禁用,使用实时克隆) self.pre_gen_dir = "./data/pre_generated_tts" self.audio_map = {} # 不再加载预生成音频 self.use_pre_gen = False # 禁用预生成音频 # Image_Analysis 音频目录配置 self.image_analysis_dir = "/Users/alien/Desktop/Digital_Human/Image_Analysis/wav/wav" self.played_audio_files = set() # 实时克隆音频保存目录 self.kelong_dir = "/Users/alien/Desktop/Digital_Human/Image_Analysis/knowledge_kelong" os.makedirs(self.kelong_dir, exist_ok=True) logger.info(f"📁 实时克隆音频保存目录:{self.kelong_dir}") # 移除预生成音频循环播放配置 # self.pre_gen_index = 0 # 不再需要 # self.is_playing_pre_gen_loop = False # 不再需要 # 检查 API 服务 self._check_api_service() logger.info(f"VoxCPM2 API TTS 初始化完成(实时克隆模式):") logger.info(f" API 地址:{self.api_url}") logger.info(f" 参考音频:{self.ref_audio_path}") logger.info(f" 参考文本:{self.ref_text}") logger.info(f" CFG 值:{self.cfg_value}") logger.info(f" 推理步数:{self.inference_timesteps}") logger.info(f" 预生成音频:已禁用(使用实时克隆)") logger.info(f" 克隆音频保存:{self.kelong_dir}") logger.info(f" Image_Analysis 目录:{self.image_analysis_dir}") def _check_api_service(self): """检查 API 服务是否可用""" try: logger.info(f"🔍 检查 VoxCPM2 API 服务:{self.api_url}/health") resp = requests.get(f"{self.api_url}/health", timeout=5) if resp.status_code == 200: health = resp.json() logger.info(f"✅ VoxCPM2 API 服务正常") logger.info(f" 模型:{health.get('model', 'Unknown')}") logger.info(f" 设备:{health.get('device', 'Unknown')}") logger.info(f" 采样率:{health.get('sample_rate', 'Unknown')} Hz") else: raise Exception(f"API 服务异常:HTTP {resp.status_code}") except requests.exceptions.ConnectionError: logger.error(f"❌ 无法连接到 VoxCPM2 API 服务:{self.api_url}") logger.error("请先启动 API 服务:") logger.error(" cd /mnt/nvme1data/model && python voxcpm2_api.py") raise except Exception as e: logger.error(f"❌ VoxCPM2 API 服务检查失败:{e}") raise def reset_interrupt_flag(self): """重置打断标志""" self.interrupt_flag.clear() def set_interrupt_flag(self): """设置打断标志""" self.interrupt_flag.set() def flush_talk(self): """停止当前播放并清空待处理的消息队列""" with self.msgqueue.mutex: remaining_msgs = list(self.msgqueue.queue) if remaining_msgs: self.interrupted_messages.extend(remaining_msgs) self.msgqueue.queue.clear() if self.current_msg: self.interrupted_messages.append(self.current_msg) self.current_msg = None self.state = State.PAUSE def resume_interrupted(self): """恢复播放被中断的消息""" if self.interrupted_messages: with self.msgqueue.mutex: for msg in self.interrupted_messages: self.msgqueue.put(msg) self.interrupted_messages.clear() self.state = State.RUNNING return True return False def _load_audio_map(self) -> dict: """加载预生成音频映射表""" import json import os map_path = os.path.join(self.pre_gen_dir, "audio_map.json") if os.path.exists(map_path): try: with open(map_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"加载预生成音频映射表失败: {e}") return {} def _get_pre_generated_audio(self, text: str): """获取预生成的音频数据,如果不存在返回 None""" import hashlib import os if not self.use_pre_gen: return None text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16] audio_path = os.path.join(self.pre_gen_dir, f"{text_hash}.wav") if not os.path.exists(audio_path): return None try: stream, sample_rate = sf.read(audio_path, dtype='float32') if stream.ndim > 1: stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0] > 0: stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream except Exception as e: logger.warning(f"读取预生成音频失败:{e}") return None def _get_newest_image_analysis_audio(self): """获取最新的未播放的图片分析音频文件""" import re import time if not os.path.exists(self.image_analysis_dir): return None try: audio_files = [] all_files = os.listdir(self.image_analysis_dir) logger.info(f"🔍 Image_Analysis 目录共有 {len(all_files)} 个文件") logger.info(f" 已播放文件数:{len(self.played_audio_files)}") for f in all_files: if f.endswith('.wav'): file_path = os.path.join(self.image_analysis_dir, f) match = re.search(r'hifi_clone_(\d{8}_\d{6})', f) if match: time_str = match.group(1) file_timestamp = time.mktime(time.strptime(time_str, "%Y%m%d_%H%M%S")) logger.info(f" 📁 {f} - 文件名时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}") else: file_timestamp = os.path.getmtime(file_path) logger.info(f" 📁 {f} - 修改时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{f in self.played_audio_files}") is_played = f in self.played_audio_files if not is_played: audio_files.append((file_path, file_timestamp, f)) if not audio_files: logger.info(" ⚠️ 没有未播放的音频文件") return None audio_files.sort(key=lambda x: x[1], reverse=True) newest_file = audio_files[0][2] logger.info(f" ✅ 发现最新未播放文件:{newest_file}") return audio_files[0] except Exception as e: logger.error(f"获取图片分析音频失败:{e}", exc_info=True) return None def _get_next_pre_gen_audio(self): """获取下一条预生成音频(循环播放)""" if not self.use_pre_gen or len(self.audio_map) == 0: return None try: audio_files = sorted([ f for f in os.listdir(self.pre_gen_dir) if f.endswith('.wav') ]) if not audio_files: return None audio_file = audio_files[self.pre_gen_index % len(audio_files)] self.pre_gen_index += 1 audio_path = os.path.join(self.pre_gen_dir, audio_file) stream, sample_rate = sf.read(audio_path, dtype='float32') if stream.ndim > 1: stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0] > 0: stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream except Exception as e: logger.error(f"获取预生成音频失败:{e}") return None def generate_audio_via_api(self, text: str, save_to_kelong: bool = True) -> np.ndarray: """通过 API 生成音频,返回 16kHz numpy 数组 Args: text: 要转换的文本 save_to_kelong: 是否保存到 knowledge_kelong 目录 """ try: logger.info(f"🎤 调用 VoxCPM2 API 生成音频(中性模式): {text[:30]}...") # 中性模式配置:只使用 reference_wav_path,不使用 prompt payload = { "text": text, "reference_wav_path": self.ref_audio_path, # 音色参考 # 不使用 prompt_wav_path 和 prompt_text(避免情绪模仿) "cfg_value": self.cfg_value, # 降低 CFG,减少情绪 "inference_timesteps": self.inference_timesteps } start_time = time.time() resp = requests.post( f"{self.api_url}/v1/tts/generate_audio", json=payload, timeout=120 ) if resp.status_code != 200: raise Exception(f"API 返回错误:HTTP {resp.status_code} - {resp.text}") api_sample_rate = int(resp.headers.get('X-Sample-Rate', 48000)) generate_time = time.time() - start_time logger.info(f"✅ API 音频生成完成,耗时:{generate_time:.2f}秒") logger.info(f" API 返回采样率:{api_sample_rate} Hz") wav_data = BytesIO(resp.content) stream, sr = sf.read(wav_data, dtype='float32') if stream.ndim > 1: stream = stream[:, 0] # 保存原始 48kHz 音频到 knowledge_kelong 目录 if save_to_kelong: try: timestamp = int(time.time()) # 使用文本前 20 个字符作为文件名(去除特殊字符) safe_text = re.sub(r'[\\/*?:"<>|]', '', text[:20]) filename = f"{timestamp}_{safe_text}.wav" kelong_path = os.path.join(self.kelong_dir, filename) # 保存 48kHz 原始音频(重采样之前) sf.write(kelong_path, stream, sr, format='WAV') logger.info(f"💾 已保存克隆音频:{kelong_path}") logger.info(f" 采样率:{sr} Hz,文件大小:{os.path.getsize(kelong_path)} bytes") except Exception as e: logger.error(f"保存克隆音频失败:{e}") # 重采样到 16kHz(数字人播放需要) if sr != self.sample_rate: logger.info(f" 重采样:{sr} Hz → {self.sample_rate} Hz") resample_start = time.time() # 使用高质量重采样滤波器,避免滋啦声和高频噪声 stream = resampy.resample( x=stream, sr_orig=sr, sr_new=self.sample_rate, filter='kaiser_best' # 使用 Kaiser 最佳滤波器 ) logger.info(f" 重采样完成,耗时:{time.time()-resample_start:.2f}秒") # 添加淡入淡出处理,避免首尾爆音和咔嗒声 fade_duration = 0.015 # 15ms 淡入淡出 fade_samples = int(fade_duration * self.sample_rate) if len(stream) > 2 * fade_samples: # 创建淡入淡出曲线(使用余弦曲线更平滑) fade_in = (1.0 - np.cos(np.linspace(0, np.pi, fade_samples))) / 2.0 fade_out = (1.0 - np.cos(np.linspace(np.pi, 0, fade_samples))) / 2.0 # 应用淡入淡出 stream[:fade_samples] *= fade_in stream[-fade_samples:] *= fade_out logger.info(f" 已应用淡入淡出处理({fade_duration*1000:.0f}ms)") logger.info(f" 音频采样点数:{stream.shape[0]}") logger.info(f" 音频时长:{stream.shape[0] / self.sample_rate:.2f}s") return stream except requests.exceptions.Timeout: logger.error("❌ API 请求超时(120秒)") raise Exception("语音生成超时") except Exception as e: logger.error(f"❌ API 调用失败:{e}") raise def _play_audio_stream(self, stream: np.ndarray, text: str, textevent: dict) -> bool: """播放音频流,返回是否完整播放""" streamlen = stream.shape[0] idx = 0 total_chunks = streamlen // self.chunk chunk_count = 0 logger.info(f" 📦 音频流总帧数:{streamlen}, chunk 大小:{self.chunk}, 预计发送 {total_chunks} 个 chunk") logger.info(f" ⏱️ 预计播放时长:{total_chunks * 0.02:.2f}秒") start_time = time.time() expected_frame_time = 0.0 frame_duration = 0.02 while streamlen >= self.chunk and self.state == State.RUNNING: if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): logger.info(" ⚠️ 播放过程中发现高优先级消息,中断播放") return False eventpoint = {} streamlen -= self.chunk chunk_count += 1 if idx == 0: eventpoint = {'status': 'start', 'text': text} eventpoint.update(**textevent) logger.info(f" ▶️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (START)") elif streamlen < self.chunk: eventpoint = {'status': 'end', 'text': text} eventpoint.update(**textevent) logger.info(f" ◀️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (END)") else: if chunk_count % 10 == 1: logger.info(f" ➡️ 播放第 {chunk_count}/{total_chunks} 个 chunk") if self.state != State.RUNNING: logger.info(f" ⚠️ 状态不是 RUNNING,停止播放") return False if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): logger.info(" ⚠️ 发送音频帧前发现高优先级消息,中断播放") return False self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint) idx += self.chunk expected_frame_time += frame_duration elapsed = time.time() - start_time delay = expected_frame_time - elapsed if delay > 0.005: sleep_time = min(delay, 0.04) time.sleep(sleep_time) elif delay < -0.1: logger.warning(f" ⚠️ 播放落后 {abs(delay):.3f}s,跳过一些帧以追赶进度") actual_duration = time.time() - start_time logger.info(f" ✅ 所有 {chunk_count} 个 chunk 发送完成,实际耗时:{actual_duration:.2f}s (预期:{chunk_count * 0.02:.2f}s)") return True def put_msg_txt(self, msg: str, datainfo: dict = {}): """将文本消息放入队列""" if len(msg) > 0: self.msgqueue.put((msg, datainfo)) def put_high_priority_msg(self, msg: str, datainfo: dict = {}): """添加高优先级消息""" if len(msg) > 0: self.high_priority_queue.put((msg, datainfo)) def put_image_description(self, msg: str, datainfo: dict = {}): """添加图像描述消息""" if len(msg) > 0: logger.info(f"收到图像描述消息,长度:{len(msg)}字") self.image_description_queue.put((msg, datainfo)) def render(self, quit_event): """启动 TTS 处理线程""" process_thread = Thread(target=self.process_tts, args=(quit_event,)) process_thread.start() def process_tts(self, quit_event): """处理 TTS 消息队列""" was_playing_pre_gen = False while not quit_event.is_set(): try: msg = None # 优先级 1: 用户问答 if not self.high_priority_queue.empty(): msg = self.high_priority_queue.get_nowait() logger.info("★★★★★ 处理用户问答(最高优先级) ★★★★★") self.state = State.RUNNING if msg and len(msg) >= 2 and isinstance(msg[0], str) and msg[0].strip(): self.txt_to_audio(msg) if was_playing_pre_gen: logger.info("用户问答播放完成,恢复播放预生成音频") continue # 优先级 2: Image_Analysis 新音频 check_start_time = time.time() newest_audio = self._get_newest_image_analysis_audio() if newest_audio is not None: file_path, file_timestamp, filename = newest_audio detect_time = time.time() logger.info("⏳ 等待文件写入完成...") wait_start = time.time() last_size = 0 stable_count = 0 max_wait = 10 while (time.time() - wait_start) < max_wait: try: current_size = os.path.getsize(file_path) if current_size == last_size and current_size > 0: stable_count += 1 if stable_count >= 3: wait_end = time.time() logger.info(f"✅ 文件写入完成,等待耗时:{(wait_end - wait_start)*1000:.0f}ms,文件大小:{current_size} bytes") break else: stable_count = 0 last_size = current_size time.sleep(0.15) except: time.sleep(0.15) time_since_creation = detect_time - file_timestamp logger.info("★★★ 发现新的图片分析音频,立即播放 ★★★") logger.info(f" 文件名:{filename}") logger.info(f" ⏱️ 文件生成到检测耗时:{time_since_creation:.3f}秒") was_playing_pre_gen = True play_start_time = time.time() self._play_image_analysis_audio() play_end_time = time.time() logger.info("=" * 80) logger.info("⏱️⏱️⏱️ 【图片分析音频时间统计】 ⏱️⏱️⏱️") logger.info(f" 📊 从文件生成到开始播放:{play_start_time - file_timestamp:.3f}秒") logger.info(f" 📊 实际播放时长:{play_end_time - play_start_time:.3f}秒") logger.info("=" * 80) logger.info("图片描述播放完成,将继续播放预生成音频") continue # 优先级 3: 循环播放预生成音频 pre_gen_stream = self._get_next_pre_gen_audio() if pre_gen_stream is not None: logger.info(f"★ 循环播放预生成音频 #{self.pre_gen_index}") eventpoint = {'status': 'start', 'text': '预生成介绍'} was_playing_pre_gen = True self._play_audio_stream(pre_gen_stream, "[预生成介绍]", eventpoint) continue was_playing_pre_gen = False try: msg = self.msgqueue.get(block=True, timeout=0.5) except queue.Empty: continue if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip(): continue self.current_msg = msg self.current_msg_progress = 0 if self.state == State.RUNNING: self.txt_to_audio(msg) self.current_msg = None self.current_msg_progress = 0 except Exception as e: logger.error(f"process_tts 错误:{e}") continue logger.info('voxcpm2_api_tts thread stop') def txt_to_audio(self, msg: tuple[str, dict]): """将文本转换为音频(通过 API)""" text, textevent = msg t = time.time() if self.state != State.RUNNING: return if not text.strip(): return self.reset_interrupt_flag() if not self.high_priority_queue.empty(): logger.info("发现高优先级消息,跳过当前普通消息处理") return try: # 首先尝试使用预生成音频 pre_gen_stream = self._get_pre_generated_audio(text) if pre_gen_stream is not None: logger.info(f"使用预生成音频: {text[:30]}...") if self.state != State.RUNNING: return if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): return self._play_audio_stream(pre_gen_stream, text, textevent) logger.info(f'-------预生成音频播放完成,耗时:{time.time()-t:.4f}s') return # 调用 API 生成音频 logger.info(f"调用 API 生成音频: {text[:30]}...") stream = self.generate_audio_via_api(text) if self.state != State.RUNNING: return if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): return self._play_audio_stream(stream, text, textevent) except Exception as e: logger.error(f"VoxCPM2 API TTS 失败:{e}") def _play_image_analysis_audio(self): """播放图片分析音频""" result = self._get_newest_image_analysis_audio() if result is None: return False file_path, file_timestamp, filename = result try: logger.info(f"🎵 播放图片分析音频:{filename}") logger.info(f" 音频文件路径:{file_path}") stream, sample_rate = sf.read(file_path, dtype='float32') logger.info(f" 音频原始信息:sample_rate={sample_rate}, shape={stream.shape}") if stream.ndim > 1: stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0] > 0: stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) logger.info(f" 处理后音频信息:shape={stream.shape}, chunks={stream.shape[0]//self.chunk}") eventpoint = {'status': 'start', 'text': '图片描述'} completed = self._play_audio_stream(stream, f"[图片描述] {filename}", eventpoint) self.played_audio_files.add(filename) if completed: logger.info(f"✅ 图片分析音频播放完成:{filename}") else: logger.info(f"⚠️ 图片分析音频被中断:{filename}") return completed except Exception as e: logger.error(f"播放图片分析音频失败:{e}", exc_info=True) return False