############################################################################### # 实时音频播放模块 - 监听 WAV 文件夹并播放新音频 ############################################################################### import os import time import wave import numpy as np import threading import queue from pathlib import Path from logger import logger class RealtimeAudioPlayer: """实时监控文件夹并播放新到达的 WAV 音频""" def __init__(self, watch_dir, sample_rate=16000): """ 初始化音频播放器 Args: watch_dir: 监听的文件夹路径 sample_rate: 采样率(默认16000Hz) """ self.watch_dir = watch_dir self.sample_rate = sample_rate # 确保监听目录存在 os.makedirs(watch_dir, exist_ok=True) # 状态管理 self.is_running = False self.is_playing = False self.current_audio = None # 音频数据队列 self.audio_queue = queue.Queue(maxsize=50) # 已处理的文件 self.processed_files = set() # 线程控制 self.watch_thread = None self.play_thread = None self.stop_event = threading.Event() # 回调函数 self.on_audio = None logger.info(f"🎵 实时音频播放器已初始化") logger.info(f" 监听目录: {watch_dir}") logger.info(f" 采样率: {sample_rate}Hz") def set_callbacks(self, on_audio=None): """ 设置回调函数 Args: on_audio: 音频数据回调 (audio_data: np.ndarray) """ self.on_audio = on_audio def start(self): """启动监听和播放""" if self.is_running: logger.warning("⚠️ 音频播放器已在运行") return logger.info("🎵 启动实时音频播放器...") self.is_running = True self.stop_event.clear() # 启动监听线程 self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True) self.watch_thread.start() # 启动播放线程 self.play_thread = threading.Thread(target=self._play_loop, daemon=True) self.play_thread.start() logger.info("✅ 实时音频播放器已启动") def stop(self): """停止播放""" if not self.is_running: return logger.info("🛑 停止实时音频播放器...") self.is_running = False self.stop_event.set() if self.watch_thread: self.watch_thread.join(timeout=2) if self.play_thread: self.play_thread.join(timeout=2) logger.info("✅ 实时音频播放器已停止") def _watch_loop(self): """监听文件夹循环""" logger.info("👁️ 开始监听音频文件...") while not self.stop_event.is_set(): try: # 查找所有 WAV 文件 wav_files = list(Path(self.watch_dir).glob('*.wav')) # 过滤未处理的文件 new_wavs = [f for f in wav_files if f.name not in self.processed_files] if new_wavs: # 按修改时间排序,处理最新的 new_wavs.sort(key=lambda p: p.stat().st_mtime) for wav_path in new_wavs: if self.stop_event.is_set(): break logger.info(f"📥 检测到新音频文件: {wav_path.name}") self._load_audio(wav_path) self.processed_files.add(wav_path.name) # 等待一段时间再检查 time.sleep(0.5) except Exception as e: logger.error(f"❌ 监听音频文件出错: {e}") time.sleep(1) def _load_audio(self, wav_path): """ 加载 WAV 文件到队列 Args: wav_path: WAV 文件路径 """ try: # 读取 WAV 文件 with wave.open(str(wav_path), 'rb') as wf: n_channels = wf.getnchannels() sample_width = wf.getsampwidth() framerate = wf.getframerate() n_frames = wf.getnframes() # 读取所有音频数据 audio_data = wf.readframes(n_frames) # 转换为 numpy 数组 if sample_width == 2: audio_array = np.frombuffer(audio_data, dtype=np.int16) elif sample_width == 1: audio_array = np.frombuffer(audio_data, dtype=np.uint8) else: logger.warning(f"⚠️ 不支持的采样宽度: {sample_width}") return # 如果是立体声,转换为单声道 if n_channels == 2: audio_array = audio_array.reshape(-1, 2).mean(axis=1).astype(np.int16) # 重采样(如果需要) if framerate != self.sample_rate: logger.info(f"🔄 重采样: {framerate}Hz -> {self.sample_rate}Hz") audio_array = self._resample(audio_array, framerate, self.sample_rate) # 添加到队列 logger.info(f"📤 音频已加入队列: {wav_path.name}, 时长: {len(audio_array)/self.sample_rate:.2f}s") self.audio_queue.put(audio_array) except Exception as e: logger.error(f"❌ 加载音频文件失败 {wav_path}: {e}") def _resample(self, audio_data, orig_sr, target_sr): """ 简单的线性插值重采样 Args: audio_data: 音频数据 orig_sr: 原始采样率 target_sr: 目标采样率 """ ratio = target_sr / orig_sr new_length = int(len(audio_data) * ratio) new_data = np.zeros(new_length, dtype=np.int16) for i in range(new_length): orig_idx = i / ratio idx = int(orig_idx) frac = orig_idx - idx if idx + 1 < len(audio_data): new_data[i] = int(audio_data[idx] * (1 - frac) + audio_data[idx + 1] * frac) else: new_data[i] = audio_data[idx] return new_data def _play_loop(self): """播放音频数据循环""" logger.info("🎵 开始播放音频...") while not self.stop_event.is_set(): try: # 从队列获取音频数据 if not self.audio_queue.empty(): audio_data = self.audio_queue.get(timeout=0.1) self.is_playing = True logger.info(f"🔊 正在播放音频,长度: {len(audio_data)/self.sample_rate:.2f}s") # 通过回调发送音频数据 if self.on_audio: # 计算每帧的大小(20ms) chunk_size = self.sample_rate // 50 # 16000 / 50 = 320 samples per 20ms # 按帧发送 for i in range(0, len(audio_data), chunk_size): if self.stop_event.is_set(): break chunk = audio_data[i:i + chunk_size] # 如果最后一个 chunk 不足,补零 if len(chunk) < chunk_size: chunk = np.pad(chunk, (0, chunk_size - len(chunk)), 'constant') self.on_audio(chunk) # 控制播放速度 time.sleep(0.02) # 20ms per chunk self.is_playing = False logger.info("✅ 音频播放完成") else: time.sleep(0.01) except queue.Empty: continue except Exception as e: logger.error(f"❌ 播放音频出错: {e}") self.is_playing = False time.sleep(0.1)