############################################################################### # 实时视频播放模块 - 监听 Image_Analysis 文件夹并播放新视频 ############################################################################### import os import time import cv2 import numpy as np import threading import queue from pathlib import Path from logger import logger class RealtimeVideoPlayer: """实时监控文件夹并播放新到达的视频""" def __init__(self, watch_dir, fps=20): """ 初始化视频播放器 Args: watch_dir: 监听的文件夹路径 fps: 帧率(默认20fps) """ self.watch_dir = watch_dir self.fps = fps self.frame_interval = 1.0 / fps # 确保监听目录存在 os.makedirs(watch_dir, exist_ok=True) # 状态管理 self.is_running = False self.is_playing = False self.current_video = None # 帧队列 self.frame_queue = queue.Queue(maxsize=100) self.audio_queue = queue.Queue(maxsize=100) # 已处理的文件 self.processed_files = set() # 线程控制 self.watch_thread = None self.play_thread = None self.stop_event = threading.Event() # 回调函数 self.on_frame_callback = None self.on_audio_callback = None logger.info(f"RealtimeVideoPlayer 初始化,监听目录: {watch_dir}") def set_callbacks(self, on_frame=None, on_audio=None): """设置回调函数""" self.on_frame_callback = on_frame self.on_audio_callback = on_audio def start(self): """启动监听和播放""" if self.is_running: logger.warning("RealtimeVideoPlayer 已经在运行中") return self.is_running = True self.stop_event.clear() # 启动监听线程 self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True) self.watch_thread.start() # 启动播放线程 self.play_thread = threading.Thread(target=self._play_loop, daemon=True) self.play_thread.start() logger.info("RealtimeVideoPlayer 已启动") def stop(self): """停止监听和播放""" if not self.is_running: return self.is_running = False self.stop_event.set() # 清空队列 while not self.frame_queue.empty(): try: self.frame_queue.get_nowait() except: break while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except: break if self.watch_thread: self.watch_thread.join(timeout=2) if self.play_thread: self.play_thread.join(timeout=2) logger.info("RealtimeVideoPlayer 已停止") def _watch_loop(self): """监听文件夹循环""" logger.info("开始监听文件夹...") while not self.stop_event.is_set(): try: # 获取所有视频文件 if not os.path.exists(self.watch_dir): time.sleep(0.5) continue video_files = [] for ext in ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.flv']: video_files.extend(Path(self.watch_dir).glob(ext)) # 过滤出未处理的文件 new_videos = [] for video_path in video_files: if video_path.name not in self.processed_files: # 确保文件已经完全写入 if self._is_file_ready(video_path): new_videos.append(video_path) # 按修改时间排序,处理最新的 if new_videos: new_videos.sort(key=lambda p: p.stat().st_mtime) for video_path in new_videos: if self.stop_event.is_set(): break logger.info(f"📹 发现新视频: {video_path.name}") self._play_video(video_path) self.processed_files.add(video_path.name) # 限制已处理文件数量,避免内存占用过大 if len(self.processed_files) > 1000: self.processed_files = set(list(self.processed_files)[-500:]) time.sleep(0.5) # 每0.5秒检查一次 except Exception as e: logger.error(f"监听文件夹时出错: {e}") time.sleep(1) logger.info("文件夹监听已停止") def _is_file_ready(self, file_path): """检查文件是否已经完全写入""" try: # 尝试打开文件,如果能打开说明写入完成 with open(file_path, 'rb') as f: f.seek(0, 2) # 移动到文件末尾 size = f.tell() return size > 0 except: return False def _play_video(self, video_path): """播放单个视频文件""" try: logger.info(f"🎬 开始播放视频: {video_path.name}") self.is_playing = True self.current_video = str(video_path) # 打开视频 cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): logger.error(f"无法打开视频: {video_path}") self.is_playing = False return # 获取视频信息 fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info(f"视频信息: {width}x{height}, FPS={fps}, 总帧数={total_frames}") # 读取帧 frame_count = 0 while not self.stop_event.is_set(): ret, frame = cap.read() if not ret: break # 将帧放入队列 if self.frame_queue.full(): # 如果队列满了,丢弃最旧的帧 try: self.frame_queue.get_nowait() except: pass self.frame_queue.put(frame) frame_count += 1 # 控制帧率 time.sleep(self.frame_interval) cap.release() self.is_playing = False logger.info(f"✅ 视频播放完成: {video_path.name} ({frame_count} 帧)") except Exception as e: logger.error(f"播放视频时出错: {e}") self.is_playing = False def _play_loop(self): """播放帧循环""" logger.info("开始播放帧循环...") while not self.stop_event.is_set(): try: # 从队列获取帧 try: frame = self.frame_queue.get(timeout=0.1) # 调用回调函数 if self.on_frame_callback: self.on_frame_callback(frame) except queue.Empty: continue except Exception as e: logger.error(f"播放帧时出错: {e}") time.sleep(0.1) logger.info("播放帧循环已停止") def get_status(self): """获取播放器状态""" return { 'is_running': self.is_running, 'is_playing': self.is_playing, 'current_video': self.current_video, 'queue_size': self.frame_queue.qsize(), 'processed_files': len(self.processed_files) }