basereal.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965
  1. import math
  2. import torch
  3. import numpy as np
  4. import subprocess
  5. import os
  6. import time
  7. import cv2
  8. import glob
  9. import resampy
  10. import queue
  11. from queue import Queue
  12. from threading import Thread, Event
  13. from io import BytesIO
  14. import soundfile as sf
  15. import asyncio
  16. from av import AudioFrame, VideoFrame
  17. import av
  18. from fractions import Fraction
  19. from ttsreal import EdgeTTS,SovitsTTS,XTTS,CosyVoiceTTS,FishTTS,TencentTTS,DoubaoTTS,IndexTTS2,AzureTTS,State
  20. from logger import logger
  21. from videoplayer import RealtimeVideoPlayer
  22. from audioplayer import RealtimeAudioPlayer
  23. from tqdm import tqdm
  24. from threading import Lock
  25. def read_imgs(img_list):
  26. frames = []
  27. logger.info('reading images...')
  28. for img_path in tqdm(img_list):
  29. frame = cv2.imread(img_path)
  30. frames.append(frame)
  31. return frames
  32. def play_audio(quit_event,queue):
  33. import pyaudio
  34. p = pyaudio.PyAudio()
  35. stream = p.open(
  36. rate=16000,
  37. channels=1,
  38. format=8,
  39. output=True,
  40. output_device_index=1,
  41. )
  42. stream.start_stream()
  43. # while queue.qsize() <= 0:
  44. # time.sleep(0.1)
  45. while not quit_event.is_set():
  46. stream.write(queue.get(block=True))
  47. stream.close()
  48. class BaseReal:
  49. def __init__(self, opt,model,avatar):
  50. #原本的播放序列 (添加大小限制防止内存泄漏)
  51. self.msg_queue = queue.Queue(maxsize=1000) # 最多1000条消息
  52. self.interrupted_queue = queue.Queue(maxsize=500) # 最多500条中断消息
  53. self.user_question_queue = queue.Queue(maxsize=100) # 最多100个用户问题
  54. self.is_speaking_flag = False
  55. self.current_text = "" # 当前正在播放的文本
  56. self.current_pos = 0 # 当前正在播放的文本位置
  57. self.interrupt_lock = Lock() #线程安全锁
  58. self.opt = opt
  59. self.sample_rate = 16000
  60. self.chunk = self.sample_rate // opt.fps # 320 samples per chunk (20ms * 16000 / 1000)
  61. self.sessionid = self.opt.sessionid
  62. if opt.tts == "edgetts":
  63. self.tts = EdgeTTS(opt,self)
  64. elif opt.tts == "qwen3tts":
  65. from qwen3tts import Qwen3TTS
  66. self.tts = Qwen3TTS(opt,self)
  67. elif opt.tts == "gpt-sovits":
  68. self.tts = SovitsTTS(opt,self)
  69. elif opt.tts == "xtts":
  70. self.tts = XTTS(opt,self)
  71. elif opt.tts == "cosyvoice":
  72. self.tts = CosyVoiceTTS(opt,self)
  73. elif opt.tts == "fishtts":
  74. self.tts = FishTTS(opt,self)
  75. elif opt.tts == "tencent":
  76. self.tts = TencentTTS(opt,self)
  77. elif opt.tts == "doubao":
  78. self.tts = DoubaoTTS(opt,self)
  79. elif opt.tts == "indextts2":
  80. self.tts = IndexTTS2(opt,self)
  81. elif opt.tts == "azuretts":
  82. self.tts = AzureTTS(opt,self)
  83. elif opt.tts == "voxcpm2":
  84. from ttsreal import VoxCPM2TTS
  85. self.tts = VoxCPM2TTS(opt,self)
  86. elif opt.tts == "voxcpm2api":
  87. from voxcpm2_api_tts import VoxCPM2APITTS
  88. self.tts = VoxCPM2APITTS(opt,self)
  89. elif opt.tts == "none":
  90. # 不使用 TTS,使用音频播放模式
  91. self.tts = None
  92. self.video_player = None
  93. self.audio_player = None
  94. # 检查是否指定视频播放目录
  95. if hasattr(opt, 'VIDEO_PLAYBACK_DIR') and opt.VIDEO_PLAYBACK_DIR:
  96. self.video_player = RealtimeVideoPlayer(opt.VIDEO_PLAYBACK_DIR)
  97. logger.info(f"🎬 视频播放模式已启用,监听目录: {opt.VIDEO_PLAYBACK_DIR}")
  98. # 检查是否指定音频播放目录
  99. elif hasattr(opt, 'AUDIO_PLAYBACK_DIR') and opt.AUDIO_PLAYBACK_DIR:
  100. self.audio_player = RealtimeAudioPlayer(opt.AUDIO_PLAYBACK_DIR, sample_rate=16000)
  101. logger.info(f"🎵 音频播放模式已启用,监听目录: {opt.AUDIO_PLAYBACK_DIR}")
  102. else:
  103. logger.warning("⚠️ 未指定 VIDEO_PLAYBACK_DIR 或 AUDIO_PLAYBACK_DIR,播放模式不可用")
  104. else:
  105. # 默认使用 edgetts
  106. logger.warning(f"未知的 TTS 类型: {opt.tts},使用 edgetts 作为默认")
  107. self.tts = EdgeTTS(opt,self)
  108. self.speaking = False
  109. self.recording = False
  110. self._record_video_pipe = None
  111. self._record_audio_pipe = None
  112. self.width = self.height = 0
  113. # RTMP 推流支持
  114. self._rtmp_pushing = False
  115. self._rtmp_video_pipe = None
  116. self._rtmp_audio_pipe = None
  117. self._rtmp_push_url = getattr(opt, 'push_url', '') if hasattr(opt, 'push_url') and opt.transport == 'rtcpush' and getattr(opt, 'push_url', '').startswith('rtmp://') else ''
  118. if self._rtmp_push_url:
  119. logger.info(f'RTMP 推流已配置: {self._rtmp_push_url}')
  120. self.curr_state=0
  121. self.custom_img_cycle = {}
  122. self.custom_audio_cycle = {}
  123. self.custom_audio_index = {}
  124. self.custom_index = {}
  125. self.custom_opt = {}
  126. self.__loadcustom()
  127. def put_msg_txt(self,msg,datainfo:dict={}):
  128. if self.opt.tts == "none":
  129. # 音频播放模式:激活音频播放器
  130. if self.audio_player and not self.audio_player.is_running:
  131. logger.info("🎵 音频播放模式已激活")
  132. self.audio_player.set_callbacks(
  133. on_audio=self._on_audio_data
  134. )
  135. self.audio_player.start()
  136. # 视频播放模式:不需要处理文本消息
  137. elif self.video_player and not self.video_player.is_running:
  138. logger.info("🎬 视频播放模式已激活")
  139. self.video_player.set_callbacks(
  140. on_frame=self._on_video_frame,
  141. on_audio=self._on_audio_frame
  142. )
  143. self.video_player.start()
  144. else:
  145. # TTS 模式:处理文本消息
  146. self.tts.put_msg_txt(msg,datainfo)
def _on_video_frame(self, frame):
    """Video-frame callback for video-playback mode.

    Placeholder: the frame could be forwarded into the render pipeline; for
    now it is intentionally dropped (integration left for later).
    """
    pass
def _on_audio_frame(self, audio):
    """Audio-frame callback for video-playback mode.

    Placeholder: the audio could be forwarded to playback; currently dropped.
    """
    pass
  156. def _on_audio_data(self, audio_chunk):
  157. """音频数据回调(用于音频播放模式)"""
  158. # 将音频数据帧传递给渲染管道
  159. # audio_chunk 是 16kHz 20ms 的 PCM 数据 (int16)
  160. try:
  161. # 转换为 float32 格式,这是 ASR 需要的格式
  162. if audio_chunk.dtype == np.int16:
  163. audio_float = audio_chunk.astype(np.float32) / 32768.0
  164. else:
  165. audio_float = audio_chunk.astype(np.float32)
  166. # 传递给 put_audio_frame 处理
  167. self.put_audio_frame(audio_float, {})
  168. except Exception as e:
  169. logger.error(f"处理音频数据回调时出错: {e}")
  170. def put_audio_frame(self,audio_chunk,datainfo:dict={}): #16khz 20ms pcm
  171. self.asr.put_audio_frame(audio_chunk,datainfo)
  172. def put_audio_file(self,filebyte,datainfo:dict={}):
  173. input_stream = BytesIO(filebyte)
  174. stream = self.__create_bytes_stream(input_stream)
  175. streamlen = stream.shape[0]
  176. idx=0
  177. while streamlen >= self.chunk: #and self.state==State.RUNNING
  178. self.put_audio_frame(stream[idx:idx+self.chunk],datainfo)
  179. streamlen -= self.chunk
  180. idx += self.chunk
  181. def __create_bytes_stream(self,byte_stream):
  182. #byte_stream=BytesIO(buffer)
  183. stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
  184. logger.info(f'[INFO]put audio stream {sample_rate}: {stream.shape}')
  185. stream = stream.astype(np.float32)
  186. if stream.ndim > 1:
  187. logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
  188. stream = stream[:, 0]
  189. if sample_rate != self.sample_rate and stream.shape[0]>0:
  190. logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
  191. stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
  192. return stream
def stop_tts(self):
    """Stop current TTS playback and clear its pending (normal-priority) work.

    Acquires interrupt_lock — callers must NOT already hold that lock, since
    it is a plain non-reentrant threading.Lock.
    """
    with self.interrupt_lock:
        # Pause the TTS state machine if the backend exposes one.
        if hasattr(self.tts, 'state'):
            self.tts.state = State.PAUSE
        # Set the interrupt flag to force-abort whatever is playing now.
        if hasattr(self.tts, 'set_interrupt_flag'):
            self.tts.set_interrupt_flag()
        # Drain the backend's normal message queue under its own mutex.
        # The high-priority queue is deliberately left intact so queued
        # user questions still get answered.
        if hasattr(self.tts, 'msgqueue'):
            with self.tts.msgqueue.mutex:
                self.tts.msgqueue.queue.clear()
        # Discard any buffered input audio the backend holds.
        if hasattr(self.tts, 'input_stream'):
            if hasattr(self.tts.input_stream, 'seek') and hasattr(self.tts.input_stream, 'truncate'):
                self.tts.input_stream.seek(0)
                self.tts.input_stream.truncate()
        # Reset playback bookkeeping while still holding the lock.
        self.is_speaking_flag = False
        self.current_text = ""
        self.current_pos = 0
def flush_talk(self):
    """Interrupt current playback while preserving all unplayed content.

    The unspoken tail of current_text plus everything still in msg_queue is
    moved into interrupted_queue so it can be resumed later.
    """
    with self.interrupt_lock:
        # 1. Save the portion of the current text that has not played yet.
        if self.current_text and self.current_pos < len(self.current_text):
            unfinished = self.current_text[self.current_pos:]
            if unfinished.strip():
                try:
                    self.interrupted_queue.put_nowait(unfinished)
                except queue.Full:
                    logger.warning("中断队列已满,丢弃未完成的文本")
        # 2. Move every remaining msg_queue item to interrupted_queue (no loss
        #    unless the interrupted queue itself is full).
        while not self.msg_queue.empty():
            try:
                self.interrupted_queue.put_nowait(self.msg_queue.get_nowait())
            except queue.Full:
                logger.warning("中断队列已满,丢弃剩余消息")
                break
        # 3. stop_tts() is deliberately NOT called here: it acquires this same
        #    non-reentrant interrupt_lock, so it must run after release (below).
        pass
        # 4. Reset current playback state under the lock.
        self.is_speaking_flag = False
        self.current_text = ""
        self.current_pos = 0
        # 5. The TTS interrupt flag is intentionally left set until new
        #    content starts playing.
    # Call stop_tts() outside the lock to avoid deadlock (it locks internally).
    self.stop_tts()
    # self.tts.flush_talk()
    # self.asr.flush_talk()
  245. def handle_interruption_during_intro(self, user_question, datainfo:dict={}):
  246. """处理在介绍过程中用户提问的逻辑,暂停介绍并优先回答问题"""
  247. with self.interrupt_lock:
  248. # 1. 保存当前正在播放的介绍内容到中断队列
  249. if self.current_text and self.current_pos < len(self.current_text):
  250. unfinished_intro = self.current_text[self.current_pos:]
  251. if unfinished_intro.strip():
  252. self.interrupted_queue.put(unfinished_intro)
  253. logger.info(f"保存未完成的介绍内容: {unfinished_intro[:50]}...")
  254. # 2. 将消息队列中剩余的介绍内容也保存到中断队列
  255. remaining_items = []
  256. while not self.msg_queue.empty():
  257. remaining_items.append(self.msg_queue.get())
  258. if remaining_items:
  259. for item in remaining_items:
  260. self.interrupted_queue.put(item)
  261. logger.info(f"保存了 {len(remaining_items)} 个剩余的介绍内容到中断队列")
  262. # 3. 停止TTS播放(注意:这会将TTS状态设为PAUSE,但我们随后会处理用户问题)
  263. self.stop_tts()
  264. # 4. 清空当前播放状态
  265. self.is_speaking_flag = False
  266. self.current_text = ""
  267. self.current_pos = 0
  268. # 5. 确保TTS状态设置为RUNNING以处理用户问题
  269. if hasattr(self.tts, 'state'):
  270. self.tts.state = State.RUNNING
  271. # 6. 重置打断标志,允许播放新内容
  272. if hasattr(self.tts, 'reset_interrupt_flag'):
  273. self.tts.reset_interrupt_flag()
  274. # 7. 优先播放用户问题
  275. self.tts.put_high_priority_msg(user_question, datainfo)
  276. # 8. 立即尝试处理高优先级队列中的消息
  277. # 通过直接调用TTS处理方法,绕过正常的队列处理延迟
  278. logger.info("已暂停介绍并开始播放用户问题")
  279. # 9. 唤醒TTS处理线程,确保立即处理高优先级消息
  280. # 通过向普通队列添加一个空消息来触发处理循环
  281. if hasattr(self.tts, 'msgqueue'):
  282. self.tts.msgqueue.put(("", {}))
  283. def put_user_question(self, msg, datainfo:dict={}):
  284. """专门用于处理用户问题,优先级高于普通消息"""
  285. with self.interrupt_lock:
  286. # 强制中断当前播放
  287. self.flush_talk()
  288. # 直接播放用户问题(使用高优先级队列)
  289. self.tts.put_high_priority_msg(msg, datainfo)
  290. # 确保TTS状态设置为RUNNING以处理用户问题
  291. if hasattr(self.tts, 'state'):
  292. self.tts.state = State.RUNNING
  293. # 重置打断标志,允许播放新内容
  294. if hasattr(self.tts, 'reset_interrupt_flag'):
  295. self.tts.reset_interrupt_flag()
  296. # 唤醒TTS处理线程,确保立即处理高优先级消息
  297. # 通过向普通队列添加一个空消息来触发处理循环
  298. if hasattr(self.tts, 'msgqueue'):
  299. self.tts.msgqueue.put(("", {}))
  300. def process_user_questions_and_resume(self):
  301. """处理用户问题并恢复之前的内容"""
  302. with self.interrupt_lock:
  303. # 检查是否有用户问题需要处理
  304. while not self.user_question_queue.empty():
  305. msg, datainfo = self.user_question_queue.get()
  306. # 直接播放用户问题
  307. self.tts.put_msg_txt(msg, datainfo)
  308. # 检查是否有被中断的内容需要恢复
  309. if not self.interrupted_queue.empty():
  310. # 将被中断的内容放回主队列
  311. while not self.interrupted_queue.empty():
  312. msg = self.interrupted_queue.get()
  313. self.msg_queue.put(msg)
  314. def resume_interrupted(self):
  315. """把 interrupted_queue 里的内容放回播放队列"""
  316. with self.interrupt_lock:
  317. resumed = False
  318. items_to_resume = []
  319. # 先把所有中断的内容取出
  320. while not self.interrupted_queue.empty():
  321. items_to_resume.append(self.interrupted_queue.get())
  322. resumed = True
  323. # 再按顺序放回主队列
  324. for item in items_to_resume:
  325. self.msg_queue.put(item)
  326. return resumed
  327. # """恢复播放被中断的消息"""
  328. # return self.tts.resume_interrupted()
  329. def start_intro_with_interrupt_capability(self, intro_text, datainfo:dict={}):
  330. """开始播放介绍内容,同时保持接收用户提问的能力"""
  331. # 将介绍内容放入主消息队列
  332. self.put_msg_txt(intro_text, datainfo)
def is_speaking(self) -> bool:
    """Return True while non-silent frames are being rendered (set in process_frames)."""
    return self.speaking
def __loadcustom(self):
    """Load each configured custom state's image cycle and audio clip from disk.

    Each opt.customopt item supplies 'audiotype' (the state key), 'imgpath'
    (directory of numbered frames) and 'audiopath' (audio file).
    """
    for item in self.opt.customopt:
        logger.info(item)
        # Glob matches common image extensions (jpg/jpeg/png, any case).
        input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]'))
        # Sort numerically by bare filename (e.g. "12.png" -> 12).
        input_img_list = sorted(input_img_list, key=lambda x: int(os.path.splitext(os.path.basename(x))[0]))
        self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list)
        self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(item['audiopath'], dtype='float32')
        # Playback cursors for this state start at the beginning.
        self.custom_audio_index[item['audiotype']] = 0
        self.custom_index[item['audiotype']] = 0
        self.custom_opt[item['audiotype']] = item
  345. def init_customindex(self):
  346. self.curr_state=0
  347. for key in self.custom_audio_index:
  348. self.custom_audio_index[key]=0
  349. for key in self.custom_index:
  350. self.custom_index[key]=0
def notify(self, eventpoint):
    """Handle a playback event notification.

    When an 'end' event looks like the end of a user-question answer (it
    carries a 'knowledge_base' key, or interrupted content is pending),
    schedule a slightly-delayed attempt to resume the interrupted intro.
    """
    logger.info("notify:%s", eventpoint)
    if isinstance(eventpoint, dict) and eventpoint.get('status') == 'end':
        # Heuristic for "this was a Q&A answer ending".
        if 'knowledge_base' in eventpoint or self.interrupted_queue.qsize() > 0:
            import threading
            # Delay 0.5s so the current event finishes processing first.
            timer = threading.Timer(0.5, self._try_resume_after_qa)
            timer.start()
    # Intro auto-continuation is deliberately NOT triggered from notify();
    # it fires after TTS completion so each item fully finishes first.
  365. def _try_resume_after_qa(self):
  366. """尝试恢复问答后的内容"""
  367. with self.interrupt_lock:
  368. # 检查是否还有被中断的内容需要恢复
  369. if not self.interrupted_queue.empty():
  370. logger.info("检测到问答完成,恢复被中断的内容...")
  371. # 有被中断的内容需要恢复,调用恢复方法
  372. self.resume_interrupted()
def _continue_intro_play(self):
    """Auto-continue to the next intro item while intro playback is active.

    Re-arms itself via threading.Timer: retries in 2s while the TTS queue
    drains, or schedules the next item after an estimated play duration.
    """
    try:
        # Only continue while intro playback is active and not paused.
        if not (hasattr(self, 'intro_play_state') and
                self.intro_play_state.get('is_playing', False) and
                not self.intro_play_state.get('is_paused', False)):
            logger.info("不处于介绍播放状态,跳过自动续播")
            return
        # If the TTS queue is non-empty, wait for it to drain before queueing more.
        if hasattr(self, 'tts') and hasattr(self.tts, 'msgqueue'):
            if not self.tts.msgqueue.empty():
                logger.info("TTS消息队列不为空,等待队列清空后再播放下一条")
                # Mark as waiting and re-check in 2 seconds.
                self.intro_play_state['is_waiting_next'] = True
                import threading
                timer = threading.Timer(2.0, self._continue_intro_play)
                timer.start()
                return
        # Guard against duplicate triggers while the next item is scheduled.
        if hasattr(self, 'intro_play_state'):
            self.intro_play_state['is_waiting_next'] = True
        if hasattr(self, 'knowledge_intro_instance') and self.knowledge_intro_instance:
            # Fetch the next intro item.
            next_content = self.knowledge_intro_instance._get_next_content()
            if next_content and next_content.get('text'):
                logger.info(f"自动续播介绍内容 - 序号:{next_content.get('play_index', 1)}/{next_content.get('total_count', 1)}")
                # Track which item was last played.
                if hasattr(self, 'intro_play_state'):
                    self.intro_play_state["last_played_index"] = next_content.get("play_index", 1)
                # Estimate play time at ~3 chars/second (slow narration pace)
                # plus a fixed 3s pause before scheduling the following item.
                text = next_content.get('text', '')
                estimated_play_time = len(text) / 3
                pause_time = estimated_play_time + 3
                logger.info(f"当前文案长度:{len(text)}字,估算播放时间:{estimated_play_time:.1f}秒,播放下一条需等待:{pause_time:.1f}秒")
                # Play this item, then re-arm the timer for the next one.
                self.put_msg_txt(next_content['text'])
                import threading
                timer = threading.Timer(pause_time, self._continue_intro_play)
                timer.start()
            else:
                # No more content: mark intro playback finished.
                logger.info("介绍内容全部播放完成")
                if hasattr(self, 'intro_play_state'):
                    self.intro_play_state["is_playing"] = False
                    self.intro_play_state['is_waiting_next'] = False
    except Exception as e:
        logger.error(f"自动续播介绍内容时出错: {e}")
        # Reset the waiting flag on error so playback cannot get stuck.
        if hasattr(self, 'intro_play_state'):
            self.intro_play_state['is_waiting_next'] = False
  426. def _reset_waiting_flag(self):
  427. """重置等待标志,允许播放下一条"""
  428. if hasattr(self, 'intro_play_state'):
  429. self.intro_play_state['is_waiting_next'] = False
  430. logger.info("等待时间结束,可以播放下一条")
def start_recording(self):
    """Start recording: spawn two ffmpeg processes fed raw data via stdin.

    One process encodes raw BGR video to temp<sessionid>.mp4, the other
    encodes raw 16 kHz mono s16 PCM to temp<sessionid>.aac; the two files
    are muxed together in stop_recording(). Requires self.width/self.height
    to be set (first frame seen by record_video_data).
    """
    if self.recording:
        return
    # Video: raw bgr24 frames at 25fps in, H.264 yuv420p out.
    command = ['ffmpeg',
               '-y', '-an',
               '-f', 'rawvideo',
               '-vcodec', 'rawvideo',
               '-pix_fmt', 'bgr24',  # pixel format of the incoming frames
               '-s', "{}x{}".format(self.width, self.height),
               '-r', str(25),
               '-i', '-',
               '-pix_fmt', 'yuv420p',
               '-vcodec', "h264",
               f'temp{self.opt.sessionid}.mp4']
    self._record_video_pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE)
    # Audio: raw s16le 16 kHz mono in, AAC out.
    acommand = ['ffmpeg',
                '-y', '-vn',
                '-f', 's16le',
                '-ac', '1',
                '-ar', '16000',
                '-i', '-',
                '-acodec', 'aac',
                f'temp{self.opt.sessionid}.aac']
    self._record_audio_pipe = subprocess.Popen(acommand, shell=False, stdin=subprocess.PIPE)
    self.recording = True
  465. def record_video_data(self,image):
  466. if self.width == 0:
  467. logger.debug("image.shape: %s", image.shape)
  468. self.height,self.width,_ = image.shape
  469. if self.recording:
  470. self._record_video_pipe.stdin.write(image.tostring())
  471. def record_audio_data(self,frame):
  472. if self.recording:
  473. self._record_audio_pipe.stdin.write(frame.tostring())
  474. def start_rtmp_push(self):
  475. """启动 RTMP 推流(单进程 FFmpeg,视频+音频共用一套时钟)"""
  476. if not self._rtmp_push_url or self._rtmp_pushing:
  477. return
  478. if self.width == 0 or self.height == 0:
  479. logger.warning('RTMP 推流: 视频尺寸未初始化,等待第一帧')
  480. return
  481. self._rtmp_pushing = True
  482. # ffmpeg 命令:直接通过 stdin 管道接收视频和音频
  483. # 使用 pipe:0 接收视频,pipe:1 接收音频(需要两个独立的 FFmpeg 进程或使用 filter_complex)
  484. # 但更简单的方式是:使用一个 FFmpeg 进程,视频通过 stdin,音频通过第二个管道
  485. ffmpeg_cmd = [
  486. 'ffmpeg',
  487. '-y',
  488. # 视频输入:通过管道
  489. '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
  490. '-s', f'{self.width}x{self.height}', '-r', '25',
  491. '-i', 'pipe:0', # 从 stdin 读取视频
  492. # 音频输入:通过第二个管道
  493. '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
  494. '-i', 'pipe:1', # 从 fd 1 读取音频(需要特殊处理)
  495. # 编码输出
  496. '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency',
  497. '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25',
  498. '-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
  499. '-f', 'flv', self._rtmp_push_url
  500. ]
  501. # 注意:FFmpeg 不支持同时从两个 pipe 读取,需要使用 FIFO
  502. # 所以我们保留 FIFO 方案,但去掉手动速率控制,让 FFmpeg 自动处理
  503. # 创建 FIFO
  504. import tempfile
  505. self._rtmp_fifo_dir = tempfile.mkdtemp(prefix='rtmp_fifo_')
  506. self._rtmp_video_fifo = os.path.join(self._rtmp_fifo_dir, 'video')
  507. self._rtmp_audio_fifo = os.path.join(self._rtmp_fifo_dir, 'audio')
  508. os.mkfifo(self._rtmp_video_fifo)
  509. os.mkfifo(self._rtmp_audio_fifo)
  510. # 视频和音频缓冲队列
  511. self._rtmp_video_queue = queue.Queue(maxsize=30)
  512. self._rtmp_audio_queue = queue.Queue(maxsize=100)
  513. # 重写 ffmpeg 命令使用 FIFO
  514. # 关键:不使用 -re(会阻塞 FIFO),改用 -vf realtime/-af arealtime 对输出限速
  515. # 再配合 -fflags +genpts 与 -vsync cfr 保证恒定 25fps,避免下游转推看到 2x 速度
  516. ffmpeg_cmd = [
  517. 'ffmpeg',
  518. '-y',
  519. '-fflags', '+genpts',
  520. # 视频输入(不使用 -re,让 FIFO 自然反压控制速率)
  521. '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
  522. '-s', f'{self.width}x{self.height}', '-r', '25',
  523. '-thread_queue_size', '1024',
  524. '-i', self._rtmp_video_fifo,
  525. # 音频输入(16kHz 单声道 PCM)
  526. '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
  527. '-thread_queue_size', '1024',
  528. '-i', self._rtmp_audio_fifo,
  529. # 滤镜:按墙钟实时限速,视频+音频同步限速到 1x,避免 SRS 转推出现 speed=2x
  530. '-vf', 'realtime',
  531. '-af', 'arealtime,aresample=async=1:first_pts=0',
  532. # 编码输出(音频保持 16kHz 不重采样,避免时间戳错乱)
  533. '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency',
  534. '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25',
  535. '-vsync', 'cfr', '-r', '25', # 强制输出恒定 25fps
  536. '-c:a', 'aac', '-b:a', '128k', '-ar', '16000', # 保持 16kHz!
  537. '-f', 'flv', self._rtmp_push_url
  538. ]
  539. try:
  540. # 启动 ffmpeg 进程
  541. self._rtmp_pipe = subprocess.Popen(
  542. ffmpeg_cmd, shell=False,
  543. stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
  544. )
  545. logger.info(f'RTMP ffmpeg 进程已启动: PID={self._rtmp_pipe.pid}')
  546. # 视频 FIFO 写入线程(无速率控制,FFmpeg 自动处理)
  547. def video_fifo_writer():
  548. try:
  549. vfd = os.open(self._rtmp_video_fifo, os.O_WRONLY)
  550. logger.info('RTMP 视频 FIFO 已打开')
  551. while self._rtmp_pushing:
  552. try:
  553. data = self._rtmp_video_queue.get(timeout=1.0)
  554. if data is None:
  555. break
  556. os.write(vfd, data)
  557. except queue.Empty:
  558. continue
  559. except Exception as e:
  560. logger.error(f'RTMP 视频写入线程异常: {e}')
  561. self._rtmp_pushing = False
  562. finally:
  563. try: os.close(vfd)
  564. except: pass
  565. # 音频 FIFO 写入线程(无速率控制,FFmpeg 自动处理)
  566. def audio_fifo_writer():
  567. try:
  568. afd = os.open(self._rtmp_audio_fifo, os.O_WRONLY)
  569. logger.info('RTMP 音频 FIFO 已打开')
  570. # 静音帧:匹配一个视频帧(40ms)的 PCM 数据量
  571. # 16kHz * 0.04s * 2字节(单声道 s16le) = 1280 字节
  572. silence_frame = b'\x00' * 1280 # 40ms 静音(严格匹配视频帧率,避免音频过速)
  573. while self._rtmp_pushing:
  574. try:
  575. data = self._rtmp_audio_queue.get(timeout=0.04)
  576. if data is None:
  577. break
  578. os.write(afd, data)
  579. except queue.Empty:
  580. # 队列为空(TTS未说话),写入一帧 40ms 静音保持ffmpeg不卡顿
  581. try:
  582. os.write(afd, silence_frame)
  583. except:
  584. break
  585. except Exception as e:
  586. logger.error(f'RTMP 音频写入线程异常: {e}')
  587. self._rtmp_pushing = False
  588. finally:
  589. try: os.close(afd)
  590. except: pass
  591. self._rtmp_video_thread = Thread(target=video_fifo_writer, daemon=True, name='rtmp_video_writer')
  592. self._rtmp_audio_thread = Thread(target=audio_fifo_writer, daemon=True, name='rtmp_audio_writer')
  593. self._rtmp_video_thread.start()
  594. self._rtmp_audio_thread.start()
  595. # stderr 读取线程
  596. def read_stderr():
  597. for line in iter(self._rtmp_pipe.stderr.readline, b''):
  598. msg = line.decode('utf-8', errors='ignore').strip()
  599. if msg:
  600. logger.debug(f'FFmpeg RTMP: {msg}')
  601. Thread(target=read_stderr, daemon=True, name='rtmp_stderr_reader').start()
  602. logger.info(f'RTMP 音视频推流已启动: {self._rtmp_push_url}')
  603. except Exception as e:
  604. logger.error(f'RTMP 推流启动失败: {e}')
  605. self._rtmp_pushing = False
  606. self._cleanup_rtmp_fifos()
  607. def rtmp_push_video_data(self, image):
  608. """将视频帧放入 RTMP 推流队列"""
  609. if not self._rtmp_pushing:
  610. return
  611. try:
  612. self._rtmp_video_queue.put_nowait(image.tobytes())
  613. except queue.Full:
  614. pass # 丢弃旧帧,避免延迟累积
  615. def rtmp_push_audio_data(self, frame):
  616. """将音频帧放入 RTMP 推流队列"""
  617. if not self._rtmp_pushing:
  618. return
  619. try:
  620. self._rtmp_audio_queue.put_nowait(frame.tobytes())
  621. except queue.Full:
  622. pass # 丢弃旧帧,避免延迟累积
  623. def stop_rtmp_push(self):
  624. """停止 RTMP 推流"""
  625. if not self._rtmp_pushing:
  626. return
  627. self._rtmp_pushing = False
  628. # 发送停止信号给写入线程
  629. try:
  630. self._rtmp_video_queue.put(None, timeout=1)
  631. except: pass
  632. try:
  633. self._rtmp_audio_queue.put(None, timeout=1)
  634. except: pass
  635. # 等待写入线程结束
  636. for t in [getattr(self, '_rtmp_video_thread', None), getattr(self, '_rtmp_audio_thread', None)]:
  637. if t and t.is_alive():
  638. t.join(timeout=3)
  639. # 终止 ffmpeg
  640. if self._rtmp_pipe is not None:
  641. try:
  642. self._rtmp_pipe.wait(timeout=5)
  643. except:
  644. self._rtmp_pipe.kill()
  645. self._cleanup_rtmp_fifos()
  646. logger.info('RTMP 推流已停止')
  647. def _cleanup_rtmp_fifos(self):
  648. """清理 FIFO 文件和目录"""
  649. for path in [getattr(self, '_rtmp_video_fifo', ''), getattr(self, '_rtmp_audio_fifo', '')]:
  650. if path and os.path.exists(path):
  651. try: os.unlink(path)
  652. except: pass
  653. fifo_dir = getattr(self, '_rtmp_fifo_dir', '')
  654. if fifo_dir and os.path.isdir(fifo_dir):
  655. try: os.rmdir(fifo_dir)
  656. except: pass
  657. # def record_frame(self):
  658. # videostream = self.container.add_stream("libx264", rate=25)
  659. # videostream.codec_context.time_base = Fraction(1, 25)
  660. # audiostream = self.container.add_stream("aac")
  661. # audiostream.codec_context.time_base = Fraction(1, 16000)
  662. # init = True
  663. # framenum = 0
  664. # while self.recording:
  665. # try:
  666. # videoframe = self.recordq_video.get(block=True, timeout=1)
  667. # videoframe.pts = framenum #int(round(framenum*0.04 / videostream.codec_context.time_base))
  668. # videoframe.dts = videoframe.pts
  669. # if init:
  670. # videostream.width = videoframe.width
  671. # videostream.height = videoframe.height
  672. # init = False
  673. # for packet in videostream.encode(videoframe):
  674. # self.container.mux(packet)
  675. # for k in range(2):
  676. # audioframe = self.recordq_audio.get(block=True, timeout=1)
  677. # audioframe.pts = int(round((framenum*2+k)*0.02 / audiostream.codec_context.time_base))
  678. # audioframe.dts = audioframe.pts
  679. # for packet in audiostream.encode(audioframe):
  680. # self.container.mux(packet)
  681. # framenum += 1
  682. # except queue.Empty:
  683. # print('record queue empty,')
  684. # continue
  685. # except Exception as e:
  686. # print(e)
  687. # #break
  688. # for packet in videostream.encode(None):
  689. # self.container.mux(packet)
  690. # for packet in audiostream.encode(None):
  691. # self.container.mux(packet)
  692. # self.container.close()
  693. # self.recordq_video.queue.clear()
  694. # self.recordq_audio.queue.clear()
  695. # print('record thread stop')
  696. def stop_recording(self):
  697. """停止录制视频"""
  698. if not self.recording:
  699. return
  700. self.recording = False
  701. self._record_video_pipe.stdin.close() #wait()
  702. self._record_video_pipe.wait()
  703. self._record_audio_pipe.stdin.close()
  704. self._record_audio_pipe.wait()
  705. cmd_combine_audio = f"ffmpeg -y -i temp{self.opt.sessionid}.aac -i temp{self.opt.sessionid}.mp4 -c:v copy -c:a copy data/record.mp4"
  706. os.system(cmd_combine_audio)
  707. #os.remove(output_path)
  708. def mirror_index(self,size, index):
  709. #size = len(self.coord_list_cycle)
  710. turn = index // size
  711. res = index % size
  712. if turn % 2 == 0:
  713. return res
  714. else:
  715. return size - res - 1
  716. def get_audio_stream(self,audiotype):
  717. idx = self.custom_audio_index[audiotype]
  718. stream = self.custom_audio_cycle[audiotype][idx:idx+self.chunk]
  719. self.custom_audio_index[audiotype] += self.chunk
  720. if self.custom_audio_index[audiotype]>=self.custom_audio_cycle[audiotype].shape[0]:
  721. self.curr_state = 1 #当前视频不循环播放,切换到静音状态
  722. return stream
  723. def set_custom_state(self,audiotype, reinit=True):
  724. logger.debug('set_custom_state: %s', audiotype)
  725. if self.custom_audio_index.get(audiotype) is None:
  726. return
  727. self.curr_state = audiotype
  728. if reinit:
  729. self.custom_audio_index[audiotype] = 0
  730. self.custom_index[audiotype] = 0
    def process_frames(self,quit_event,loop=None,audio_track=None,video_track=None):
        """Consumer loop: turn inference results into audio/video output.

        Pulls ``(res_frame, idx, audio_frames)`` tuples from
        ``self.res_frame_queue``, composes the final full frame (idle/custom
        image when both audio frames are silence, otherwise the pasted-back
        inference result), then fans it out to the configured transport
        (virtual camera or WebRTC track queues), the recorder, and an
        optional lazily-started RTMP push.

        Args:
            quit_event: Event-like object with ``is_set()``; the loop runs
                until it is set.
            loop: asyncio event loop used with ``run_coroutine_threadsafe``
                for the WebRTC transport.
            audio_track: object exposing an asyncio ``_queue`` that receives
                ``(AudioFrame, eventpoint)`` tuples (presumably an
                aiortc-style track — TODO confirm).
            video_track: same shape as ``audio_track`` but for video frames.
        """
        enable_transition = False  # False disables the speak/silent cross-fade, True enables it
        if enable_transition:
            _last_speaking = False
            _transition_start = time.time()
            _transition_duration = 0.1  # cross-fade duration (seconds)
            _last_silent_frame = None  # cache of the last composed silent frame
            _last_speaking_frame = None  # cache of the last composed speaking frame

        if self.opt.transport=='virtualcam':
            import pyvirtualcam
            vircam = None  # created lazily once the first frame fixes the size

            audio_tmp = queue.Queue(maxsize=3000)
            audio_thread = Thread(target=play_audio, args=(quit_event,audio_tmp,), daemon=True, name="pyaudio_stream")
            audio_thread.start()

        # RTMP push: started lazily — wait for the first frame so the video
        # dimensions are known before opening the push stream.
        _rtmp_started = False
        _rtmp_attempted = False  # whether a push start has already been attempted

        while not quit_event.is_set():
            try:
                res_frame,idx,audio_frames = self.res_frame_queue.get(block=True, timeout=1)
            except queue.Empty:
                continue

            if enable_transition:
                # Detect speaking/silent state flips and restart the cross-fade timer.
                # A non-zero audio type on both frames marks silence (see branch below).
                current_speaking = not (audio_frames[0][1]!=0 and audio_frames[1][1]!=0)
                if current_speaking != _last_speaking:
                    logger.info(f"状态切换:{'说话' if _last_speaking else '静音'} → {'说话' if current_speaking else '静音'}")
                    _transition_start = time.time()
                _last_speaking = current_speaking

            if audio_frames[0][1]!=0 and audio_frames[1][1]!=0:
                # Both audio frames are silence — just pick a full idle image.
                self.speaking = False
                audiotype = audio_frames[0][1]
                if self.custom_index.get(audiotype) is not None:
                    # A custom video cycle is registered for this audio type:
                    # ping-pong through it via mirror_index.
                    mirindex = self.mirror_index(len(self.custom_img_cycle[audiotype]),self.custom_index[audiotype])
                    target_frame = self.custom_img_cycle[audiotype][mirindex]
                    self.custom_index[audiotype] += 1
                else:
                    target_frame = self.frame_list_cycle[idx]
                if enable_transition:
                    # speaking -> silent cross-fade
                    if time.time() - _transition_start < _transition_duration and _last_speaking_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_speaking_frame, 1-alpha, target_frame, alpha, 0)
                    else:
                        combine_frame = target_frame
                    # cache the silent frame for the next silent -> speaking fade
                    _last_silent_frame = combine_frame.copy()
                else:
                    combine_frame = target_frame
            else:
                self.speaking = True
                try:
                    current_frame = self.paste_back_frame(res_frame,idx)
                except Exception as e:
                    # best-effort: skip this frame rather than killing the thread
                    logger.warning(f"paste_back_frame error: {e}")
                    continue
                if enable_transition:
                    # silent -> speaking cross-fade
                    if time.time() - _transition_start < _transition_duration and _last_silent_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_silent_frame, 1-alpha, current_frame, alpha, 0)
                    else:
                        combine_frame = current_frame
                    # cache the speaking frame for the next speaking -> silent fade
                    _last_speaking_frame = combine_frame.copy()
                else:
                    combine_frame = current_frame

            #cv2.putText(combine_frame, "LiveTalking", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (128,128,128), 1)
            if self.opt.transport=='virtualcam':
                if vircam==None:
                    # first frame: now the output size is known, open the camera
                    height, width,_= combine_frame.shape
                    vircam = pyvirtualcam.Camera(width=width, height=height, fps=25, fmt=pyvirtualcam.PixelFormat.BGR,print_fps=True)
                vircam.send(combine_frame)
            elif video_track is not None and loop is not None: #webrtc
                image = combine_frame
                new_frame = VideoFrame.from_ndarray(image, format="bgr24")
                asyncio.run_coroutine_threadsafe(video_track._queue.put((new_frame,None)), loop)
            self.record_video_data(combine_frame)

            # RTMP push: video frame
            if self._rtmp_push_url:
                if not _rtmp_started and not _rtmp_attempted:
                    # first usable video frame: try to start the RTMP push
                    if self.width > 0 and self.height > 0:
                        self.start_rtmp_push()
                        _rtmp_attempted = True  # mark the one-shot attempt
                        if self._rtmp_pushing:  # start succeeded
                            _rtmp_started = True
                if _rtmp_started:  # only push once the stream started successfully
                    self.rtmp_push_video_data(combine_frame)

            for audio_frame in audio_frames:
                # NOTE(review): `type` shadows the builtin here — unused, left as-is
                frame,type,eventpoint = audio_frame
                # float PCM [-1, 1] -> int16 samples
                frame = (frame * 32767).astype(np.int16)
                if self.opt.transport=='virtualcam':
                    audio_tmp.put(frame.tobytes()) #TODO
                elif audio_track is not None and loop is not None: #webrtc
                    new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0])
                    new_frame.planes[0].update(frame.tobytes())
                    new_frame.sample_rate=16000
                    asyncio.run_coroutine_threadsafe(audio_track._queue.put((new_frame,eventpoint)), loop)
                self.record_audio_data(frame)
                # RTMP push: audio frame
                if _rtmp_started:
                    self.rtmp_push_audio_data(frame)
            if self.opt.transport=='virtualcam':
                vircam.sleep_until_next_frame()

        # stop the RTMP push (no-op if it never started — presumably; verify in stop_rtmp_push)
        self.stop_rtmp_push()
        if self.opt.transport=='virtualcam':
            audio_thread.join()
            vircam.close()
        logger.info('basereal process_frames thread stop')
  842. # def process_custom(self,audiotype:int,idx:int):
  843. # if self.curr_state!=audiotype: #从推理切到口播
  844. # if idx in self.switch_pos: #在卡点位置可以切换
  845. # self.curr_state=audiotype
  846. # self.custom_index=0
  847. # else:
  848. # self.custom_index+=1