| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- """
- VoxCPM2 API 调用模式 TTS
- 通过 HTTP API 调用独立的 VoxCPM2 模型服务,避免在数字人进程中加载模型
- """
- from __future__ import annotations
- import time
- import os
- import re
- import hashlib
- import numpy as np
- import soundfile as sf
- import resampy
- import requests
- from io import BytesIO
- from queue import Queue
- from threading import Thread, Event
- from enum import Enum
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from basereal import BaseReal
- from logger import logger
class State(Enum):
    """Playback state of the TTS worker thread."""

    RUNNING = 0  # queued messages are synthesized and played
    PAUSE = 1  # set by flush_talk(); normal messages are not played
class VoxCPM2APITTS:
    """TTS backend that synthesizes speech by calling a standalone VoxCPM2 HTTP service.

    Audio is pushed to ``parent.put_audio_frame`` at ``self.sample_rate`` in
    chunks of ``self.chunk`` samples by the worker thread started in
    :meth:`render`.  Besides text messages, the worker also plays new ``.wav``
    files that appear in ``image_analysis_dir``.
    """

    # Timestamp embedded in image-analysis file names, e.g. hifi_clone_20240101_120000.wav
    _HIFI_CLONE_TS_RE = re.compile(r'hifi_clone_(\d{8}_\d{6})')

    def __init__(self, opt, parent: "BaseReal"):
        """Read configuration from *opt*, set up queues/directories and check the API.

        Args:
            opt: Options object. ``fps`` is required; ``VOXCPM2_API_URL``,
                ``VOXCPM2_REF_WAV``, ``VOXCPM2_REF_TEXT``, ``CFG_VALUE`` and
                ``INFERENCE_TIMESTEPS`` are optional overrides.
            parent: Receiver of audio frames via ``put_audio_frame``.

        Raises:
            Whatever :meth:`_check_api_service` raises when the service is
            unreachable or unhealthy.
        """
        self.opt = opt
        self.parent = parent

        # API settings ("neutral" mode: lower CFG, no prompt audio/text).
        self.api_url = getattr(opt, 'VOXCPM2_API_URL', 'http://localhost:6003')
        self.ref_audio_path = getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav')
        self.ref_text = getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。')
        # CFG lowered from 2.0 to 1.5 to reduce imitation of the reference's emotion.
        self.cfg_value = getattr(opt, 'CFG_VALUE', 1.5)
        self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 10)

        # Audio parameters: one chunk holds one video frame's worth of samples.
        self.fps = opt.fps
        self.sample_rate = 16000
        self.chunk = self.sample_rate // self.fps  # 320 samples (20 ms) when fps == 50

        # Message queues and playback state.
        self.msgqueue = Queue()
        self.high_priority_queue = Queue()
        self.image_description_queue = Queue()
        self.is_playing_image_description = False
        self.state = State.RUNNING
        self.interrupted_messages = []
        self.current_msg = None
        self.current_msg_progress = 0
        self.interrupt_flag = Event()

        # Pre-generated audio (disabled: speech is cloned in real time instead).
        self.pre_gen_dir = "./data/pre_generated_tts"
        self.use_pre_gen = False
        self.audio_map = self._load_audio_map() if self.use_pre_gen else {}
        # Round-robin cursor read by _get_next_pre_gen_audio() and process_tts().
        self.pre_gen_index = 0

        # Directory polled for image-analysis audio, and the files already played.
        self.image_analysis_dir = "/mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav"
        self.played_audio_files = set()

        # Directory where every clip synthesized through the API is saved.
        self.kelong_dir = "/mnt/nvme1data/Digital_Human/Image_Analysis/knowledge_kelong"
        os.makedirs(self.kelong_dir, exist_ok=True)
        logger.info(f"📁 实时克隆音频保存目录:{self.kelong_dir}")

        # Fail fast if the synthesis service is not reachable.
        self._check_api_service()

        logger.info(f"VoxCPM2 API TTS 初始化完成(实时克隆模式):")
        logger.info(f" API 地址:{self.api_url}")
        logger.info(f" 参考音频:{self.ref_audio_path}")
        logger.info(f" 参考文本:{self.ref_text}")
        logger.info(f" CFG 值:{self.cfg_value}")
        logger.info(f" 推理步数:{self.inference_timesteps}")
        logger.info(f" 预生成音频:已禁用(使用实时克隆)")
        logger.info(f" 克隆音频保存:{self.kelong_dir}")
        logger.info(f" Image_Analysis 目录:{self.image_analysis_dir}")

    def _check_api_service(self):
        """Query ``<api_url>/health`` and log the reported model info.

        Raises:
            requests.exceptions.ConnectionError: the service cannot be reached.
            Exception: any other failure, including a non-200 status.
        """
        try:
            logger.info(f"🔍 检查 VoxCPM2 API 服务:{self.api_url}/health")
            resp = requests.get(f"{self.api_url}/health", timeout=5)
            if resp.status_code == 200:
                health = resp.json()
                logger.info(f"✅ VoxCPM2 API 服务正常")
                logger.info(f" 模型:{health.get('model', 'Unknown')}")
                logger.info(f" 设备:{health.get('device', 'Unknown')}")
                logger.info(f" 采样率:{health.get('sample_rate', 'Unknown')} Hz")
            else:
                raise Exception(f"API 服务异常:HTTP {resp.status_code}")
        except requests.exceptions.ConnectionError:
            logger.error(f"❌ 无法连接到 VoxCPM2 API 服务:{self.api_url}")
            logger.error("请先启动 API 服务:")
            logger.error(" cd /mnt/nvme1data/model && python voxcpm2_api.py")
            raise
        except Exception as e:
            logger.error(f"❌ VoxCPM2 API 服务检查失败:{e}")
            raise

    def reset_interrupt_flag(self):
        """Clear the interrupt flag."""
        self.interrupt_flag.clear()

    def set_interrupt_flag(self):
        """Set the interrupt flag; _play_audio_stream() stops at its next chunk."""
        self.interrupt_flag.set()

    def flush_talk(self):
        """Stop the current playback and move pending messages to ``interrupted_messages``.

        The message that was playing (if any) is saved first, so that
        :meth:`resume_interrupted` replays it before the messages that were
        still waiting in ``msgqueue``.
        """
        with self.msgqueue.mutex:
            remaining_msgs = list(self.msgqueue.queue)
            self.msgqueue.queue.clear()

        if self.current_msg:
            self.interrupted_messages.append(self.current_msg)
        self.interrupted_messages.extend(remaining_msgs)

        self.current_msg = None
        self.state = State.PAUSE

    def resume_interrupted(self):
        """Re-queue the messages saved by :meth:`flush_talk` and resume playback.

        Returns:
            True if any messages were re-queued (state set to RUNNING), else False.
        """
        if not self.interrupted_messages:
            return False

        # Queue.put() acquires msgqueue.mutex internally (a non-reentrant Lock),
        # so it must not be called while holding that mutex -- doing so deadlocks.
        for msg in self.interrupted_messages:
            self.msgqueue.put(msg)
        self.interrupted_messages.clear()
        self.state = State.RUNNING
        return True

    def _load_audio_map(self) -> dict:
        """Load ``audio_map.json`` from ``pre_gen_dir``; return ``{}`` if missing or unreadable."""
        import json

        map_path = os.path.join(self.pre_gen_dir, "audio_map.json")
        if os.path.exists(map_path):
            try:
                with open(map_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                logger.warning(f"加载预生成音频映射表失败: {e}")
        return {}

    def _to_playback_format(self, stream, sample_rate):
        """Keep the first channel of *stream* and resample it to ``self.sample_rate``.

        Empty streams are returned without resampling.
        """
        if stream.ndim > 1:
            stream = stream[:, 0]
        if sample_rate != self.sample_rate and stream.shape[0] > 0:
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
        return stream

    def _get_pre_generated_audio(self, text: str):
        """Return the pre-generated clip for *text* at ``self.sample_rate``, or None.

        Clips are looked up as ``<pre_gen_dir>/<md5(text)[:16]>.wav``. None is
        returned when pre-generated audio is disabled, the file is missing, or
        it cannot be read.
        """
        if not self.use_pre_gen:
            return None

        text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16]
        audio_path = os.path.join(self.pre_gen_dir, f"{text_hash}.wav")
        if not os.path.exists(audio_path):
            return None

        try:
            stream, sample_rate = sf.read(audio_path, dtype='float32')
            return self._to_playback_format(stream, sample_rate)
        except Exception as e:
            logger.warning(f"读取预生成音频失败:{e}")
            return None

    def _timestamp_from_filename(self, filename):
        """Parse the ``hifi_clone_YYYYmmdd_HHMMSS`` stamp in *filename* as local epoch seconds.

        Returns None if the name has no such stamp or the stamp is not a valid date.
        """
        match = self._HIFI_CLONE_TS_RE.search(filename)
        if match is None:
            return None
        try:
            return time.mktime(time.strptime(match.group(1), "%Y%m%d_%H%M%S"))
        except (ValueError, OverflowError):
            return None

    def _get_newest_image_analysis_audio(self):
        """Find the newest not-yet-played ``.wav`` in ``image_analysis_dir``.

        Files are ordered by the timestamp embedded in the name (see
        :meth:`_timestamp_from_filename`), falling back to the modification time.

        Returns:
            ``(file_path, file_timestamp, filename)`` for the newest unplayed
            file, or None if the directory is missing, has no unplayed files,
            or cannot be scanned. Never raises.
        """
        if not os.path.exists(self.image_analysis_dir):
            return None

        try:
            all_files = os.listdir(self.image_analysis_dir)
            # This runs on every process_tts() iteration, so scan details are debug-level.
            logger.debug(f"🔍 Image_Analysis 目录共有 {len(all_files)} 个文件")
            logger.debug(f" 已播放文件数:{len(self.played_audio_files)}")

            candidates = []
            for name in all_files:
                if not name.endswith('.wav'):
                    continue
                file_path = os.path.join(self.image_analysis_dir, name)
                is_played = name in self.played_audio_files

                file_timestamp = self._timestamp_from_filename(name)
                if file_timestamp is not None:
                    logger.debug(f" 📁 {name} - 文件名时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{is_played}")
                else:
                    try:
                        file_timestamp = os.path.getmtime(file_path)
                    except OSError:
                        # The file disappeared between listdir() and getmtime(); skip it.
                        continue
                    logger.debug(f" 📁 {name} - 修改时间:{time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, 已播放:{is_played}")

                if not is_played:
                    candidates.append((file_path, file_timestamp, name))

            if not candidates:
                logger.debug(" ⚠️ 没有未播放的音频文件")
                return None

            # max() returns the first of equal maxima, like a stable descending sort.
            newest = max(candidates, key=lambda item: item[1])
            logger.info(f" ✅ 发现最新未播放文件:{newest[2]}")
            return newest
        except Exception as e:
            logger.error(f"获取图片分析音频失败:{e}", exc_info=True)
            return None

    def _get_next_pre_gen_audio(self):
        """Return the next pre-generated clip in sorted-filename order, looping forever.

        Returns None when pre-generated audio is disabled, ``audio_map`` is
        empty, there are no ``.wav`` files, or reading fails.
        """
        if not self.use_pre_gen or not self.audio_map:
            return None

        try:
            audio_files = sorted(f for f in os.listdir(self.pre_gen_dir) if f.endswith('.wav'))
            if not audio_files:
                return None

            audio_file = audio_files[self.pre_gen_index % len(audio_files)]
            self.pre_gen_index += 1

            stream, sample_rate = sf.read(os.path.join(self.pre_gen_dir, audio_file), dtype='float32')
            return self._to_playback_format(stream, sample_rate)
        except Exception as e:
            logger.error(f"获取预生成音频失败:{e}")
            return None

    def _save_kelong_audio(self, text, stream, sr):
        """Best-effort save of an API clip to ``kelong_dir`` as ``<epoch>_<text[:20]>.wav``."""
        try:
            timestamp = int(time.time())
            # File-name suffix: first 20 characters of the text, minus characters
            # that are not allowed in file names.
            safe_text = re.sub(r'[\\/*?:"<>|]', '', text[:20])
            filename = f"{timestamp}_{safe_text}.wav"
            kelong_path = os.path.join(self.kelong_dir, filename)

            # Saved at the sample rate returned by the API, before resampling.
            sf.write(kelong_path, stream, sr, format='WAV')
            logger.info(f"💾 已保存克隆音频:{kelong_path}")
            logger.info(f" 采样率:{sr} Hz,文件大小:{os.path.getsize(kelong_path)} bytes")
        except Exception as e:
            logger.error(f"保存克隆音频失败:{e}")

    def generate_audio_via_api(self, text: str, save_to_kelong: bool = True) -> np.ndarray:
        """Synthesize *text* through the VoxCPM2 API and return mono samples at 16 kHz.

        Args:
            text: Text to synthesize.
            save_to_kelong: Also save the clip (at the API's sample rate, before
                resampling) under ``kelong_dir``.

        Returns:
            1-D array at ``self.sample_rate``. A 15 ms cosine fade-in/out is
            applied when the clip is long enough.

        Raises:
            Exception: on a non-200 response, on a request timeout
                ("语音生成超时"), or on any decoding/resampling error.
        """
        try:
            logger.info(f"🎤 调用 VoxCPM2 API 生成音频(中性模式): {text[:30]}...")

            # Neutral mode: only the reference wav (timbre) is sent. prompt_wav_path
            # and prompt_text are omitted so the model does not copy the prompt's emotion.
            payload = {
                "text": text,
                "reference_wav_path": self.ref_audio_path,
                "cfg_value": self.cfg_value,
                "inference_timesteps": self.inference_timesteps,
            }

            start_time = time.time()
            resp = requests.post(
                f"{self.api_url}/v1/tts/generate_audio",
                json=payload,
                timeout=120,
            )

            if resp.status_code != 200:
                raise Exception(f"API 返回错误:HTTP {resp.status_code} - {resp.text}")

            api_sample_rate = int(resp.headers.get('X-Sample-Rate', 48000))
            generate_time = time.time() - start_time

            logger.info(f"✅ API 音频生成完成,耗时:{generate_time:.2f}秒")
            logger.info(f" API 返回采样率:{api_sample_rate} Hz")

            stream, sr = sf.read(BytesIO(resp.content), dtype='float32')
            if stream.ndim > 1:
                stream = stream[:, 0]

            if save_to_kelong:
                self._save_kelong_audio(text, stream, sr)

            # Resample to the playback rate. resampy's kaiser_best filter avoids
            # high-frequency artifacts. Empty clips are skipped, as in the other loaders.
            if sr != self.sample_rate and stream.shape[0] > 0:
                logger.info(f" 重采样:{sr} Hz → {self.sample_rate} Hz")
                resample_start = time.time()
                stream = resampy.resample(
                    x=stream,
                    sr_orig=sr,
                    sr_new=self.sample_rate,
                    filter='kaiser_best',
                )
                logger.info(f" 重采样完成,耗时:{time.time()-resample_start:.2f}秒")

            # Cosine fade-in/out to avoid clicks at the start and end of the clip.
            fade_duration = 0.015
            fade_samples = int(fade_duration * self.sample_rate)
            if len(stream) > 2 * fade_samples:
                fade_in = (1.0 - np.cos(np.linspace(0, np.pi, fade_samples))) / 2.0
                fade_out = (1.0 - np.cos(np.linspace(np.pi, 0, fade_samples))) / 2.0
                stream[:fade_samples] *= fade_in
                stream[-fade_samples:] *= fade_out
                logger.info(f" 已应用淡入淡出处理({fade_duration*1000:.0f}ms)")

            logger.info(f" 音频采样点数:{stream.shape[0]}")
            logger.info(f" 音频时长:{stream.shape[0] / self.sample_rate:.2f}s")

            return stream

        except requests.exceptions.Timeout as e:
            logger.error("❌ API 请求超时(120秒)")
            raise Exception("语音生成超时") from e
        except Exception as e:
            logger.error(f"❌ API 调用失败:{e}")
            raise

    def _play_audio_stream(self, stream: np.ndarray, text: str, textevent: dict) -> bool:
        """Push *stream* to the parent in real time, ``self.chunk`` samples per frame.

        The first frame carries a ``'start'`` event. The last full frame, when it
        is not also the first, carries an ``'end'`` event. Both are merged with
        *textevent*, whose keys take precedence. A trailing partial chunk is
        dropped. Playback stops early when the state leaves RUNNING, the
        interrupt flag is set, or a high-priority message is queued.

        Returns:
            True if every full chunk was sent, False if playback was interrupted.
        """
        streamlen = stream.shape[0]
        idx = 0
        total_chunks = streamlen // self.chunk
        chunk_count = 0
        # Real-time length of one chunk (0.02 s at 16 kHz with fps == 50).
        frame_duration = self.chunk / self.sample_rate

        logger.info(f" 📦 音频流总帧数:{streamlen}, chunk 大小:{self.chunk}, 预计发送 {total_chunks} 个 chunk")
        logger.info(f" ⏱️ 预计播放时长:{total_chunks * frame_duration:.2f}秒")

        start_time = time.time()
        expected_frame_time = 0.0

        while streamlen >= self.chunk and self.state == State.RUNNING:
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info(" ⚠️ 播放过程中发现高优先级消息,中断播放")
                return False

            eventpoint = {}
            streamlen -= self.chunk
            chunk_count += 1
            if idx == 0:
                eventpoint = {'status': 'start', 'text': text}
                eventpoint.update(**textevent)
                logger.info(f" ▶️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (START)")
            elif streamlen < self.chunk:
                eventpoint = {'status': 'end', 'text': text}
                eventpoint.update(**textevent)
                logger.info(f" ◀️ 开始播放第 {chunk_count}/{total_chunks} 个 chunk (END)")
            elif chunk_count % 10 == 1:
                logger.info(f" ➡️ 播放第 {chunk_count}/{total_chunks} 个 chunk")

            if self.state != State.RUNNING:
                logger.info(f" ⚠️ 状态不是 RUNNING,停止播放")
                return False

            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info(" ⚠️ 发送音频帧前发现高优先级消息,中断播放")
                return False

            self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
            idx += self.chunk

            # Pace frames against the wall clock instead of sleeping a fixed amount,
            # so per-frame overhead does not accumulate as drift.
            expected_frame_time += frame_duration
            delay = expected_frame_time - (time.time() - start_time)
            if delay > 0.005:
                time.sleep(min(delay, 0.04))
            elif delay < -0.1:
                # NOTE: no frames are actually skipped; this only reports the lag.
                logger.warning(f" ⚠️ 播放落后 {abs(delay):.3f}s,跳过一些帧以追赶进度")

        actual_duration = time.time() - start_time
        logger.info(f" ✅ 所有 {chunk_count} 个 chunk 发送完成,实际耗时:{actual_duration:.2f}s (预期:{chunk_count * frame_duration:.2f}s)")
        return True

    def put_msg_txt(self, msg: str, datainfo: "dict | None" = None):
        """Queue *msg* with its event metadata for normal playback; empty text is ignored."""
        if len(msg) > 0:
            self.msgqueue.put((msg, {} if datainfo is None else datainfo))

    def put_high_priority_msg(self, msg: str, datainfo: "dict | None" = None):
        """Queue *msg* as a high-priority (user Q&A) message; empty text is ignored."""
        if len(msg) > 0:
            self.high_priority_queue.put((msg, {} if datainfo is None else datainfo))

    def put_image_description(self, msg: str, datainfo: "dict | None" = None):
        """Queue an image-description message; empty text is ignored."""
        if len(msg) > 0:
            logger.info(f"收到图像描述消息,长度:{len(msg)}字")
            self.image_description_queue.put((msg, {} if datainfo is None else datainfo))

    def render(self, quit_event):
        """Start the :meth:`process_tts` worker thread; it runs until *quit_event* is set."""
        process_thread = Thread(target=self.process_tts, args=(quit_event,))
        process_thread.start()

    def _wait_for_file_stable(self, file_path, max_wait=10, poll_interval=0.15, stable_polls=3):
        """Wait until *file_path* has a non-zero size that stays unchanged for *stable_polls* polls.

        Returns:
            True once the size is stable, False if *max_wait* seconds elapse first.
        """
        logger.info("⏳ 等待文件写入完成...")
        wait_start = time.time()
        last_size = 0
        stable_count = 0

        while (time.time() - wait_start) < max_wait:
            try:
                current_size = os.path.getsize(file_path)
            except OSError:
                # The writer may still be creating/replacing the file; poll again.
                time.sleep(poll_interval)
                continue

            if current_size == last_size and current_size > 0:
                stable_count += 1
                if stable_count >= stable_polls:
                    logger.info(f"✅ 文件写入完成,等待耗时:{(time.time() - wait_start)*1000:.0f}ms,文件大小:{current_size} bytes")
                    return True
            else:
                stable_count = 0
            last_size = current_size
            time.sleep(poll_interval)

        logger.warning(f"⚠️ 等待文件写入超时:{file_path}")
        return False

    def process_tts(self, quit_event):
        """Worker loop that serves audio sources by priority until *quit_event* is set.

        Each iteration handles one source, highest priority first:

        1. ``high_priority_queue`` (user Q&A); this also forces the state back to RUNNING;
        2. the newest unplayed ``.wav`` in ``image_analysis_dir``;
        3. looping pre-generated audio (only when ``use_pre_gen`` is enabled);
        4. ``msgqueue`` (normal text), waited on for at most 0.5 s.

        Exceptions raised while handling an item are logged and the loop continues.
        """
        # Only ``Queue`` is imported at module level; ``Empty`` is needed for the timed get().
        from queue import Empty

        was_playing_pre_gen = False

        while not quit_event.is_set():
            try:
                # Priority 1: user Q&A.
                if not self.high_priority_queue.empty():
                    msg = self.high_priority_queue.get_nowait()
                    logger.info("★★★★★ 处理用户问答(最高优先级) ★★★★★")
                    self.state = State.RUNNING

                    if msg and len(msg) >= 2 and isinstance(msg[0], str) and msg[0].strip():
                        self.txt_to_audio(msg)

                    if was_playing_pre_gen:
                        logger.info("用户问答播放完成,恢复播放预生成音频")
                    continue

                # Priority 2: new audio produced by Image_Analysis.
                newest_audio = self._get_newest_image_analysis_audio()
                if newest_audio is not None:
                    file_path, file_timestamp, filename = newest_audio
                    detect_time = time.time()

                    self._wait_for_file_stable(file_path)
                    time_since_creation = detect_time - file_timestamp

                    logger.info("★★★ 发现新的图片分析音频,立即播放 ★★★")
                    logger.info(f" 文件名:{filename}")
                    logger.info(f" ⏱️ 文件生成到检测耗时:{time_since_creation:.3f}秒")

                    was_playing_pre_gen = True
                    play_start_time = time.time()
                    # Play the exact file we waited on, not whatever is newest by now.
                    self._play_image_analysis_audio(newest_audio)
                    play_end_time = time.time()

                    logger.info("=" * 80)
                    logger.info("⏱️⏱️⏱️ 【图片分析音频时间统计】 ⏱️⏱️⏱️")
                    logger.info(f" 📊 从文件生成到开始播放:{play_start_time - file_timestamp:.3f}秒")
                    logger.info(f" 📊 实际播放时长:{play_end_time - play_start_time:.3f}秒")
                    logger.info("=" * 80)

                    logger.info("图片描述播放完成,将继续播放预生成音频")
                    continue

                # Priority 3: looping pre-generated audio (returns None while disabled).
                pre_gen_stream = self._get_next_pre_gen_audio()
                if pre_gen_stream is not None:
                    logger.info(f"★ 循环播放预生成音频 #{self.pre_gen_index}")
                    was_playing_pre_gen = True
                    # Only 'text' here: a 'status' key would overwrite the final 'end' event.
                    self._play_audio_stream(pre_gen_stream, "[预生成介绍]", {'text': '预生成介绍'})
                    continue

                # Priority 4: normal text messages.
                was_playing_pre_gen = False
                try:
                    msg = self.msgqueue.get(block=True, timeout=0.5)
                except Empty:
                    continue

                if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
                    continue

                self.current_msg = msg
                self.current_msg_progress = 0

                if self.state == State.RUNNING:
                    self.txt_to_audio(msg)

                self.current_msg = None
                self.current_msg_progress = 0

            except Exception as e:
                logger.error(f"process_tts 错误:{e}")

        logger.info('voxcpm2_api_tts thread stop')

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Synthesize and play one ``(text, textevent)`` message.

        Pre-generated audio is used when available; otherwise the VoxCPM2 API is
        called. Nothing is played if the state is not RUNNING, the text is
        blank, or a high-priority message is waiting. Errors are logged, not raised.
        """
        text, textevent = msg
        t = time.time()

        if self.state != State.RUNNING or not text.strip():
            return

        self.reset_interrupt_flag()

        if not self.high_priority_queue.empty():
            logger.info("发现高优先级消息,跳过当前普通消息处理")
            return

        try:
            stream = self._get_pre_generated_audio(text)
            from_pre_gen = stream is not None
            if from_pre_gen:
                logger.info(f"使用预生成音频: {text[:30]}...")
            else:
                logger.info(f"调用 API 生成音频: {text[:30]}...")
                stream = self.generate_audio_via_api(text)

            # Re-check: state or priorities may have changed during (slow) synthesis.
            if self.state != State.RUNNING:
                return
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                return

            self._play_audio_stream(stream, text, textevent)
            if from_pre_gen:
                logger.info(f'-------预生成音频播放完成,耗时:{time.time()-t:.4f}s')

        except Exception as e:
            logger.error(f"VoxCPM2 API TTS 失败:{e}")

    def _play_image_analysis_audio(self, audio_info: "tuple[str, float, str] | None" = None):
        """Play one image-analysis clip and mark it as played.

        Args:
            audio_info: ``(file_path, file_timestamp, filename)`` as returned by
                :meth:`_get_newest_image_analysis_audio`. When omitted, the
                directory is scanned and the newest unplayed file is used.

        Returns:
            True if the clip played to the end; False if it was interrupted,
            could not be read, or there was nothing to play.
        """
        result = audio_info if audio_info is not None else self._get_newest_image_analysis_audio()
        if result is None:
            return False

        file_path, _file_timestamp, filename = result

        try:
            logger.info(f"🎵 播放图片分析音频:{filename}")
            logger.info(f" 音频文件路径:{file_path}")

            stream, sample_rate = sf.read(file_path, dtype='float32')
            logger.info(f" 音频原始信息:sample_rate={sample_rate}, shape={stream.shape}")

            stream = self._to_playback_format(stream, sample_rate)
            logger.info(f" 处理后音频信息:shape={stream.shape}, chunks={stream.shape[0]//self.chunk}")

            # Only 'text' here: a 'status' key would overwrite the final 'end' event.
            completed = self._play_audio_stream(stream, f"[图片描述] {filename}", {'text': '图片描述'})
        except Exception as e:
            logger.error(f"播放图片分析音频失败:{e}", exc_info=True)
            return False
        finally:
            # Mark as played even on failure; otherwise an unreadable file would be
            # picked again on every process_tts() iteration and starve msgqueue.
            self.played_audio_files.add(filename)

        if completed:
            logger.info(f"✅ 图片分析音频播放完成:{filename}")
        else:
            logger.info(f"⚠️ 图片分析音频被中断:{filename}")
        return completed
|