import math import torch import numpy as np import subprocess import os import time import cv2 import glob import resampy import queue from queue import Queue from threading import Thread, Event from io import BytesIO import soundfile as sf import asyncio from av import AudioFrame, VideoFrame import av from fractions import Fraction from ttsreal import EdgeTTS,SovitsTTS,XTTS,CosyVoiceTTS,FishTTS,TencentTTS,DoubaoTTS,IndexTTS2,AzureTTS,State from logger import logger from videoplayer import RealtimeVideoPlayer from audioplayer import RealtimeAudioPlayer from tqdm import tqdm from threading import Lock def read_imgs(img_list): frames = [] logger.info('reading images...') for img_path in tqdm(img_list): frame = cv2.imread(img_path) frames.append(frame) return frames def play_audio(quit_event,queue): import pyaudio p = pyaudio.PyAudio() stream = p.open( rate=16000, channels=1, format=8, output=True, output_device_index=1, ) stream.start_stream() # while queue.qsize() <= 0: # time.sleep(0.1) while not quit_event.is_set(): stream.write(queue.get(block=True)) stream.close() class BaseReal: def __init__(self, opt,model,avatar): #原本的播放序列 (添加大小限制防止内存泄漏) self.msg_queue = queue.Queue(maxsize=1000) # 最多1000条消息 self.interrupted_queue = queue.Queue(maxsize=500) # 最多500条中断消息 self.user_question_queue = queue.Queue(maxsize=100) # 最多100个用户问题 self.is_speaking_flag = False self.current_text = "" # 当前正在播放的文本 self.current_pos = 0 # 当前正在播放的文本位置 self.interrupt_lock = Lock() #线程安全锁 self.opt = opt self.sample_rate = 16000 self.chunk = self.sample_rate // opt.fps # 320 samples per chunk (20ms * 16000 / 1000) self.sessionid = self.opt.sessionid if opt.tts == "edgetts": self.tts = EdgeTTS(opt,self) elif opt.tts == "qwen3tts": from qwen3tts import Qwen3TTS self.tts = Qwen3TTS(opt,self) elif opt.tts == "gpt-sovits": self.tts = SovitsTTS(opt,self) elif opt.tts == "xtts": self.tts = XTTS(opt,self) elif opt.tts == "cosyvoice": self.tts = CosyVoiceTTS(opt,self) elif opt.tts == "fishtts": self.tts = FishTTS(opt,self) elif opt.tts == "tencent": self.tts = TencentTTS(opt,self) elif opt.tts == "doubao": self.tts = DoubaoTTS(opt,self) elif opt.tts == "indextts2": self.tts = IndexTTS2(opt,self) elif opt.tts == "azuretts": self.tts = AzureTTS(opt,self) elif opt.tts == "voxcpm2": from ttsreal import VoxCPM2TTS self.tts = VoxCPM2TTS(opt,self) elif opt.tts == "voxcpm2api": from voxcpm2_api_tts import VoxCPM2APITTS self.tts = VoxCPM2APITTS(opt,self) elif opt.tts == "none": # 不使用 TTS,使用音频播放模式 self.tts = None self.video_player = None self.audio_player = None # 检查是否指定视频播放目录 if hasattr(opt, 'VIDEO_PLAYBACK_DIR') and opt.VIDEO_PLAYBACK_DIR: self.video_player = RealtimeVideoPlayer(opt.VIDEO_PLAYBACK_DIR) logger.info(f"🎬 视频播放模式已启用,监听目录: {opt.VIDEO_PLAYBACK_DIR}") # 检查是否指定音频播放目录 elif hasattr(opt, 'AUDIO_PLAYBACK_DIR') and opt.AUDIO_PLAYBACK_DIR: self.audio_player = RealtimeAudioPlayer(opt.AUDIO_PLAYBACK_DIR, sample_rate=16000) logger.info(f"🎵 音频播放模式已启用,监听目录: {opt.AUDIO_PLAYBACK_DIR}") else: logger.warning("⚠️ 未指定 VIDEO_PLAYBACK_DIR 或 AUDIO_PLAYBACK_DIR,播放模式不可用") else: # 默认使用 edgetts logger.warning(f"未知的 TTS 类型: {opt.tts},使用 edgetts 作为默认") self.tts = EdgeTTS(opt,self) self.speaking = False self.recording = False self._record_video_pipe = None self._record_audio_pipe = None self.width = self.height = 0 # RTMP 推流支持 self._rtmp_pushing = False self._rtmp_video_pipe = None self._rtmp_audio_pipe = None self._rtmp_push_url = getattr(opt, 'push_url', '') if hasattr(opt, 'push_url') and opt.transport == 'rtcpush' and getattr(opt, 'push_url', '').startswith('rtmp://') else '' if self._rtmp_push_url: logger.info(f'RTMP 推流已配置: {self._rtmp_push_url}') self.curr_state=0 self.custom_img_cycle = {} self.custom_audio_cycle = {} self.custom_audio_index = {} self.custom_index = {} self.custom_opt = {} self.__loadcustom() def put_msg_txt(self,msg,datainfo:dict={}): if self.opt.tts == "none": # 音频播放模式:激活音频播放器 if self.audio_player and not self.audio_player.is_running: logger.info("🎵 音频播放模式已激活") self.audio_player.set_callbacks( on_audio=self._on_audio_data ) self.audio_player.start() # 视频播放模式:不需要处理文本消息 elif self.video_player and not self.video_player.is_running: logger.info("🎬 视频播放模式已激活") self.video_player.set_callbacks( on_frame=self._on_video_frame, on_audio=self._on_audio_frame ) self.video_player.start() else: # TTS 模式:处理文本消息 self.tts.put_msg_txt(msg,datainfo) def _on_video_frame(self, frame): """视频帧回调""" # 这里可以将帧传递给渲染管道 # 目前先简单处理,后续可以集成到渲染流程 pass def _on_audio_frame(self, audio): """音频帧回调(用于视频播放模式)""" # 这里可以将音频数据传递给音频播放 pass def _on_audio_data(self, audio_chunk): """音频数据回调(用于音频播放模式)""" # 将音频数据帧传递给渲染管道 # audio_chunk 是 16kHz 20ms 的 PCM 数据 (int16) try: # 转换为 float32 格式,这是 ASR 需要的格式 if audio_chunk.dtype == np.int16: audio_float = audio_chunk.astype(np.float32) / 32768.0 else: audio_float = audio_chunk.astype(np.float32) # 传递给 put_audio_frame 处理 self.put_audio_frame(audio_float, {}) except Exception as e: logger.error(f"处理音频数据回调时出错: {e}") def put_audio_frame(self,audio_chunk,datainfo:dict={}): #16khz 20ms pcm self.asr.put_audio_frame(audio_chunk,datainfo) def put_audio_file(self,filebyte,datainfo:dict={}): input_stream = BytesIO(filebyte) stream = self.__create_bytes_stream(input_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: #and self.state==State.RUNNING self.put_audio_frame(stream[idx:idx+self.chunk],datainfo) streamlen -= self.chunk idx += self.chunk def __create_bytes_stream(self,byte_stream): #byte_stream=BytesIO(buffer) stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64 logger.info(f'[INFO]put audio stream {sample_rate}: {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.') stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0]>0: logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream def stop_tts(self): """停止当前TTS播放""" with self.interrupt_lock: # 添加锁保护 if hasattr(self.tts, 'state'): self.tts.state = State.PAUSE # 设置打断标志,强制中断当前播放 if hasattr(self.tts, 'set_interrupt_flag'): self.tts.set_interrupt_flag() # 清空TTS内部的普通队列,但保留高优先级队列(用于用户问题) if hasattr(self.tts, 'msgqueue'): with self.tts.msgqueue.mutex: self.tts.msgqueue.queue.clear() # 注意:不清空高优先级队列,以确保用户问题能够被处理 # 清空输入音频流缓冲区 if hasattr(self.tts, 'input_stream'): if hasattr(self.tts.input_stream, 'seek') and hasattr(self.tts.input_stream, 'truncate'): self.tts.input_stream.seek(0) self.tts.input_stream.truncate() # 在锁保护下更新状态 self.is_speaking_flag = False self.current_text = "" self.current_pos = 0 def flush_talk(self): """打断当前播放并保存未完成的内容""" with self.interrupt_lock: # 1. 保存当前正在播、但没播完的文本(关键) if self.current_text and self.current_pos < len(self.current_text): unfinished = self.current_text[self.current_pos:] if unfinished.strip(): try: self.interrupted_queue.put_nowait(unfinished) except queue.Full: logger.warning("中断队列已满,丢弃未完成的文本") # 2. 把 msg_queue 里剩余的全部移到 interrupted_queue(不丢弃) while not self.msg_queue.empty(): try: self.interrupted_queue.put_nowait(self.msg_queue.get_nowait()) except queue.Full: logger.warning("中断队列已满,丢弃剩余消息") break # 3.停止当前TTS播放 # 注意:stop_tts内部已经有锁,这里需要临时释放锁避免死锁 pass # 4. 在锁保护下清空当前播放状态 self.is_speaking_flag = False self.current_text = "" self.current_pos = 0 # 5. 不要重置打断标志,保持中断状态直到新内容开始播放 # 在锁外调用stop_tts避免死锁 self.stop_tts() # self.tts.flush_talk() # self.asr.flush_talk() def handle_interruption_during_intro(self, user_question, datainfo:dict={}): """处理在介绍过程中用户提问的逻辑,暂停介绍并优先回答问题""" with self.interrupt_lock: # 1. 保存当前正在播放的介绍内容到中断队列 if self.current_text and self.current_pos < len(self.current_text): unfinished_intro = self.current_text[self.current_pos:] if unfinished_intro.strip(): self.interrupted_queue.put(unfinished_intro) logger.info(f"保存未完成的介绍内容: {unfinished_intro[:50]}...") # 2. 将消息队列中剩余的介绍内容也保存到中断队列 remaining_items = [] while not self.msg_queue.empty(): remaining_items.append(self.msg_queue.get()) if remaining_items: for item in remaining_items: self.interrupted_queue.put(item) logger.info(f"保存了 {len(remaining_items)} 个剩余的介绍内容到中断队列") # 3. 停止TTS播放(注意:这会将TTS状态设为PAUSE,但我们随后会处理用户问题) self.stop_tts() # 4. 清空当前播放状态 self.is_speaking_flag = False self.current_text = "" self.current_pos = 0 # 5. 确保TTS状态设置为RUNNING以处理用户问题 if hasattr(self.tts, 'state'): self.tts.state = State.RUNNING # 6. 重置打断标志,允许播放新内容 if hasattr(self.tts, 'reset_interrupt_flag'): self.tts.reset_interrupt_flag() # 7. 优先播放用户问题 self.tts.put_high_priority_msg(user_question, datainfo) # 8. 立即尝试处理高优先级队列中的消息 # 通过直接调用TTS处理方法,绕过正常的队列处理延迟 logger.info("已暂停介绍并开始播放用户问题") # 9. 唤醒TTS处理线程,确保立即处理高优先级消息 # 通过向普通队列添加一个空消息来触发处理循环 if hasattr(self.tts, 'msgqueue'): self.tts.msgqueue.put(("", {})) def put_user_question(self, msg, datainfo:dict={}): """专门用于处理用户问题,优先级高于普通消息""" with self.interrupt_lock: # 强制中断当前播放 self.flush_talk() # 直接播放用户问题(使用高优先级队列) self.tts.put_high_priority_msg(msg, datainfo) # 确保TTS状态设置为RUNNING以处理用户问题 if hasattr(self.tts, 'state'): self.tts.state = State.RUNNING # 重置打断标志,允许播放新内容 if hasattr(self.tts, 'reset_interrupt_flag'): self.tts.reset_interrupt_flag() # 唤醒TTS处理线程,确保立即处理高优先级消息 # 通过向普通队列添加一个空消息来触发处理循环 if hasattr(self.tts, 'msgqueue'): self.tts.msgqueue.put(("", {})) def process_user_questions_and_resume(self): """处理用户问题并恢复之前的内容""" with self.interrupt_lock: # 检查是否有用户问题需要处理 while not self.user_question_queue.empty(): msg, datainfo = self.user_question_queue.get() # 直接播放用户问题 self.tts.put_msg_txt(msg, datainfo) # 检查是否有被中断的内容需要恢复 if not self.interrupted_queue.empty(): # 将被中断的内容放回主队列 while not self.interrupted_queue.empty(): msg = self.interrupted_queue.get() self.msg_queue.put(msg) def resume_interrupted(self): """把 interrupted_queue 里的内容放回播放队列""" with self.interrupt_lock: resumed = False items_to_resume = [] # 先把所有中断的内容取出 while not self.interrupted_queue.empty(): items_to_resume.append(self.interrupted_queue.get()) resumed = True # 再按顺序放回主队列 for item in items_to_resume: self.msg_queue.put(item) return resumed # """恢复播放被中断的消息""" # return self.tts.resume_interrupted() def start_intro_with_interrupt_capability(self, intro_text, datainfo:dict={}): """开始播放介绍内容,同时保持接收用户提问的能力""" # 将介绍内容放入主消息队列 self.put_msg_txt(intro_text, datainfo) def is_speaking(self)->bool: return self.speaking def __loadcustom(self): for item in self.opt.customopt: logger.info(item) input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]')) input_img_list = sorted(input_img_list, key=lambda x: int(os.path.splitext(os.path.basename(x))[0])) self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list) self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(item['audiopath'], dtype='float32') self.custom_audio_index[item['audiotype']] = 0 self.custom_index[item['audiotype']] = 0 self.custom_opt[item['audiotype']] = item def init_customindex(self): self.curr_state=0 for key in self.custom_audio_index: self.custom_audio_index[key]=0 for key in self.custom_index: self.custom_index[key]=0 def notify(self,eventpoint): logger.info("notify:%s",eventpoint) # 检查是否是用户问题回答的结束事件 if isinstance(eventpoint, dict) and eventpoint.get('status') == 'end': # 如果是用户问题的回答结束,检查是否需要恢复被中断的内容 # 检查这个结束事件是否与用户问题相关 if 'knowledge_base' in eventpoint or self.interrupted_queue.qsize() > 0: # 这是一个用户问题的回答结束,检查是否需要恢复被中断的内容 import threading # 使用线程延时一小段时间再恢复,确保当前事件处理完成 timer = threading.Timer(0.5, self._try_resume_after_qa) timer.start() # 注意:介绍播放的自动续播不再通过notify触发,而是在TTS处理完成后自动触发 # 这样可以确保前一条完全播放完成后再播放下一条 def _try_resume_after_qa(self): """尝试恢复问答后的内容""" with self.interrupt_lock: # 检查是否还有被中断的内容需要恢复 if not self.interrupted_queue.empty(): logger.info("检测到问答完成,恢复被中断的内容...") # 有被中断的内容需要恢复,调用恢复方法 self.resume_interrupted() def _continue_intro_play(self): """继续播放下一条介绍内容""" try: # 检查是否处于介绍播放状态 if not (hasattr(self, 'intro_play_state') and self.intro_play_state.get('is_playing', False) and not self.intro_play_state.get('is_paused', False)): logger.info("不处于介绍播放状态,跳过自动续播") return # 检查TTS队列是否为空,如果不为空说明还有消息在等待,不放入新消息 if hasattr(self, 'tts') and hasattr(self.tts, 'msgqueue'): if not self.tts.msgqueue.empty(): logger.info("TTS消息队列不为空,等待队列清空后再播放下一条") # 设置等待标志,并设置定时器再次检查 self.intro_play_state['is_waiting_next'] = True import threading timer = threading.Timer(2.0, self._continue_intro_play) timer.start() return # 设置等待标志,防止重复触发 if hasattr(self, 'intro_play_state'): self.intro_play_state['is_waiting_next'] = True if hasattr(self, 'knowledge_intro_instance') and self.knowledge_intro_instance: # 获取下一条介绍内容 next_content = self.knowledge_intro_instance._get_next_content() if next_content and next_content.get('text'): logger.info(f"自动续播介绍内容 - 序号:{next_content.get('play_index', 1)}/{next_content.get('total_count', 1)}") # 更新播放状态 if hasattr(self, 'intro_play_state'): self.intro_play_state["last_played_index"] = next_content.get("play_index", 1) # 计算停顿时间:根据文本长度估算播放时间 + 额外停顿 # 语速约 3 字/秒(较慢的播报速度) text = next_content.get('text', '') estimated_play_time = len(text) / 3 # 估算播放时间(秒) pause_time = estimated_play_time + 3 # 播放时间 + 3秒停顿 logger.info(f"当前文案长度:{len(text)}字,估算播放时间:{estimated_play_time:.1f}秒,播放下一条需等待:{pause_time:.1f}秒") # 播放下一条内容 self.put_msg_txt(next_content['text']) # 设置定时器,在播放完成后继续播放下一条 import threading timer = threading.Timer(pause_time, self._continue_intro_play) timer.start() else: # 没有更多内容了,标记播放完成 logger.info("介绍内容全部播放完成") if hasattr(self, 'intro_play_state'): self.intro_play_state["is_playing"] = False self.intro_play_state['is_waiting_next'] = False except Exception as e: logger.error(f"自动续播介绍内容时出错: {e}") # 确保错误时也重置等待标志 if hasattr(self, 'intro_play_state'): self.intro_play_state['is_waiting_next'] = False def _reset_waiting_flag(self): """重置等待标志,允许播放下一条""" if hasattr(self, 'intro_play_state'): self.intro_play_state['is_waiting_next'] = False logger.info("等待时间结束,可以播放下一条") def start_recording(self): """开始录制视频""" if self.recording: return command = ['ffmpeg', '-y', '-an', '-f', 'rawvideo', '-vcodec','rawvideo', '-pix_fmt', 'bgr24', #像素格式 '-s', "{}x{}".format(self.width, self.height), '-r', str(25), '-i', '-', '-pix_fmt', 'yuv420p', '-vcodec', "h264", #'-f' , 'flv', f'temp{self.opt.sessionid}.mp4'] self._record_video_pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE) acommand = ['ffmpeg', '-y', '-vn', '-f', 's16le', #'-acodec','pcm_s16le', '-ac', '1', '-ar', '16000', '-i', '-', '-acodec', 'aac', #'-f' , 'wav', f'temp{self.opt.sessionid}.aac'] self._record_audio_pipe = subprocess.Popen(acommand, shell=False, stdin=subprocess.PIPE) self.recording = True # self.recordq_video.queue.clear() # self.recordq_audio.queue.clear() # self.container = av.open(path, mode="w") # process_thread = Thread(target=self.record_frame, args=()) # process_thread.start() def record_video_data(self,image): if self.width == 0: logger.debug("image.shape: %s", image.shape) self.height,self.width,_ = image.shape if self.recording: self._record_video_pipe.stdin.write(image.tostring()) def record_audio_data(self,frame): if self.recording: self._record_audio_pipe.stdin.write(frame.tostring()) def start_rtmp_push(self): """启动 RTMP 推流(单进程 FFmpeg,视频+音频共用一套时钟)""" if not self._rtmp_push_url or self._rtmp_pushing: return if self.width == 0 or self.height == 0: logger.warning('RTMP 推流: 视频尺寸未初始化,等待第一帧') return self._rtmp_pushing = True # ffmpeg 命令:直接通过 stdin 管道接收视频和音频 # 使用 pipe:0 接收视频,pipe:1 接收音频(需要两个独立的 FFmpeg 进程或使用 filter_complex) # 但更简单的方式是:使用一个 FFmpeg 进程,视频通过 stdin,音频通过第二个管道 ffmpeg_cmd = [ 'ffmpeg', '-y', # 视频输入:通过管道 '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24', '-s', f'{self.width}x{self.height}', '-r', '25', '-i', 'pipe:0', # 从 stdin 读取视频 # 音频输入:通过第二个管道 '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', '-i', 'pipe:1', # 从 fd 1 读取音频(需要特殊处理) # 编码输出 '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency', '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25', '-c:a', 'aac', '-b:a', '128k', '-ar', '44100', '-f', 'flv', self._rtmp_push_url ] # 注意:FFmpeg 不支持同时从两个 pipe 读取,需要使用 FIFO # 所以我们保留 FIFO 方案,但去掉手动速率控制,让 FFmpeg 自动处理 # 创建 FIFO import tempfile self._rtmp_fifo_dir = tempfile.mkdtemp(prefix='rtmp_fifo_') self._rtmp_video_fifo = os.path.join(self._rtmp_fifo_dir, 'video') self._rtmp_audio_fifo = os.path.join(self._rtmp_fifo_dir, 'audio') os.mkfifo(self._rtmp_video_fifo) os.mkfifo(self._rtmp_audio_fifo) # 视频和音频缓冲队列 self._rtmp_video_queue = queue.Queue(maxsize=30) self._rtmp_audio_queue = queue.Queue(maxsize=100) # 重写 ffmpeg 命令使用 FIFO # 关键:不使用 -re(会阻塞 FIFO),改用 -vf realtime/-af arealtime 对输出限速 # 再配合 -fflags +genpts 与 -vsync cfr 保证恒定 25fps,避免下游转推看到 2x 速度 ffmpeg_cmd = [ 'ffmpeg', '-y', '-fflags', '+genpts', # 视频输入(不使用 -re,让 FIFO 自然反压控制速率) '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24', '-s', f'{self.width}x{self.height}', '-r', '25', '-thread_queue_size', '1024', '-i', self._rtmp_video_fifo, # 音频输入(16kHz 单声道 PCM) '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', '-thread_queue_size', '1024', '-i', self._rtmp_audio_fifo, # 滤镜:按墙钟实时限速,视频+音频同步限速到 1x,避免 SRS 转推出现 speed=2x '-vf', 'realtime', '-af', 'arealtime,aresample=async=1:first_pts=0', # 编码输出(音频保持 16kHz 不重采样,避免时间戳错乱) '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency', '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25', '-vsync', 'cfr', '-r', '25', # 强制输出恒定 25fps '-c:a', 'aac', '-b:a', '128k', '-ar', '16000', # 保持 16kHz! '-f', 'flv', self._rtmp_push_url ] try: # 启动 ffmpeg 进程 self._rtmp_pipe = subprocess.Popen( ffmpeg_cmd, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE ) logger.info(f'RTMP ffmpeg 进程已启动: PID={self._rtmp_pipe.pid}') # 视频 FIFO 写入线程(无速率控制,FFmpeg 自动处理) def video_fifo_writer(): try: vfd = os.open(self._rtmp_video_fifo, os.O_WRONLY) logger.info('RTMP 视频 FIFO 已打开') while self._rtmp_pushing: try: data = self._rtmp_video_queue.get(timeout=1.0) if data is None: break os.write(vfd, data) except queue.Empty: continue except Exception as e: logger.error(f'RTMP 视频写入线程异常: {e}') self._rtmp_pushing = False finally: try: os.close(vfd) except: pass # 音频 FIFO 写入线程(无速率控制,FFmpeg 自动处理) def audio_fifo_writer(): try: afd = os.open(self._rtmp_audio_fifo, os.O_WRONLY) logger.info('RTMP 音频 FIFO 已打开') # 静音帧:匹配一个视频帧(40ms)的 PCM 数据量 # 16kHz * 0.04s * 2字节(单声道 s16le) = 1280 字节 silence_frame = b'\x00' * 1280 # 40ms 静音(严格匹配视频帧率,避免音频过速) while self._rtmp_pushing: try: data = self._rtmp_audio_queue.get(timeout=0.04) if data is None: break os.write(afd, data) except queue.Empty: # 队列为空(TTS未说话),写入一帧 40ms 静音保持ffmpeg不卡顿 try: os.write(afd, silence_frame) except: break except Exception as e: logger.error(f'RTMP 音频写入线程异常: {e}') self._rtmp_pushing = False finally: try: os.close(afd) except: pass self._rtmp_video_thread = Thread(target=video_fifo_writer, daemon=True, name='rtmp_video_writer') self._rtmp_audio_thread = Thread(target=audio_fifo_writer, daemon=True, name='rtmp_audio_writer') self._rtmp_video_thread.start() self._rtmp_audio_thread.start() # stderr 读取线程 def read_stderr(): for line in iter(self._rtmp_pipe.stderr.readline, b''): msg = line.decode('utf-8', errors='ignore').strip() if msg: logger.debug(f'FFmpeg RTMP: {msg}') Thread(target=read_stderr, daemon=True, name='rtmp_stderr_reader').start() logger.info(f'RTMP 音视频推流已启动: {self._rtmp_push_url}') except Exception as e: logger.error(f'RTMP 推流启动失败: {e}') self._rtmp_pushing = False self._cleanup_rtmp_fifos() def rtmp_push_video_data(self, image): """将视频帧放入 RTMP 推流队列""" if not self._rtmp_pushing: return try: self._rtmp_video_queue.put_nowait(image.tobytes()) except queue.Full: pass # 丢弃旧帧,避免延迟累积 def rtmp_push_audio_data(self, frame): """将音频帧放入 RTMP 推流队列""" if not self._rtmp_pushing: return try: self._rtmp_audio_queue.put_nowait(frame.tobytes()) except queue.Full: pass # 丢弃旧帧,避免延迟累积 def stop_rtmp_push(self): """停止 RTMP 推流""" if not self._rtmp_pushing: return self._rtmp_pushing = False # 发送停止信号给写入线程 try: self._rtmp_video_queue.put(None, timeout=1) except: pass try: self._rtmp_audio_queue.put(None, timeout=1) except: pass # 等待写入线程结束 for t in [getattr(self, '_rtmp_video_thread', None), getattr(self, '_rtmp_audio_thread', None)]: if t and t.is_alive(): t.join(timeout=3) # 终止 ffmpeg if self._rtmp_pipe is not None: try: self._rtmp_pipe.wait(timeout=5) except: self._rtmp_pipe.kill() self._cleanup_rtmp_fifos() logger.info('RTMP 推流已停止') def _cleanup_rtmp_fifos(self): """清理 FIFO 文件和目录""" for path in [getattr(self, '_rtmp_video_fifo', ''), getattr(self, '_rtmp_audio_fifo', '')]: if path and os.path.exists(path): try: os.unlink(path) except: pass fifo_dir = getattr(self, '_rtmp_fifo_dir', '') if fifo_dir and os.path.isdir(fifo_dir): try: os.rmdir(fifo_dir) except: pass # def record_frame(self): # videostream = self.container.add_stream("libx264", rate=25) # videostream.codec_context.time_base = Fraction(1, 25) # audiostream = self.container.add_stream("aac") # audiostream.codec_context.time_base = Fraction(1, 16000) # init = True # framenum = 0 # while self.recording: # try: # videoframe = self.recordq_video.get(block=True, timeout=1) # videoframe.pts = framenum #int(round(framenum*0.04 / videostream.codec_context.time_base)) # videoframe.dts = videoframe.pts # if init: # videostream.width = videoframe.width # videostream.height = videoframe.height # init = False # for packet in videostream.encode(videoframe): # self.container.mux(packet) # for k in range(2): # audioframe = self.recordq_audio.get(block=True, timeout=1) # audioframe.pts = int(round((framenum*2+k)*0.02 / audiostream.codec_context.time_base)) # audioframe.dts = audioframe.pts # for packet in audiostream.encode(audioframe): # self.container.mux(packet) # framenum += 1 # except queue.Empty: # print('record queue empty,') # continue # except Exception as e: # print(e) # #break # for packet in videostream.encode(None): # self.container.mux(packet) # for packet in audiostream.encode(None): # self.container.mux(packet) # self.container.close() # self.recordq_video.queue.clear() # self.recordq_audio.queue.clear() # print('record thread stop') def stop_recording(self): """停止录制视频""" if not self.recording: return self.recording = False self._record_video_pipe.stdin.close() #wait() self._record_video_pipe.wait() self._record_audio_pipe.stdin.close() self._record_audio_pipe.wait() cmd_combine_audio = f"ffmpeg -y -i temp{self.opt.sessionid}.aac -i temp{self.opt.sessionid}.mp4 -c:v copy -c:a copy data/record.mp4" os.system(cmd_combine_audio) #os.remove(output_path) def mirror_index(self,size, index): #size = len(self.coord_list_cycle) turn = index // size res = index % size if turn % 2 == 0: return res else: return size - res - 1 def get_audio_stream(self,audiotype): idx = self.custom_audio_index[audiotype] stream = self.custom_audio_cycle[audiotype][idx:idx+self.chunk] self.custom_audio_index[audiotype] += self.chunk if self.custom_audio_index[audiotype]>=self.custom_audio_cycle[audiotype].shape[0]: self.curr_state = 1 #当前视频不循环播放,切换到静音状态 return stream def set_custom_state(self,audiotype, reinit=True): logger.debug('set_custom_state: %s', audiotype) if self.custom_audio_index.get(audiotype) is None: return self.curr_state = audiotype if reinit: self.custom_audio_index[audiotype] = 0 self.custom_index[audiotype] = 0 def process_frames(self,quit_event,loop=None,audio_track=None,video_track=None): enable_transition = False # 设置为False禁用过渡效果,True启用 if enable_transition: _last_speaking = False _transition_start = time.time() _transition_duration = 0.1 # 过渡时间 _last_silent_frame = None # 静音帧缓存 _last_speaking_frame = None # 说话帧缓存 if self.opt.transport=='virtualcam': import pyvirtualcam vircam = None audio_tmp = queue.Queue(maxsize=3000) audio_thread = Thread(target=play_audio, args=(quit_event,audio_tmp,), daemon=True, name="pyaudio_stream") audio_thread.start() # RTMP 推流: 延迟启动,等待第一帧确定视频尺寸 _rtmp_started = False _rtmp_attempted = False # 记录是否已经尝试过启动推流 while not quit_event.is_set(): try: res_frame,idx,audio_frames = self.res_frame_queue.get(block=True, timeout=1) except queue.Empty: continue if enable_transition: # 检测状态变化 current_speaking = not (audio_frames[0][1]!=0 and audio_frames[1][1]!=0) if current_speaking != _last_speaking: logger.info(f"状态切换:{'说话' if _last_speaking else '静音'} → {'说话' if current_speaking else '静音'}") _transition_start = time.time() _last_speaking = current_speaking if audio_frames[0][1]!=0 and audio_frames[1][1]!=0: #全为静音数据,只需要取fullimg self.speaking = False audiotype = audio_frames[0][1] if self.custom_index.get(audiotype) is not None: #有自定义视频 mirindex = self.mirror_index(len(self.custom_img_cycle[audiotype]),self.custom_index[audiotype]) target_frame = self.custom_img_cycle[audiotype][mirindex] self.custom_index[audiotype] += 1 else: target_frame = self.frame_list_cycle[idx] if enable_transition: # 说话→静音过渡 if time.time() - _transition_start < _transition_duration and _last_speaking_frame is not None: alpha = min(1.0, (time.time() - _transition_start) / _transition_duration) combine_frame = cv2.addWeighted(_last_speaking_frame, 1-alpha, target_frame, alpha, 0) else: combine_frame = target_frame # 缓存静音帧 _last_silent_frame = combine_frame.copy() else: combine_frame = target_frame else: self.speaking = True try: current_frame = self.paste_back_frame(res_frame,idx) except Exception as e: logger.warning(f"paste_back_frame error: {e}") continue if enable_transition: # 静音→说话过渡 if time.time() - _transition_start < _transition_duration and _last_silent_frame is not None: alpha = min(1.0, (time.time() - _transition_start) / _transition_duration) combine_frame = cv2.addWeighted(_last_silent_frame, 1-alpha, current_frame, alpha, 0) else: combine_frame = current_frame # 缓存说话帧 _last_speaking_frame = combine_frame.copy() else: combine_frame = current_frame #cv2.putText(combine_frame, "LiveTalking", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (128,128,128), 1) if self.opt.transport=='virtualcam': if vircam==None: height, width,_= combine_frame.shape vircam = pyvirtualcam.Camera(width=width, height=height, fps=25, fmt=pyvirtualcam.PixelFormat.BGR,print_fps=True) vircam.send(combine_frame) elif video_track is not None and loop is not None: #webrtc image = combine_frame new_frame = VideoFrame.from_ndarray(image, format="bgr24") asyncio.run_coroutine_threadsafe(video_track._queue.put((new_frame,None)), loop) self.record_video_data(combine_frame) # RTMP 推流: 视频帧 if self._rtmp_push_url: if not _rtmp_started and not _rtmp_attempted: # 首次获得视频帧,启动 RTMP 推流 if self.width > 0 and self.height > 0: self.start_rtmp_push() _rtmp_attempted = True # 标记已尝试 if self._rtmp_pushing: # 启动成功 _rtmp_started = True if _rtmp_started: # 只有启动成功后才推流 self.rtmp_push_video_data(combine_frame) for audio_frame in audio_frames: frame,type,eventpoint = audio_frame frame = (frame * 32767).astype(np.int16) if self.opt.transport=='virtualcam': audio_tmp.put(frame.tobytes()) #TODO elif audio_track is not None and loop is not None: #webrtc new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0]) new_frame.planes[0].update(frame.tobytes()) new_frame.sample_rate=16000 asyncio.run_coroutine_threadsafe(audio_track._queue.put((new_frame,eventpoint)), loop) self.record_audio_data(frame) # RTMP 推流: 音频帧 if _rtmp_started: self.rtmp_push_audio_data(frame) if self.opt.transport=='virtualcam': vircam.sleep_until_next_frame() # 停止 RTMP 推流 self.stop_rtmp_push() if self.opt.transport=='virtualcam': audio_thread.join() vircam.close() logger.info('basereal process_frames thread stop') # def process_custom(self,audiotype:int,idx:int): # if self.curr_state!=audiotype: #从推理切到口播 # if idx in self.switch_pos: #在卡点位置可以切换 # self.curr_state=audiotype # self.custom_index=0 # else: # self.custom_index+=1