# basereal.py
  1. ###############################################################################
  2. # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
  3. # email: lipku@foxmail.com
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ###############################################################################
  17. import math
  18. import torch
  19. import numpy as np
  20. import subprocess
  21. import os
  22. import time
  23. import cv2
  24. import glob
  25. import resampy
  26. import queue
  27. from queue import Queue
  28. from threading import Thread, Event
  29. from io import BytesIO
  30. import soundfile as sf
  31. import asyncio
  32. from av import AudioFrame, VideoFrame
  33. import av
  34. from fractions import Fraction
  35. from ttsreal import EdgeTTS,SovitsTTS,XTTS,CosyVoiceTTS,FishTTS,TencentTTS,DoubaoTTS,IndexTTS2,AzureTTS,State
  36. from logger import logger
  37. from tqdm import tqdm
  38. from threading import Lock
  39. def read_imgs(img_list):
  40. frames = []
  41. logger.info('reading images...')
  42. for img_path in tqdm(img_list):
  43. frame = cv2.imread(img_path)
  44. frames.append(frame)
  45. return frames
  46. def play_audio(quit_event,queue):
  47. import pyaudio
  48. p = pyaudio.PyAudio()
  49. stream = p.open(
  50. rate=16000,
  51. channels=1,
  52. format=8,
  53. output=True,
  54. output_device_index=1,
  55. )
  56. stream.start_stream()
  57. # while queue.qsize() <= 0:
  58. # time.sleep(0.1)
  59. while not quit_event.is_set():
  60. stream.write(queue.get(block=True))
  61. stream.close()
  62. class BaseReal:
  63. def __init__(self, opt,model,avatar):
  64. #原本的播放序列
  65. self.msg_queue = queue.Queue()
  66. self.interrupted_queue = queue.Queue() # 用于存储被中断的内容(如介绍)
  67. self.user_question_queue = queue.Queue() # 用于存储用户问题(优先级更高)
  68. self.is_speaking_flag = False
  69. self.current_text = "" # 当前正在播放的文本
  70. self.current_pos = 0 # 当前正在播放的文本位置
  71. self.interrupt_lock = Lock() #线程安全锁
  72. self.opt = opt
  73. self.sample_rate = 16000
  74. self.chunk = self.sample_rate // opt.fps # 320 samples per chunk (20ms * 16000 / 1000)
  75. self.sessionid = self.opt.sessionid
  76. if opt.tts == "edgetts":
  77. self.tts = EdgeTTS(opt,self)
  78. elif opt.tts == "qwen3tts":
  79. from qwen3tts import Qwen3TTS
  80. self.tts = Qwen3TTS(opt,self)
  81. elif opt.tts == "gpt-sovits":
  82. self.tts = SovitsTTS(opt,self)
  83. elif opt.tts == "xtts":
  84. self.tts = XTTS(opt,self)
  85. elif opt.tts == "cosyvoice":
  86. self.tts = CosyVoiceTTS(opt,self)
  87. elif opt.tts == "fishtts":
  88. self.tts = FishTTS(opt,self)
  89. elif opt.tts == "tencent":
  90. self.tts = TencentTTS(opt,self)
  91. elif opt.tts == "doubao":
  92. self.tts = DoubaoTTS(opt,self)
  93. elif opt.tts == "indextts2":
  94. self.tts = IndexTTS2(opt,self)
  95. elif opt.tts == "azuretts":
  96. self.tts = AzureTTS(opt,self)
  97. else:
  98. # 默认使用 edgetts
  99. logger.warning(f"未知的 TTS 类型: {opt.tts},使用 edgetts 作为默认")
  100. self.tts = EdgeTTS(opt,self)
  101. self.speaking = False
  102. self.recording = False
  103. self._record_video_pipe = None
  104. self._record_audio_pipe = None
  105. self.width = self.height = 0
  106. self.curr_state=0
  107. self.custom_img_cycle = {}
  108. self.custom_audio_cycle = {}
  109. self.custom_audio_index = {}
  110. self.custom_index = {}
  111. self.custom_opt = {}
  112. self.__loadcustom()
  113. def put_msg_txt(self,msg,datainfo:dict={}):
  114. self.tts.put_msg_txt(msg,datainfo)
  115. def put_audio_frame(self,audio_chunk,datainfo:dict={}): #16khz 20ms pcm
  116. self.asr.put_audio_frame(audio_chunk,datainfo)
  117. def put_audio_file(self,filebyte,datainfo:dict={}):
  118. input_stream = BytesIO(filebyte)
  119. stream = self.__create_bytes_stream(input_stream)
  120. streamlen = stream.shape[0]
  121. idx=0
  122. while streamlen >= self.chunk: #and self.state==State.RUNNING
  123. self.put_audio_frame(stream[idx:idx+self.chunk],datainfo)
  124. streamlen -= self.chunk
  125. idx += self.chunk
  126. def __create_bytes_stream(self,byte_stream):
  127. #byte_stream=BytesIO(buffer)
  128. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  129. logger.info(f'[INFO]put audio stream {sample_rate}: {stream.shape}')
  130. stream = stream.astype(np.float32)
  131. if stream.ndim > 1:
  132. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  133. stream = stream[:, 0]
  134. if sample_rate != self.sample_rate and stream.shape[0]>0:
  135. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  136. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  137. return stream
  138. def stop_tts(self):
  139. """停止当前TTS播放"""
  140. if hasattr(self.tts, 'state'):
  141. self.tts.state = State.PAUSE
  142. # 设置打断标志,强制中断当前播放
  143. if hasattr(self.tts, 'set_interrupt_flag'):
  144. self.tts.set_interrupt_flag()
  145. # 清空TTS内部的普通队列,但保留高优先级队列(用于用户问题)
  146. if hasattr(self.tts, 'msgqueue'):
  147. with self.tts.msgqueue.mutex:
  148. self.tts.msgqueue.queue.clear()
  149. # 注意:不清空高优先级队列,以确保用户问题能够被处理
  150. # 清空输入音频流缓冲区
  151. if hasattr(self.tts, 'input_stream'):
  152. if hasattr(self.tts.input_stream, 'seek') and hasattr(self.tts.input_stream, 'truncate'):
  153. self.tts.input_stream.seek(0)
  154. self.tts.input_stream.truncate()
  155. self.is_speaking_flag = False
  156. self.current_text = ""
  157. self.current_pos = 0
  158. def flush_talk(self):
  159. """打断当前播放并保存未完成的内容"""
  160. with self.interrupt_lock:
  161. # 1. 保存当前正在播、但没播完的文本(关键)
  162. if self.current_text and self.current_pos < len(self.current_text):
  163. unfinished = self.current_text[self.current_pos:]
  164. if unfinished.strip():
  165. self.interrupted_queue.put(unfinished)
  166. # 2. 把 msg_queue 里剩余的全部移到 interrupted_queue(不丢弃)
  167. while not self.msg_queue.empty():
  168. self.interrupted_queue.put(self.msg_queue.get())
  169. # 3.停止当前TTS播放
  170. self.stop_tts()
  171. # 4. 清空当前播放状态
  172. self.is_speaking_flag = False
  173. self.current_text = ""
  174. self.current_pos = 0
  175. # 5. 不要重置打断标志,保持中断状态直到新内容开始播放
  176. # self.tts.flush_talk()
  177. # self.asr.flush_talk()
    def handle_interruption_during_intro(self, user_question, datainfo:dict={}):
        """Handle a user question raised while the intro is playing.

        Pauses the intro (parking unfinished content in interrupted_queue)
        and answers the question first via the TTS high-priority queue.
        NOTE(review): ``datainfo:dict={}`` is a shared mutable default —
        callers appear to only pass it through, but worth confirming/fixing.
        """
        with self.interrupt_lock:
            # 1. Save the part of the intro currently being spoken into the interrupted queue
            if self.current_text and self.current_pos < len(self.current_text):
                unfinished_intro = self.current_text[self.current_pos:]
                if unfinished_intro.strip():
                    self.interrupted_queue.put(unfinished_intro)
                    logger.info(f"保存未完成的介绍内容: {unfinished_intro[:50]}...")
            # 2. Also park any intro content still waiting in the message queue
            remaining_items = []
            while not self.msg_queue.empty():
                remaining_items.append(self.msg_queue.get())
            if remaining_items:
                for item in remaining_items:
                    self.interrupted_queue.put(item)
                logger.info(f"保存了 {len(remaining_items)} 个剩余的介绍内容到中断队列")
            # 3. Stop TTS playback (this sets the TTS state to PAUSE; the user question is handled right after)
            self.stop_tts()
            # 4. Reset the current playback state
            self.is_speaking_flag = False
            self.current_text = ""
            self.current_pos = 0
            # 5. Make sure the TTS state is RUNNING so the user question gets processed
            if hasattr(self.tts, 'state'):
                self.tts.state = State.RUNNING
            # 6. Reset the interrupt flag so new content is allowed to play
            if hasattr(self.tts, 'reset_interrupt_flag'):
                self.tts.reset_interrupt_flag()
            # 7. Play the user question with high priority
            self.tts.put_high_priority_msg(user_question, datainfo)
            # 8. Immediately try to process the high-priority queue,
            #    bypassing the normal queue-processing delay
            logger.info("已暂停介绍并开始播放用户问题")
            # 9. Wake the TTS worker so the high-priority message is handled at once,
            #    by pushing an empty message through the normal queue
            if hasattr(self.tts, 'msgqueue'):
                self.tts.msgqueue.put(("", {}))
  216. def put_user_question(self, msg, datainfo:dict={}):
  217. """专门用于处理用户问题,优先级高于普通消息"""
  218. with self.interrupt_lock:
  219. # 强制中断当前播放
  220. self.flush_talk()
  221. # 直接播放用户问题(使用高优先级队列)
  222. self.tts.put_high_priority_msg(msg, datainfo)
  223. # 确保TTS状态设置为RUNNING以处理用户问题
  224. if hasattr(self.tts, 'state'):
  225. self.tts.state = State.RUNNING
  226. # 重置打断标志,允许播放新内容
  227. if hasattr(self.tts, 'reset_interrupt_flag'):
  228. self.tts.reset_interrupt_flag()
  229. # 唤醒TTS处理线程,确保立即处理高优先级消息
  230. # 通过向普通队列添加一个空消息来触发处理循环
  231. if hasattr(self.tts, 'msgqueue'):
  232. self.tts.msgqueue.put(("", {}))
  233. def process_user_questions_and_resume(self):
  234. """处理用户问题并恢复之前的内容"""
  235. with self.interrupt_lock:
  236. # 检查是否有用户问题需要处理
  237. while not self.user_question_queue.empty():
  238. msg, datainfo = self.user_question_queue.get()
  239. # 直接播放用户问题
  240. self.tts.put_msg_txt(msg, datainfo)
  241. # 检查是否有被中断的内容需要恢复
  242. if not self.interrupted_queue.empty():
  243. # 将被中断的内容放回主队列
  244. while not self.interrupted_queue.empty():
  245. msg = self.interrupted_queue.get()
  246. self.msg_queue.put(msg)
  247. def resume_interrupted(self):
  248. """把 interrupted_queue 里的内容放回播放队列"""
  249. with self.interrupt_lock:
  250. resumed = False
  251. items_to_resume = []
  252. # 先把所有中断的内容取出
  253. while not self.interrupted_queue.empty():
  254. items_to_resume.append(self.interrupted_queue.get())
  255. resumed = True
  256. # 再按顺序放回主队列
  257. for item in items_to_resume:
  258. self.msg_queue.put(item)
  259. return resumed
  260. # """恢复播放被中断的消息"""
  261. # return self.tts.resume_interrupted()
  262. def start_intro_with_interrupt_capability(self, intro_text, datainfo:dict={}):
  263. """开始播放介绍内容,同时保持接收用户提问的能力"""
  264. # 将介绍内容放入主消息队列
  265. self.put_msg_txt(intro_text, datainfo)
    def is_speaking(self)->bool:
        """Return True while speaking (inference) frames are being rendered, False when idle."""
        return self.speaking
    def __loadcustom(self):
        """Load each custom image cycle and audio clip declared in ``opt.customopt``, keyed by audiotype."""
        for item in self.opt.customopt:
            logger.info(item)
            # glob matches e.g. .jpg/.jpeg/.png in any letter case
            input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]'))
            # frames are numerically named files; sort by the integer basename
            input_img_list = sorted(input_img_list, key=lambda x: int(os.path.splitext(os.path.basename(x))[0]))
            self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list)
            self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(item['audiopath'], dtype='float32')
            # playback cursors: audio sample offset and video frame index
            self.custom_audio_index[item['audiotype']] = 0
            self.custom_index[item['audiotype']] = 0
            self.custom_opt[item['audiotype']] = item
  278. def init_customindex(self):
  279. self.curr_state=0
  280. for key in self.custom_audio_index:
  281. self.custom_audio_index[key]=0
  282. for key in self.custom_index:
  283. self.custom_index[key]=0
    def notify(self,eventpoint):
        """React to a playback event; after a Q&A answer ends, schedule resumption of interrupted content."""
        logger.info("notify:%s",eventpoint)
        # Is this the end event of an answer to a user question?
        if isinstance(eventpoint, dict) and eventpoint.get('status') == 'end':
            # Treat the event as question-related when it carries knowledge_base
            # info or when interrupted content is pending
            if 'knowledge_base' in eventpoint or self.interrupted_queue.qsize() > 0:
                import threading
                # Delay the resume slightly so the current event finishes processing first
                timer = threading.Timer(0.5, self._try_resume_after_qa)
                timer.start()
        # NOTE: auto-continuation of the intro is no longer triggered from notify;
        # it fires after TTS finishes, guaranteeing the previous item fully played
  298. def _try_resume_after_qa(self):
  299. """尝试恢复问答后的内容"""
  300. with self.interrupt_lock:
  301. # 检查是否还有被中断的内容需要恢复
  302. if not self.interrupted_queue.empty():
  303. logger.info("检测到问答完成,恢复被中断的内容...")
  304. # 有被中断的内容需要恢复,调用恢复方法
  305. self.resume_interrupted()
    def _continue_intro_play(self):
        """Auto-play the next piece of intro content, re-arming a timer for the one after.

        Relies on ``self.intro_play_state`` (a dict with is_playing/is_paused/
        is_waiting_next flags) and ``self.knowledge_intro_instance`` — both
        presumably set by other components; neither is created in this file
        (TODO confirm against the callers).
        """
        try:
            # Bail out unless active, non-paused intro playback is in progress
            if not (hasattr(self, 'intro_play_state') and
                    self.intro_play_state.get('is_playing', False) and
                    not self.intro_play_state.get('is_paused', False)):
                logger.info("不处于介绍播放状态,跳过自动续播")
                return
            # If the TTS queue is not empty, messages are still waiting; retry later
            if hasattr(self, 'tts') and hasattr(self.tts, 'msgqueue'):
                if not self.tts.msgqueue.empty():
                    logger.info("TTS消息队列不为空,等待队列清空后再播放下一条")
                    # mark waiting and schedule another check
                    self.intro_play_state['is_waiting_next'] = True
                    import threading
                    timer = threading.Timer(2.0, self._continue_intro_play)
                    timer.start()
                    return
            # Mark waiting to avoid duplicate triggers
            if hasattr(self, 'intro_play_state'):
                self.intro_play_state['is_waiting_next'] = True
            if hasattr(self, 'knowledge_intro_instance') and self.knowledge_intro_instance:
                # fetch the next piece of intro content
                next_content = self.knowledge_intro_instance._get_next_content()
                if next_content and next_content.get('text'):
                    logger.info(f"自动续播介绍内容 - 序号:{next_content.get('play_index', 1)}/{next_content.get('total_count', 1)}")
                    # update the playback bookkeeping
                    if hasattr(self, 'intro_play_state'):
                        self.intro_play_state["last_played_index"] = next_content.get("play_index", 1)
                    # Estimate the speaking time from the text length
                    # (≈3 chars/sec, a slow announcement pace) plus an extra pause
                    text = next_content.get('text', '')
                    estimated_play_time = len(text) / 3  # estimated playback time (seconds)
                    pause_time = estimated_play_time + 3  # playback time + 3 s pause
                    logger.info(f"当前文案长度:{len(text)}字,估算播放时间:{estimated_play_time:.1f}秒,播放下一条需等待:{pause_time:.1f}秒")
                    # play this piece
                    self.put_msg_txt(next_content['text'])
                    # schedule the next piece once this one should have finished
                    import threading
                    timer = threading.Timer(pause_time, self._continue_intro_play)
                    timer.start()
                else:
                    # no more content: mark playback finished
                    logger.info("介绍内容全部播放完成")
                    if hasattr(self, 'intro_play_state'):
                        self.intro_play_state["is_playing"] = False
                        self.intro_play_state['is_waiting_next'] = False
        except Exception as e:
            logger.error(f"自动续播介绍内容时出错: {e}")
            # make sure the waiting flag is reset even on error
            if hasattr(self, 'intro_play_state'):
                self.intro_play_state['is_waiting_next'] = False
    def _reset_waiting_flag(self):
        """Clear the intro 'waiting for next item' flag so the next piece may play."""
        if hasattr(self, 'intro_play_state'):
            self.intro_play_state['is_waiting_next'] = False
            logger.info("等待时间结束,可以播放下一条")
    def start_recording(self):
        """Start recording by spawning two ffmpeg pipes: raw BGR video and raw s16le audio.

        Writes to temp{sessionid}.mp4 / temp{sessionid}.aac; the two files are
        muxed together by stop_recording(). No-op when already recording.
        Assumes self.width/self.height have been captured by
        record_video_data() before the first frame is written — TODO confirm.
        """
        if self.recording:
            return
        # video pipe: raw bgr24 frames at width x height, 25 fps, encoded to h264
        command = ['ffmpeg',
                   '-y', '-an',
                   '-f', 'rawvideo',
                   '-vcodec','rawvideo',
                   '-pix_fmt', 'bgr24',  # pixel format
                   '-s', "{}x{}".format(self.width, self.height),
                   '-r', str(25),
                   '-i', '-',
                   '-pix_fmt', 'yuv420p',
                   '-vcodec', "h264",
                   #'-f' , 'flv',
                   f'temp{self.opt.sessionid}.mp4']
        self._record_video_pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE)
        # audio pipe: 16 kHz mono s16le PCM encoded to AAC
        acommand = ['ffmpeg',
                    '-y', '-vn',
                    '-f', 's16le',
                    #'-acodec','pcm_s16le',
                    '-ac', '1',
                    '-ar', '16000',
                    '-i', '-',
                    '-acodec', 'aac',
                    #'-f' , 'wav',
                    f'temp{self.opt.sessionid}.aac']
        self._record_audio_pipe = subprocess.Popen(acommand, shell=False, stdin=subprocess.PIPE)
        self.recording = True
        # self.recordq_video.queue.clear()
        # self.recordq_audio.queue.clear()
        # self.container = av.open(path, mode="w")
        # process_thread = Thread(target=self.record_frame, args=())
        # process_thread.start()
  398. def record_video_data(self,image):
  399. if self.width == 0:
  400. print("image.shape:",image.shape)
  401. self.height,self.width,_ = image.shape
  402. if self.recording:
  403. self._record_video_pipe.stdin.write(image.tostring())
  404. def record_audio_data(self,frame):
  405. if self.recording:
  406. self._record_audio_pipe.stdin.write(frame.tostring())
  407. # def record_frame(self):
  408. # videostream = self.container.add_stream("libx264", rate=25)
  409. # videostream.codec_context.time_base = Fraction(1, 25)
  410. # audiostream = self.container.add_stream("aac")
  411. # audiostream.codec_context.time_base = Fraction(1, 16000)
  412. # init = True
  413. # framenum = 0
  414. # while self.recording:
  415. # try:
  416. # videoframe = self.recordq_video.get(block=True, timeout=1)
  417. # videoframe.pts = framenum #int(round(framenum*0.04 / videostream.codec_context.time_base))
  418. # videoframe.dts = videoframe.pts
  419. # if init:
  420. # videostream.width = videoframe.width
  421. # videostream.height = videoframe.height
  422. # init = False
  423. # for packet in videostream.encode(videoframe):
  424. # self.container.mux(packet)
  425. # for k in range(2):
  426. # audioframe = self.recordq_audio.get(block=True, timeout=1)
  427. # audioframe.pts = int(round((framenum*2+k)*0.02 / audiostream.codec_context.time_base))
  428. # audioframe.dts = audioframe.pts
  429. # for packet in audiostream.encode(audioframe):
  430. # self.container.mux(packet)
  431. # framenum += 1
  432. # except queue.Empty:
  433. # print('record queue empty,')
  434. # continue
  435. # except Exception as e:
  436. # print(e)
  437. # #break
  438. # for packet in videostream.encode(None):
  439. # self.container.mux(packet)
  440. # for packet in audiostream.encode(None):
  441. # self.container.mux(packet)
  442. # self.container.close()
  443. # self.recordq_video.queue.clear()
  444. # self.recordq_audio.queue.clear()
  445. # print('record thread stop')
    def stop_recording(self):
        """Stop recording: close both ffmpeg pipes, wait for them to exit, then mux into data/record.mp4."""
        if not self.recording:
            return
        self.recording = False
        # closing stdin signals EOF to ffmpeg; wait() lets it finalize the file
        self._record_video_pipe.stdin.close() #wait()
        self._record_video_pipe.wait()
        self._record_audio_pipe.stdin.close()
        self._record_audio_pipe.wait()
        # stream-copy both temp files into the final container (no re-encode)
        cmd_combine_audio = f"ffmpeg -y -i temp{self.opt.sessionid}.aac -i temp{self.opt.sessionid}.mp4 -c:v copy -c:a copy data/record.mp4"
        os.system(cmd_combine_audio)
        #os.remove(output_path)
  458. def mirror_index(self,size, index):
  459. #size = len(self.coord_list_cycle)
  460. turn = index // size
  461. res = index % size
  462. if turn % 2 == 0:
  463. return res
  464. else:
  465. return size - res - 1
  466. def get_audio_stream(self,audiotype):
  467. idx = self.custom_audio_index[audiotype]
  468. stream = self.custom_audio_cycle[audiotype][idx:idx+self.chunk]
  469. self.custom_audio_index[audiotype] += self.chunk
  470. if self.custom_audio_index[audiotype]>=self.custom_audio_cycle[audiotype].shape[0]:
  471. self.curr_state = 1 #当前视频不循环播放,切换到静音状态
  472. return stream
  473. def set_custom_state(self,audiotype, reinit=True):
  474. print('set_custom_state:',audiotype)
  475. if self.custom_audio_index.get(audiotype) is None:
  476. return
  477. self.curr_state = audiotype
  478. if reinit:
  479. self.custom_audio_index[audiotype] = 0
  480. self.custom_index[audiotype] = 0
    def process_frames(self,quit_event,loop=None,audio_track=None,video_track=None):
        """Main render loop: pull inferred frames, composite, and push A/V to webrtc tracks or a virtual camera.

        Consumes (res_frame, idx, audio_frames) tuples from
        ``self.res_frame_queue``; ``res_frame_queue``, ``frame_list_cycle``
        and ``paste_back_frame`` are presumably provided by subclasses — not
        defined in this base class (TODO confirm). Each item carries two
        20 ms audio frames per video frame; an audio frame's second field is
        0 while speaking and a nonzero audiotype while silent.
        """
        enable_transition = False  # False disables the fade transition between states, True enables it
        if enable_transition:
            _last_speaking = False
            _transition_start = time.time()
            _transition_duration = 0.1  # transition time
            _last_silent_frame = None   # cached silent frame
            _last_speaking_frame = None # cached speaking frame
        if self.opt.transport=='virtualcam':
            import pyvirtualcam
            vircam = None
            # local audio playback path: feed PCM to a background pyaudio thread
            audio_tmp = queue.Queue(maxsize=3000)
            audio_thread = Thread(target=play_audio, args=(quit_event,audio_tmp,), daemon=True, name="pyaudio_stream")
            audio_thread.start()
        while not quit_event.is_set():
            try:
                res_frame,idx,audio_frames = self.res_frame_queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            if enable_transition:
                # detect a speaking/silent state change
                current_speaking = not (audio_frames[0][1]!=0 and audio_frames[1][1]!=0)
                if current_speaking != _last_speaking:
                    logger.info(f"状态切换:{'说话' if _last_speaking else '静音'} → {'说话' if current_speaking else '静音'}")
                    _transition_start = time.time()
                _last_speaking = current_speaking
            if audio_frames[0][1]!=0 and audio_frames[1][1]!=0:
                # both audio frames are silence: only the full (idle) image is needed
                self.speaking = False
                audiotype = audio_frames[0][1]
                if self.custom_index.get(audiotype) is not None:
                    # a custom video cycle exists for this audiotype
                    mirindex = self.mirror_index(len(self.custom_img_cycle[audiotype]),self.custom_index[audiotype])
                    target_frame = self.custom_img_cycle[audiotype][mirindex]
                    self.custom_index[audiotype] += 1
                else:
                    target_frame = self.frame_list_cycle[idx]
                if enable_transition:
                    # speaking → silent transition: cross-fade from the last speaking frame
                    if time.time() - _transition_start < _transition_duration and _last_speaking_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_speaking_frame, 1-alpha, target_frame, alpha, 0)
                    else:
                        combine_frame = target_frame
                    # cache the silent frame for the opposite transition
                    _last_silent_frame = combine_frame.copy()
                else:
                    combine_frame = target_frame
            else:
                self.speaking = True
                try:
                    current_frame = self.paste_back_frame(res_frame,idx)
                except Exception as e:
                    logger.warning(f"paste_back_frame error: {e}")
                    continue
                if enable_transition:
                    # silent → speaking transition: cross-fade from the last silent frame
                    if time.time() - _transition_start < _transition_duration and _last_silent_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_silent_frame, 1-alpha, current_frame, alpha, 0)
                    else:
                        combine_frame = current_frame
                    # cache the speaking frame for the opposite transition
                    _last_speaking_frame = combine_frame.copy()
                else:
                    combine_frame = current_frame
            # watermark overlay
            cv2.putText(combine_frame, "LiveTalking", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (128,128,128), 1)
            if self.opt.transport=='virtualcam':
                if vircam==None:
                    # lazily create the camera once the output size is known
                    height, width,_= combine_frame.shape
                    vircam = pyvirtualcam.Camera(width=width, height=height, fps=25, fmt=pyvirtualcam.PixelFormat.BGR,print_fps=True)
                vircam.send(combine_frame)
            else: #webrtc
                image = combine_frame
                new_frame = VideoFrame.from_ndarray(image, format="bgr24")
                asyncio.run_coroutine_threadsafe(video_track._queue.put((new_frame,None)), loop)
            self.record_video_data(combine_frame)
            for audio_frame in audio_frames:
                frame,type,eventpoint = audio_frame
                # float PCM in [-1, 1] → int16
                frame = (frame * 32767).astype(np.int16)
                if self.opt.transport=='virtualcam':
                    audio_tmp.put(frame.tobytes()) #TODO
                else: #webrtc
                    new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0])
                    new_frame.planes[0].update(frame.tobytes())
                    new_frame.sample_rate=16000
                    asyncio.run_coroutine_threadsafe(audio_track._queue.put((new_frame,eventpoint)), loop)
                self.record_audio_data(frame)
            if self.opt.transport=='virtualcam':
                vircam.sleep_until_next_frame()
        if self.opt.transport=='virtualcam':
            audio_thread.join()
            vircam.close()
        logger.info('basereal process_frames thread stop')
  573. # def process_custom(self,audiotype:int,idx:int):
  574. # if self.curr_state!=audiotype: #从推理切到口播
  575. # if idx in self.switch_pos: #在卡点位置可以切换
  576. # self.curr_state=audiotype
  577. # self.custom_index=0
  578. # else:
  579. # self.custom_index+=1