ttsreal.py 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255
  1. ###############################################################################
  2. # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
  3. # email: lipku@foxmail.com
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ###############################################################################
  17. from __future__ import annotations
  18. import time
  19. import numpy as np
  20. import soundfile as sf
  21. import resampy
  22. import asyncio
  23. # import edge_tts # 已移除,改为通过本地 API 调用
  24. import os
  25. import hmac
  26. import hashlib
  27. import base64
  28. import json
  29. import uuid
  30. import threading
  31. from typing import Iterator
  32. import requests
  33. import queue
  34. from queue import Queue
  35. from io import BytesIO
  36. import copy,websockets,gzip
  37. import azure.cognitiveservices.speech as speechsdk
  38. from threading import Thread, Event
  39. from enum import Enum
  40. from typing import TYPE_CHECKING
  41. if TYPE_CHECKING:
  42. from basereal import BaseReal
  43. from logger import logger
  44. class State(Enum):
  45. RUNNING=0
  46. PAUSE=1
  47. class BaseTTS:
  48. def __init__(self, opt, parent:BaseReal):
  49. self.opt=opt
  50. self.parent = parent
  51. self.fps = opt.fps # 20 ms per frame
  52. self.sample_rate = 16000
  53. self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
  54. self.input_stream = BytesIO()
  55. self.msgqueue = Queue()
  56. self.high_priority_queue = Queue() # 高优先级队列,用于用户问题
  57. self.state = State.RUNNING
  58. # 添加属性来存储被中断的消息
  59. self.interrupted_messages = [] # 存储被中断的消息
  60. # 添加属性来跟踪当前正在处理的消息
  61. self.current_msg = None # 当前正在处理的消息
  62. self.current_msg_progress = 0 # 当前消息的进度(字符位置)
  63. self.interrupt_flag = threading.Event() # 添加打断事件
  64. def reset_interrupt_flag(self):
  65. """重置打断标志"""
  66. self.interrupt_flag.clear()
  67. def set_interrupt_flag(self):
  68. """设置打断标志"""
  69. self.interrupt_flag.set()
  70. def _trigger_continue_play(self):
  71. """触发自动续播下一条介绍内容"""
  72. try:
  73. # 检查是否处于介绍播放状态
  74. if (hasattr(self.parent, 'intro_play_state') and
  75. self.parent.intro_play_state.get('is_playing', False) and
  76. not self.parent.intro_play_state.get('is_paused', False) and
  77. not self.parent.intro_play_state.get('is_waiting_next', False)):
  78. # 检查是否有高优先级消息在等待,如果有则不触发续播
  79. if not self.high_priority_queue.empty():
  80. logger.info("有高优先级消息等待,跳过自动续播")
  81. return
  82. # 检查消息队列是否为空,如果不为空说明还有消息在等待,不触发续播
  83. if not self.msgqueue.empty():
  84. logger.info("消息队列不为空,跳过自动续播")
  85. return
  86. # 触发续播
  87. self.parent._continue_intro_play()
  88. else:
  89. if hasattr(self.parent, 'intro_play_state') and self.parent.intro_play_state.get('is_waiting_next', False):
  90. logger.info("正在等待前一条播放完成,跳过本次触发")
  91. except Exception as e:
  92. logger.error(f"触发自动续播时出错: {e}")
  93. def flush_talk(self):
  94. # 停止当前播放并清空待处理的消息队列,但保留当前正在处理的状态
  95. # 这样可以确保打断后不会播放队列中积累的旧消息
  96. with self.msgqueue.mutex: # 使用队列的互斥锁确保线程安全
  97. # 保存队列中的剩余消息到中断列表
  98. remaining_msgs = list(self.msgqueue.queue)
  99. if remaining_msgs:
  100. self.interrupted_messages.extend(remaining_msgs)
  101. self.msgqueue.queue.clear() # 清空队列中等待的消息
  102. # 如果当前有正在处理的消息,也要将其保存到中断列表
  103. if self.current_msg:
  104. # 将当前消息加入中断列表
  105. self.interrupted_messages.append(self.current_msg)
  106. # 清空当前消息引用,因为已经被中断
  107. self.current_msg = None
  108. self.state = State.PAUSE
  109. # 清除当前正在处理的音频缓冲区
  110. if hasattr(self, 'input_stream') and hasattr(self.input_stream, 'seek') and hasattr(self.input_stream, 'truncate'):
  111. self.input_stream.seek(0)
  112. self.input_stream.truncate()
  113. def resume_interrupted(self):
  114. """恢复播放被中断的消息"""
  115. if self.interrupted_messages:
  116. # 将被中断的消息重新加入队列
  117. with self.msgqueue.mutex:
  118. for msg in self.interrupted_messages:
  119. self.msgqueue.put(msg)
  120. # 清空中断消息列表
  121. self.interrupted_messages.clear()
  122. # 将状态设置为运行,以便继续处理消息
  123. self.state = State.RUNNING
  124. return True
  125. return False
  126. def put_msg_txt(self,msg:str,datainfo:dict={}):
  127. if len(msg)>0:
  128. # 对于长文本,按句子分割以支持更好的打断功能
  129. if len(msg) > 100: # 如果文本超过100字符,进行分割
  130. import re
  131. # 按标点符号分割文本,保留分隔符
  132. sentences = re.split(r'([。!?.!?])', msg)
  133. # 将句子和标点符号重新组合
  134. parts = []
  135. for i in range(0, len(sentences)-1, 2):
  136. sentence = sentences[i]
  137. punctuation = sentences[i+1] if i+1 < len(sentences) else ''
  138. if sentence.strip():
  139. parts.append(sentence.strip() + punctuation)
  140. # 如果分割后有多个部分,分别放入队列
  141. if len(parts) > 1:
  142. for part in parts:
  143. if part.strip():
  144. self.msgqueue.put((part, datainfo))
  145. return
  146. # 短文本或无法分割的文本直接放入队列
  147. self.msgqueue.put((msg, datainfo))
  148. def put_high_priority_msg(self,msg:str,datainfo:dict={}):
  149. """添加高优先级消息,会优先处理"""
  150. if len(msg)>0:
  151. # 对于长文本,按句子分割以支持更好的打断功能
  152. if len(msg) > 100: # 如果文本超过100字符,进行分割
  153. import re
  154. # 按标点符号分割文本,保留分隔符
  155. sentences = re.split(r'([。!?.!?])', msg)
  156. # 将句子和标点符号重新组合
  157. parts = []
  158. for i in range(0, len(sentences)-1, 2):
  159. sentence = sentences[i]
  160. punctuation = sentences[i+1] if i+1 < len(sentences) else ''
  161. if sentence.strip():
  162. parts.append(sentence.strip() + punctuation)
  163. # 如果分割后有多个部分,分别放入高优先级队列
  164. if len(parts) > 1:
  165. for part in parts:
  166. if part.strip():
  167. self.high_priority_queue.put((part, datainfo))
  168. return
  169. # 短文本或无法分割的文本直接放入高优先级队列
  170. self.high_priority_queue.put((msg, datainfo))
  171. def render(self,quit_event):
  172. process_thread = Thread(target=self.process_tts, args=(quit_event,))
  173. process_thread.start()
  174. def process_tts(self,quit_event):
  175. while not quit_event.is_set():
  176. try:
  177. # 检查状态是否为RUNNING,如果不是,则短暂等待后继续
  178. # 优先检查高优先级队列 - 每次循环都检查高优先级队列
  179. msg = None
  180. try:
  181. # 首先检查高优先级队列,不等待,立即返回
  182. if not self.high_priority_queue.empty():
  183. msg = self.high_priority_queue.get_nowait()
  184. # 处理高优先级消息时,确保状态为RUNNING
  185. self.state = State.RUNNING
  186. logger.info("处理高优先级消息")
  187. else:
  188. # 如果高优先级队列为空,检查普通队列
  189. if self.state != State.RUNNING and not self.msgqueue.empty():
  190. # 如果队列不为空但状态不是RUNNING,等待一段时间后重试
  191. import time
  192. time.sleep(0.1) # 短暂等待
  193. continue
  194. msg = self.msgqueue.get(block=True, timeout=0.05) # 使用较短超时时间,以便快速检查高优先级队列
  195. except queue.Empty:
  196. # 如果两个队列都没有消息,继续等待
  197. continue
  198. # 检查是否是唤醒消息(空消息),如果是则跳过处理,继续循环检查高优先级队列
  199. if msg and len(msg) >= 2 and isinstance(msg[0], str) and not msg[0].strip():
  200. continue # 跳过空消息,继续检查队列
  201. # 记录当前正在处理的消息
  202. self.current_msg = msg
  203. self.current_msg_progress = 0 # 重置进度
  204. except queue.Empty:
  205. continue
  206. # 在处理音频前再次检查状态,如果状态已改变则跳过处理
  207. if self.state == State.RUNNING:
  208. # 检查是否有高优先级消息,如果有则优先处理
  209. if not self.high_priority_queue.empty():
  210. logger.info("发现高优先级消息,中断当前普通消息处理")
  211. # 保存当前消息到中断队列
  212. if msg:
  213. self.interrupted_messages.append(msg)
  214. # 处理高优先级消息
  215. high_priority_msg = self.high_priority_queue.get_nowait()
  216. self.txt_to_audio(high_priority_msg)
  217. # 高优先级消息不触发自动续播
  218. else:
  219. # 处理普通消息
  220. self.txt_to_audio(msg)
  221. # 注意:自动续播不再在这里触发,而是在 _continue_intro_play 中通过定时器控制
  222. # 消息处理完成后,清空当前消息
  223. self.current_msg = None
  224. self.current_msg_progress = 0
  225. logger.info('ttsreal thread stop')
  226. def txt_to_audio(self,msg:tuple[str, dict]):
  227. pass
  228. ###########################################################################################
  229. class EdgeTTS(BaseTTS):
  230. def txt_to_audio(self,msg:tuple[str, dict]):
  231. voicename = self.opt.REF_FILE #"zh-CN-YunxiaNeural"
  232. text,textevent = msg
  233. t = time.time()
  234. # 在开始TTS请求前检查状态,如果状态已改变则跳过
  235. if self.state != State.RUNNING:
  236. return
  237. # 处理空消息,直接返回
  238. if not text.strip():
  239. return
  240. # 重置打断标志,确保高优先级消息能够正常处理
  241. self.reset_interrupt_flag()
  242. # 检查是否有高优先级消息,如果有则立即返回,让process_tts处理
  243. if not self.high_priority_queue.empty():
  244. logger.info("发现高优先级消息,跳过当前普通消息处理")
  245. return
  246. # 使用异步方式处理TTS请求,以便能够响应中断
  247. loop = asyncio.new_event_loop()
  248. asyncio.set_event_loop(loop)
  249. # 创建一个任务来处理TTS请求
  250. task = loop.create_task(self.__main(voicename, text))
  251. # 定期检查中断标志和高优先级队列
  252. while not task.done():
  253. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  254. task.cancel()
  255. logger.info("TTS请求被高优先级消息中断")
  256. break
  257. loop.run_until_complete(asyncio.sleep(0.05)) # 缩短检查间隔,提高响应速度
  258. # 等待任务完成或取消
  259. try:
  260. loop.run_until_complete(task)
  261. except asyncio.CancelledError:
  262. logger.info("TTS请求被中断")
  263. self.input_stream.seek(0)
  264. self.input_stream.truncate()
  265. return
  266. logger.info(f'-------edge tts time:{time.time()-t:.4f}s')
  267. if self.input_stream.getbuffer().nbytes<=0: #edgetts err
  268. logger.error('edgetts err!!!!!')
  269. return
  270. self.input_stream.seek(0)
  271. stream = self.__create_bytes_stream(self.input_stream)
  272. streamlen = stream.shape[0]
  273. idx=0
  274. # 在播放前再次检查状态,如果状态已改变则跳过播放
  275. if self.state != State.RUNNING:
  276. self.input_stream.seek(0)
  277. self.input_stream.truncate()
  278. return
  279. # 检查打断标志或高优先级队列
  280. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  281. self.input_stream.seek(0)
  282. self.input_stream.truncate()
  283. return
  284. while streamlen >= self.chunk and self.state==State.RUNNING:
  285. # 每次循环都检查打断标志和高优先级队列
  286. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  287. logger.info("播放过程中发现高优先级消息,中断播放")
  288. break
  289. eventpoint={}
  290. streamlen -= self.chunk
  291. if idx==0:
  292. eventpoint={'status':'start','text':text}
  293. eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent}
  294. elif streamlen<self.chunk:
  295. eventpoint={'status':'end','text':text}
  296. eventpoint.update(**textevent) #eventpoint={'status':'end','text':text,'msgevent':textevent}
  297. # 在发送音频帧之前再次检查状态
  298. if self.state != State.RUNNING:
  299. break
  300. # 检查打断标志和高优先级队列
  301. if self.interrupt_flag.is_set() or not self.high_priority_queue.empty():
  302. logger.info("发送音频帧前发现高优先级消息,中断播放")
  303. break
  304. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  305. idx += self.chunk
  306. #if streamlen>0: #skip last frame(not 20ms)
  307. # self.queue.put(stream[idx:])
  308. self.input_stream.seek(0)
  309. self.input_stream.truncate()
  310. def __create_bytes_stream(self,byte_stream):
  311. #byte_stream=BytesIO(buffer)
  312. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  313. logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
  314. stream = stream.astype(np.float32)
  315. if stream.ndim > 1:
  316. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  317. stream = stream[:, 0]
  318. if sample_rate != self.sample_rate and stream.shape[0]>0:
  319. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  320. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  321. return stream
  322. async def __main(self,voicename: str, text: str):
  323. """通过本地 API 调用 Edge TTS 服务"""
  324. try:
  325. # 使用同步方式调用 HTTP API(在异步函数中使用线程池)
  326. loop = asyncio.get_event_loop()
  327. # 准备请求参数
  328. headers = {"Content-Type": "application/json"}
  329. data = {
  330. "text": text,
  331. "voice": voicename,
  332. "rate": "+0%",
  333. "volume": "+0%",
  334. "pitch": "+0Hz"
  335. }
  336. # 在线程池中执行同步请求,避免阻塞事件循环
  337. def make_request():
  338. resp = requests.post(
  339. "http://127.0.0.1:1024/tts",
  340. json=data,
  341. headers=headers,
  342. timeout=30
  343. )
  344. return resp
  345. # 使用线程池执行同步请求
  346. resp = await loop.run_in_executor(None, make_request)
  347. logger.info(f"Edge TTS API 响应状态码:{resp.status_code}")
  348. # 成功返回音频流
  349. if resp.status_code == 200 and str(resp.headers.get("Content-Type", "")).startswith("audio/"):
  350. # 将音频数据写入输入流
  351. self.input_stream.write(resp.content)
  352. logger.info(f"✅ 成功从 API 获取音频数据,大小:{len(resp.content)} bytes")
  353. else:
  354. # 非 200 时解析错误并抛出
  355. try:
  356. detail = resp.json()
  357. except Exception:
  358. detail = resp.text
  359. logger.error(f"⚠️ TTS API 接口返回错误:{detail}")
  360. resp.raise_for_status()
  361. except requests.exceptions.HTTPError as e:
  362. logger.error(f"HTTP 错误:{e}")
  363. except requests.exceptions.ConnectionError:
  364. logger.error(f"❌ 无法连接到 TTS API 服务器 (http://127.0.0.1:1024/tts),请检查服务是否在线")
  365. except requests.exceptions.Timeout:
  366. logger.error(f"❌ TTS API 请求超时")
  367. except Exception as e:
  368. logger.error(f"其他错误:{str(e)}")
  369. ###########################################################################################
  370. class FishTTS(BaseTTS):
  371. def txt_to_audio(self,msg:tuple[str, dict]):
  372. text,textevent = msg
  373. self.stream_tts(
  374. self.fish_speech(
  375. text,
  376. self.opt.REF_FILE,
  377. self.opt.REF_TEXT,
  378. "zh", #en args.language,
  379. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  380. ),
  381. msg
  382. )
  383. def fish_speech(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  384. start = time.perf_counter()
  385. req={
  386. 'text':text,
  387. 'reference_id':reffile,
  388. 'format':'wav',
  389. 'streaming':True,
  390. 'use_memory_cache':'on'
  391. }
  392. try:
  393. res = requests.post(
  394. f"{server_url}/v1/tts",
  395. json=req,
  396. stream=True,
  397. headers={
  398. "content-type": "application/json",
  399. },
  400. )
  401. end = time.perf_counter()
  402. logger.info(f"fish_speech Time to make POST: {end-start}s")
  403. if res.status_code != 200:
  404. logger.error("Error:%s", res.text)
  405. return
  406. first = True
  407. for chunk in res.iter_content(chunk_size=17640): # 1764 44100*20ms*2
  408. #print('chunk len:',len(chunk))
  409. if first:
  410. end = time.perf_counter()
  411. logger.info(f"fish_speech Time to first chunk: {end-start}s")
  412. first = False
  413. if chunk and self.state==State.RUNNING:
  414. yield chunk
  415. #print("gpt_sovits response.elapsed:", res.elapsed)
  416. except Exception as e:
  417. logger.exception('fishtts')
  418. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  419. text,textevent = msg
  420. first = True
  421. for chunk in audio_stream:
  422. if chunk is not None and len(chunk)>0:
  423. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  424. stream = resampy.resample(x=stream, sr_orig=44100, sr_new=self.sample_rate)
  425. #byte_stream=BytesIO(buffer)
  426. #stream = self.__create_bytes_stream(byte_stream)
  427. streamlen = stream.shape[0]
  428. idx=0
  429. while streamlen >= self.chunk:
  430. eventpoint={}
  431. if first:
  432. eventpoint={'status':'start','text':text}
  433. eventpoint.update(**textevent) #eventpoint={'status':'start','text':text,'msgevent':textevent}
  434. first = False
  435. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  436. streamlen -= self.chunk
  437. idx += self.chunk
  438. eventpoint={'status':'end','text':text}
  439. eventpoint.update(**textevent) #eventpoint={'status':'end','text':text,'msgevent':textevent}
  440. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  441. ###########################################################################################
  442. class SovitsTTS(BaseTTS):
  443. def txt_to_audio(self,msg:tuple[str, dict]):
  444. text,textevent = msg
  445. self.stream_tts(
  446. self.gpt_sovits(
  447. text=text,
  448. reffile=self.opt.REF_FILE,
  449. reftext=self.opt.REF_TEXT,
  450. language="zh", #en args.language,
  451. server_url=self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  452. ),
  453. msg
  454. )
  455. def gpt_sovits(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  456. start = time.perf_counter()
  457. req={
  458. 'text':text,
  459. 'text_lang':language,
  460. 'ref_audio_path':reffile,
  461. 'prompt_text':reftext,
  462. 'prompt_lang':language,
  463. 'media_type':'ogg',
  464. 'streaming_mode':True
  465. }
  466. # req["text"] = text
  467. # req["text_language"] = language
  468. # req["character"] = character
  469. # req["emotion"] = emotion
  470. # #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
  471. # req["streaming_mode"] = True
  472. try:
  473. res = requests.post(
  474. f"{server_url}/tts",
  475. json=req,
  476. stream=True,
  477. )
  478. end = time.perf_counter()
  479. logger.info(f"gpt_sovits Time to make POST: {end-start}s")
  480. if res.status_code != 200:
  481. logger.error("Error:%s", res.text)
  482. return
  483. first = True
  484. for chunk in res.iter_content(chunk_size=None): #12800 1280 32K*20ms*2
  485. logger.info('chunk len:%d',len(chunk))
  486. if first:
  487. end = time.perf_counter()
  488. logger.info(f"gpt_sovits Time to first chunk: {end-start}s")
  489. first = False
  490. if chunk and self.state==State.RUNNING:
  491. yield chunk
  492. #print("gpt_sovits response.elapsed:", res.elapsed)
  493. except Exception as e:
  494. logger.exception('sovits')
  495. def __create_bytes_stream(self,byte_stream):
  496. #byte_stream=BytesIO(buffer)
  497. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  498. logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
  499. stream = stream.astype(np.float32)
  500. if stream.ndim > 1:
  501. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  502. stream = stream[:, 0]
  503. if sample_rate != self.sample_rate and stream.shape[0]>0:
  504. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  505. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  506. return stream
  507. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  508. text,textevent = msg
  509. first = True
  510. for chunk in audio_stream:
  511. if chunk is not None and len(chunk)>0:
  512. #stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  513. #stream = resampy.resample(x=stream, sr_orig=32000, sr_new=self.sample_rate)
  514. byte_stream=BytesIO(chunk)
  515. stream = self.__create_bytes_stream(byte_stream)
  516. streamlen = stream.shape[0]
  517. idx=0
  518. while streamlen >= self.chunk:
  519. eventpoint={}
  520. if first:
  521. eventpoint={'status':'start','text':text}
  522. eventpoint.update(**textevent)
  523. first = False
  524. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  525. streamlen -= self.chunk
  526. idx += self.chunk
  527. eventpoint={'status':'end','text':text}
  528. eventpoint.update(**textevent)
  529. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  530. ###########################################################################################
  531. class CosyVoiceTTS(BaseTTS):
  532. def txt_to_audio(self,msg:tuple[str, dict]):
  533. text,textevent = msg
  534. self.stream_tts(
  535. self.cosy_voice(
  536. text,
  537. self.opt.REF_FILE,
  538. self.opt.REF_TEXT,
  539. "zh", #en args.language,
  540. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  541. ),
  542. msg
  543. )
  544. def cosy_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  545. start = time.perf_counter()
  546. payload = {
  547. 'tts_text': text,
  548. 'prompt_text': reftext
  549. }
  550. try:
  551. files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))]
  552. res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)
  553. end = time.perf_counter()
  554. logger.info(f"cosy_voice Time to make POST: {end-start}s")
  555. if res.status_code != 200:
  556. logger.error("Error:%s", res.text)
  557. return
  558. first = True
  559. for chunk in res.iter_content(chunk_size=9600): # 960 24K*20ms*2
  560. if first:
  561. end = time.perf_counter()
  562. logger.info(f"cosy_voice Time to first chunk: {end-start}s")
  563. first = False
  564. if chunk and self.state==State.RUNNING:
  565. yield chunk
  566. except Exception as e:
  567. logger.exception('cosyvoice')
  568. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  569. text,textevent = msg
  570. first = True
  571. for chunk in audio_stream:
  572. if chunk is not None and len(chunk)>0:
  573. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  574. stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  575. #byte_stream=BytesIO(buffer)
  576. #stream = self.__create_bytes_stream(byte_stream)
  577. streamlen = stream.shape[0]
  578. idx=0
  579. while streamlen >= self.chunk:
  580. eventpoint={}
  581. if first:
  582. eventpoint={'status':'start','text':text}
  583. eventpoint.update(**textevent)
  584. first = False
  585. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  586. streamlen -= self.chunk
  587. idx += self.chunk
  588. eventpoint={'status':'end','text':text}
  589. eventpoint.update(**textevent)
  590. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  591. ###########################################################################################
# Tencent Cloud streaming-TTS endpoint constants (used by TencentTTS below).
_PROTOCOL = "https://"
_HOST = "tts.cloud.tencent.com"
_PATH = "/stream"
_ACTION = "TextToStreamAudio"
  596. class TencentTTS(BaseTTS):
  597. def __init__(self, opt, parent):
  598. super().__init__(opt,parent)
  599. self.appid = os.getenv("TENCENT_APPID")
  600. self.secret_key = os.getenv("TENCENT_SECRET_KEY")
  601. self.secret_id = os.getenv("TENCENT_SECRET_ID")
  602. self.voice_type = int(opt.REF_FILE)
  603. self.codec = "pcm"
  604. self.sample_rate = 16000
  605. self.volume = 0
  606. self.speed = 0
  607. def __gen_signature(self, params):
  608. sort_dict = sorted(params.keys())
  609. sign_str = "POST" + _HOST + _PATH + "?"
  610. for key in sort_dict:
  611. sign_str = sign_str + key + "=" + str(params[key]) + '&'
  612. sign_str = sign_str[:-1]
  613. hmacstr = hmac.new(self.secret_key.encode('utf-8'),
  614. sign_str.encode('utf-8'), hashlib.sha1).digest()
  615. s = base64.b64encode(hmacstr)
  616. s = s.decode('utf-8')
  617. return s
  618. def __gen_params(self, session_id, text):
  619. params = dict()
  620. params['Action'] = _ACTION
  621. params['AppId'] = int(self.appid)
  622. params['SecretId'] = self.secret_id
  623. params['ModelType'] = 1
  624. params['VoiceType'] = self.voice_type
  625. params['Codec'] = self.codec
  626. params['SampleRate'] = self.sample_rate
  627. params['Speed'] = self.speed
  628. params['Volume'] = self.volume
  629. params['SessionId'] = session_id
  630. params['Text'] = text
  631. timestamp = int(time.time())
  632. params['Timestamp'] = timestamp
  633. params['Expired'] = timestamp + 24 * 60 * 60
  634. return params
  635. def txt_to_audio(self,msg:tuple[str, dict]):
  636. text,textevent = msg
  637. self.stream_tts(
  638. self.tencent_voice(
  639. text,
  640. self.opt.REF_FILE,
  641. self.opt.REF_TEXT,
  642. "zh", #en args.language,
  643. self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
  644. ),
  645. msg
  646. )
  647. def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
  648. start = time.perf_counter()
  649. session_id = str(uuid.uuid1())
  650. params = self.__gen_params(session_id, text)
  651. signature = self.__gen_signature(params)
  652. headers = {
  653. "Content-Type": "application/json",
  654. "Authorization": str(signature)
  655. }
  656. url = _PROTOCOL + _HOST + _PATH
  657. try:
  658. res = requests.post(url, headers=headers,
  659. data=json.dumps(params), stream=True)
  660. end = time.perf_counter()
  661. logger.info(f"tencent Time to make POST: {end-start}s")
  662. first = True
  663. for chunk in res.iter_content(chunk_size=6400): # 640 16K*20ms*2
  664. #logger.info('chunk len:%d',len(chunk))
  665. if first:
  666. try:
  667. rsp = json.loads(chunk)
  668. #response["Code"] = rsp["Response"]["Error"]["Code"]
  669. #response["Message"] = rsp["Response"]["Error"]["Message"]
  670. logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"])
  671. return
  672. except:
  673. end = time.perf_counter()
  674. logger.info(f"tencent Time to first chunk: {end-start}s")
  675. first = False
  676. if chunk and self.state==State.RUNNING:
  677. yield chunk
  678. except Exception as e:
  679. logger.exception('tencent')
  680. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  681. text,textevent = msg
  682. first = True
  683. last_stream = np.array([],dtype=np.float32)
  684. for chunk in audio_stream:
  685. if chunk is not None and len(chunk)>0:
  686. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  687. stream = np.concatenate((last_stream,stream))
  688. #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  689. #byte_stream=BytesIO(buffer)
  690. #stream = self.__create_bytes_stream(byte_stream)
  691. streamlen = stream.shape[0]
  692. idx=0
  693. while streamlen >= self.chunk:
  694. eventpoint={}
  695. if first:
  696. eventpoint={'status':'start','text':text}
  697. eventpoint.update(**textevent)
  698. first = False
  699. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  700. streamlen -= self.chunk
  701. idx += self.chunk
  702. last_stream = stream[idx:] #get the remain stream
  703. eventpoint={'status':'end','text':text}
  704. eventpoint.update(**textevent)
  705. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  706. ###########################################################################################
  707. class DoubaoTTS(BaseTTS):
  708. def __init__(self, opt, parent):
  709. super().__init__(opt, parent)
  710. # 从配置中读取火山引擎参数
  711. self.appid = os.getenv("DOUBAO_APPID")
  712. self.token = os.getenv("DOUBAO_TOKEN")
  713. _cluster = 'volcano_tts'
  714. _host = "openspeech.bytedance.com"
  715. self.api_url = f"wss://{_host}/api/v1/tts/ws_binary"
  716. self.request_json = {
  717. "app": {
  718. "appid": self.appid,
  719. "token": "access_token",
  720. "cluster": _cluster
  721. },
  722. "user": {
  723. "uid": "xxx"
  724. },
  725. "audio": {
  726. "voice_type": "xxx",
  727. "encoding": "pcm",
  728. "rate": 16000,
  729. "speed_ratio": 1.0,
  730. "volume_ratio": 1.0,
  731. "pitch_ratio": 1.0,
  732. },
  733. "request": {
  734. "reqid": "xxx",
  735. "text": "字节跳动语音合成。",
  736. "text_type": "plain",
  737. "operation": "xxx"
  738. }
  739. }
  740. async def doubao_voice(self, text): # -> Iterator[bytes]:
  741. start = time.perf_counter()
  742. voice_type = self.opt.REF_FILE
  743. try:
  744. # 创建请求对象
  745. default_header = bytearray(b'\x11\x10\x11\x00')
  746. submit_request_json = copy.deepcopy(self.request_json)
  747. submit_request_json["user"]["uid"] = self.parent.sessionid
  748. submit_request_json["audio"]["voice_type"] = voice_type
  749. submit_request_json["request"]["text"] = text
  750. submit_request_json["request"]["reqid"] = str(uuid.uuid4())
  751. submit_request_json["request"]["operation"] = "submit"
  752. payload_bytes = str.encode(json.dumps(submit_request_json))
  753. payload_bytes = gzip.compress(payload_bytes) # if no compression, comment this line
  754. full_client_request = bytearray(default_header)
  755. full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big')) # payload size(4 bytes)
  756. full_client_request.extend(payload_bytes) # payload
  757. header = {"Authorization": f"Bearer; {self.token}"}
  758. first = True
  759. async with websockets.connect(self.api_url, extra_headers=header, ping_interval=None) as ws:
  760. await ws.send(full_client_request)
  761. while True:
  762. res = await ws.recv()
  763. header_size = res[0] & 0x0f
  764. message_type = res[1] >> 4
  765. message_type_specific_flags = res[1] & 0x0f
  766. payload = res[header_size*4:]
  767. if message_type == 0xb: # audio-only server response
  768. if message_type_specific_flags == 0: # no sequence number as ACK
  769. #print(" Payload size: 0")
  770. continue
  771. else:
  772. if first:
  773. end = time.perf_counter()
  774. logger.info(f"doubao tts Time to first chunk: {end-start}s")
  775. first = False
  776. sequence_number = int.from_bytes(payload[:4], "big", signed=True)
  777. payload_size = int.from_bytes(payload[4:8], "big", signed=False)
  778. payload = payload[8:]
  779. yield payload
  780. if sequence_number < 0:
  781. break
  782. else:
  783. break
  784. except Exception as e:
  785. logger.exception('doubao')
  786. # # 检查响应状态码
  787. # if response.status_code == 200:
  788. # # 处理响应数据
  789. # audio_data = base64.b64decode(response.json().get('data'))
  790. # yield audio_data
  791. # else:
  792. # logger.error(f"请求失败,状态码: {response.status_code}")
  793. # return
  794. def txt_to_audio(self, msg:tuple[str, dict]):
  795. text, textevent = msg
  796. asyncio.new_event_loop().run_until_complete(
  797. self.stream_tts(
  798. self.doubao_voice(text),
  799. msg
  800. )
  801. )
  802. async def stream_tts(self, audio_stream, msg:tuple[str, dict]):
  803. text, textevent = msg
  804. first = True
  805. last_stream = np.array([],dtype=np.float32)
  806. async for chunk in audio_stream:
  807. if chunk is not None and len(chunk) > 0:
  808. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  809. stream = np.concatenate((last_stream,stream))
  810. #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  811. # byte_stream=BytesIO(buffer)
  812. # stream = self.__create_bytes_stream(byte_stream)
  813. streamlen = stream.shape[0]
  814. idx = 0
  815. while streamlen >= self.chunk:
  816. eventpoint = {}
  817. if first:
  818. eventpoint={'status':'start','text':text}
  819. eventpoint.update(**textevent)
  820. first = False
  821. self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
  822. streamlen -= self.chunk
  823. idx += self.chunk
  824. last_stream = stream[idx:] #get the remain stream
  825. eventpoint={'status':'end','text':text}
  826. eventpoint.update(**textevent)
  827. self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
  828. ###########################################################################################
  829. class IndexTTS2(BaseTTS):
  830. def __init__(self, opt, parent):
  831. super().__init__(opt, parent)
  832. # IndexTTS2 配置参数
  833. self.server_url = opt.TTS_SERVER # Gradio服务器地址,如 "http://127.0.0.1:7860/"
  834. self.ref_audio_path = opt.REF_FILE # 参考音频文件路径
  835. self.max_tokens = getattr(opt, 'MAX_TOKENS', 120) # 最大token数
  836. # 初始化Gradio客户端
  837. try:
  838. from gradio_client import Client, handle_file
  839. self.client = Client(self.server_url)
  840. self.handle_file = handle_file
  841. logger.info(f"IndexTTS2 Gradio客户端初始化成功: {self.server_url}")
  842. except ImportError:
  843. logger.error("IndexTTS2 需要安装 gradio_client: pip install gradio_client")
  844. raise
  845. except Exception as e:
  846. logger.error(f"IndexTTS2 Gradio客户端初始化失败: {e}")
  847. raise
  848. def txt_to_audio(self, msg):
  849. text, textevent = msg
  850. try:
  851. # 先进行文本分割
  852. segments = self.split_text(text)
  853. if not segments:
  854. logger.error("IndexTTS2 文本分割失败")
  855. return
  856. logger.info(f"IndexTTS2 文本分割为 {len(segments)} 个片段")
  857. # 循环生成每个片段的音频
  858. for i, segment_text in enumerate(segments):
  859. if self.state != State.RUNNING:
  860. break
  861. logger.info(f"IndexTTS2 正在生成第 {i+1}/{len(segments)} 段音频...")
  862. audio_file = self.indextts2_generate(segment_text)
  863. if audio_file:
  864. # 为每个片段创建事件信息
  865. segment_msg = (segment_text, textevent)
  866. self.file_to_stream(audio_file, segment_msg, is_first=(i==0), is_last=(i==len(segments)-1))
  867. else:
  868. logger.error(f"IndexTTS2 第 {i+1} 段音频生成失败")
  869. except Exception as e:
  870. logger.exception(f"IndexTTS2 txt_to_audio 错误: {e}")
  871. def split_text(self, text):
  872. """使用 IndexTTS2 API 分割文本"""
  873. try:
  874. logger.info(f"IndexTTS2 开始分割文本,长度: {len(text)}")
  875. # 调用文本分割 API
  876. result = self.client.predict(
  877. text=text,
  878. max_text_tokens_per_segment=self.max_tokens,
  879. api_name="/on_input_text_change"
  880. )
  881. # 解析分割结果
  882. if 'value' in result and 'data' in result['value']:
  883. data = result['value']['data']
  884. logger.info(f"IndexTTS2 共分割为 {len(data)} 个片段")
  885. segments = []
  886. for i, item in enumerate(data):
  887. 序号 = item[0] + 1
  888. 分句内容 = item[1]
  889. token数 = item[2]
  890. logger.info(f"片段 {序号}: {len(分句内容)} 字符, {token数} tokens")
  891. segments.append(分句内容)
  892. return segments
  893. else:
  894. logger.error(f"IndexTTS2 文本分割结果格式异常: {result}")
  895. return [text] # 如果分割失败,返回原文本
  896. except Exception as e:
  897. logger.exception(f"IndexTTS2 文本分割失败: {e}")
  898. return [text] # 如果分割失败,返回原文本
  899. def indextts2_generate(self, text):
  900. """调用 IndexTTS2 Gradio API 生成语音"""
  901. start = time.perf_counter()
  902. try:
  903. # 调用 gen_single API
  904. result = self.client.predict(
  905. emo_control_method="Same as the voice reference",
  906. prompt=self.handle_file(self.ref_audio_path),
  907. text=text,
  908. emo_ref_path=self.handle_file(self.ref_audio_path),
  909. emo_weight=0.8,
  910. vec1=0.5,
  911. vec2=0,
  912. vec3=0,
  913. vec4=0,
  914. vec5=0,
  915. vec6=0,
  916. vec7=0,
  917. vec8=0,
  918. emo_text="",
  919. emo_random=False,
  920. max_text_tokens_per_segment=self.max_tokens,
  921. param_16=True,
  922. param_17=0.8,
  923. param_18=30,
  924. param_19=0.8,
  925. param_20=0,
  926. param_21=3,
  927. param_22=10,
  928. param_23=1500,
  929. api_name="/gen_single"
  930. )
  931. end = time.perf_counter()
  932. logger.info(f"IndexTTS2 片段生成完成,耗时: {end-start:.2f}s")
  933. # 返回生成的音频文件路径
  934. if 'value' in result:
  935. audio_file = result['value']
  936. return audio_file
  937. else:
  938. logger.error(f"IndexTTS2 结果格式异常: {result}")
  939. return None
  940. except Exception as e:
  941. logger.exception(f"IndexTTS2 API调用失败: {e}")
  942. return None
  943. def file_to_stream(self, audio_file, msg, is_first=False, is_last=False):
  944. """将音频文件转换为音频流"""
  945. text, textevent = msg
  946. try:
  947. # 读取音频文件
  948. stream, sample_rate = sf.read(audio_file)
  949. logger.info(f'IndexTTS2 音频文件 {sample_rate}Hz: {stream.shape}')
  950. # 转换为float32
  951. stream = stream.astype(np.float32)
  952. # 如果是多声道,只取第一个声道
  953. if stream.ndim > 1:
  954. logger.info(f'IndexTTS2 音频有 {stream.shape[1]} 个声道,只使用第一个')
  955. stream = stream[:, 0]
  956. # 重采样到目标采样率
  957. if sample_rate != self.sample_rate and stream.shape[0] > 0:
  958. logger.info(f'IndexTTS2 重采样: {sample_rate}Hz -> {self.sample_rate}Hz')
  959. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  960. # 分块发送音频流
  961. streamlen = stream.shape[0]
  962. idx = 0
  963. first_chunk = True
  964. while streamlen >= self.chunk and self.state == State.RUNNING:
  965. eventpoint = None
  966. # 只在第一个片段的第一个chunk发送start事件
  967. if is_first and first_chunk:
  968. eventpoint = {'status': 'start', 'text': text, 'msgevent': textevent}
  969. first_chunk = False
  970. self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint)
  971. idx += self.chunk
  972. streamlen -= self.chunk
  973. # 只在最后一个片段发送end事件
  974. if is_last:
  975. eventpoint = {'status': 'end', 'text': text, 'msgevent': textevent}
  976. self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
  977. # 清理临时文件
  978. try:
  979. if os.path.exists(audio_file):
  980. os.remove(audio_file)
  981. logger.info(f"IndexTTS2 已删除临时文件: {audio_file}")
  982. except Exception as e:
  983. logger.warning(f"IndexTTS2 删除临时文件失败: {e}")
  984. except Exception as e:
  985. logger.exception(f"IndexTTS2 音频流处理失败: {e}")
  986. ###########################################################################################
  987. class XTTS(BaseTTS):
  988. def __init__(self, opt, parent):
  989. super().__init__(opt,parent)
  990. self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER)
  991. def txt_to_audio(self,msg:tuple[str, dict]):
  992. text,textevent = msg
  993. self.stream_tts(
  994. self.xtts(
  995. text,
  996. self.speaker,
  997. "zh-cn", #en args.language,
  998. self.opt.TTS_SERVER, #"http://localhost:9000", #args.server_url,
  999. "20" #args.stream_chunk_size
  1000. ),
  1001. msg
  1002. )
  1003. def get_speaker(self,ref_audio,server_url):
  1004. files = {"wav_file": ("reference.wav", open(ref_audio, "rb"))}
  1005. response = requests.post(f"{server_url}/clone_speaker", files=files)
  1006. return response.json()
  1007. def xtts(self,text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
  1008. start = time.perf_counter()
  1009. speaker["text"] = text
  1010. speaker["language"] = language
  1011. speaker["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
  1012. try:
  1013. res = requests.post(
  1014. f"{server_url}/tts_stream",
  1015. json=speaker,
  1016. stream=True,
  1017. )
  1018. end = time.perf_counter()
  1019. logger.info(f"xtts Time to make POST: {end-start}s")
  1020. if res.status_code != 200:
  1021. print("Error:", res.text)
  1022. return
  1023. first = True
  1024. for chunk in res.iter_content(chunk_size=None): #24K*20ms*2
  1025. if first:
  1026. end = time.perf_counter()
  1027. logger.info(f"xtts Time to first chunk: {end-start}s")
  1028. first = False
  1029. if chunk:
  1030. yield chunk
  1031. except Exception as e:
  1032. print(e)
  1033. def stream_tts(self,audio_stream,msg:tuple[str, dict]):
  1034. text,textevent = msg
  1035. first = True
  1036. last_stream = np.array([],dtype=np.float32)
  1037. for chunk in audio_stream:
  1038. if chunk is not None and len(chunk)>0:
  1039. stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
  1040. stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
  1041. stream = np.concatenate((last_stream,stream))
  1042. #byte_stream=BytesIO(buffer)
  1043. #stream = self.__create_bytes_stream(byte_stream)
  1044. streamlen = stream.shape[0]
  1045. idx=0
  1046. while streamlen >= self.chunk:
  1047. eventpoint={}
  1048. if first:
  1049. eventpoint={'status':'start','text':text}
  1050. eventpoint.update(**textevent)
  1051. first = False
  1052. self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
  1053. streamlen -= self.chunk
  1054. idx += self.chunk
  1055. last_stream = stream[idx:] #get the remain stream
  1056. eventpoint={'status':'end','text':text}
  1057. eventpoint.update(**textevent)
  1058. self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
  1059. ###########################################################################################
  1060. class AzureTTS(BaseTTS):
  1061. CHUNK_SIZE = 640 # 16kHz, 20ms, 16-bit Mono PCM size
  1062. def __init__(self, opt, parent):
  1063. super().__init__(opt,parent)
  1064. self.audio_buffer = b''
  1065. voicename = self.opt.REF_FILE # 比如"zh-CN-XiaoxiaoMultilingualNeural"
  1066. speech_key = os.getenv("AZURE_SPEECH_KEY")
  1067. tts_region = os.getenv("AZURE_TTS_REGION")
  1068. speech_endpoint = f"wss://{tts_region}.tts.speech.microsoft.com/cognitiveservices/websocket/v2"
  1069. speech_config = speechsdk.SpeechConfig(subscription=speech_key,endpoint=speech_endpoint)
  1070. speech_config.speech_synthesis_voice_name = voicename
  1071. speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm)
  1072. # 获取内存中流形式的结果
  1073. self.speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
  1074. self.speech_synthesizer.synthesizing.connect(self._on_synthesizing)
  1075. def txt_to_audio(self,msg:tuple[str, dict]):
  1076. msg_text: str = msg[0]
  1077. result=self.speech_synthesizer.speak_text(msg_text)
  1078. # 延迟指标
  1079. fb_latency = int(result.properties.get_property(
  1080. speechsdk.PropertyId.SpeechServiceResponse_SynthesisFirstByteLatencyMs
  1081. ))
  1082. fin_latency = int(result.properties.get_property(
  1083. speechsdk.PropertyId.SpeechServiceResponse_SynthesisFinishLatencyMs
  1084. ))
  1085. logger.info(f"azure音频生成相关:首字节延迟: {fb_latency} ms, 完成延迟: {fin_latency} ms, result_id: {result.result_id}")
  1086. # === 回调 ===
  1087. def _on_synthesizing(self, evt: speechsdk.SpeechSynthesisEventArgs):
  1088. if evt.result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
  1089. logger.info("SynthesizingAudioCompleted")
  1090. elif evt.result.reason == speechsdk.ResultReason.Canceled:
  1091. cancellation_details = evt.result.cancellation_details
  1092. logger.info(f"Speech synthesis canceled: {cancellation_details.reason}")
  1093. if cancellation_details.reason == speechsdk.CancellationReason.Error:
  1094. if cancellation_details.error_details:
  1095. logger.info(f"Error details: {cancellation_details.error_details}")
  1096. if self.state != State.RUNNING:
  1097. self.audio_buffer = b''
  1098. return
  1099. # evt.result.audio_data 是刚到的一小段原始 PCM
  1100. self.audio_buffer += evt.result.audio_data
  1101. while len(self.audio_buffer) >= self.CHUNK_SIZE:
  1102. chunk = self.audio_buffer[:self.CHUNK_SIZE]
  1103. self.audio_buffer = self.audio_buffer[self.CHUNK_SIZE:]
  1104. frame = (np.frombuffer(chunk, dtype=np.int16)
  1105. .astype(np.float32) / 32767.0)
  1106. self.parent.put_audio_frame(frame)