- ###############################################################################
- # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
- # email: lipku@foxmail.com
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ###############################################################################
- from __future__ import annotations
- import time
- import numpy as np
- import soundfile as sf
- import resampy
- import asyncio
- # import edge_tts # 已移除,改为通过本地 API 调用
- import os
- import hmac
- import hashlib
- import base64
- import json
- import uuid
- import threading
- from typing import Iterator
- import requests
- import queue
- from queue import Queue
- from io import BytesIO
- import copy,websockets,gzip
- import azure.cognitiveservices.speech as speechsdk
- from threading import Thread, Event
- from enum import Enum
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from basereal import BaseReal
- from logger import logger
class State(Enum):
    """Lifecycle state of a TTS worker loop."""
    RUNNING = 0
    PAUSE = 1
class BaseTTS:
    """Base class for streaming TTS engines.

    Pulls ``(text, datainfo)`` tuples from two queues (normal and high
    priority), hands them to the subclass hook :meth:`txt_to_audio`, and
    supports interrupting and later resuming playback.
    """

    def __init__(self, opt, parent: "BaseReal"):
        self.opt = opt
        self.parent = parent
        self.fps = opt.fps  # frames per second; each frame covers 20 ms
        self.sample_rate = 16000
        # Samples per 20 ms chunk (320 when fps == 50 and rate == 16000).
        self.chunk = self.sample_rate // self.fps
        self.input_stream = BytesIO()
        self.msgqueue = Queue()
        self.high_priority_queue = Queue()  # user questions jump the line
        self.state = State.RUNNING
        self.interrupted_messages = []  # messages cut off by an interrupt
        self.current_msg = None  # message currently being synthesized
        self.current_msg_progress = 0  # character progress within current_msg
        self.interrupt_flag = threading.Event()  # set to abort synthesis/playback

    def reset_interrupt_flag(self):
        """Clear the interrupt event so new synthesis can proceed."""
        self.interrupt_flag.clear()

    def set_interrupt_flag(self):
        """Signal running synthesis/playback loops to stop."""
        self.interrupt_flag.set()

    def _trigger_continue_play(self):
        """Auto-advance the parent's intro playback, unless work is pending."""
        try:
            if (hasattr(self.parent, 'intro_play_state') and
                    self.parent.intro_play_state.get('is_playing', False) and
                    not self.parent.intro_play_state.get('is_paused', False) and
                    not self.parent.intro_play_state.get('is_waiting_next', False)):
                # Pending messages always win over auto-continuation.
                if not self.high_priority_queue.empty():
                    logger.info("有高优先级消息等待,跳过自动续播")
                    return
                if not self.msgqueue.empty():
                    logger.info("消息队列不为空,跳过自动续播")
                    return
                self.parent._continue_intro_play()
            else:
                if hasattr(self.parent, 'intro_play_state') and self.parent.intro_play_state.get('is_waiting_next', False):
                    logger.info("正在等待前一条播放完成,跳过本次触发")
        except Exception as e:
            logger.error(f"触发自动续播时出错: {e}")

    def flush_talk(self):
        """Stop playback: stash queued and in-flight messages for later resume.

        The queue is drained under its own mutex so producers cannot race
        the drain; the saved messages can be replayed via
        :meth:`resume_interrupted`.
        """
        with self.msgqueue.mutex:
            remaining_msgs = list(self.msgqueue.queue)
            if remaining_msgs:
                self.interrupted_messages.extend(remaining_msgs)
            self.msgqueue.queue.clear()
        # The message being synthesized right now is interrupted too.
        if self.current_msg:
            self.interrupted_messages.append(self.current_msg)
            self.current_msg = None
        self.state = State.PAUSE
        # Drop any partially synthesized audio.
        if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
            self.input_stream.seek(0)
            self.input_stream.truncate()

    def resume_interrupted(self):
        """Re-queue interrupted messages and resume processing.

        Returns True when at least one message was re-queued.

        BUGFIX: the messages are put back *without* holding
        ``msgqueue.mutex``. ``Queue.put`` internally acquires that same
        non-reentrant lock, so the original ``with self.msgqueue.mutex:``
        around the puts deadlocked on the first resumed message.
        """
        if not self.interrupted_messages:
            return False
        pending, self.interrupted_messages = self.interrupted_messages, []
        for m in pending:
            self.msgqueue.put(m)
        self.state = State.RUNNING
        return True

    @staticmethod
    def _split_long_text(msg: str):
        """Split text longer than 100 chars on sentence punctuation.

        Finer-grained queue entries make interruption more responsive.
        Returns a list of parts, or None when the text is short or could
        not be split into more than one part.
        """
        if len(msg) <= 100:
            return None
        import re
        # Split on CJK/ASCII sentence punctuation, keeping the delimiters.
        sentences = re.split(r'([。!?.!?])', msg)
        parts = []
        for i in range(0, len(sentences) - 1, 2):
            sentence = sentences[i]
            punctuation = sentences[i + 1] if i + 1 < len(sentences) else ''
            if sentence.strip():
                parts.append(sentence.strip() + punctuation)
        return parts if len(parts) > 1 else None

    def put_msg_txt(self, msg: str, datainfo: dict = None):
        """Queue text for synthesis; long text is split per sentence.

        ``datainfo`` defaults to a fresh dict per call (the original used a
        shared mutable default argument).
        """
        if len(msg) == 0:
            return
        if datainfo is None:
            datainfo = {}
        parts = self._split_long_text(msg)
        if parts:
            for part in parts:
                if part.strip():
                    self.msgqueue.put((part, datainfo))
            return
        self.msgqueue.put((msg, datainfo))

    def put_high_priority_msg(self, msg: str, datainfo: dict = None):
        """Queue text on the high-priority lane (processed before normal msgs)."""
        if len(msg) == 0:
            return
        if datainfo is None:
            datainfo = {}
        parts = self._split_long_text(msg)
        if parts:
            for part in parts:
                if part.strip():
                    self.high_priority_queue.put((part, datainfo))
            return
        self.high_priority_queue.put((msg, datainfo))

    def render(self, quit_event):
        """Start the background synthesis thread."""
        process_thread = Thread(target=self.process_tts, args=(quit_event,))
        process_thread.start()

    def process_tts(self, quit_event):
        """Worker loop: drain both queues and synthesize until quit_event is set."""
        while not quit_event.is_set():
            msg = None
            try:
                if not self.high_priority_queue.empty():
                    # High-priority messages force the state back to RUNNING.
                    msg = self.high_priority_queue.get_nowait()
                    self.state = State.RUNNING
                    logger.info("处理高优先级消息")
                else:
                    if self.state != State.RUNNING and not self.msgqueue.empty():
                        # Paused with pending normal messages: back off briefly.
                        time.sleep(0.1)
                        continue
                    # Short timeout so the high-priority queue is re-checked often.
                    msg = self.msgqueue.get(block=True, timeout=0.05)
            except queue.Empty:
                continue

            # Skip wake-up (blank-text) messages and keep polling the queues.
            if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
                continue

            self.current_msg = msg
            self.current_msg_progress = 0

            # State may have changed while we were blocked on the queue.
            if self.state == State.RUNNING:
                if not self.high_priority_queue.empty():
                    logger.info("发现高优先级消息,中断当前普通消息处理")
                    # Park the normal message so it can be resumed later.
                    if msg:
                        self.interrupted_messages.append(msg)
                    self.txt_to_audio(self.high_priority_queue.get_nowait())
                else:
                    self.txt_to_audio(msg)
                # Auto-continue of intro playback is driven elsewhere
                # (_continue_intro_play timer), not from this loop.

            self.current_msg = None
            self.current_msg_progress = 0
        logger.info('ttsreal thread stop')

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Subclass hook: synthesize one (text, datainfo) message."""
        pass
-
- ###########################################################################################
class EdgeTTS(BaseTTS):
    """TTS engine backed by a local Edge-TTS HTTP API, streamed as 20 ms frames."""

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Synthesize one message and push 20 ms audio frames to the parent.

        Bails out whenever the state leaves RUNNING, the interrupt flag is
        set, or a high-priority message is waiting.

        BUGFIX: the private event loop is now closed in a ``finally`` —
        the original leaked one loop (and its selector fd) per call.
        """
        voicename = self.opt.REF_FILE  # e.g. "zh-CN-YunxiaNeural"
        text, textevent = msg
        t = time.time()
        if self.state != State.RUNNING:
            return
        if not text.strip():  # nothing to synthesize
            return
        # Start clean so a stale interrupt does not kill this message.
        self.reset_interrupt_flag()
        if not self.high_priority_queue.empty():
            logger.info("发现高优先级消息,跳过当前普通消息处理")
            return

        # Drive the request on a private event loop so we can poll for
        # interrupts while it runs.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            task = loop.create_task(self.__main(voicename, text))
            # Poll every 50 ms for interrupts / high-priority messages.
            while not task.done():
                if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                    task.cancel()
                    logger.info("TTS请求被高优先级消息中断")
                    break
                loop.run_until_complete(asyncio.sleep(0.05))
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                logger.info("TTS请求被中断")
                self.input_stream.seek(0)
                self.input_stream.truncate()
                return
        finally:
            loop.close()

        logger.info(f'-------edge tts time:{time.time()-t:.4f}s')
        if self.input_stream.getbuffer().nbytes <= 0:  # edge tts produced nothing
            logger.error('edgetts err!!!!!')
            return

        self.input_stream.seek(0)
        stream = self.__create_bytes_stream(self.input_stream)
        streamlen = stream.shape[0]
        idx = 0
        # Re-check state and interrupts once more before playback starts.
        if self.state != State.RUNNING or self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
            self.input_stream.seek(0)
            self.input_stream.truncate()
            return

        while streamlen >= self.chunk and self.state == State.RUNNING:
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info("播放过程中发现高优先级消息,中断播放")
                break
            eventpoint = {}
            streamlen -= self.chunk
            if idx == 0:
                # First frame carries the 'start' event plus caller metadata.
                eventpoint = {'status': 'start', 'text': text}
                eventpoint.update(**textevent)
            elif streamlen < self.chunk:
                # Last full frame carries the 'end' event.
                eventpoint = {'status': 'end', 'text': text}
                eventpoint.update(**textevent)
            if self.state != State.RUNNING:
                break
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                logger.info("发送音频帧前发现高优先级消息,中断播放")
                break
            self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
            idx += self.chunk
        # The trailing partial frame (< 20 ms) is intentionally dropped.
        self.input_stream.seek(0)
        self.input_stream.truncate()

    def __create_bytes_stream(self, byte_stream):
        """Decode an audio container to float32 mono at ``self.sample_rate``."""
        stream, sample_rate = sf.read(byte_stream)  # [T*sample_rate,] float64
        logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
        stream = stream.astype(np.float32)
        if stream.ndim > 1:
            logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
            stream = stream[:, 0]
        if sample_rate != self.sample_rate and stream.shape[0] > 0:
            logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
        return stream

    async def __main(self, voicename: str, text: str):
        """POST the text to the local Edge-TTS API and buffer returned audio.

        Writes the audio bytes into ``self.input_stream`` on success; logs
        and swallows network/HTTP errors (the caller detects the empty
        buffer). Uses ``get_running_loop`` (``get_event_loop`` inside a
        coroutine is deprecated).
        """
        try:
            loop = asyncio.get_running_loop()
            headers = {"Content-Type": "application/json"}
            data = {
                "text": text,
                "voice": voicename,
                "rate": "+0%",
                "volume": "+0%",
                "pitch": "+0Hz"
            }

            def make_request():
                # Synchronous HTTP call; run in the default executor so
                # the event loop stays responsive to cancellation.
                return requests.post(
                    "http://127.0.0.1:1024/tts",
                    json=data,
                    headers=headers,
                    timeout=30
                )

            resp = await loop.run_in_executor(None, make_request)
            logger.info(f"Edge TTS API 响应状态码:{resp.status_code}")

            if resp.status_code == 200 and str(resp.headers.get("Content-Type", "")).startswith("audio/"):
                self.input_stream.write(resp.content)
                logger.info(f"✅ 成功从 API 获取音频数据,大小:{len(resp.content)} bytes")
            else:
                # Non-audio response: extract the error body and raise.
                try:
                    detail = resp.json()
                except Exception:
                    detail = resp.text
                logger.error(f"⚠️ TTS API 接口返回错误:{detail}")
                resp.raise_for_status()

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP 错误:{e}")
        except requests.exceptions.ConnectionError:
            logger.error(f"❌ 无法连接到 TTS API 服务器 (http://127.0.0.1:1024/tts),请检查服务是否在线")
        except requests.exceptions.Timeout:
            logger.error(f"❌ TTS API 请求超时")
        except Exception as e:
            logger.error(f"其他错误:{str(e)}")
- ###########################################################################################
class FishTTS(BaseTTS):
    """fish-speech client: streams 44.1 kHz pcm and re-chunks it to 20 ms frames."""

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Synthesize one message through the fish-speech server."""
        text, _ = msg
        chunks = self.fish_speech(
            text,
            self.opt.REF_FILE,
            self.opt.REF_TEXT,
            "zh",
            self.opt.TTS_SERVER,
        )
        self.stream_tts(chunks, msg)

    def fish_speech(self, text, reffile, reftext, language, server_url) -> Iterator[bytes]:
        """Yield raw pcm chunks streamed from the fish-speech /v1/tts API."""
        start = time.perf_counter()
        body = {
            'text': text,
            'reference_id': reffile,
            'format': 'wav',
            'streaming': True,
            'use_memory_cache': 'on'
        }
        try:
            res = requests.post(
                f"{server_url}/v1/tts",
                json=body,
                stream=True,
                headers={"content-type": "application/json"},
            )
            logger.info(f"fish_speech Time to make POST: {time.perf_counter() - start}s")
            if res.status_code != 200:
                logger.error("Error:%s", res.text)
                return

            got_first = False
            for chunk in res.iter_content(chunk_size=17640):  # 44100 Hz * 20 ms * 2 B * 10
                if not got_first:
                    logger.info(f"fish_speech Time to first chunk: {time.perf_counter() - start}s")
                    got_first = True
                # Only forward data while we are still in the RUNNING state.
                if chunk and self.state == State.RUNNING:
                    yield chunk
        except Exception:
            logger.exception('fishtts')

    def stream_tts(self, audio_stream, msg: tuple[str, dict]):
        """Resample 44.1 kHz pcm to 16 kHz and emit fixed 20 ms frames."""
        text, textevent = msg
        started = False
        for chunk in audio_stream:
            if not chunk:
                continue
            samples = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
            samples = resampy.resample(x=samples, sr_orig=44100, sr_new=self.sample_rate)
            remaining = samples.shape[0]
            offset = 0
            while remaining >= self.chunk:
                event = {}
                if not started:
                    # First frame carries the 'start' event plus metadata.
                    event = {'status': 'start', 'text': text}
                    event.update(**textevent)
                    started = True
                self.parent.put_audio_frame(samples[offset:offset + self.chunk], event)
                remaining -= self.chunk
                offset += self.chunk
        # Terminate with one silent frame carrying the 'end' event.
        event = {'status': 'end', 'text': text}
        event.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), event)
- ###########################################################################################
class SovitsTTS(BaseTTS):
    """GPT-SoVITS client: streams ogg audio and re-chunks it into 20 ms frames."""

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Synthesize one message through the GPT-SoVITS server."""
        text, _ = msg
        audio_iter = self.gpt_sovits(
            text=text,
            reffile=self.opt.REF_FILE,
            reftext=self.opt.REF_TEXT,
            language="zh",
            server_url=self.opt.TTS_SERVER,
        )
        self.stream_tts(audio_iter, msg)

    def gpt_sovits(self, text, reffile, reftext, language, server_url) -> Iterator[bytes]:
        """Yield ogg chunks streamed from the GPT-SoVITS /tts endpoint."""
        start = time.perf_counter()
        body = {
            'text': text,
            'text_lang': language,
            'ref_audio_path': reffile,
            'prompt_text': reftext,
            'prompt_lang': language,
            'media_type': 'ogg',
            'streaming_mode': True
        }
        try:
            res = requests.post(f"{server_url}/tts", json=body, stream=True)
            logger.info(f"gpt_sovits Time to make POST: {time.perf_counter() - start}s")
            if res.status_code != 200:
                logger.error("Error:%s", res.text)
                return

            got_first = False
            for chunk in res.iter_content(chunk_size=None):
                logger.info('chunk len:%d', len(chunk))
                if not got_first:
                    logger.info(f"gpt_sovits Time to first chunk: {time.perf_counter() - start}s")
                    got_first = True
                # Only forward data while we are still in the RUNNING state.
                if chunk and self.state == State.RUNNING:
                    yield chunk
        except Exception:
            logger.exception('sovits')

    def __create_bytes_stream(self, byte_stream):
        """Decode an audio container to float32 mono at ``self.sample_rate``."""
        samples, sample_rate = sf.read(byte_stream)  # [T*sample_rate,] float64
        logger.info(f'[INFO]tts audio stream {sample_rate}: {samples.shape}')
        samples = samples.astype(np.float32)
        if samples.ndim > 1:
            logger.info(f'[WARN] audio has {samples.shape[1]} channels, only use the first.')
            samples = samples[:, 0]
        if sample_rate != self.sample_rate and samples.shape[0] > 0:
            logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
            samples = resampy.resample(x=samples, sr_orig=sample_rate, sr_new=self.sample_rate)
        return samples

    def stream_tts(self, audio_stream, msg: tuple[str, dict]):
        """Decode each streamed container chunk and emit fixed 20 ms frames."""
        text, textevent = msg
        started = False
        for chunk in audio_stream:
            if not chunk:
                continue
            samples = self.__create_bytes_stream(BytesIO(chunk))
            remaining = samples.shape[0]
            offset = 0
            while remaining >= self.chunk:
                event = {}
                if not started:
                    # First frame carries the 'start' event plus metadata.
                    event = {'status': 'start', 'text': text}
                    event.update(**textevent)
                    started = True
                self.parent.put_audio_frame(samples[offset:offset + self.chunk], event)
                remaining -= self.chunk
                offset += self.chunk
        # Terminate with one silent frame carrying the 'end' event.
        event = {'status': 'end', 'text': text}
        event.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), event)
- ###########################################################################################
class CosyVoiceTTS(BaseTTS):
    """CosyVoice zero-shot client: streams 24 kHz pcm, re-chunked to 20 ms frames."""

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Synthesize one message through the CosyVoice server."""
        text, textevent = msg
        self.stream_tts(
            self.cosy_voice(
                text,
                self.opt.REF_FILE,
                self.opt.REF_TEXT,
                "zh",
                self.opt.TTS_SERVER,
            ),
            msg
        )

    def cosy_voice(self, text, reffile, reftext, language, server_url) -> Iterator[bytes]:
        """Yield pcm chunks streamed from /inference_zero_shot.

        BUGFIX: the reference-audio handle is now closed deterministically —
        the original opened it inline in the ``files`` tuple and leaked the
        descriptor on every call. The multipart body is fully uploaded before
        ``requests.request`` returns, so closing the file before iterating
        the streamed response is safe.
        """
        start = time.perf_counter()
        payload = {
            'tts_text': text,
            'prompt_text': reftext
        }
        try:
            with open(reffile, 'rb') as prompt_wav:
                files = [('prompt_wav', ('prompt_wav', prompt_wav, 'application/octet-stream'))]
                res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)

            end = time.perf_counter()
            logger.info(f"cosy_voice Time to make POST: {end-start}s")
            if res.status_code != 200:
                logger.error("Error:%s", res.text)
                return

            first = True
            for chunk in res.iter_content(chunk_size=9600):  # 960 = 24 kHz * 20 ms * 2 B
                if first:
                    end = time.perf_counter()
                    logger.info(f"cosy_voice Time to first chunk: {end-start}s")
                    first = False
                # Only forward data while we are still in the RUNNING state.
                if chunk and self.state == State.RUNNING:
                    yield chunk
        except Exception as e:
            logger.exception('cosyvoice')

    def stream_tts(self, audio_stream, msg: tuple[str, dict]):
        """Resample 24 kHz pcm to 16 kHz and emit fixed 20 ms frames."""
        text, textevent = msg
        first = True
        for chunk in audio_stream:
            if chunk is not None and len(chunk) > 0:
                stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
                stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
                streamlen = stream.shape[0]
                idx = 0
                while streamlen >= self.chunk:
                    eventpoint = {}
                    if first:
                        # First frame carries the 'start' event plus metadata.
                        eventpoint = {'status': 'start', 'text': text}
                        eventpoint.update(**textevent)
                        first = False
                    self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
                    streamlen -= self.chunk
                    idx += self.chunk
        # Terminate with one silent frame carrying the 'end' event.
        eventpoint = {'status': 'end', 'text': text}
        eventpoint.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
- ###########################################################################################
- _PROTOCOL = "https://"
- _HOST = "tts.cloud.tencent.com"
- _PATH = "/stream"
- _ACTION = "TextToStreamAudio"
class TencentTTS(BaseTTS):
    """Tencent Cloud streaming TTS (TextToStreamAudio over HTTPS POST).

    Credentials are read from the TENCENT_APPID / TENCENT_SECRET_KEY /
    TENCENT_SECRET_ID environment variables; the numeric voice type comes
    from ``opt.REF_FILE``.
    """

    def __init__(self, opt, parent):
        super().__init__(opt,parent)
        self.appid = os.getenv("TENCENT_APPID")
        self.secret_key = os.getenv("TENCENT_SECRET_KEY")
        self.secret_id = os.getenv("TENCENT_SECRET_ID")
        self.voice_type = int(opt.REF_FILE)  # numeric Tencent voice id
        self.codec = "pcm"
        self.sample_rate = 16000  # service streams 16 kHz pcm
        self.volume = 0
        self.speed = 0

    def __gen_signature(self, params):
        # HMAC-SHA1 over "POST<host><path>?k1=v1&...&kn=vn" with keys sorted,
        # then base64-encoded.  The exact byte order matters: the server
        # recomputes the same string, so do not reorder these steps.
        sort_dict = sorted(params.keys())
        sign_str = "POST" + _HOST + _PATH + "?"
        for key in sort_dict:
            sign_str = sign_str + key + "=" + str(params[key]) + '&'
        sign_str = sign_str[:-1]  # drop the trailing '&'
        hmacstr = hmac.new(self.secret_key.encode('utf-8'),
                           sign_str.encode('utf-8'), hashlib.sha1).digest()
        s = base64.b64encode(hmacstr)
        s = s.decode('utf-8')
        return s

    def __gen_params(self, session_id, text):
        # Request body; this exact dict is also what the signature is
        # computed over, so it must match what gets POSTed.
        params = dict()
        params['Action'] = _ACTION
        params['AppId'] = int(self.appid)
        params['SecretId'] = self.secret_id
        params['ModelType'] = 1
        params['VoiceType'] = self.voice_type
        params['Codec'] = self.codec
        params['SampleRate'] = self.sample_rate
        params['Speed'] = self.speed
        params['Volume'] = self.volume
        params['SessionId'] = session_id
        params['Text'] = text
        timestamp = int(time.time())
        params['Timestamp'] = timestamp
        params['Expired'] = timestamp + 24 * 60 * 60  # signature valid 24 h
        return params

    def txt_to_audio(self,msg:tuple[str, dict]):
        """Synthesize one (text, datainfo) message through the Tencent API."""
        text,textevent = msg
        self.stream_tts(
            self.tencent_voice(
                text,
                self.opt.REF_FILE,
                self.opt.REF_TEXT,
                "zh", #en args.language,
                self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
            ),
            msg
        )

    def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
        """Yield raw pcm chunks; the first chunk may be a JSON error body."""
        start = time.perf_counter()
        session_id = str(uuid.uuid1())
        params = self.__gen_params(session_id, text)
        signature = self.__gen_signature(params)
        headers = {
            "Content-Type": "application/json",
            "Authorization": str(signature)
        }
        url = _PROTOCOL + _HOST + _PATH
        try:
            res = requests.post(url, headers=headers,
                                data=json.dumps(params), stream=True)

            end = time.perf_counter()
            logger.info(f"tencent Time to make POST: {end-start}s")

            first = True

            for chunk in res.iter_content(chunk_size=6400):  # 640 = 16K * 20ms * 2 B
                if first:
                    # On failure the service answers with a JSON body instead
                    # of audio: if the first chunk parses as JSON, log the
                    # error and stop; otherwise it is audio — fall through.
                    try:
                        rsp = json.loads(chunk)
                        logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"])
                        return
                    except:  # not JSON -> audio stream has started
                        end = time.perf_counter()
                        logger.info(f"tencent Time to first chunk: {end-start}s")
                        first = False
                if chunk and self.state==State.RUNNING:
                    yield chunk
        except Exception as e:
            logger.exception('tencent')

    def stream_tts(self,audio_stream,msg:tuple[str, dict]):
        """Convert int16 pcm to float32 and emit 20 ms frames.

        Samples left over from one chunk are carried into the next via
        ``last_stream`` so no audio is dropped at chunk boundaries.
        """
        text,textevent = msg
        first = True
        last_stream = np.array([],dtype=np.float32)
        for chunk in audio_stream:
            if chunk is not None and len(chunk)>0:
                stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
                stream = np.concatenate((last_stream,stream))
                streamlen = stream.shape[0]
                idx=0
                while streamlen >= self.chunk:
                    eventpoint={}
                    if first:
                        # First frame carries the 'start' event plus metadata.
                        eventpoint={'status':'start','text':text}
                        eventpoint.update(**textevent)
                        first = False
                    self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
                    streamlen -= self.chunk
                    idx += self.chunk
                last_stream = stream[idx:] #get the remain stream
        # Terminate with one silent frame carrying the 'end' event.
        eventpoint={'status':'end','text':text}
        eventpoint.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
- ###########################################################################################
class DoubaoTTS(BaseTTS):
    """ByteDance Doubao (volcano engine) TTS over the ws_binary websocket API."""

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        # Volcano-engine credentials from the environment.
        self.appid = os.getenv("DOUBAO_APPID")
        self.token = os.getenv("DOUBAO_TOKEN")
        _cluster = 'volcano_tts'
        _host = "openspeech.bytedance.com"
        self.api_url = f"wss://{_host}/api/v1/tts/ws_binary"

        # Request template; per-call fields (uid, voice_type, text, reqid,
        # operation) are filled in by doubao_voice().
        self.request_json = {
            "app": {
                "appid": self.appid,
                "token": "access_token",
                "cluster": _cluster
            },
            "user": {
                "uid": "xxx"
            },
            "audio": {
                "voice_type": "xxx",
                "encoding": "pcm",
                "rate": 16000,
                "speed_ratio": 1.0,
                "volume_ratio": 1.0,
                "pitch_ratio": 1.0,
            },
            "request": {
                "reqid": "xxx",
                "text": "字节跳动语音合成。",
                "text_type": "plain",
                "operation": "xxx"
            }
        }

    async def doubao_voice(self, text):
        """Async generator yielding raw pcm payloads from the ws_binary API."""
        start = time.perf_counter()
        voice_type = self.opt.REF_FILE
        try:
            # 4-byte binary protocol header (version / header size / message
            # type / flags / serialization / compression), per the ws_binary
            # protocol; the payload itself is gzip-compressed JSON.
            default_header = bytearray(b'\x11\x10\x11\x00')
            submit_request_json = copy.deepcopy(self.request_json)
            submit_request_json["user"]["uid"] = self.parent.sessionid
            submit_request_json["audio"]["voice_type"] = voice_type
            submit_request_json["request"]["text"] = text
            submit_request_json["request"]["reqid"] = str(uuid.uuid4())
            submit_request_json["request"]["operation"] = "submit"
            payload_bytes = gzip.compress(str.encode(json.dumps(submit_request_json)))
            full_client_request = bytearray(default_header)
            full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size (4 bytes)
            full_client_request.extend(payload_bytes)
            # NOTE: the service expects this unusual "Bearer; <token>" form.
            header = {"Authorization": f"Bearer; {self.token}"}
            first = True
            async with websockets.connect(self.api_url, extra_headers=header, ping_interval=None) as ws:
                await ws.send(full_client_request)
                while True:
                    res = await ws.recv()
                    header_size = res[0] & 0x0f
                    message_type = res[1] >> 4
                    message_type_specific_flags = res[1] & 0x0f
                    payload = res[header_size * 4:]
                    if message_type == 0xb:  # audio-only server response
                        if message_type_specific_flags == 0:  # ACK without sequence number
                            continue
                        if first:
                            logger.info(f"doubao tts Time to first chunk: {time.perf_counter() - start}s")
                            first = False
                        # payload = [4B signed sequence][4B size][audio bytes]
                        sequence_number = int.from_bytes(payload[:4], "big", signed=True)
                        payload = payload[8:]
                        yield payload
                        if sequence_number < 0:  # negative sequence marks the last chunk
                            break
                    else:
                        # Error or unexpected message type: stop streaming.
                        break
        except Exception as e:
            logger.exception('doubao')

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Run the websocket synthesis to completion on a private event loop.

        BUGFIX: the loop is now closed in a ``finally`` — the original
        created a fresh event loop per call and never closed it, leaking
        a selector fd each time.
        """
        text, textevent = msg
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(
                self.stream_tts(
                    self.doubao_voice(text),
                    msg
                )
            )
        finally:
            loop.close()

    async def stream_tts(self, audio_stream, msg: tuple[str, dict]):
        """Re-chunk 16 kHz pcm into 20 ms frames; carries leftovers across chunks."""
        text, textevent = msg
        first = True
        last_stream = np.array([], dtype=np.float32)
        async for chunk in audio_stream:
            if chunk is not None and len(chunk) > 0:
                stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
                stream = np.concatenate((last_stream, stream))
                streamlen = stream.shape[0]
                idx = 0
                while streamlen >= self.chunk:
                    eventpoint = {}
                    if first:
                        # First frame carries the 'start' event plus metadata.
                        eventpoint = {'status': 'start', 'text': text}
                        eventpoint.update(**textevent)
                        first = False
                    self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
                    streamlen -= self.chunk
                    idx += self.chunk
                last_stream = stream[idx:]  # keep the tail for the next chunk
        # Terminate with one silent frame carrying the 'end' event.
        eventpoint = {'status': 'end', 'text': text}
        eventpoint.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
- ###########################################################################################
- class IndexTTS2(BaseTTS):
    def __init__(self, opt, parent):
        """Configure the IndexTTS2 Gradio client.

        Raises on a missing gradio_client package or a failed connection,
        so a broken engine is detected at construction time.
        """
        super().__init__(opt, parent)
        # IndexTTS2 configuration
        self.server_url = opt.TTS_SERVER  # Gradio server address, e.g. "http://127.0.0.1:7860/"
        self.ref_audio_path = opt.REF_FILE  # reference audio file path
        self.max_tokens = getattr(opt, 'MAX_TOKENS', 120)  # max tokens per text segment

        # Initialize the Gradio client (third-party, imported lazily here).
        try:
            from gradio_client import Client, handle_file
            self.client = Client(self.server_url)
            self.handle_file = handle_file
            logger.info(f"IndexTTS2 Gradio客户端初始化成功: {self.server_url}")
        except ImportError:
            logger.error("IndexTTS2 需要安装 gradio_client: pip install gradio_client")
            raise
        except Exception as e:
            logger.error(f"IndexTTS2 Gradio客户端初始化失败: {e}")
            raise
-
- def txt_to_audio(self, msg):
- text, textevent = msg
- try:
- # 先进行文本分割
- segments = self.split_text(text)
- if not segments:
- logger.error("IndexTTS2 文本分割失败")
- return
-
- logger.info(f"IndexTTS2 文本分割为 {len(segments)} 个片段")
-
- # 循环生成每个片段的音频
- for i, segment_text in enumerate(segments):
- if self.state != State.RUNNING:
- break
-
- logger.info(f"IndexTTS2 正在生成第 {i+1}/{len(segments)} 段音频...")
- audio_file = self.indextts2_generate(segment_text)
-
- if audio_file:
- # 为每个片段创建事件信息
- segment_msg = (segment_text, textevent)
- self.file_to_stream(audio_file, segment_msg, is_first=(i==0), is_last=(i==len(segments)-1))
- else:
- logger.error(f"IndexTTS2 第 {i+1} 段音频生成失败")
-
- except Exception as e:
- logger.exception(f"IndexTTS2 txt_to_audio 错误: {e}")
- def split_text(self, text):
- """使用 IndexTTS2 API 分割文本"""
- try:
- logger.info(f"IndexTTS2 开始分割文本,长度: {len(text)}")
-
- # 调用文本分割 API
- result = self.client.predict(
- text=text,
- max_text_tokens_per_segment=self.max_tokens,
- api_name="/on_input_text_change"
- )
-
- # 解析分割结果
- if 'value' in result and 'data' in result['value']:
- data = result['value']['data']
- logger.info(f"IndexTTS2 共分割为 {len(data)} 个片段")
-
- segments = []
- for i, item in enumerate(data):
- 序号 = item[0] + 1
- 分句内容 = item[1]
- token数 = item[2]
- logger.info(f"片段 {序号}: {len(分句内容)} 字符, {token数} tokens")
- segments.append(分句内容)
-
- return segments
- else:
- logger.error(f"IndexTTS2 文本分割结果格式异常: {result}")
- return [text] # 如果分割失败,返回原文本
-
- except Exception as e:
- logger.exception(f"IndexTTS2 文本分割失败: {e}")
- return [text] # 如果分割失败,返回原文本
- def indextts2_generate(self, text):
- """调用 IndexTTS2 Gradio API 生成语音"""
- start = time.perf_counter()
-
- try:
- # 调用 gen_single API
- result = self.client.predict(
- emo_control_method="Same as the voice reference",
- prompt=self.handle_file(self.ref_audio_path),
- text=text,
- emo_ref_path=self.handle_file(self.ref_audio_path),
- emo_weight=0.8,
- vec1=0.5,
- vec2=0,
- vec3=0,
- vec4=0,
- vec5=0,
- vec6=0,
- vec7=0,
- vec8=0,
- emo_text="",
- emo_random=False,
- max_text_tokens_per_segment=self.max_tokens,
- param_16=True,
- param_17=0.8,
- param_18=30,
- param_19=0.8,
- param_20=0,
- param_21=3,
- param_22=10,
- param_23=1500,
- api_name="/gen_single"
- )
-
- end = time.perf_counter()
- logger.info(f"IndexTTS2 片段生成完成,耗时: {end-start:.2f}s")
-
- # 返回生成的音频文件路径
- if 'value' in result:
- audio_file = result['value']
- return audio_file
- else:
- logger.error(f"IndexTTS2 结果格式异常: {result}")
- return None
-
- except Exception as e:
- logger.exception(f"IndexTTS2 API调用失败: {e}")
- return None
- def file_to_stream(self, audio_file, msg, is_first=False, is_last=False):
- """将音频文件转换为音频流"""
- text, textevent = msg
-
- try:
- # 读取音频文件
- stream, sample_rate = sf.read(audio_file)
- logger.info(f'IndexTTS2 音频文件 {sample_rate}Hz: {stream.shape}')
-
- # 转换为float32
- stream = stream.astype(np.float32)
-
- # 如果是多声道,只取第一个声道
- if stream.ndim > 1:
- logger.info(f'IndexTTS2 音频有 {stream.shape[1]} 个声道,只使用第一个')
- stream = stream[:, 0]
-
- # 重采样到目标采样率
- if sample_rate != self.sample_rate and stream.shape[0] > 0:
- logger.info(f'IndexTTS2 重采样: {sample_rate}Hz -> {self.sample_rate}Hz')
- stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
-
- # 分块发送音频流
- streamlen = stream.shape[0]
- idx = 0
- first_chunk = True
-
- while streamlen >= self.chunk and self.state == State.RUNNING:
- eventpoint = None
-
- # 只在第一个片段的第一个chunk发送start事件
- if is_first and first_chunk:
- eventpoint = {'status': 'start', 'text': text, 'msgevent': textevent}
- first_chunk = False
-
- self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
- idx += self.chunk
- streamlen -= self.chunk
-
- # 只在最后一个片段发送end事件
- if is_last:
- eventpoint = {'status': 'end', 'text': text, 'msgevent': textevent}
- self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
-
- # 清理临时文件
- try:
- if os.path.exists(audio_file):
- os.remove(audio_file)
- logger.info(f"IndexTTS2 已删除临时文件: {audio_file}")
- except Exception as e:
- logger.warning(f"IndexTTS2 删除临时文件失败: {e}")
-
- except Exception as e:
- logger.exception(f"IndexTTS2 音频流处理失败: {e}")
- ###########################################################################################
class XTTS(BaseTTS):
    """TTS backend for a Coqui XTTS streaming server (HTTP API).

    Clones a speaker once from the reference audio, then streams raw 24kHz
    16-bit PCM from ``/tts_stream``, resamples it, and forwards fixed-size
    frames to the parent with 'start'/'end' events.
    """
    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        # Clone the speaker embedding once at startup.
        self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER)

    def txt_to_audio(self, msg:tuple[str, dict]):
        """Synthesize *msg* = (text, textevent) and stream it to the parent."""
        text, textevent = msg
        self.stream_tts(
            self.xtts(
                text,
                self.speaker,
                "zh-cn",  # en args.language,
                self.opt.TTS_SERVER,  # "http://localhost:9000", #args.server_url,
                "20"  # args.stream_chunk_size
            ),
            msg
        )

    def get_speaker(self, ref_audio, server_url):
        """POST the reference audio to /clone_speaker and return the speaker JSON."""
        # Fix: use a context manager so the file handle is always closed --
        # the original leaked an open file descriptor per call.
        with open(ref_audio, "rb") as f:
            files = {"wav_file": ("reference.wav", f)}
            response = requests.post(f"{server_url}/clone_speaker", files=files)
        return response.json()

    def xtts(self, text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
        """Yield raw PCM chunks streamed from the XTTS /tts_stream endpoint."""
        start = time.perf_counter()
        speaker["text"] = text
        speaker["language"] = language
        speaker["stream_chunk_size"] = stream_chunk_size  # you can reduce it to get faster response, but degrade quality
        try:
            res = requests.post(
                f"{server_url}/tts_stream",
                json=speaker,
                stream=True,
            )
            end = time.perf_counter()
            logger.info(f"xtts Time to make POST: {end-start}s")
            if res.status_code != 200:
                # Consistency fix: report through the module logger, not print().
                logger.error(f"xtts Error: {res.text}")
                return
            first = True

            for chunk in res.iter_content(chunk_size=None):  # 24K*20ms*2
                if first:
                    end = time.perf_counter()
                    logger.info(f"xtts Time to first chunk: {end-start}s")
                    first = False
                if chunk:
                    yield chunk
        except Exception as e:
            # Consistency fix: log with traceback instead of print(e).
            logger.exception(e)

    def stream_tts(self, audio_stream, msg:tuple[str, dict]):
        """Re-chunk the PCM byte stream into fixed frames with start/end events."""
        text, textevent = msg
        first = True
        last_stream = np.array([], dtype=np.float32)
        for chunk in audio_stream:
            if chunk is not None and len(chunk) > 0:
                # 16-bit PCM -> float32 in [-1, 1], then resample 24kHz -> target.
                stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
                stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
                # Prepend the remainder carried from the previous chunk.
                stream = np.concatenate((last_stream, stream))
                streamlen = stream.shape[0]
                idx = 0
                while streamlen >= self.chunk:
                    eventpoint = {}
                    if first:
                        eventpoint = {'status':'start','text':text}
                        eventpoint.update(**textevent)
                        first = False
                    self.parent.put_audio_frame(stream[idx:idx+self.chunk], eventpoint)
                    streamlen -= self.chunk
                    idx += self.chunk
                last_stream = stream[idx:]  # get the remain stream
        eventpoint = {'status':'end','text':text}
        eventpoint.update(**textevent)
        self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
- ###########################################################################################
class AzureTTS(BaseTTS):
    """TTS backend using the Azure Speech SDK with streaming synthesis.

    Audio arrives through the ``synthesizing`` callback as raw 16kHz 16-bit
    mono PCM and is re-chunked into fixed 20ms float32 frames for the parent.
    """
    CHUNK_SIZE = 640  # 16kHz, 20ms, 16-bit Mono PCM size (bytes per frame)

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        self.audio_buffer = b''
        voicename = self.opt.REF_FILE  # e.g. "zh-CN-XiaoxiaoMultilingualNeural"
        speech_key = os.getenv("AZURE_SPEECH_KEY")
        tts_region = os.getenv("AZURE_TTS_REGION")
        # Robustness: surface missing credentials early instead of failing
        # later with an opaque websocket error.
        if not speech_key or not tts_region:
            logger.error("AzureTTS: AZURE_SPEECH_KEY / AZURE_TTS_REGION environment variables are not set")
        speech_endpoint = f"wss://{tts_region}.tts.speech.microsoft.com/cognitiveservices/websocket/v2"
        speech_config = speechsdk.SpeechConfig(subscription=speech_key, endpoint=speech_endpoint)
        speech_config.speech_synthesis_voice_name = voicename
        speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm)

        # audio_config=None: results are delivered in memory via callbacks.
        self.speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
        self.speech_synthesizer.synthesizing.connect(self._on_synthesizing)

    @staticmethod
    def _latency_ms(result, prop_id):
        """Best-effort integer parse of a latency property; -1 when unavailable."""
        try:
            return int(result.properties.get_property(prop_id))
        except (TypeError, ValueError):
            return -1

    def txt_to_audio(self, msg:tuple[str, dict]):
        """Synthesize the text part of *msg*; audio frames are pushed by the callback."""
        msg_text: str = msg[0]
        result = self.speech_synthesizer.speak_text(msg_text)

        # Latency metrics.  Fix: get_property returns '' when a property is
        # absent, and the original bare int(...) then raised ValueError,
        # killing the synthesis path after an otherwise successful call.
        fb_latency = self._latency_ms(
            result, speechsdk.PropertyId.SpeechServiceResponse_SynthesisFirstByteLatencyMs
        )
        fin_latency = self._latency_ms(
            result, speechsdk.PropertyId.SpeechServiceResponse_SynthesisFinishLatencyMs
        )
        logger.info(f"azure音频生成相关:首字节延迟: {fb_latency} ms, 完成延迟: {fin_latency} ms, result_id: {result.result_id}")

    # === callback ===
    def _on_synthesizing(self, evt: speechsdk.SpeechSynthesisEventArgs):
        """Buffer incoming PCM and forward it as fixed-size float32 frames."""
        if evt.result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            logger.info("SynthesizingAudioCompleted")
        elif evt.result.reason == speechsdk.ResultReason.Canceled:
            cancellation_details = evt.result.cancellation_details
            logger.info(f"Speech synthesis canceled: {cancellation_details.reason}")
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                if cancellation_details.error_details:
                    logger.info(f"Error details: {cancellation_details.error_details}")
            # NOTE(review): execution deliberately falls through -- a Canceled
            # event carries empty audio_data, so the buffering below is a no-op.
        if self.state != State.RUNNING:
            # Session stopped: drop any partially buffered audio.
            self.audio_buffer = b''
            return
        # evt.result.audio_data is the newly arrived slice of raw PCM.
        self.audio_buffer += evt.result.audio_data
        while len(self.audio_buffer) >= self.CHUNK_SIZE:
            chunk = self.audio_buffer[:self.CHUNK_SIZE]
            self.audio_buffer = self.audio_buffer[self.CHUNK_SIZE:]
            # 16-bit PCM -> float32 in [-1, 1]
            frame = (np.frombuffer(chunk, dtype=np.int16)
                     .astype(np.float32) / 32767.0)
            self.parent.put_audio_frame(frame)
|