zwy 14 ore fa
commit
8e40013afe
100 file modificati con 3196 aggiunte e 0 eliminazioni
  1. 8 0
      .idea/.gitignore
  2. 8 0
      .idea/Digital_Human.iml
  3. 6 0
      .idea/inspectionProfiles/Project_Default.xml
  4. 6 0
      .idea/inspectionProfiles/profiles_settings.xml
  5. 7 0
      .idea/misc.xml
  6. 8 0
      .idea/modules.xml
  7. 7 0
      .idea/vcs.xml
  8. 92 0
      Dockerfile
  9. BIN
      Image_Analysis/wav/wav/hifi_clone_20260511_135734.wav
  10. BIN
      Image_Analysis/wav/wav/hifi_clone_20260511_140012.wav
  11. BIN
      Image_Analysis/wav/wav/hifi_clone_20260511_141931.wav
  12. BIN
      Image_Analysis/wav/wav/hifi_clone_20260512_093222.wav
  13. BIN
      Image_Analysis/wav/wav/hifi_clone_20260512_132911.wav
  14. 13 0
      Image_Analysis/wav_cleaner.log
  15. 151 0
      Image_Analysis/wav_cleaner.py
  16. 14 0
      Image_Analysis/wav_cleaner_nohup.log
  17. 201 0
      LICENSE
  18. 1075 0
      app.py
  19. BIN
      assets/LiveTalking-logo.jpg
  20. BIN
      assets/dataflow.png
  21. BIN
      assets/demo.mp4
  22. 65 0
      assets/faq.md
  23. BIN
      assets/main.png
  24. BIN
      assets/qrcode-wechat.jpg
  25. 243 0
      audioplayer.py
  26. BIN
      back/rvm_resnet50.pth
  27. BIN
      back/video/coco_greenscreen.mp4
  28. 239 0
      back/video_to_greenscreen.py
  29. 88 0
      baseasr.py
  30. 965 0
      basereal.py
  31. BIN
      coco.mp4
  32. 0 0
      data/avatars/.gitkeep
  33. BIN
      data/avatars/wav2lip256_avatar1/coords.pkl
  34. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000000.png
  35. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000001.png
  36. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000002.png
  37. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000003.png
  38. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000004.png
  39. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000005.png
  40. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000006.png
  41. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000007.png
  42. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000008.png
  43. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000009.png
  44. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000010.png
  45. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000011.png
  46. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000012.png
  47. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000013.png
  48. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000014.png
  49. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000015.png
  50. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000016.png
  51. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000017.png
  52. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000018.png
  53. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000019.png
  54. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000020.png
  55. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000021.png
  56. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000022.png
  57. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000023.png
  58. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000024.png
  59. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000025.png
  60. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000026.png
  61. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000027.png
  62. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000028.png
  63. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000029.png
  64. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000030.png
  65. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000031.png
  66. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000032.png
  67. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000033.png
  68. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000034.png
  69. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000035.png
  70. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000036.png
  71. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000037.png
  72. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000038.png
  73. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000039.png
  74. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000040.png
  75. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000041.png
  76. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000042.png
  77. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000043.png
  78. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000044.png
  79. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000045.png
  80. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000046.png
  81. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000047.png
  82. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000048.png
  83. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000049.png
  84. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000050.png
  85. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000051.png
  86. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000052.png
  87. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000053.png
  88. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000054.png
  89. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000055.png
  90. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000056.png
  91. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000057.png
  92. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000058.png
  93. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000059.png
  94. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000060.png
  95. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000061.png
  96. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000062.png
  97. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000063.png
  98. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000064.png
  99. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000065.png
  100. BIN
      data/avatars/wav2lip256_avatar1/face_imgs/00000066.png

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 8 - 0
.idea/Digital_Human.iml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 6 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
+  </profile>
+</component>

+ 6 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="USE_PROJECT_PROFILE" value="false" />
+    <version value="1.0" />
+  </settings>
+</component>

+ 7 - 0
.idea/misc.xml

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Black">
+    <option name="sdkName" value="Python 3.9" />
+  </component>
+  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/Digital_Human.iml" filepath="$PROJECT_DIR$/.idea/Digital_Human.iml" />
+    </modules>
+  </component>
+</project>

+ 7 - 0
.idea/vcs.xml

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="" vcs="Git" />
+    <mapping directory="$PROJECT_DIR$/back/rvm_temp" vcs="Git" />
+  </component>
+</project>

+ 93 - 0
Dockerfile

@@ -0,0 +1,93 @@
+# Multi-stage build to keep the final image small
+# Stage 1: build stage
+FROM nvcr.io/nvidia/cuda:12.4.1-cudnn8-devel-ubuntu22.04 AS builder
+
+# Environment settings
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+ENV DEBIAN_FRONTEND=noninteractive
+
+# System build dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    wget \
+    curl \
+    git \
+    vim \
+    ffmpeg \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libglib2.0-0 \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install Miniconda
+RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh \
+    && bash miniconda.sh -b -p /opt/conda \
+    && rm miniconda.sh
+
+# Put conda on PATH
+ENV PATH=/opt/conda/bin:$PATH
+
+# Create the project conda environment
+RUN conda create -n livetalking python=3.10 -y
+
+# "Activate" the conda environment for subsequent layers
+ENV PATH=/opt/conda/envs/livetalking/bin:$PATH
+
+# Configure a pip mirror
+RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
+
+# Install PyTorch and related packages (CUDA 12.4 wheels)
+RUN pip install torch==2.5.0 torchvision==0.20.0 torchaudio==2.5.0 --index-url https://download.pytorch.org/whl/cu124
+
+# Copy the requirements file
+COPY requirements.txt .
+
+# Install remaining Python dependencies
+RUN pip install -r requirements.txt
+
+# Stage 2: runtime stage
+FROM nvcr.io/nvidia/cuda:12.4.1-runtime-ubuntu22.04
+
+# Environment settings
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+ENV DEBIAN_FRONTEND=noninteractive
+
+# Runtime dependencies (curl is required by the HEALTHCHECK below)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    ffmpeg \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libglib2.0-0 \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy the conda environment from the builder stage
+COPY --from=builder /opt/conda /opt/conda
+
+# Put the project environment on PATH
+ENV PATH=/opt/conda/envs/livetalking/bin:$PATH
+
+# Working directory
+WORKDIR /app
+
+# Copy the project files
+COPY . /app/
+
+# Create the models directory
+RUN mkdir -p /app/models
+
+# Set permissions
+RUN chmod +x /app/app.py
+
+# Exposed ports
+EXPOSE 7860 8080
+
+# Health check (depends on curl installed in the runtime stage above)
+HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+    CMD curl -f http://localhost:7860/health || exit 1
+
+# Startup command
+CMD ["python", "app.py"]

BIN
Image_Analysis/wav/wav/hifi_clone_20260511_135734.wav


BIN
Image_Analysis/wav/wav/hifi_clone_20260511_140012.wav


BIN
Image_Analysis/wav/wav/hifi_clone_20260511_141931.wav


BIN
Image_Analysis/wav/wav/hifi_clone_20260512_093222.wav


BIN
Image_Analysis/wav/wav/hifi_clone_20260512_132911.wav


+ 13 - 0
Image_Analysis/wav_cleaner.log

@@ -0,0 +1,13 @@
+2026-05-09 09:25:06,083 - INFO - 🎯 WAV 文件夹监控器已初始化
+2026-05-09 09:25:06,083 - INFO -    监听目录: /mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav
+2026-05-09 09:25:06,083 - INFO -    最大文件数: 20
+2026-05-09 09:25:06,083 - INFO -    检查间隔: 3600秒
+2026-05-09 09:25:06,083 - INFO - 🚀 开始监控 WAV 文件夹...
+2026-05-09 09:25:06,083 - INFO - 按 Ctrl+C 停止监控
+2026-05-09 09:25:06,084 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (969.1分钟前)
+2026-05-09 10:25:06,184 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1029.1分钟前)
+2026-05-09 11:25:06,285 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1089.1分钟前)
+2026-05-09 12:25:06,293 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1149.1分钟前)
+2026-05-09 13:25:06,297 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1209.1分钟前)
+2026-05-09 14:25:06,398 - INFO - 📊 当前状态: 2 个文件 | 最新: hifi_clone_20260509_140656.wav (18.2分钟前)
+2026-05-09 15:25:06,496 - INFO - 📊 当前状态: 2 个文件 | 最新: hifi_clone_20260509_140656.wav (78.2分钟前)

+ 151 - 0
Image_Analysis/wav_cleaner.py

@@ -0,0 +1,151 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Automatic WAV folder cleanup monitor.
+Watches the /mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav directory
+and deletes the oldest files once more than 20 WAV files accumulate.
+"""
+
+import os
+import time
+import logging
+from pathlib import Path
+from datetime import datetime
+
+# Logging configuration: write to both a log file and stdout
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('/mnt/nvme1data/Digital_Human/Image_Analysis/wav_cleaner.log'),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+
+class WavFolderMonitor:
+    """Monitors a WAV folder and caps how many files it may contain."""
+    
+    def __init__(self, watch_dir, max_files=20, check_interval=5):
+        """
+        Initialize the monitor.
+        
+        Args:
+            watch_dir: path of the directory to watch
+            max_files: maximum number of files to keep (default 20)
+            check_interval: seconds between checks (default 5)
+        """
+        self.watch_dir = Path(watch_dir)
+        self.max_files = max_files
+        self.check_interval = check_interval
+        
+        # Make sure the watched directory exists
+        self.watch_dir.mkdir(parents=True, exist_ok=True)
+        
+        logger.info(f"🎯 WAV 文件夹监控器已初始化")
+        logger.info(f"   监听目录: {self.watch_dir}")
+        logger.info(f"   最大文件数: {self.max_files}")
+        logger.info(f"   检查间隔: {self.check_interval}秒")
+    
+    def get_wav_files(self):
+        """Return all WAV files in the watched directory, sorted oldest first."""
+        wav_files = list(self.watch_dir.glob('*.wav'))
+        # Sort by modification time (newest file ends up last)
+        wav_files.sort(key=lambda p: p.stat().st_mtime)
+        return wav_files
+    
+    def cleanup_old_files(self):
+        """Delete the oldest files when the count exceeds max_files; returns how many were targeted."""
+        try:
+            wav_files = self.get_wav_files()
+            file_count = len(wav_files)
+            
+            if file_count > self.max_files:
+                # Number of files that must be removed to get back under the cap
+                delete_count = file_count - self.max_files
+                
+                logger.warning(f"⚠️ 文件数量 ({file_count}) 超过限制 ({self.max_files})")
+                logger.warning(f"   准备删除最旧的 {delete_count} 个文件...")
+                
+                # Delete the oldest files (the list is sorted oldest first)
+                for i in range(delete_count):
+                    old_file = wav_files[i]
+                    try:
+                        file_size = old_file.stat().st_size
+                        file_age = time.time() - old_file.stat().st_mtime
+                        
+                        logger.info(f"   🗑️ 删除: {old_file.name}")
+                        logger.info(f"      大小: {file_size / 1024:.2f} KB")
+                        logger.info(f"      年龄: {file_age / 60:.1f} 分钟")
+                        
+                        old_file.unlink()
+                        
+                    except Exception as e:
+                        logger.error(f"   ❌ 删除失败 {old_file.name}: {e}")
+                
+                # NOTE(review): reports max_files remaining even if some unlinks failed above
+                logger.info(f"✅ 清理完成,剩余文件: {self.max_files}")
+                return delete_count
+            else:
+                logger.debug(f"✓ 文件数量正常: {file_count}/{self.max_files}")
+                return 0
+                
+        except Exception as e:
+            logger.error(f"❌ 清理文件时出错: {e}")
+            return 0
+    
+    def monitor(self):
+        """Run the blocking monitoring loop until interrupted (Ctrl+C)."""
+        logger.info("🚀 开始监控 WAV 文件夹...")
+        logger.info("按 Ctrl+C 停止监控")
+        
+        check_count = 0
+        
+        try:
+            while True:
+                check_count += 1
+                logger.debug(f"--- 第 {check_count} 次检查 ---")
+                
+                # Run one cleanup pass
+                deleted = self.cleanup_old_files()
+                
+                # Log the current folder state after each pass
+                current_files = self.get_wav_files()
+                if current_files:
+                    newest = current_files[-1]
+                    newest_age = (time.time() - newest.stat().st_mtime) / 60
+                    logger.info(f"📊 当前状态: {len(current_files)} 个文件 | 最新: {newest.name} ({newest_age:.1f}分钟前)")
+                else:
+                    logger.info("📊 当前状态: 0 个文件")
+                
+                # Sleep until the next check
+                time.sleep(self.check_interval)
+                
+        except KeyboardInterrupt:
+            logger.info("\n🛑 收到停止信号,监控已停止")
+        except Exception as e:
+            logger.error(f"❌ 监控过程中出错: {e}")
+            raise
+
+
+def main():
+    """Entry point: configure and start the WAV folder monitor."""
+    # Configuration
+    watch_dir = '/mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav'
+    max_files = 20
+    check_interval = 3600  # check once per hour
+    
+    # Create the monitor
+    monitor = WavFolderMonitor(
+        watch_dir=watch_dir,
+        max_files=max_files,
+        check_interval=check_interval
+    )
+    
+    # Start the blocking monitoring loop
+    monitor.monitor()
+
+
+if __name__ == "__main__":
+    main()
+

+ 14 - 0
Image_Analysis/wav_cleaner_nohup.log

@@ -0,0 +1,14 @@
+nohup: ignoring input
+2026-05-09 09:25:06,083 - INFO - 🎯 WAV 文件夹监控器已初始化
+2026-05-09 09:25:06,083 - INFO -    监听目录: /mnt/nvme1data/Digital_Human/Image_Analysis/wav/wav
+2026-05-09 09:25:06,083 - INFO -    最大文件数: 20
+2026-05-09 09:25:06,083 - INFO -    检查间隔: 3600秒
+2026-05-09 09:25:06,083 - INFO - 🚀 开始监控 WAV 文件夹...
+2026-05-09 09:25:06,083 - INFO - 按 Ctrl+C 停止监控
+2026-05-09 09:25:06,084 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (969.1分钟前)
+2026-05-09 10:25:06,184 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1029.1分钟前)
+2026-05-09 11:25:06,285 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1089.1分钟前)
+2026-05-09 12:25:06,293 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1149.1分钟前)
+2026-05-09 13:25:06,297 - INFO - 📊 当前状态: 5 个文件 | 最新: hifi_clone_20260508_171600.wav (1209.1分钟前)
+2026-05-09 14:25:06,398 - INFO - 📊 当前状态: 2 个文件 | 最新: hifi_clone_20260509_140656.wav (18.2分钟前)
+2026-05-09 15:25:06,496 - INFO - 📊 当前状态: 2 个文件 | 最新: hifi_clone_20260509_140656.wav (78.2分钟前)

+ 201 - 0
LICENSE

@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [livetalking@lipku]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 1075 - 0
app.py

@@ -0,0 +1,1075 @@
+# server.py
+import flask
+from flask import Flask, render_template,send_from_directory,request, jsonify
+from flask_sockets import Sockets
+import base64
+import json
+#import gevent
+#from gevent import pywsgi
+#from geventwebsocket.handler import WebSocketHandler
+import re
+import numpy as np
+from threading import Thread,Event
+#import multiprocessing
+
+# 禁用 TorchDynamo 编译(避免 VoxCPM2 兼容性问题)
+import os
+os.environ['TORCHDYNAMO_DISABLE'] = '1'
+import torch
+if hasattr(torch, '_dynamo'):
+    torch._dynamo.config.suppress_errors = True
+import torch.multiprocessing as mp
+
+from aiohttp import web
+import aiohttp
+import aiohttp_cors
+from aiortc import RTCPeerConnection, RTCSessionDescription,RTCIceServer,RTCConfiguration
+from aiortc.rtcrtpsender import RTCRtpSender
+from webrtc import HumanPlayer
+from basereal import BaseReal
+from llm import llm_response
+
+import argparse
+import random
+import shutil
+import asyncio
+import torch
+import os
+import socket
+import signal
+import sys
+from typing import Dict
+from logger import logger
+import gc
+
+
app = Flask(__name__)
#sockets = Sockets(app)
# Live avatar sessions keyed by session id. A value of None marks a session
# whose backend is still being built.
nerfreals:Dict[int, BaseReal] = {} #sessionid:BaseReal
opt = None     # parsed CLI options — presumably assigned at startup; set before build_nerfreal runs
model = None   # shared model weights — presumably loaded at startup (TODO confirm in __main__)
avatar = None  # shared avatar assets — presumably loaded at startup (TODO confirm in __main__)


#####webrtc###############################
pcs = set()  # open RTCPeerConnection objects, closed on shutdown
# Avatar directories touched during this run, removed at exit when
# _enable_avatar_cleanup is True (see cleanup_avatar_directories).
_avatar_dirs_to_clean = set()
_enable_avatar_cleanup = False
+
def randN(N: int) -> int:
    """Generate a random integer with exactly N digits (no leading zero).

    Args:
        N: number of decimal digits; must be >= 1.

    Returns:
        A uniformly random int in [10**(N-1), 10**N - 1].
    """
    # Fix: the original used `min`/`max` as local names, shadowing the builtins.
    lower = pow(10, N - 1)
    upper = pow(10, N)
    return random.randint(lower, upper - 1)
+
def build_nerfreal(sessionid: int) -> BaseReal:
    """Instantiate the digital-human backend selected by ``opt.model``.

    Side effects: writes ``sessionid`` onto the shared ``opt`` object and
    records the avatar directory for exit-time cleanup.
    """
    opt.sessionid = sessionid

    # Remember which avatar directory this run used so shutdown cleanup knows
    # what to delete (only when the directory actually exists).
    avatar_id = getattr(opt, 'avatar_id', None)
    if avatar_id:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        avatar_dir = os.path.join(base_dir, 'data', 'avatars', avatar_id)
        if os.path.exists(avatar_dir):
            _avatar_dirs_to_clean.add(avatar_dir)
            logger.info(f'记录 avatar 目录用于退出时清理: {avatar_dir}')

    # Imports stay inside the branches so only the selected backend's heavy
    # dependencies are loaded. (The 'ernerf' backend is currently disabled.)
    selected = opt.model
    if selected == 'wav2lip':
        from lipreal import LipReal as real_cls
    elif selected == 'musetalk':
        from musereal import MuseReal as real_cls
    elif selected == 'ultralight':
        from lightreal import LightReal as real_cls
    else:
        raise ValueError(f"Unsupported model type: {selected}")
    return real_cls(opt, model, avatar)
+
#@app.route('/offer', methods=['POST'])
async def offer(request):
    """Handle a WebRTC offer: allocate a session, build the avatar pipeline,
    negotiate codecs, and return the SDP answer.

    Request body: {"sdp": str, "type": str}.
    Success: JSON {"sdp", "type", "sessionid"}; failure: {"code": -1, "msg"}
    with HTTP 500.
    """
    sessionid = None
    try:
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

        sessionid = randN(6)
        nerfreals[sessionid] = None  # placeholder while the backend builds
        logger.info('sessionid=%d, session num=%d', sessionid, len(nerfreals))
        # Backend construction is heavy; keep it off the event loop.
        nerfreal = await asyncio.get_event_loop().run_in_executor(None, build_nerfreal, sessionid)
        nerfreals[sessionid] = nerfreal

        # Intranet / jump-host deployments cannot reach public STUN servers.
        # WEBRTC_NAT_IP names the IP the browser can reach; when set we use
        # host candidates only. Fix: read the env var once and reuse it for
        # the SDP rewrite below (the original read it twice).
        nat_public_ip = os.environ.get('WEBRTC_NAT_IP', '')
        if nat_public_ip:
            logger.info('Using NAT public IP for ICE: %s', nat_public_ip)
            ice_servers = []  # no STUN on an intranet
        else:
            # A spread of Google STUN servers for public deployments.
            ice_servers = [
                RTCIceServer(urls='stun:stun.l.google.com:19302'),
                RTCIceServer(urls='stun:stun1.l.google.com:19302'),
                RTCIceServer(urls='stun:stun2.l.google.com:19302'),
                RTCIceServer(urls='stun:stun3.l.google.com:19302'),
                RTCIceServer(urls='stun:stun4.l.google.com:19302'),
            ]
        pc = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers))
        pcs.add(pc)

        @pc.on("connectionstatechange")
        async def on_connectionstatechange():
            logger.info(f"Session {sessionid} - Connection state is {pc.connectionState}")
            if pc.connectionState == "connected":
                logger.info(f"Session {sessionid} - WebRTC connection established successfully!")
            if pc.connectionState == "failed":
                logger.error(f"Session {sessionid} - Connection failed!")
                await pc.close()
                pcs.discard(pc)
                if sessionid in nerfreals:
                    del nerfreals[sessionid]
            if pc.connectionState == "closed":
                logger.info(f"Session {sessionid} - Connection closed")
                pcs.discard(pc)
                if sessionid in nerfreals:
                    del nerfreals[sessionid]

        player = HumanPlayer(nerfreals[sessionid])
        audio_sender = pc.addTrack(player.audio)
        video_sender = pc.addTrack(player.video)

        logger.info(f"Added tracks for session {sessionid}: audio={player.audio}, video={player.video}")

        # Prefer H264, then VP8, plus rtx for retransmission.
        capabilities = RTCRtpSender.getCapabilities("video")
        preferences = list(filter(lambda x: x.name == "H264", capabilities.codecs))
        preferences += list(filter(lambda x: x.name == "VP8", capabilities.codecs))
        preferences += list(filter(lambda x: x.name == "rtx", capabilities.codecs))
        # NOTE(review): assumes the video transceiver is at index 1 because the
        # audio track was added first — confirm if the addTrack order changes.
        transceiver = pc.getTransceivers()[1]
        transceiver.setCodecPreferences(preferences)

        await pc.setRemoteDescription(offer)

        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)

        sdp = pc.localDescription.sdp

        # Jump-host scenario: aioice already wrote the real bound ports into
        # the a=candidate lines; only the intranet IPs need replacing with the
        # externally reachable one.
        if nat_public_ip:
            try:
                # Collect every non-loopback IPv4 address this host may have
                # advertised in its candidates.
                local_ips = set()
                bind_ip = os.environ.get('AIOICE_BIND_IP', '')
                if bind_ip:
                    local_ips.add(bind_ip)
                else:
                    hostname = socket.gethostname()
                    for ip_info in socket.getaddrinfo(hostname, None, socket.AF_INET):
                        ip = ip_info[4][0]
                        if not ip.startswith('127.'):
                            local_ips.add(ip)
                    # getaddrinfo can miss interfaces; ifaddr (optional) fills the gap.
                    try:
                        import ifaddr
                        for adapter in ifaddr.get_adapters():
                            for ip_obj in adapter.ips:
                                if isinstance(ip_obj.ip, str) and not ip_obj.ip.startswith('127.'):
                                    local_ips.add(ip_obj.ip)
                    except Exception:
                        pass

                for local_ip in local_ips:
                    sdp = sdp.replace(local_ip, nat_public_ip)
                logger.info('NAT mapping: %s -> %s in SDP', local_ips, nat_public_ip)
            except Exception as e:
                logger.warning('Failed to apply NAT IP mapping to SDP: %s', e)
        else:
            logger.info('WEBRTC_NAT_IP not set. SDP uses server internal IPs. '
                        'Set WEBRTC_NAT_IP=<jumphost_ip> if browser cannot reach server directly.')

        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"sdp": sdp, "type": pc.localDescription.type, "sessionid": sessionid}
            ),
        )
    except Exception as e:
        logger.exception('Error in offer:')
        # Fix: remove the placeholder entry so a failed negotiation does not
        # leak a dead session into the session table.
        if sessionid is not None and sessionid in nerfreals:
            del nerfreals[sessionid]
        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": -1, "msg": str(e)}),
            status=500
        )
+
async def log(request):
    """Receive a log line from the frontend and forward it to the server logger.

    Body: {"type": "info"|"warning"|"error", "message": str}. Always responds
    {"code": 0}; malformed bodies are ignored (best-effort logging endpoint).
    """
    try:
        params = await request.json()
        log_type = params.get('type', 'info')
        message = params.get('message', '')
        # Fix: `log_type` was read but never used — route to the matching
        # logger level instead of always logging at INFO.
        log_fn = {'error': logger.error, 'warning': logger.warning}.get(log_type, logger.info)
        log_fn('[WEBRTC] %s', message)
    except Exception:
        # Deliberate best-effort: a broken log payload must never fail the client.
        pass
    return web.Response(
        content_type="application/json",
        text=json.dumps({"code": 0}),
    )
+
async def human(request):
    """Accept text for the avatar: 'echo' plays it verbatim, 'chat' routes it
    through the LLM; 'interrupt' flushes current playback first.

    Body: {"sessionid", "type": "echo"|"chat", "text",
           "interrupt"?, "knowledge_base"?, "during_intro"?}.
    """
    try:
        params = await request.json()

        sessionid = params.get('sessionid', 0)
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        # Fix: the original indexed params['type'] and raised a bare KeyError
        # when the field was missing; validate it up front instead.
        msg_type = params.get('type')
        if msg_type not in ('echo', 'chat'):
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Unsupported or missing type: {msg_type}"}
                ),
            )

        # Tracks whether the interrupt branch already dispatched the chat text,
        # so the normal chat branch does not invoke the LLM twice.
        chat_processed = False

        if params.get('interrupt'):
            # Stop current playback immediately, chat request or not.
            nerfreals[sessionid].flush_talk()
            if msg_type == 'chat':
                knowledge_base_type = params.get('knowledge_base', None)
                during_intro = params.get('during_intro', False)
                # Run the LLM in a worker thread to keep the event loop free.
                asyncio.get_event_loop().run_in_executor(
                    None,
                    llm_response,
                    params['text'],
                    nerfreals[sessionid],
                    knowledge_base_type,
                    during_intro
                )
                chat_processed = True

        if msg_type == 'echo':
            # Plain playback: speak the text verbatim.
            nerfreals[sessionid].put_msg_txt(params['text'])
        elif msg_type == 'chat' and not chat_processed:
            knowledge_base_type = params.get('knowledge_base', None)
            during_intro = params.get('during_intro', False)  # asked while the intro is playing
            asyncio.get_event_loop().run_in_executor(None, llm_response, params['text'], nerfreals[sessionid], knowledge_base_type, during_intro)

        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": 0, "msg":"ok"}
            ),
        )
    except Exception as e:
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
async def api_chat(request):
    """Standalone chat API — make the avatar speak or answer a question.

    Body: {
        "text": str,              # required
        "type": "chat"|"echo",    # "chat"=LLM answer, "echo"=plain playback
        "interrupt": bool,        # flush current playback first (default True)
        "sessionid": int,         # WebRTC session id
        "knowledge_base": str?,   # optional knowledge-base selector
        "during_intro": bool?     # question arrived while the intro is playing
    }
    """
    def json_resp(payload):
        # All responses from this endpoint share the same shape.
        return web.Response(content_type="application/json", text=json.dumps(payload))

    try:
        body = await request.json()

        speech_text = body.get('text', '').strip()
        if not speech_text:
            return json_resp({"code": -1, "msg": "text parameter is required"})

        sessionid = body.get('sessionid', 0)
        session = nerfreals.get(sessionid)
        if session is None:
            return json_resp({"code": -1, "msg": f"Session {sessionid} not found or not initialized"})

        msg_type = body.get('type', 'chat')

        logger.info(f"API Chat - Session {sessionid}, Type: {msg_type}, Text: {speech_text[:50]}...")

        if body.get('interrupt', True):
            session.flush_talk()

        if msg_type == 'echo':
            # Plain playback: speak the text verbatim.
            session.put_msg_txt(speech_text)
        else:
            # Chat/Q&A: dispatch to the LLM off the event loop.
            asyncio.get_event_loop().run_in_executor(
                None,
                llm_response,
                speech_text,
                session,
                body.get('knowledge_base', None),
                body.get('during_intro', False),
            )

        return json_resp({
            "code": 0,
            "msg": "ok",
            "data": {
                "sessionid": sessionid,
                "type": msg_type,
                "text": speech_text[:100],  # first 100 chars echoed back for confirmation
            },
        })

    except Exception as e:
        logger.exception('API Chat exception:')
        return json_resp({"code": -1, "msg": str(e)})
+
async def interrupt_talk(request):
    """Flush whatever the avatar is currently saying for the given session."""
    try:
        body = await request.json()

        sessionid = body.get('sessionid', 0)
        session = nerfreals.get(sessionid)
        if session is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        session.flush_talk()

        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": 0, "msg": "ok"}),
        )
    except Exception as e:
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": -1, "msg": str(e)}),
        )
+
async def humanaudio(request):
    """Accept an uploaded audio file and queue it for playback on a session.

    Multipart form fields: sessionid (int), file (audio bytes).
    """
    try:
        form = await request.post()
        sessionid = int(form.get('sessionid', 0))
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )
        fileobj = form["file"]
        # Fix: removed the unused `filename` local — only the bytes are used.
        filebytes = fileobj.file.read()
        nerfreals[sessionid].put_audio_file(filebytes)

        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": 0, "msg": "ok"}
            ),
        )
    except Exception as e:
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
async def set_audiotype(request):
    """Switch the session's custom audio state.

    Body: {"sessionid", "audiotype", "reinit"}; a missing required key is
    reported as a {"code": -1} JSON error by the generic handler below.
    """
    try:
        body = await request.json()

        sessionid = body.get('sessionid', 0)
        session = nerfreals.get(sessionid)
        if session is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        session.set_custom_state(body['audiotype'], body['reinit'])

        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": 0, "msg": "ok"}),
        )
    except Exception as e:
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": -1, "msg": str(e)}),
        )
+
async def record(request):
    """Start or stop server-side recording for a session.

    Body: {"sessionid", "type": "start_record"|"end_record"}.
    """
    try:
        body = await request.json()

        sessionid = body.get('sessionid', 0)
        session = nerfreals.get(sessionid)
        if session is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        action = body['type']
        if action == 'start_record':
            session.start_recording()
        elif action == 'end_record':
            session.stop_recording()

        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": 0, "msg": "ok"}),
        )
    except Exception as e:
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": -1, "msg": str(e)}),
        )
+
async def is_speaking(request):
    """Report whether the session's avatar is currently speaking.

    Body: {"sessionid"}. Response: {"code": 0, "data": bool}.
    """
    try:
        params = await request.json()

        sessionid = params.get('sessionid', 0)
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": 0, "data": nerfreals[sessionid].is_speaking()}
            )
        )
    except Exception as e:
        # Consistency fix: this was the only handler without a try/except, so a
        # malformed body propagated as a bare 500 instead of a JSON error like
        # every sibling endpoint returns.
        logger.exception('exception:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
+
async def knowledge_intro(request):
    """Start the knowledge-base introduction for a session and speak its first segment.

    Body: {"sessionid"}. The response carries the first intro text plus
    play_index / total_count progress fields from start_intro_play.
    """
    try:
        # NOTE(review): this local import shadows the enclosing handler's own
        # name inside the body — `knowledge_intro` below refers to the imported
        # object, not this function.
        from knowledge_intro import start_intro_play, knowledge_intro
        # Start the "full" introduction sequence and fetch its first segment.
        play_result = start_intro_play("full")
        intro_text = play_result.get("text", "")
        
        params = await request.json()
        sessionid = params.get('sessionid', 0)
        
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )
        
        # Attach the intro controller to the session so later pause/resume
        # endpoints can reach it.
        if not hasattr(nerfreals[sessionid], 'knowledge_intro_instance'):
            nerfreals[sessionid].knowledge_intro_instance = knowledge_intro
        
        # Initialise per-session playback state on first use.
        if not hasattr(nerfreals[sessionid], 'intro_play_state'):
            nerfreals[sessionid].intro_play_state = {
                "is_playing": True,
                "current_type": "full",
                "last_played_index": 0,
                "is_paused": False,
                "is_waiting_next": False
            }
        
        # Play through the interrupt-aware path so a user question can cut in
        # and the intro can resume afterwards.
        nerfreals[sessionid].start_intro_with_interrupt_capability(intro_text)
        
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": 0, "msg": "Knowledge intro played successfully", "text": intro_text, "mode": "intro", "play_index": play_result.get("play_index", 1), "total_count": play_result.get("total_count", 8)}
            ),
        )
    except Exception as e:
        logger.exception('exception in knowledge_intro:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
+
async def resume_interrupted(request):
    """Resume playback of messages that were cut off by an interruption."""
    try:
        body = await request.json()

        sessionid = body.get('sessionid', 0)
        session = nerfreals.get(sessionid)
        if session is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        # The session reports whether there was anything to resume.
        if session.resume_interrupted():
            payload = {"code": 0, "msg": "Interrupted messages resumed successfully", "mode": "intro"}
        else:
            payload = {"code": 0, "msg": "No interrupted messages to resume"}

        return web.Response(
            content_type="application/json",
            text=json.dumps(payload),
        )
    except Exception as e:
        logger.exception('exception in resume_interrupted:')
        return web.Response(
            content_type="application/json",
            text=json.dumps({"code": -1, "msg": str(e)}),
        )
+
+
async def handle_user_question(request):
    """Answer a user question with priority, pausing the intro if one is playing.

    Body: {"sessionid", "text", "during_intro"?, "knowledge_base"?}.
    """
    try:
        body = await request.json()
        sessionid = body.get('sessionid', 0)
        question = body.get('text', '')

        session = nerfreals.get(sessionid)
        if session is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )

        if not question:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": "Question text is required"}
                ),
            )

        # If the question arrived mid-introduction, pause the intro playback first.
        during_intro = body.get('during_intro', False)
        if during_intro and hasattr(session, 'knowledge_intro_instance'):
            session.knowledge_intro_instance.pause_play()

        # Dispatch to the LLM off the event loop; llm_response handles the
        # high-priority queueing of the answer.
        asyncio.get_event_loop().run_in_executor(
            None,
            llm_response,
            question,
            session,
            body.get('knowledge_base', None),
            during_intro,
        )

        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": 0, "msg": "User question processed successfully", "mode": "qa"}
            ),
        )
    except Exception as e:
        logger.exception('exception in handle_user_question:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
+
async def continue_after_qa(request):
    """After a user question has been answered, continue the previously
    interrupted playback.

    Tries the intro controller's resume_play() first; if that yields nothing,
    falls back to the session's generic resume_interrupted().
    """
    try:
        params = await request.json()
        sessionid = params.get('sessionid', 0)
        
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )
        
        # Preferred path: resume via the knowledge-intro controller.
        if hasattr(nerfreals[sessionid], 'knowledge_intro_instance'):
            # Ask the controller for the next pending segment, if any.
            next_content = nerfreals[sessionid].knowledge_intro_instance.resume_play()
            if next_content:
                # Speak the next intro segment.
                nerfreals[sessionid].put_msg_txt(next_content['text'])
                return web.Response(
                    content_type="application/json",
                    text=json.dumps(
                        {"code": 0, "msg": "Previously interrupted content resumed successfully", "text": next_content['text'], "mode": "intro", "play_index": next_content.get("play_index", 1), "total_count": next_content.get("total_count", 8)}
                    ),
                )
        
        # Fallback: no intro controller, or it had no remaining content.
        resumed = nerfreals[sessionid].resume_interrupted()
        
        if resumed:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": 0, "msg": "Previously interrupted content resumed successfully", "mode": "intro"}
                ),
            )
        else:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": 0, "msg": "No interrupted content to resume"}
                ),
            )
    except Exception as e:
        logger.exception('exception in continue_after_qa:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
async def intro_play_completed(request):
    """Callback fired when one intro segment finishes: play the next one.

    Body: {"sessionid"}. Respects the session's paused state; when no segments
    remain, responds with "No more introduction content".
    """
    try:
        params = await request.json()
        sessionid = params.get('sessionid', 0)
        
        if sessionid not in nerfreals or nerfreals[sessionid] is None:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": f"Session {sessionid} not found or not initialized"}
                ),
            )
        
        # The intro controller must have been attached by the knowledge_intro endpoint.
        if not hasattr(nerfreals[sessionid], 'knowledge_intro_instance'):
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": -1, "msg": "Knowledge intro instance not found"}
                ),
            )
        
        # Do nothing while the introduction is paused (e.g. during a user question).
        if hasattr(nerfreals[sessionid], 'intro_play_state') and not nerfreals[sessionid].intro_play_state.get("is_playing", True):
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": 0, "msg": "Introduction playback is paused"}
                ),
            )
        
        # NOTE(review): reaches into the controller's private _get_next_content();
        # consider exposing a public accessor on the intro controller.
        next_content = nerfreals[sessionid].knowledge_intro_instance._get_next_content()
        if next_content:
            # Speak the next intro segment.
            nerfreals[sessionid].put_msg_txt(next_content['text'])
            
            # Track progress so resume logic knows where playback stands.
            if hasattr(nerfreals[sessionid], 'intro_play_state'):
                nerfreals[sessionid].intro_play_state["last_played_index"] = next_content.get("play_index", 1)
            
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": 0, "msg": "Next introduction content played successfully", "text": next_content['text'], "mode": "intro", "play_index": next_content.get("play_index", 1), "total_count": next_content.get("total_count", 8), "is_last": next_content.get("is_last", False)}
                ),
            )
        else:
            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"code": 0, "msg": "No more introduction content"}
                ),
            )
    except Exception as e:
        logger.exception('exception in intro_play_completed:')
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"code": -1, "msg": str(e)}
            ),
        )
+
async def on_shutdown(app):
    """aiohttp shutdown hook: close peer connections, tear down avatar
    sessions, and release GPU memory."""
    logger.info("开始清理所有资源...")

    # 1. Close every open WebRTC peer connection.
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros, return_exceptions=True)
    pcs.clear()

    # 2. Tear down each avatar session.
    for sessionid, nerfreal in nerfreals.items():
        try:
            if nerfreal is not None:
                logger.info(f"清理 session {sessionid} 的资源")
                # Pause the TTS worker. Fix: the original referenced `State`
                # without any import, so this always raised NameError (silently
                # swallowed by the except below); import it lazily instead.
                if hasattr(nerfreal, 'tts') and nerfreal.tts:
                    try:
                        from ttsreal import State  # NOTE(review): confirm State lives in ttsreal
                        nerfreal.tts.state = State.PAUSE
                    except ImportError:
                        logger.warning('State enum unavailable; skipping TTS pause')
                # Drain pending message queues under their locks.
                if hasattr(nerfreal, 'msg_queue'):
                    with nerfreal.msg_queue.mutex:
                        nerfreal.msg_queue.queue.clear()
                if hasattr(nerfreal, 'interrupted_queue'):
                    with nerfreal.interrupted_queue.mutex:
                        nerfreal.interrupted_queue.queue.clear()
        except Exception as e:
            logger.error(f"清理 session {sessionid} 时出错: {e}")

    # 3. Drop all session references.
    nerfreals.clear()

    # 4. Force garbage collection (gc is already imported at module level —
    # the redundant local import was removed).
    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        logger.info(f"清理后 GPU 显存: {torch.cuda.memory_allocated() / 1024**3:.2f}GB")

    logger.info("所有资源清理完成")
+
def cleanup_avatar_directories():
    """Delete the avatar directories recorded during this run.

    No-op unless _enable_avatar_cleanup is True; clears the tracking set when done.
    """
    global _avatar_dirs_to_clean, _enable_avatar_cleanup

    if not _enable_avatar_cleanup:
        logger.info("未启用 avatar 目录自动清理功能")
        return
    if not _avatar_dirs_to_clean:
        logger.info("没有需要清理的 avatar 目录")
        return

    logger.info(f"开始清理 {len(_avatar_dirs_to_clean)} 个 avatar 目录...")

    for target_dir in _avatar_dirs_to_clean:
        try:
            if not os.path.exists(target_dir):
                logger.info(f"目录不存在,无需删除: {target_dir}")
                continue
            logger.info(f"正在删除 avatar 目录: {target_dir}")
            shutil.rmtree(target_dir)
            logger.info(f"✅ 已删除: {target_dir}")
        except Exception as e:
            logger.error(f"❌ 删除 avatar 目录失败 {target_dir}: {e}")

    _avatar_dirs_to_clean.clear()
    logger.info("所有 avatar 目录清理完成")
+
def signal_handler(signum, frame):
    """SIGINT/SIGTERM handler: clean up recorded avatar directories, then exit.

    Args:
        signum: the received signal number.
        frame: current stack frame (unused).
    """
    logger.info(f"收到退出信号 {signum},开始清理...")
    cleanup_avatar_directories()
    sys.exit(0)
+
async def post(url, data):
    """POST `data` to `url` and return the response body text, or None on a
    client-side error (connection refused, timeout, etc.)."""
    try:
        async with aiohttp.ClientSession() as client:
            async with client.post(url, data=data) as resp:
                body = await resp.text()
        return body
    except aiohttp.ClientError as e:
        logger.error(f'POST请求错误: {e}')
        # Explicit None so callers can distinguish failure from an empty body.
        return None
+
async def run(push_url,sessionid):
    """Push-stream mode: build the avatar session and stream it out.

    rtmp:// URLs use the ffmpeg pipe inside basereal (render thread only, no
    WebRTC); any other URL is treated as a WebRTC WHIP endpoint.
    """
    nerfreal = await asyncio.get_event_loop().run_in_executor(None, build_nerfreal,sessionid)
    nerfreals[sessionid] = nerfreal

    # RTMP push mode: skip the WHIP handshake; basereal's ffmpeg pipe starts
    # streaming once render() runs.
    if push_url.startswith('rtmp://'):
        logger.info(f'RTMP 推流模式: {push_url},跳过 WHIP 连接,推流将在 render 时自动启动')
        # A render loop is still needed to produce audio/video frames.
        # HumanPlayer would normally trigger render() when a WebRTC track is
        # first read; here no connection exists, so the render thread is
        # started directly instead.
        player = HumanPlayer(nerfreals[sessionid])
        # Quit event lets the render loop be stopped externally.
        from threading import Event as ThreadEvent
        render_quit_event = ThreadEvent()
        # Run render() without WebRTC tracks; frames go to the ffmpeg pipe.
        import threading
        def rtmp_render_loop():
            nerfreals[sessionid].render(render_quit_event, loop=None, audio_track=None, video_track=None)
        render_thread = threading.Thread(target=rtmp_render_loop, daemon=True, name='rtmp_render')
        render_thread.start()
        logger.info('RTMP 渲染线程已启动')
        return
    
    # WebRTC WHIP push (original path).
    pc = RTCPeerConnection()
    pcs.add(pc)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        logger.info("Connection state is %s" % pc.connectionState)
        if pc.connectionState == "failed":
            await pc.close()
            pcs.discard(pc)

    player = HumanPlayer(nerfreals[sessionid])
    audio_sender = pc.addTrack(player.audio)
    video_sender = pc.addTrack(player.video)

    await pc.setLocalDescription(await pc.createOffer())
    answer = await post(push_url,pc.localDescription.sdp)
    
    # Abort cleanly if the WHIP endpoint could not be reached (post returns None).
    if answer is None:
        logger.error(f'推流失败: 无法连接到 {push_url}')
        await pc.close()
        pcs.discard(pc)
        return
    
    await pc.setRemoteDescription(RTCSessionDescription(sdp=answer,type='answer'))
+##########################################
+# os.environ['MKL_SERVICE_FORCE_INTEL'] = '1'
+# os.environ['MULTIPROCESSING_METHOD'] = 'forkserver'                                                    
if __name__ == '__main__':
    # Register signal handlers (SIGINT/SIGTERM -> avatar dir cleanup + exit).
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    # 'spawn' is required for CUDA-safe multiprocessing.
    mp.set_start_method('spawn')
    parser = argparse.ArgumentParser()
    
    # audio FPS
    parser.add_argument('--fps', type=int, default=50, help="audio fps,must be 50")
    # sliding window left-middle-right length (unit: 20ms)
    parser.add_argument('-l', type=int, default=10)
    parser.add_argument('-m', type=int, default=8)
    parser.add_argument('-r', type=int, default=10)

    parser.add_argument('--W', type=int, default=450, help="GUI width")
    parser.add_argument('--H', type=int, default=450, help="GUI height")

    # musetalk options
    parser.add_argument('--avatar_id', type=str, default='avator_1', help="define which avatar in data/avatars")
    #parser.add_argument('--bbox_shift', type=int, default=5)
    parser.add_argument('--batch_size', type=int, default=16, help="infer batch")

    parser.add_argument('--customvideo_config', type=str, default='', help="custom action json")

    parser.add_argument('--tts', type=str, default='edgetts', help="tts service type") #xtts gpt-sovits cosyvoice fishtts tencent doubao indextts2 azuretts qwen3tts voxcpm2api
    parser.add_argument('--REF_FILE', type=str, default="zh-CN-YunxiaNeural",help="参考文件名或语音模型 ID,默认值为 edgetts 的语音模型 ID zh-CN-YunxiaNeural, 若--tts 指定为 azuretts, 可以使用 Azure 语音模型 ID, 如 zh-CN-XiaoxiaoMultilingualNeural")
    parser.add_argument('--TTS_SERVER', type=str, default='http://127.0.0.1:9880', help="TTS server URL")
    parser.add_argument('--QWEN3_TTS_MODEL_PATH', type=str, default='/home/test/Digital_Human/Qwen3-TTS-12Hz-1.7B-Base', help="Qwen3 TTS model path")
    parser.add_argument('--QWEN3_TTS_LANGUAGE', type=str, default='Chinese', help="Qwen3 TTS language")
    parser.add_argument('--VOXCPM2_MODEL_PATH', type=str, default='VoxCPM2', help="VoxCPM2 模型路径")
    parser.add_argument('--VOXCPM2_API_URL', type=str, default='http://localhost:6003', help="VoxCPM2 API 服务地址(API 调用模式)")
    parser.add_argument('--VOXCPM2_REF_WAV', type=str, default='voice_output.wav', help="VoxCPM2 参考音频路径")
    parser.add_argument('--VOXCPM2_REF_TEXT', type=str, default='你好,买水果,卖水果,新鲜的水果。', help="VoxCPM2 参考文本")
    parser.add_argument('--CFG_VALUE', type=float, default=2.0, help="VoxCPM2 CFG value")
    parser.add_argument('--INFERENCE_TIMESTEPS', type=int, default=10, help="VoxCPM2 inference timesteps")
    parser.add_argument('--REF_TEXT', type=str, default=None, help="参考文本")
    # parser.add_argument('--CHARACTER', type=str, default='test')
    # parser.add_argument('--EMOTION', type=str, default='default')

    parser.add_argument('--model', type=str, default='musetalk') #musetalk wav2lip ultralight

    parser.add_argument('--transport', type=str, default='rtcpush') #webrtc rtcpush virtualcam
    parser.add_argument('--push_url', type=str, default='http://localhost:1985/rtc/v1/whip/?app=live&stream=livestream') #rtmp://localhost/live/livestream

    parser.add_argument('--max_session', type=int, default=1)  #multi session count
    parser.add_argument('--listenport', type=int, default=7868, help="web listen port")
    
    # Avatar directory cleanup configuration.
    parser.add_argument('--cleanup_avatar_on_exit', action='store_true', default=False, 
                       help="程序退出时自动删除本次使用的 avatar 目录")

    opt = parser.parse_args()
    
    # Record the module-level cleanup flag read during shutdown.
    _enable_avatar_cleanup = opt.cleanup_avatar_on_exit
    if _enable_avatar_cleanup:
        logger.info("✅ 已启用 avatar 目录退出自动清理功能")
    else:
        logger.info("ℹ️ 未启用 avatar 目录自动清理(使用 --cleanup_avatar_on_exit 启用)")
    #app.config.from_object(opt)
    #print(app.config)
    # Optional custom-action config: JSON file listing custom video actions.
    opt.customopt = []
    if opt.customvideo_config!='':
        with open(opt.customvideo_config,'r') as file:
            opt.customopt = json.load(file)

    # if opt.model == 'ernerf':       
    #     from nerfreal import NeRFReal,load_model,load_avatar
    #     model = load_model(opt)
    #     avatar = load_avatar(opt) 
    # Load lip-sync model + avatar assets for the selected backend, then
    # warm up inference with the configured batch size.
    if opt.model == 'musetalk':
        from musereal import MuseReal,load_model,load_avatar,warm_up
        logger.info(opt)
        model = load_model()
        avatar = load_avatar(opt.avatar_id) 
        warm_up(opt.batch_size,model)      
    elif opt.model == 'wav2lip':
        from lipreal import LipReal,load_model,load_avatar,warm_up
        logger.info(opt)
        model = load_model("./models/wav2lip256.pth")
        avatar = load_avatar(opt.avatar_id)
        warm_up(opt.batch_size,model,256)
    elif opt.model == 'ultralight':
        from lightreal import LightReal,load_model,load_avatar,warm_up
        logger.info(opt)
        model = load_model(opt)
        avatar = load_avatar(opt.avatar_id)
        warm_up(opt.batch_size,avatar,160)

    # if opt.transport=='rtmp':
    #     thread_quit = Event()
    #     nerfreals[0] = build_nerfreal(0)
    #     rendthrd = Thread(target=nerfreals[0].render,args=(thread_quit,))
    #     rendthrd.start()
    # virtualcam: render session 0 directly on a background thread.
    if opt.transport=='virtualcam':
        thread_quit = Event()
        nerfreals[0] = build_nerfreal(0)
        rendthrd = Thread(target=nerfreals[0].render,args=(thread_quit,))
        rendthrd.start()

    #############################################################################
    # HTTP API (aiohttp); 100 MB request body cap for audio uploads.
    appasync = web.Application(client_max_size=1024**2*100)
    appasync.on_shutdown.append(on_shutdown)
    appasync.router.add_post("/offer", offer)
    appasync.router.add_post("/human", human)
    appasync.router.add_post("/api/chat", api_chat)  # standalone chat API
    appasync.router.add_post("/humanaudio", humanaudio)
    appasync.router.add_post("/set_audiotype", set_audiotype)
    appasync.router.add_post("/record", record)
    appasync.router.add_post("/interrupt_talk", interrupt_talk)
    appasync.router.add_post("/is_speaking", is_speaking)
    appasync.router.add_post("/knowledge_intro", knowledge_intro)  # knowledge-base intro API
    appasync.router.add_post("/resume_interrupted", resume_interrupted)  # resume interrupted message API
    appasync.router.add_post("/handle_user_question", handle_user_question)  # user question API
    appasync.router.add_post("/continue_after_qa", continue_after_qa)  # continue playback after Q&A
    appasync.router.add_post("/intro_play_completed", intro_play_completed)  # intro-playback-finished callback
    appasync.router.add_post("/log", log)  # frontend logging endpoint
    appasync.router.add_static('/',path='web')

    # Configure default CORS settings.
    cors = aiohttp_cors.setup(appasync, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*",
            )
        })
    # Configure CORS on all routes.
    for route in list(appasync.router.routes()):
        cors.add(route)

    # Pick the landing page matching the transport mode.
    pagename='webrtcapi.html'
    if opt.transport=='rtmp':
        pagename='echoapi.html'
    elif opt.transport=='rtcpush':
        pagename='rtcpushapi.html'
    logger.info('start http server; http://<serverip>:'+str(opt.listenport)+'/'+pagename)
    logger.info('如果使用webrtc,推荐访问webrtc集成前端: http://<serverip>:'+str(opt.listenport)+'/dashboard.html')
    def run_server(runner):
        # Run the aiohttp server on a fresh event loop; for rtcpush, also
        # start one push session per configured max_session (stream name is
        # suffixed with the session index for k > 0).
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(runner.setup())
            site = web.TCPSite(runner, '0.0.0.0', opt.listenport)
            loop.run_until_complete(site.start())
            if opt.transport=='rtcpush':
                for k in range(opt.max_session):
                    push_url = opt.push_url
                    if k!=0:
                        push_url = opt.push_url+str(k)
                    loop.run_until_complete(run(push_url,k))
            loop.run_forever()
        except KeyboardInterrupt:
            logger.info("收到 KeyboardInterrupt,正在退出...")
        except Exception as e:
            logger.error(f"服务器运行出错: {e}")
        finally:
            # Clean up avatar directories whenever the server loop exits.
            cleanup_avatar_directories()    
    #Thread(target=run_server, args=(web.AppRunner(appasync),)).start()
    run_server(web.AppRunner(appasync))

    #app.on_shutdown.append(on_shutdown)
    #app.router.add_post("/offer", offer)

    # print('start websocket server')
    # server = pywsgi.WSGIServer(('0.0.0.0', 8000), app, handler_class=WebSocketHandler)
    # server.serve_forever()
+    

BIN
assets/LiveTalking-logo.jpg


BIN
assets/dataflow.png


BIN
assets/demo.mp4


+ 65 - 0
assets/faq.md

@@ -0,0 +1,65 @@
+1.  pytorch3d安装不成功\
+    下载源码编译
+
+```bash
+git clone https://github.com/facebookresearch/pytorch3d.git
+cd pytorch3d
+python setup.py install
+```
+
+2.  websocket连接报错\
+    修改python/site-packages/flask\_sockets.py
+
+```python
+self.url_map.add(Rule(rule, endpoint=f)) 改成 
+self.url_map.add(Rule(rule, endpoint=f, websocket=True))
+```
+
+3. protobuf版本过高
+
+```bash
+pip uninstall protobuf
+pip install protobuf==3.20.1
+```
+
+4. 数字人不眨眼\
+训练模型时添加如下步骤
+
+> Obtain AU45 for eyes blinking.\
+> Run FeatureExtraction in OpenFace, rename and move the output CSV file to data/\<ID>/au.csv.
+
+将au.csv拷到本项目的data目录下
+
+5. 数字人添加背景图片
+
+```bash
+python app.py --bg_img bc.jpg
+```
+
+6. 用自己训练的模型报错维度不匹配\
+训练模型时用wav2vec提取音频特征
+
+```bash
+python main.py data/ --workspace workspace/ -O --iters 100000 --asr_model cpierse/wav2vec2-large-xlsr-53-esperanto
+```
+
+7. rtmp推流时ffmpeg版本不对
+网上版友反馈是需要4.2.2版本。我也不确定具体哪些版本不行。原则是运行一下ffmpeg,打印的信息里需要有libx264,如果没有肯定不行
+```
+--enable-libx264
+```
+8. 替换自己训练的模型
+```python
+.
+├── data
+│   ├── data_kf.json (对应训练数据中的transforms_train.json)
+│   ├── au.csv			
+│   ├── pretrained
+│   └── └── ngp_kf.pth (对应训练后的模型ngp_ep00xx.pth)
+
+```
+
+
+其他参考
+https://github.com/lipku/metahuman-stream/issues/43#issuecomment-2008930101
+
+

BIN
assets/main.png


BIN
assets/qrcode-wechat.jpg


+ 243 - 0
audioplayer.py

@@ -0,0 +1,243 @@
+###############################################################################
+# 实时音频播放模块 - 监听 WAV 文件夹并播放新音频
+###############################################################################
+
+import os
+import time
+import wave
+import numpy as np
+import threading
+import queue
+from pathlib import Path
+from logger import logger
+
+
class RealtimeAudioPlayer:
    """Watches a directory for new WAV files and streams them as 20 ms PCM chunks.

    Two daemon threads cooperate:
      * a watcher that polls ``watch_dir`` for unseen ``*.wav`` files and loads
        them (mono, int16, resampled to ``sample_rate``) into a bounded queue;
      * a player that pops queued clips and feeds them to the ``on_audio``
        callback one 20 ms chunk at a time, paced in real time.
    """
    
    def __init__(self, watch_dir, sample_rate=16000):
        """
        Args:
            watch_dir: directory to watch for new WAV files (created if absent).
            sample_rate: output sample rate in Hz (default 16000).
        """
        self.watch_dir = watch_dir
        self.sample_rate = sample_rate
        
        # Make sure the watched directory exists.
        os.makedirs(watch_dir, exist_ok=True)
        
        # Lifecycle / playback state flags.
        self.is_running = False
        self.is_playing = False
        self.current_audio = None
        
        # Decoded clips waiting to be played (bounded: put() blocks when full,
        # which back-pressures the watcher instead of growing memory).
        self.audio_queue = queue.Queue(maxsize=50)
        
        # File names already loaded.  NOTE(review): grows without bound in a
        # long-running process; consider pruning names of deleted files.
        self.processed_files = set()
        
        # Thread handles and shared stop signal.
        self.watch_thread = None
        self.play_thread = None
        self.stop_event = threading.Event()
        
        # Consumer callback, set via set_callbacks().
        self.on_audio = None
        
        logger.info(f"🎵 实时音频播放器已初始化")
        logger.info(f"   监听目录: {watch_dir}")
        logger.info(f"   采样率: {sample_rate}Hz")
    
    def set_callbacks(self, on_audio=None):
        """
        Register the audio consumer.

        Args:
            on_audio: callable receiving one int16 np.ndarray per 20 ms chunk.
        """
        self.on_audio = on_audio
    
    def start(self):
        """Start the watcher and player threads (idempotent while running)."""
        if self.is_running:
            logger.warning("⚠️ 音频播放器已在运行")
            return
        
        logger.info("🎵 启动实时音频播放器...")
        self.is_running = True
        self.stop_event.clear()
        
        # Watcher: polls the directory for new WAV files.
        self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
        self.watch_thread.start()
        
        # Player: drains the queue and emits 20 ms chunks.
        self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
        self.play_thread.start()
        
        logger.info("✅ 实时音频播放器已启动")
    
    def stop(self):
        """Signal both threads to stop and wait briefly for them to exit."""
        if not self.is_running:
            return
        
        logger.info("🛑 停止实时音频播放器...")
        self.is_running = False
        self.stop_event.set()
        
        if self.watch_thread:
            self.watch_thread.join(timeout=2)
        if self.play_thread:
            self.play_thread.join(timeout=2)
        
        logger.info("✅ 实时音频播放器已停止")
    
    def _watch_loop(self):
        """Poll the watched directory and load every not-yet-seen WAV file."""
        logger.info("👁️ 开始监听音频文件...")
        
        while not self.stop_event.is_set():
            try:
                # All WAV files currently present.
                wav_files = list(Path(self.watch_dir).glob('*.wav'))
                
                # Keep only files we have not loaded yet.
                new_wavs = [f for f in wav_files if f.name not in self.processed_files]
                
                if new_wavs:
                    # Process in modification-time order (oldest first).
                    new_wavs.sort(key=lambda p: p.stat().st_mtime)
                    
                    for wav_path in new_wavs:
                        if self.stop_event.is_set():
                            break
                        
                        logger.info(f"📥 检测到新音频文件: {wav_path.name}")
                        self._load_audio(wav_path)
                        self.processed_files.add(wav_path.name)
                
                # Poll interval.
                time.sleep(0.5)
                
            except Exception as e:
                logger.error(f"❌ 监听音频文件出错: {e}")
                time.sleep(1)
    
    def _load_audio(self, wav_path):
        """
        Decode one WAV file to mono int16 at self.sample_rate and enqueue it.

        Args:
            wav_path: path to the WAV file.
        """
        try:
            with wave.open(str(wav_path), 'rb') as wf:
                n_channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                framerate = wf.getframerate()
                n_frames = wf.getnframes()
                
                # Read the whole clip.
                audio_data = wf.readframes(n_frames)
                
                # Interpret raw bytes; only 8-bit and 16-bit PCM supported.
                if sample_width == 2:
                    audio_array = np.frombuffer(audio_data, dtype=np.int16)
                elif sample_width == 1:
                    audio_array = np.frombuffer(audio_data, dtype=np.uint8)
                else:
                    logger.warning(f"⚠️ 不支持的采样宽度: {sample_width}")
                    return
                
                # Stereo -> mono by channel mean.
                if n_channels == 2:
                    audio_array = audio_array.reshape(-1, 2).mean(axis=1).astype(np.int16)
                
                # Resample when the source rate differs from the target rate.
                if framerate != self.sample_rate:
                    logger.info(f"🔄 重采样: {framerate}Hz -> {self.sample_rate}Hz")
                    audio_array = self._resample(audio_array, framerate, self.sample_rate)
                
                # Hand off to the player thread (blocks if the queue is full).
                logger.info(f"📤 音频已加入队列: {wav_path.name}, 时长: {len(audio_array)/self.sample_rate:.2f}s")
                self.audio_queue.put(audio_array)
                
        except Exception as e:
            logger.error(f"❌ 加载音频文件失败 {wav_path}: {e}")
    
    def _resample(self, audio_data, orig_sr, target_sr):
        """
        Linear-interpolation resample of int16 PCM from orig_sr to target_sr.

        Vectorized with np.interp — the previous implementation interpolated
        one sample at a time in a Python loop, which is orders of magnitude
        slower for multi-second clips.  np.interp clamps beyond the last
        sample exactly like the old loop's tail branch did.

        Args:
            audio_data: 1-D int16 array.
            orig_sr: original sample rate in Hz.
            target_sr: target sample rate in Hz.

        Returns:
            1-D int16 array of length int(len(audio_data) * target_sr / orig_sr).
        """
        ratio = target_sr / orig_sr
        new_length = int(len(audio_data) * ratio)
        if new_length == 0 or len(audio_data) == 0:
            # Empty input (or extreme downsample) -> empty output, as before.
            return np.zeros(0, dtype=np.int16)
        
        # Fractional source positions of each output sample.
        positions = np.arange(new_length) / ratio
        resampled = np.interp(positions, np.arange(len(audio_data)), audio_data)
        # astype truncates toward zero, matching the old per-sample int() cast.
        return resampled.astype(np.int16)
    
    def _play_loop(self):
        """Drain the clip queue; emit each clip as paced 20 ms chunks."""
        logger.info("🎵 开始播放音频...")
        
        while not self.stop_event.is_set():
            try:
                # Single blocking get with timeout: removes the old
                # empty()-then-get check-then-act race and the busy
                # sleep(0.01) polling loop.
                audio_data = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            
            try:
                self.is_playing = True
                logger.info(f"🔊 正在播放音频,长度: {len(audio_data)/self.sample_rate:.2f}s")
                
                if self.on_audio:
                    # 20 ms per chunk: sample_rate // 50 (320 samples @16 kHz).
                    chunk_size = self.sample_rate // 50
                    
                    for i in range(0, len(audio_data), chunk_size):
                        if self.stop_event.is_set():
                            break
                        
                        chunk = audio_data[i:i + chunk_size]
                        
                        # Zero-pad a short final chunk to a full frame.
                        if len(chunk) < chunk_size:
                            chunk = np.pad(chunk, (0, chunk_size - len(chunk)), 'constant')
                        
                        self.on_audio(chunk)
                        
                        # Pace emission at real time (20 ms per chunk).
                        time.sleep(0.02)
                
                self.is_playing = False
                logger.info("✅ 音频播放完成")
                
            except Exception as e:
                logger.error(f"❌ 播放音频出错: {e}")
                self.is_playing = False
                time.sleep(0.1)

BIN
back/rvm_resnet50.pth


BIN
back/video/coco_greenscreen.mp4


+ 239 - 0
back/video_to_greenscreen.py

@@ -0,0 +1,239 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+视频绿幕替换工具
+使用 RobustVideoMatting (RVM) 模型将视频背景替换为绿色
+已优化:本地模型极速加载,不联网、不卡
+"""
+
+import os
+import sys
+import cv2
+import torch
+from torch.utils.data import DataLoader
+from torchvision.transforms import ToTensor
+import numpy as np
+from tqdm import tqdm
+import argparse
+
+
class VideoReader:
    """Sequential frame reader over an OpenCV capture, RGB output.

    NOTE(review): __getitem__ ignores `idx` and just reads the *next* frame,
    so this only behaves correctly when consumed strictly in order (e.g.
    DataLoader with the default sequential sampler and num_workers=0) —
    confirm before shuffling or parallelizing.
    """
    def __init__(self, video_path, transform=None):
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        
        self.transform = transform
        # NOTE(review): int() truncates fractional frame rates (e.g. 29.97
        # becomes 29), which slightly changes output timing downstream.
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    def __len__(self):
        # Container-reported frame count (may be approximate for some codecs).
        return self.total_frames
    
    def __getitem__(self, idx):
        # Reads sequentially; `idx` is only used for the error message.
        ret, frame = self.cap.read()
        if not ret:
            raise IndexError(f"Failed to read frame {idx}")
        
        # BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        if self.transform:
            frame = self.transform(frame)
        
        return frame
    
    def release(self):
        # Release the underlying OpenCV capture handle.
        self.cap.release()
+
class VideoWriter:
    """Writes float RGB tensors (C, H, W) in [0, 1] out to an mp4v video file."""
    def __init__(self, output_path, frame_rate, width, height):
        self.output_path = output_path
        self.frame_rate = frame_rate
        self.width = width
        self.height = height
        
        self.writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            frame_rate,
            (width, height),
        )
    
    def write(self, tensor):
        """Write tensor (C, H, W) to video"""
        # CHW float tensor -> HWC uint8 numpy frame.
        hwc = tensor.cpu().numpy().transpose(1, 2, 0)
        frame_u8 = np.clip(hwc * 255.0, 0, 255).astype(np.uint8)
        
        # OpenCV expects BGR channel order.
        self.writer.write(cv2.cvtColor(frame_u8, cv2.COLOR_RGB2BGR))
    
    def release(self):
        self.writer.release()
+
+
class GreenScreenProcessor:
    """Green-screen compositor built on RobustVideoMatting (RVM resnet50)."""

    def __init__(self, model_path=None, device='cuda'):
        """
        Load the RVM architecture via torch.hub and apply local weights.

        Args:
            model_path: weight file path; defaults to rvm_resnet50.pth next to
                this script.
            device: 'cuda' or 'cpu'.
        """
        self.device = device
        
        # Resolve the default weight path relative to this script.
        if model_path is None:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            model_path = os.path.join(script_dir, 'rvm_resnet50.pth')
        
        print(f"🚀 加载 RVM 模型: {model_path}")

        # Fetch the model structure; fall back to the local torch.hub cache
        # when the network is unavailable.
        print("📦 加载模型结构...")
        try:
            model = torch.hub.load('PeterL1n/RobustVideoMatting', 'resnet50', skip_validation=True)
            print("✅ 从 GitHub 加载模型结构成功")
        except Exception as e:
            print(f"⚠️  网络加载失败: {e}")
            print("📦 尝试使用本地缓存的模型结构...")
            
            # Point torch.hub at the default cache and retry offline.
            torch.hub.set_dir(os.path.expanduser('~/.cache/torch/hub'))
            try:
                model = torch.hub.load('PeterL1n/RobustVideoMatting', 'resnet50', skip_validation=True, force_reload=False)
                print("✅ 从本地缓存加载模型结构成功")
            except Exception as e2:
                print(f"❌ 本地缓存也不可用: {e2}")
                print("\n💡 解决方案:")
                print("   1. 确保网络连接正常")
                print("   2. 或者预先下载模型:python -c \"import torch; torch.hub.load('PeterL1n/RobustVideoMatting', 'resnet50')\"")
                print("   3. 或者使用已缓存的模型(首次运行后会自动缓存)")
                raise
        
        # Apply the local checkpoint on top of the hub-defined architecture.
        print(f"📦 加载本地权重:{model_path}")
        checkpoint = torch.load(model_path, map_location=device, weights_only=True)
        model.load_state_dict(checkpoint)
        
        self.model = model.to(device)
        self.model.eval()
        
        print("✅ RVM 模型加载完成!")

    def process_video(self, input_path, output_path, max_frames=None, downsample_ratio=0.25, progress_callback=None):
        """
        Matte every frame and composite it over a solid green background.

        Args:
            input_path: source video path.
            output_path: destination mp4 path.
            max_frames: optional cap on the number of processed frames.
            downsample_ratio: RVM internal downsample ratio.
            progress_callback: optional callback(current, total, percentage).
        """
        print(f"\nProcessing video: {input_path}")
        
        reader = VideoReader(input_path, transform=ToTensor())
        total_frames = reader.total_frames if max_frames is None else min(reader.total_frames, max_frames)
        
        print(f"Video info: {reader.width}x{reader.height}, FPS={reader.fps}, Frames={total_frames}")
        
        writer = VideoWriter(output_path, reader.fps, reader.width, reader.height)
        
        # Background color in normalized RGB — fgr/pha are floats in [0, 1],
        # so pure green is (0, 1, 0).  BUGFIX: the previous tensor was all
        # zeros, which composited a *black* background even though the tool's
        # stated purpose (and the old "绿色背景(0,255,0)" comment) is a green
        # screen.
        bgr = torch.tensor([0.0, 1.0, 0.0]).view(3, 1, 1).to(self.device)
        
        rec = [None] * 4  # RVM recurrent hidden states
        frame_count = 0
        
        with torch.no_grad():
            for src in tqdm(DataLoader(reader, batch_size=1), total=total_frames):
                if frame_count >= total_frames:
                    break
                
                src = src.to(self.device)
                fgr, pha, *rec = self.model(src, *rec, downsample_ratio)
                
                # Alpha-composite the foreground over the green background.
                com = fgr * pha + bgr * (1 - pha)
                
                writer.write(com[0])
                frame_count += 1
                
                # Report progress every 10 frames.
                if progress_callback and frame_count % 10 == 0:
                    percentage = (frame_count / total_frames) * 100
                    progress_callback(frame_count, total_frames, percentage)
        
        reader.release()
        writer.release()
        
        # Always report final completion.
        if progress_callback:
            progress_callback(frame_count, total_frames, 100.0)
        
        print(f"\n✓ 视频已保存:{output_path}")
        print(f"✓ 处理帧数:{frame_count}")
+
+
def main():
    """CLI entry point: parse arguments, validate paths, run the conversion."""
    parser = argparse.ArgumentParser(description='将视频背景替换为绿幕 (RVM 极速版)')
    parser.add_argument('input_video', type=str, help='输入视频路径')
    parser.add_argument('-o', '--output', type=str, default=None, help='输出视频路径')
    parser.add_argument('-m', '--model', type=str, default=None, help='RVM模型路径 (默认使用脚本目录下的rvm_resnet50.pth)')
    parser.add_argument('-d', '--device', type=str, default='cuda', help='计算设备 cuda/cpu')
    parser.add_argument('--max-frames', type=int, default=None, help='最大处理帧数')
    parser.add_argument('--downsample-ratio', type=float, default=0.25, help='下采样比例')
    
    args = parser.parse_args()
    script_dir = os.path.dirname(os.path.abspath(__file__))
    
    # Weight path: CLI override wins, else the default next to this script.
    model_path = args.model if args.model else os.path.join(script_dir, 'rvm_resnet50.pth')

    # Default output: <script_dir>/video/<input-stem>_greenscreen.mp4
    if args.output is None:
        stem = os.path.splitext(os.path.basename(args.input_video))[0]
        video_dir = os.path.join(script_dir, 'video')
        os.makedirs(video_dir, exist_ok=True)
        args.output = os.path.join(video_dir, f"{stem}_greenscreen.mp4")
    
    # Fail fast when inputs are missing.
    if not os.path.isfile(args.input_video):
        print(f"❌ 输入视频不存在:{args.input_video}")
        sys.exit(1)
    
    if not os.path.isfile(model_path):
        print(f"❌ 模型不存在:{model_path}")
        sys.exit(1)
    
    # Load the model, then process the whole video.
    processor = GreenScreenProcessor(model_path, args.device)
    
    try:
        processor.process_video(
            args.input_video,
            args.output,
            args.max_frames,
            args.downsample_ratio,
        )
    except Exception as e:
        print(f"\n❌ 错误:{e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
+
+
# Script entry point.
if __name__ == "__main__":
    main()

+ 88 - 0
baseasr.py

@@ -0,0 +1,88 @@
+###############################################################################
+#  Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
+#  email: lipku@foxmail.com
+# 
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  
+#       http://www.apache.org/licenses/LICENSE-2.0
+# 
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+###############################################################################
+
+import time
+import numpy as np
+
+import queue
+from queue import Queue
+import torch.multiprocessing as mp
+
+from basereal import BaseReal
+
+
class BaseASR:
    """Base audio pipeline: buffers 20 ms PCM chunks for subclasses to turn
    into features, and mirrors every consumed chunk to an output queue.

    Queues:
        queue        - incoming (audio_chunk, datainfo) pairs (thread Queue).
        output_queue - (frame, type, eventpoint) tuples for playback (mp.Queue).
        feat_queue   - extracted feature batches for the renderer (maxsize 2).
    """
    def __init__(self, opt, parent:BaseReal = None):
        self.opt = opt
        self.parent = parent  # owning BaseReal; supplies custom-state audio

        self.fps = opt.fps # 20 ms per frame
        self.sample_rate = 16000
        self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
        self.queue = Queue()
        self.output_queue = mp.Queue()

        self.batch_size = opt.batch_size

        self.frames = []
        # Sliding-window stride sizes in 20 ms frames (CLI -l / -r).
        self.stride_left_size = opt.l
        self.stride_right_size = opt.r
        #self.context_size = 10
        self.feat_queue = mp.Queue(2)

        #self.warm_up()

    def flush_talk(self):
        # Drop all pending audio chunks (used when speech is interrupted).
        self.queue.queue.clear()

    def put_audio_frame(self,audio_chunk,datainfo:dict): #16khz 20ms pcm
        # Enqueue one 20 ms 16 kHz PCM chunk together with its metadata dict.
        self.queue.put((audio_chunk,datainfo))

    #return frame:audio pcm; type: 0-normal speak, 1-silence; eventpoint:custom event sync with audio
    def get_audio_frame(self):        
        """Pop the next audio frame; fall back to the parent's custom-state
        audio, or silence, when nothing is queued within 10 ms.

        Returns:
            (frame, type, eventpoint): type is 0 for normal speech, 1 for
            silence, or the parent's current custom state (>1).
        """
        try:
            frame,eventpoint = self.queue.get(block=True,timeout=0.01)
            type = 0  # NOTE: shadows the builtin `type` within this method
            #print(f'[INFO] get frame {frame.shape}')
        except queue.Empty:
            if self.parent and self.parent.curr_state>1: # playing custom audio
                frame = self.parent.get_audio_stream(self.parent.curr_state)
                type = self.parent.curr_state
            else:
                frame = np.zeros(self.chunk, dtype=np.float32)
                type = 1
            eventpoint = None

        return frame,type,eventpoint 

    #return frame:audio pcm; type: 0-normal speak, 1-silence; eventpoint:custom event sync with audio
    def get_audio_out(self): 
        # Blocking pop of the next (frame, type, eventpoint) for playback.
        return self.output_queue.get()
    
    def warm_up(self):
        # Pre-fill the sliding window (left + right strides), then discard
        # the left-stride portion of the mirrored output.
        for _ in range(self.stride_left_size + self.stride_right_size):
            audio_frame,type,eventpoint=self.get_audio_frame()
            self.frames.append(audio_frame)
            self.output_queue.put((audio_frame,type,eventpoint))
        for _ in range(self.stride_left_size):
            self.output_queue.get()

    def run_step(self):
        # One feature-extraction step; implemented by subclasses.
        pass

    def get_next_feat(self,block,timeout):        
        # Fetch the next feature batch produced by run_step().
        return self.feat_queue.get(block,timeout)

+ 965 - 0
basereal.py

@@ -0,0 +1,965 @@
+import math
+import torch
+import numpy as np
+
+import subprocess
+import os
+import time
+import cv2
+import glob
+import resampy
+
+import queue
+from queue import Queue
+from threading import Thread, Event
+from io import BytesIO
+import soundfile as sf
+
+import asyncio
+from av import AudioFrame, VideoFrame
+
+import av
+from fractions import Fraction
+
+from ttsreal import EdgeTTS,SovitsTTS,XTTS,CosyVoiceTTS,FishTTS,TencentTTS,DoubaoTTS,IndexTTS2,AzureTTS,State
+from logger import logger
+from videoplayer import RealtimeVideoPlayer
+from audioplayer import RealtimeAudioPlayer
+
+from tqdm import tqdm
+
+from threading import Lock
+def read_imgs(img_list):
+    frames = []
+    logger.info('reading images...')
+    for img_path in tqdm(img_list):
+        frame = cv2.imread(img_path)
+        frames.append(frame)
+    return frames
+
+def play_audio(quit_event,queue):        
+    """Blocking playback loop: pull raw PCM byte chunks from `queue` and write
+    them to a PyAudio output stream until quit_event is set.
+
+    NOTE(review): format=8 corresponds to pyaudio.paInt16, and
+    output_device_index=1 is hard-coded — confirm the device index on the
+    target machine.
+    """
+    import pyaudio
+    p = pyaudio.PyAudio()
+    stream = p.open(
+        rate=16000,
+        channels=1,
+        format=8,
+        output=True,
+        output_device_index=1,
+    )
+    stream.start_stream()
+    # while queue.qsize() <= 0:
+    #     time.sleep(0.1)
+    while not quit_event.is_set():
+        stream.write(queue.get(block=True))
+    stream.close()
+
+class BaseReal:
+    def __init__(self, opt,model,avatar):
+        """Base state for one realtime avatar session.
+
+        Args:
+            opt: parsed options; fields read here: fps, sessionid, tts,
+                 transport, push_url, customopt and the playback dirs.
+            model: inference model handle — not stored here; presumably
+                 consumed by subclasses (TODO confirm).
+            avatar: avatar resources — likewise unused in this base __init__.
+        """
+
+        # Playback queues (bounded to prevent unbounded memory growth)
+        self.msg_queue = queue.Queue(maxsize=1000)  # at most 1000 pending messages
+        self.interrupted_queue = queue.Queue(maxsize=500)  # at most 500 interrupted items
+        self.user_question_queue = queue.Queue(maxsize=100)  # at most 100 user questions
+        self.is_speaking_flag = False
+        self.current_text = "" # text currently being spoken
+        self.current_pos = 0 # playback position within current_text
+        self.interrupt_lock = Lock() # guards the interrupt/playback state above
+
+        self.opt = opt
+        self.sample_rate = 16000
+        self.chunk = self.sample_rate // opt.fps # 320 samples per chunk (20ms * 16000 / 1000)
+        self.sessionid = self.opt.sessionid
+
+        # Select the TTS backend from opt.tts
+        if opt.tts == "edgetts":
+            self.tts = EdgeTTS(opt,self)
+        elif opt.tts == "qwen3tts":
+            from qwen3tts import Qwen3TTS
+            self.tts = Qwen3TTS(opt,self)
+        elif opt.tts == "gpt-sovits":
+            self.tts = SovitsTTS(opt,self)
+        elif opt.tts == "xtts":
+            self.tts = XTTS(opt,self)
+        elif opt.tts == "cosyvoice":
+            self.tts = CosyVoiceTTS(opt,self)
+        elif opt.tts == "fishtts":
+            self.tts = FishTTS(opt,self)
+        elif opt.tts == "tencent":
+            self.tts = TencentTTS(opt,self)
+        elif opt.tts == "doubao":
+            self.tts = DoubaoTTS(opt,self)
+        elif opt.tts == "indextts2":
+            self.tts = IndexTTS2(opt,self)
+        elif opt.tts == "azuretts":
+            self.tts = AzureTTS(opt,self)
+        elif opt.tts == "voxcpm2":
+            from ttsreal import VoxCPM2TTS
+            self.tts = VoxCPM2TTS(opt,self)
+        elif opt.tts == "voxcpm2api":
+            from voxcpm2_api_tts import VoxCPM2APITTS
+            self.tts = VoxCPM2APITTS(opt,self)
+        elif opt.tts == "none":
+            # No TTS: fall back to file-based audio/video playback mode
+            self.tts = None
+            self.video_player = None
+            self.audio_player = None
+            
+            # Video playback directory configured?
+            if hasattr(opt, 'VIDEO_PLAYBACK_DIR') and opt.VIDEO_PLAYBACK_DIR:
+                self.video_player = RealtimeVideoPlayer(opt.VIDEO_PLAYBACK_DIR)
+                logger.info(f"🎬 视频播放模式已启用,监听目录: {opt.VIDEO_PLAYBACK_DIR}")
+            # Audio playback directory configured?
+            elif hasattr(opt, 'AUDIO_PLAYBACK_DIR') and opt.AUDIO_PLAYBACK_DIR:
+                self.audio_player = RealtimeAudioPlayer(opt.AUDIO_PLAYBACK_DIR, sample_rate=16000)
+                logger.info(f"🎵 音频播放模式已启用,监听目录: {opt.AUDIO_PLAYBACK_DIR}")
+            else:
+                logger.warning("⚠️ 未指定 VIDEO_PLAYBACK_DIR 或 AUDIO_PLAYBACK_DIR,播放模式不可用")
+        else:
+            # Unknown value: default to edgetts
+            logger.warning(f"未知的 TTS 类型: {opt.tts},使用 edgetts 作为默认")
+            self.tts = EdgeTTS(opt,self)
+
+        self.speaking = False
+
+        self.recording = False
+        self._record_video_pipe = None
+        self._record_audio_pipe = None
+        self.width = self.height = 0
+
+        # RTMP push-streaming support; URL only honored for rtcpush + rtmp:// targets
+        self._rtmp_pushing = False
+        self._rtmp_video_pipe = None
+        self._rtmp_audio_pipe = None
+        self._rtmp_push_url = getattr(opt, 'push_url', '') if hasattr(opt, 'push_url') and opt.transport == 'rtcpush' and getattr(opt, 'push_url', '').startswith('rtmp://') else ''
+        if self._rtmp_push_url:
+            logger.info(f'RTMP 推流已配置: {self._rtmp_push_url}')
+
+        # Custom idle-clip state machine (0 = default state)
+        self.curr_state=0
+        self.custom_img_cycle = {}
+        self.custom_audio_cycle = {}
+        self.custom_audio_index = {}
+        self.custom_index = {}
+        self.custom_opt = {}
+        self.__loadcustom()
+
+    def put_msg_txt(self,msg,datainfo:dict={}):
+        """Dispatch a text message: forward to TTS, or (tts == "none") lazily
+        start the configured audio/video playback player.
+
+        NOTE(review): in "none" mode the text itself is discarded — the message
+        only serves to activate the player.
+        """
+        if self.opt.tts == "none":
+            # Audio playback mode: activate the audio player on first message
+            if self.audio_player and not self.audio_player.is_running:
+                logger.info("🎵 音频播放模式已激活")
+                self.audio_player.set_callbacks(
+                    on_audio=self._on_audio_data
+                )
+                self.audio_player.start()
+            # Video playback mode: text messages are not processed
+            elif self.video_player and not self.video_player.is_running:
+                logger.info("🎬 视频播放模式已激活")
+                self.video_player.set_callbacks(
+                    on_frame=self._on_video_frame,
+                    on_audio=self._on_audio_frame
+                )
+                self.video_player.start()
+        else:
+            # TTS mode: forward the text to the TTS engine
+            self.tts.put_msg_txt(msg,datainfo)
+    
+    def _on_video_frame(self, frame):
+        """Video-frame callback (video playback mode); currently a no-op.
+
+        Hook point for feeding frames into the render pipeline later.
+        """
+        pass
+    
+    def _on_audio_frame(self, audio):
+        """Audio-frame callback (video playback mode); currently a no-op.
+
+        Hook point for routing the clip's audio into playback later.
+        """
+        pass
+    
+    def _on_audio_data(self, audio_chunk):
+        """音频数据回调(用于音频播放模式)"""
+        # 将音频数据帧传递给渲染管道
+        # audio_chunk 是 16kHz 20ms 的 PCM 数据 (int16)
+        try:
+            # 转换为 float32 格式,这是 ASR 需要的格式
+            if audio_chunk.dtype == np.int16:
+                audio_float = audio_chunk.astype(np.float32) / 32768.0
+            else:
+                audio_float = audio_chunk.astype(np.float32)
+            
+            # 传递给 put_audio_frame 处理
+            self.put_audio_frame(audio_float, {})
+        except Exception as e:
+            logger.error(f"处理音频数据回调时出错: {e}")
+    
+    def put_audio_frame(self,audio_chunk,datainfo:dict={}): #16khz 20ms pcm
+        """Forward one 16 kHz / 20 ms float32 PCM chunk to the ASR pipeline.
+
+        NOTE(review): the mutable default dict is shared across calls — fine as
+        long as callees never mutate it; worth confirming.
+        """
+        self.asr.put_audio_frame(audio_chunk,datainfo)
+
+    def put_audio_file(self,filebyte,datainfo:dict={}): 
+        input_stream = BytesIO(filebyte)
+        stream = self.__create_bytes_stream(input_stream)
+        streamlen = stream.shape[0]
+        idx=0
+        while streamlen >= self.chunk:  #and self.state==State.RUNNING
+            self.put_audio_frame(stream[idx:idx+self.chunk],datainfo)
+            streamlen -= self.chunk
+            idx += self.chunk
+    
+    def __create_bytes_stream(self,byte_stream):
+        """Decode an audio byte stream to mono float32 PCM at self.sample_rate.
+
+        Multi-channel input keeps only the first channel; a differing sample
+        rate is resampled with resampy. Returns a 1-D float32 numpy array.
+        """
+        #byte_stream=BytesIO(buffer)
+        stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
+        logger.info(f'[INFO]put audio stream {sample_rate}: {stream.shape}')
+        stream = stream.astype(np.float32)
+
+        if stream.ndim > 1:
+            logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
+            stream = stream[:, 0]
+    
+        if sample_rate != self.sample_rate and stream.shape[0]>0:
+            logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
+            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
+
+        return stream
+
+    def stop_tts(self):
+        """Stop current TTS playback and clear per-session speaking state.
+
+        Acquires interrupt_lock — callers must NOT already hold it (the lock is
+        non-reentrant).
+        """
+        with self.interrupt_lock:  # protect shared playback state
+            if hasattr(self.tts, 'state'):
+                self.tts.state = State.PAUSE
+            # Raise the interrupt flag to force-break the current playback
+            if hasattr(self.tts, 'set_interrupt_flag'):
+                self.tts.set_interrupt_flag()
+            # Clear the TTS engine's normal queue, but keep its high-priority
+            # queue so pending user questions still get answered
+            if hasattr(self.tts, 'msgqueue'):
+                with self.tts.msgqueue.mutex:
+                    self.tts.msgqueue.queue.clear()
+            # Note: the high-priority queue is intentionally left untouched
+            # Reset the buffered TTS input audio stream, if it behaves like a file
+            if hasattr(self.tts, 'input_stream'):
+                if hasattr(self.tts.input_stream, 'seek') and hasattr(self.tts.input_stream, 'truncate'):
+                    self.tts.input_stream.seek(0)
+                    self.tts.input_stream.truncate()
+            # Update speaking state while still holding the lock
+            self.is_speaking_flag = False
+            self.current_text = ""
+            self.current_pos = 0
+
+    def flush_talk(self):
+        """Interrupt the current playback, saving all unfinished content for resume.
+
+        Acquires interrupt_lock; stop_tts() is deliberately called after the
+        lock is released because it takes the same non-reentrant lock.
+        """
+        with self.interrupt_lock:
+            # 1. Save the part of the current text not yet spoken (the key step)
+            if self.current_text and self.current_pos < len(self.current_text):
+                unfinished = self.current_text[self.current_pos:]
+                if unfinished.strip():
+                    try:
+                        self.interrupted_queue.put_nowait(unfinished)
+                    except queue.Full:
+                        logger.warning("中断队列已满,丢弃未完成的文本")
+            
+            # 2. Move everything still in msg_queue into interrupted_queue (drop nothing)
+            while not self.msg_queue.empty():
+                try:
+                    self.interrupted_queue.put_nowait(self.msg_queue.get_nowait())
+                except queue.Full:
+                    logger.warning("中断队列已满,丢弃剩余消息")
+                    break
+            
+            # 3. Stopping the TTS happens below: stop_tts() acquires
+            #    interrupt_lock itself, so calling it here would deadlock
+            pass
+            
+            # 4. Clear current playback state while holding the lock
+            self.is_speaking_flag = False
+            self.current_text = ""
+            self.current_pos = 0
+            
+            # 5. Do not reset the interrupt flag — keep the interrupted state
+            #    until new content starts playing
+        
+        # Call stop_tts outside the lock to avoid deadlock
+        self.stop_tts()
+
+        # self.tts.flush_talk()
+        # self.asr.flush_talk()
+    
+    def handle_interruption_during_intro(self, user_question, datainfo:dict={}):
+        """处理在介绍过程中用户提问的逻辑,暂停介绍并优先回答问题"""
+        with self.interrupt_lock:
+            # 1. 保存当前正在播放的介绍内容到中断队列
+            if self.current_text and self.current_pos < len(self.current_text):
+                unfinished_intro = self.current_text[self.current_pos:]
+                if unfinished_intro.strip():
+                    self.interrupted_queue.put(unfinished_intro)
+                    logger.info(f"保存未完成的介绍内容: {unfinished_intro[:50]}...")
+            
+            # 2. 将消息队列中剩余的介绍内容也保存到中断队列
+            remaining_items = []
+            while not self.msg_queue.empty():
+                remaining_items.append(self.msg_queue.get())
+            
+            if remaining_items:
+                for item in remaining_items:
+                    self.interrupted_queue.put(item)
+                logger.info(f"保存了 {len(remaining_items)} 个剩余的介绍内容到中断队列")
+            
+            # 3. 停止TTS播放(注意:这会将TTS状态设为PAUSE,但我们随后会处理用户问题)
+            self.stop_tts()
+            
+            # 4. 清空当前播放状态
+            self.is_speaking_flag = False
+            self.current_text = ""
+            self.current_pos = 0
+            
+            # 5. 确保TTS状态设置为RUNNING以处理用户问题
+            if hasattr(self.tts, 'state'):
+                self.tts.state = State.RUNNING
+            
+            # 6. 重置打断标志,允许播放新内容
+            if hasattr(self.tts, 'reset_interrupt_flag'):
+                self.tts.reset_interrupt_flag()
+            
+            # 7. 优先播放用户问题
+            self.tts.put_high_priority_msg(user_question, datainfo)
+            
+            # 8. 立即尝试处理高优先级队列中的消息
+            # 通过直接调用TTS处理方法,绕过正常的队列处理延迟
+            logger.info("已暂停介绍并开始播放用户问题")
+            
+            # 9. 唤醒TTS处理线程,确保立即处理高优先级消息
+            # 通过向普通队列添加一个空消息来触发处理循环
+            if hasattr(self.tts, 'msgqueue'):
+                self.tts.msgqueue.put(("", {}))
+    
+    def put_user_question(self, msg, datainfo:dict={}):
+        """专门用于处理用户问题,优先级高于普通消息"""
+        with self.interrupt_lock:
+            # 强制中断当前播放
+            self.flush_talk()
+            
+            # 直接播放用户问题(使用高优先级队列)
+            self.tts.put_high_priority_msg(msg, datainfo)
+            
+            # 确保TTS状态设置为RUNNING以处理用户问题
+            if hasattr(self.tts, 'state'):
+                self.tts.state = State.RUNNING
+            
+            # 重置打断标志,允许播放新内容
+            if hasattr(self.tts, 'reset_interrupt_flag'):
+                self.tts.reset_interrupt_flag()
+            
+            # 唤醒TTS处理线程,确保立即处理高优先级消息
+            # 通过向普通队列添加一个空消息来触发处理循环
+            if hasattr(self.tts, 'msgqueue'):
+                self.tts.msgqueue.put(("", {}))
+    
+    def process_user_questions_and_resume(self):
+        """处理用户问题并恢复之前的内容"""
+        with self.interrupt_lock:
+            # 检查是否有用户问题需要处理
+            while not self.user_question_queue.empty():
+                msg, datainfo = self.user_question_queue.get()
+                # 直接播放用户问题
+                self.tts.put_msg_txt(msg, datainfo)
+            
+            # 检查是否有被中断的内容需要恢复
+            if not self.interrupted_queue.empty():
+                # 将被中断的内容放回主队列
+                while not self.interrupted_queue.empty():
+                    msg = self.interrupted_queue.get()
+                    self.msg_queue.put(msg)
+    
+    def resume_interrupted(self):
+        """把 interrupted_queue 里的内容放回播放队列"""
+        with self.interrupt_lock:
+            resumed = False
+            items_to_resume = []
+            # 先把所有中断的内容取出
+            while not self.interrupted_queue.empty():
+                items_to_resume.append(self.interrupted_queue.get())
+                resumed = True
+            
+            # 再按顺序放回主队列
+            for item in items_to_resume:
+                self.msg_queue.put(item)
+            
+            return resumed
+        # """恢复播放被中断的消息"""
+        # return self.tts.resume_interrupted()
+    
+    def start_intro_with_interrupt_capability(self, intro_text, datainfo:dict={}):
+        """Start playing intro content while remaining interruptible by user questions."""
+        # Put the intro on the main message path (TTS or playback mode)
+        self.put_msg_txt(intro_text, datainfo)
+
+    def is_speaking(self)->bool:
+        """Whether the avatar is currently producing speech frames."""
+        return self.speaking
+    
+    def __loadcustom(self):
+        """Load the custom idle clips declared in opt.customopt.
+
+        Each entry supplies 'audiotype' (the state key), 'imgpath' (a directory
+        of numerically named frame images) and 'audiopath' (audio read as
+        float32); per-type image/audio cursors start at 0.
+        """
+        for item in self.opt.customopt:
+            logger.info(item)
+            input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]'))
+            # Sort by the numeric stem of the filename (e.g. 00000003.png -> 3)
+            input_img_list = sorted(input_img_list, key=lambda x: int(os.path.splitext(os.path.basename(x))[0]))
+            self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list)
+            self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(item['audiopath'], dtype='float32')
+            self.custom_audio_index[item['audiotype']] = 0
+            self.custom_index[item['audiotype']] = 0
+            self.custom_opt[item['audiotype']] = item
+
+    def init_customindex(self):
+        self.curr_state=0
+        for key in self.custom_audio_index:
+            self.custom_audio_index[key]=0
+        for key in self.custom_index:
+            self.custom_index[key]=0
+
+    def notify(self,eventpoint):
+        """Handle a playback event; after a Q&A answer ends, schedule resumption
+        of any interrupted intro content."""
+        logger.info("notify:%s",eventpoint)
+        
+        # Is this the end event of an answer to a user question?
+        if isinstance(eventpoint, dict) and eventpoint.get('status') == 'end':
+            # Treat it as a Q&A ending when the event carries knowledge_base
+            # info or interrupted content is pending
+            if 'knowledge_base' in eventpoint or self.interrupted_queue.qsize() > 0:
+                # Delay briefly on a timer thread so the current event finishes
+                # processing before we resume
+                import threading
+                timer = threading.Timer(0.5, self._try_resume_after_qa)
+                timer.start()
+            # Note: intro auto-continuation is NOT triggered from notify; it is
+            # triggered after TTS completion so each item finishes before the next
+    
+    def _try_resume_after_qa(self):
+        """尝试恢复问答后的内容"""
+        with self.interrupt_lock:
+            # 检查是否还有被中断的内容需要恢复
+            if not self.interrupted_queue.empty():
+                logger.info("检测到问答完成,恢复被中断的内容...")
+                # 有被中断的内容需要恢复,调用恢复方法
+                self.resume_interrupted()
+
+    def _continue_intro_play(self):
+        """Play the next piece of intro content (timer-driven auto-continuation).
+
+        Re-arms a threading.Timer either to retry (TTS queue busy) or to fire
+        after the estimated playback time of the item it just queued.
+        """
+        try:
+            # Only continue while intro playback is active and not paused
+            if not (hasattr(self, 'intro_play_state') and 
+                    self.intro_play_state.get('is_playing', False) and
+                    not self.intro_play_state.get('is_paused', False)):
+                logger.info("不处于介绍播放状态,跳过自动续播")
+                return
+            
+            # If the TTS queue still holds messages, wait and re-check later
+            if hasattr(self, 'tts') and hasattr(self.tts, 'msgqueue'):
+                if not self.tts.msgqueue.empty():
+                    logger.info("TTS消息队列不为空,等待队列清空后再播放下一条")
+                    # Set the waiting flag and re-arm a 2 s retry timer
+                    self.intro_play_state['is_waiting_next'] = True
+                    import threading
+                    timer = threading.Timer(2.0, self._continue_intro_play)
+                    timer.start()
+                    return
+            
+            # Set the waiting flag to prevent duplicate triggers
+            if hasattr(self, 'intro_play_state'):
+                self.intro_play_state['is_waiting_next'] = True
+            
+            if hasattr(self, 'knowledge_intro_instance') and self.knowledge_intro_instance:
+                # Fetch the next piece of intro content
+                next_content = self.knowledge_intro_instance._get_next_content()
+                if next_content and next_content.get('text'):
+                    logger.info(f"自动续播介绍内容 - 序号:{next_content.get('play_index', 1)}/{next_content.get('total_count', 1)}")
+                    # Track which item was played last
+                    if hasattr(self, 'intro_play_state'):
+                        self.intro_play_state["last_played_index"] = next_content.get("play_index", 1)
+                    
+                    # Pause estimate: playback time from text length plus a fixed gap.
+                    # Speaking rate assumed ~3 chars/second (slow announcement pace)
+                    text = next_content.get('text', '')
+                    estimated_play_time = len(text) / 3  # estimated playback time (seconds)
+                    pause_time = estimated_play_time + 3  # playback time + 3 s gap
+                    
+                    logger.info(f"当前文案长度:{len(text)}字,估算播放时间:{estimated_play_time:.1f}秒,播放下一条需等待:{pause_time:.1f}秒")
+                    
+                    # Queue the next item for playback
+                    self.put_msg_txt(next_content['text'])
+                    
+                    # Re-arm the timer to continue after this item finishes
+                    import threading
+                    timer = threading.Timer(pause_time, self._continue_intro_play)
+                    timer.start()
+                else:
+                    # No more content: mark the intro as finished
+                    logger.info("介绍内容全部播放完成")
+                    if hasattr(self, 'intro_play_state'):
+                        self.intro_play_state["is_playing"] = False
+                        self.intro_play_state['is_waiting_next'] = False
+        except Exception as e:
+            logger.error(f"自动续播介绍内容时出错: {e}")
+            # Always clear the waiting flag on error
+            if hasattr(self, 'intro_play_state'):
+                self.intro_play_state['is_waiting_next'] = False
+    
+    def _reset_waiting_flag(self):
+        """Clear the intro waiting flag so the next item may be played."""
+        if hasattr(self, 'intro_play_state'):
+            self.intro_play_state['is_waiting_next'] = False
+            logger.info("等待时间结束,可以播放下一条")
+
+    def start_recording(self):
+        """Start recording: spawn two ffmpeg processes that read raw BGR frames
+        and s16le 16 kHz mono PCM from stdin into temp<sessionid>.mp4 / .aac."""
+        if self.recording:
+            return
+
+        command = ['ffmpeg',
+                    '-y', '-an',
+                    '-f', 'rawvideo',
+                    '-vcodec','rawvideo',
+                    '-pix_fmt', 'bgr24', # pixel format
+                    '-s', "{}x{}".format(self.width, self.height),
+                    '-r', str(25),
+                    '-i', '-',
+                    '-pix_fmt', 'yuv420p', 
+                    '-vcodec', "h264",
+                    #'-f' , 'flv',                  
+                    f'temp{self.opt.sessionid}.mp4']
+        self._record_video_pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE)
+
+        acommand = ['ffmpeg',
+                    '-y', '-vn',
+                    '-f', 's16le',
+                    #'-acodec','pcm_s16le',
+                    '-ac', '1',
+                    '-ar', '16000',
+                    '-i', '-',
+                    '-acodec', 'aac',
+                    #'-f' , 'wav',                  
+                    f'temp{self.opt.sessionid}.aac']
+        self._record_audio_pipe = subprocess.Popen(acommand, shell=False, stdin=subprocess.PIPE)
+
+        self.recording = True
+        # self.recordq_video.queue.clear()
+        # self.recordq_audio.queue.clear()
+        # self.container = av.open(path, mode="w")
+    
+        # process_thread = Thread(target=self.record_frame, args=())
+        # process_thread.start()
+    
+    def record_video_data(self,image):
+        if self.width == 0:
+            logger.debug("image.shape: %s", image.shape)
+            self.height,self.width,_ = image.shape
+        if self.recording:
+            self._record_video_pipe.stdin.write(image.tostring())
+
+    def record_audio_data(self,frame):
+        if self.recording:
+            self._record_audio_pipe.stdin.write(frame.tostring())
+    
+    def start_rtmp_push(self):
+        """启动 RTMP 推流(单进程 FFmpeg,视频+音频共用一套时钟)"""
+        if not self._rtmp_push_url or self._rtmp_pushing:
+            return
+        
+        if self.width == 0 or self.height == 0:
+            logger.warning('RTMP 推流: 视频尺寸未初始化,等待第一帧')
+            return
+        
+        self._rtmp_pushing = True
+        
+        # ffmpeg 命令:直接通过 stdin 管道接收视频和音频
+        # 使用 pipe:0 接收视频,pipe:1 接收音频(需要两个独立的 FFmpeg 进程或使用 filter_complex)
+        # 但更简单的方式是:使用一个 FFmpeg 进程,视频通过 stdin,音频通过第二个管道
+        ffmpeg_cmd = [
+            'ffmpeg',
+            '-y',
+            # 视频输入:通过管道
+            '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
+            '-s', f'{self.width}x{self.height}', '-r', '25',
+            '-i', 'pipe:0',  # 从 stdin 读取视频
+            # 音频输入:通过第二个管道
+            '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
+            '-i', 'pipe:1',  # 从 fd 1 读取音频(需要特殊处理)
+            # 编码输出
+            '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency',
+            '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25',
+            '-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
+            '-f', 'flv', self._rtmp_push_url
+        ]
+        
+        # 注意:FFmpeg 不支持同时从两个 pipe 读取,需要使用 FIFO
+        # 所以我们保留 FIFO 方案,但去掉手动速率控制,让 FFmpeg 自动处理
+        
+        # 创建 FIFO
+        import tempfile
+        self._rtmp_fifo_dir = tempfile.mkdtemp(prefix='rtmp_fifo_')
+        self._rtmp_video_fifo = os.path.join(self._rtmp_fifo_dir, 'video')
+        self._rtmp_audio_fifo = os.path.join(self._rtmp_fifo_dir, 'audio')
+        os.mkfifo(self._rtmp_video_fifo)
+        os.mkfifo(self._rtmp_audio_fifo)
+        
+        # 视频和音频缓冲队列
+        self._rtmp_video_queue = queue.Queue(maxsize=30)
+        self._rtmp_audio_queue = queue.Queue(maxsize=100)
+        
+        # 重写 ffmpeg 命令使用 FIFO
+        # 关键:不使用 -re(会阻塞 FIFO),改用 -vf realtime/-af arealtime 对输出限速
+        # 再配合 -fflags +genpts 与 -vsync cfr 保证恒定 25fps,避免下游转推看到 2x 速度
+        ffmpeg_cmd = [
+            'ffmpeg',
+            '-y',
+            '-fflags', '+genpts',
+            # 视频输入(不使用 -re,让 FIFO 自然反压控制速率)
+            '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
+            '-s', f'{self.width}x{self.height}', '-r', '25',
+            '-thread_queue_size', '1024',
+            '-i', self._rtmp_video_fifo,
+            # 音频输入(16kHz 单声道 PCM)
+            '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
+            '-thread_queue_size', '1024',
+            '-i', self._rtmp_audio_fifo,
+            # 滤镜:按墙钟实时限速,视频+音频同步限速到 1x,避免 SRS 转推出现 speed=2x
+            '-vf', 'realtime',
+            '-af', 'arealtime,aresample=async=1:first_pts=0',
+            # 编码输出(音频保持 16kHz 不重采样,避免时间戳错乱)
+            '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency',
+            '-pix_fmt', 'yuv420p', '-g', '50', '-keyint_min', '25',
+            '-vsync', 'cfr', '-r', '25',  # 强制输出恒定 25fps
+            '-c:a', 'aac', '-b:a', '128k', '-ar', '16000',  # 保持 16kHz!
+            '-f', 'flv', self._rtmp_push_url
+        ]
+        
+        try:
+            # 启动 ffmpeg 进程
+            self._rtmp_pipe = subprocess.Popen(
+                ffmpeg_cmd, shell=False,
+                stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
+            )
+            logger.info(f'RTMP ffmpeg 进程已启动: PID={self._rtmp_pipe.pid}')
+            
+            # 视频 FIFO 写入线程(无速率控制,FFmpeg 自动处理)
+            def video_fifo_writer():
+                try:
+                    vfd = os.open(self._rtmp_video_fifo, os.O_WRONLY)
+                    logger.info('RTMP 视频 FIFO 已打开')
+                    while self._rtmp_pushing:
+                        try:
+                            data = self._rtmp_video_queue.get(timeout=1.0)
+                            if data is None:
+                                break
+                            os.write(vfd, data)
+                        except queue.Empty:
+                            continue
+                except Exception as e:
+                    logger.error(f'RTMP 视频写入线程异常: {e}')
+                    self._rtmp_pushing = False
+                finally:
+                    try: os.close(vfd)
+                    except: pass
+            
+            # 音频 FIFO 写入线程(无速率控制,FFmpeg 自动处理)
+            def audio_fifo_writer():
+                try:
+                    afd = os.open(self._rtmp_audio_fifo, os.O_WRONLY)
+                    logger.info('RTMP 音频 FIFO 已打开')
+                    # 静音帧:匹配一个视频帧(40ms)的 PCM 数据量
+                    # 16kHz * 0.04s * 2字节(单声道 s16le) = 1280 字节
+                    silence_frame = b'\x00' * 1280  # 40ms 静音(严格匹配视频帧率,避免音频过速)
+                    while self._rtmp_pushing:
+                        try:
+                            data = self._rtmp_audio_queue.get(timeout=0.04)
+                            if data is None:
+                                break
+                            os.write(afd, data)
+                        except queue.Empty:
+                            # 队列为空(TTS未说话),写入一帧 40ms 静音保持ffmpeg不卡顿
+                            try:
+                                os.write(afd, silence_frame)
+                            except:
+                                break
+                except Exception as e:
+                    logger.error(f'RTMP 音频写入线程异常: {e}')
+                    self._rtmp_pushing = False
+                finally:
+                    try: os.close(afd)
+                    except: pass
+            
+            self._rtmp_video_thread = Thread(target=video_fifo_writer, daemon=True, name='rtmp_video_writer')
+            self._rtmp_audio_thread = Thread(target=audio_fifo_writer, daemon=True, name='rtmp_audio_writer')
+            self._rtmp_video_thread.start()
+            self._rtmp_audio_thread.start()
+            
+            # stderr 读取线程
+            def read_stderr():
+                for line in iter(self._rtmp_pipe.stderr.readline, b''):
+                    msg = line.decode('utf-8', errors='ignore').strip()
+                    if msg:
+                        logger.debug(f'FFmpeg RTMP: {msg}')
+            Thread(target=read_stderr, daemon=True, name='rtmp_stderr_reader').start()
+            
+            logger.info(f'RTMP 音视频推流已启动: {self._rtmp_push_url}')
+            
+        except Exception as e:
+            logger.error(f'RTMP 推流启动失败: {e}')
+            self._rtmp_pushing = False
+            self._cleanup_rtmp_fifos()
+    
+    def rtmp_push_video_data(self, image):
+        """将视频帧放入 RTMP 推流队列"""
+        if not self._rtmp_pushing:
+            return
+        try:
+            self._rtmp_video_queue.put_nowait(image.tobytes())
+        except queue.Full:
+            pass  # 丢弃旧帧,避免延迟累积
+    
+    def rtmp_push_audio_data(self, frame):
+        """将音频帧放入 RTMP 推流队列"""
+        if not self._rtmp_pushing:
+            return
+        try:
+            self._rtmp_audio_queue.put_nowait(frame.tobytes())
+        except queue.Full:
+            pass  # 丢弃旧帧,避免延迟累积
+    
+    def stop_rtmp_push(self):
+        """Stop RTMP push: signal writer threads, join them, wait for ffmpeg
+        (kill on timeout), and remove the FIFOs."""
+        if not self._rtmp_pushing:
+            return
+        self._rtmp_pushing = False
+        # Send the stop sentinel (None) to both writer threads
+        try:
+            self._rtmp_video_queue.put(None, timeout=1)
+        except: pass
+        try:
+            self._rtmp_audio_queue.put(None, timeout=1)
+        except: pass
+        # Wait for the writer threads to exit
+        for t in [getattr(self, '_rtmp_video_thread', None), getattr(self, '_rtmp_audio_thread', None)]:
+            if t and t.is_alive():
+                t.join(timeout=3)
+        # Wait for ffmpeg to terminate (it exits on FIFO EOF); kill on timeout
+        if self._rtmp_pipe is not None:
+            try:
+                self._rtmp_pipe.wait(timeout=5)
+            except:
+                self._rtmp_pipe.kill()
+        self._cleanup_rtmp_fifos()
+        logger.info('RTMP 推流已停止')
+    
+    def _cleanup_rtmp_fifos(self):
+        """清理 FIFO 文件和目录"""
+        for path in [getattr(self, '_rtmp_video_fifo', ''), getattr(self, '_rtmp_audio_fifo', '')]:
+            if path and os.path.exists(path):
+                try: os.unlink(path)
+                except: pass
+        fifo_dir = getattr(self, '_rtmp_fifo_dir', '')
+        if fifo_dir and os.path.isdir(fifo_dir):
+            try: os.rmdir(fifo_dir)
+            except: pass
+    
+    # def record_frame(self): 
+    #     videostream = self.container.add_stream("libx264", rate=25)
+    #     videostream.codec_context.time_base = Fraction(1, 25)
+    #     audiostream = self.container.add_stream("aac")
+    #     audiostream.codec_context.time_base = Fraction(1, 16000)
+    #     init = True
+    #     framenum = 0       
+    #     while self.recording:
+    #         try:
+    #             videoframe = self.recordq_video.get(block=True, timeout=1)
+    #             videoframe.pts = framenum #int(round(framenum*0.04 / videostream.codec_context.time_base))
+    #             videoframe.dts = videoframe.pts
+    #             if init:
+    #                 videostream.width = videoframe.width
+    #                 videostream.height = videoframe.height
+    #                 init = False
+    #             for packet in videostream.encode(videoframe):
+    #                 self.container.mux(packet)
+    #             for k in range(2):
+    #                 audioframe = self.recordq_audio.get(block=True, timeout=1)
+    #                 audioframe.pts = int(round((framenum*2+k)*0.02 / audiostream.codec_context.time_base))
+    #                 audioframe.dts = audioframe.pts
+    #                 for packet in audiostream.encode(audioframe):
+    #                     self.container.mux(packet)
+    #             framenum += 1
+    #         except queue.Empty:
+    #             print('record queue empty,')
+    #             continue
+    #         except Exception as e:
+    #             print(e)
+    #             #break
+    #     for packet in videostream.encode(None):
+    #         self.container.mux(packet)
+    #     for packet in audiostream.encode(None):
+    #         self.container.mux(packet)
+    #     self.container.close()
+    #     self.recordq_video.queue.clear()
+    #     self.recordq_audio.queue.clear()
+    #     print('record thread stop')
+		
+    def stop_recording(self):
+        """停止录制视频"""
+        if not self.recording:
+            return
+        self.recording = False 
+        self._record_video_pipe.stdin.close()  #wait() 
+        self._record_video_pipe.wait()
+        self._record_audio_pipe.stdin.close()
+        self._record_audio_pipe.wait()
+        cmd_combine_audio = f"ffmpeg -y -i temp{self.opt.sessionid}.aac -i temp{self.opt.sessionid}.mp4 -c:v copy -c:a copy data/record.mp4"
+        os.system(cmd_combine_audio) 
+        #os.remove(output_path)
+
+    def mirror_index(self,size, index):
+        #size = len(self.coord_list_cycle)
+        turn = index // size
+        res = index % size
+        if turn % 2 == 0:
+            return res
+        else:
+            return size - res - 1 
+    
+    def get_audio_stream(self,audiotype):
+        idx = self.custom_audio_index[audiotype]
+        stream = self.custom_audio_cycle[audiotype][idx:idx+self.chunk]
+        self.custom_audio_index[audiotype] += self.chunk
+        if self.custom_audio_index[audiotype]>=self.custom_audio_cycle[audiotype].shape[0]:
+            self.curr_state = 1  #当前视频不循环播放,切换到静音状态
+        return stream
+    
+    def set_custom_state(self,audiotype, reinit=True):
+        logger.debug('set_custom_state: %s', audiotype)
+        if self.custom_audio_index.get(audiotype) is None:
+            return
+        self.curr_state = audiotype
+        if reinit:
+            self.custom_audio_index[audiotype] = 0
+            self.custom_index[audiotype] = 0
+
    def process_frames(self,quit_event,loop=None,audio_track=None,video_track=None):
        """Consumer loop: pull inferred frames from res_frame_queue and fan them out.

        Each queue item is ``(res_frame, idx, audio_frames)``. When both audio
        chunks are silence the output is a full idle/custom frame; otherwise the
        inferred region is pasted back via ``self.paste_back_frame``. The final
        frame goes to the selected transport (virtual camera or WebRTC track),
        to the local recorder, and — when configured — to the RTMP pusher.
        Runs until ``quit_event`` is set.

        Args:
            quit_event: threading.Event that terminates the loop when set.
            loop: asyncio event loop used to enqueue WebRTC frames from this thread.
            audio_track: aiortc-style audio track with a ``_queue`` (webrtc path).
            video_track: aiortc-style video track with a ``_queue`` (webrtc path).
        """
        enable_transition = False  # set to False to disable the cross-fade effect, True to enable it
        
        if enable_transition:
            _last_speaking = False
            _transition_start = time.time()
            _transition_duration = 0.1  # cross-fade duration (seconds)
            _last_silent_frame = None  # cache of the last silent frame
            _last_speaking_frame = None  # cache of the last speaking frame
        
        if self.opt.transport=='virtualcam':
            import pyvirtualcam
            # Camera is created lazily on the first frame, once width/height are known.
            vircam = None

            audio_tmp = queue.Queue(maxsize=3000)
            audio_thread = Thread(target=play_audio, args=(quit_event,audio_tmp,), daemon=True, name="pyaudio_stream")
            audio_thread.start()
        
        # RTMP push: started lazily — wait for the first frame to fix the video size.
        _rtmp_started = False
        _rtmp_attempted = False  # whether a push start has already been attempted (avoid retry storms)
        
        while not quit_event.is_set():
            try:
                res_frame,idx,audio_frames = self.res_frame_queue.get(block=True, timeout=1)
            except queue.Empty:
                # Timeout just re-checks quit_event; no frame this round.
                continue
            
            if enable_transition:
                # Detect speaking/silent state changes to (re)start the cross-fade timer.
                # NOTE(review): audio_frames[i][1] != 0 appears to mark a silent/custom
                # chunk — confirm against the producer side.
                current_speaking = not (audio_frames[0][1]!=0 and audio_frames[1][1]!=0)
                if current_speaking != _last_speaking:
                    logger.info(f"状态切换:{'说话' if _last_speaking else '静音'} → {'说话' if current_speaking else '静音'}")
                    _transition_start = time.time()
                _last_speaking = current_speaking

            if audio_frames[0][1]!=0 and audio_frames[1][1]!=0: # both chunks are silence: only a full image is needed
                self.speaking = False
                audiotype = audio_frames[0][1]
                if self.custom_index.get(audiotype) is not None: # a custom video exists for this state
                    # Ping-pong through the custom image cycle so it loops seamlessly.
                    mirindex = self.mirror_index(len(self.custom_img_cycle[audiotype]),self.custom_index[audiotype])
                    target_frame = self.custom_img_cycle[audiotype][mirindex]
                    self.custom_index[audiotype] += 1
                else:
                    target_frame = self.frame_list_cycle[idx]
                
                if enable_transition:
                    # speaking → silent cross-fade
                    if time.time() - _transition_start < _transition_duration and _last_speaking_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_speaking_frame, 1-alpha, target_frame, alpha, 0)
                    else:
                        combine_frame = target_frame
                    # cache the silent frame for the next fade in the other direction
                    _last_silent_frame = combine_frame.copy()
                else:
                    combine_frame = target_frame
            else:
                self.speaking = True
                try:
                    current_frame = self.paste_back_frame(res_frame,idx)
                except Exception as e:
                    # Skip this frame rather than killing the render thread.
                    logger.warning(f"paste_back_frame error: {e}")
                    continue
                if enable_transition:
                    # silent → speaking cross-fade
                    if time.time() - _transition_start < _transition_duration and _last_silent_frame is not None:
                        alpha = min(1.0, (time.time() - _transition_start) / _transition_duration)
                        combine_frame = cv2.addWeighted(_last_silent_frame, 1-alpha, current_frame, alpha, 0)
                    else:
                        combine_frame = current_frame
                    # cache the speaking frame for the next fade in the other direction
                    _last_speaking_frame = combine_frame.copy()
                else:
                    combine_frame = current_frame

            #cv2.putText(combine_frame, "LiveTalking", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (128,128,128), 1)
            if self.opt.transport=='virtualcam':
                if vircam==None:
                    # First frame: now we know the output resolution.
                    height, width,_= combine_frame.shape
                    vircam = pyvirtualcam.Camera(width=width, height=height, fps=25, fmt=pyvirtualcam.PixelFormat.BGR,print_fps=True)
                vircam.send(combine_frame)
            elif video_track is not None and loop is not None: #webrtc
                image = combine_frame
                new_frame = VideoFrame.from_ndarray(image, format="bgr24")
                # This thread is not the event-loop thread; hand the frame over safely.
                asyncio.run_coroutine_threadsafe(video_track._queue.put((new_frame,None)), loop)
            self.record_video_data(combine_frame)
            
            # RTMP push: video frame
            if self._rtmp_push_url:
                if not _rtmp_started and not _rtmp_attempted:
                    # First video frame obtained — start the RTMP push.
                    if self.width > 0 and self.height > 0:
                        self.start_rtmp_push()
                        _rtmp_attempted = True  # mark as attempted (success or not, never retry)
                        if self._rtmp_pushing:  # start succeeded
                            _rtmp_started = True
                if _rtmp_started:  # only push after a successful start
                    self.rtmp_push_video_data(combine_frame)

            for audio_frame in audio_frames:
                frame,type,eventpoint = audio_frame
                # Float PCM in [-1, 1] → signed 16-bit samples.
                frame = (frame * 32767).astype(np.int16)

                if self.opt.transport=='virtualcam':
                    audio_tmp.put(frame.tobytes()) #TODO
                elif audio_track is not None and loop is not None: #webrtc
                    new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0])
                    new_frame.planes[0].update(frame.tobytes())
                    new_frame.sample_rate=16000
                    asyncio.run_coroutine_threadsafe(audio_track._queue.put((new_frame,eventpoint)), loop)
                self.record_audio_data(frame)
                # RTMP push: audio frame
                if _rtmp_started:
                    self.rtmp_push_audio_data(frame)
            if self.opt.transport=='virtualcam':
                # Pace the loop to the camera's 25 fps clock.
                vircam.sleep_until_next_frame()
        # Stop the RTMP push on shutdown.
        self.stop_rtmp_push()
        if self.opt.transport=='virtualcam':
            audio_thread.join()
            vircam.close()
        logger.info('basereal process_frames thread stop') 
+    
+    # def process_custom(self,audiotype:int,idx:int):
+    #     if self.curr_state!=audiotype: #从推理切到口播
+    #         if idx in self.switch_pos:  #在卡点位置可以切换
+    #             self.curr_state=audiotype
+    #             self.custom_index=0
+    #     else:
+    #         self.custom_index+=1

BIN
coco.mp4


+ 0 - 0
data/avatars/.gitkeep


BIN
data/avatars/wav2lip256_avatar1/coords.pkl


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000000.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000001.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000002.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000003.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000004.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000005.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000006.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000007.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000008.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000009.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000010.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000011.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000012.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000013.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000014.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000015.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000016.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000017.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000018.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000019.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000020.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000021.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000022.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000023.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000024.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000025.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000026.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000027.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000028.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000029.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000030.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000031.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000032.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000033.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000034.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000035.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000036.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000037.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000038.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000039.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000040.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000041.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000042.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000043.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000044.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000045.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000046.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000047.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000048.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000049.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000050.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000051.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000052.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000053.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000054.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000055.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000056.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000057.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000058.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000059.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000060.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000061.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000062.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000063.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000064.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000065.png


BIN
data/avatars/wav2lip256_avatar1/face_imgs/00000066.png


Some files were not shown because too many files changed in this diff