############################################################################### # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking # email: lipku@foxmail.com # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ############################################################################### from __future__ import annotations import time import numpy as np import soundfile as sf import resampy import asyncio # import edge_tts # 已移除,改为通过本地 API 调用 import os import hmac import hashlib import base64 import json import uuid import threading from typing import Iterator import requests import queue from queue import Queue from io import BytesIO import copy,websockets,gzip import azure.cognitiveservices.speech as speechsdk from threading import Thread, Event from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: from basereal import BaseReal from logger import logger class State(Enum): RUNNING=0 PAUSE=1 class BaseTTS: def __init__(self, opt, parent:BaseReal): self.opt=opt self.parent = parent self.fps = opt.fps # 20 ms per frame self.sample_rate = 16000 self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000) self.input_stream = BytesIO() self.msgqueue = Queue() self.high_priority_queue = Queue() # 高优先级队列,用于用户问题 self.state = State.RUNNING # 添加属性来存储被中断的消息 self.interrupted_messages = [] # 存储被中断的消息 # 添加属性来跟踪当前正在处理的消息 self.current_msg = None # 当前正在处理的消息 self.current_msg_progress = 0 # 当前消息的进度(字符位置) self.interrupt_flag = 
threading.Event() # 添加打断事件 def reset_interrupt_flag(self): """重置打断标志""" self.interrupt_flag.clear() def set_interrupt_flag(self): """设置打断标志""" self.interrupt_flag.set() def _trigger_continue_play(self): """触发自动续播下一条介绍内容""" try: # 检查是否处于介绍播放状态 if (hasattr(self.parent, 'intro_play_state') and self.parent.intro_play_state.get('is_playing', False) and not self.parent.intro_play_state.get('is_paused', False) and not self.parent.intro_play_state.get('is_waiting_next', False)): # 检查是否有高优先级消息在等待,如果有则不触发续播 if not self.high_priority_queue.empty(): logger.info("有高优先级消息等待,跳过自动续播") return # 检查消息队列是否为空,如果不为空说明还有消息在等待,不触发续播 if not self.msgqueue.empty(): logger.info("消息队列不为空,跳过自动续播") return # 触发续播 self.parent._continue_intro_play() else: if hasattr(self.parent, 'intro_play_state') and self.parent.intro_play_state.get('is_waiting_next', False): logger.info("正在等待前一条播放完成,跳过本次触发") except Exception as e: logger.error(f"触发自动续播时出错: {e}") def flush_talk(self): # 停止当前播放并清空待处理的消息队列,但保留当前正在处理的状态 # 这样可以确保打断后不会播放队列中积累的旧消息 with self.msgqueue.mutex: # 使用队列的互斥锁确保线程安全 # 保存队列中的剩余消息到中断列表 remaining_msgs = list(self.msgqueue.queue) if remaining_msgs: self.interrupted_messages.extend(remaining_msgs) self.msgqueue.queue.clear() # 清空队列中等待的消息 # 如果当前有正在处理的消息,也要将其保存到中断列表 if self.current_msg: # 将当前消息加入中断列表 self.interrupted_messages.append(self.current_msg) # 清空当前消息引用,因为已经被中断 self.current_msg = None self.state = State.PAUSE # 清除当前正在处理的音频缓冲区 if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'): self.input_stream.seek(0) self.input_stream.truncate() def resume_interrupted(self): """恢复播放被中断的消息""" if self.interrupted_messages: # 将被中断的消息重新加入队列 with self.msgqueue.mutex: for msg in self.interrupted_messages: self.msgqueue.put(msg) # 清空中断消息列表 self.interrupted_messages.clear() # 将状态设置为运行,以便继续处理消息 self.state = State.RUNNING return True return False def put_msg_txt(self,msg:str,datainfo:dict={}): if len(msg)>0: # 对于长文本,按句子分割以支持更好的打断功能 if len(msg) > 100: # 
如果文本超过100字符,进行分割 import re # 按标点符号分割文本,保留分隔符 sentences = re.split(r'([。!?.!?])', msg) # 将句子和标点符号重新组合 parts = [] for i in range(0, len(sentences)-1, 2): sentence = sentences[i] punctuation = sentences[i+1] if i+1 < len(sentences) else '' if sentence.strip(): parts.append(sentence.strip() + punctuation) # 如果分割后有多个部分,分别放入队列 if len(parts) > 1: for part in parts: if part.strip(): self.msgqueue.put((part, datainfo)) return # 短文本或无法分割的文本直接放入队列 self.msgqueue.put((msg, datainfo)) def put_high_priority_msg(self,msg:str,datainfo:dict={}): """添加高优先级消息,会优先处理""" if len(msg)>0: # 对于长文本,按句子分割以支持更好的打断功能 if len(msg) > 100: # 如果文本超过100字符,进行分割 import re # 按标点符号分割文本,保留分隔符 sentences = re.split(r'([。!?.!?])', msg) # 将句子和标点符号重新组合 parts = [] for i in range(0, len(sentences)-1, 2): sentence = sentences[i] punctuation = sentences[i+1] if i+1 < len(sentences) else '' if sentence.strip(): parts.append(sentence.strip() + punctuation) # 如果分割后有多个部分,分别放入高优先级队列 if len(parts) > 1: for part in parts: if part.strip(): self.high_priority_queue.put((part, datainfo)) return # 短文本或无法分割的文本直接放入高优先级队列 self.high_priority_queue.put((msg, datainfo)) def render(self,quit_event): process_thread = Thread(target=self.process_tts, args=(quit_event,)) process_thread.start() def process_tts(self,quit_event): while not quit_event.is_set(): try: # 检查状态是否为RUNNING,如果不是,则短暂等待后继续 # 优先检查高优先级队列 - 每次循环都检查高优先级队列 msg = None try: # 首先检查高优先级队列,不等待,立即返回 if not self.high_priority_queue.empty(): msg = self.high_priority_queue.get_nowait() # 处理高优先级消息时,确保状态为RUNNING self.state = State.RUNNING logger.info("处理高优先级消息") else: # 如果高优先级队列为空,检查普通队列 if self.state != State.RUNNING and not self.msgqueue.empty(): # 如果队列不为空但状态不是RUNNING,等待一段时间后重试 import time time.sleep(0.1) # 短暂等待 continue msg = self.msgqueue.get(block=True, timeout=0.05) # 使用较短超时时间,以便快速检查高优先级队列 except queue.Empty: # 如果两个队列都没有消息,继续等待 continue # 检查是否是唤醒消息(空消息),如果是则跳过处理,继续循环检查高优先级队列 if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip(): continue # 跳过空消息,继续检查队列 # 
记录当前正在处理的消息 self.current_msg = msg self.current_msg_progress = 0 # 重置进度 except queue.Empty: continue # 在处理音频前再次检查状态,如果状态已改变则跳过处理 if self.state == State.RUNNING: # 检查是否有高优先级消息,如果有则优先处理 if not self.high_priority_queue.empty(): logger.info("发现高优先级消息,中断当前普通消息处理") # 保存当前消息到中断队列 if msg: self.interrupted_messages.append(msg) # 处理高优先级消息 high_priority_msg = self.high_priority_queue.get_nowait() self.txt_to_audio(high_priority_msg) # 高优先级消息不触发自动续播 else: # 处理普通消息 self.txt_to_audio(msg) # 注意:自动续播不再在这里触发,而是在 _continue_intro_play 中通过定时器控制 # 消息处理完成后,清空当前消息 self.current_msg = None self.current_msg_progress = 0 logger.info('ttsreal thread stop') def txt_to_audio(self,msg:tuple[str, dict]): pass ########################################################################################### class EdgeTTS(BaseTTS): def txt_to_audio(self,msg:tuple[str, dict]): voicename = self.opt.REF_FILE #"zh-CN-YunxiaNeural" text,textevent = msg t = time.time() # 在开始TTS请求前检查状态,如果状态已改变则跳过 if self.state != State.RUNNING: return # 处理空消息,直接返回 if not text.strip(): return # 重置打断标志,确保高优先级消息能够正常处理 self.reset_interrupt_flag() # 检查是否有高优先级消息,如果有则立即返回,让process_tts处理 if not self.high_priority_queue.empty(): logger.info("发现高优先级消息,跳过当前普通消息处理") return # 使用异步方式处理TTS请求,以便能够响应中断 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 创建一个任务来处理TTS请求 task = loop.create_task(self.__main(voicename, text)) # 定期检查中断标志和高优先级队列 while not task.done(): if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): task.cancel() logger.info("TTS请求被高优先级消息中断") break loop.run_until_complete(asyncio.sleep(0.05)) # 缩短检查间隔,提高响应速度 # 等待任务完成或取消 try: loop.run_until_complete(task) except asyncio.CancelledError: logger.info("TTS请求被中断") self.input_stream.seek(0) self.input_stream.truncate() return logger.info(f'-------edge tts time:{time.time()-t:.4f}s') if self.input_stream.getbuffer().nbytes<=0: #edgetts err logger.error('edgetts err!!!!!') return self.input_stream.seek(0) stream = self.__create_bytes_stream(self.input_stream) 
streamlen = stream.shape[0] idx=0 # 在播放前再次检查状态,如果状态已改变则跳过播放 if self.state != State.RUNNING: self.input_stream.seek(0) self.input_stream.truncate() return # 检查打断标志或高优先级队列 if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): self.input_stream.seek(0) self.input_stream.truncate() return while streamlen >= self.chunk and self.state==State.RUNNING: # 每次循环都检查打断标志和高优先级队列 if self.interrupt_flag.is_set() or not self.high_priority_queue.empty(): logger.info("播放过程中发现高优先级消息,中断播放") break eventpoint={} streamlen -= self.chunk if idx==0: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent} elif streamlen0: #skip last frame(not 20ms) # self.queue.put(stream[idx:]) self.input_stream.seek(0) self.input_stream.truncate() def __create_bytes_stream(self,byte_stream): #byte_stream=BytesIO(buffer) stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64 logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.') stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0]>0: logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream async def __main(self,voicename: str, text: str): """通过本地 API 调用 Edge TTS 服务""" try: # 使用同步方式调用 HTTP API(在异步函数中使用线程池) loop = asyncio.get_event_loop() # 准备请求参数 headers = {"Content-Type": "application/json"} data = { "text": text, "voice": voicename, "rate": "+0%", "volume": "+0%", "pitch": "+0Hz" } # 在线程池中执行同步请求,避免阻塞事件循环 def make_request(): resp = requests.post( "http://127.0.0.1:1024/tts", json=data, headers=headers, timeout=30 ) return resp # 使用线程池执行同步请求 resp = await loop.run_in_executor(None, make_request) logger.info(f"Edge TTS API 
响应状态码:{resp.status_code}") # 成功返回音频流 if resp.status_code == 200 and str(resp.headers.get("Content-Type", "")).startswith("audio/"): # 将音频数据写入输入流 self.input_stream.write(resp.content) logger.info(f"✅ 成功从 API 获取音频数据,大小:{len(resp.content)} bytes") else: # 非 200 时解析错误并抛出 try: detail = resp.json() except Exception: detail = resp.text logger.error(f"⚠️ TTS API 接口返回错误:{detail}") resp.raise_for_status() except requests.exceptions.HTTPError as e: logger.error(f"HTTP 错误:{e}") except requests.exceptions.ConnectionError: logger.error(f"❌ 无法连接到 TTS API 服务器 (http://127.0.0.1:1024/tts),请检查服务是否在线") except requests.exceptions.Timeout: logger.error(f"❌ TTS API 请求超时") except Exception as e: logger.error(f"其他错误:{str(e)}") ########################################################################################### class FishTTS(BaseTTS): def txt_to_audio(self,msg:tuple[str, dict]): text,textevent = msg self.stream_tts( self.fish_speech( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def fish_speech(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() req={ 'text':text, 'reference_id':reffile, 'format':'wav', 'streaming':True, 'use_memory_cache':'on' } try: res = requests.post( f"{server_url}/v1/tts", json=req, stream=True, headers={ "content-type": "application/json", }, ) end = time.perf_counter() logger.info(f"fish_speech Time to make POST: {end-start}s") if res.status_code != 200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=17640): # 1764 44100*20ms*2 #print('chunk len:',len(chunk)) if first: end = time.perf_counter() logger.info(f"fish_speech Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk #print("gpt_sovits response.elapsed:", res.elapsed) except Exception as e: logger.exception('fishtts') def stream_tts(self,audio_stream,msg:tuple[str, 
dict]): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = resampy.resample(x=stream, sr_orig=44100, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint={} if first: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text} eventpoint.update(**textevent) #eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class SovitsTTS(BaseTTS): def txt_to_audio(self,msg:tuple[str, dict]): text,textevent = msg self.stream_tts( self.gpt_sovits( text=text, reffile=self.opt.REF_FILE, reftext=self.opt.REF_TEXT, language="zh", #en args.language, server_url=self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def gpt_sovits(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() req={ 'text':text, 'text_lang':language, 'ref_audio_path':reffile, 'prompt_text':reftext, 'prompt_lang':language, 'media_type':'ogg', 'streaming_mode':True } # req["text"] = text # req["text_language"] = language # req["character"] = character # req["emotion"] = emotion # #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality # req["streaming_mode"] = True try: res = requests.post( f"{server_url}/tts", json=req, stream=True, ) end = time.perf_counter() logger.info(f"gpt_sovits Time to make POST: {end-start}s") if res.status_code != 
200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=None): #12800 1280 32K*20ms*2 logger.info('chunk len:%d',len(chunk)) if first: end = time.perf_counter() logger.info(f"gpt_sovits Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk #print("gpt_sovits response.elapsed:", res.elapsed) except Exception as e: logger.exception('sovits') def __create_bytes_stream(self,byte_stream): #byte_stream=BytesIO(buffer) stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64 logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.') stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0]>0: logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream def stream_tts(self,audio_stream,msg:tuple[str, dict]): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: #stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 #stream = resampy.resample(x=stream, sr_orig=32000, sr_new=self.sample_rate) byte_stream=BytesIO(chunk) stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint={} if first: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text} eventpoint.update(**textevent) self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class CosyVoiceTTS(BaseTTS): def 
txt_to_audio(self,msg:tuple[str, dict]): text,textevent = msg self.stream_tts( self.cosy_voice( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def cosy_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() payload = { 'tts_text': text, 'prompt_text': reftext } try: files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))] res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True) end = time.perf_counter() logger.info(f"cosy_voice Time to make POST: {end-start}s") if res.status_code != 200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=9600): # 960 24K*20ms*2 if first: end = time.perf_counter() logger.info(f"cosy_voice Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk except Exception as e: logger.exception('cosyvoice') def stream_tts(self,audio_stream,msg:tuple[str, dict]): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint={} if first: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text} eventpoint.update(**textevent) self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### _PROTOCOL = "https://" _HOST = 
"tts.cloud.tencent.com" _PATH = "/stream" _ACTION = "TextToStreamAudio" class TencentTTS(BaseTTS): def __init__(self, opt, parent): super().__init__(opt,parent) self.appid = os.getenv("TENCENT_APPID") self.secret_key = os.getenv("TENCENT_SECRET_KEY") self.secret_id = os.getenv("TENCENT_SECRET_ID") self.voice_type = int(opt.REF_FILE) self.codec = "pcm" self.sample_rate = 16000 self.volume = 0 self.speed = 0 def __gen_signature(self, params): sort_dict = sorted(params.keys()) sign_str = "POST" + _HOST + _PATH + "?" for key in sort_dict: sign_str = sign_str + key + "=" + str(params[key]) + '&' sign_str = sign_str[:-1] hmacstr = hmac.new(self.secret_key.encode('utf-8'), sign_str.encode('utf-8'), hashlib.sha1).digest() s = base64.b64encode(hmacstr) s = s.decode('utf-8') return s def __gen_params(self, session_id, text): params = dict() params['Action'] = _ACTION params['AppId'] = int(self.appid) params['SecretId'] = self.secret_id params['ModelType'] = 1 params['VoiceType'] = self.voice_type params['Codec'] = self.codec params['SampleRate'] = self.sample_rate params['Speed'] = self.speed params['Volume'] = self.volume params['SessionId'] = session_id params['Text'] = text timestamp = int(time.time()) params['Timestamp'] = timestamp params['Expired'] = timestamp + 24 * 60 * 60 return params def txt_to_audio(self,msg:tuple[str, dict]): text,textevent = msg self.stream_tts( self.tencent_voice( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() session_id = str(uuid.uuid1()) params = self.__gen_params(session_id, text) signature = self.__gen_signature(params) headers = { "Content-Type": "application/json", "Authorization": str(signature) } url = _PROTOCOL + _HOST + _PATH try: res = requests.post(url, headers=headers, data=json.dumps(params), stream=True) end = 
time.perf_counter() logger.info(f"tencent Time to make POST: {end-start}s") first = True for chunk in res.iter_content(chunk_size=6400): # 640 16K*20ms*2 #logger.info('chunk len:%d',len(chunk)) if first: try: rsp = json.loads(chunk) #response["Code"] = rsp["Response"]["Error"]["Code"] #response["Message"] = rsp["Response"]["Error"]["Message"] logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"]) return except: end = time.perf_counter() logger.info(f"tencent Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk except Exception as e: logger.exception('tencent') def stream_tts(self,audio_stream,msg:tuple[str, dict]): text,textevent = msg first = True last_stream = np.array([],dtype=np.float32) for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = np.concatenate((last_stream,stream)) #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint={} if first: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk last_stream = stream[idx:] #get the remain stream eventpoint={'status':'end','text':text} eventpoint.update(**textevent) self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class DoubaoTTS(BaseTTS): def __init__(self, opt, parent): super().__init__(opt, parent) # 从配置中读取火山引擎参数 self.appid = os.getenv("DOUBAO_APPID") self.token = os.getenv("DOUBAO_TOKEN") _cluster = 'volcano_tts' _host = "openspeech.bytedance.com" self.api_url = f"wss://{_host}/api/v1/tts/ws_binary" self.request_json = { "app": { 
"appid": self.appid, "token": "access_token", "cluster": _cluster }, "user": { "uid": "xxx" }, "audio": { "voice_type": "xxx", "encoding": "pcm", "rate": 16000, "speed_ratio": 1.0, "volume_ratio": 1.0, "pitch_ratio": 1.0, }, "request": { "reqid": "xxx", "text": "字节跳动语音合成。", "text_type": "plain", "operation": "xxx" } } async def doubao_voice(self, text): # -> Iterator[bytes]: start = time.perf_counter() voice_type = self.opt.REF_FILE try: # 创建请求对象 default_header = bytearray(b'\x11\x10\x11\x00') submit_request_json = copy.deepcopy(self.request_json) submit_request_json["user"]["uid"] = self.parent.sessionid submit_request_json["audio"]["voice_type"] = voice_type submit_request_json["request"]["text"] = text submit_request_json["request"]["reqid"] = str(uuid.uuid4()) submit_request_json["request"]["operation"] = "submit" payload_bytes = str.encode(json.dumps(submit_request_json)) payload_bytes = gzip.compress(payload_bytes) # if no compression, comment this line full_client_request = bytearray(default_header) full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big')) # payload size(4 bytes) full_client_request.extend(payload_bytes) # payload header = {"Authorization": f"Bearer; {self.token}"} first = True async with websockets.connect(self.api_url, extra_headers=header, ping_interval=None) as ws: await ws.send(full_client_request) while True: res = await ws.recv() header_size = res[0] & 0x0f message_type = res[1] >> 4 message_type_specific_flags = res[1] & 0x0f payload = res[header_size*4:] if message_type == 0xb: # audio-only server response if message_type_specific_flags == 0: # no sequence number as ACK #print(" Payload size: 0") continue else: if first: end = time.perf_counter() logger.info(f"doubao tts Time to first chunk: {end-start}s") first = False sequence_number = int.from_bytes(payload[:4], "big", signed=True) payload_size = int.from_bytes(payload[4:8], "big", signed=False) payload = payload[8:] yield payload if sequence_number < 0: break else: 
break except Exception as e: logger.exception('doubao') # # 检查响应状态码 # if response.status_code == 200: # # 处理响应数据 # audio_data = base64.b64decode(response.json().get('data')) # yield audio_data # else: # logger.error(f"请求失败,状态码: {response.status_code}") # return def txt_to_audio(self, msg:tuple[str, dict]): text, textevent = msg asyncio.new_event_loop().run_until_complete( self.stream_tts( self.doubao_voice(text), msg ) ) async def stream_tts(self, audio_stream, msg:tuple[str, dict]): text, textevent = msg first = True last_stream = np.array([],dtype=np.float32) async for chunk in audio_stream: if chunk is not None and len(chunk) > 0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = np.concatenate((last_stream,stream)) #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) # byte_stream=BytesIO(buffer) # stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx = 0 while streamlen >= self.chunk: eventpoint = {} if first: eventpoint={'status':'start','text':text} eventpoint.update(**textevent) first = False self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint) streamlen -= self.chunk idx += self.chunk last_stream = stream[idx:] #get the remain stream eventpoint={'status':'end','text':text} eventpoint.update(**textevent) self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint) ########################################################################################### class IndexTTS2(BaseTTS): def __init__(self, opt, parent): super().__init__(opt, parent) # IndexTTS2 配置参数 self.server_url = opt.TTS_SERVER # Gradio服务器地址,如 "http://127.0.0.1:7860/" self.ref_audio_path = opt.REF_FILE # 参考音频文件路径 self.max_tokens = getattr(opt, 'MAX_TOKENS', 120) # 最大token数 # 初始化Gradio客户端 try: from gradio_client import Client, handle_file self.client = Client(self.server_url) self.handle_file = handle_file logger.info(f"IndexTTS2 Gradio客户端初始化成功: {self.server_url}") except ImportError: 
logger.error("IndexTTS2 需要安装 gradio_client: pip install gradio_client") raise except Exception as e: logger.error(f"IndexTTS2 Gradio客户端初始化失败: {e}") raise def txt_to_audio(self, msg): text, textevent = msg try: # 先进行文本分割 segments = self.split_text(text) if not segments: logger.error("IndexTTS2 文本分割失败") return logger.info(f"IndexTTS2 文本分割为 {len(segments)} 个片段") # 循环生成每个片段的音频 for i, segment_text in enumerate(segments): if self.state != State.RUNNING: break logger.info(f"IndexTTS2 正在生成第 {i+1}/{len(segments)} 段音频...") audio_file = self.indextts2_generate(segment_text) if audio_file: # 为每个片段创建事件信息 segment_msg = (segment_text, textevent) self.file_to_stream(audio_file, segment_msg, is_first=(i==0), is_last=(i==len(segments)-1)) else: logger.error(f"IndexTTS2 第 {i+1} 段音频生成失败") except Exception as e: logger.exception(f"IndexTTS2 txt_to_audio 错误: {e}") def split_text(self, text): """使用 IndexTTS2 API 分割文本""" try: logger.info(f"IndexTTS2 开始分割文本,长度: {len(text)}") # 调用文本分割 API result = self.client.predict( text=text, max_text_tokens_per_segment=self.max_tokens, api_name="/on_input_text_change" ) # 解析分割结果 if 'value' in result and 'data' in result['value']: data = result['value']['data'] logger.info(f"IndexTTS2 共分割为 {len(data)} 个片段") segments = [] for i, item in enumerate(data): 序号 = item[0] + 1 分句内容 = item[1] token数 = item[2] logger.info(f"片段 {序号}: {len(分句内容)} 字符, {token数} tokens") segments.append(分句内容) return segments else: logger.error(f"IndexTTS2 文本分割结果格式异常: {result}") return [text] # 如果分割失败,返回原文本 except Exception as e: logger.exception(f"IndexTTS2 文本分割失败: {e}") return [text] # 如果分割失败,返回原文本 def indextts2_generate(self, text): """调用 IndexTTS2 Gradio API 生成语音""" start = time.perf_counter() try: # 调用 gen_single API result = self.client.predict( emo_control_method="Same as the voice reference", prompt=self.handle_file(self.ref_audio_path), text=text, emo_ref_path=self.handle_file(self.ref_audio_path), emo_weight=0.8, vec1=0.5, vec2=0, vec3=0, vec4=0, vec5=0, vec6=0, vec7=0, 
vec8=0, emo_text="", emo_random=False, max_text_tokens_per_segment=self.max_tokens, param_16=True, param_17=0.8, param_18=30, param_19=0.8, param_20=0, param_21=3, param_22=10, param_23=1500, api_name="/gen_single" ) end = time.perf_counter() logger.info(f"IndexTTS2 片段生成完成,耗时: {end-start:.2f}s") # 返回生成的音频文件路径 if 'value' in result: audio_file = result['value'] return audio_file else: logger.error(f"IndexTTS2 结果格式异常: {result}") return None except Exception as e: logger.exception(f"IndexTTS2 API调用失败: {e}") return None def file_to_stream(self, audio_file, msg, is_first=False, is_last=False): """将音频文件转换为音频流""" text, textevent = msg try: # 读取音频文件 stream, sample_rate = sf.read(audio_file) logger.info(f'IndexTTS2 音频文件 {sample_rate}Hz: {stream.shape}') # 转换为float32 stream = stream.astype(np.float32) # 如果是多声道,只取第一个声道 if stream.ndim > 1: logger.info(f'IndexTTS2 音频有 {stream.shape[1]} 个声道,只使用第一个') stream = stream[:, 0] # 重采样到目标采样率 if sample_rate != self.sample_rate and stream.shape[0] > 0: logger.info(f'IndexTTS2 重采样: {sample_rate}Hz -> {self.sample_rate}Hz') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) # 分块发送音频流 streamlen = stream.shape[0] idx = 0 first_chunk = True while streamlen >= self.chunk and self.state == State.RUNNING: eventpoint = None # 只在第一个片段的第一个chunk发送start事件 if is_first and first_chunk: eventpoint = {'status': 'start', 'text': text, 'msgevent': textevent} first_chunk = False self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint) idx += self.chunk streamlen -= self.chunk # 只在最后一个片段发送end事件 if is_last: eventpoint = {'status': 'end', 'text': text, 'msgevent': textevent} self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint) # 清理临时文件 try: if os.path.exists(audio_file): os.remove(audio_file) logger.info(f"IndexTTS2 已删除临时文件: {audio_file}") except Exception as e: logger.warning(f"IndexTTS2 删除临时文件失败: {e}") except Exception as e: logger.exception(f"IndexTTS2 音频流处理失败: {e}") 
###########################################################################################
class XTTS(BaseTTS):
    """Coqui XTTS streaming backend (speaker cloned from a reference wav)."""

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER)

    def txt_to_audio(self, msg: tuple[str, dict]):
        text, textevent = msg
        self.stream_tts(
            self.xtts(
                text,
                self.speaker,
                "zh-cn",  # language
                self.opt.TTS_SERVER,  # e.g. "http://localhost:9000"
                "20"  # stream_chunk_size
            ),
            msg
        )

    def get_speaker(self, ref_audio, server_url):
        """Upload the reference wav and return the cloned-speaker embedding."""
        # FIX: the file handle was opened and never closed
        with open(ref_audio, "rb") as f:
            files = {"wav_file": ("reference.wav", f)}
            response = requests.post(f"{server_url}/clone_speaker", files=files)
        return response.json()

    def xtts(self, text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
        """Stream 24 kHz 16-bit PCM chunks from an XTTS streaming server."""
        start = time.perf_counter()
        speaker["text"] = text
        speaker["language"] = language
        # smaller chunk size -> faster first byte, lower quality
        speaker["stream_chunk_size"] = stream_chunk_size
        try:
            res = requests.post(
                f"{server_url}/tts_stream",
                json=speaker,
                stream=True,
            )
            end = time.perf_counter()
            logger.info(f"xtts Time to make POST: {end-start}s")
            if res.status_code != 200:
                # consistency: every other backend logs via logger, not print
                logger.error("Error:%s", res.text)
                return
            first = True
            for chunk in res.iter_content(chunk_size=None):  # 24K * 20ms * 2
                if first:
                    end = time.perf_counter()
                    logger.info(f"xtts Time to first chunk: {end-start}s")
                    first = False
                if chunk:
                    yield chunk
        except Exception:
            logger.exception('xtts')

    def stream_tts(self, audio_stream, msg: tuple[str, dict]):
        """Resample streamed PCM to 16 kHz and emit 20 ms frames, carrying
        the sub-chunk remainder between chunks."""
        text, textevent = msg
        first = True
        last_stream = np.array([], dtype=np.float32)
        for chunk in audio_stream:
            if chunk is not None and len(chunk) > 0:
                stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
                stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
                stream = np.concatenate((last_stream, stream))
                streamlen = stream.shape[0]
                idx = 0
                while streamlen >= self.chunk:
                    eventpoint = {}
                    if first:
                        eventpoint = {'status': 'start', 'text': text}
                        eventpoint.update(**textevent)
                        first = False
                    self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
                    streamlen -= self.chunk
                    idx += self.chunk
                last_stream = stream[idx:]  # keep the remainder for the next chunk
        eventpoint = {'status': 'end', 'text': text}
        eventpoint.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)


###########################################################################################
class AzureTTS(BaseTTS):
    """Azure Cognitive Services streaming backend; audio arrives via the
    synthesizer's ``synthesizing`` callback as raw 16 kHz mono PCM."""

    CHUNK_SIZE = 640  # 16 kHz, 20 ms, 16-bit mono PCM

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        self.audio_buffer = b''

        voicename = self.opt.REF_FILE  # e.g. "zh-CN-XiaoxiaoMultilingualNeural"
        speech_key = os.getenv("AZURE_SPEECH_KEY")
        tts_region = os.getenv("AZURE_TTS_REGION")
        speech_endpoint = f"wss://{tts_region}.tts.speech.microsoft.com/cognitiveservices/websocket/v2"

        speech_config = speechsdk.SpeechConfig(subscription=speech_key, endpoint=speech_endpoint)
        speech_config.speech_synthesis_voice_name = voicename
        speech_config.set_speech_synthesis_output_format(
            speechsdk.SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm)

        # audio_config=None -> results are delivered in memory, not to a device
        self.speech_synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=speech_config, audio_config=None)
        self.speech_synthesizer.synthesizing.connect(self._on_synthesizing)

    def txt_to_audio(self, msg: tuple[str, dict]):
        msg_text: str = msg[0]
        # blocking call; audio is streamed through _on_synthesizing meanwhile
        result = self.speech_synthesizer.speak_text(msg_text)
        # latency metrics reported by the service
        fb_latency = int(result.properties.get_property(
            speechsdk.PropertyId.SpeechServiceResponse_SynthesisFirstByteLatencyMs
        ))
        fin_latency = int(result.properties.get_property(
            speechsdk.PropertyId.SpeechServiceResponse_SynthesisFinishLatencyMs
        ))
        logger.info(f"azure音频生成相关:首字节延迟: {fb_latency} ms, 完成延迟: {fin_latency} ms, result_id: {result.result_id}")

    # === callback ===
    def _on_synthesizing(self, evt: speechsdk.SpeechSynthesisEventArgs):
        """Buffer incoming PCM and forward it in CHUNK_SIZE (20 ms) frames."""
        if evt.result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            logger.info("SynthesizingAudioCompleted")
        elif evt.result.reason == speechsdk.ResultReason.Canceled:
            cancellation_details = evt.result.cancellation_details
            logger.info(f"Speech synthesis canceled: {cancellation_details.reason}")
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                if cancellation_details.error_details:
                    logger.info(f"Error details: {cancellation_details.error_details}")
        if self.state != State.RUNNING:
            # playback was interrupted: drop any buffered audio
            self.audio_buffer = b''
            return
        # evt.result.audio_data is the newest slice of raw PCM
        self.audio_buffer += evt.result.audio_data
        while len(self.audio_buffer) >= self.CHUNK_SIZE:
            chunk = self.audio_buffer[:self.CHUNK_SIZE]
            self.audio_buffer = self.audio_buffer[self.CHUNK_SIZE:]
            frame = (np.frombuffer(chunk, dtype=np.int16)
                     .astype(np.float32) / 32767.0)
            self.parent.put_audio_frame(frame)