```python
###############################################################################
# Real-time audio playback module - watches a WAV folder and plays new audio
###############################################################################
import os
import time
import wave
import numpy as np
import threading
import queue
from pathlib import Path
from logger import logger


class RealtimeAudioPlayer:
    """Watches a folder in real time and plays newly arrived WAV files."""

    def __init__(self, watch_dir, sample_rate=16000):
        """
        Initialize the audio player.

        Args:
            watch_dir: Path of the folder to watch.
            sample_rate: Target sample rate in Hz (default 16000).
        """
        self.watch_dir = watch_dir
        self.sample_rate = sample_rate

        # Make sure the watched directory exists
        os.makedirs(watch_dir, exist_ok=True)

        # State
        self.is_running = False
        self.is_playing = False
        self.current_audio = None

        # Queue of decoded audio arrays waiting to be played
        self.audio_queue = queue.Queue(maxsize=50)

        # Names of files that have already been processed
        self.processed_files = set()

        # Thread control
        self.watch_thread = None
        self.play_thread = None
        self.stop_event = threading.Event()

        # Callbacks
        self.on_audio = None

        logger.info("🎵 Real-time audio player initialized")
        logger.info(f"   Watch directory: {watch_dir}")
        logger.info(f"   Sample rate: {sample_rate}Hz")

    def set_callbacks(self, on_audio=None):
        """
        Set callback functions.

        Args:
            on_audio: Called with each audio chunk (audio_data: np.ndarray of
                int16, sample_rate // 50 samples, i.e. 20 ms).
        """
        self.on_audio = on_audio

    def start(self):
        """Start watching and playback."""
        if self.is_running:
            logger.warning("⚠️ Audio player is already running")
            return

        logger.info("🎵 Starting real-time audio player...")
        self.is_running = True
        self.stop_event.clear()

        # Start the folder-watching thread
        self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
        self.watch_thread.start()

        # Start the playback thread
        self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
        self.play_thread.start()

        logger.info("✅ Real-time audio player started")

    def stop(self):
        """Stop watching and playback."""
        if not self.is_running:
            return

        logger.info("🛑 Stopping real-time audio player...")
        self.is_running = False
        # Signal both loops; waits on stop_event return immediately
        self.stop_event.set()

        if self.watch_thread:
            self.watch_thread.join(timeout=2)
        if self.play_thread:
            self.play_thread.join(timeout=2)

        logger.info("✅ Real-time audio player stopped")

    def _watch_loop(self):
        """Folder-watching loop."""
        logger.info("👁️ Watching for audio files...")

        while not self.stop_event.is_set():
            try:
                # Find all WAV files
                wav_files = list(Path(self.watch_dir).glob('*.wav'))

                # Keep only files that have not been processed yet
                new_wavs = [f for f in wav_files if f.name not in self.processed_files]

                if new_wavs:
                    # Sort by modification time so the oldest file is processed first
                    new_wavs.sort(key=lambda p: p.stat().st_mtime)

                    for wav_path in new_wavs:
                        if self.stop_event.is_set():
                            break

                        logger.info(f"📥 New audio file detected: {wav_path.name}")
                        self._load_audio(wav_path)
                        self.processed_files.add(wav_path.name)

                # Wait before polling again; returns early if stop() is called
                self.stop_event.wait(0.5)

            except Exception as e:
                logger.error(f"❌ Error while watching audio files: {e}")
                time.sleep(1)

    def _load_audio(self, wav_path):
        """
        Decode a WAV file and put it on the playback queue.

        Args:
            wav_path: Path of the WAV file.
        """
        try:
            # Read the WAV header and all frames
            with wave.open(str(wav_path), 'rb') as wf:
                n_channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                framerate = wf.getframerate()
                n_frames = wf.getnframes()
                audio_data = wf.readframes(n_frames)

            # Convert the raw bytes to an int16 NumPy array
            if sample_width == 2:
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
            elif sample_width == 1:
                # 8-bit WAV is unsigned with silence at 128: center it, then scale to 16-bit range
                audio_array = np.frombuffer(audio_data, dtype=np.uint8).astype(np.int16)
                audio_array = (audio_array - 128) * 256
            else:
                logger.warning(f"⚠️ Unsupported sample width: {sample_width}")
                return

            # Downmix multi-channel audio to mono by averaging the interleaved channels
            if n_channels > 1:
                audio_array = audio_array.reshape(-1, n_channels).mean(axis=1).astype(np.int16)

            # Resample if the file's rate differs from the target rate
            if framerate != self.sample_rate:
                logger.info(f"🔄 Resampling: {framerate}Hz -> {self.sample_rate}Hz")
                audio_array = self._resample(audio_array, framerate, self.sample_rate)

            # Enqueue for playback (blocks while the queue is full)
            logger.info(f"📤 Audio queued: {wav_path.name}, duration: {len(audio_array)/self.sample_rate:.2f}s")
            self.audio_queue.put(audio_array)

        except Exception as e:
            logger.error(f"❌ Failed to load audio file {wav_path}: {e}")

    def _resample(self, audio_data, orig_sr, target_sr):
        """
        Simple linear-interpolation resampling.

        Args:
            audio_data: Input samples (1-D int16 array).
            orig_sr: Original sample rate in Hz.
            target_sr: Target sample rate in Hz.

        Returns:
            Resampled 1-D int16 array.
        """
        ratio = target_sr / orig_sr
        new_length = int(len(audio_data) * ratio)
        new_data = np.zeros(new_length, dtype=np.int16)

        for i in range(new_length):
            # Position of output sample i on the input time axis
            orig_idx = i / ratio
            idx = int(orig_idx)
            frac = orig_idx - idx

            if idx + 1 < len(audio_data):
                # Interpolate between the two neighboring input samples
                new_data[i] = int(audio_data[idx] * (1 - frac) + audio_data[idx + 1] * frac)
            else:
                # Past the last pair of samples: hold the final sample
                new_data[i] = audio_data[idx]

        return new_data

    def _play_loop(self):
        """Playback loop: streams queued audio to the callback in 20 ms chunks."""
        logger.info("🎵 Starting audio playback...")

        while not self.stop_event.is_set():
            try:
                # Block briefly for the next item; raises queue.Empty on timeout
                audio_data = self.audio_queue.get(timeout=0.1)

                self.is_playing = True
                logger.info(f"🔊 Playing audio, duration: {len(audio_data)/self.sample_rate:.2f}s")

                # Deliver the audio through the callback
                if self.on_audio:
                    # Samples per 20 ms frame (16000 // 50 = 320 at 16 kHz)
                    chunk_size = self.sample_rate // 50

                    # Send frame by frame
                    for i in range(0, len(audio_data), chunk_size):
                        if self.stop_event.is_set():
                            break

                        chunk = audio_data[i:i + chunk_size]

                        # Zero-pad the final chunk if it is short
                        if len(chunk) < chunk_size:
                            chunk = np.pad(chunk, (0, chunk_size - len(chunk)), 'constant')

                        self.on_audio(chunk)

                        # Pace delivery at roughly real time (one 20 ms chunk per 20 ms)
                        time.sleep(0.02)

                self.is_playing = False
                logger.info("✅ Audio playback finished")

            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"❌ Error while playing audio: {e}")
                self.is_playing = False
                time.sleep(0.1)
```
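`_watch_loop` loads a file as soon as its name appears. A WAV that is still being written can therefore be read partially or fail to parse, and it is then marked processed anyway. The sketch below shows one possible mitigation. It assumes the writer appends to the file in place, and it only hands over a file once the file is non-empty and its size is unchanged between two consecutive polls. `poll_stable_wavs` and the `last_sizes` dict are hypothetical names, not part of the original module.

```python
from pathlib import Path


def poll_stable_wavs(watch_dir, processed, last_sizes):
    """Return unprocessed *.wav paths whose size did not change since the previous poll.

    `last_sizes` maps file name -> size seen on the previous call; it is updated in place.
    """
    ready = []
    seen = {}
    for path in Path(watch_dir).glob('*.wav'):
        if path.name in processed:
            continue
        try:
            stat = path.stat()
        except FileNotFoundError:
            continue  # file was removed between glob() and stat()
        seen[path.name] = stat.st_size
        # Ready only when non-empty and identical to the size recorded last time
        if stat.st_size > 0 and last_sizes.get(path.name) == stat.st_size:
            ready.append((stat.st_mtime, path))
    # Remember this poll's sizes for the next call
    last_sizes.clear()
    last_sizes.update(seen)
    # Oldest first, matching the original loop's ordering
    ready.sort(key=lambda item: item[0])
    return [path for _, path in ready]
```

The caller would still add each returned name to `processed_files` after `_load_audio`. With the 0.5 s poll interval, a file is picked up about one interval after its last write. This heuristic cannot tell a finished file from a writer that pauses longer than one poll. Having the producer write under a temporary name and rename the file to `.wav` when it is complete avoids the race entirely.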
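`_load_audio` has to turn raw WAV frames into mono int16 samples. The standalone sketch below performs that conversion under a hypothetical name, `pcm_to_mono_int16`. It assumes the bytes come from `wave.Wave_read.readframes()`, with these properties:

- 16-bit samples are signed little-endian.
- 8-bit samples are unsigned, with silence at 128.
- Channels are interleaved frame by frame, so reshaping to `(-1, n_channels)` puts one frame per row.

Unsupported widths raise instead of logging, which keeps the function easy to test.

```python
import numpy as np


def pcm_to_mono_int16(raw, sample_width, n_channels):
    """Convert interleaved PCM bytes to a mono int16 array (sketch)."""
    if sample_width == 2:
        # 16-bit WAV samples are signed little-endian
        samples = np.frombuffer(raw, dtype='<i2').astype(np.int16)
    elif sample_width == 1:
        # 8-bit WAV samples are unsigned (silence = 128): center, then scale to 16-bit range
        samples = (np.frombuffer(raw, dtype=np.uint8).astype(np.int16) - 128) * 256
    else:
        raise ValueError(f"unsupported sample width: {sample_width}")

    if n_channels > 1:
        # One interleaved frame per row; average across channels to downmix
        samples = samples.reshape(-1, n_channels).mean(axis=1)
    # astype truncates the float mean toward zero
    return samples.astype(np.int16)
```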
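`_resample` loops over every output sample in Python, which becomes slow for long files. The vectorized sketch below uses `np.interp` and applies the same linear-interpolation rule:

- Output sample `i` sits at input position `i / ratio`.
- Positions past the last sample hold the final value.
- Results are truncated toward zero, as `int()` does in the loop.

Floating-point rounding can occasionally shift a value by one LSB relative to the loop. The name `resample_linear` is hypothetical. Like the original, this is plain linear interpolation with no anti-aliasing filter, so downsampling can alias.

```python
import numpy as np


def resample_linear(audio_data, orig_sr, target_sr):
    """Linear-interpolation resampling of a 1-D int16 array (vectorized sketch)."""
    audio_data = np.asarray(audio_data)
    ratio = target_sr / orig_sr
    new_length = int(len(audio_data) * ratio)
    if new_length == 0:
        return np.zeros(0, dtype=np.int16)

    # Input-axis position of every output sample, same mapping as the loop version
    positions = np.arange(new_length) / ratio
    # np.interp returns the last value for positions beyond the final sample
    resampled = np.interp(positions, np.arange(len(audio_data)), audio_data)
    # astype truncates toward zero, like int() in the loop version
    return resampled.astype(np.int16)
```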
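The chunking step in `_play_loop` can be isolated as a generator, which makes it testable without threads. It splits the audio into fixed 20 ms frames and zero-pads the tail. The sketch below, with the hypothetical name `iter_frames`, makes the same assumptions as the class: a 1-D int16 input and `frame_size = sample_rate // 50`.

```python
import numpy as np


def iter_frames(audio, frame_size):
    """Yield fixed-size frames from a 1-D array, zero-padding the last one."""
    if frame_size <= 0:
        raise ValueError("frame_size must be positive")
    for start in range(0, len(audio), frame_size):
        frame = audio[start:start + frame_size]
        if len(frame) < frame_size:
            # Pad with zeros (silence) so every frame has the same length
            frame = np.pad(frame, (0, frame_size - len(frame)), 'constant')
        yield frame
```

At 16 kHz each frame is 320 samples, and at 44.1 kHz it is 882. `np.pad` keeps the input dtype, so frames stay int16.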
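`_play_loop` paces output with `time.sleep(0.02)` after each callback. The time spent inside `on_audio` therefore adds to every period, and delivery gradually falls behind real time. The sketch below shows one common alternative, using hypothetical names. It schedules each frame against an absolute deadline taken from `time.monotonic()` and waits on a `threading.Event`, so setting the event interrupts the wait immediately.

```python
import threading
import time


def send_paced(frames, send, period, stop_event):
    """Call send(frame) once per `period` seconds, scheduled against absolute deadlines.

    Returns the number of frames sent (fewer than len(frames) if stop_event is set).
    """
    start = time.monotonic()
    sent = 0
    for index, frame in enumerate(frames):
        if stop_event.is_set():
            break
        send(frame)
        sent += 1
        # The next deadline is fixed relative to `start`, so callback time does not accumulate
        delay = start + (index + 1) * period - time.monotonic()
        if delay > 0 and stop_event.wait(delay):
            break  # stop requested while waiting
    return sent
```

In the player, `frames` would be the 20 ms chunks, `send` would be `self.on_audio`, `period` would be `chunk_size / self.sample_rate`, and `stop_event` would be `self.stop_event`. If a callback overruns its slot, the next frame is sent immediately without sleeping, which lets delivery catch up.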