###############################################################################
#  Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
#  email: lipku@foxmail.com
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################
# Standard library
import asyncio
import glob
import math
import os
import queue
import subprocess
import time
from fractions import Fraction
from io import BytesIO
from queue import Queue
from threading import Thread, Event
from threading import Lock
from threading import RLock

# Third-party
import av
import cv2
import numpy as np
import resampy
import soundfile as sf
import torch
from av import AudioFrame, VideoFrame
from tqdm import tqdm

# Local
from ttsreal import EdgeTTS,SovitsTTS,XTTS,CosyVoiceTTS,FishTTS,TencentTTS,DoubaoTTS,IndexTTS2,AzureTTS,State
from logger import logger
def read_imgs(img_list):
    """Load every image in *img_list* with OpenCV and return the frames as a list.

    Failed reads are kept as-is (cv2.imread returns None), preserving index
    alignment with any parallel per-frame data the caller holds.
    """
    logger.info('reading images...')
    return [cv2.imread(img_path) for img_path in tqdm(img_list)]
def play_audio(quit_event, queue):
    """Drain 16 kHz mono s16 PCM chunks from *queue* and play them via PyAudio
    until *quit_event* is set.

    Meant to run as a daemon thread (see process_frames). Blocks on queue.get,
    so a chunk must arrive after quit_event is set for the loop to notice it.
    """
    import pyaudio
    p = pyaudio.PyAudio()
    stream = p.open(
        rate=16000,
        channels=1,
        format=pyaudio.paInt16,  # was the magic number 8 (== paInt16)
        output=True,
        # NOTE(review): hard-coded output device index — confirm device 1
        # exists on the deployment host.
        output_device_index=1,
    )
    stream.start_stream()
    try:
        while not quit_event.is_set():
            stream.write(queue.get(block=True))
    finally:
        # Fix: the original leaked the device on exceptions and never called
        # stop_stream()/terminate(), leaving the PyAudio instance open.
        stream.stop_stream()
        stream.close()
        p.terminate()
class BaseReal:
    """Base session object for a live digital human.

    Owns the TTS backend, the playback/interruption queues, the "custom
    state" image+audio cycles, the ffmpeg recording pipes and the frame
    pumping loop (process_frames).

    Subclasses are expected to provide ``self.asr``, ``self.res_frame_queue``,
    ``self.frame_list_cycle`` and ``paste_back_frame()`` — they are referenced
    here but never assigned (TODO confirm against concrete subclasses).
    """

    def __init__(self, opt, model, avatar):
        # --- playback / interruption bookkeeping ---------------------------
        self.msg_queue = queue.Queue()            # original playback sequence
        self.interrupted_queue = queue.Queue()    # content cut off mid-play (e.g. an intro)
        self.user_question_queue = queue.Queue()  # user questions (higher priority)
        self.is_speaking_flag = False
        self.current_text = ""   # text currently being spoken
        self.current_pos = 0     # position reached within current_text
        # BUGFIX: must be re-entrant — put_user_question() holds this lock
        # while calling flush_talk(), which acquires it again; a plain
        # threading.Lock deadlocked on that nested acquisition.
        self.interrupt_lock = RLock()
        self.opt = opt

        self.sample_rate = 16000
        self.chunk = self.sample_rate // opt.fps  # 320 samples per chunk (20ms * 16000 / 1000)
        self.sessionid = self.opt.sessionid

        # Select the TTS backend by name; unknown names fall back to EdgeTTS.
        if opt.tts == "edgetts":
            self.tts = EdgeTTS(opt, self)
        elif opt.tts == "qwen3tts":
            from qwen3tts import Qwen3TTS  # lazy import: optional dependency
            self.tts = Qwen3TTS(opt, self)
        elif opt.tts == "gpt-sovits":
            self.tts = SovitsTTS(opt, self)
        elif opt.tts == "xtts":
            self.tts = XTTS(opt, self)
        elif opt.tts == "cosyvoice":
            self.tts = CosyVoiceTTS(opt, self)
        elif opt.tts == "fishtts":
            self.tts = FishTTS(opt, self)
        elif opt.tts == "tencent":
            self.tts = TencentTTS(opt, self)
        elif opt.tts == "doubao":
            self.tts = DoubaoTTS(opt, self)
        elif opt.tts == "indextts2":
            self.tts = IndexTTS2(opt, self)
        elif opt.tts == "azuretts":
            self.tts = AzureTTS(opt, self)
        else:
            # Default to edgetts for unrecognized values.
            logger.warning(f"未知的 TTS 类型: {opt.tts},使用 edgetts 作为默认")
            self.tts = EdgeTTS(opt, self)

        self.speaking = False
        self.recording = False
        self._record_video_pipe = None
        self._record_audio_pipe = None
        self.width = self.height = 0

        # --- custom (pre-rendered) state cycles ----------------------------
        self.curr_state = 0            # 0 = live inference; >0 = custom audiotype state
        self.custom_img_cycle = {}     # audiotype -> list of frames
        self.custom_audio_cycle = {}   # audiotype -> float32 audio array
        self.custom_audio_index = {}   # audiotype -> read position in audio
        self.custom_index = {}         # audiotype -> frame index
        self.custom_opt = {}           # audiotype -> raw option dict
        self.__loadcustom()

    def put_msg_txt(self, msg, datainfo: dict = None):
        """Queue text for TTS synthesis; *datainfo* carries optional event metadata."""
        # Fresh dict per call — a shared `={}` default could be mutated downstream.
        self.tts.put_msg_txt(msg, {} if datainfo is None else datainfo)

    def put_audio_frame(self, audio_chunk, datainfo: dict = None):  # 16khz 20ms pcm
        """Forward one 20 ms, 16 kHz PCM chunk to the ASR feature extractor.

        ``self.asr`` is expected to be set by the subclass — TODO confirm.
        """
        self.asr.put_audio_frame(audio_chunk, {} if datainfo is None else datainfo)

    def put_audio_file(self, filebyte, datainfo: dict = None):
        """Decode an in-memory audio file and feed it chunk-by-chunk to the ASR.

        A trailing remainder shorter than one chunk is dropped.
        """
        datainfo = {} if datainfo is None else datainfo
        input_stream = BytesIO(filebyte)
        stream = self.__create_bytes_stream(input_stream)
        streamlen = stream.shape[0]
        idx = 0
        while streamlen >= self.chunk:  # and self.state==State.RUNNING
            self.put_audio_frame(stream[idx:idx + self.chunk], datainfo)
            streamlen -= self.chunk
            idx += self.chunk

    def __create_bytes_stream(self, byte_stream):
        """Decode *byte_stream* to mono float32 PCM resampled to self.sample_rate."""
        stream, sample_rate = sf.read(byte_stream)  # [T*sample_rate,] float64
        logger.info(f'[INFO]put audio stream {sample_rate}: {stream.shape}')
        stream = stream.astype(np.float32)

        if stream.ndim > 1:
            # Multi-channel input: keep only the first channel.
            logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
            stream = stream[:, 0]

        if sample_rate != self.sample_rate and stream.shape[0] > 0:
            logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)

        return stream

    def stop_tts(self):
        """Stop the current TTS playback and clear its pending (normal) queue."""
        if hasattr(self.tts, 'state'):
            self.tts.state = State.PAUSE
        # Raise the interrupt flag to force the current playback to break off.
        if hasattr(self.tts, 'set_interrupt_flag'):
            self.tts.set_interrupt_flag()
        # Clear the TTS's normal queue but keep its high-priority queue,
        # which carries user questions.
        if hasattr(self.tts, 'msgqueue'):
            with self.tts.msgqueue.mutex:
                self.tts.msgqueue.queue.clear()
        # Flush any buffered input audio, when the TTS exposes a stream.
        if hasattr(self.tts, 'input_stream'):
            if hasattr(self.tts.input_stream, 'seek') and hasattr(self.tts.input_stream, 'truncate'):
                self.tts.input_stream.seek(0)
                self.tts.input_stream.truncate()
        self.is_speaking_flag = False
        self.current_text = ""
        self.current_pos = 0

    def flush_talk(self):
        """Interrupt the current playback, preserving everything not yet spoken."""
        with self.interrupt_lock:
            # 1. Save the tail of the text currently playing (the key step).
            if self.current_text and self.current_pos < len(self.current_text):
                unfinished = self.current_text[self.current_pos:]
                if unfinished.strip():
                    self.interrupted_queue.put(unfinished)

            # 2. Move everything left in msg_queue to interrupted_queue (no drops).
            while not self.msg_queue.empty():
                self.interrupted_queue.put(self.msg_queue.get())

            # 3. Stop the current TTS playback.
            self.stop_tts()

            # 4. Reset the current playback state.
            self.is_speaking_flag = False
            self.current_text = ""
            self.current_pos = 0

            # 5. Deliberately do NOT reset the interrupt flag — stay
            #    interrupted until new content starts playing.
            # self.tts.flush_talk()
            # self.asr.flush_talk()

    def handle_interruption_during_intro(self, user_question, datainfo: dict = None):
        """Pause an in-progress intro and answer *user_question* first.

        Unfinished intro text is stashed in interrupted_queue for later resume.
        """
        datainfo = {} if datainfo is None else datainfo
        with self.interrupt_lock:
            # 1. Stash the unfinished part of the intro currently playing.
            if self.current_text and self.current_pos < len(self.current_text):
                unfinished_intro = self.current_text[self.current_pos:]
                if unfinished_intro.strip():
                    self.interrupted_queue.put(unfinished_intro)
                    logger.info(f"保存未完成的介绍内容: {unfinished_intro[:50]}...")

            # 2. Stash any intro content still waiting in msg_queue.
            remaining_items = []
            while not self.msg_queue.empty():
                remaining_items.append(self.msg_queue.get())

            if remaining_items:
                for item in remaining_items:
                    self.interrupted_queue.put(item)
                logger.info(f"保存了 {len(remaining_items)} 个剩余的介绍内容到中断队列")

            # 3. Stop TTS (sets its state to PAUSE; re-armed below).
            self.stop_tts()

            # 4. Reset current playback state.
            self.is_speaking_flag = False
            self.current_text = ""
            self.current_pos = 0

            # 5. Put TTS back into RUNNING so the question gets processed.
            if hasattr(self.tts, 'state'):
                self.tts.state = State.RUNNING

            # 6. Clear the interrupt flag so new content may play.
            if hasattr(self.tts, 'reset_interrupt_flag'):
                self.tts.reset_interrupt_flag()

            # 7. Play the user question with high priority.
            self.tts.put_high_priority_msg(user_question, datainfo)

            logger.info("已暂停介绍并开始播放用户问题")

            # 8. Nudge the TTS worker with an empty normal-queue message so it
            #    picks up the high-priority message immediately.
            if hasattr(self.tts, 'msgqueue'):
                self.tts.msgqueue.put(("", {}))

    def put_user_question(self, msg, datainfo: dict = None):
        """Play a user question, which outranks whatever is currently playing."""
        datainfo = {} if datainfo is None else datainfo
        with self.interrupt_lock:
            # Force-interrupt current playback.
            # (Safe only because interrupt_lock is an RLock — flush_talk
            # re-acquires the same lock.)
            self.flush_talk()

            # Play the question via the high-priority queue.
            self.tts.put_high_priority_msg(msg, datainfo)

            # Re-arm TTS so the question gets processed.
            if hasattr(self.tts, 'state'):
                self.tts.state = State.RUNNING

            # Clear the interrupt flag so new content may play.
            if hasattr(self.tts, 'reset_interrupt_flag'):
                self.tts.reset_interrupt_flag()

            # Nudge the TTS worker so the high-priority message is handled now.
            if hasattr(self.tts, 'msgqueue'):
                self.tts.msgqueue.put(("", {}))

    def process_user_questions_and_resume(self):
        """Drain pending user questions, then restore interrupted content."""
        with self.interrupt_lock:
            # Play any queued user questions first.
            while not self.user_question_queue.empty():
                msg, datainfo = self.user_question_queue.get()
                self.tts.put_msg_txt(msg, datainfo)

            # Then move interrupted content back to the main queue.
            if not self.interrupted_queue.empty():
                while not self.interrupted_queue.empty():
                    msg = self.interrupted_queue.get()
                    self.msg_queue.put(msg)

    def resume_interrupted(self):
        """Move interrupted_queue content back to the playback queue.

        Returns True when anything was resumed.
        """
        with self.interrupt_lock:
            resumed = False
            items_to_resume = []
            # Drain the interrupted queue first...
            while not self.interrupted_queue.empty():
                items_to_resume.append(self.interrupted_queue.get())
                resumed = True

            # ...then re-enqueue in original order.
            for item in items_to_resume:
                self.msg_queue.put(item)

            return resumed

    def start_intro_with_interrupt_capability(self, intro_text, datainfo: dict = None):
        """Start playing intro content while staying interruptible by questions."""
        self.put_msg_txt(intro_text, {} if datainfo is None else datainfo)

    def is_speaking(self) -> bool:
        """True while the avatar is rendering speech frames."""
        return self.speaking

    def __loadcustom(self):
        """Pre-load images and audio for each configured custom state.

        Each item in opt.customopt must carry 'imgpath', 'audiopath' and
        'audiotype' keys; images are sorted by their numeric basename.
        """
        for item in self.opt.customopt:
            logger.info(item)
            input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]'))
            input_img_list = sorted(input_img_list, key=lambda x: int(os.path.splitext(os.path.basename(x))[0]))
            self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list)
            self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(item['audiopath'], dtype='float32')
            self.custom_audio_index[item['audiotype']] = 0
            self.custom_index[item['audiotype']] = 0
            self.custom_opt[item['audiotype']] = item

    def init_customindex(self):
        """Reset all custom-state cursors and return to the live-inference state."""
        self.curr_state = 0
        for key in self.custom_audio_index:
            self.custom_audio_index[key] = 0
        for key in self.custom_index:
            self.custom_index[key] = 0

    def notify(self, eventpoint):
        """Handle a playback event; schedules resume of interrupted content
        shortly after a question's answer finishes."""
        logger.info("notify:%s", eventpoint)

        # An 'end' event that looks question-related triggers a delayed resume.
        if isinstance(eventpoint, dict) and eventpoint.get('status') == 'end':
            if 'knowledge_base' in eventpoint or self.interrupted_queue.qsize() > 0:
                import threading
                # Small delay so the current event finishes processing first.
                timer = threading.Timer(0.5, self._try_resume_after_qa)
                timer.start()
        # Intro auto-continuation is NOT driven from notify; it is triggered
        # after TTS completes, so each item fully finishes before the next.

    def _try_resume_after_qa(self):
        """Resume interrupted content once a Q&A answer has completed."""
        with self.interrupt_lock:
            if not self.interrupted_queue.empty():
                logger.info("检测到问答完成,恢复被中断的内容...")
                self.resume_interrupted()

    def _continue_intro_play(self):
        """Play the next intro item, re-scheduling itself until all are done.

        Relies on self.intro_play_state / self.knowledge_intro_instance being
        set elsewhere — TODO confirm who owns them.
        """
        try:
            # Only act while intro playback is active and not paused.
            if not (hasattr(self, 'intro_play_state') and
                    self.intro_play_state.get('is_playing', False) and
                    not self.intro_play_state.get('is_paused', False)):
                logger.info("不处于介绍播放状态,跳过自动续播")
                return

            # Wait for the TTS queue to drain before queuing the next item.
            if hasattr(self, 'tts') and hasattr(self.tts, 'msgqueue'):
                if not self.tts.msgqueue.empty():
                    logger.info("TTS消息队列不为空,等待队列清空后再播放下一条")
                    self.intro_play_state['is_waiting_next'] = True
                    import threading
                    timer = threading.Timer(2.0, self._continue_intro_play)
                    timer.start()
                    return

            # Guard against re-entry while this item is being queued.
            if hasattr(self, 'intro_play_state'):
                self.intro_play_state['is_waiting_next'] = True

            if hasattr(self, 'knowledge_intro_instance') and self.knowledge_intro_instance:
                next_content = self.knowledge_intro_instance._get_next_content()
                if next_content and next_content.get('text'):
                    logger.info(f"自动续播介绍内容 - 序号:{next_content.get('play_index', 1)}/{next_content.get('total_count', 1)}")
                    if hasattr(self, 'intro_play_state'):
                        self.intro_play_state["last_played_index"] = next_content.get("play_index", 1)

                    # Estimate playback time (~3 chars/sec) plus a 3 s pause
                    # to schedule the next continuation.
                    text = next_content.get('text', '')
                    estimated_play_time = len(text) / 3
                    pause_time = estimated_play_time + 3

                    logger.info(f"当前文案长度:{len(text)}字,估算播放时间:{estimated_play_time:.1f}秒,播放下一条需等待:{pause_time:.1f}秒")

                    self.put_msg_txt(next_content['text'])

                    import threading
                    timer = threading.Timer(pause_time, self._continue_intro_play)
                    timer.start()
                else:
                    # No more items — mark intro playback finished.
                    logger.info("介绍内容全部播放完成")
                    if hasattr(self, 'intro_play_state'):
                        self.intro_play_state["is_playing"] = False
                        self.intro_play_state['is_waiting_next'] = False
        except Exception as e:
            logger.error(f"自动续播介绍内容时出错: {e}")
            # Always clear the waiting flag on error so playback can recover.
            if hasattr(self, 'intro_play_state'):
                self.intro_play_state['is_waiting_next'] = False

    def _reset_waiting_flag(self):
        """Clear the waiting flag so the next intro item may play."""
        if hasattr(self, 'intro_play_state'):
            self.intro_play_state['is_waiting_next'] = False
            logger.info("等待时间结束,可以播放下一条")

    def start_recording(self):
        """Start recording: spawn two ffmpeg pipes, one for raw BGR video
        and one for 16 kHz mono s16 audio."""
        if self.recording:
            return
        command = ['ffmpeg',
                   '-y', '-an',
                   '-f', 'rawvideo',
                   '-vcodec', 'rawvideo',
                   '-pix_fmt', 'bgr24',  # pixel format
                   '-s', "{}x{}".format(self.width, self.height),
                   '-r', str(25),
                   '-i', '-',
                   '-pix_fmt', 'yuv420p',
                   '-vcodec', "h264",
                   #'-f' , 'flv',
                   f'temp{self.opt.sessionid}.mp4']
        self._record_video_pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE)

        acommand = ['ffmpeg',
                    '-y', '-vn',
                    '-f', 's16le',
                    #'-acodec','pcm_s16le',
                    '-ac', '1',
                    '-ar', '16000',
                    '-i', '-',
                    '-acodec', 'aac',
                    #'-f' , 'wav',
                    f'temp{self.opt.sessionid}.aac']
        self._record_audio_pipe = subprocess.Popen(acommand, shell=False, stdin=subprocess.PIPE)
        self.recording = True

    def record_video_data(self, image):
        """Push one BGR frame to the video recorder; learns frame size lazily."""
        if self.width == 0:
            print("image.shape:", image.shape)
            self.height, self.width, _ = image.shape
        if self.recording:
            # tobytes(): .tostring() was removed from NumPy; exact equivalent.
            self._record_video_pipe.stdin.write(image.tobytes())

    def record_audio_data(self, frame):
        """Push one int16 PCM chunk to the audio recorder."""
        if self.recording:
            self._record_audio_pipe.stdin.write(frame.tobytes())

    def stop_recording(self):
        """Stop recording and mux the temp audio+video into data/record.mp4."""
        if not self.recording:
            return
        self.recording = False
        self._record_video_pipe.stdin.close()
        self._record_video_pipe.wait()
        self._record_audio_pipe.stdin.close()
        self._record_audio_pipe.wait()
        cmd_combine_audio = f"ffmpeg -y -i temp{self.opt.sessionid}.aac -i temp{self.opt.sessionid}.mp4 -c:v copy -c:a copy data/record.mp4"
        os.system(cmd_combine_audio)
        #os.remove(output_path)

    def mirror_index(self, size, index):
        """Map a monotonically growing *index* onto a ping-pong sweep of
        range(size): 0,1,...,size-1,size-1,...,1,0,0,1,... pattern."""
        turn = index // size
        res = index % size
        if turn % 2 == 0:
            return res
        else:
            return size - res - 1

    def get_audio_stream(self, audiotype):
        """Return the next audio chunk for a custom state and advance its cursor.

        The final chunk may be shorter than self.chunk; once the clip is
        exhausted the session drops into the silent state (curr_state = 1)
        because custom clips do not loop.
        """
        idx = self.custom_audio_index[audiotype]
        stream = self.custom_audio_cycle[audiotype][idx:idx + self.chunk]
        self.custom_audio_index[audiotype] += self.chunk
        if self.custom_audio_index[audiotype] >= self.custom_audio_cycle[audiotype].shape[0]:
            self.curr_state = 1  # clip does not loop: switch to the silent state
        return stream

    def set_custom_state(self, audiotype, reinit=True):
        """Switch to a custom state; optionally rewind its audio/frame cursors.

        Unknown audiotypes are ignored.
        """
        print('set_custom_state:', audiotype)
        if self.custom_audio_index.get(audiotype) is None:
            return
        self.curr_state = audiotype
        if reinit:
            self.custom_audio_index[audiotype] = 0
            self.custom_index[audiotype] = 0

    def process_frames(self, quit_event, loop=None, audio_track=None, video_track=None):
        """Main pump: consume rendered frames from res_frame_queue and deliver
        video+audio either to a virtual camera or to WebRTC tracks, feeding the
        recorder along the way.  Runs until *quit_event* is set.

        res_frame_queue / frame_list_cycle / paste_back_frame come from the
        subclass — TODO confirm.
        """
        enable_transition = False  # False disables the speak/silence crossfade

        if enable_transition:
            _last_speaking = False
            _transition_start = time.time()
            _transition_duration = 0.1  # crossfade duration (seconds)
            _last_silent_frame = None   # cached last silent frame
            _last_speaking_frame = None # cached last speaking frame

        if self.opt.transport == 'virtualcam':
            import pyvirtualcam
            vircam = None

            audio_tmp = queue.Queue(maxsize=3000)
            audio_thread = Thread(target=play_audio, args=(quit_event, audio_tmp,), daemon=True, name="pyaudio_stream")
            audio_thread.start()

        while not quit_event.is_set():
            try:
                res_frame, idx, audio_frames = self.res_frame_queue.get(block=True, timeout=1)
            except queue.Empty:
                continue

            if enable_transition:
                # Detect speak/silence state changes to restart the crossfade.
                current_speaking = not (audio_frames[0][1] != 0 and audio_frames[1][1] != 0)
                if current_speaking != _last_speaking:
                    logger.info(f"状态切换:{'说话' if _last_speaking else '静音'} → {'说话' if current_speaking else '静音'}")
                    _transition_start = time.time()
                _last_speaking = current_speaking

            if audio_frames[0][1] != 0 and audio_frames[1][1] != 0:
                # Both sub-frames are silence: just show a full (idle) image.
                self.speaking = False
                audiotype = audio_frames[0][1]
                if self.custom_index.get(audiotype) is not None:
                    # A custom clip exists for this state; ping-pong through it.
                    mirindex = self.mirror_index(len(self.custom_img_cycle[audiotype]), self.custom_index[audiotype])
                    target_frame = self.custom_img_cycle[audiotype][mirindex]
                    self.custom_index[audiotype] += 1
                else:
                    target_frame = self.frame_list_cycle[idx]

                if enable_transition:
                    # speaking → silent crossfade
                    if time.time() - _transition_start < _transition_duration and _last_speaking_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_speaking_frame, 1 - alpha, target_frame, alpha, 0)
                    else:
                        combine_frame = target_frame
                    _last_silent_frame = combine_frame.copy()
                else:
                    combine_frame = target_frame
            else:
                self.speaking = True
                try:
                    current_frame = self.paste_back_frame(res_frame, idx)
                except Exception as e:
                    logger.warning(f"paste_back_frame error: {e}")
                    continue
                if enable_transition:
                    # silent → speaking crossfade
                    if time.time() - _transition_start < _transition_duration and _last_silent_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_silent_frame, 1 - alpha, current_frame, alpha, 0)
                    else:
                        combine_frame = current_frame
                    _last_speaking_frame = combine_frame.copy()
                else:
                    combine_frame = current_frame

            cv2.putText(combine_frame, "LiveTalking", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (128, 128, 128), 1)

            if self.opt.transport == 'virtualcam':
                if vircam is None:
                    # Lazily open the camera once the frame size is known.
                    height, width, _ = combine_frame.shape
                    vircam = pyvirtualcam.Camera(width=width, height=height, fps=25, fmt=pyvirtualcam.PixelFormat.BGR, print_fps=True)
                vircam.send(combine_frame)
            else:  # webrtc
                image = combine_frame
                new_frame = VideoFrame.from_ndarray(image, format="bgr24")
                asyncio.run_coroutine_threadsafe(video_track._queue.put((new_frame, None)), loop)
            self.record_video_data(combine_frame)

            for audio_frame in audio_frames:
                frame, frame_type, eventpoint = audio_frame  # renamed: `type` shadowed the builtin
                frame = (frame * 32767).astype(np.int16)

                if self.opt.transport == 'virtualcam':
                    audio_tmp.put(frame.tobytes())  # TODO
                else:  # webrtc
                    new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0])
                    new_frame.planes[0].update(frame.tobytes())
                    new_frame.sample_rate = 16000
                    asyncio.run_coroutine_threadsafe(audio_track._queue.put((new_frame, eventpoint)), loop)
                self.record_audio_data(frame)

            if self.opt.transport == 'virtualcam':
                vircam.sleep_until_next_frame()

        if self.opt.transport == 'virtualcam':
            audio_thread.join()
            vircam.close()
        logger.info('basereal process_frames thread stop')