#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 基于API的智能语音聊天助手 - ASR: 使用本地ASR识别 - LLM: 千问API (DashScope) - TTS: 阿里云语音合成API - 全部使用云端API,无需GPU,部署简单 """ import os import time import queue import threading import numpy as np import sounddevice as sd from pathlib import Path from http import HTTPStatus import dashscope import json # ============ 配置参数 ============ # 音频参数 SAMPLING_RATE = 16000 # ASR采样率 CHUNK_DURATION = 0.5 # 录音块时长(秒) CHUNK_SIZE = int(SAMPLING_RATE * CHUNK_DURATION) # ALSA缓冲区设置,增加缓冲区大小减少欠载错误 LATENCY = 0.1 # 音频流延迟(秒) - 增加延迟以减少ALSA欠载错误 # ===== API配置说明 ===== # ASR: 使用本地ASR识别 #TTS: 使用阿里云智能语音交互 (NLS) # LLM: 使用千问 API (DashScope) # 阿里云 NLS 配置(用于 ASR 和 TTS) ALIYUN_APPKEY = "uE6YeyIf1q7InhxB" # NLS项目的Appkey ALIYUN_ACCESS_KEY_ID = "LTAI5tGZwbyXsaykduFMsTGE" # AccessKey ID ALIYUN_ACCESS_KEY_SECRET = "888up1AgLIKDm1rVctnG422OcCmXFt" # AccessKey Secret # 千问 API 配置(用于 LLM) QWEN_API_KEY = "sk-362d1accbdec4bacbc8d291348049ab0" # 千问API Key QWEN_MODEL = "qwen-plus" # LLM模型: qwen-turbo, qwen-plus, qwen-max # 天气查询 API 配置 WEATHER_API_KEY = "738b541a5f7a" # 天气API密钥 WEATHER_API_URL = "https://whyta.cn/api/tianqi" # 天气API接口地址 # 唤醒词配置 WAKE_WORDS = ["优宝同学","又把同学","有保同学","你好优宝"] ENABLE_WAKE_WORD = True # 是否启用唤醒词 AWAKE_TIMEOUT = 60 # 唤醒后无操作自动休眠时间(秒) - 从30秒增加到60秒 # 休眠词配置(说这些词会进入休眠,而不是退出程序) SLEEP_WORDS = ["拜拜", "再见", "休息吧", "睡觉吧"] SLEEP_RESPONSES = [ "好的,我先休息了,有需要再叫我!", "好的,拜拜!", "那我休息了,再见!", "收到,我去休息了!" ] # 退出程序词配置(说这些词会完全退出程序) EXIT_WORDS = ["关闭程序", "退出程序", "彻底退出"] EXIT_RESPONSES = [ "好的,程序即将关闭,再见!", "收到,正在退出程序!" 
]

# --- TTS voice configuration ---
# Aliyun NLS voices (see docs for the full list):
#   female: zhiyuan, zhiyue, zhisha, aiqi, aijia, siqi
#   male:   aicheng, zhida, aida
#   child:  mashu, yueer
TTS_VOICE = "aicheng"
TTS_SPEECH_RATE = 0  # speech rate: -500..500, 0 = normal
TTS_PITCH_RATE = 0   # pitch: -500..500, 0 = normal
TTS_VOLUME = 80      # volume: 0-100

# --- VAD (voice activity detection) parameters ---
# High-sensitivity tuning for far-field pickup.
SILENCE_THRESHOLD = 0.4   # seconds of silence that end an utterance (was 0.7)
VOLUME_THRESHOLD = 0.009  # mean-abs amplitude above which a chunk counts as voiced
AUDIO_GAIN = 1.3          # software gain applied to boost distant voices

# --- Audio devices ---
MIC_DEVICE = 0         # microphone device index
SPEAKER_DEVICE = None  # None = let sounddevice pick the default output

# --- Cache directory ---
CACHE_DIR = Path.home() / ".cache" / "voice_chat_api"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# ============ Face-recognition configuration ============
FACE_RECOGNITION_ENABLED = True
FACE_RECOGNITION_CONFIDENCE_THRESHOLD = 300  # detection confidence threshold
FACE_RECOGNITION_DURATION_THRESHOLD = 3      # seconds a face must persist to trigger events
FACE_DATABASE_PATH = "face_database.pkl"     # face database path
# NOTE(review): this is a tuple (10, 8), not a single ID, and it is never read —
# FaceRecognitionModule.initialize() hardcodes its own [10, 8] list. Confirm intent.
CAMERA_ID = 10,8
FACE_RECOGNITION_MODEL = "buffalo_l"  # insightface model name
FORCE_CUDA = True  # force CUDA (adjust per device)

# ============ Global queues ============
audio_queue = queue.Queue()       # raw audio chunks (producer: AudioCapture)
text_queue = queue.Queue()        # recognized text destined for the LLM
response_queue = queue.Queue()    # AI replies destined for TTS
tts_queue = queue.Queue()         # synthesized TTS audio
face_event_queue = queue.Queue()  # face-recognition events

# ============ Control flags ============
stop_flag = threading.Event()              # global shutdown
is_speaking = threading.Event()            # TTS currently playing (mic gated off)
is_listening = threading.Event()           # currently capturing an utterance
is_awake = threading.Event()               # wake state
exit_requested = threading.Event()         # full program exit requested
sleep_requested = threading.Event()        # sleep requested (after TTS finishes)
asr_input_enabled = threading.Event()      # gate: forward ASR text to the LLM
face_recognition_running = threading.Event()  # face-recognition loop running
asr_input_enabled.set()  # forwarding enabled by default

# Shared face-recognition resources.
global_face_recognition_instance = None            # set by FaceRecognitionModule.initialize()
face_recognition_camera_lock = threading.Lock()    # serializes camera access


# ============ Face-recognition module ============
class FaceRecognitionModule:
    """Background face detection/recognition with per-face dwell-time tracking."""

    def __init__(self):
        """Initialize empty state; heavy setup happens in initialize()."""
        self.face_recognition = None       # FaceRecognitionAPI instance (lazy)
        self.camera = None                 # cv2.VideoCapture (lazy)
        self.running = False               # loop control flag
        self.face_tracking = {}            # face_id -> {start/end time, duration, face_info}
        self.current_face = None
        self.current_face_start_time = None

    def initialize(self):
"""初始化人脸识别模型和摄像头""" try: # 添加项目根目录到Python路径 import sys sys.path.append('/home/ubuntu') # 直接导入FaceRecognitionAPI类 from face_recognition_api import FaceRecognitionAPI # 初始化人脸识别API self.face_recognition = FaceRecognitionAPI( db_path=FACE_DATABASE_PATH, model_name=FACE_RECOGNITION_MODEL, force_cuda=FORCE_CUDA ) # 打开摄像头 - 尝试多个ID import cv2 # 尝试的摄像头ID列表,优先级从高到低 camera_ids = [10, 8] self.camera = None for cam_id in camera_ids: print(f"尝试打开摄像头 ID: {cam_id}...") self.camera = cv2.VideoCapture(cam_id) if self.camera.isOpened(): print(f"✅ 成功打开摄像头 ID: {cam_id}") break else: print(f"❌ 无法打开摄像头 ID: {cam_id}") # 释放失败的摄像头资源 self.camera.release() self.camera = None if not self.camera: print("❌ 所有摄像头ID都无法打开") return False # 设置摄像头分辨率 self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 将实例保存到全局变量,供工具调用 global global_face_recognition_instance global_face_recognition_instance = self print("✅ 人脸识别模块初始化成功") return True except Exception as e: print(f"❌ 人脸识别模块初始化失败: {e}") import traceback traceback.print_exc() return False def start(self): """启动人脸识别后台线程""" if not self.face_recognition or not self.camera: if not self.initialize(): return False self.running = True face_recognition_running.set() # 启动后台线程 self.thread = threading.Thread(target=self._face_recognition_loop, daemon=True) self.thread.start() print("✅ 人脸识别后台线程已启动") return True def stop(self): """停止人脸识别后台线程""" self.running = False face_recognition_running.clear() if hasattr(self, 'thread') and self.thread.is_alive(): self.thread.join(timeout=2) if self.camera: self.camera.release() print("✅ 人脸识别后台线程已停止") def _face_recognition_loop(self): """人脸识别后台循环""" import cv2 # 创建窗口 cv2.namedWindow("人脸识别", cv2.WINDOW_NORMAL) cv2.resizeWindow("人脸识别", 640, 480) while self.running and not stop_flag.is_set(): try: # 获取摄像头锁 with face_recognition_camera_lock: # 读取摄像头帧 ret, frame = self.camera.read() if not ret: time.sleep(0.01) continue # 检测和识别人脸 try: results = self.face_recognition.detect_and_recognize(frame) 
# 绘制人脸识别结果 self._draw_recognition_results(frame, results) # 显示摄像头窗口 cv2.imshow("人脸识别", frame) # 处理键盘事件,按q键退出 if cv2.waitKey(1) & 0xFF == ord('q'): break # 处理识别结果 self._process_recognition_results(results) except Exception as recog_e: print(f"⚠️ 人脸识别处理错误: {recog_e}") # 增加安全检查,避免vector越界 import traceback traceback.print_exc() # 短暂休眠后继续 time.sleep(0.5) continue # 控制检测频率,避免占用过多资源 time.sleep(0.1) except Exception as e: print(f"❌ 人脸识别循环错误: {e}") import traceback traceback.print_exc() time.sleep(1) # 关闭窗口 cv2.destroyWindow("人脸识别") def _process_recognition_results(self, results): """处理人脸识别结果""" current_time = time.time() detected_faces = results.get("faces", []) # 安全检查:确保detected_faces是列表 if not isinstance(detected_faces, list): print("⚠️ 检测到的人脸不是列表类型,跳过处理") return # 更新人脸追踪信息 updated_face_ids = set() valid_faces = [] # 过滤有效人脸(置信度达标) for face_info in detected_faces: try: # 安全检查:确保face_info是字典且包含必要键 if not isinstance(face_info, dict): continue # 检查置信度 similarity = face_info.get("similarity", 0) if similarity < FACE_RECOGNITION_CONFIDENCE_THRESHOLD: continue # 简单的人脸ID生成(基于边界框) bbox = face_info.get("bbox", []) # 安全检查:确保边界框有效 if not isinstance(bbox, (list, tuple)) or len(bbox) < 4: continue # 确保边界框坐标是整数 try: bbox = [int(coord) for coord in bbox] except (ValueError, TypeError): continue face_id = f"{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}" updated_face_ids.add(face_id) # 更新人脸持续时间 if face_id in self.face_tracking: # 人脸已存在,更新持续时间 self.face_tracking[face_id]["end_time"] = current_time self.face_tracking[face_id]["duration"] = current_time - self.face_tracking[face_id]["start_time"] self.face_tracking[face_id]["face_info"] = face_info else: # 新人脸,初始化追踪信息 self.face_tracking[face_id] = { "start_time": current_time, "end_time": current_time, "duration": 0, "face_info": face_info } # 添加到有效人脸列表 valid_faces.append({ "face_id": face_id, "face_info": face_info, "duration": self.face_tracking[face_id]["duration"] }) except Exception as face_e: print(f"⚠️ 处理单个人脸信息错误: {face_e}") import 
traceback traceback.print_exc() continue # 处理有效人脸 if valid_faces: # 过滤出已知人脸 known_faces = [] for face in valid_faces: face_info = face["face_info"] name = face_info.get("name", "Unknown") if name != "Unknown" and face["duration"] >= FACE_RECOGNITION_DURATION_THRESHOLD: known_faces.append(face) # 检查是否满足单一人脸条件 if len(known_faces) == 1: # 单一人脸且停留时间达标 face = known_faces[0] face_info = face["face_info"] name = face_info.get("name", "Unknown") # 已知人脸,发送个性化打招呼事件 self._send_face_event({ "type": "known_single_face", "name": name, "face_info": face_info }) elif len(known_faces) > 1: # 多个人脸,但只处理已知的 # 获取所有已知人脸的名字 known_names = [] for face in known_faces: name = face["face_info"].get("name", "Unknown") if name not in known_names: known_names.append(name) # 发送包含已知人脸信息的事件 self._send_face_event({ "type": "multiple_known_faces", "face_count": len(known_faces), "known_names": known_names }) # 当只有unknown人脸时,不发送任何事件 # 清理已消失的人脸 try: for face_id in list(self.face_tracking.keys()): if face_id not in updated_face_ids: del self.face_tracking[face_id] except Exception as clean_e: print(f"⚠️ 清理人脸追踪信息错误: {clean_e}") # 重置追踪字典,避免持续错误 self.face_tracking = {} def _draw_recognition_results(self, frame, results): """在图像上绘制人脸识别结果""" import cv2 # 遍历所有检测到的人脸 for face in results.get("faces", []): try: # 获取边界框 bbox = face.get("bbox", []) if not isinstance(bbox, (list, tuple)) or len(bbox) < 4: continue # 确保边界框坐标是整数 bbox = [int(coord) for coord in bbox] # 获取人脸信息 name = face.get("name", "Unknown") confidence = face.get("confidence", 0.0) similarity = face.get("similarity", 0.0) # 绘制边界框 if name != "Unknown": # 已知人脸使用绿色框 box_color = (0, 255, 0) else: # 未知人脸使用蓝色框 box_color = (255, 0, 0) cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), box_color, 2) # 绘制文本背景 text_bg_color = (0, 0, 0) # 黑色背景 text_color = (255, 255, 255) # 白色文字 # 准备文本 if name != "Unknown": text = f"{name} ({similarity:.1f})" else: text = f"Unknown ({confidence:.1f})" # 使用freetype库绘制文字 try: # 
调用face_recognition_api中的_put_text_freetype方法 self.face_recognition._put_text_freetype( frame, text, (bbox[0], bbox[1] - 10), font_size=18, color=text_color ) except Exception as ft_e: # 如果freetype绘制失败,使用OpenCV默认字体 print(f"⚠️ freetype绘制文字失败,使用默认字体: {ft_e}") cv2.putText( frame, text, (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, text_color, 2 ) except Exception as draw_e: print(f"⚠️ 绘制人脸识别结果错误: {draw_e}") import traceback traceback.print_exc() continue def _send_face_event(self, event_data): """发送人脸事件到事件队列""" # 确保事件数据包含基本信息 event = { "timestamp": time.time(), **event_data } face_event_queue.put(event) if event_data["type"] == "known_single_face": print(f"✅ 检测到已知单一人脸: {event_data['name']}") elif event_data["type"] == "unknown_single_face": print(f"✅ 检测到未知单一人脸") elif event_data["type"] == "multiple_faces": print(f"✅ 检测到多个人脸: {event_data['face_count']}人") def pause(self): """暂停人脸识别""" self.running = False face_recognition_running.clear() print("✅ 人脸识别已暂停") def resume(self): """恢复人脸识别""" self.start() # ============ 1. 
# ============ 1. Audio capture module ============
class AudioCapture:
    """Streams microphone audio into `audio_queue`, gated by a simple
    volume-based VAD; trailing silence marks the end of an utterance."""

    def __init__(self):
        self.stream = None
        self.audio_buffer = []
        self.silence_start = None

    def callback(self, indata, frames, time_info, status):
        """sounddevice callback — streaming mode."""
        # While TTS is playing, discard everything to avoid echo feedback.
        if is_speaking.is_set():
            self.audio_buffer = []
            self.silence_start = None
            is_listening.clear()
            return

        # Mono float32, software gain for far-field pickup, clipped to [-1, 1].
        samples = np.clip(indata[:, 0].astype(np.float32) * AUDIO_GAIN, -1.0, 1.0)

        if np.abs(samples).mean() > VOLUME_THRESHOLD:
            # Voiced chunk: forward it immediately and reset the silence timer.
            self.silence_start = None
            audio_queue.put(samples.copy())
            is_listening.set()
            return

        # Silent chunk: only relevant while an utterance is in progress.
        if not is_listening.is_set():
            return
        if self.silence_start is None:
            self.silence_start = time.time()
        elif time.time() - self.silence_start > SILENCE_THRESHOLD:
            # Enough trailing silence — the utterance is over.
            self.silence_start = None
            is_listening.clear()

    def start(self):
        """Open and start the input stream."""
        self.stream = sd.InputStream(
            samplerate=SAMPLING_RATE,
            blocksize=CHUNK_SIZE,
            device=MIC_DEVICE,
            channels=1,
            dtype=np.float32,
            callback=self.callback,
            latency=LATENCY,  # larger latency reduces ALSA underruns
        )
        self.stream.start()

    def stop(self):
        """Stop and close the stream if it was started."""
        if self.stream:
            self.stream.stop()
            self.stream.close()

# ============ 2.
语音唤醒检测模块 ============
class WakeWordDetector:
    """Wake/sleep/exit keyword detector with optional pinyin fuzzy matching."""

    def __init__(self):
        self.wake_words = WAKE_WORDS
        self.last_active_time = None  # wall-clock time of last activity while awake
        # Precompute pinyin for each wake word so homophonic ASR
        # mis-transcriptions can still trigger a wake-up.
        try:
            from pypinyin import lazy_pinyin
            self.wake_words_pinyin = {
                word: ''.join(lazy_pinyin(word)) for word in self.wake_words
            }
        except ImportError:
            # pypinyin unavailable: exact substring matching only.
            self.wake_words_pinyin = None
        if not ENABLE_WAKE_WORD:
            is_awake.set()  # wake-word gating disabled: stay awake permanently

    def check_wake_word(self, text):
        """Return True if `text` contains a wake word (exact or pinyin-fuzzy)."""
        if not text:
            return False
        # Strategy 1: exact substring match (cheapest, checked first).
        for wake_word in self.wake_words:
            if wake_word in text:
                return True
        # Strategy 2: pinyin fuzzy match.
        if self.wake_words_pinyin:
            try:
                from pypinyin import lazy_pinyin
                text_pinyin = ''.join(lazy_pinyin(text))
                for wake_word, wake_pinyin in self.wake_words_pinyin.items():
                    # Direct pinyin containment.
                    if wake_pinyin in text_pinyin:
                        return True
                    # Even fuzzier: edit-distance similarity over pinyin strings.
                    similarity = self._calculate_similarity(wake_pinyin, text_pinyin)
                    if similarity > 0.7:  # 70% similarity counts as a match
                        return True
            except Exception:
                pass
        return False

    def _calculate_similarity(self, s1, s2):
        """Similarity in [0, 1] between two pinyin strings
        (1 minus normalized Levenshtein distance)."""
        if not s1 or not s2:
            return 0.0
        # Classic O(len1*len2) Levenshtein DP.
        len1, len2 = len(s1), len(s2)
        dp = [[0] * (len2 + 1) for _ in range(len1 + 1)]
        for i in range(len1 + 1):
            dp[i][0] = i
        for j in range(len2 + 1):
            dp[0][j] = j
        for i in range(1, len1 + 1):
            for j in range(1, len2 + 1):
                if s1[i-1] == s2[j-1]:
                    dp[i][j] = dp[i-1][j-1]
                else:
                    dp[i][j] = min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1]) + 1
        distance = dp[len1][len2]
        max_len = max(len1, len2)
        # max_len > 0 is guaranteed by the guard above; kept for safety.
        similarity = 1 - (distance / max_len) if max_len > 0 else 0
        return similarity

    def check_sleep_word(self, text):
        """Return True if `text` contains a sleep phrase (exact or pinyin match)."""
        if not text:
            return False
        # Exact match.
        for sleep_word in SLEEP_WORDS:
            if sleep_word in text:
                return True
        # Pinyin match (wake_words_pinyin doubles as an "is pypinyin available" flag).
        if self.wake_words_pinyin:
            try:
                from pypinyin import lazy_pinyin
                text_pinyin = ''.join(lazy_pinyin(text))
                for sleep_word in SLEEP_WORDS:
                    sleep_pinyin = ''.join(lazy_pinyin(sleep_word))
                    if sleep_pinyin in text_pinyin:
                        return True
            except:  # NOTE(review): bare except silently hides pypinyin errors
                pass
        return False

    def check_exit_word(self, text):
"""检查是否包含退出程序词(支持拼音匹配)""" if not text: return False # 精确匹配 for exit_word in EXIT_WORDS: if exit_word in text: return True # 拼音匹配 if self.wake_words_pinyin: try: from pypinyin import lazy_pinyin text_pinyin = ''.join(lazy_pinyin(text)) for exit_word in EXIT_WORDS: exit_pinyin = ''.join(lazy_pinyin(exit_word)) if exit_pinyin in text_pinyin: return True except: pass return False def wake_up(self): """唤醒""" is_awake.set() self.last_active_time = time.time() def sleep(self): """休眠""" is_awake.clear() self.last_active_time = None def update_activity(self): """更新活动时间""" if is_awake.is_set() and ENABLE_WAKE_WORD: self.last_active_time = time.time() def check_timeout(self): """检查是否超时需要休眠""" if ENABLE_WAKE_WORD and is_awake.is_set() and self.last_active_time: if time.time() - self.last_active_time > AWAKE_TIMEOUT: self.sleep() return True return False # ============ 3. ASR识别模块(基于外部程序)============ class ExternalASR: """基于外部程序的ASR识别(使用pty方式)""" def __init__(self): try: import pty import os import re self.pty = pty self.os = os self.re = re self.pattern = self.re.compile(r"\('([^']*)',\s*'([^']*)'\)") self.demo_path = self.os.path.abspath("../audio_check_demo") if not self.os.path.exists(self.demo_path): print(f"[ASR] ✗ 外部程序不存在: {self.demo_path}") raise FileNotFoundError(f"ASR程序不存在: {self.demo_path}") self.master = None self.slave = None self.pid = None self.wake_detector = None self._start_pty_process() print("[ASR] ✓ 外部ASR程序加载成功") except Exception as e: print(f"[ASR] ✗ 初始化失败: {e}") import traceback traceback.print_exc() raise def _start_pty_process(self): """启动pty进程运行外部ASR程序""" self.master, self.slave = self.pty.openpty() self.pid = self.os.fork() if self.pid == 0: self.os.close(self.master) self.os.dup2(self.slave, 1) self.os.dup2(self.slave, 2) self.os.close(self.slave) self.os.execv(self.demo_path, [self.demo_path]) else: self.os.close(self.slave) print(f"[ASR] 已启动外部ASR程序 (PID: {self.pid})") def _process_recognition_result(self, text, wake_detector): """处理识别结果""" if not 
text:
            return
        # Priority 1: exit phrase — full program shutdown.
        if wake_detector.check_exit_word(text):
            import random
            goodbye_msg = random.choice(EXIT_RESPONSES)
            response_queue.put(goodbye_msg)
            exit_requested.set()
            return
        # Priority 2: sleep phrase — only acts while awake.
        if wake_detector.check_sleep_word(text):
            if is_awake.is_set():
                import random
                sleep_msg = random.choice(SLEEP_RESPONSES)
                response_queue.put(sleep_msg)
                sleep_requested.set()
            return
        # Gate: ASR→LLM forwarding can be disabled externally.
        if not asr_input_enabled.is_set():
            return
        if ENABLE_WAKE_WORD:
            if not is_awake.is_set():
                # Asleep: only a wake word gets through (and is forwarded as-is).
                if wake_detector.check_wake_word(text):
                    wake_detector.wake_up()
                    print(f"[ASR] 识别: {text}")
                    wake_detector.update_activity()
                    text_queue.put(text)
            else:
                # Awake: drop near-empty fragments, forward the rest.
                if len(text.strip()) < 2:
                    return
                print(f"[ASR] 识别: {text}")
                wake_detector.update_activity()
                text_queue.put(text)
        else:
            # Wake-word gating disabled: forward everything.
            print(f"[ASR] 识别: {text}")
            text_queue.put(text)

    def run(self, wake_detector):
        """ASR thread main loop: read the external program's output line by
        line, parse ('text', 'pinyin') tuples, and dispatch recognized text."""
        self.wake_detector = wake_detector
        print("[ASR] 外部ASR识别线程已启动")
        try:
            with self.os.fdopen(self.master, 'r', encoding='utf-8', errors='ignore') as f:
                while not stop_flag.is_set():
                    try:
                        line = f.readline()
                        if not line:
                            break  # EOF: external program exited
                        line = line.strip()
                        if not line:
                            continue
                        match = self.pattern.match(line)
                        if match:
                            text = match.group(1)
                            pinyin = match.group(2)
                            print(f"[ASR] 识别文本:{text} | 拼音:{pinyin}")
                            if text:
                                self._process_recognition_result(text, self.wake_detector)
                        else:
                            # Unparsed output from the external program — echoed for debugging.
                            print(f"【原始】:{line}")
                    except Exception as e:
                        print(f"[ASR] 读取错误: {e}")
                        import traceback
                        traceback.print_exc()
                        break
        except Exception as e:
            print(f"[ASR] 线程错误: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Best-effort cleanup of the pty fd and the child process.
            if self.master:
                try:
                    self.os.close(self.master)
                except:
                    pass
            if self.pid > 0:
                try:
                    self.os.waitpid(self.pid, 0)
                except:
                    pass


# ============ 3.
ASR识别模块(阿里云NLS - 已废弃)============ class AliyunStreamingASR: """阿里云NLS流式语音识别(WebSocket)""" def __init__(self): print("[ASR] 初始化阿里云NLS流式识别...") try: self.appkey = ALIYUN_APPKEY self.access_key_id = ALIYUN_ACCESS_KEY_ID self.access_key_secret = ALIYUN_ACCESS_KEY_SECRET self.token = None self.token_expire_time = 0 # WebSocket连接 self.ws = None self.ws_connected = False self.ws_lock = threading.Lock() self.task_id = None # 当前识别会话ID self.session_started = False # 当前会话是否已发送StartTranscription # 识别结果缓存 self.current_result = "" self.final_result = "" self.sentence_results = [] # 🔧 新增:存储SentenceEnd的句子结果 self.wake_detector = None # 🔧 新增:保存wake_detector引用 # Token管理(Token有效期180秒) self.token_create_time = 0 # 获取Token self._get_token() print("[ASR] ✓ 阿里云NLS 流式ASR初始化成功") print(f"[ASR] 使用Appkey: {self.appkey}") print(f"[ASR] 模式: WebSocket 流式识别") except Exception as e: print(f"[ASR] ✗ 初始化失败: {e}") raise def _get_token(self): """获取NLS访问Token(使用阿里云OpenAPI签名)""" import requests import hmac import hashlib import base64 from datetime import datetime import uuid from urllib.parse import quote try: # OpenAPI参数 timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') nonce = str(uuid.uuid4()) # 公共请求参数 params = { 'AccessKeyId': self.access_key_id, 'Action': 'CreateToken', 'Format': 'JSON', 'RegionId': 'cn-shanghai', 'SignatureMethod': 'HMAC-SHA1', 'SignatureNonce': nonce, 'SignatureVersion': '1.0', 'Timestamp': timestamp, 'Version': '2019-02-28' } # 构造签名字符串 sorted_params = sorted(params.items()) canonicalized_query_string = '&'.join([f"{quote(k, safe='')}={quote(str(v), safe='')}" for k, v in sorted_params]) string_to_sign = f"GET&%2F&{quote(canonicalized_query_string, safe='')}" # 计算签名 signature = base64.b64encode( hmac.new( (self.access_key_secret + '&').encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha1 ).digest() ).decode('utf-8') # 添加签名到参数 params['Signature'] = signature # 发送请求 url = "https://nls-meta.cn-shanghai.aliyuncs.com/" print("[ASR] 正在获取NLS Token...") print(f"[ASR] 
使用签名认证") response = requests.get(url, params=params, timeout=10) print(f"[ASR] HTTP状态码: {response.status_code}") if response.status_code == 200: result = response.json() print(f"[ASR] CreateToken响应: {result}") # 提取Token if 'Token' in result and isinstance(result['Token'], dict): if 'Id' in result['Token']: self.token = result['Token']['Id'] self.token_create_time = time.time() self.token_expire_time = result['Token'].get('ExpireTime', 0) print(f"[ASR] ✓ Token获取成功") print(f"[ASR] Token: {self.token}") print(f"[ASR] 有效期: 180秒") print(f"[ASR] Appkey: {self.appkey}") print(f"[ASR] 提示: 确保Token和Appkey来自同一账号") return print(f"[ASR] ⚠️ 无法从响应提取Token,响应: {result}") else: print(f"[ASR] Token API失败: {response.text}") except Exception as e: print(f"[ASR] ❌ Token获取异常: {e}") import traceback traceback.print_exc() # 失败提示 print("\n" + "=" * 60) print("❌ Token获取失败 - 请检查:") print("=" * 60) print(f"1. AccessKey ID: {self.access_key_id}") print(f"2. AccessKey Secret: {self.access_key_secret[:5]}***") print("3. 是否已开通 NLS 服务: https://nls.console.aliyun.com/") print("4. AccessKey 是否有 NLS 权限") print("=" * 60 + "\n") self.token = None def _connect_websocket(self): """建立WebSocket连接""" import websocket import json try: # 检查Token有效期(180秒) if not self.token or (time.time() - self.token_create_time) > 170: print("[ASR] Token为空或即将过期,重新获取...") self._get_token() if not self.token: print("[ASR] Token获取失败,无法建立连接") return False # ✅ 正确方式:Token必须通过URL参数传递(不是Header!) 
ws_url = f"wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1?token={self.token}" # 创建WebSocket连接(Token已在URL中,无需header) self.ws = websocket.WebSocketApp( ws_url, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, on_open=self._on_open ) # 在新线程中运行WebSocket ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True) ws_thread.start() # 等待连接建立 timeout = 5 start_time = time.time() while not self.ws_connected and time.time() - start_time < timeout: time.sleep(0.1) if self.ws_connected: print("[ASR] ✓ WebSocket连接成功") return True else: print("[ASR] ✗ WebSocket连接超时") return False except Exception as e: print(f"[ASR] WebSocket连接失败: {e}") import traceback traceback.print_exc() return False def _on_open(self, ws): """WebSocket连接打开(长连接建立成功)""" print("[ASR] ✓ WebSocket长连接已建立") self.ws_connected = True # 不在这里发送StartTranscription,等待实际识别时再发送 def _send_start_transcription(self): """发送开始识别命令(每次新的识别会话调用)""" import json import uuid if not self.ws_connected or not self.ws: print("[ASR] WebSocket未连接") return False try: # 为每个新会话生成新的task_id self.task_id = uuid.uuid4().hex # 32位十六进制 message_id = uuid.uuid4().hex # 32位十六进制 # 清空之前的结果 self.current_result = "" self.final_result = "" print(f"[ASR] 开始新识别会话 (task_id: {self.task_id})") start_msg = { "header": { "message_id": message_id, "task_id": self.task_id, "namespace": "SpeechTranscriber", "name": "StartTranscription", "appkey": self.appkey }, "payload": { "format": "pcm", "sample_rate": SAMPLING_RATE, "enable_intermediate_result": True, "enable_punctuation_prediction": True, "enable_inverse_text_normalization": True, "max_sentence_silence": 400 # 🔧 从800降至400ms,更快触发识别 } } with self.ws_lock: if self.ws: self.ws.send(json.dumps(start_msg)) print("[ASR] ✓ StartTranscription已发送") return True except Exception as e: print(f"[ASR] 发送StartTranscription失败: {e}") return False def _on_message(self, ws, message): """接收识别结果""" import json try: result = json.loads(message) # 解析结果 if 'header' in result: name = 
result['header'].get('name', '') status = result['header'].get('status', 0) # 🔧 添加调试: 打印所有消息类型(排除常见的) if name not in ['TranscriptionResultChanged', 'TranscriptionCompleted', 'TaskFailed', 'SentenceEnd', 'SentenceBegin']: print(f"[ASR] 收到消息: {name}, status: {status}") if name == 'TranscriptionResultChanged': # 中间结果 if 'payload' in result and 'result' in result['payload']: text = result['payload']['result'] self.current_result = text print(f"[ASR] 中间结果: {text}") elif name == 'SentenceEnd': # 🔧 新增:处理句子结束消息(流式识别的实时结果) if 'payload' in result and 'result' in result['payload']: text = result['payload']['result'] print(f"[ASR] ✓ 句子完成: {text}") # 🔧 过滤短句和可能的回声 if text and len(text.strip()) >= 2: # 至少2个字符 # 检查是否在播放中(回声) if is_speaking.is_set(): print(f"[ASR] ⚠️ 播放期间的识别,丢弃(可能是回声): {text}") elif self.wake_detector: self._process_recognition_result(text, self.wake_detector) else: print(f"[ASR] ⚠️ 句子太短,丢弃: {text}") elif name == 'TranscriptionCompleted': # 最终结果 print(f"[ASR] 收到TranscriptionCompleted消息") print(f"[ASR] 完整消息: {result}") if 'payload' in result: payload = result['payload'] print(f"[ASR] payload: {payload}") # 尝试多种可能的字段 text = None if 'result' in payload: text = payload['result'] elif 'text' in payload: text = payload['text'] elif isinstance(payload, str): text = payload if text: self.final_result = text print(f"[ASR] ✓ 最终结果: {text}") else: print(f"[ASR] ⚠️ payload中无result字段") else: print(f"[ASR] ⚠️ 消息中无payload") elif name == 'TaskFailed': # 识别失败 message = result['header'].get('message', '未知错误') print(f"[ASR] ✗ 识别失败: {message}") except Exception as e: print(f"[ASR] 消息解析错误: {e}") def _on_error(self, ws, error): """WebSocket错误""" print(f"[ASR] WebSocket错误: {error}") import traceback traceback.print_exc() self.ws_connected = False def _on_close(self, ws, close_status_code, close_msg): """WebSocket关闭""" print(f"[ASR] WebSocket已关闭") print(f"[ASR] 关闭码: {close_status_code}") print(f"[ASR] 关闭消息: {close_msg}") # 解析错误码 if close_msg: try: close_msg_str = str(close_msg) if 
'240000002' in close_msg_str or b'240000002' in close_msg: print("[ASR] ❌ 错误: 请求参数校验失败 (240000002)") print("[ASR] 可能原因:") print(" 1. Token已过期(有效期180秒)") print(" 2. Appkey与Token不匹配") print(" 3. 参数格式错误") elif '440000004' in close_msg_str or b'440000004' in close_msg: print("[ASR] ⚠️ 错误: 连接异常 (440000004)") print("[ASR] 可能原因: 长时间连接或音频数据异常,将自动重连") except: pass self.ws_connected = False def send_audio(self, audio_data): """发送音频数据到WebSocket""" import websocket if not self.ws_connected or not self.ws: # 重新连接 if not self._connect_websocket(): return False try: # 将float32转为PCM (int16) audio_int16 = (audio_data * 32767).astype(np.int16) audio_bytes = audio_int16.tobytes() # 🔧 添加调试: 打印音频数据统计 duration_ms = len(audio_data) / SAMPLING_RATE * 1000 # print(f"[ASR] 发送音频: {len(audio_bytes)} bytes ({duration_ms:.0f}ms)") # 发送二进制音频数据 with self.ws_lock: if self.ws and self.ws_connected: self.ws.send(audio_bytes, opcode=websocket.ABNF.OPCODE_BINARY) return True except Exception as e: print(f"[ASR] 发送音频失败: {e}") self.ws_connected = False return False def finish_recognition(self): """结束当前识别会话""" import json import uuid if not self.ws_connected or not self.ws: return self.final_result try: # 发送结束命令(使用相同的task_id) message_id = uuid.uuid4().hex # 32位十六进制 stop_msg = { "header": { "message_id": message_id, # 32位十六进制 "task_id": self.task_id, # 使用开始时的task_id(32位十六进制) "namespace": "SpeechTranscriber", "name": "StopTranscription", "appkey": self.appkey } } print("[ASR] 发送StopTranscription命令...") print(f"[ASR] task_id: {self.task_id}") print(f"[ASR] StopTranscription消息: {json.dumps(stop_msg)}") with self.ws_lock: if self.ws: self.ws.send(json.dumps(stop_msg)) # 等待最终结果(WebSocket异步接收,需要足够的时间) print("[ASR] 等待TranscriptionCompleted消息...") print(f"[ASR] 当前final_result: '{self.final_result}'") max_wait = 1.2 # 🔧 等待1.2秒获取识别结果 start_time = time.time() while (time.time() - start_time) < max_wait: if self.final_result: print(f"[ASR] ✓ 收到最终结果!") break time.sleep(0.05) # 优化: 从0.1秒降至0.05秒,更快检测 result = 
self.final_result # 🔧 优先使用最终结果,没有则用中间结果 if not result and self.current_result: result = self.current_result print(f"[ASR] 使用中间结果作为最终结果: '{result}'") elif not result: print(f"[ASR] ⚠️ 既无最终结果也无中间结果!") print(f"[ASR] 返回结果: '{result}' (等待时间: {time.time()-start_time:.2f}秒)") # 清空结果缓存 self.current_result = "" self.final_result = "" return result except Exception as e: print(f"[ASR] 结束识别失败: {e}") return self.final_result def run(self, wake_detector): """ASR线程主循环 - 流式识别版本(保持WebSocket长连接)""" print("[ASR] 流式识别线程已启动") # 🔧 保存wake_detector引用,供_on_message使用 self.wake_detector = wake_detector # 建立初始WebSocket连接(保持长连接) if not self._connect_websocket(): print("[ASR] ✗ 无法建立WebSocket连接,退出") return recognition_active = False empty_count = 0 # 连续空队列计数,避免过早结束识别 audio_chunk_count = 0 # 已发送音频块计数 while not stop_flag.is_set(): try: # 检查WebSocket连接状态,如果断开则重连 if not self.ws_connected: print("[ASR] WebSocket已断开,尝试重新连接...") if not self._connect_websocket(): print("[ASR] 重连失败,等待下次...") time.sleep(1) continue # 获取音频数据(非阻塞,使用较短超时) try: audio_data = audio_queue.get(timeout=0.1) empty_count = 0 # 重置空队列计数 except queue.Empty: # 检查是否有正在进行的识别需要结束 # 需要连续多次Empty且is_listening为False才真正结束 if recognition_active and not is_listening.is_set(): empty_count += 1 # 🔧 修复: 等待3次Empty(约0.3秒) # 且至少发送了1个完整音频块(约0.5秒) if empty_count >= 3 and audio_chunk_count >= 1: print(f"[ASR] 用户停止说话,结束识别会话... 
(已发送{audio_chunk_count}个音频块)") text = self.finish_recognition() recognition_active = False self.session_started = False # 重置会话状态 empty_count = 0 audio_chunk_count = 0 # 重置计数 # ✅ 不关闭WebSocket,保持长连接 print("[ASR] 识别会话结束,WebSocket保持连接") if text: self._process_recognition_result(text, wake_detector) else: print("[ASR] ⚠️ 未获取到最终结果") elif empty_count >= 5 and audio_chunk_count < 1: # 音频太短(没有完整的音频块),直接丢弃 print(f"[ASR] ⚠️ 音频过短({audio_chunk_count}块),丢弃") recognition_active = False self.session_started = False empty_count = 0 audio_chunk_count = 0 continue # 开始新的识别会话(如果还没开始) if not recognition_active: print("[ASR] 准备开始新识别...") recognition_active = True self.session_started = False # 重置会话状态 audio_chunk_count = 0 # 重置计数 # 发送音频数据 if recognition_active: # ✅ 关键优化:只在第一次发送音频时才发送StartTranscription if not self.session_started: print("[ASR] 🎤 收到音频数据,发送StartTranscription...") if self._send_start_transcription(): self.session_started = True else: print("[ASR] StartTranscription发送失败,跳过本次音频") continue # 发送音频数据 self.send_audio(audio_data) audio_chunk_count += 1 # 增加计数 except Exception as e: print(f"[ASR] 线程错误: {e}") import traceback traceback.print_exc() recognition_active = False self.ws_connected = False # 标记需要重连 audio_chunk_count = 0 # 重置计数 # 清理:程序退出时才关闭WebSocket print("[ASR] 程序退出,关闭WebSocket连接...") if self.ws: try: self.ws.close() except: pass def _process_recognition_result(self, text, wake_detector): """处理识别结果""" # 在方法开头声明全局变量,避免"used prior to global declaration"错误 global latest_face_info print(f"\n[ASR] ========== 处理识别结果 ==========") print(f"[ASR] 原始文本: '{text}'") print(f"[ASR] 文本长度: {len(text) if text else 0}") print(f"[ASR] 唤醒状态: {is_awake.is_set()}") if not text: print("[ASR] 未识别到有效内容") return # 优先级1: 检查退出程序词(完全退出) if wake_detector.check_exit_word(text): print("[ASR] ✓ 检测到退出程序指令!") import random goodbye_msg = random.choice(EXIT_RESPONSES) response_queue.put(goodbye_msg) exit_requested.set() # 标记退出请求 return # 优先级2: 检查休眠词(进入休眠,可再次唤醒) if wake_detector.check_sleep_word(text): 
print("[ASR] ✓ 检测到休眠指令!") if is_awake.is_set(): # 只有唤醒状态才能休眠 import random sleep_msg = random.choice(SLEEP_RESPONSES) response_queue.put(sleep_msg) sleep_requested.set() # 标记休眠请求,等待TTS播放完成 print("[ASR] 已请求休眠,将在TTS播放完成后执行") else: print("[ASR] 当前已是休眠状态,忽略") return # 检查唤醒词 if ENABLE_WAKE_WORD: if not is_awake.is_set(): # 未唤醒,检查是否包含唤醒词 print(f"[ASR] 未唤醒状态,检查唤醒词...") print(f"[ASR] 唤醒词列表: {WAKE_WORDS}") print(f"[ASR] 识别文本: '{text}'") if wake_detector.check_wake_word(text): print(f"[ASR] ✓ 检测到唤醒词!") wake_detector.wake_up() # 自动播放唤醒确认 # 去除唤醒词后的内容 original_text = text for wake_word in wake_detector.wake_words: text = text.replace(wake_word, "").strip() # 🔧 去除前后的标点符号 text = text.strip(',。!?、;:,.!?;: ') print(f"[ASR] 去除唤醒词后: '{text}'") # 🔧 先停止ASR,再把文本传给大模型,附加视觉信息 print("[ASR] 停止ASR程序(准备将文本传给大模型)") self._stop_asr_process() text_with_visual = f"{text}{visual_context}" print(f"[ASR] → 放入text_queue: '{text_with_visual}'") wake_detector.update_activity() text_queue.put(text_with_visual) else: # 不启用唤醒词,直接处理 print(f"[ASR] 唤醒词未启用,直接处理") # 🔧 先停止ASR,再把文本传给大模型 print("[ASR] 停止ASR程序(准备将文本传给大模型)") self._stop_asr_process() text_with_visual = f"{text}{visual_context}" print(f"[ASR] → 放入text_queue: '{text_with_visual}'") text_queue.put(text_with_visual) print(f"[ASR] ========== 处理完成 ==========\n") # ============ 4. 
# ============ 4. LLM chat module (Qwen / DashScope API) ============

# ============ Weather-query tool ============
def query_weather(city):
    """Query current weather for *city* and format it for speech output.

    Args:
        city: City name (Chinese), e.g. "北京".

    Returns:
        A human-readable Chinese weather summary on success, or an error
        string starting with "天气查询失败" on any failure. Never raises.
    """
    try:
        import requests
        # Build request URL; key and city are plain query parameters.
        url = f"{WEATHER_API_URL}?key={WEATHER_API_KEY}&city={city}"
        # Send GET request with a 10s timeout.
        response = requests.get(url, timeout=10)
        # Parse the JSON payload.
        result = response.json()
        # The API signals success with status == 1 AND message == "success".
        if result.get("status") == 1 and result.get("message") == "success":
            data = result.get("data", {})
            # Extract the key weather fields.
            # NOTE(review): field names (temp_C, FeelsLikeC, windspeedKmph,
            # winddir16Point) are assumed from this code only — confirm
            # against the whyta.cn API schema.
            temp_c = data.get("temp_C", "")
            feels_like = data.get("FeelsLikeC", "")
            humidity = data.get("humidity", "")
            weather_desc = data["weatherDesc"][0]["value"] if "weatherDesc" in data and data["weatherDesc"] else ""
            wind_speed = data.get("windspeedKmph", "0")
            wind_dir = data.get("winddir16Point", "")
            city_name = data.get("city", city)
            # Wind speed arrives as a string; fall back to 0 when malformed.
            try:
                wind_speed_int = int(wind_speed)
            except ValueError:
                wind_speed_int = 0
            # Map km/h to a coarse spoken wind-level label.
            if wind_speed_int < 1:
                windlv = "无风"
            elif wind_speed_int < 5:
                windlv = "1"
            elif wind_speed_int < 10:
                windlv = "2"
            elif wind_speed_int < 20:
                windlv = "3"
            elif wind_speed_int < 30:
                windlv = "4"
            else:
                windlv = "5"
            # Translate the 16-point compass direction into Chinese.
            # Only the 8 principal directions are handled; others yield "".
            wind = ""
            if wind_dir == "E":
                wind = "东"
            elif wind_dir == "NE":
                wind = "东北"
            elif wind_dir == "NW":
                wind = "西北"
            elif wind_dir == "N":
                wind = "北"
            elif wind_dir == "W":
                wind = "西"
            elif wind_dir == "SW":
                wind = "西南"
            elif wind_dir == "S":
                wind = "南"
            elif wind_dir == "SE":
                wind = "东南"
            # Format the final speech-friendly weather summary.
            weather_info = f"{city_name}当前天气:{weather_desc},温度{temp_c}摄氏度,体感温度{feels_like}摄氏度,湿度百分之{humidity},{wind}风{windlv}级"
            return weather_info
        else:
            # API-level failure: surface its message to the caller.
            error_msg = result.get("message", "查询失败")
            return f"天气查询失败:{error_msg}"
    except Exception as e:
        # Network or parse error — return an error string instead of raising,
        # so the LLM tool-call path always gets a usable result.
        return f"天气查询失败:{str(e)}"


# ============ Face-recognition tool ============
def recognize_face():
    """Capture frames from the shared camera and identify who is present.

    Uses the module-level ``global_face_recognition_instance`` (camera plus
    recognizer). Tries up to 3 frames under ``face_recognition_camera_lock``
    to avoid racing the background recognition thread, then reports the
    best result.

    Returns:
        A Chinese result string ("识别成功!..." / "识别失败:...").
        Never raises.
    """
    try:
        print("[人脸识别工具] 开始识别人脸...")
        # Use the globally shared face-recognition instance.
        global global_face_recognition_instance
        if global_face_recognition_instance is None:
            return "人脸识别失败:人脸识别模块未初始化"
        # Grab the shared instance (camera is already open).
        face_recognition_module = global_face_recognition_instance
        import cv2
        import time
        # Try several captures to raise the chance of a clean detection.
        recognition_results = []
        max_attempts = 3
        attempts = 0
        # Hold the camera lock so the background recognizer cannot
        # read frames concurrently.
        with face_recognition_camera_lock:
            while attempts < max_attempts:
                # Read one camera frame.
                ret, frame = face_recognition_module.camera.read()
                if not ret:
                    attempts += 1
                    time.sleep(0.1)
                    continue
                # Detect and recognize faces in this frame.
                results = face_recognition_module.face_recognition.detect_and_recognize(frame)
                if results["detected"] and len(results["faces"]) > 0:
                    recognition_results.append(results)
                # Brief sleep to avoid hogging the camera/CPU.
                time.sleep(0.1)
                attempts += 1
        # Aggregate the captured results.
        if recognition_results:
            # Keep the frame that detected the most faces.
            best_result = max(recognition_results, key=lambda x: x["count"])
            # Keep only faces matched against the database ("Unknown" = miss).
            known_faces = []
            for face in best_result["faces"]:
                if face["name"] != "Unknown":
                    known_faces.append(face)
            if len(known_faces) == 1:
                # Exactly one known face: report name with scores.
                face = known_faces[0]
                name = face["name"]
                confidence = face["confidence"]
                similarity = face["similarity"]
                return f"识别成功!这是{name},相似度{similarity:.2f},置信度{confidence:.2f}"
            elif len(known_faces) > 1:
                # Several known faces: report only the known ones.
                known_names = [face["name"] for face in known_faces]
                # De-duplicate names (same person may match multiple boxes).
                known_names = list(set(known_names))
                if len(known_names) == 1:
                    # Multiple faces but all the same person.
                    return f"识别成功!检测到{len(known_faces)}张人脸,都是{known_names[0]}"
                else:
                    # Multiple distinct known people.
                    names_str = "、".join(known_names)
                    return f"识别成功!检测到{len(known_faces)}张人脸,其中已知的有:{names_str}"
            elif best_result["count"] == 1:
                # A single face was seen but it is not in the database.
                return f"识别成功!检测到人脸,但系统中没有该人脸信息"
            else:
                # Multiple faces, none of them known.
                return "识别成功!检测到多张人脸,但系统中没有这些人脸的信息"
        else:
            return "识别失败:未检测到人脸"
    except Exception as e:
        import traceback
        traceback.print_exc()
        return f"人脸识别失败:{str(e)}"


class QwenChat:
    """Conversation manager backed by the Qwen (DashScope) Generation API.

    Keeps a rolling ``conversation_history`` (system prompt + recent turns)
    and exposes two function-calling tools to the model: ``query_weather``
    and ``recognize_face``.
    """

    def __init__(self):
        self.api_key = QWEN_API_KEY
        self.model = QWEN_MODEL
        # Seed the history with the system prompt (runtime string — kept
        # verbatim in Chinese; it defines persona and tool-usage examples).
        self.conversation_history = [{
            "role": "system",
            "content": """你是一个智能语音聊天助手,名叫'优宝'。请用简洁、友好、口语化的语言回答用户问题。
回答要通俗易懂,避免使用过于专业的术语。每次回答控制在80字以内,适合语音朗读。
不要输出颜文字表情,也不要使用emoji表情。
当用户询问天气相关问题时,你可以调用query_weather工具来获取实时天气信息。
例如:
- 用户问:'北京今天天气怎么样?',你应该调用query_weather工具,参数city为'北京'
- 用户问:'上海的天气如何?',你应该调用query_weather工具,参数city为'上海'
- 用户问:'武汉今天热吗?',你应该调用query_weather工具,参数city为'武汉'
当用户提出身份确认类问题时,你可以调用recognize_face工具来获取人脸信息。
例如:
- 用户问:'我是谁?',你应该调用recognize_face工具
- 用户问:'你认识我吗?',你应该调用recognize_face工具
- 用户问:'你知道我是谁吗?',你应该调用recognize_face工具"""
        }]

    def chat(self, user_message):
        """Send one user message; return the assistant's reply string.

        Handles the two-phase tool-calling protocol: if the first API
        response contains ``tool_calls``, the tool is executed locally,
        its result is appended to the history, and a second API call
        produces the final natural-language reply.

        Returns a Chinese fallback string on any API or tool error;
        never raises.
        """
        try:
            dashscope.api_key = self.api_key
            # Record the user turn.
            self.conversation_history.append({
                "role": "user",
                "content": user_message
            })
            # Tool schemas advertised to the model (OpenAI-style schema).
            tools = [
                {
                    "type": "function",
                    "function": {
                        "name": "query_weather",
                        "description": "查询指定城市的天气信息",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "city": {
                                    "type": "string",
                                    "description": "城市名称,例如:北京、上海、武汉"
                                }
                            },
                            "required": ["city"]
                        }
                    }
                },
                {
                    "type": "function",
                    "function": {
                        "name": "recognize_face",
                        "description": "识别人脸信息,用于身份确认",
                        "parameters": {
                            "type": "object",
                            "properties": {},
                            "required": []
                        }
                    }
                }
            ]
            # First (possibly only) model call.
            response = dashscope.Generation.call(
                model=self.model,
                messages=self.conversation_history,
                tools=tools,
                result_format='message',
                stream=False,
                top_p=0.8,
                temperature=0.7,
            )
            if response.status_code == HTTPStatus.OK:
                # Extract the assistant message object.
                message = response.output.choices[0].message
                assistant_message = ""
                # Detect tool calls defensively: the SDK may hand back a dict
                # or an object depending on version.
                if isinstance(message, dict):
                    tool_calls = message.get('tool_calls', None)
                else:
                    try:
                        tool_calls = message.tool_calls
                    except (AttributeError, KeyError):
                        tool_calls = None
                if tool_calls:
                    # Tool-call path: execute each requested tool.
                    for tool_call in tool_calls:
                        try:
                            # Pull id / name / raw JSON arguments, again
                            # handling both dict and object shapes.
                            if isinstance(tool_call, dict):
                                tool_call_id = tool_call.get('id', '')
                                function_info = tool_call.get('function', {})
                                func_name = function_info.get('name', '')
                                arguments = function_info.get('arguments', '{}')
                            else:
                                tool_call_id = getattr(tool_call, 'id', '')
                                function_obj = getattr(tool_call, 'function', None)
                                func_name = getattr(function_obj, 'name', '') if function_obj else ''
                                arguments = getattr(function_obj, 'arguments', '{}') if function_obj else '{}'
                            if func_name == "query_weather":
                                # Parse arguments and run the weather tool.
                                params = json.loads(arguments)
                                city = params.get("city")
                                tool_result = query_weather(city)
                                # Record the assistant's tool-call turn, then
                                # the tool result — this exact order is what
                                # the follow-up API call expects.
                                self.conversation_history.append({
                                    "role": "assistant",
                                    "tool_calls": [{
                                        "id": tool_call_id,
                                        "type": "function",
                                        "function": {
                                            "name": func_name,
                                            "arguments": arguments
                                        }
                                    }]
                                })
                                self.conversation_history.append({
                                    "role": "tool",
                                    "name": "query_weather",
                                    "content": tool_result,
                                    "tool_call_id": tool_call_id
                                })
                                # Second call: let the model phrase the answer
                                # from the tool result (no tools this time).
                                final_response = dashscope.Generation.call(
                                    model=self.model,
                                    messages=self.conversation_history,
                                    result_format='message',
                                    stream=False,
                                    top_p=0.8,
                                    temperature=0.7,
                                )
                                if final_response.status_code == HTTPStatus.OK:
                                    final_message = final_response.output.choices[0].message
                                    # Same dict/object duality as above.
                                    if isinstance(final_message, dict):
                                        assistant_message = final_message.get('content', '')
                                    else:
                                        assistant_message = getattr(final_message, 'content', '')
                                    # Record the final reply.
                                    self.conversation_history.append({
                                        "role": "assistant",
                                        "content": assistant_message
                                    })
                                else:
                                    assistant_message = "抱歉,处理天气信息时出错了。"
                            elif func_name == "recognize_face":
                                # Run the face-recognition tool (no args).
                                tool_result = recognize_face()
                                # Same two-append + re-call dance as above.
                                self.conversation_history.append({
                                    "role": "assistant",
                                    "tool_calls": [{
                                        "id": tool_call_id,
                                        "type": "function",
                                        "function": {
                                            "name": func_name,
                                            "arguments": arguments
                                        }
                                    }]
                                })
                                self.conversation_history.append({
                                    "role": "tool",
                                    "name": "recognize_face",
                                    "content": tool_result,
                                    "tool_call_id": tool_call_id
                                })
                                final_response = dashscope.Generation.call(
                                    model=self.model,
                                    messages=self.conversation_history,
                                    result_format='message',
                                    stream=False,
                                    top_p=0.8,
                                    temperature=0.7,
                                )
                                if final_response.status_code == HTTPStatus.OK:
                                    final_message = final_response.output.choices[0].message
                                    if isinstance(final_message, dict):
                                        assistant_message = final_message.get('content', '')
                                    else:
                                        assistant_message = getattr(final_message, 'content', '')
                                    self.conversation_history.append({
                                        "role": "assistant",
                                        "content": assistant_message
                                    })
                                else:
                                    assistant_message = "抱歉,处理人脸识别信息时出错了。"
                            else:
                                # Model asked for a tool we don't implement.
                                assistant_message = "抱歉,我不支持该工具。"
                        except Exception as e:
                            print(f"[千问] 处理工具调用时出错: {e}")
                            assistant_message = "抱歉,处理工具调用时出错了。"
                else:
                    # Direct reply (no tool call).
                    if isinstance(message, dict):
                        assistant_message = message.get('content', '')
                    else:
                        assistant_message = getattr(message, 'content', '')
                    # Record the reply.
                    self.conversation_history.append({
                        "role": "assistant",
                        "content": assistant_message
                    })
                # Trim history: keep system prompt + most recent 16 entries
                # (~8 turns). NOTE(review): this can cut an assistant
                # tool-call entry away from its paired "tool" result at the
                # trim boundary — confirm the API tolerates that.
                if len(self.conversation_history) > 17:
                    self.conversation_history = [self.conversation_history[0]] + self.conversation_history[-16:]
                return assistant_message
            else:
                print(f"[千问] API错误: {response.code} - {response.message}")
                return "抱歉,我现在无法回答。"
        except Exception as e:
            print(f"[千问] 错误: {e}")
            import traceback
            traceback.print_exc()
            return "抱歉,出现了错误。"

    def run(self):
        """Chat worker loop: pull ASR text, reply, enqueue for TTS.

        Runs until ``stop_flag`` is set. Clears ``asr_input_enabled`` while
        the LLM is working; the flag is re-set by the audio player after
        TTS playback finishes.
        """
        while not stop_flag.is_set():
            try:
                user_text = text_queue.get(timeout=1)
                # Gate ASR -> LLM input while we are processing this turn.
                asr_input_enabled.clear()
                print(f"[LLM] 处理: {user_text}")
                response = self.chat(user_text)
                print(f"[LLM] 回复: {response}")
                response_queue.put(response)
                # asr_input_enabled is re-enabled by AudioPlayer.play()
                # shortly after TTS playback ends.
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[千问] 线程错误: {e}")
                import traceback
                traceback.print_exc()

# ============ 5. (next section: TTS synthesis, Aliyun NLS) ============
# ============ TTS speech-synthesis module (Aliyun NLS) ============
class AliyunTTS:
    """Text-to-speech via the Alibaba Cloud NLS RESTful API.

    Obtains a service token with an HMAC-SHA1-signed CreateToken request,
    then synthesizes 16 kHz mono PCM via the ``stream/v1/tts`` endpoint.
    """

    def __init__(self):
        try:
            self.appkey = ALIYUN_APPKEY
            self.access_key_id = ALIYUN_ACCESS_KEY_ID
            self.access_key_secret = ALIYUN_ACCESS_KEY_SECRET
            # Service token, filled by _get_token(); None when unavailable.
            self.token = None
            # Fetch an initial token.
            self._get_token()
        except Exception as e:
            print(f"[TTS] ✗ 初始化失败: {e}")
            raise

    def _get_token(self):
        """Fetch an NLS access token via the signed CreateToken OpenAPI call.

        On success sets ``self.token``; on failure leaves/sets it to None.
        Signature follows Alibaba Cloud RPC signing: sorted, percent-encoded
        query string, HMAC-SHA1 with ``secret + '&'`` as key, base64 result.
        """
        import requests
        import hmac
        import hashlib
        import base64
        from datetime import datetime
        import uuid
        from urllib.parse import quote
        try:
            # OpenAPI common parameters. NOTE(review): datetime.utcnow() is
            # deprecated in 3.12+; behavior unchanged here, just flagging.
            timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            nonce = str(uuid.uuid4())
            params = {
                'AccessKeyId': self.access_key_id,
                'Action': 'CreateToken',
                'Format': 'JSON',
                'RegionId': 'cn-shanghai',
                'SignatureMethod': 'HMAC-SHA1',
                'SignatureNonce': nonce,
                'SignatureVersion': '1.0',
                'Timestamp': timestamp,
                'Version': '2019-02-28',
            }
            # Canonical string-to-sign: sort params, percent-encode both
            # keys and values, then wrap as "GET&%2F&<encoded query>".
            sorted_params = sorted(params.items())
            canonicalized_query_string = '&'.join([f"{quote(k, safe='')}={quote(str(v), safe='')}" for k, v in sorted_params])
            string_to_sign = f"GET&%2F&{quote(canonicalized_query_string, safe='')}"
            # HMAC-SHA1 with the secret + '&' as key, base64-encoded.
            signature = base64.b64encode(
                hmac.new(
                    (self.access_key_secret + '&').encode('utf-8'),
                    string_to_sign.encode('utf-8'),
                    hashlib.sha1
                ).digest()
            ).decode('utf-8')
            # Attach the signature and fire the request.
            params['Signature'] = signature
            url = "https://nls-meta.cn-shanghai.aliyuncs.com/"
            print("[TTS] 正在获取NLS Token...")
            response = requests.get(url, params=params, timeout=10)
            print(f"[TTS] HTTP状态码: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                # Expected shape: {"Token": {"Id": "...", ...}, ...}
                if 'Token' in result and isinstance(result['Token'], dict):
                    if 'Id' in result['Token']:
                        self.token = result['Token']['Id']
                        return
        except Exception as e:
            print(f"[TTS] Token获取异常: {e}")
            # NOTE(review): in the (reformatted) original it is ambiguous
            # whether this reset runs only on exception or also on a non-200
            # response; placed in the except block per the apparent layout.
            self.token = None

    def synthesize(self, text):
        """Synthesize *text* via the NLS RESTful TTS endpoint.

        Args:
            text: UTF-8 text to speak.

        Returns:
            np.ndarray of float32 samples in [-1, 1] at 16 kHz on success,
            or None on failure. On HTTP 401 the token is refreshed so the
            next call can retry with a fresh one.
        """
        import requests
        import json
        try:
            # NLS speech-synthesis endpoint.
            url = "https://nls-gateway.cn-shanghai.aliyuncs.com/stream/v1/tts"
            # Request parameters (sent as query params on a POST).
            params = {
                "appkey": self.appkey,
                "text": text,
                "format": "pcm",
                "sample_rate": 16000,
                "voice": TTS_VOICE,            # configured voice
                "volume": TTS_VOLUME,          # configured volume
                "speech_rate": TTS_SPEECH_RATE,  # configured speech rate
                "pitch_rate": TTS_PITCH_RATE   # configured pitch
            }
            # Lazily (re)acquire the token if we don't have one.
            if not self.token:
                self._get_token()
            headers = {
                "Content-Type": "application/json",
            }
            # Token is passed via the X-NLS-Token header.
            if self.token:
                headers["X-NLS-Token"] = self.token
            # Fire the synthesis request.
            response = requests.post(
                url,
                params=params,
                headers=headers,
                timeout=10
            )
            if response.status_code == 200:
                # Body is raw 16-bit little-endian PCM.
                pcm_data = response.content
                # Convert PCM bytes -> float32 numpy array in [-1, 1].
                audio_int16 = np.frombuffer(pcm_data, dtype=np.int16)
                audio_float32 = audio_int16.astype(np.float32) / 32767.0
                return audio_float32
            else:
                print(f"[TTS] API调用失败: {response.status_code}")
                print(f"[TTS] 错误内容: {response.text}")
                # 401 => token expired; refresh for the next attempt.
                if response.status_code == 401:
                    self._get_token()
                return None
        except Exception as e:
            print(f"[TTS] 合成错误: {e}")
            import traceback
            traceback.print_exc()
            return None

    def run(self):
        """TTS worker loop: pull reply text, synthesize, enqueue audio.

        Runs until ``stop_flag`` is set; failed syntheses are dropped.
        """
        while not stop_flag.is_set():
            try:
                text = response_queue.get(timeout=1)
                print(f"[TTS] 合成: {text[:30]}...")
                audio = self.synthesize(text)
                if audio is not None:
                    tts_queue.put(audio)
                    print(f"[TTS] 完成")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[TTS] 线程错误: {e}")

# ============ 6. (next section: audio playback) ============
# ============ Audio playback module ============
class AudioPlayer:
    """Plays synthesized audio and coordinates the speak/listen handoff.

    Event protocol during playback (order matters):
      1. set is_speaking + clear asr_input_enabled (mute ASR->LLM),
      2. play, then sleep to let echo die down,
      3. clear is_speaking, sleep again, re-set asr_input_enabled,
      4. drain audio_queue so buffered mic chunks of our own voice
         are never transcribed.
    """

    def __init__(self, wake_detector=None, asr=None):
        self.wake_detector = wake_detector
        # ASR instance, kept so playback can coordinate with recognition.
        self.asr = asr

    def play(self, audio_data, sample_rate=16000):
        """Play one float32 audio buffer; blocks until playback finishes.

        Args:
            audio_data: 1-D (or stereo, downmixed) float32 samples.
            sample_rate: sample rate in Hz; defaults to 16 kHz (TTS output).
        """
        try:
            is_speaking.set()  # pause mic capture while we speak
            # Gate ASR -> LLM input during playback.
            asr_input_enabled.clear()
            # Ensure a numpy array.
            if not isinstance(audio_data, np.ndarray):
                audio_data = np.array(audio_data)
            # Downmix stereo to mono.
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
            # Explicit OutputStream so latency/blocksize can be tuned,
            # reducing ALSA underrun errors on small devices.
            with sd.OutputStream(
                samplerate=sample_rate,
                device=SPEAKER_DEVICE,
                channels=1,
                dtype=np.float32,
                latency=LATENCY,  # larger latency => fewer underruns
                blocksize=int(sample_rate * 0.1)  # bigger buffer
            ) as stream:
                stream.write(audio_data)
                # Let the buffered tail drain fully.
                stream.stop()
                time.sleep(0.1)  # extra margin so the last word finishes
            # Wait for room echo to die down before re-enabling capture.
            time.sleep(0.4)
            is_speaking.clear()  # resume mic capture
            # Extra delay before ASR text may reach the LLM again.
            time.sleep(0.5)
            asr_input_enabled.set()
            # Drop any mic audio buffered while we were speaking.
            while not audio_queue.empty():
                try:
                    audio_queue.get_nowait()
                except:
                    break
        except Exception as e:
            print(f"[播放] 错误: {e}")
            is_speaking.clear()
            asr_input_enabled.set()  # always restore the gates on error

    def run(self):
        """Playback worker loop; also drives sleep/exit after farewells."""
        while not stop_flag.is_set():
            try:
                audio_data = tts_queue.get(timeout=1)
                # Drop stale mic audio and stop listening before playing.
                while not audio_queue.empty():
                    try:
                        audio_queue.get_nowait()
                    except:
                        break
                is_listening.clear()
                self.play(audio_data)
                # A farewell was just spoken: go to sleep.
                if sleep_requested.is_set():
                    time.sleep(0.3)
                    if self.wake_detector:
                        self.wake_detector.sleep()
                    sleep_requested.clear()
                # An exit phrase was just spoken: shut the program down.
                if exit_requested.is_set():
                    time.sleep(0.5)  # let the user hear the goodbye
                    stop_flag.set()
                    break
            except queue.Empty:
                continue
            except Exception:
                pass

# ============ 7. Main controller ============
# Personalized welcome-message templates, grouped by tone.
WELCOME_TEMPLATES = {
    "warm": [
        "你好,{name}!欢迎回来!",
        "嗨,{name}!好久不见,很高兴见到你!",
        "{name},欢迎你!今天过得怎么样?",
        "你好啊,{name}!有什么我可以帮你的吗?",
        "欢迎回来,{name}!最近还好吗?"
    ],
    "professional": [
        "您好,{name}!欢迎使用智能语音助手。",
        "{name},您好!请问有什么可以为您服务的?",
        "欢迎,{name}!我已准备好为您提供帮助。",
        "您好,{name}!很高兴为您服务。",
        "{name},您好!请告诉我您的需求。"
    ],
    "casual": [
        "嘿,{name}!又见面啦!",
        "哇,{name}!今天你看起来气色不错!",
        "{name}!欢迎欢迎,热烈欢迎!",
        "哟,{name}!什么风把你吹来了?",
        "嗨呀,{name}!今天想聊点啥?"
    ]
}


class VoiceChatAssistant:
    """Top-level controller: wires capture, ASR, LLM, TTS, playback and
    (optionally) face recognition together and supervises their threads."""

    def __init__(self):
        self.audio_capture = AudioCapture()
        self.wake_detector = WakeWordDetector()
        self.asr = ExternalASR()  # external ASR process
        self.llm = QwenChat()
        self.tts = AliyunTTS()
        # Player needs wake_detector (for sleep) and asr handles.
        self.player = AudioPlayer(wake_detector=self.wake_detector, asr=self.asr)
        # Optional face-recognition module.
        self.face_recognition = None
        if FACE_RECOGNITION_ENABLED:
            self.face_recognition = FaceRecognitionModule()
        self.threads = []
        # Tracks the last greeted person / whether the event was handled.
        self.current_face_name = None
        self.face_event_processed = False

    def _generate_welcome_message(self, name):
        """Return a random personalized greeting for *name*."""
        import random
        # Pick a random style, then a random template within it.
        style = random.choice(list(WELCOME_TEMPLATES.keys()))
        templates = WELCOME_TEMPLATES[style]
        welcome_message = random.choice(templates).format(name=name)
        return welcome_message

    def _face_event_handler(self):
        """Face-event worker: greet, wake the assistant, pause recognition.

        Consumes events from ``face_event_queue``; each recognized event
        type wakes the assistant, enqueues a greeting for TTS, marks the
        event handled, and pauses face recognition to save resources.
        """
        while not stop_flag.is_set():
            try:
                event = face_event_queue.get(timeout=1)
                welcome_message = ""
                if event["type"] == "known_single_face":
                    # One known person: personalized greeting.
                    face_name = event["name"]
                    # A different person resets the handled flag.
                    if face_name != self.current_face_name:
                        self.current_face_name = face_name
                        self.face_event_processed = False
                    # Greet only once per person appearance.
                    if not self.face_event_processed:
                        print(f"[人脸识别] 检测到已知人脸 {face_name},准备唤醒...")
                        welcome_message = self._generate_welcome_message(face_name)
                        # Wake the assistant.
                        self.wake_detector.wake_up()
                        print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
                        # Speak the greeting.
                        response_queue.put(welcome_message)
                        print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
                        self.face_event_processed = True
                        # Pause recognition to save resources.
                        if self.face_recognition:
                            self.face_recognition.pause()
                            print(f"[人脸识别] 检测到已知人脸 {face_name},暂停人脸识别功能")
                elif event["type"] == "unknown_single_face" or event["type"] == "multiple_faces":
                    # Stranger(s): generic polite greeting.
                    if not hasattr(self, 'face_event_processed') or not self.face_event_processed:
                        print(f"[人脸识别] 检测到{event['type']},准备唤醒...")
                        if event["type"] == "unknown_single_face":
                            welcome_message = "你好,欢迎使用智能语音助手!"
                        else:
                            welcome_message = f"你们好,欢迎使用智能语音助手!"
                        self.wake_detector.wake_up()
                        print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
                        response_queue.put(welcome_message)
                        print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
                        self.face_event_processed = True
                        if self.face_recognition:
                            self.face_recognition.pause()
                            print(f"[人脸识别] 检测到{event['type']},暂停人脸识别功能")
                elif event["type"] == "multiple_known_faces":
                    # Several faces; greet only the known ones by name.
                    if not hasattr(self, 'face_event_processed') or not self.face_event_processed:
                        known_names = event.get("known_names", [])
                        print(f"[人脸识别] 检测到多个人脸,已知人脸: {known_names},准备唤醒...")
                        if len(known_names) == 1:
                            welcome_message = self._generate_welcome_message(known_names[0])
                        else:
                            # Multi-person greeting listing all names.
                            name_list = "、".join(known_names)
                            welcome_message = f"你们好,{name_list}!欢迎使用智能语音助手!"
                        self.wake_detector.wake_up()
                        print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
                        response_queue.put(welcome_message)
                        print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
                        self.face_event_processed = True
                        if self.face_recognition:
                            self.face_recognition.pause()
                            print(f"[人脸识别] 检测到多个人脸,暂停人脸识别功能")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"❌ 人脸事件处理错误: {e}")
                import traceback
                traceback.print_exc()

    def start(self):
        """Start capture, workers, and the monitor loop (blocks forever).

        The monitor loop polls the wake-word timeout every 0.5 s and calls
        ``stop()`` on Ctrl-C.
        """
        # Warn about an obviously unconfigured API key.
        if not QWEN_API_KEY or QWEN_API_KEY.startswith("your-"):
            print("⚠️ 警告: 请配置DashScope API Key!")
            print(" 1. 访问 https://dashscope.aliyun.com/")
            print(" 2. 登录后在控制台获取API-KEY")
            print(" 3. 将API Key填入代码第36行 QWEN_API_KEY 处")
        # Start microphone capture.
        self.audio_capture.start()
        # Start face recognition and its event-handler thread if enabled.
        if self.face_recognition:
            if self.face_recognition.start():
                face_event_thread = threading.Thread(target=self._face_event_handler, daemon=True, name="人脸事件处理")
                face_event_thread.start()
                self.threads.append(face_event_thread)
        # Launch the four pipeline workers as daemon threads.
        threads_config = [
            ("ASR识别", lambda: self.asr.run(self.wake_detector)),
            ("千问对话", self.llm.run),
            ("TTS合成", self.tts.run),
            ("音频播放", self.player.run),
        ]
        for name, target in threads_config:
            t = threading.Thread(target=target, name=name, daemon=True)
            t.start()
            self.threads.append(t)
        # Main loop: status monitoring.
        try:
            while True:
                time.sleep(0.5)
                # Auto-sleep after AWAKE_TIMEOUT seconds of inactivity.
                if ENABLE_WAKE_WORD:
                    self.wake_detector.check_timeout()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop all modules, drain every queue, and join worker threads."""
        stop_flag.set()
        self.audio_capture.stop()
        # Stop face recognition.
        if self.face_recognition:
            self.face_recognition.stop()
        # Drain all inter-thread queues so no worker blocks on shutdown.
        while not audio_queue.empty():
            try:
                audio_queue.get_nowait()
            except:
                break
        while not text_queue.empty():
            try:
                text_queue.get_nowait()
            except:
                break
        while not response_queue.empty():
            try:
                response_queue.get_nowait()
            except:
                break
        while not tts_queue.empty():
            try:
                tts_queue.get_nowait()
            except:
                break
        while not face_event_queue.empty():
            try:
                face_event_queue.get_nowait()
            except:
                break
        # Give threads a bounded chance to exit.
        for t in self.threads:
            t.join(timeout=2)

# ============ Main entry point ============
if __name__ == "__main__":
    # Start the assistant (blocks until Ctrl-C or an exit phrase).
    assistant = VoiceChatAssistant()
    assistant.start()