zwy 1 päivä sitten
vanhempi
commit
9e818eaf16
3 muutettua tiedostoa jossa 296 lisäystä ja 0 poistoa
  1. 88 0
      Image_Analysis/jietu.py
  2. 54 0
      back/back_api.py
  3. 154 0
      voxcpm_api.py

+ 88 - 0
Image_Analysis/jietu.py

@@ -0,0 +1,88 @@
+import paramiko
+from scp import SCPClient
+import os
+import time
+
+# ===================== 配置 =====================
+JUMP_HOST = "183.252.196.135"
+JUMP_PORT = 22
+JUMP_USER = "root"
+# 如果你有密码就填密码,有密钥就留空
+JUMP_PASSWORD = "9Mp$ctw6k~(W"  # 有密码就写在这里
+JUMP_KEY_PATH = None  # 有密钥文件就填路径
+
+# 远程 wav 目录
+REMOTE_WAV_DIR = "/opt/upload-service/static-6009/RTMP_pict/wav"
+# 本地保存目录
+LOCAL_DIR = r"/Users/alien/Desktop/Digital_Human/Image_Analysis/wav/wav"
+# 拉取间隔(秒)
+INTERVAL = 1
+# =================================================
+
+# 确保本地文件夹存在
+os.makedirs(LOCAL_DIR, exist_ok=True)
+
+def create_ssh():
+    ssh = paramiko.SSHClient()
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    try:
+        if JUMP_PASSWORD:
+            ssh.connect(
+                JUMP_HOST,
+                port=JUMP_PORT,
+                username=JUMP_USER,
+                password=JUMP_PASSWORD,
+                timeout=10
+            )
+        else:
+            ssh.connect(
+                JUMP_HOST,
+                port=JUMP_PORT,
+                username=JUMP_USER,
+                key_filename=JUMP_KEY_PATH,
+                timeout=10
+            )
+        return ssh
+    except Exception as e:
+        return None
+
+def list_remote_files(ssh):
+    try:
+        stdin, stdout, stderr = ssh.exec_command(f"ls -1 {REMOTE_WAV_DIR}")
+        return [line.strip() for line in stdout if line.strip()]
+    except:
+        return []
+
+def pull_new_files():
+    ssh = create_ssh()
+    if not ssh:
+        return
+
+    try:
+        remote_files = list_remote_files(ssh)
+        local_files = set(os.listdir(LOCAL_DIR))
+
+        new_files = [f for f in remote_files if f not in local_files and f.endswith(".wav")]
+
+        if new_files:
+            with SCPClient(ssh.get_transport()) as scp:
+                for f in new_files:
+                    remote_path = f"{REMOTE_WAV_DIR}/{f}"
+                    local_path = os.path.join(LOCAL_DIR, f)
+                    scp.get(remote_path, local_path)
+                    print(f"✅ 已拉取: {f}")
+
+    except Exception as e:
+        pass
+    finally:
+        ssh.close()
+
+if __name__ == "__main__":
+    print("=" * 60)
+    print("🚀 本地实时拉取 wav 音频(纯Python版)")
+    print(f"📂 本地: {LOCAL_DIR}")
+    print("=" * 60)
+
+    while True:
+        pull_new_files()
+        time.sleep(INTERVAL)

+ 54 - 0
back/back_api.py

@@ -0,0 +1,54 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import requests
+import base64
+import os
+
+# ===================== 配置 =====================
+API_URL = "http://183.252.196.135:6001"
+INPUT_FILE = "/Users/alien/Desktop/Digital_Human/coco.mp4"
+# 输出文件名由调用脚本决定,这里使用动态命名
+import sys
+if len(sys.argv) > 1:
+    OUTPUT_FILE = sys.argv[1]
+else:
+    OUTPUT_FILE = "/Users/alien/Desktop/Digital_Human/back/video/result.mp4"
+BG_COLOR = "black"
+CROP_TO_PERSON = True
+CROP_MARGIN = 50
+# =================================================
+
+def main():
+    print("🚀 开始处理视频...")
+
+    # 读取视频并转 base64
+    with open(INPUT_FILE, 'rb') as f:
+        video_base64 = base64.b64encode(f.read()).decode('utf-8')
+
+    # 发送请求(自动裁剪 + 背景颜色)
+    response = requests.post(
+        f"{API_URL}/v1/video/greenscreen",
+        json={
+            "video_base64": video_base64,
+            "downsample_ratio": 0.25,
+            "bg_color": BG_COLOR,
+            "crop_to_person": CROP_TO_PERSON,
+            "margin": CROP_MARGIN,
+            "return_base64": True
+        },
+        timeout=600
+    )
+
+    # 获取结果
+    result = response.json()
+    video_bytes = base64.b64decode(result['video_base64'])
+
+    # 直接保存到你指定的目录
+    with open(OUTPUT_FILE, 'wb') as f:
+        f.write(video_bytes)
+
+    print(f"\n🎉 处理完成!")
+    print(f"✅ 文件已保存到:{OUTPUT_FILE}")
+
+if __name__ == "__main__":
+    main()

+ 154 - 0
voxcpm_api.py

@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+"""
+VoxCPM2 声音克隆 - 最终版
+支持两种方式:
+1. 使用文件路径(音频在API服务器上)
+2. 使用base64编码(音频在本地电脑)
+"""
+
+import requests
+import base64
+import os
+
+# ==================== 配置 ====================
+API_URL = "http://183.252.196.135:6003"
+SAVE_DIR = "/Users/alien/Desktop/Digital_Human/Image_Analysis/knowledge_kelong"
+os.makedirs(SAVE_DIR, exist_ok=True)
+
+TEXT = "音频可以在本地电脑,自动传输到服务器。"
+
+# 本地音频文件路径(在你的调用电脑上)
+LOCAL_REFERENCE_WAV = "/Users/alien/Desktop/Digital_Human/voice_output.wav"
+
+# ==================== 方式1: 使用base64编码(推荐) ====================
+def generate_tts_clone_base64():
+    """
+    方式1: 将本地音频编码为base64发送给API
+    优点:不需要提前上传文件到API服务器
+    """
+    print("=" * 60)
+    print("🚀 开始声音克隆(方式1:base64传输)")
+    print("=" * 60)
+    print(f"📝 文本: {TEXT}")
+    print(f"🔊 本地音频: {LOCAL_REFERENCE_WAV}")
+    
+    # 检查本地文件是否存在
+    if not os.path.exists(LOCAL_REFERENCE_WAV):
+        print(f"❌ 本地音频文件不存在: {LOCAL_REFERENCE_WAV}")
+        return
+    
+    # 读取本地音频并编码为base64
+    print(f"📦 正在编码音频为base64...")
+    with open(LOCAL_REFERENCE_WAV, "rb") as f:
+        ref_base64 = base64.b64encode(f.read()).decode("utf-8")
+    print(f"✅ 编码完成,大小: {len(ref_base64) / 1024:.2f} KB")
+    
+    # 发送请求 - 使用base64编码
+    resp = requests.post(
+        f"{API_URL}/v1/tts/generate",
+        json={
+            "text": TEXT,
+            "reference_wav_base64": ref_base64,  # ✅ 使用base64字段
+            "cfg_value": 2.0,
+            "inference_timesteps": 20,
+            "return_base64": True
+        },
+        timeout=300
+    )
+    
+    # 检查响应
+    if resp.status_code != 200:
+        print(f"❌ 请求失败: {resp.status_code}")
+        print(f"错误信息: {resp.text}")
+        return
+    
+    # 保存音频
+    result = resp.json()
+    
+    if not result.get('audio_base64'):
+        print("❌ 未返回音频数据")
+        print(f"响应内容: {result}")
+        return
+    
+    audio_data = base64.b64decode(result["audio_base64"])
+    
+    out_path = os.path.join(SAVE_DIR, "clone_result_base64.wav")
+    with open(out_path, "wb") as f:
+        f.write(audio_data)
+    
+    print(f"✅ 声音克隆完成!")
+    print(f"💾 已保存到: {out_path}")
+    print(f"📊 文件大小: {len(audio_data) / 1024:.2f} KB")
+    print("=" * 60)
+
+
+# ==================== 方式2: 使用文件路径 ====================
+def generate_tts_clone_path():
+    """
+    方式2: 使用API服务器上的文件路径
+    前提:音频文件必须已经在API服务器上
+    """
+    print("=" * 60)
+    print("🚀 开始声音克隆(方式2:文件路径)")
+    print("=" * 60)
+    
+    # 这个路径必须在API服务器 192.168.22.9 上存在
+    SERVER_REFERENCE_WAV = "/tmp/voice_output.wav"
+    
+    print(f"📝 文本: {TEXT}")
+    print(f"🔊 服务器音频: {SERVER_REFERENCE_WAV}")
+    
+    resp = requests.post(
+        f"{API_URL}/v1/tts/generate",
+        json={
+            "text": TEXT,
+            "reference_wav_path": SERVER_REFERENCE_WAV,  # ✅ 使用文件路径
+            "cfg_value": 2.0,
+            "inference_timesteps": 20,
+            "return_base64": True
+        },
+        timeout=300
+    )
+    
+    if resp.status_code != 200:
+        print(f"❌ 请求失败: {resp.status_code}")
+        print(f"错误信息: {resp.text}")
+        return
+    
+    result = resp.json()
+    
+    if not result.get('audio_base64'):
+        print("❌ 未返回音频数据")
+        print(f"响应内容: {result}")
+        return
+    
+    audio_data = base64.b64decode(result["audio_base64"])
+    
+    out_path = os.path.join(SAVE_DIR, "clone_result_path.wav")
+    with open(out_path, "wb") as f:
+        f.write(audio_data)
+    
+    print(f"✅ 声音克隆完成!")
+    print(f"💾 已保存到: {out_path}")
+    print(f"📊 文件大小: {len(audio_data) / 1024:.2f} KB")
+    print("=" * 60)
+
+
+# ==================== 主函数 ====================
+if __name__ == "__main__":
+    print("\n📋 选择声音克隆方式:")
+    print("1. 音频在本地电脑 → 使用方式1(base64)")
+    print("2. 音频在API服务器 → 使用方式2(文件路径)")
+    print()
+    
+    # 默认使用方式1(base64)
+    print("🔹 使用方式1(base64传输)...")
+    generate_tts_clone_base64()
+    
+    # 如需使用方式2,取消注释:
+    # generate_tts_clone_path()
+    
+    print("\n💡 提示:")
+    print("  - 方式1(base64): 音频可以在本地电脑,自动传输到服务器")
+    print("  - 方式2(路径): 音频必须先在API服务器上")
+    print("  - 推荐使用方式1,更方便灵活")