| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 基于API的智能语音聊天助手
- - ASR: 使用本地ASR识别
- - LLM: 千问API (DashScope)
- - TTS: 阿里云语音合成API
- - 全部使用云端API,无需GPU,部署简单
- """
- import os
- import time
- import queue
- import threading
- import numpy as np
- import sounddevice as sd
- from pathlib import Path
- from http import HTTPStatus
- import dashscope
- import json
- # ============ 配置参数 ============
- # 音频参数
- SAMPLING_RATE = 16000 # ASR采样率
- CHUNK_DURATION = 0.5 # 录音块时长(秒)
- CHUNK_SIZE = int(SAMPLING_RATE * CHUNK_DURATION)
- # ALSA缓冲区设置,增加缓冲区大小减少欠载错误
- LATENCY = 0.1 # 音频流延迟(秒) - 增加延迟以减少ALSA欠载错误
- # ===== API配置说明 =====
- # ASR: 使用本地ASR识别
- #TTS: 使用阿里云智能语音交互 (NLS)
- # LLM: 使用千问 API (DashScope)
- # 阿里云 NLS 配置(用于 ASR 和 TTS)
- ALIYUN_APPKEY = "uE6YeyIf1q7InhxB" # NLS项目的Appkey
- ALIYUN_ACCESS_KEY_ID = "LTAI5tGZwbyXsaykduFMsTGE" # AccessKey ID
- ALIYUN_ACCESS_KEY_SECRET = "888up1AgLIKDm1rVctnG422OcCmXFt" # AccessKey Secret
- # 千问 API 配置(用于 LLM)
- QWEN_API_KEY = "sk-362d1accbdec4bacbc8d291348049ab0" # 千问API Key
- QWEN_MODEL = "qwen-plus" # LLM模型: qwen-turbo, qwen-plus, qwen-max
- # 天气查询 API 配置
- WEATHER_API_KEY = "738b541a5f7a" # 天气API密钥
- WEATHER_API_URL = "https://whyta.cn/api/tianqi" # 天气API接口地址
- # 唤醒词配置
- WAKE_WORDS = ["优宝同学","又把同学","有保同学","你好优宝"]
- ENABLE_WAKE_WORD = True # 是否启用唤醒词
- AWAKE_TIMEOUT = 60 # 唤醒后无操作自动休眠时间(秒) - 从30秒增加到60秒
- # 休眠词配置(说这些词会进入休眠,而不是退出程序)
- SLEEP_WORDS = ["拜拜", "再见", "休息吧", "睡觉吧"]
- SLEEP_RESPONSES = [
- "好的,我先休息了,有需要再叫我!",
- "好的,拜拜!",
- "那我休息了,再见!",
- "收到,我去休息了!"
- ]
- # 退出程序词配置(说这些词会完全退出程序)
- EXIT_WORDS = ["关闭程序", "退出程序", "彻底退出"]
- EXIT_RESPONSES = [
- "好的,程序即将关闭,再见!",
- "收到,正在退出程序!"
- ]
- # TTS音色配置
- # 阿里云NLS支持的音色列表(更多音色请查看文档)
- # 女声:zhiyuan,zhiyue,zhisha,aiqi,aijia,siqi,
- # 男声:aicheng,zhida,aida,
- # 童声:mashu,yueer
- TTS_VOICE = "aicheng"
- TTS_SPEECH_RATE = 0 # 语速: -500到500, 0为正常速度
- TTS_PITCH_RATE = 0 # 音调: -500到500, 0为正常音调
- TTS_VOLUME = 80 # 音量: 0-100
- # VAD参数(语音活动检测)
- # ⚡ 高灵敏度配置 - 支持远距离拾音
- SILENCE_THRESHOLD = 0.4 # 静音阈值(秒) - 优化: 从0.7降至0.4秒,更快响应
- VOLUME_THRESHOLD = 0.009 # 音量阈值 - 从0.008降至0.003,大幅提高灵敏度,支持远距离拾音
- AUDIO_GAIN = 1.3 # 音频增益倍数 - 放大远距离声音
- # 音频设备
- MIC_DEVICE = 0 # 自动选择麦克风
- SPEAKER_DEVICE = None # 自动选择扬声器
- # 缓存目录
- CACHE_DIR = Path.home() / ".cache" / "voice_chat_api"
- CACHE_DIR.mkdir(parents=True, exist_ok=True)
- # ============ 人脸识别配置 ============
- FACE_RECOGNITION_ENABLED = True # 是否启用人脸识别
- FACE_RECOGNITION_CONFIDENCE_THRESHOLD = 300 # 人脸检测置信度阈值
- FACE_RECOGNITION_DURATION_THRESHOLD = 3 # 人脸持续出现时间阈值(秒)
- FACE_DATABASE_PATH = "face_database.pkl" # 人脸数据库路径
- CAMERA_ID = 10,8 # 摄像头ID
- FACE_RECOGNITION_MODEL = "buffalo_l" # 人脸识别模型
- FORCE_CUDA = True # 是否强制使用CUDA(建议根据设备情况调整)
- # ============ 全局队列 ============
- audio_queue = queue.Queue() # 音频数据队列
- text_queue = queue.Queue() # 识别文本队列
- response_queue = queue.Queue() # AI回复队列
- tts_queue = queue.Queue() # TTS音频队列
- face_event_queue = queue.Queue() # 人脸识别事件队列
- # 控制标志
- stop_flag = threading.Event()
- is_speaking = threading.Event() # TTS正在播放
- is_listening = threading.Event() # 正在录音
- is_awake = threading.Event() # 唤醒状态
- exit_requested = threading.Event() # 请求退出
- sleep_requested = threading.Event() # 请求休眠
- asr_input_enabled = threading.Event() # ASR文本传入LLM的开关,默认开启
- face_recognition_running = threading.Event() # 人脸识别运行状态
- asr_input_enabled.set() # 默认开启
- # 全局人脸识别实例,用于共享资源
- global_face_recognition_instance = None
- face_recognition_camera_lock = threading.Lock() # 摄像头访问锁
- # ============ 人脸识别人脸识别模块 ============
- class FaceRecognitionModule:
- """人脸识别模块"""
-
- def __init__(self):
- """初始化人脸识别模块"""
- self.face_recognition = None
- self.camera = None
- self.running = False
- self.face_tracking = {}
- self.current_face = None
- self.current_face_start_time = None
-
- def initialize(self):
- """初始化人脸识别模型和摄像头"""
- try:
- # 添加项目根目录到Python路径
- import sys
- sys.path.append('/home/ubuntu')
-
- # 直接导入FaceRecognitionAPI类
- from face_recognition_api import FaceRecognitionAPI
-
- # 初始化人脸识别API
- self.face_recognition = FaceRecognitionAPI(
- db_path=FACE_DATABASE_PATH,
- model_name=FACE_RECOGNITION_MODEL,
- force_cuda=FORCE_CUDA
- )
-
- # 打开摄像头 - 尝试多个ID
- import cv2
-
- # 尝试的摄像头ID列表,优先级从高到低
- camera_ids = [10, 8]
- self.camera = None
-
- for cam_id in camera_ids:
- print(f"尝试打开摄像头 ID: {cam_id}...")
- self.camera = cv2.VideoCapture(cam_id)
- if self.camera.isOpened():
- print(f"✅ 成功打开摄像头 ID: {cam_id}")
- break
- else:
- print(f"❌ 无法打开摄像头 ID: {cam_id}")
- # 释放失败的摄像头资源
- self.camera.release()
- self.camera = None
-
- if not self.camera:
- print("❌ 所有摄像头ID都无法打开")
- return False
-
- # 设置摄像头分辨率
- self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
- self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
-
- # 将实例保存到全局变量,供工具调用
- global global_face_recognition_instance
- global_face_recognition_instance = self
-
- print("✅ 人脸识别模块初始化成功")
- return True
- except Exception as e:
- print(f"❌ 人脸识别模块初始化失败: {e}")
- import traceback
- traceback.print_exc()
- return False
-
- def start(self):
- """启动人脸识别后台线程"""
- if not self.face_recognition or not self.camera:
- if not self.initialize():
- return False
-
- self.running = True
- face_recognition_running.set()
-
- # 启动后台线程
- self.thread = threading.Thread(target=self._face_recognition_loop, daemon=True)
- self.thread.start()
- print("✅ 人脸识别后台线程已启动")
- return True
-
- def stop(self):
- """停止人脸识别后台线程"""
- self.running = False
- face_recognition_running.clear()
-
- if hasattr(self, 'thread') and self.thread.is_alive():
- self.thread.join(timeout=2)
-
- if self.camera:
- self.camera.release()
-
- print("✅ 人脸识别后台线程已停止")
-
- def _face_recognition_loop(self):
- """人脸识别后台循环"""
- import cv2
-
- # 创建窗口
- cv2.namedWindow("人脸识别", cv2.WINDOW_NORMAL)
- cv2.resizeWindow("人脸识别", 640, 480)
-
- while self.running and not stop_flag.is_set():
- try:
- # 获取摄像头锁
- with face_recognition_camera_lock:
- # 读取摄像头帧
- ret, frame = self.camera.read()
- if not ret:
- time.sleep(0.01)
- continue
-
- # 检测和识别人脸
- try:
- results = self.face_recognition.detect_and_recognize(frame)
-
- # 绘制人脸识别结果
- self._draw_recognition_results(frame, results)
-
- # 显示摄像头窗口
- cv2.imshow("人脸识别", frame)
-
- # 处理键盘事件,按q键退出
- if cv2.waitKey(1) & 0xFF == ord('q'):
- break
-
- # 处理识别结果
- self._process_recognition_results(results)
- except Exception as recog_e:
- print(f"⚠️ 人脸识别处理错误: {recog_e}")
- # 增加安全检查,避免vector越界
- import traceback
- traceback.print_exc()
- # 短暂休眠后继续
- time.sleep(0.5)
- continue
-
- # 控制检测频率,避免占用过多资源
- time.sleep(0.1)
- except Exception as e:
- print(f"❌ 人脸识别循环错误: {e}")
- import traceback
- traceback.print_exc()
- time.sleep(1)
-
- # 关闭窗口
- cv2.destroyWindow("人脸识别")
-
- def _process_recognition_results(self, results):
- """处理人脸识别结果"""
- current_time = time.time()
- detected_faces = results.get("faces", [])
-
- # 安全检查:确保detected_faces是列表
- if not isinstance(detected_faces, list):
- print("⚠️ 检测到的人脸不是列表类型,跳过处理")
- return
-
- # 更新人脸追踪信息
- updated_face_ids = set()
- valid_faces = []
-
- # 过滤有效人脸(置信度达标)
- for face_info in detected_faces:
- try:
- # 安全检查:确保face_info是字典且包含必要键
- if not isinstance(face_info, dict):
- continue
-
- # 检查置信度
- similarity = face_info.get("similarity", 0)
- if similarity < FACE_RECOGNITION_CONFIDENCE_THRESHOLD:
- continue
-
- # 简单的人脸ID生成(基于边界框)
- bbox = face_info.get("bbox", [])
- # 安全检查:确保边界框有效
- if not isinstance(bbox, (list, tuple)) or len(bbox) < 4:
- continue
-
- # 确保边界框坐标是整数
- try:
- bbox = [int(coord) for coord in bbox]
- except (ValueError, TypeError):
- continue
-
- face_id = f"{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}"
- updated_face_ids.add(face_id)
-
- # 更新人脸持续时间
- if face_id in self.face_tracking:
- # 人脸已存在,更新持续时间
- self.face_tracking[face_id]["end_time"] = current_time
- self.face_tracking[face_id]["duration"] = current_time - self.face_tracking[face_id]["start_time"]
- self.face_tracking[face_id]["face_info"] = face_info
- else:
- # 新人脸,初始化追踪信息
- self.face_tracking[face_id] = {
- "start_time": current_time,
- "end_time": current_time,
- "duration": 0,
- "face_info": face_info
- }
-
- # 添加到有效人脸列表
- valid_faces.append({
- "face_id": face_id,
- "face_info": face_info,
- "duration": self.face_tracking[face_id]["duration"]
- })
- except Exception as face_e:
- print(f"⚠️ 处理单个人脸信息错误: {face_e}")
- import traceback
- traceback.print_exc()
- continue
-
- # 处理有效人脸
- if valid_faces:
- # 过滤出已知人脸
- known_faces = []
- for face in valid_faces:
- face_info = face["face_info"]
- name = face_info.get("name", "Unknown")
- if name != "Unknown" and face["duration"] >= FACE_RECOGNITION_DURATION_THRESHOLD:
- known_faces.append(face)
-
- # 检查是否满足单一人脸条件
- if len(known_faces) == 1:
- # 单一人脸且停留时间达标
- face = known_faces[0]
- face_info = face["face_info"]
- name = face_info.get("name", "Unknown")
-
- # 已知人脸,发送个性化打招呼事件
- self._send_face_event({
- "type": "known_single_face",
- "name": name,
- "face_info": face_info
- })
- elif len(known_faces) > 1:
- # 多个人脸,但只处理已知的
- # 获取所有已知人脸的名字
- known_names = []
- for face in known_faces:
- name = face["face_info"].get("name", "Unknown")
- if name not in known_names:
- known_names.append(name)
-
- # 发送包含已知人脸信息的事件
- self._send_face_event({
- "type": "multiple_known_faces",
- "face_count": len(known_faces),
- "known_names": known_names
- })
- # 当只有unknown人脸时,不发送任何事件
-
- # 清理已消失的人脸
- try:
- for face_id in list(self.face_tracking.keys()):
- if face_id not in updated_face_ids:
- del self.face_tracking[face_id]
- except Exception as clean_e:
- print(f"⚠️ 清理人脸追踪信息错误: {clean_e}")
- # 重置追踪字典,避免持续错误
- self.face_tracking = {}
-
- def _draw_recognition_results(self, frame, results):
- """在图像上绘制人脸识别结果"""
- import cv2
-
- # 遍历所有检测到的人脸
- for face in results.get("faces", []):
- try:
- # 获取边界框
- bbox = face.get("bbox", [])
- if not isinstance(bbox, (list, tuple)) or len(bbox) < 4:
- continue
-
- # 确保边界框坐标是整数
- bbox = [int(coord) for coord in bbox]
-
- # 获取人脸信息
- name = face.get("name", "Unknown")
- confidence = face.get("confidence", 0.0)
- similarity = face.get("similarity", 0.0)
-
- # 绘制边界框
- if name != "Unknown":
- # 已知人脸使用绿色框
- box_color = (0, 255, 0)
- else:
- # 未知人脸使用蓝色框
- box_color = (255, 0, 0)
-
- cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), box_color, 2)
-
- # 绘制文本背景
- text_bg_color = (0, 0, 0) # 黑色背景
- text_color = (255, 255, 255) # 白色文字
-
- # 准备文本
- if name != "Unknown":
- text = f"{name} ({similarity:.1f})"
- else:
- text = f"Unknown ({confidence:.1f})"
-
- # 使用freetype库绘制文字
- try:
- # 调用face_recognition_api中的_put_text_freetype方法
- self.face_recognition._put_text_freetype(
- frame,
- text,
- (bbox[0], bbox[1] - 10),
- font_size=18,
- color=text_color
- )
- except Exception as ft_e:
- # 如果freetype绘制失败,使用OpenCV默认字体
- print(f"⚠️ freetype绘制文字失败,使用默认字体: {ft_e}")
- cv2.putText(
- frame,
- text,
- (bbox[0], bbox[1] - 10),
- cv2.FONT_HERSHEY_SIMPLEX,
- 0.6,
- text_color,
- 2
- )
- except Exception as draw_e:
- print(f"⚠️ 绘制人脸识别结果错误: {draw_e}")
- import traceback
- traceback.print_exc()
- continue
-
- def _send_face_event(self, event_data):
- """发送人脸事件到事件队列"""
- # 确保事件数据包含基本信息
- event = {
- "timestamp": time.time(),
- **event_data
- }
- face_event_queue.put(event)
-
- if event_data["type"] == "known_single_face":
- print(f"✅ 检测到已知单一人脸: {event_data['name']}")
- elif event_data["type"] == "unknown_single_face":
- print(f"✅ 检测到未知单一人脸")
- elif event_data["type"] == "multiple_faces":
- print(f"✅ 检测到多个人脸: {event_data['face_count']}人")
-
- def pause(self):
- """暂停人脸识别"""
- self.running = False
- face_recognition_running.clear()
- print("✅ 人脸识别已暂停")
-
- def resume(self):
- """恢复人脸识别"""
- self.start()
- # ============ 1. 音频采集模块 ============
- class AudioCapture:
- """实时音频采集"""
-
- def __init__(self):
- self.stream = None
- self.audio_buffer = []
- self.silence_start = None
-
- def callback(self, indata, frames, time_info, status):
- """音频流回调 - 流式模式"""
- # TTS播放时不采集音频,避免回声(完全丢弃所有数据)
- if is_speaking.is_set():
- # 清空所有状态
- self.audio_buffer = []
- self.silence_start = None
- is_listening.clear()
- return
-
- # 转换为float32格式
- audio_data = indata[:, 0].astype(np.float32)
-
- # 🎤 应用音频增益 - 放大远距离声音
- audio_data = audio_data * AUDIO_GAIN
- # 防止削波(限制在-1到1之间)
- audio_data = np.clip(audio_data, -1.0, 1.0)
-
- # 计算音量(检测是否有声音)
- volume = np.abs(audio_data).mean()
-
- if volume > VOLUME_THRESHOLD: # 有声音
- self.silence_start = None
- # 流式模式:立即发送音频块到队列
- audio_queue.put(audio_data.copy())
- is_listening.set()
- else: # 静音
- # 检测静音持续时间
- if is_listening.is_set():
- if self.silence_start is None:
- self.silence_start = time.time()
- elif time.time() - self.silence_start > SILENCE_THRESHOLD:
- # 静音超过阈值,标记说话结束
- self.silence_start = None
- is_listening.clear()
-
- def start(self):
- """启动音频采集"""
- self.stream = sd.InputStream(
- samplerate=SAMPLING_RATE,
- blocksize=CHUNK_SIZE,
- device=MIC_DEVICE,
- channels=1,
- dtype=np.float32,
- callback=self.callback,
- latency=LATENCY # 增加延迟以减少ALSA欠载错误
- )
- self.stream.start()
-
- def stop(self):
- """停止采集"""
- if self.stream:
- self.stream.stop()
- self.stream.close()
- # ============ 2. 语音唤醒检测模块 ============
- class WakeWordDetector:
- """语音唤醒检测器"""
-
- def __init__(self):
- self.wake_words = WAKE_WORDS
- self.last_active_time = None
-
- # 预计算唤醒词的拼音(用于模糊匹配)
- try:
- from pypinyin import lazy_pinyin
- self.wake_words_pinyin = {
- word: ''.join(lazy_pinyin(word))
- for word in self.wake_words
- }
- except ImportError:
- self.wake_words_pinyin = None
-
- if not ENABLE_WAKE_WORD:
- is_awake.set() # 如果不启用唤醒词,默认一直唤醒
-
- def check_wake_word(self, text):
- """检查是否包含唤醒词(支持拼音模糊匹配)"""
- if not text:
- return False
-
- # 方法1: 精确匹配(优先)
- for wake_word in self.wake_words:
- if wake_word in text:
- return True
-
- # 方法2: 拼音模糊匹配
- if self.wake_words_pinyin:
- try:
- from pypinyin import lazy_pinyin
- text_pinyin = ''.join(lazy_pinyin(text))
-
- for wake_word, wake_pinyin in self.wake_words_pinyin.items():
- # 检查拼音是否包含
- if wake_pinyin in text_pinyin:
- return True
-
- # 更模糊的匹配:计算相似度
- similarity = self._calculate_similarity(wake_pinyin, text_pinyin)
- if similarity > 0.7: # 70%相似度就认为匹配
- return True
-
- except Exception:
- pass
-
- return False
-
- def _calculate_similarity(self, s1, s2):
- """计算两个拼音字符串的相似度(编辑距离)"""
- if not s1 or not s2:
- return 0.0
-
- # Levenshtein距离
- len1, len2 = len(s1), len(s2)
- dp = [[0] * (len2 + 1) for _ in range(len1 + 1)]
-
- for i in range(len1 + 1):
- dp[i][0] = i
- for j in range(len2 + 1):
- dp[0][j] = j
-
- for i in range(1, len1 + 1):
- for j in range(1, len2 + 1):
- if s1[i-1] == s2[j-1]:
- dp[i][j] = dp[i-1][j-1]
- else:
- dp[i][j] = min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1]) + 1
-
- distance = dp[len1][len2]
- max_len = max(len1, len2)
- similarity = 1 - (distance / max_len) if max_len > 0 else 0
-
- return similarity
-
- def check_sleep_word(self, text):
- """检查是否包含休眠词(支持拼音匹配)"""
- if not text:
- return False
-
- # 精确匹配
- for sleep_word in SLEEP_WORDS:
- if sleep_word in text:
- return True
-
- # 拼音匹配
- if self.wake_words_pinyin:
- try:
- from pypinyin import lazy_pinyin
- text_pinyin = ''.join(lazy_pinyin(text))
- for sleep_word in SLEEP_WORDS:
- sleep_pinyin = ''.join(lazy_pinyin(sleep_word))
- if sleep_pinyin in text_pinyin:
- return True
- except:
- pass
-
- return False
-
- def check_exit_word(self, text):
- """检查是否包含退出程序词(支持拼音匹配)"""
- if not text:
- return False
-
- # 精确匹配
- for exit_word in EXIT_WORDS:
- if exit_word in text:
- return True
-
- # 拼音匹配
- if self.wake_words_pinyin:
- try:
- from pypinyin import lazy_pinyin
- text_pinyin = ''.join(lazy_pinyin(text))
- for exit_word in EXIT_WORDS:
- exit_pinyin = ''.join(lazy_pinyin(exit_word))
- if exit_pinyin in text_pinyin:
- return True
- except:
- pass
-
- return False
-
- def wake_up(self):
- """唤醒"""
- is_awake.set()
- self.last_active_time = time.time()
-
- def sleep(self):
- """休眠"""
- is_awake.clear()
- self.last_active_time = None
-
- def update_activity(self):
- """更新活动时间"""
- if is_awake.is_set() and ENABLE_WAKE_WORD:
- self.last_active_time = time.time()
-
- def check_timeout(self):
- """检查是否超时需要休眠"""
- if ENABLE_WAKE_WORD and is_awake.is_set() and self.last_active_time:
- if time.time() - self.last_active_time > AWAKE_TIMEOUT:
- self.sleep()
- return True
- return False
- # ============ 3. ASR识别模块(基于外部程序)============
- class ExternalASR:
- """基于外部程序的ASR识别(使用pty方式)"""
-
- def __init__(self):
- try:
- import pty
- import os
- import re
-
- self.pty = pty
- self.os = os
- self.re = re
- self.pattern = self.re.compile(r"\('([^']*)',\s*'([^']*)'\)")
-
- self.demo_path = self.os.path.abspath("../audio_check_demo")
-
- if not self.os.path.exists(self.demo_path):
- print(f"[ASR] ✗ 外部程序不存在: {self.demo_path}")
- raise FileNotFoundError(f"ASR程序不存在: {self.demo_path}")
-
- self.master = None
- self.slave = None
- self.pid = None
- self.wake_detector = None
-
- self._start_pty_process()
-
- print("[ASR] ✓ 外部ASR程序加载成功")
- except Exception as e:
- print(f"[ASR] ✗ 初始化失败: {e}")
- import traceback
- traceback.print_exc()
- raise
-
- def _start_pty_process(self):
- """启动pty进程运行外部ASR程序"""
- self.master, self.slave = self.pty.openpty()
- self.pid = self.os.fork()
-
- if self.pid == 0:
- self.os.close(self.master)
- self.os.dup2(self.slave, 1)
- self.os.dup2(self.slave, 2)
- self.os.close(self.slave)
- self.os.execv(self.demo_path, [self.demo_path])
- else:
- self.os.close(self.slave)
- print(f"[ASR] 已启动外部ASR程序 (PID: {self.pid})")
-
- def _process_recognition_result(self, text, wake_detector):
- """处理识别结果"""
- if not text:
- return
-
- if wake_detector.check_exit_word(text):
- import random
- goodbye_msg = random.choice(EXIT_RESPONSES)
- response_queue.put(goodbye_msg)
- exit_requested.set()
- return
-
- if wake_detector.check_sleep_word(text):
- if is_awake.is_set():
- import random
- sleep_msg = random.choice(SLEEP_RESPONSES)
- response_queue.put(sleep_msg)
- sleep_requested.set()
- return
-
- if not asr_input_enabled.is_set():
- return
-
- if ENABLE_WAKE_WORD:
- if not is_awake.is_set():
- if wake_detector.check_wake_word(text):
- wake_detector.wake_up()
- print(f"[ASR] 识别: {text}")
- wake_detector.update_activity()
- text_queue.put(text)
- else:
- if len(text.strip()) < 2:
- return
-
- print(f"[ASR] 识别: {text}")
- wake_detector.update_activity()
- text_queue.put(text)
- else:
- print(f"[ASR] 识别: {text}")
- text_queue.put(text)
-
- def run(self, wake_detector):
- """ASR线程主循环"""
- self.wake_detector = wake_detector
-
- print("[ASR] 外部ASR识别线程已启动")
-
- try:
- with self.os.fdopen(self.master, 'r', encoding='utf-8', errors='ignore') as f:
- while not stop_flag.is_set():
- try:
- line = f.readline()
- if not line:
- break
-
- line = line.strip()
- if not line:
- continue
-
- match = self.pattern.match(line)
- if match:
- text = match.group(1)
- pinyin = match.group(2)
- print(f"[ASR] 识别文本:{text} | 拼音:{pinyin}")
-
- if text:
- self._process_recognition_result(text, self.wake_detector)
- else:
- print(f"【原始】:{line}")
-
- except Exception as e:
- print(f"[ASR] 读取错误: {e}")
- import traceback
- traceback.print_exc()
- break
-
- except Exception as e:
- print(f"[ASR] 线程错误: {e}")
- import traceback
- traceback.print_exc()
- finally:
- if self.master:
- try:
- self.os.close(self.master)
- except:
- pass
- if self.pid > 0:
- try:
- self.os.waitpid(self.pid, 0)
- except:
- pass
- # ============ 3. ASR识别模块(阿里云NLS - 已废弃)============
- class AliyunStreamingASR:
- """阿里云NLS流式语音识别(WebSocket)"""
-
- def __init__(self):
- print("[ASR] 初始化阿里云NLS流式识别...")
- try:
- self.appkey = ALIYUN_APPKEY
- self.access_key_id = ALIYUN_ACCESS_KEY_ID
- self.access_key_secret = ALIYUN_ACCESS_KEY_SECRET
- self.token = None
- self.token_expire_time = 0
-
- # WebSocket连接
- self.ws = None
- self.ws_connected = False
- self.ws_lock = threading.Lock()
- self.task_id = None # 当前识别会话ID
- self.session_started = False # 当前会话是否已发送StartTranscription
-
- # 识别结果缓存
- self.current_result = ""
- self.final_result = ""
- self.sentence_results = [] # 🔧 新增:存储SentenceEnd的句子结果
- self.wake_detector = None # 🔧 新增:保存wake_detector引用
-
- # Token管理(Token有效期180秒)
- self.token_create_time = 0
-
- # 获取Token
- self._get_token()
-
- print("[ASR] ✓ 阿里云NLS 流式ASR初始化成功")
- print(f"[ASR] 使用Appkey: {self.appkey}")
- print(f"[ASR] 模式: WebSocket 流式识别")
- except Exception as e:
- print(f"[ASR] ✗ 初始化失败: {e}")
- raise
-
- def _get_token(self):
- """获取NLS访问Token(使用阿里云OpenAPI签名)"""
- import requests
- import hmac
- import hashlib
- import base64
- from datetime import datetime
- import uuid
- from urllib.parse import quote
-
- try:
- # OpenAPI参数
- timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
- nonce = str(uuid.uuid4())
-
- # 公共请求参数
- params = {
- 'AccessKeyId': self.access_key_id,
- 'Action': 'CreateToken',
- 'Format': 'JSON',
- 'RegionId': 'cn-shanghai',
- 'SignatureMethod': 'HMAC-SHA1',
- 'SignatureNonce': nonce,
- 'SignatureVersion': '1.0',
- 'Timestamp': timestamp,
- 'Version': '2019-02-28'
- }
-
- # 构造签名字符串
- sorted_params = sorted(params.items())
- canonicalized_query_string = '&'.join([f"{quote(k, safe='')}={quote(str(v), safe='')}" for k, v in sorted_params])
- string_to_sign = f"GET&%2F&{quote(canonicalized_query_string, safe='')}"
-
- # 计算签名
- signature = base64.b64encode(
- hmac.new(
- (self.access_key_secret + '&').encode('utf-8'),
- string_to_sign.encode('utf-8'),
- hashlib.sha1
- ).digest()
- ).decode('utf-8')
-
- # 添加签名到参数
- params['Signature'] = signature
-
- # 发送请求
- url = "https://nls-meta.cn-shanghai.aliyuncs.com/"
-
- print("[ASR] 正在获取NLS Token...")
- print(f"[ASR] 使用签名认证")
-
- response = requests.get(url, params=params, timeout=10)
-
- print(f"[ASR] HTTP状态码: {response.status_code}")
-
- if response.status_code == 200:
- result = response.json()
- print(f"[ASR] CreateToken响应: {result}")
-
- # 提取Token
- if 'Token' in result and isinstance(result['Token'], dict):
- if 'Id' in result['Token']:
- self.token = result['Token']['Id']
- self.token_create_time = time.time()
- self.token_expire_time = result['Token'].get('ExpireTime', 0)
- print(f"[ASR] ✓ Token获取成功")
- print(f"[ASR] Token: {self.token}")
- print(f"[ASR] 有效期: 180秒")
- print(f"[ASR] Appkey: {self.appkey}")
- print(f"[ASR] 提示: 确保Token和Appkey来自同一账号")
- return
-
- print(f"[ASR] ⚠️ 无法从响应提取Token,响应: {result}")
- else:
- print(f"[ASR] Token API失败: {response.text}")
-
- except Exception as e:
- print(f"[ASR] ❌ Token获取异常: {e}")
- import traceback
- traceback.print_exc()
-
- # 失败提示
- print("\n" + "=" * 60)
- print("❌ Token获取失败 - 请检查:")
- print("=" * 60)
- print(f"1. AccessKey ID: {self.access_key_id}")
- print(f"2. AccessKey Secret: {self.access_key_secret[:5]}***")
- print("3. 是否已开通 NLS 服务: https://nls.console.aliyun.com/")
- print("4. AccessKey 是否有 NLS 权限")
- print("=" * 60 + "\n")
-
- self.token = None
-
- def _connect_websocket(self):
- """建立WebSocket连接"""
- import websocket
- import json
-
- try:
- # 检查Token有效期(180秒)
- if not self.token or (time.time() - self.token_create_time) > 170:
- print("[ASR] Token为空或即将过期,重新获取...")
- self._get_token()
- if not self.token:
- print("[ASR] Token获取失败,无法建立连接")
- return False
-
- # ✅ 正确方式:Token必须通过URL参数传递(不是Header!)
- ws_url = f"wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1?token={self.token}"
- # 创建WebSocket连接(Token已在URL中,无需header)
- self.ws = websocket.WebSocketApp(
- ws_url,
- on_message=self._on_message,
- on_error=self._on_error,
- on_close=self._on_close,
- on_open=self._on_open
- )
-
- # 在新线程中运行WebSocket
- ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
- ws_thread.start()
-
- # 等待连接建立
- timeout = 5
- start_time = time.time()
- while not self.ws_connected and time.time() - start_time < timeout:
- time.sleep(0.1)
-
- if self.ws_connected:
- print("[ASR] ✓ WebSocket连接成功")
- return True
- else:
- print("[ASR] ✗ WebSocket连接超时")
- return False
-
- except Exception as e:
- print(f"[ASR] WebSocket连接失败: {e}")
- import traceback
- traceback.print_exc()
- return False
-
- def _on_open(self, ws):
- """WebSocket连接打开(长连接建立成功)"""
- print("[ASR] ✓ WebSocket长连接已建立")
- self.ws_connected = True
- # 不在这里发送StartTranscription,等待实际识别时再发送
-
- def _send_start_transcription(self):
- """发送开始识别命令(每次新的识别会话调用)"""
- import json
- import uuid
-
- if not self.ws_connected or not self.ws:
- print("[ASR] WebSocket未连接")
- return False
-
- try:
- # 为每个新会话生成新的task_id
- self.task_id = uuid.uuid4().hex # 32位十六进制
- message_id = uuid.uuid4().hex # 32位十六进制
-
- # 清空之前的结果
- self.current_result = ""
- self.final_result = ""
-
- print(f"[ASR] 开始新识别会话 (task_id: {self.task_id})")
-
- start_msg = {
- "header": {
- "message_id": message_id,
- "task_id": self.task_id,
- "namespace": "SpeechTranscriber",
- "name": "StartTranscription",
- "appkey": self.appkey
- },
- "payload": {
- "format": "pcm",
- "sample_rate": SAMPLING_RATE,
- "enable_intermediate_result": True,
- "enable_punctuation_prediction": True,
- "enable_inverse_text_normalization": True,
- "max_sentence_silence": 400 # 🔧 从800降至400ms,更快触发识别
- }
- }
-
- with self.ws_lock:
- if self.ws:
- self.ws.send(json.dumps(start_msg))
- print("[ASR] ✓ StartTranscription已发送")
- return True
-
- except Exception as e:
- print(f"[ASR] 发送StartTranscription失败: {e}")
- return False
-
- def _on_message(self, ws, message):
- """接收识别结果"""
- import json
-
- try:
- result = json.loads(message)
-
- # 解析结果
- if 'header' in result:
- name = result['header'].get('name', '')
- status = result['header'].get('status', 0)
-
- # 🔧 添加调试: 打印所有消息类型(排除常见的)
- if name not in ['TranscriptionResultChanged', 'TranscriptionCompleted', 'TaskFailed', 'SentenceEnd', 'SentenceBegin']:
- print(f"[ASR] 收到消息: {name}, status: {status}")
-
- if name == 'TranscriptionResultChanged':
- # 中间结果
- if 'payload' in result and 'result' in result['payload']:
- text = result['payload']['result']
- self.current_result = text
- print(f"[ASR] 中间结果: {text}")
-
- elif name == 'SentenceEnd':
- # 🔧 新增:处理句子结束消息(流式识别的实时结果)
- if 'payload' in result and 'result' in result['payload']:
- text = result['payload']['result']
- print(f"[ASR] ✓ 句子完成: {text}")
-
- # 🔧 过滤短句和可能的回声
- if text and len(text.strip()) >= 2: # 至少2个字符
- # 检查是否在播放中(回声)
- if is_speaking.is_set():
- print(f"[ASR] ⚠️ 播放期间的识别,丢弃(可能是回声): {text}")
- elif self.wake_detector:
- self._process_recognition_result(text, self.wake_detector)
- else:
- print(f"[ASR] ⚠️ 句子太短,丢弃: {text}")
-
- elif name == 'TranscriptionCompleted':
- # 最终结果
- print(f"[ASR] 收到TranscriptionCompleted消息")
- print(f"[ASR] 完整消息: {result}")
-
- if 'payload' in result:
- payload = result['payload']
- print(f"[ASR] payload: {payload}")
-
- # 尝试多种可能的字段
- text = None
- if 'result' in payload:
- text = payload['result']
- elif 'text' in payload:
- text = payload['text']
- elif isinstance(payload, str):
- text = payload
-
- if text:
- self.final_result = text
- print(f"[ASR] ✓ 最终结果: {text}")
- else:
- print(f"[ASR] ⚠️ payload中无result字段")
- else:
- print(f"[ASR] ⚠️ 消息中无payload")
-
- elif name == 'TaskFailed':
- # 识别失败
- message = result['header'].get('message', '未知错误')
- print(f"[ASR] ✗ 识别失败: {message}")
-
- except Exception as e:
- print(f"[ASR] 消息解析错误: {e}")
-
- def _on_error(self, ws, error):
- """WebSocket错误"""
- print(f"[ASR] WebSocket错误: {error}")
- import traceback
- traceback.print_exc()
- self.ws_connected = False
-
- def _on_close(self, ws, close_status_code, close_msg):
- """WebSocket关闭"""
- print(f"[ASR] WebSocket已关闭")
- print(f"[ASR] 关闭码: {close_status_code}")
- print(f"[ASR] 关闭消息: {close_msg}")
-
- # 解析错误码
- if close_msg:
- try:
- close_msg_str = str(close_msg)
- if '240000002' in close_msg_str or b'240000002' in close_msg:
- print("[ASR] ❌ 错误: 请求参数校验失败 (240000002)")
- print("[ASR] 可能原因:")
- print(" 1. Token已过期(有效期180秒)")
- print(" 2. Appkey与Token不匹配")
- print(" 3. 参数格式错误")
- elif '440000004' in close_msg_str or b'440000004' in close_msg:
- print("[ASR] ⚠️ 错误: 连接异常 (440000004)")
- print("[ASR] 可能原因: 长时间连接或音频数据异常,将自动重连")
- except:
- pass
-
- self.ws_connected = False
-
- def send_audio(self, audio_data):
- """发送音频数据到WebSocket"""
- import websocket
-
- if not self.ws_connected or not self.ws:
- # 重新连接
- if not self._connect_websocket():
- return False
-
- try:
- # 将float32转为PCM (int16)
- audio_int16 = (audio_data * 32767).astype(np.int16)
- audio_bytes = audio_int16.tobytes()
-
- # 🔧 添加调试: 打印音频数据统计
- duration_ms = len(audio_data) / SAMPLING_RATE * 1000
- # print(f"[ASR] 发送音频: {len(audio_bytes)} bytes ({duration_ms:.0f}ms)")
-
- # 发送二进制音频数据
- with self.ws_lock:
- if self.ws and self.ws_connected:
- self.ws.send(audio_bytes, opcode=websocket.ABNF.OPCODE_BINARY)
- return True
-
- except Exception as e:
- print(f"[ASR] 发送音频失败: {e}")
- self.ws_connected = False
- return False
-
- def finish_recognition(self):
- """结束当前识别会话"""
- import json
- import uuid
-
- if not self.ws_connected or not self.ws:
- return self.final_result
-
- try:
- # 发送结束命令(使用相同的task_id)
- message_id = uuid.uuid4().hex # 32位十六进制
-
- stop_msg = {
- "header": {
- "message_id": message_id, # 32位十六进制
- "task_id": self.task_id, # 使用开始时的task_id(32位十六进制)
- "namespace": "SpeechTranscriber",
- "name": "StopTranscription",
- "appkey": self.appkey
- }
- }
-
- print("[ASR] 发送StopTranscription命令...")
- print(f"[ASR] task_id: {self.task_id}")
- print(f"[ASR] StopTranscription消息: {json.dumps(stop_msg)}")
-
- with self.ws_lock:
- if self.ws:
- self.ws.send(json.dumps(stop_msg))
-
- # 等待最终结果(WebSocket异步接收,需要足够的时间)
- print("[ASR] 等待TranscriptionCompleted消息...")
- print(f"[ASR] 当前final_result: '{self.final_result}'")
-
- max_wait = 1.2 # 🔧 等待1.2秒获取识别结果
- start_time = time.time()
-
- while (time.time() - start_time) < max_wait:
- if self.final_result:
- print(f"[ASR] ✓ 收到最终结果!")
- break
- time.sleep(0.05) # 优化: 从0.1秒降至0.05秒,更快检测
-
- result = self.final_result
-
- # 🔧 优先使用最终结果,没有则用中间结果
- if not result and self.current_result:
- result = self.current_result
- print(f"[ASR] 使用中间结果作为最终结果: '{result}'")
- elif not result:
- print(f"[ASR] ⚠️ 既无最终结果也无中间结果!")
-
- print(f"[ASR] 返回结果: '{result}' (等待时间: {time.time()-start_time:.2f}秒)")
-
- # 清空结果缓存
- self.current_result = ""
- self.final_result = ""
-
- return result
-
- except Exception as e:
- print(f"[ASR] 结束识别失败: {e}")
- return self.final_result
-
- def run(self, wake_detector):
- """ASR线程主循环 - 流式识别版本(保持WebSocket长连接)"""
- print("[ASR] 流式识别线程已启动")
-
- # 🔧 保存wake_detector引用,供_on_message使用
- self.wake_detector = wake_detector
-
- # 建立初始WebSocket连接(保持长连接)
- if not self._connect_websocket():
- print("[ASR] ✗ 无法建立WebSocket连接,退出")
- return
-
- recognition_active = False
- empty_count = 0 # 连续空队列计数,避免过早结束识别
- audio_chunk_count = 0 # 已发送音频块计数
-
- while not stop_flag.is_set():
- try:
- # 检查WebSocket连接状态,如果断开则重连
- if not self.ws_connected:
- print("[ASR] WebSocket已断开,尝试重新连接...")
- if not self._connect_websocket():
- print("[ASR] 重连失败,等待下次...")
- time.sleep(1)
- continue
-
- # 获取音频数据(非阻塞,使用较短超时)
- try:
- audio_data = audio_queue.get(timeout=0.1)
- empty_count = 0 # 重置空队列计数
- except queue.Empty:
- # 检查是否有正在进行的识别需要结束
- # 需要连续多次Empty且is_listening为False才真正结束
- if recognition_active and not is_listening.is_set():
- empty_count += 1
-
- # 🔧 修复: 等待3次Empty(约0.3秒)
- # 且至少发送了1个完整音频块(约0.5秒)
- if empty_count >= 3 and audio_chunk_count >= 1:
- print(f"[ASR] 用户停止说话,结束识别会话... (已发送{audio_chunk_count}个音频块)")
- text = self.finish_recognition()
- recognition_active = False
- self.session_started = False # 重置会话状态
- empty_count = 0
- audio_chunk_count = 0 # 重置计数
-
- # ✅ 不关闭WebSocket,保持长连接
- print("[ASR] 识别会话结束,WebSocket保持连接")
-
- if text:
- self._process_recognition_result(text, wake_detector)
- else:
- print("[ASR] ⚠️ 未获取到最终结果")
- elif empty_count >= 5 and audio_chunk_count < 1:
- # 音频太短(没有完整的音频块),直接丢弃
- print(f"[ASR] ⚠️ 音频过短({audio_chunk_count}块),丢弃")
- recognition_active = False
- self.session_started = False
- empty_count = 0
- audio_chunk_count = 0
-
- continue
-
- # 开始新的识别会话(如果还没开始)
- if not recognition_active:
- print("[ASR] 准备开始新识别...")
- recognition_active = True
- self.session_started = False # 重置会话状态
- audio_chunk_count = 0 # 重置计数
-
- # 发送音频数据
- if recognition_active:
- # ✅ 关键优化:只在第一次发送音频时才发送StartTranscription
- if not self.session_started:
- print("[ASR] 🎤 收到音频数据,发送StartTranscription...")
- if self._send_start_transcription():
- self.session_started = True
- else:
- print("[ASR] StartTranscription发送失败,跳过本次音频")
- continue
-
- # 发送音频数据
- self.send_audio(audio_data)
- audio_chunk_count += 1 # 增加计数
-
- except Exception as e:
- print(f"[ASR] 线程错误: {e}")
- import traceback
- traceback.print_exc()
- recognition_active = False
- self.ws_connected = False # 标记需要重连
- audio_chunk_count = 0 # 重置计数
-
- # 清理:程序退出时才关闭WebSocket
- print("[ASR] 程序退出,关闭WebSocket连接...")
- if self.ws:
- try:
- self.ws.close()
- except:
- pass
-
- def _process_recognition_result(self, text, wake_detector):
- """处理识别结果"""
- # 在方法开头声明全局变量,避免"used prior to global declaration"错误
- global latest_face_info
-
- print(f"\n[ASR] ========== 处理识别结果 ==========")
- print(f"[ASR] 原始文本: '{text}'")
- print(f"[ASR] 文本长度: {len(text) if text else 0}")
- print(f"[ASR] 唤醒状态: {is_awake.is_set()}")
-
- if not text:
- print("[ASR] 未识别到有效内容")
- return
-
- # 优先级1: 检查退出程序词(完全退出)
- if wake_detector.check_exit_word(text):
- print("[ASR] ✓ 检测到退出程序指令!")
- import random
- goodbye_msg = random.choice(EXIT_RESPONSES)
- response_queue.put(goodbye_msg)
- exit_requested.set() # 标记退出请求
- return
-
- # 优先级2: 检查休眠词(进入休眠,可再次唤醒)
- if wake_detector.check_sleep_word(text):
- print("[ASR] ✓ 检测到休眠指令!")
- if is_awake.is_set(): # 只有唤醒状态才能休眠
- import random
- sleep_msg = random.choice(SLEEP_RESPONSES)
- response_queue.put(sleep_msg)
- sleep_requested.set() # 标记休眠请求,等待TTS播放完成
- print("[ASR] 已请求休眠,将在TTS播放完成后执行")
- else:
- print("[ASR] 当前已是休眠状态,忽略")
- return
-
- # 检查唤醒词
- if ENABLE_WAKE_WORD:
- if not is_awake.is_set():
- # 未唤醒,检查是否包含唤醒词
- print(f"[ASR] 未唤醒状态,检查唤醒词...")
- print(f"[ASR] 唤醒词列表: {WAKE_WORDS}")
- print(f"[ASR] 识别文本: '{text}'")
- if wake_detector.check_wake_word(text):
- print(f"[ASR] ✓ 检测到唤醒词!")
- wake_detector.wake_up() # 自动播放唤醒确认
- # 去除唤醒词后的内容
- original_text = text
- for wake_word in wake_detector.wake_words:
- text = text.replace(wake_word, "").strip()
-
- # 🔧 去除前后的标点符号
- text = text.strip(',。!?、;:,.!?;: ')
-
- print(f"[ASR] 去除唤醒词后: '{text}'")
-
- # 🔧 先停止ASR,再把文本传给大模型,附加视觉信息
- print("[ASR] 停止ASR程序(准备将文本传给大模型)")
- self._stop_asr_process()
- text_with_visual = f"{text}{visual_context}"
- print(f"[ASR] → 放入text_queue: '{text_with_visual}'")
- wake_detector.update_activity()
- text_queue.put(text_with_visual)
- else:
- # 不启用唤醒词,直接处理
- print(f"[ASR] 唤醒词未启用,直接处理")
-
- # 🔧 先停止ASR,再把文本传给大模型
- print("[ASR] 停止ASR程序(准备将文本传给大模型)")
- self._stop_asr_process()
- text_with_visual = f"{text}{visual_context}"
- print(f"[ASR] → 放入text_queue: '{text_with_visual}'")
- text_queue.put(text_with_visual)
-
- print(f"[ASR] ========== 处理完成 ==========\n")
- # ============ 4. 大模型对话模块(千问API)============
- # ============ 天气查询工具 ============
- def query_weather(city):
- """查询指定城市的天气信息"""
- try:
- import requests
-
- # 构建请求URL
- url = f"{WEATHER_API_URL}?key={WEATHER_API_KEY}&city={city}"
-
- # 发送GET请求
- response = requests.get(url, timeout=10)
-
- # 解析响应
- result = response.json()
-
- # 检查响应状态
- if result.get("status") == 1 and result.get("message") == "success":
- data = result.get("data", {})
-
- # 提取关键天气信息
- temp_c = data.get("temp_C", "")
- feels_like = data.get("FeelsLikeC", "")
- humidity = data.get("humidity", "")
- weather_desc = data["weatherDesc"][0]["value"] if "weatherDesc" in data and data["weatherDesc"] else ""
- wind_speed = data.get("windspeedKmph", "0")
- wind_dir = data.get("winddir16Point", "")
- city_name = data.get("city", city)
-
- # 转换为整数进行比较
- try:
- wind_speed_int = int(wind_speed)
- except ValueError:
- wind_speed_int = 0
-
- if wind_speed_int < 1:
- windlv="无风"
- elif wind_speed_int < 5:
- windlv="1"
- elif wind_speed_int < 10:
- windlv="2"
- elif wind_speed_int < 20:
- windlv="3"
- elif wind_speed_int < 30:
- windlv="4"
- else:
- windlv = "5"
- # 处理风向
- wind = ""
- if wind_dir == "E":
- wind = "东"
- elif wind_dir == "NE":
- wind = "东北"
- elif wind_dir == "NW":
- wind = "西北"
- elif wind_dir == "N":
- wind = "北"
- elif wind_dir == "W":
- wind = "西"
- elif wind_dir == "SW":
- wind = "西南"
- elif wind_dir == "S":
- wind = "南"
- elif wind_dir == "SE":
- wind = "东南"
-
-
- # 格式化天气信息
- weather_info = f"{city_name}当前天气:{weather_desc},温度{temp_c}摄氏度,体感温度{feels_like}摄氏度,湿度百分之{humidity},{wind}风{windlv}级"
- return weather_info
- else:
- # API返回错误信息
- error_msg = result.get("message", "查询失败")
- return f"天气查询失败:{error_msg}"
-
- except Exception as e:
- # 捕获网络或解析错误
- return f"天气查询失败:{str(e)}"
- # ============ 人脸识别工具 ============
- def recognize_face():
- """识别人脸信息,返回识别结果"""
- try:
- print("[人脸识别工具] 开始识别人脸...")
-
- # 使用全局人脸识别实例
- global global_face_recognition_instance
-
- if global_face_recognition_instance is None:
- return "人脸识别失败:人脸识别模块未初始化"
-
- # 获取全局实例
- face_recognition_module = global_face_recognition_instance
-
- # 使用已打开的摄像头进行识别
- import cv2
- import time
-
- # 尝试多次识别,提高成功率
- recognition_results = []
- max_attempts = 3
- attempts = 0
-
- # 获取摄像头锁,避免冲突
- with face_recognition_camera_lock:
- while attempts < max_attempts:
- # 读取摄像头帧
- ret, frame = face_recognition_module.camera.read()
- if not ret:
- attempts += 1
- time.sleep(0.1)
- continue
-
- # 检测和识别人脸
- results = face_recognition_module.face_recognition.detect_and_recognize(frame)
-
- if results["detected"] and len(results["faces"]) > 0:
- recognition_results.append(results)
-
- # 短暂休眠,避免过度占用资源
- time.sleep(0.1)
- attempts += 1
-
- # 处理识别结果
- if recognition_results:
- # 选择检测到人脸最多的结果
- best_result = max(recognition_results, key=lambda x: x["count"])
-
- # 过滤出已知人脸
- known_faces = []
- for face in best_result["faces"]:
- if face["name"] != "Unknown":
- known_faces.append(face)
-
- if len(known_faces) == 1:
- # 单一人脸
- face = known_faces[0]
- name = face["name"]
- confidence = face["confidence"]
- similarity = face["similarity"]
-
- return f"识别成功!这是{name},相似度{similarity:.2f},置信度{confidence:.2f}"
- elif len(known_faces) > 1:
- # 多个人脸,只返回已知人脸信息
- known_names = [face["name"] for face in known_faces]
- # 去重
- known_names = list(set(known_names))
- if len(known_names) == 1:
- # 多个人脸但都是同一个人
- return f"识别成功!检测到{len(known_faces)}张人脸,都是{known_names[0]}"
- else:
- # 多个人脸且是不同的人
- names_str = "、".join(known_names)
- return f"识别成功!检测到{len(known_faces)}张人脸,其中已知的有:{names_str}"
- elif best_result["count"] == 1:
- # 只有一张未知人脸
- return f"识别成功!检测到人脸,但系统中没有该人脸信息"
- else:
- # 多张人脸但都是未知的
- return "识别成功!检测到多张人脸,但系统中没有这些人脸的信息"
- else:
- return "识别失败:未检测到人脸"
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return f"人脸识别失败:{str(e)}"
- class QwenChat:
- """千问API对话"""
-
- def __init__(self):
- self.api_key = QWEN_API_KEY
- self.model = QWEN_MODEL
-
- # 初始化对话历史
- self.conversation_history = [{
- "role": "system",
- "content": """你是一个智能语音聊天助手,名叫'优宝'。请用简洁、友好、口语化的语言回答用户问题。
- 回答要通俗易懂,避免使用过于专业的术语。每次回答控制在80字以内,适合语音朗读。
- 不要输出颜文字表情,也不要使用emoji表情。
-
- 当用户询问天气相关问题时,你可以调用query_weather工具来获取实时天气信息。
- 例如:
- - 用户问:'北京今天天气怎么样?',你应该调用query_weather工具,参数city为'北京'
- - 用户问:'上海的天气如何?',你应该调用query_weather工具,参数city为'上海'
- - 用户问:'武汉今天热吗?',你应该调用query_weather工具,参数city为'武汉'
-
- 当用户提出身份确认类问题时,你可以调用recognize_face工具来获取人脸信息。
- 例如:
- - 用户问:'我是谁?',你应该调用recognize_face工具
- - 用户问:'你认识我吗?',你应该调用recognize_face工具
- - 用户问:'你知道我是谁吗?',你应该调用recognize_face工具"""
- }]
-
- def chat(self, user_message):
- """发送消息并获取回复"""
- try:
-
- dashscope.api_key = self.api_key
-
- # 添加到历史
- self.conversation_history.append({
- "role": "user",
- "content": user_message
- })
-
- # 定义工具列表
- tools = [
- {
- "type": "function",
- "function": {
- "name": "query_weather",
- "description": "查询指定城市的天气信息",
- "parameters": {
- "type": "object",
- "properties": {
- "city": {
- "type": "string",
- "description": "城市名称,例如:北京、上海、武汉"
- }
- },
- "required": ["city"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "recognize_face",
- "description": "识别人脸信息,用于身份确认",
- "parameters": {
- "type": "object",
- "properties": {},
- "required": []
- }
- }
- }
- ]
-
- # 调用千问API
- response = dashscope.Generation.call(
- model=self.model,
- messages=self.conversation_history,
- tools=tools,
- result_format='message',
- stream=False,
- top_p=0.8,
- temperature=0.7,
- )
-
- if response.status_code == HTTPStatus.OK:
- # 获取模型回复
- message = response.output.choices[0].message
-
- # 初始化assistant_message
- assistant_message = ""
-
- # 检查是否需要工具调用 - 更安全的方式
- # 先检查message的类型
- if isinstance(message, dict):
- # 字典类型直接使用get方法
- tool_calls = message.get('tool_calls', None)
- else:
- # 对象类型尝试获取属性
- try:
- tool_calls = message.tool_calls
- except (AttributeError, KeyError):
- tool_calls = None
-
- if tool_calls:
- # 处理工具调用
- for tool_call in tool_calls:
- try:
- # 获取工具调用的基本信息
- if isinstance(tool_call, dict):
- tool_call_id = tool_call.get('id', '')
- function_info = tool_call.get('function', {})
- func_name = function_info.get('name', '')
- arguments = function_info.get('arguments', '{}')
- else:
- # 对象类型
- tool_call_id = getattr(tool_call, 'id', '')
- function_obj = getattr(tool_call, 'function', None)
- func_name = getattr(function_obj, 'name', '') if function_obj else ''
- arguments = getattr(function_obj, 'arguments', '{}') if function_obj else '{}'
-
- if func_name == "query_weather":
- # 解析参数
- params = json.loads(arguments)
- city = params.get("city")
-
- # 调用天气查询函数
- tool_result = query_weather(city)
-
- # 添加工具调用记录到历史
- self.conversation_history.append({
- "role": "assistant",
- "tool_calls": [{
- "id": tool_call_id,
- "type": "function",
- "function": {
- "name": func_name,
- "arguments": arguments
- }
- }]
- })
-
- # 添加工具执行结果到历史
- self.conversation_history.append({
- "role": "tool",
- "name": "query_weather",
- "content": tool_result,
- "tool_call_id": tool_call_id
- })
-
- # 再次调用API获取最终回复
- final_response = dashscope.Generation.call(
- model=self.model,
- messages=self.conversation_history,
- result_format='message',
- stream=False,
- top_p=0.8,
- temperature=0.7,
- )
-
- if final_response.status_code == HTTPStatus.OK:
- final_message = final_response.output.choices[0].message
- # 获取最终回复
- if isinstance(final_message, dict):
- assistant_message = final_message.get('content', '')
- else:
- assistant_message = getattr(final_message, 'content', '')
-
- # 添加最终回复到历史
- self.conversation_history.append({
- "role": "assistant",
- "content": assistant_message
- })
- else:
- assistant_message = "抱歉,处理天气信息时出错了。"
- elif func_name == "recognize_face":
- # 调用人脸识别函数
- tool_result = recognize_face()
-
- # 添加工具调用记录到历史
- self.conversation_history.append({
- "role": "assistant",
- "tool_calls": [{
- "id": tool_call_id,
- "type": "function",
- "function": {
- "name": func_name,
- "arguments": arguments
- }
- }]
- })
-
- # 添加工具执行结果到历史
- self.conversation_history.append({
- "role": "tool",
- "name": "recognize_face",
- "content": tool_result,
- "tool_call_id": tool_call_id
- })
-
- # 再次调用API获取最终回复
- final_response = dashscope.Generation.call(
- model=self.model,
- messages=self.conversation_history,
- result_format='message',
- stream=False,
- top_p=0.8,
- temperature=0.7,
- )
-
- if final_response.status_code == HTTPStatus.OK:
- final_message = final_response.output.choices[0].message
- # 获取最终回复
- if isinstance(final_message, dict):
- assistant_message = final_message.get('content', '')
- else:
- assistant_message = getattr(final_message, 'content', '')
-
- # 添加最终回复到历史
- self.conversation_history.append({
- "role": "assistant",
- "content": assistant_message
- })
- else:
- assistant_message = "抱歉,处理人脸识别信息时出错了。"
- else:
- assistant_message = "抱歉,我不支持该工具。"
- except Exception as e:
- print(f"[千问] 处理工具调用时出错: {e}")
- assistant_message = "抱歉,处理工具调用时出错了。"
- else:
- # 直接回复
- if isinstance(message, dict):
- assistant_message = message.get('content', '')
- else:
- assistant_message = getattr(message, 'content', '')
-
- # 添加到历史
- self.conversation_history.append({
- "role": "assistant",
- "content": assistant_message
- })
-
- # 限制历史长度(保留系统提示词+最近8轮对话)
- if len(self.conversation_history) > 17:
- self.conversation_history = [self.conversation_history[0]] + self.conversation_history[-16:]
-
- return assistant_message
- else:
- print(f"[千问] API错误: {response.code} - {response.message}")
- return "抱歉,我现在无法回答。"
-
- except Exception as e:
- print(f"[千问] 错误: {e}")
- import traceback
- traceback.print_exc()
- return "抱歉,出现了错误。"
-
- def run(self):
- """对话线程主循环"""
- while not stop_flag.is_set():
- try:
- user_text = text_queue.get(timeout=1)
- # 🔧 LLM处理时禁用ASR文本传入LLM
- asr_input_enabled.clear()
- print(f"[LLM] 处理: {user_text}")
- response = self.chat(user_text)
- print(f"[LLM] 回复: {response}")
- response_queue.put(response)
- # 🔧 LLM处理完成后,ASR文本传入会在TTS播放结束后0.2秒才启用
-
- except queue.Empty:
- continue
- except Exception as e:
- print(f"[千问] 线程错误: {e}")
- import traceback
- traceback.print_exc()
- # ============ 5. TTS语音合成模块(阿里云NLS)============
- class AliyunTTS:
- """阿里云NLS语音合成"""
-
- def __init__(self):
- try:
- self.appkey = ALIYUN_APPKEY
- self.access_key_id = ALIYUN_ACCESS_KEY_ID
- self.access_key_secret = ALIYUN_ACCESS_KEY_SECRET
- self.token = None
-
- # 获取Token
- self._get_token()
- except Exception as e:
- print(f"[TTS] ✗ 初始化失败: {e}")
- raise
-
- def _get_token(self):
- """获取NLS访问Token(使用签名认证)"""
- import requests
- import hmac
- import hashlib
- import base64
- from datetime import datetime
- import uuid
- from urllib.parse import quote
-
- try:
- # OpenAPI参数
- timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
- nonce = str(uuid.uuid4())
-
- # 公共请求参数
- params = {
- 'AccessKeyId': self.access_key_id,
- 'Action': 'CreateToken',
- 'Format': 'JSON',
- 'RegionId': 'cn-shanghai',
- 'SignatureMethod': 'HMAC-SHA1',
- 'SignatureNonce': nonce,
- 'SignatureVersion': '1.0',
- 'Timestamp': timestamp,
- 'Version': '2019-02-28',
- }
-
- # 构造签名字符串
- sorted_params = sorted(params.items())
- canonicalized_query_string = '&'.join([f"{quote(k, safe='')}={quote(str(v), safe='')}" for k, v in sorted_params])
- string_to_sign = f"GET&%2F&{quote(canonicalized_query_string, safe='')}"
-
- # 计算签名
- signature = base64.b64encode(
- hmac.new(
- (self.access_key_secret + '&').encode('utf-8'),
- string_to_sign.encode('utf-8'),
- hashlib.sha1
- ).digest()
- ).decode('utf-8')
-
- # 添加签名
- params['Signature'] = signature
-
- # 发送请求
- url = "https://nls-meta.cn-shanghai.aliyuncs.com/"
-
- print("[TTS] 正在获取NLS Token...")
- response = requests.get(url, params=params, timeout=10)
-
- print(f"[TTS] HTTP状态码: {response.status_code}")
-
- if response.status_code == 200:
- result = response.json()
-
- if 'Token' in result and isinstance(result['Token'], dict):
- if 'Id' in result['Token']:
- self.token = result['Token']['Id']
- return
-
- except Exception as e:
- print(f"[TTS] Token获取异常: {e}")
-
- self.token = None
-
- def synthesize(self, text):
- """合成语音(使用NLS RESTful API)"""
- import requests
- import json
-
- try:
- # NLS语音合成API地址
- url = "https://nls-gateway.cn-shanghai.aliyuncs.com/stream/v1/tts"
-
- # 请求参数
- params = {
- "appkey": self.appkey,
- "text": text,
- "format": "pcm",
- "sample_rate": 16000,
- "voice": TTS_VOICE, # 使用配置的音色
- "volume": TTS_VOLUME, # 使用配置的音量
- "speech_rate": TTS_SPEECH_RATE, # 使用配置的语速
- "pitch_rate": TTS_PITCH_RATE # 使用配置的音调
- }
-
- # 检查Token
- if not self.token:
- self._get_token()
-
- # 请求头 - 使用 Bearer Token 格式
- headers = {
- "Content-Type": "application/json",
- }
-
- # 添加Token
- if self.token:
- headers["X-NLS-Token"] = self.token
-
- # 发送请求
- response = requests.post(
- url,
- params=params,
- headers=headers,
- timeout=10
- )
-
- if response.status_code == 200:
- # 返回的是PCM音频数据
- pcm_data = response.content
-
- # 将PCM bytes转换为numpy数组
- audio_int16 = np.frombuffer(pcm_data, dtype=np.int16)
- audio_float32 = audio_int16.astype(np.float32) / 32767.0
-
- return audio_float32
- else:
- print(f"[TTS] API调用失败: {response.status_code}")
- print(f"[TTS] 错误内容: {response.text}")
-
- # 如果Token过期,重新获取
- if response.status_code == 401:
- self._get_token()
-
- return None
-
- except Exception as e:
- print(f"[TTS] 合成错误: {e}")
- import traceback
- traceback.print_exc()
- return None
-
- def run(self):
- """TTS线程主循环"""
- while not stop_flag.is_set():
- try:
- text = response_queue.get(timeout=1)
- print(f"[TTS] 合成: {text[:30]}...")
- audio = self.synthesize(text)
-
- if audio is not None:
- tts_queue.put(audio)
- print(f"[TTS] 完成")
-
- except queue.Empty:
- continue
- except Exception as e:
- print(f"[TTS] 线程错误: {e}")
- # ============ 6. 音频播放模块 ============
- class AudioPlayer:
- """音频播放器"""
-
- def __init__(self, wake_detector=None, asr=None):
- self.wake_detector = wake_detector
- self.asr = asr # ASR实例,用于控制ASR的启动和停止
-
- def play(self, audio_data, sample_rate=16000):
- """播放音频"""
- try:
- is_speaking.set() # 标记为播放中,此时会暂停音频采集
-
- # 🔧 TTS播放时禁用ASR文本传入LLM
- asr_input_enabled.clear()
-
- # 确保是numpy数组
- if not isinstance(audio_data, np.ndarray):
- audio_data = np.array(audio_data)
-
- # 如果是双声道,转为单声道
- if len(audio_data.shape) > 1:
- audio_data = audio_data.mean(axis=1)
-
- # 使用OutputStream控制缓冲区大小和延迟,减少ALSA欠载错误
- with sd.OutputStream(
- samplerate=sample_rate,
- device=SPEAKER_DEVICE,
- channels=1,
- dtype=np.float32,
- latency=LATENCY, # 增加延迟以减少ALSA欠载错误
- blocksize=int(sample_rate * 0.1) # 增加缓冲区大小
- ) as stream:
- # 写入音频数据
- stream.write(audio_data)
- # 🔧 等待缓冲区中的数据完全播放完成
- stream.stop() # 确保所有数据都已播放
- time.sleep(0.1) # 额外等待0.1秒,确保最后一个字播放完成
-
- # 🔧 播放完成后延迟足够时间,避免捕获回声和残留声音
- time.sleep(0.4) # 增加到0.5秒,确保回声完全消失
-
- is_speaking.clear() # 恢复音频采集
-
- # 🔧 TTS播放完成后等待0.2秒再启用ASR文本传入LLM
- time.sleep(0.5)
- asr_input_enabled.set()
-
- # 🔧 清空播放期间累积的音频队列
- while not audio_queue.empty():
- try:
- audio_queue.get_nowait()
- except:
- break
- except Exception as e:
- print(f"[播放] 错误: {e}")
- is_speaking.clear()
- asr_input_enabled.set() # 出错时也要恢复
-
- def run(self):
- """播放线程主循环"""
- while not stop_flag.is_set():
- try:
- audio_data = tts_queue.get(timeout=1)
-
- # 🔧 播放前清空音频队列和停止当前识别
- while not audio_queue.empty():
- try:
- audio_queue.get_nowait()
- except:
- break
-
- # 停止当前监听状态
- is_listening.clear()
-
- self.play(audio_data)
-
- # 检查是否需要休眠(播放完告别语后)
- if sleep_requested.is_set():
- time.sleep(0.3) # 短暂延迟
- if self.wake_detector:
- self.wake_detector.sleep() # 执行休眠
- sleep_requested.clear() # 清除标志
-
- # 检查是否是告别语播放完成后需要退出程序
- if exit_requested.is_set():
- time.sleep(0.5) # 短暂延迟,让用户听完
- stop_flag.set() # 触发主程序退出
- break
-
- except queue.Empty:
- continue
- except Exception:
- pass
- # ============ 7. 主控制器 ============
- # 预设个性化欢迎词模板
- WELCOME_TEMPLATES = {
- "warm": [
- "你好,{name}!欢迎回来!",
- "嗨,{name}!好久不见,很高兴见到你!",
- "{name},欢迎你!今天过得怎么样?",
- "你好啊,{name}!有什么我可以帮你的吗?",
- "欢迎回来,{name}!最近还好吗?"
- ],
- "professional": [
- "您好,{name}!欢迎使用智能语音助手。",
- "{name},您好!请问有什么可以为您服务的?",
- "欢迎,{name}!我已准备好为您提供帮助。",
- "您好,{name}!很高兴为您服务。",
- "{name},您好!请告诉我您的需求。"
- ],
- "casual": [
- "嘿,{name}!又见面啦!",
- "哇,{name}!今天你看起来气色不错!",
- "{name}!欢迎欢迎,热烈欢迎!",
- "哟,{name}!什么风把你吹来了?",
- "嗨呀,{name}!今天想聊点啥?"
- ]
- }
- class VoiceChatAssistant:
- """语音聊天助手主控制器"""
-
- def __init__(self):
- self.audio_capture = AudioCapture()
- self.wake_detector = WakeWordDetector()
- self.asr = ExternalASR() # 使用外部ASR程序
- self.llm = QwenChat()
- self.tts = AliyunTTS()
- self.player = AudioPlayer(wake_detector=self.wake_detector, asr=self.asr) # 传入wake_detector和asr
-
- # 初始化人脸识别模块
- self.face_recognition = None
- if FACE_RECOGNITION_ENABLED:
- self.face_recognition = FaceRecognitionModule()
-
- self.threads = []
- self.current_face_name = None
- self.face_event_processed = False
-
- def _generate_welcome_message(self, name):
- """生成个性化欢迎语"""
- import random
-
- # 随机选择欢迎语风格
- style = random.choice(list(WELCOME_TEMPLATES.keys()))
- templates = WELCOME_TEMPLATES[style]
-
- # 随机选择一个模板并填充名字
- welcome_message = random.choice(templates).format(name=name)
- return welcome_message
-
- def _face_event_handler(self):
- """处理人脸事件的线程"""
- while not stop_flag.is_set():
- try:
- # 获取人脸事件
- event = face_event_queue.get(timeout=1)
-
- # 初始化消息
- welcome_message = ""
-
- if event["type"] == "known_single_face":
- # 已知单一人脸,执行个性化打招呼
- face_name = event["name"]
-
- # 检查是否是新的人脸
- if face_name != self.current_face_name:
- self.current_face_name = face_name
- self.face_event_processed = False
-
- # 如果还没有处理过这个人脸事件,生成欢迎语
- if not self.face_event_processed:
- print(f"[人脸识别] 检测到已知人脸 {face_name},准备唤醒...")
-
- # 生成个性化欢迎语
- welcome_message = self._generate_welcome_message(face_name)
-
- # 唤醒智能体
- self.wake_detector.wake_up()
- print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
-
- # 发送欢迎语到TTS队列
- response_queue.put(welcome_message)
- print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
-
- # 标记事件已处理
- self.face_event_processed = True
-
- # 暂时暂停人脸识别模块,节省资源
- if self.face_recognition:
- self.face_recognition.pause()
- print(f"[人脸识别] 检测到已知人脸 {face_name},暂停人脸识别功能")
-
- elif event["type"] == "unknown_single_face" or event["type"] == "multiple_faces":
- # 未知单一人脸或多个人脸,执行通用礼貌问好
- if not hasattr(self, 'face_event_processed') or not self.face_event_processed:
- print(f"[人脸识别] 检测到{event['type']},准备唤醒...")
-
- if event["type"] == "unknown_single_face":
- welcome_message = "你好,欢迎使用智能语音助手!"
- else:
- welcome_message = f"你们好,欢迎使用智能语音助手!"
-
- # 唤醒智能体
- self.wake_detector.wake_up()
- print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
-
- # 发送欢迎语到TTS队列
- response_queue.put(welcome_message)
- print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
-
- # 标记事件已处理
- self.face_event_processed = True
-
- # 暂时暂停人脸识别模块,节省资源
- if self.face_recognition:
- self.face_recognition.pause()
- print(f"[人脸识别] 检测到{event['type']},暂停人脸识别功能")
-
- elif event["type"] == "multiple_known_faces":
- # 多个人脸,只对已知人脸打招呼
- if not hasattr(self, 'face_event_processed') or not self.face_event_processed:
- known_names = event.get("known_names", [])
- print(f"[人脸识别] 检测到多个人脸,已知人脸: {known_names},准备唤醒...")
-
- # 生成包含所有已知人脸的欢迎语
- if len(known_names) == 1:
- welcome_message = self._generate_welcome_message(known_names[0])
- else:
- # 多人欢迎语
- name_list = "、".join(known_names)
- welcome_message = f"你们好,{name_list}!欢迎使用智能语音助手!"
-
- # 唤醒智能体
- self.wake_detector.wake_up()
- print(f"[人脸识别] 智能体已唤醒,唤醒状态: {is_awake.is_set()}")
-
- # 发送欢迎语到TTS队列
- response_queue.put(welcome_message)
- print(f"[人脸识别] 欢迎语已发送: {welcome_message}")
-
- # 标记事件已处理
- self.face_event_processed = True
-
- # 暂时暂停人脸识别模块,节省资源
- if self.face_recognition:
- self.face_recognition.pause()
- print(f"[人脸识别] 检测到多个人脸,暂停人脸识别功能")
-
- except queue.Empty:
- continue
- except Exception as e:
- print(f"❌ 人脸事件处理错误: {e}")
- import traceback
- traceback.print_exc()
-
- def start(self):
- """启动所有模块"""
- # 检查配置
- if not QWEN_API_KEY or QWEN_API_KEY.startswith("your-"):
- print("⚠️ 警告: 请配置DashScope API Key!")
- print(" 1. 访问 https://dashscope.aliyun.com/")
- print(" 2. 登录后在控制台获取API-KEY")
- print(" 3. 将API Key填入代码第36行 QWEN_API_KEY 处")
-
- # 启动音频采集
- self.audio_capture.start()
-
- # 启动人脸识别模块
- if self.face_recognition:
- if self.face_recognition.start():
- # 启动人脸事件处理线程
- face_event_thread = threading.Thread(target=self._face_event_handler, daemon=True, name="人脸事件处理")
- face_event_thread.start()
- self.threads.append(face_event_thread)
-
- # 启动各工作线程
- threads_config = [
- ("ASR识别", lambda: self.asr.run(self.wake_detector)),
- ("千问对话", self.llm.run),
- ("TTS合成", self.tts.run),
- ("音频播放", self.player.run),
- ]
-
- for name, target in threads_config:
- t = threading.Thread(target=target, name=name, daemon=True)
- t.start()
- self.threads.append(t)
-
- # 主循环(状态监控)
- try:
- while True:
- time.sleep(0.5)
-
- # 检查唤醒超时
- if ENABLE_WAKE_WORD:
- self.wake_detector.check_timeout()
-
- except KeyboardInterrupt:
- self.stop()
-
- def stop(self):
- """停止所有模块"""
- stop_flag.set()
- self.audio_capture.stop()
-
- # 停止人脸识别模块
- if self.face_recognition:
- self.face_recognition.stop()
-
- # 清空队列
- while not audio_queue.empty():
- try:
- audio_queue.get_nowait()
- except:
- break
- while not text_queue.empty():
- try:
- text_queue.get_nowait()
- except:
- break
- while not response_queue.empty():
- try:
- response_queue.get_nowait()
- except:
- break
- while not tts_queue.empty():
- try:
- tts_queue.get_nowait()
- except:
- break
- while not face_event_queue.empty():
- try:
- face_event_queue.get_nowait()
- except:
- break
-
- # 等待线程结束
- for t in self.threads:
- t.join(timeout=2)
-
-
- # ============ 主入口 ============
- if __name__ == "__main__":
- # 启动助手
- assistant = VoiceChatAssistant()
- assistant.start()
|