""" 音频采集模块 提供音频输入流管理,支持AEC回音消除 """ import asyncio import numpy as np import sounddevice as sd from typing import Callable, Optional, List from pathlib import Path class AudioListener: """音频监听器协议""" def on_audio_data(self, audio_data: np.ndarray) -> None: """接收音频数据的回调""" ... class AudioCapture: """音频采集器 - 支持AEC回音消除""" def __init__(self, sample_rate: int = 16000, channels: int = 1, device_id: Optional[int] = None, enable_aec: bool = False): """ 初始化音频采集器 Args: sample_rate: 采样率 channels: 声道数 device_id: 设备ID,None表示自动选择 enable_aec: 是否启用AEC回音消除 """ self.sample_rate = sample_rate self.channels = channels self.device_id = device_id self.enable_aec = enable_aec self.input_stream = None self.frame_size = int(sample_rate * 0.02) # 20ms帧 self._audio_listeners: List[AudioListener] = [] self._is_closing = False # AEC处理器 self.aec_processor = None def add_audio_listener(self, listener: AudioListener): """添加音频监听器""" if listener not in self._audio_listeners: self._audio_listeners.append(listener) def remove_audio_listener(self, listener: AudioListener): """移除音频监听器""" if listener in self._audio_listeners: self._audio_listeners.remove(listener) async def start(self): """启动音频采集""" try: # 初始化AEC处理器(如果启用) if self.enable_aec: try: from .aec_processor import AECProcessor self.aec_processor = AECProcessor( sample_rate=self.sample_rate, channels=self.channels ) await self.aec_processor.initialize() print(f"AEC回音消除已启用") except Exception as e: print(f"AEC初始化失败,将使用原始音频: {e}") self.aec_processor = None # 自动选择设备 if self.device_id is None: devices = sd.query_devices() input_devices = [ (i, d) for i, d in enumerate(devices) if d['max_input_channels'] > 0 ] if not input_devices: raise RuntimeError("找不到可用的输入设备") self.device_id = input_devices[0][0] print(f"自动选择输入设备: {input_devices[0][1]['name']}") # 创建输入流 self.input_stream = sd.InputStream( device=self.device_id, samplerate=self.sample_rate, channels=self.channels, dtype=np.float32, blocksize=self.frame_size, callback=self._input_callback, latency="low", ) self.input_stream.start() print(f"音频采集已启动: {self.sample_rate}Hz {self.channels}ch") except Exception as e: raise RuntimeError(f"启动音频采集失败: {e}") def _input_callback(self, indata, frames, time_info, status): """输入回调""" if status and "overflow" not in str(status).lower(): print(f"输入流状态: {status}") if self._is_closing: return try: # 转换为 int16 格式 audio_data_int16 = (indata.flatten() * 32768.0).astype(np.int16) # 应用AEC处理(如果启用) if self.aec_processor and self.aec_processor.is_initialized: audio_data_int16 = self.aec_processor.process_audio(audio_data_int16) # 通知所有监听器 for listener in self._audio_listeners: try: listener.on_audio_data(audio_data_int16.copy()) except Exception as e: print(f"音频监听器处理失败: {e}") except Exception as e: print(f"输入回调错误: {e}") async def stop(self): """停止音频采集""" self._is_closing = True # 停止AEC处理器(如果启用) if self.aec_processor: try: await self.aec_processor.close() except Exception as e: print(f"关闭AEC处理器失败: {e}") finally: self.aec_processor = None if self.input_stream: try: if self.input_stream.active: self.input_stream.stop() self.input_stream.close() except Exception as e: print(f"关闭音频流失败: {e}") finally: self.input_stream = None self._audio_listeners.clear() print("音频采集已停止") @staticmethod def list_devices(): """列出所有可用设备""" devices = sd.query_devices() print("\n可用音频设备:") print("-" * 60) for i, device in enumerate(devices): if device['max_input_channels'] > 0: print(f"[{i}] {device['name']}") print(f" 输入声道: {device['max_input_channels']}, 采样率: {device['default_samplerate']}Hz") print("-" * 60)