ttsreal.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445
  1. from __future__ import annotations
  2. import time
  3. import numpy as np
  4. import soundfile as sf
  5. import resampy
  6. import asyncio
  7. import torch
  8. import os
  9. import hmac
  10. import hashlib
  11. import base64
  12. import json
  13. import uuid
  14. import threading
  15. from typing import Iterator
  16. import requests
  17. import queue
  18. from queue import Queue
  19. from io import BytesIO
  20. import copy,websockets,gzip
  21. import azure.cognitiveservices.speech as speechsdk
  22. from threading import Thread, Event
  23. from enum import Enum
  24. from typing import TYPE_CHECKING
  25. if TYPE_CHECKING:
  26. from basereal import BaseReal
  27. from logger import logger
  28. class State(Enum):
  29. RUNNING=0
  30. PAUSE=1
  31. class BaseTTS:
  32. def __init__(self, opt, parent:BaseReal):
  33. self.opt=opt
  34. self.parent = parent
  35. self.fps = opt.fps # 20 ms per frame
  36. self.sample_rate = 16000
  37. self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
  38. self.input_stream = BytesIO()
  39. # 添加大小限制防止内存泄漏
  40. self.msgqueue = Queue(maxsize=1000) # 最多1000条消息
  41. self.high_priority_queue = Queue(maxsize=100) # 高优先级队列,最多100条
  42. self.state = State.RUNNING
  43. # 添加属性来存储被中断的消息
  44. self.interrupted_messages = [] # 存储被中断的消息
  45. # 添加属性来跟踪当前正在处理的消息
  46. self.current_msg = None # 当前正在处理的消息
  47. self.current_msg_progress = 0 # 当前消息的进度(字符位置)
  48. self.interrupt_flag = threading.Event() # 添加打断事件
  49. def reset_interrupt_flag(self):
  50. """重置打断标志"""
  51. self.interrupt_flag.clear()
  52. def set_interrupt_flag(self):
  53. """设置打断标志"""
  54. self.interrupt_flag.set()
  55. def _trigger_continue_play(self):
  56. """触发自动续播下一条介绍内容"""
  57. try:
  58. # 检查是否处于介绍播放状态
  59. if (hasattr(self.parent, 'intro_play_state') and
  60. self.parent.intro_play_state.get('is_playing', False) and
  61. not self.parent.intro_play_state.get('is_paused', False) and
  62. not self.parent.intro_play_state.get('is_waiting_next', False)):
  63. # 检查是否有高优先级消息在等待,如果有则不触发续播
  64. if not self.high_priority_queue.empty():
  65. logger.info("有高优先级消息等待,跳过自动续播")
  66. return
  67. # 检查消息队列是否为空,如果不为空说明还有消息在等待,不触发续播
  68. if not self.msgqueue.empty():
  69. logger.info("消息队列不为空,跳过自动续播")
  70. return
  71. # 触发续播
  72. self.parent._continue_intro_play()
  73. else:
  74. if hasattr(self.parent, 'intro_play_state') and self.parent.intro_play_state.get('is_waiting_next', False):
  75. logger.info("正在等待前一条播放完成,跳过本次触发")
  76. except Exception as e:
  77. logger.error(f"触发自动续播时出错: {e}")
  78. def flush_talk(self):
  79. # 停止当前播放并清空待处理的消息队列,但保留当前正在处理的状态
  80. # 这样可以确保打断后不会播放队列中积累的旧消息
  81. with self.msgqueue.mutex: # 使用队列的互斥锁确保线程安全
  82. # 保存队列中的剩余消息到中断列表
  83. remaining_msgs = list(self.msgqueue.queue)
  84. if remaining_msgs:
  85. self.interrupted_messages.extend(remaining_msgs)
  86. self.msgqueue.queue.clear() # 清空队列中等待的消息
  87. # 如果当前有正在处理的消息,也要将其保存到中断列表
  88. if self.current_msg:
  89. # 将当前消息加入中断列表
  90. self.interrupted_messages.append(self.current_msg)
  91. # 清空当前消息引用,因为已经被中断
  92. self.current_msg = None
  93. self.state = State.PAUSE
  94. # 清除当前正在处理的音频缓冲区
  95. if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
  96. self.input_stream.seek(0)
  97. self.input_stream.truncate()
  98. def resume_interrupted(self):
  99. """恢复播放被中断的消息"""
  100. if self.interrupted_messages:
  101. # 将被中断的消息重新加入队列
  102. with self.msgqueue.mutex:
  103. for msg in self.interrupted_messages:
  104. self.msgqueue.put(msg)
  105. # 清空中断消息列表
  106. self.interrupted_messages.clear()
  107. # 将状态设置为运行,以便继续处理消息
  108. self.state = State.RUNNING
  109. return True
  110. return False
  111. def put_msg_txt(self,msg:str,datainfo:dict={}):
  112. if len(msg)>0:
  113. # 对于长文本,按句子分割以支持更好的打断功能
  114. if len(msg) > 100: # 如果文本超过100字符,进行分割
  115. import re
  116. # 按标点符号分割文本,保留分隔符
  117. sentences = re.split(r'([。!?.!?])', msg)
  118. # 将句子和标点符号重新组合
  119. parts = []
  120. for i in range(0, len(sentences)-1, 2):
  121. sentence = sentences[i]
  122. punctuation = sentences[i+1] if i+1 < len(sentences) else ''
  123. if sentence.strip():
  124. parts.append(sentence.strip() + punctuation)
  125. # 如果分割后有多个部分,分别放入队列
  126. if len(parts) > 1:
  127. for part in parts:
  128. if part.strip():
  129. self.msgqueue.put((part, datainfo))
  130. return
  131. # 短文本或无法分割的文本直接放入队列
  132. self.msgqueue.put((msg, datainfo))
  133. def put_high_priority_msg(self,msg:str,datainfo:dict={}):
  134. """添加高优先级消息,会优先处理"""
  135. if len(msg)>0:
  136. # 对于长文本,按句子分割以支持更好的打断功能
  137. if len(msg) > 100: # 如果文本超过100字符,进行分割
  138. import re
  139. # 按标点符号分割文本,保留分隔符
  140. sentences = re.split(r'([。!?.!?])', msg)
  141. # 将句子和标点符号重新组合
  142. parts = []
  143. for i in range(0, len(sentences)-1, 2):
  144. sentence = sentences[i]
  145. punctuation = sentences[i+1] if i+1 < len(sentences) else ''
  146. if sentence.strip():
  147. parts.append(sentence.strip() + punctuation)
  148. # 如果分割后有多个部分,分别放入高优先级队列
  149. if len(parts) > 1:
  150. for part in parts:
  151. if part.strip():
  152. self.high_priority_queue.put((part, datainfo))
  153. return
  154. # 短文本或无法分割的文本直接放入高优先级队列
  155. self.high_priority_queue.put((msg, datainfo))
  156. def render(self,quit_event):
  157. process_thread = Thread(target=self.process_tts, args=(quit_event,))
  158. process_thread.start()
  159. def process_tts(self,quit_event):
  160. while not quit_event.is_set():
  161. try:
  162. # 检查状态是否为RUNNING,如果不是,则短暂等待后继续
  163. # 优先检查高优先级队列 - 每次循环都检查高优先级队列
  164. msg = None
  165. try:
  166. # 首先检查高优先级队列,不等待,立即返回
  167. if not self.high_priority_queue.empty():
  168. msg = self.high_priority_queue.get_nowait()
  169. # 处理高优先级消息时,确保状态为RUNNING
  170. self.state = State.RUNNING
  171. logger.info("处理高优先级消息")
  172. else:
  173. # 如果高优先级队列为空,检查普通队列
  174. if self.state != State.RUNNING and not self.msgqueue.empty():
  175. # 如果队列不为空但状态不是RUNNING,等待一段时间后重试
  176. import time
  177. time.sleep(0.1) # 短暂等待
  178. continue
  179. msg = self.msgqueue.get(block=True, timeout=0.05) # 使用较短超时时间,以便快速检查高优先级队列
  180. except queue.Empty:
  181. # 如果两个队列都没有消息,继续等待
  182. continue
  183. # 检查是否是唤醒消息(空消息),如果是则跳过处理,继续循环检查高优先级队列
  184. if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
  185. continue # 跳过空消息,继续检查队列
  186. # 记录当前正在处理的消息
  187. self.current_msg = msg
  188. self.current_msg_progress = 0 # 重置进度
  189. except queue.Empty:
  190. continue
  191. # 在处理音频前再次检查状态,如果状态已改变则跳过处理
  192. if self.state == State.RUNNING:
  193. # 检查是否有高优先级消息,如果有则优先处理
  194. if not self.high_priority_queue.empty():
  195. logger.info("发现高优先级消息,中断当前普通消息处理")
  196. # 保存当前消息到中断队列
  197. if msg:
  198. self.interrupted_messages.append(msg)
  199. # 处理高优先级消息
  200. high_priority_msg = self.high_priority_queue.get_nowait()
  201. self.txt_to_audio(high_priority_msg)
  202. # 高优先级消息不触发自动续播
  203. else:
  204. # 处理普通消息
  205. self.txt_to_audio(msg)
  206. # 注意:自动续播不再在这里触发,而是在 _continue_intro_play 中通过定时器控制
  207. # 消息处理完成后,清空当前消息
  208. self.current_msg = None
  209. self.current_msg_progress = 0
  210. logger.info('ttsreal thread stop')
  211. def txt_to_audio(self,msg:tuple[str, dict]):
  212. pass
  213. ###########################################################################################
  214. class EdgeTTS(BaseTTS):
  215. def txt_to_audio(self,msg:tuple[str, dict]):
  216. voicename = self.opt.REF_FILE #"zh-CN-YunxiaNeural"
  217. text,textevent = msg
  218. t = time.time()
  219. # 在开始TTS请求前检查状态,如果状态已改变则跳过
  220. if self.state != State.RUNNING:
  221. return
  222. # 处理空消息,直接返回
  223. if not text.strip():
  224. return
  225. # 重置打断标志,确保高优先级消息能够正常处理
  226. self.reset_interrupt_flag()
  227. # 检查是否有高优先级消息,如果有则立即返回,让process_tts处理
  228. if not self.high_priority_queue.empty():
  229. logger.info("发现高优先级消息,跳过当前普通消息处理")
  230. return
  231. # 使用异步方式处理TTS请求,以便能够响应中断
  232. loop = asyncio.new_event_loop()
  233. asyncio.set_event_loop(loop)
  234. # 创建一个任务来处理TTS请求
  235. task = loop.create_task(self.__main(voicename, text))
  236. # 定期检查中断标志和高优先级队列
  237. while not task.done():
  238. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  239. task.cancel()
  240. logger.info("TTS请求被高优先级消息中断")
  241. break
  242. loop.run_until_complete(asyncio.sleep(0.05)) # 缩短检查间隔,提高响应速度
  243. # 等待任务完成或取消
  244. try:
  245. loop.run_until_complete(task)
  246. except asyncio.CancelledError:
  247. logger.info("TTS请求被中断")
  248. self.input_stream.seek(0)
  249. self.input_stream.truncate()
  250. return
  251. logger.info(f'-------edge tts time:{time.time()-t:.4f}s')
  252. if self.input_stream.getbuffer().nbytes<=0: #edgetts err
  253. logger.error('edgetts err!!!!!')
  254. return
  255. self.input_stream.seek(0)
  256. stream = self.__create_bytes_stream(self.input_stream)
  257. streamlen = stream.shape[0]
  258. idx=0
  259. # 在播放前再次检查状态,如果状态已改变则跳过播放
  260. if self.state != State.RUNNING:
  261. self.input_stream.seek(0)
  262. self.input_stream.truncate()
  263. return
  264. # 检查打断标志或高优先级队列
  265. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  266. self.input_stream.seek(0)
  267. self.input_stream.truncate()
  268. return
  269. while streamlen >= self.chunk and self.state==State.RUNNING:
  270. # 每次循环都检查打断标志和高优先级队列
  271. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  272. logger.info("播放过程中发现高优先级消息,中断播放")
  273. break
  274. eventpoint={}
  275. streamlen -= self.chunk
  276. if idx==0:
  277. eventpoint={'status':'start','text':text}
  278. eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent}
  279. elif streamlen<self.chunk:
  280. eventpoint={'status':'end','text':text}
  281. eventpoint.update(**textevent) #eventpoint={'status':'end','text':text,'msgevent':textevent}
  282. # 在发送音频帧之前再次检查状态
  283. if self.state != State.RUNNING:
  284. break
  285. # 检查打断标志和高优先级队列
  286. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  287. logger.info("发送音频帧前发现高优先级消息,中断播放")
  288. break
  289. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  290. idx += self.chunk
  291. #if streamlen>0: #skip last frame(not 20ms)
  292. # self.queue.put(stream[idx:])
  293. self.input_stream.seek(0)
  294. self.input_stream.truncate()
  295. def __create_bytes_stream(self,byte_stream):
  296. #byte_stream=BytesIO(buffer)
  297. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  298. logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
  299. stream = stream.astype(np.float32)
  300. if stream.ndim > 1:
  301. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  302. stream = stream[:, 0]
  303. if sample_rate != self.sample_rate and stream.shape[0]>0:
  304. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  305. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  306. return stream
  307. async def __main(self,voicename: str, text: str):
  308. """通过本地 API 调用 Edge TTS 服务"""
  309. try:
  310. # 使用同步方式调用 HTTP API(在异步函数中使用线程池)
  311. loop = asyncio.get_event_loop()
  312. # 准备请求参数
  313. headers = {"Content-Type": "application/json"}
  314. data = {
  315. "text": text,
  316. "voice": voicename,
  317. "rate": "+0%",
  318. "volume": "+0%",
  319. "pitch": "+0Hz"
  320. }
  321. # 在线程池中执行同步请求,避免阻塞事件循环
  322. def make_request():
  323. resp = requests.post(
  324. "http://127.0.0.1:1024/tts",
  325. json=data,
  326. headers=headers,
  327. timeout=30
  328. )
  329. return resp
  330. # 使用线程池执行同步请求
  331. resp = await loop.run_in_executor(None, make_request)
  332. logger.info(f"Edge TTS API 响应状态码:{resp.status_code}")
  333. # 成功返回音频流
  334. if resp.status_code == 200 and str(resp.headers.get("Content-Type", "")).startswith("audio/"):
  335. # 将音频数据写入输入流
  336. self.input_stream.write(resp.content)
  337. logger.info(f"✅ 成功从 API 获取音频数据,大小:{len(resp.content)} bytes")
  338. else:
  339. # 非 200 时解析错误并抛出
  340. try:
  341. detail = resp.json()
  342. except Exception:
  343. detail = resp.text
  344. logger.error(f"⚠️ TTS API 接口返回错误:{detail}")
  345. resp.raise_for_status()
  346. except requests.exceptions.HTTPError as e:
  347. logger.error(f"HTTP 错误:{e}")
  348. except requests.exceptions.ConnectionError:
  349. logger.error(f"❌ 无法连接到 TTS API 服务器 (http://127.0.0.1:1024/tts),请检查服务是否在线")
  350. except requests.exceptions.Timeout:
  351. logger.error(f"❌ TTS API 请求超时")
  352. except Exception as e:
  353. logger.error(f"其他错误:{str(e)}")
  354. ###########################################################################################
  355. class FishTTS(BaseTTS):
  356. def txt_to_audio(self,msg:tuple[str, dict]):
  357. text,textevent = msg
  358. self.stream_tts(
  359. self.fish_speech(
  360. text,
  361. self.opt.REF_FILE,
  362. self.opt.REF_TEXT,
  363. "zh", #en args.language,
  364. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  365. ),
  366. msg
  367. )
  368. def fish_speech(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  369. start = time.perf_counter()
  370. req={
  371. 'text':text,
  372. 'reference_id':reffile,
  373. 'format':'wav',
  374. 'streaming':True,
  375. 'use_memory_cache':'on'
  376. }
  377. try:
  378. res = requests.post(
  379. f"{server_url}/v1/tts",
  380. json=req,
  381. stream=True,
  382. headers={
  383. "content-type": "application/json",
  384. },
  385. )
  386. end = time.perf_counter()
  387. logger.info(f"fish_speech Time to make POST: {end-start}s")
  388. if res.status_code != 200:
  389. logger.error("Error:%s", res.text)
  390. return
  391. first = True
  392. for chunk in res.iter_content(chunk_size=17640): # 1764 44100*20ms*2
  393. #print('chunk len:',len(chunk))
  394. if first:
  395. end = time.perf_counter()
  396. logger.info(f"fish_speech Time to first chunk: {end-start}s")
  397. first = False
  398. if chunk and self.state==State.RUNNING:
  399. yield chunk
  400. #print("gpt_sovits response.elapsed:", res.elapsed)
  401. except Exception as e:
  402. logger.exception('fishtts')
  403. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  404. text,textevent = msg
  405. first = True
  406. for chunk in audio_stream:
  407. if chunk is not None and len(chunk)>0:
  408. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  409. stream = resampy.resample(x=stream, sr_orig=44100, sr_new=self.sample_rate)
  410. #byte_stream=BytesIO(buffer)
  411. #stream = self.__create_bytes_stream(byte_stream)
  412. streamlen = stream.shape[0]
  413. idx=0
  414. while streamlen >= self.chunk:
  415. eventpoint={}
  416. if first:
  417. eventpoint={'status':'start','text':text}
  418. eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent}
  419. first = False
  420. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  421. streamlen -= self.chunk
  422. idx += self.chunk
  423. eventpoint={'status':'end','text':text}
  424. eventpoint.update(**textevent) #eventpoint={'status':'end','text':text,'msgevent':textevent}
  425. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  426. ###########################################################################################
  427. class SovitsTTS(BaseTTS):
  428. def txt_to_audio(self,msg:tuple[str, dict]):
  429. text,textevent = msg
  430. self.stream_tts(
  431. self.gpt_sovits(
  432. text=text,
  433. reffile=self.opt.REF_FILE,
  434. reftext=self.opt.REF_TEXT,
  435. language="zh", #en args.language,
  436. server_url=self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  437. ),
  438. msg
  439. )
  440. def gpt_sovits(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  441. start = time.perf_counter()
  442. req={
  443. 'text':text,
  444. 'text_lang':language,
  445. 'ref_audio_path':reffile,
  446. 'prompt_text':reftext,
  447. 'prompt_lang':language,
  448. 'media_type':'ogg',
  449. 'streaming_mode':True
  450. }
  451. # req["text"] = text
  452. # req["text_language"] = language
  453. # req["character"] = character
  454. # req["emotion"] = emotion
  455. # #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
  456. # req["streaming_mode"] = True
  457. try:
  458. res = requests.post(
  459. f"{server_url}/tts",
  460. json=req,
  461. stream=True,
  462. )
  463. end = time.perf_counter()
  464. logger.info(f"gpt_sovits Time to make POST: {end-start}s")
  465. if res.status_code != 200:
  466. logger.error("Error:%s", res.text)
  467. return
  468. first = True
  469. for chunk in res.iter_content(chunk_size=None): #12800 1280 32K*20ms*2
  470. logger.info('chunk len:%d',len(chunk))
  471. if first:
  472. end = time.perf_counter()
  473. logger.info(f"gpt_sovits Time to first chunk: {end-start}s")
  474. first = False
  475. if chunk and self.state==State.RUNNING:
  476. yield chunk
  477. #print("gpt_sovits response.elapsed:", res.elapsed)
  478. except Exception as e:
  479. logger.exception('sovits')
  480. def __create_bytes_stream(self,byte_stream):
  481. #byte_stream=BytesIO(buffer)
  482. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  483. logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
  484. stream = stream.astype(np.float32)
  485. if stream.ndim > 1:
  486. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  487. stream = stream[:, 0]
  488. if sample_rate != self.sample_rate and stream.shape[0]>0:
  489. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  490. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  491. return stream
  492. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  493. text,textevent = msg
  494. first = True
  495. for chunk in audio_stream:
  496. if chunk is not None and len(chunk)>0:
  497. #stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  498. #stream = resampy.resample(x=stream, sr_orig=32000, sr_new=self.sample_rate)
  499. byte_stream=BytesIO(chunk)
  500. stream = self.__create_bytes_stream(byte_stream)
  501. streamlen = stream.shape[0]
  502. idx=0
  503. while streamlen >= self.chunk:
  504. eventpoint={}
  505. if first:
  506. eventpoint={'status':'start','text':text}
  507. eventpoint.update(**textevent)
  508. first = False
  509. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  510. streamlen -= self.chunk
  511. idx += self.chunk
  512. eventpoint={'status':'end','text':text}
  513. eventpoint.update(**textevent)
  514. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  515. ###########################################################################################
  516. class CosyVoiceTTS(BaseTTS):
  517. def txt_to_audio(self,msg:tuple[str, dict]):
  518. text,textevent = msg
  519. self.stream_tts(
  520. self.cosy_voice(
  521. text,
  522. self.opt.REF_FILE,
  523. self.opt.REF_TEXT,
  524. "zh", #en args.language,
  525. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  526. ),
  527. msg
  528. )
  529. def cosy_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  530. start = time.perf_counter()
  531. payload = {
  532. 'tts_text': text,
  533. 'prompt_text': reftext
  534. }
  535. try:
  536. files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))]
  537. res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)
  538. end = time.perf_counter()
  539. logger.info(f"cosy_voice Time to make POST: {end-start}s")
  540. if res.status_code != 200:
  541. logger.error("Error:%s", res.text)
  542. return
  543. first = True
  544. for chunk in res.iter_content(chunk_size=9600): # 960 24K*20ms*2
  545. if first:
  546. end = time.perf_counter()
  547. logger.info(f"cosy_voice Time to first chunk: {end-start}s")
  548. first = False
  549. if chunk and self.state==State.RUNNING:
  550. yield chunk
  551. except Exception as e:
  552. logger.exception('cosyvoice')
  553. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  554. text,textevent = msg
  555. first = True
  556. for chunk in audio_stream:
  557. if chunk is not None and len(chunk)>0:
  558. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  559. stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  560. #byte_stream=BytesIO(buffer)
  561. #stream = self.__create_bytes_stream(byte_stream)
  562. streamlen = stream.shape[0]
  563. idx=0
  564. while streamlen >= self.chunk:
  565. eventpoint={}
  566. if first:
  567. eventpoint={'status':'start','text':text}
  568. eventpoint.update(**textevent)
  569. first = False
  570. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  571. streamlen -= self.chunk
  572. idx += self.chunk
  573. eventpoint={'status':'end','text':text}
  574. eventpoint.update(**textevent)
  575. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  576. ###########################################################################################
# Tencent Cloud streaming TTS endpoint pieces, used by TencentTTS below.
_PROTOCOL = "https://"
_HOST = "tts.cloud.tencent.com"
_PATH = "/stream"
_ACTION = "TextToStreamAudio"
  581. class TencentTTS(BaseTTS):
  582. def __init__(self, opt, parent):
  583. super().__init__(opt,parent)
  584. self.appid = os.getenv("TENCENT_APPID")
  585. self.secret_key = os.getenv("TENCENT_SECRET_KEY")
  586. self.secret_id = os.getenv("TENCENT_SECRET_ID")
  587. self.voice_type = int(opt.REF_FILE)
  588. self.codec = "pcm"
  589. self.sample_rate = 16000
  590. self.volume = 0
  591. self.speed = 0
  592. def __gen_signature(self, params):
  593. sort_dict = sorted(params.keys())
  594. sign_str = "POST" + _HOST + _PATH + "?"
  595. for key in sort_dict:
  596. sign_str = sign_str + key + "=" + str(params[key]) + '&'
  597. sign_str = sign_str[:-1]
  598. hmacstr = hmac.new(self.secret_key.encode('utf-8'),
  599. sign_str.encode('utf-8'), hashlib.sha1).digest()
  600. s = base64.b64encode(hmacstr)
  601. s = s.decode('utf-8')
  602. return s
  603. def __gen_params(self, session_id, text):
  604. params = dict()
  605. params['Action'] = _ACTION
  606. params['AppId'] = int(self.appid)
  607. params['SecretId'] = self.secret_id
  608. params['ModelType'] = 1
  609. params['VoiceType'] = self.voice_type
  610. params['Codec'] = self.codec
  611. params['SampleRate'] = self.sample_rate
  612. params['Speed'] = self.speed
  613. params['Volume'] = self.volume
  614. params['SessionId'] = session_id
  615. params['Text'] = text
  616. timestamp = int(time.time())
  617. params['Timestamp'] = timestamp
  618. params['Expired'] = timestamp + 24 * 60 * 60
  619. return params
  620. def txt_to_audio(self,msg:tuple[str, dict]):
  621. text,textevent = msg
  622. self.stream_tts(
  623. self.tencent_voice(
  624. text,
  625. self.opt.REF_FILE,
  626. self.opt.REF_TEXT,
  627. "zh", #en args.language,
  628. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  629. ),
  630. msg
  631. )
  632. def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  633. start = time.perf_counter()
  634. session_id = str(uuid.uuid1())
  635. params = self.__gen_params(session_id, text)
  636. signature = self.__gen_signature(params)
  637. headers = {
  638. "Content-Type": "application/json",
  639. "Authorization": str(signature)
  640. }
  641. url = _PROTOCOL + _HOST + _PATH
  642. try:
  643. res = requests.post(url, headers=headers,
  644. data=json.dumps(params), stream=True)
  645. end = time.perf_counter()
  646. logger.info(f"tencent Time to make POST: {end-start}s")
  647. first = True
  648. for chunk in res.iter_content(chunk_size=6400): # 640 16K*20ms*2
  649. #logger.info('chunk len:%d',len(chunk))
  650. if first:
  651. try:
  652. rsp = json.loads(chunk)
  653. #response["Code"] = rsp["Response"]["Error"]["Code"]
  654. #response["Message"] = rsp["Response"]["Error"]["Message"]
  655. logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"])
  656. return
  657. except:
  658. end = time.perf_counter()
  659. logger.info(f"tencent Time to first chunk: {end-start}s")
  660. first = False
  661. if chunk and self.state==State.RUNNING:
  662. yield chunk
  663. except Exception as e:
  664. logger.exception('tencent')
  665. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  666. text,textevent = msg
  667. first = True
  668. last_stream = np.array([],dtype=np.float32)
  669. for chunk in audio_stream:
  670. if chunk is not None and len(chunk)>0:
  671. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  672. stream = np.concatenate((last_stream,stream))
  673. #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  674. #byte_stream=BytesIO(buffer)
  675. #stream = self.__create_bytes_stream(byte_stream)
  676. streamlen = stream.shape[0]
  677. idx=0
  678. while streamlen >= self.chunk:
  679. eventpoint={}
  680. if first:
  681. eventpoint={'status':'start','text':text}
  682. eventpoint.update(**textevent)
  683. first = False
  684. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  685. streamlen -= self.chunk
  686. idx += self.chunk
  687. last_stream = stream[idx:] #get the remain stream
  688. eventpoint={'status':'end','text':text}
  689. eventpoint.update(**textevent)
  690. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  691. ###########################################################################################
  692. class DoubaoTTS(BaseTTS):
    def __init__(self, opt, parent):
        """Configure the Doubao (Volcano Engine) binary-websocket TTS client."""
        super().__init__(opt, parent)
        # Volcano Engine credentials come from the environment.
        self.appid = os.getenv("DOUBAO_APPID")
        self.token = os.getenv("DOUBAO_TOKEN")
        _cluster = 'volcano_tts'
        _host = "openspeech.bytedance.com"
        self.api_url = f"wss://{_host}/api/v1/tts/ws_binary"
        # Request template; the "xxx" placeholders (uid, voice_type, reqid,
        # text, operation) are filled in per request by doubao_voice().
        self.request_json = {
            "app": {
                "appid": self.appid,
                "token": "access_token",
                "cluster": _cluster
            },
            "user": {
                "uid": "xxx"
            },
            "audio": {
                "voice_type": "xxx",
                "encoding": "pcm",       # raw PCM output
                "rate": 16000,           # 16 kHz sample rate
                "speed_ratio": 1.0,
                "volume_ratio": 1.0,
                "pitch_ratio": 1.0,
            },
            "request": {
                "reqid": "xxx",
                "text": "字节跳动语音合成。",
                "text_type": "plain",
                "operation": "xxx"
            }
        }
    async def doubao_voice(self, text):  # -> Iterator[bytes]:
        """Stream PCM audio for *text* from the Doubao binary websocket API.

        Sends one gzip-compressed "submit" request and yields the raw audio
        payload of each audio-only server frame.  Frame layout (as parsed
        below): byte 0 low nibble = header size in 4-byte words; byte 1 high
        nibble = message type (0xb means audio-only); after the header, a
        4-byte signed sequence number and 4-byte payload size precede the
        audio bytes.  A negative sequence number marks the final chunk.
        """
        start = time.perf_counter()
        voice_type = self.opt.REF_FILE
        try:
            # Fixed 4-byte client header for a full request with gzip payload.
            default_header = bytearray(b'\x11\x10\x11\x00')
            submit_request_json = copy.deepcopy(self.request_json)
            submit_request_json["user"]["uid"] = self.parent.sessionid
            submit_request_json["audio"]["voice_type"] = voice_type
            submit_request_json["request"]["text"] = text
            submit_request_json["request"]["reqid"] = str(uuid.uuid4())
            submit_request_json["request"]["operation"] = "submit"
            payload_bytes = str.encode(json.dumps(submit_request_json))
            payload_bytes = gzip.compress(payload_bytes)  # if no compression, comment this line
            full_client_request = bytearray(default_header)
            full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size (4 bytes)
            full_client_request.extend(payload_bytes)  # payload
            header = {"Authorization": f"Bearer; {self.token}"}
            first = True
            async with websockets.connect(self.api_url, extra_headers=header, ping_interval=None) as ws:
                await ws.send(full_client_request)
                while True:
                    res = await ws.recv()
                    header_size = res[0] & 0x0f              # header size in 4-byte words
                    message_type = res[1] >> 4
                    message_type_specific_flags = res[1] & 0x0f
                    payload = res[header_size*4:]
                    if message_type == 0xb:  # audio-only server response
                        if message_type_specific_flags == 0:  # no sequence number: ACK only
                            continue
                        else:
                            if first:
                                end = time.perf_counter()
                                logger.info(f"doubao tts Time to first chunk: {end-start}s")
                                first = False
                            sequence_number = int.from_bytes(payload[:4], "big", signed=True)
                            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
                            payload = payload[8:]
                            yield payload
                            if sequence_number < 0:  # negative sequence => last chunk
                                break
                    else:
                        # Any other message type (e.g. error) ends the stream.
                        break
        except Exception as e:
            logger.exception('doubao')
  771. # # 检查响应状态码
  772. # if response.status_code == 200:
  773. # # 处理响应数据
  774. # audio_data = base64.b64decode(response.json().get('data'))
  775. # yield audio_data
  776. # else:
  777. # logger.error(f"请求失败,状态码: {response.status_code}")
  778. # return
  779. def txt_to_audio(self, msg:tuple[str, dict]):
  780. text, textevent = msg
  781. asyncio.new_event_loop().run_until_complete(
  782. self.stream_tts(
  783. self.doubao_voice(text),
  784. msg
  785. )
  786. )
  787. async def stream_tts(self, audio_stream, msg:tuple[str, dict]):
  788. text, textevent = msg
  789. first = True
  790. last_stream = np.array([],dtype=np.float32)
  791. async for chunk in audio_stream:
  792. if chunk is not None and len(chunk) > 0:
  793. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  794. stream = np.concatenate((last_stream,stream))
  795. #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  796. # byte_stream=BytesIO(buffer)
  797. # stream = self.__create_bytes_stream(byte_stream)
  798. streamlen = stream.shape[0]
  799. idx = 0
  800. while streamlen >= self.chunk:
  801. eventpoint = {}
  802. if first:
  803. eventpoint={'status':'start','text':text}
  804. eventpoint.update(**textevent)
  805. first = False
  806. self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
  807. streamlen -= self.chunk
  808. idx += self.chunk
  809. last_stream = stream[idx:] #get the remain stream
  810. eventpoint={'status':'end','text':text}
  811. eventpoint.update(**textevent)
  812. self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
  813. ###########################################################################################
class IndexTTS2(BaseTTS):
    """TTS backend driving an IndexTTS2 Gradio server.

    Text is first split into segments via the server's text-split endpoint,
    then each segment is synthesized with /gen_single and the resulting file
    is streamed to the parent as fixed-size chunks.
    """

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        # IndexTTS2 configuration parameters
        self.server_url = opt.TTS_SERVER  # Gradio server address, e.g. "http://127.0.0.1:7860/"
        self.ref_audio_path = opt.REF_FILE  # reference audio file path
        self.max_tokens = getattr(opt, 'MAX_TOKENS', 120)  # max tokens per segment
        # Initialise the Gradio client
        try:
            from gradio_client import Client, handle_file
            self.client = Client(self.server_url)
            self.handle_file = handle_file
            logger.info(f"IndexTTS2 Gradio客户端初始化成功: {self.server_url}")
        except ImportError:
            logger.error("IndexTTS2 需要安装 gradio_client: pip install gradio_client")
            raise
        except Exception as e:
            logger.error(f"IndexTTS2 Gradio客户端初始化失败: {e}")
            raise

    def txt_to_audio(self, msg):
        """Split the message text into segments and synthesize/stream each one.

        Start/end events are attached only to the first/last segment so the
        downstream consumer sees one logical utterance.
        """
        text, textevent = msg
        try:
            # Split the text first
            segments = self.split_text(text)
            if not segments:
                logger.error("IndexTTS2 文本分割失败")
                return
            logger.info(f"IndexTTS2 文本分割为 {len(segments)} 个片段")
            # Generate audio for each segment in turn
            for i, segment_text in enumerate(segments):
                if self.state != State.RUNNING:
                    break
                logger.info(f"IndexTTS2 正在生成第 {i+1}/{len(segments)} 段音频...")
                audio_file = self.indextts2_generate(segment_text)
                if audio_file:
                    # Build per-segment event info
                    segment_msg = (segment_text, textevent)
                    self.file_to_stream(audio_file, segment_msg, is_first=(i==0), is_last=(i==len(segments)-1))
                else:
                    logger.error(f"IndexTTS2 第 {i+1} 段音频生成失败")
        except Exception as e:
            logger.exception(f"IndexTTS2 txt_to_audio 错误: {e}")

    def split_text(self, text):
        """Split *text* into segments using the IndexTTS2 split API.

        Returns a list of segment strings; falls back to ``[text]`` unchanged
        when the endpoint fails or its response has an unexpected shape.
        """
        try:
            logger.info(f"IndexTTS2 开始分割文本,长度: {len(text)}")
            # Call the text-splitting API
            result = self.client.predict(
                text=text,
                max_text_tokens_per_segment=self.max_tokens,
                api_name="/on_input_text_change"
            )
            # Parse the split result
            if 'value' in result and 'data' in result['value']:
                data = result['value']['data']
                logger.info(f"IndexTTS2 共分割为 {len(data)} 个片段")
                segments = []
                for i, item in enumerate(data):
                    # item layout: [segment index, segment text, token count]
                    序号 = item[0] + 1
                    分句内容 = item[1]
                    token数 = item[2]
                    logger.info(f"片段 {序号}: {len(分句内容)} 字符, {token数} tokens")
                    segments.append(分句内容)
                return segments
            else:
                logger.error(f"IndexTTS2 文本分割结果格式异常: {result}")
                return [text]  # fall back to the original text when splitting fails
        except Exception as e:
            logger.exception(f"IndexTTS2 文本分割失败: {e}")
            return [text]  # fall back to the original text when splitting fails

    def indextts2_generate(self, text):
        """Call the IndexTTS2 Gradio /gen_single API to synthesize *text*.

        Returns the server-side audio file path on success, None on failure.
        The ``vec*`` and ``param_*`` kwargs mirror the endpoint's generated
        signature; their semantics come from the server UI — NOTE(review):
        confirm against the deployed IndexTTS2 version before changing them.
        """
        start = time.perf_counter()
        try:
            # Call the gen_single API
            result = self.client.predict(
                emo_control_method="Same as the voice reference",
                prompt=self.handle_file(self.ref_audio_path),
                text=text,
                emo_ref_path=self.handle_file(self.ref_audio_path),
                emo_weight=0.8,
                vec1=0.5,
                vec2=0,
                vec3=0,
                vec4=0,
                vec5=0,
                vec6=0,
                vec7=0,
                vec8=0,
                emo_text="",
                emo_random=False,
                max_text_tokens_per_segment=self.max_tokens,
                param_16=True,
                param_17=0.8,
                param_18=30,
                param_19=0.8,
                param_20=0,
                param_21=3,
                param_22=10,
                param_23=1500,
                api_name="/gen_single"
            )
            end = time.perf_counter()
            logger.info(f"IndexTTS2 片段生成完成,耗时: {end-start:.2f}s")
            # Return the generated audio file path
            if 'value' in result:
                audio_file = result['value']
                return audio_file
            else:
                logger.error(f"IndexTTS2 结果格式异常: {result}")
                return None
        except Exception as e:
            logger.exception(f"IndexTTS2 API调用失败: {e}")
            return None

    def file_to_stream(self, audio_file, msg, is_first=False, is_last=False):
        """Stream an audio file to the parent as fixed-size chunks.

        The start event is emitted only with the first chunk of the first
        segment, the end event only after the last segment; the temporary
        file is deleted afterwards.  NOTE(review): any tail shorter than one
        chunk is silently dropped by the while loop below.
        """
        text, textevent = msg
        try:
            # Read the audio file
            stream, sample_rate = sf.read(audio_file)
            logger.info(f'IndexTTS2 音频文件 {sample_rate}Hz: {stream.shape}')
            # Convert to float32
            stream = stream.astype(np.float32)
            # Keep only the first channel of multi-channel audio
            if stream.ndim > 1:
                logger.info(f'IndexTTS2 音频有 {stream.shape[1]} 个声道,只使用第一个')
                stream = stream[:, 0]
            # Resample to the target sample rate
            if sample_rate != self.sample_rate and stream.shape[0] > 0:
                logger.info(f'IndexTTS2 重采样: {sample_rate}Hz -> {self.sample_rate}Hz')
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
            # Emit the audio in fixed-size chunks
            streamlen = stream.shape[0]
            idx = 0
            first_chunk = True
            while streamlen >= self.chunk and self.state == State.RUNNING:
                eventpoint = None
                # Send the start event only on the first chunk of the first segment
                if is_first and first_chunk:
                    eventpoint = {'status': 'start', 'text': text, 'msgevent': textevent}
                    first_chunk = False
                self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
                idx += self.chunk
                streamlen -= self.chunk
            # Send the end event only for the last segment
            if is_last:
                eventpoint = {'status': 'end', 'text': text, 'msgevent': textevent}
                self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
            # Remove the temporary file
            try:
                if os.path.exists(audio_file):
                    os.remove(audio_file)
                    logger.info(f"IndexTTS2 已删除临时文件: {audio_file}")
            except Exception as e:
                logger.warning(f"IndexTTS2 删除临时文件失败: {e}")
        except Exception as e:
            logger.exception(f"IndexTTS2 音频流处理失败: {e}")
  971. ###########################################################################################
  972. class XTTS(BaseTTS):
  973. def __init__(self, opt, parent):
  974. super().__init__(opt,parent)
  975. self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER)
  976. def txt_to_audio(self,msg:tuple[str, dict]):
  977. text,textevent = msg
  978. self.stream_tts(
  979. self.xtts(
  980. text,
  981. self.speaker,
  982. "zh-cn", #en args.language,
  983. self.opt.TTS_SERVER, #"http://localhost:9000", #args.server_url,
  984. "20" #args.stream_chunk_size
  985. ),
  986. msg
  987. )
  988. def get_speaker(self,ref_audio,server_url):
  989. files = {"wav_file": ("reference.wav", open(ref_audio, "rb"))}
  990. response = requests.post(f"{server_url}/clone_speaker", files=files)
  991. return response.json()
  992. def xtts(self,text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
  993. start = time.perf_counter()
  994. speaker["text"] = text
  995. speaker["language"] = language
  996. speaker["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
  997. try:
  998. res = requests.post(
  999. f"{server_url}/tts_stream",
  1000. json=speaker,
  1001. stream=True,
  1002. )
  1003. end = time.perf_counter()
  1004. logger.info(f"xtts Time to make POST: {end-start}s")
  1005. if res.status_code != 200:
  1006. print("Error:", res.text)
  1007. return
  1008. first = True
  1009. for chunk in res.iter_content(chunk_size=None): #24K*20ms*2
  1010. if first:
  1011. end = time.perf_counter()
  1012. logger.info(f"xtts Time to first chunk: {end-start}s")
  1013. first = False
  1014. if chunk:
  1015. yield chunk
  1016. except Exception as e:
  1017. print(e)
  1018. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  1019. text,textevent = msg
  1020. first = True
  1021. last_stream = np.array([],dtype=np.float32)
  1022. for chunk in audio_stream:
  1023. if chunk is not None and len(chunk)>0:
  1024. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  1025. stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  1026. stream = np.concatenate((last_stream,stream))
  1027. #byte_stream=BytesIO(buffer)
  1028. #stream = self.__create_bytes_stream(byte_stream)
  1029. streamlen = stream.shape[0]
  1030. idx=0
  1031. while streamlen >= self.chunk:
  1032. eventpoint={}
  1033. if first:
  1034. eventpoint={'status':'start','text':text}
  1035. eventpoint.update(**textevent)
  1036. first = False
  1037. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  1038. streamlen -= self.chunk
  1039. idx += self.chunk
  1040. last_stream = stream[idx:] #get the remain stream
  1041. eventpoint={'status':'end','text':text}
  1042. eventpoint.update(**textevent)
  1043. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  1044. ###########################################################################################
class AzureTTS(BaseTTS):
    """TTS backend using the Azure Speech SDK streaming synthesizer.

    Audio arrives incrementally through the ``synthesizing`` callback as raw
    16 kHz 16-bit mono PCM and is re-chunked into 20 ms frames for the parent.
    """

    CHUNK_SIZE = 640  # 16kHz, 20ms, 16-bit Mono PCM size

    def __init__(self, opt, parent):
        super().__init__(opt,parent)
        # Buffer carrying partial frames between synthesizing callbacks.
        self.audio_buffer = b''
        voicename = self.opt.REF_FILE  # e.g. "zh-CN-XiaoxiaoMultilingualNeural"
        speech_key = os.getenv("AZURE_SPEECH_KEY")
        tts_region = os.getenv("AZURE_TTS_REGION")
        speech_endpoint = f"wss://{tts_region}.tts.speech.microsoft.com/cognitiveservices/websocket/v2"
        speech_config = speechsdk.SpeechConfig(subscription=speech_key,endpoint=speech_endpoint)
        speech_config.speech_synthesis_voice_name = voicename
        speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm)
        # Receive results as an in-memory stream (no audio device output).
        self.speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
        self.speech_synthesizer.synthesizing.connect(self._on_synthesizing)

    def txt_to_audio(self,msg:tuple[str, dict]):
        """Synthesize the message text; audio frames are emitted from the
        _on_synthesizing callback while this call blocks until completion."""
        msg_text: str = msg[0]
        result=self.speech_synthesizer.speak_text(msg_text)
        # Latency metrics reported by the service.
        fb_latency = int(result.properties.get_property(
            speechsdk.PropertyId.SpeechServiceResponse_SynthesisFirstByteLatencyMs
        ))
        fin_latency = int(result.properties.get_property(
            speechsdk.PropertyId.SpeechServiceResponse_SynthesisFinishLatencyMs
        ))
        logger.info(f"azure音频生成相关:首字节延迟: {fb_latency} ms, 完成延迟: {fin_latency} ms, result_id: {result.result_id}")

    # === callback ===
    def _on_synthesizing(self, evt: speechsdk.SpeechSynthesisEventArgs):
        """Handle an incremental synthesis event: buffer PCM and emit 20 ms frames.

        Note the completed/canceled branches only log; control then falls
        through to the buffering code below regardless of the event reason.
        """
        if evt.result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            logger.info("SynthesizingAudioCompleted")
        elif evt.result.reason == speechsdk.ResultReason.Canceled:
            cancellation_details = evt.result.cancellation_details
            logger.info(f"Speech synthesis canceled: {cancellation_details.reason}")
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                if cancellation_details.error_details:
                    logger.info(f"Error details: {cancellation_details.error_details}")
        if self.state != State.RUNNING:
            # Session no longer running: drop any buffered audio.
            self.audio_buffer = b''
            return
        # evt.result.audio_data is the newly arrived chunk of raw PCM.
        self.audio_buffer += evt.result.audio_data
        while len(self.audio_buffer) >= self.CHUNK_SIZE:
            chunk = self.audio_buffer[:self.CHUNK_SIZE]
            self.audio_buffer = self.audio_buffer[self.CHUNK_SIZE:]
            # int16 PCM -> float32 in [-1, 1]
            frame = (np.frombuffer(chunk, dtype=np.int16)
                     .astype(np.float32) / 32767.0)
            self.parent.put_audio_frame(frame)
  1092. ###########################################################################################
class VoxCPM2TTS(BaseTTS):
    """VoxCPM2 TTS implementation based on OpenBMB's VoxCPM2 model (pure voice-clone mode)."""

    # Class-level shared model instance (avoids reloading for every session).
    _shared_model = None
    _shared_sample_rate = None

    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        # VoxCPM2 model path (from env var or opt)
        self.model_path = os.getenv('VOXCPM2_MODEL_PATH', getattr(opt, 'VOXCPM2_MODEL_PATH', 'VoxCPM2'))
        # Reference audio configuration (from env var or opt)
        self.ref_wav_path = os.getenv('VOXCPM2_REF_WAV', getattr(opt, 'VOXCPM2_REF_WAV', 'voice_output.wav'))
        self.ref_text = os.getenv('VOXCPM2_REF_TEXT', getattr(opt, 'VOXCPM2_REF_TEXT', '你好,买水果,卖水果,新鲜的水果。'))
        # Generation parameters
        self.cfg_value = getattr(opt, 'CFG_VALUE', 2.0)
        self.inference_timesteps = getattr(opt, 'INFERENCE_TIMESTEPS', 10)
        # Load the model (singleton pattern: loaded only once per process)
        try:
            from voxcpm import VoxCPM
            import torch
            # Disable TorchDynamo compilation (avoids scaled_dot_product_attention compatibility errors)
            torch._dynamo.config.suppress_errors = True
            torch.compiler.disable()
            # Reuse the shared model if one was already loaded
            if VoxCPM2TTS._shared_model is None:
                logger.info(f"🔄 首次加载 VoxCPM2 模型: {self.model_path}")
                start_load = time.time()
                # Free cached GPU memory before loading
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    logger.info(f"📊 GPU 显存状态: {torch.cuda.memory_allocated() / 1024**3:.2f}GB / {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f}GB")
                logger.info("🔧 禁用 TorchDynamo 编译和优化以避免兼容性问题")
                VoxCPM2TTS._shared_model = VoxCPM.from_pretrained(
                    self.model_path,
                    load_denoiser=False,
                    optimize=False  # disable optimization / warm-up to avoid scaled_dot_product_attention errors
                )
                load_time = time.time() - start_load
                VoxCPM2TTS._shared_sample_rate = VoxCPM2TTS._shared_model.tts_model.sample_rate
                logger.info(f"✅ VoxCPM2 模型加载成功!耗时: {load_time:.2f}s")
                logger.info(f" 采样率: {VoxCPM2TTS._shared_sample_rate}Hz")
                # Free cached GPU memory again after loading
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    logger.info(f"📊 模型加载后 GPU 显存: {torch.cuda.memory_allocated() / 1024**3:.2f}GB")
            else:
                logger.info(f"♻️ 使用已加载的 VoxCPM2 模型(共享实例)")
            self.model = VoxCPM2TTS._shared_model
            self.sample_rate = VoxCPM2TTS._shared_sample_rate
            logger.info(f" 参考音频: {self.ref_wav_path}")
            logger.info(f" 参考文本: {self.ref_text}")
            # Verify the reference audio exists; fall back to the default voice otherwise
            if not os.path.exists(self.ref_wav_path):
                logger.warning(f"⚠️ 参考音频不存在: {self.ref_wav_path},将使用默认声音")
                self.ref_wav_path = None
        except ImportError as e:
            logger.error(f"❌ 请安装 voxcpm 包: pip install voxcpm")
            raise
        except Exception as e:
            logger.error(f"❌ VoxCPM2 模型加载失败: {e}")
            raise

    def txt_to_audio(self, msg: tuple[str, dict]):
        """Generate audio for *msg* with VoxCPM2 and stream it in 20 ms frames.

        Honors interruption: bails out whenever the state leaves RUNNING, the
        interrupt flag is set, or a high-priority message is queued.
        """
        text, textevent = msg
        t = time.time()
        # Bail out if the session is not running
        if self.state != State.RUNNING:
            return
        # Skip empty messages
        if not text.strip():
            return
        # Reset the interrupt flag before starting a new utterance
        self.reset_interrupt_flag()
        # Yield to any pending high-priority message
        if not self.high_priority_queue.empty():
            logger.info("发现高优先级消息,跳过当前普通消息处理")
            return
        try:
            # Generate audio
            char_count = len(text)
            logger.info(f"📝 VoxCPM2 开始生成音频,文本长度: {char_count} 字")
            logger.info(f"📖 文本内容: {text[:50]}..." if char_count > 50 else f"📖 文本内容: {text}")
            # Build generation kwargs (pure voice-clone mode)
            generate_kwargs = {
                'text': text,
            }
            # With a reference wav available, use pure voice-clone mode
            if self.ref_wav_path and os.path.exists(self.ref_wav_path):
                # Only reference_wav_path is passed, enabling pure cloning;
                # prompt_wav_path + prompt_text are avoided so the model does
                # not enter continuation mode.
                generate_kwargs['reference_wav_path'] = self.ref_wav_path
                generate_kwargs['retry_badcase'] = False  # disable bad-case retries to speed up generation
                generate_kwargs['inference_timesteps'] = 8  # fewer inference steps (10 -> 8) for speed
                generate_kwargs['cfg_value'] = 2.5  # higher CFG strengthens the cloning effect
                logger.info("🎯 使用纯声音克隆模式(仅 reference_wav_path)")
                logger.info(f" 参考音频: {self.ref_wav_path}")
            else:
                # Fall back to plain generation with the default voice
                generate_kwargs['cfg_value'] = self.cfg_value
                generate_kwargs['inference_timesteps'] = self.inference_timesteps
                logger.info("🎤 使用默认声音生成模式")
            # Run generation
            start_gen = time.perf_counter()
            wav = self.model.generate(**generate_kwargs)
            end_gen = time.perf_counter()
            gen_duration = end_gen - start_gen
            gen_speed = char_count / gen_duration if gen_duration > 0 else 0
            logger.info(f'✅ VoxCPM2 音频生成完成')
            logger.info(f' ⏱️ 生成耗时: {gen_duration:.3f}s')
            logger.info(f' ⚡ 生成速度: {gen_speed:.2f} 字/秒')
            logger.info(f' 🎵 音频长度: {wav.shape[0]} 采样点 ({wav.shape[0]/self.sample_rate:.2f}s)')
            # Round-trip the numpy array through an in-memory WAV (PCM_16)
            audio_bytes = BytesIO()
            sf.write(audio_bytes, wav, self.sample_rate, format='wav', subtype='PCM_16')
            audio_bytes.seek(0)
            # Read the audio data back
            stream, sample_rate = sf.read(audio_bytes)
            stream = stream.astype(np.float32)
            if stream.ndim > 1:
                stream = stream[:, 0]
            # Resample to the system sample rate (16 kHz).
            # VoxCPM2 outputs 48 kHz; it must be resampled to 16 kHz to match
            # what the rest of the pipeline expects.
            target_sample_rate = 16000  # system standard sample rate
            if sample_rate != target_sample_rate and stream.shape[0] > 0:
                logger.info(f'🔄 重采样: {sample_rate}Hz -> {target_sample_rate}Hz')
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=target_sample_rate)
            # Send the audio in chunks sized for 16 kHz playback
            # chunk = 16000 / 50 = 320 samples per 20ms
            target_chunk = target_sample_rate // self.fps  # 320 samples
            streamlen = stream.shape[0]
            idx = 0
            first_chunk = True
            # Check state again before starting playback
            if self.state != State.RUNNING:
                return
            if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                return
            while streamlen >= target_chunk and self.state == State.RUNNING:
                # Abort if interrupted or a high-priority message arrived
                if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                    logger.info("⚡ 播放过程中发现高优先级消息,中断播放")
                    break
                eventpoint = {}
                streamlen -= target_chunk
                if first_chunk:
                    eventpoint = {'status': 'start', 'text': text}
                    eventpoint.update(**textevent)
                    first_chunk = False
                elif streamlen < target_chunk:
                    eventpoint = {'status': 'end', 'text': text}
                    eventpoint.update(**textevent)
                # Re-check state right before emitting the frame
                if self.state != State.RUNNING:
                    break
                if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
                    logger.info("⚡ 发送音频帧前发现高优先级消息,中断播放")
                    break
                self.parent.put_audio_frame(stream[idx:idx+target_chunk], eventpoint)
                idx += target_chunk
            logger.info(f"🎉 VoxCPM2 音频播放完成")
        except Exception as e:
            logger.exception(f"❌ VoxCPM2 TTS 错误: {e}")