from __future__ import annotations

import time
import numpy as np
import soundfile as sf
import resampy
import asyncio
import os
import queue
import json
import hashlib
import re

from queue import Queue
from io import BytesIO
from threading import Thread, Event
from enum import Enum
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from basereal import BaseReal

from logger import logger


class State(Enum):
    RUNNING = 0
    PAUSE = 1


class Qwen3TTS:
    def __init__(self, opt, parent: BaseReal):
        self.opt = opt
        self.parent = parent
        self.fps = opt.fps  # 20 ms per frame
        self.sample_rate = 16000
        self.chunk = self.sample_rate // self.fps  # 320 samples per chunk (20 ms * 16000 / 1000)
        self.input_stream = BytesIO()
        self.msgqueue = Queue()
        self.high_priority_queue = Queue()
        self.image_description_queue = Queue()  # Top-priority queue for image descriptions
        self.is_playing_image_description = False  # Whether an image description is currently playing
        self.state = State.RUNNING
        self.interrupted_messages = []
        self.current_msg = None
        self.current_msg_progress = 0
        self.interrupt_flag = Event()

        # VoxCPM2 configuration (replaces the Qwen3-TTS API)
        self.voxcpm2_model_path = getattr(opt, 'VOXCPM2_MODEL_PATH', 'VoxCPM2')
        self.ref_audio_path = getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav')
        self.ref_text = getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。')
        self.cfg_value = getattr(opt, 'CFG_VALUE', 2.5)
        self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 8)

        # Load the VoxCPM2 model
        self.voxcpm2_model = None
        self._load_voxcpm2_model()

        # Pre-generated audio configuration
        self.pre_gen_dir = "./data/pre_generated_tts"
        self.audio_map = self._load_audio_map()
        self.use_pre_gen = len(self.audio_map) > 0

        # Image_Analysis audio directory (changed to the wav/wav subdirectory)
        self.image_analysis_dir = "/mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav"
        self.played_audio_files = set()  # Set of audio files that have already been played

        # Loop-playback configuration for pre-generated audio
        self.pre_gen_index = 0  # Current playback index
        self.is_playing_pre_gen_loop = False  # Whether the pre-generated audio loop is playing

        logger.info("Qwen3TTS initialized:")
        logger.info(f"  VoxCPM2 model path: {self.voxcpm2_model_path}")
        logger.info(f"  Reference audio: {self.ref_audio_path}")
        logger.info(f"  Reference text: {self.ref_text}")
        logger.info(f"  CFG value: {self.cfg_value}")
        logger.info(f"  Inference timesteps: {self.inference_timesteps}")
        logger.info(f"  Pre-generated audio: {len(self.audio_map)} entries")
        logger.info(f"  Use pre-generated audio: {self.use_pre_gen}")
        logger.info(f"  Image_Analysis directory: {self.image_analysis_dir}")
        logger.info("  Pre-generated loop playback: enabled")

    def _load_voxcpm2_model(self):
        """Load the VoxCPM2 model (singleton)."""
        try:
            import torch
            from voxcpm import VoxCPM

            # Disable TorchDynamo to avoid scaled_dot_product_attention compatibility errors
            torch._dynamo.config.suppress_errors = True
            torch.compiler.disable()
            logger.info(f"🔄 Loading VoxCPM2 model: {self.voxcpm2_model_path}")
            logger.info("  TorchDynamo disabled to avoid compatibility issues")

            # Free cached GPU memory
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.info("  GPU memory cache cleared")

            # Load the model
            self.voxcpm2_model = VoxCPM.from_pretrained(
                self.voxcpm2_model_path,
                load_denoiser=False,
                optimize=False
            )
            logger.info("✅ VoxCPM2 model loaded successfully")
        except Exception as e:
            logger.error(f"❌ Failed to load VoxCPM2 model: {e}")
            logger.error("  Check that the model path is correct and the voxcpm package is installed")
            raise

    def reset_interrupt_flag(self):
        """Clear the interrupt flag."""
        self.interrupt_flag.clear()

    def set_interrupt_flag(self):
        """Set the interrupt flag."""
        self.interrupt_flag.set()

    def flush_talk(self):
        """Stop current playback and move pending messages into the interrupted list."""
        with self.msgqueue.mutex:
            remaining_msgs = list(self.msgqueue.queue)
            if remaining_msgs:
                self.interrupted_messages.extend(remaining_msgs)
            self.msgqueue.queue.clear()
        if self.current_msg:
            self.interrupted_messages.append(self.current_msg)
            self.current_msg = None
        self.state = State.PAUSE
        if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
            self.input_stream.seek(0)
            self.input_stream.truncate()

    def resume_interrupted(self):
        """Resume playback of interrupted messages."""
        if self.interrupted_messages:
            # Re-enqueue without holding msgqueue.mutex: Queue.put() acquires the same
            # non-reentrant lock internally and would deadlock if called under it.
            for msg in self.interrupted_messages:
                self.msgqueue.put(msg)
            self.interrupted_messages.clear()
            self.state = State.RUNNING
            return True
        return False

    def _load_audio_map(self) -> dict:
        """Load the pre-generated audio map."""
        map_path = os.path.join(self.pre_gen_dir, "audio_map.json")
        if os.path.exists(map_path):
            try:
                with open(map_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                logger.warning(f"Failed to load pre-generated audio map: {e}")
        return {}

    def _get_pre_generated_audio(self, text: str):
        """Return pre-generated audio data for this text, or None if unavailable."""
        if not self.use_pre_gen:
            return None
        text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16]
        audio_path = os.path.join(self.pre_gen_dir, f"{text_hash}.wav")
        if not os.path.exists(audio_path):
            return None
        try:
            # Read the audio file
            stream, sample_rate = sf.read(audio_path, dtype='float32')
            # Convert to mono
            if stream.ndim > 1:
                stream = stream[:, 0]
            # Resample to 16 kHz
            if sample_rate != self.sample_rate and stream.shape[0] > 0:
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
            return stream
        except Exception as e:
            logger.warning(f"Failed to read pre-generated audio: {e}")
            return None

    def _get_newest_image_analysis_audio(self):
        """Return the newest unplayed image-analysis audio file, or None."""
        if not os.path.exists(self.image_analysis_dir):
            return None
        # Collect all wav files and sort them by timestamp
        try:
            audio_files = []
            all_files = os.listdir(self.image_analysis_dir)
            logger.info(f"🔍 Image_Analysis directory contains {len(all_files)} files")
            logger.info(f"  Already played: {len(self.played_audio_files)}")
            for f in all_files:
                if f.endswith('.wav'):
                    file_path = os.path.join(self.image_analysis_dir, f)
                    # Try to extract a timestamp from the file name (format: hifi_clone_YYYYMMDD_HHMMSS.wav)
                    match = re.search(r'hifi_clone_(\d{8}_\d{6})', f)
                    if match:
                        # Parse the time from the file name
                        time_str = match.group(1)
                        file_timestamp = time.mktime(time.strptime(time_str, "%Y%m%d_%H%M%S"))
                        logger.info(f"  📁 {f} - name timestamp: {time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, played: {f in self.played_audio_files}")
                    else:
                        # No timestamp in the file name; fall back to the modification time
                        file_timestamp = os.path.getmtime(file_path)
                        logger.info(f"  📁 {f} - mtime: {time.strftime('%H:%M:%S', time.localtime(file_timestamp))}, played: {f in self.played_audio_files}")
                    is_played = f in self.played_audio_files
                    if not is_played:
                        audio_files.append((file_path, file_timestamp, f))
            if not audio_files:
                logger.info("  ⚠️ No unplayed audio files")
                return None
            # Sort by timestamp, newest first
            audio_files.sort(key=lambda x: x[1], reverse=True)
            newest_file = audio_files[0][2]
            logger.info(f"  ✅ Newest unplayed file: {newest_file}")
            # Return the newest file
            return audio_files[0]
        except Exception as e:
            logger.error(f"Failed to get image-analysis audio: {e}", exc_info=True)
            return None

    def _get_next_pre_gen_audio(self):
        """Return the next pre-generated audio clip (loop playback)."""
        if not self.use_pre_gen or len(self.audio_map) == 0:
            return None
        try:
            # Collect all pre-generated audio files
            audio_files = sorted([
                f for f in os.listdir(self.pre_gen_dir)
                if f.endswith('.wav')
            ])
            if not audio_files:
                return None
            # Wrap the index around for loop playback
            audio_file = audio_files[self.pre_gen_index % len(audio_files)]
            self.pre_gen_index += 1
            # Read the audio
            audio_path = os.path.join(self.pre_gen_dir, audio_file)
            stream, sample_rate = sf.read(audio_path, dtype='float32')
            # Convert to mono
            if stream.ndim > 1:
                stream = stream[:, 0]
            # Resample to 16 kHz
            if sample_rate != self.sample_rate and stream.shape[0] > 0:
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
            return stream
        except Exception as e:
            logger.error(f"Failed to get pre-generated audio: {e}")
            return None

    def _play_image_analysis_audio(self):
        """Play an image-analysis audio clip (second-highest priority)."""
        result = self._get_newest_image_analysis_audio()
        if result is None:
            return False
        file_path, file_timestamp, filename = result
        try:
            read_start = time.time()  # timestamp: start reading the file
            logger.info(f"🎵 Playing image-analysis audio: {filename}")
            logger.info(f"  Audio file path: {file_path}")
            logger.info(f"  Audio timestamp: {file_timestamp}")

            # Read the audio file
            stream, sample_rate = sf.read(file_path, dtype='float32')
            read_end = time.time()  # timestamp: read finished
            logger.info(f"  Raw audio info: sample_rate={sample_rate}, shape={stream.shape}, duration={stream.shape[0]/sample_rate:.2f}s")
            logger.info(f"  ⏱️ File read took {(read_end - read_start)*1000:.1f} ms")

            # Convert to mono
            if stream.ndim > 1:
                stream = stream[:, 0]
                logger.info("  Converted to mono")

            # Resample to 16 kHz
            resample_start = time.time()  # timestamp: resampling start
            if sample_rate != self.sample_rate and stream.shape[0] > 0:
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
                logger.info("  Resampled to 16 kHz")
            resample_end = time.time()  # timestamp: resampling finished
            logger.info(f"  ⏱️ Resampling took {(resample_end - resample_start)*1000:.1f} ms")
            logger.info(f"  Processed audio info: shape={stream.shape}, chunks={stream.shape[0]//self.chunk}")

            # Play the audio stream
            eventpoint = {'status': 'start', 'text': '图片描述'}
            logger.info("▶️ Calling _play_audio_stream")
            completed = self._play_audio_stream(stream, f"[图片描述] {filename}", eventpoint)
            logger.info(f"◀️ _play_audio_stream returned, completed={completed}")

            # Mark the file as played
            self.played_audio_files.add(filename)
            if completed:
                logger.info(f"✅ Image-analysis audio finished: {filename}")
            else:
                logger.info(f"⚠️ Image-analysis audio interrupted: {filename}")
            return completed
        except Exception as e:
            logger.error(f"Failed to play image-analysis audio: {e}", exc_info=True)
            return False

    def put_msg_txt(self, msg: str, datainfo: dict = {}):
        """Put a text message onto the normal queue."""
        if len(msg) > 0:
            if len(msg) > 100:
                # Split long text into sentences on Chinese/Western terminal punctuation
                sentences = re.split(r'([。!?.!?])', msg)
                parts = []
                for i in range(0, len(sentences) - 1, 2):
                    sentence = sentences[i]
                    punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
                    if sentence.strip():
                        parts.append(sentence.strip() + punctuation)
                if len(parts) > 1:
                    for part in parts:
                        if part.strip():
                            self.msgqueue.put((part, datainfo))
                    return
            self.msgqueue.put((msg, datainfo))

    def put_high_priority_msg(self, msg: str, datainfo: dict = {}):
        """Add a high-priority message."""
        if len(msg) > 0:
            if len(msg) > 100:
                # Split long text into sentences on Chinese/Western terminal punctuation
                sentences = re.split(r'([。!?.!?])', msg)
                parts = []
                for i in range(0, len(sentences) - 1, 2):
                    sentence = sentences[i]
                    punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
                    if sentence.strip():
                        parts.append(sentence.strip() + punctuation)
                if len(parts) > 1:
                    for part in parts:
                        if part.strip():
                            self.high_priority_queue.put((part, datainfo))
                    return
            self.high_priority_queue.put((msg, datainfo))

    def put_image_description(self, msg: str, datainfo: dict = {}):
        """Add an image-description message; highest priority, interrupts current playback."""
        if len(msg) > 0:
            logger.info(f"Received image-description message, length: {len(msg)} chars")
            # Image descriptions are usually short sentences; no splitting needed
            self.image_description_queue.put((msg, datainfo))

    def render(self, quit_event):
        """Start the TTS processing thread."""
        process_thread = Thread(target=self.process_tts, args=(quit_event,))
        process_thread.start()

    def process_tts(self, quit_event):
        """Process the TTS message queues."""
        # Track whether pre-generated audio was playing, so playback can resume after an interruption
        was_playing_pre_gen = False

        while not quit_event.is_set():
            try:
                msg = None

                # ★★★★★ Priority 1: user Q&A (highest priority, interrupts any playback)
                if not self.high_priority_queue.empty():
                    msg = self.high_priority_queue.get_nowait()
                    logger.info("★★★★★ Handling user Q&A (highest priority) ★★★★★")
                    self.state = State.RUNNING
                    # Play the user Q&A message
                    if msg and len(msg) >= 2 and isinstance(msg[0], str) and msg[0].strip():
                        self.txt_to_audio(msg)
                    # After the Q&A finishes, resume pre-generated audio if it was playing
                    if was_playing_pre_gen:
                        logger.info("User Q&A finished, resuming pre-generated audio")
                    continue

                # ★★★ Priority 2: new Image_Analysis audio (second priority, interrupts pre-generated audio)
                check_start_time = time.time()  # timestamp: when the check started
                newest_audio = self._get_newest_image_analysis_audio()
                if newest_audio is not None:
                    file_path, file_timestamp, filename = newest_audio
                    detect_time = time.time()  # timestamp: when the file was detected

                    # Wait for the file to finish being written: consider it complete once its
                    # size is stable across three consecutive 150 ms checks
                    logger.info("⏳ Waiting for the file to finish being written...")
                    wait_start = time.time()
                    last_size = 0
                    stable_count = 0
                    max_wait = 10  # wait at most 10 seconds
                    while (time.time() - wait_start) < max_wait:
                        try:
                            current_size = os.path.getsize(file_path)
                            if current_size == last_size and current_size > 0:
                                stable_count += 1
                                if stable_count >= 3:  # size unchanged three times in a row: writing is done
                                    wait_end = time.time()
                                    logger.info(f"✅ File write complete, waited {(wait_end - wait_start)*1000:.0f} ms, file size: {current_size} bytes")
                                    break
                            else:
                                stable_count = 0
                                last_size = current_size
                            time.sleep(0.15)  # check every 150 ms
                        except OSError:
                            time.sleep(0.15)

                    time_since_creation = detect_time - file_timestamp  # delay from file creation to detection
                    logger.info("★★★ New image-analysis audio found, playing immediately ★★★")
                    logger.info(f"  File name: {filename}")
                    logger.info(f"  File creation time (from name): {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(file_timestamp))}")
                    logger.info(f"  Detection time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(detect_time))}.{int(detect_time % 1 * 1000):03d}")
                    logger.info(f"  ⏱️ Creation-to-detection delay: {time_since_creation:.3f}s")
                    logger.info(f"  Check start time: {time.strftime('%H:%M:%S', time.localtime(check_start_time))}.{int(check_start_time % 1 * 1000):03d}")
                    logger.info(f"  ⏱️ Check took {(detect_time - check_start_time)*1000:.1f} ms")

                    was_playing_pre_gen = True  # remember that pre-generated audio was playing before
                    play_start_time = time.time()  # timestamp: playback preparation started
                    self._play_image_analysis_audio()
                    play_end_time = time.time()  # timestamp: playback finished

                    logger.info("=" * 80)
                    logger.info("⏱️⏱️⏱️ [Image-analysis audio timing] ⏱️⏱️⏱️")
                    logger.info(f"  File creation time: {time.strftime('%H:%M:%S', time.localtime(file_timestamp))}")
                    logger.info(f"  Playback start time: {time.strftime('%H:%M:%S', time.localtime(play_start_time))}.{int(play_start_time % 1 * 1000):03d}")
                    logger.info(f"  Playback end time: {time.strftime('%H:%M:%S', time.localtime(play_end_time))}.{int(play_end_time % 1 * 1000):03d}")
                    logger.info(f"  📊 File creation to playback start: {play_start_time - file_timestamp:.3f}s")
                    logger.info(f"  📊 Playback preparation (read + convert): {play_start_time - detect_time:.3f}s")
                    logger.info(f"  📊 Actual playback duration: {play_end_time - play_start_time:.3f}s")
                    logger.info(f"  📊 Total (creation to playback end): {play_end_time - file_timestamp:.3f}s")
                    logger.info("=" * 80)
                    logger.info("Image description finished, continuing with pre-generated audio")
                    continue

                # ★ Priority 3: loop pre-generated audio (base priority, default state)
                pre_gen_stream = self._get_next_pre_gen_audio()
                if pre_gen_stream is not None:
                    logger.info(f"★ Looping pre-generated audio #{self.pre_gen_index}")
                    eventpoint = {'status': 'start', 'text': '预生成介绍'}
                    was_playing_pre_gen = True
                    self._play_audio_stream(pre_gen_stream, "[预生成介绍]", eventpoint)
                    continue

                # No pre-generated audio: reset the flag and wait for a normal message
                was_playing_pre_gen = False
                try:
                    msg = self.msgqueue.get(block=True, timeout=0.5)
                except queue.Empty:
                    continue

                if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
                    continue

                self.current_msg = msg
                self.current_msg_progress = 0
                # Handle the normal message
                if self.state == State.RUNNING:
                    self.txt_to_audio(msg)
                self.current_msg = None
                self.current_msg_progress = 0
            except Exception as e:
                logger.error(f"process_tts error: {e}")
                continue
        logger.info('qwen3tts thread stop')

    def _play_audio_stream(self, stream: np.ndarray, text: str, textevent: dict) -> bool:
        """Play an audio stream with pacing; return True if it was played to completion."""
        streamlen = stream.shape[0]
        idx = 0
        total_chunks = streamlen // self.chunk
        chunk_count = 0
        logger.info(f"  📦 Total audio frames: {streamlen}, chunk size: {self.chunk}, expecting {total_chunks} chunks")
        logger.info(f"  ⏱️ Expected playback duration: {total_chunks * 0.02:.2f}s")
        start_time = time.time()
        expected_frame_time = 0.0  # expected timestamp of the current frame
        frame_duration = 0.02  # 20 ms per frame

        while streamlen >= self.chunk and self.state == State.RUNNING:
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info("  ⚠️ High-priority message detected during playback, interrupting")
                return False

            eventpoint = {}
            streamlen -= self.chunk
            chunk_count += 1

            if idx == 0:
                eventpoint = {'status': 'start', 'text': text}
                eventpoint.update(**textevent)
                logger.info(f"  ▶️ Playing chunk {chunk_count}/{total_chunks} (START)")
            elif streamlen < self.chunk:
                eventpoint = {'status': 'end', 'text': text}
                eventpoint.update(**textevent)
                logger.info(f"  ◀️ Playing chunk {chunk_count}/{total_chunks} (END)")
            else:
                # Intermediate chunk: log every 10 chunks
                if chunk_count % 10 == 1:
                    logger.info(f"  ➡️ Playing chunk {chunk_count}/{total_chunks}")

            if self.state != State.RUNNING:
                logger.info("  ⚠️ State is not RUNNING, stopping playback")
                return False
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info("  ⚠️ High-priority message detected before sending the audio frame, interrupting")
                return False

            # Send the audio frame
            self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
            idx += self.chunk

            # Pacing: wait until the expected playback time for this frame
            expected_frame_time += frame_duration
            elapsed = time.time() - start_time
            delay = expected_frame_time - elapsed
            if delay > 0.005:  # if more than 5 ms ahead of schedule, wait
                sleep_time = min(delay, 0.04)  # sleep at most 40 ms
                time.sleep(sleep_time)
            elif delay < -0.1:  # more than 100 ms behind schedule
                logger.warning(f"  ⚠️ Playback is {abs(delay):.3f}s behind schedule")
                # Do not reset expected_frame_time; subsequent frames keep the normal cadence and catch up

        actual_duration = time.time() - start_time
        logger.info(f"  ✅ All {chunk_count} chunks sent, actual time: {actual_duration:.2f}s (expected: {chunk_count * 0.02:.2f}s)")
        return True

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Convert text to audio (prefer pre-generated audio)."""
        text, textevent = msg
        t = time.time()
        if self.state != State.RUNNING:
            return
        if not text.strip():
            return
        self.reset_interrupt_flag()
        if not self.high_priority_queue.empty():
            logger.info("High-priority message found, skipping this normal message")
            return
        try:
            # First try the pre-generated audio
            pre_gen_stream = self._get_pre_generated_audio(text)
            if pre_gen_stream is not None:
                # Use pre-generated audio
                logger.info(f"Using pre-generated audio: {text[:30]}...")
                if self.state != State.RUNNING:
                    return
                if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                    return
                self._play_audio_stream(pre_gen_stream, text, textevent)
                logger.info(f'-------pre-generated audio playback finished, took {time.time()-t:.4f}s')
                return

            # No pre-generated audio: generate with the local VoxCPM2 model
            logger.info(f"Generating audio with the model: {text[:30]}...")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            task = loop.create_task(self.__generate_audio(text))
            while not task.done():
                if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                    task.cancel()
                    logger.info("TTS request interrupted by a high-priority message")
                    break
                loop.run_until_complete(asyncio.sleep(0.05))
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                logger.info("TTS request cancelled")
                self.input_stream.seek(0)
                self.input_stream.truncate()
                return

            logger.info(f'-------qwen3 tts time: {time.time()-t:.4f}s')
            if self.input_stream.getbuffer().nbytes <= 0:
                logger.error('qwen3 tts err!!!!!')
                return

            self.input_stream.seek(0)
            stream = self.__create_bytes_stream(self.input_stream)
            if self.state != State.RUNNING:
                self.input_stream.seek(0)
                self.input_stream.truncate()
                return
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                self.input_stream.seek(0)
                self.input_stream.truncate()
                return
            self._play_audio_stream(stream, text, textevent)
            self.input_stream.seek(0)
            self.input_stream.truncate()
        except Exception as e:
            logger.error(f"qwen3tts txt_to_audio error: {e}")

    async def __generate_audio(self, text: str):
        """Generate audio with the local VoxCPM2 model."""
        try:
            import torch

            logger.info(f"🎤 VoxCPM2 generating audio: {text[:50]}...")
            # Check that the model was loaded successfully
            if self.voxcpm2_model is None:
                raise Exception("VoxCPM2 model is not loaded")
            # Check that the reference audio exists
            if not os.path.exists(self.ref_audio_path):
                raise Exception(f"Reference audio file not found: {self.ref_audio_path}")

            # Generate audio with VoxCPM2 (pure voice-cloning mode)
            generate_start = time.time()
            wav = self.voxcpm2_model.generate(
                text=text,
                reference_wav_path=self.ref_audio_path,
                retry_badcase=False,  # disable bad-case retries for speed
                inference_timesteps=self.inference_timesteps,  # number of inference steps
                cfg_value=self.cfg_value  # CFG value
            )
            generate_time = time.time() - generate_start
            logger.info(f"✅ VoxCPM2 audio generated in {generate_time:.2f}s")
            logger.info(f"  Audio samples: {wav.shape[0]}")

            # VoxCPM2 outputs 48 kHz audio; resample to 16 kHz
            if len(wav) > 0:
                resample_start = time.time()
                wav_16k = resampy.resample(x=wav, sr_orig=48000, sr_new=self.sample_rate)
                resample_time = time.time() - resample_start
                logger.info(f"  Resampled to 16 kHz in {resample_time:.2f}s")

                # Convert to float32
                wav_16k = wav_16k.astype(np.float32)

                # Write the audio into input_stream as WAV-container bytes so that
                # __create_bytes_stream (which decodes it with soundfile) can read it back
                wav_bytes = BytesIO()
                sf.write(wav_bytes, wav_16k, self.sample_rate, format='WAV')
                self.input_stream.write(wav_bytes.getvalue())
                logger.info(f"  Audio written to input_stream, size: {self.input_stream.getbuffer().nbytes} bytes")
            else:
                logger.error("❌ VoxCPM2 produced empty audio")
                raise Exception("Generated audio is empty")
        except Exception as e:
            logger.error(f"❌ VoxCPM2 audio generation failed: {e}")
            raise

    def __create_bytes_stream(self, byte_stream):
        """Convert a byte stream into a mono float32 audio stream at 16 kHz."""
        stream, sample_rate = sf.read(byte_stream)
        logger.info(f'[INFO] tts audio stream {sample_rate}: {stream.shape}')
        stream = stream.astype(np.float32)
        if stream.ndim > 1:
            logger.info(f'[WARN] audio has {stream.shape[1]} channels, only using the first.')
            stream = stream[:, 0]
        if sample_rate != self.sample_rate and stream.shape[0] > 0:
            logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling to {self.sample_rate}.')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
        return stream
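

# ---------------------------------------------------------------------------
# Minimal usage sketch (an assumption, not part of the original module): the
# class expects an `opt` object with at least an `fps` attribute plus the
# optional VOXCPM2_* settings, and a parent implementing
# `put_audio_frame(frame, eventpoint)` (normally a BaseReal). `DummyParent`
# and the SimpleNamespace `opt` below are hypothetical stand-ins for a quick
# smoke test; a real deployment passes the project's own opt/BaseReal objects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    class DummyParent:
        def put_audio_frame(self, frame: np.ndarray, eventpoint: dict):
            # A real BaseReal would forward each 20 ms frame to the renderer;
            # here we only log its shape and the attached event metadata.
            logger.info(f"frame {frame.shape} event={eventpoint}")

    opt = SimpleNamespace(fps=50)  # 50 fps -> 320 samples per 20 ms chunk at 16 kHz
    tts = Qwen3TTS(opt, DummyParent())

    quit_event = Event()
    tts.render(quit_event)            # start the process_tts worker thread
    tts.put_msg_txt("你好,欢迎光临。")  # enqueue a normal-priority message
    time.sleep(30)                    # let the worker run for a while
    quit_event.set()                  # signal the worker to stop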