| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- ###############################################################################
- # 实时视频播放模块 - 监听 Image_Analysis 文件夹并播放新视频
- ###############################################################################
- import os
- import time
- import cv2
- import numpy as np
- import threading
- import queue
- from pathlib import Path
- from logger import logger
- class RealtimeVideoPlayer:
- """实时监控文件夹并播放新到达的视频"""
-
- def __init__(self, watch_dir, fps=20):
- """
- 初始化视频播放器
-
- Args:
- watch_dir: 监听的文件夹路径
- fps: 帧率(默认20fps)
- """
- self.watch_dir = watch_dir
- self.fps = fps
- self.frame_interval = 1.0 / fps
-
- # 确保监听目录存在
- os.makedirs(watch_dir, exist_ok=True)
-
- # 状态管理
- self.is_running = False
- self.is_playing = False
- self.current_video = None
-
- # 帧队列
- self.frame_queue = queue.Queue(maxsize=100)
- self.audio_queue = queue.Queue(maxsize=100)
-
- # 已处理的文件
- self.processed_files = set()
-
- # 线程控制
- self.watch_thread = None
- self.play_thread = None
- self.stop_event = threading.Event()
-
- # 回调函数
- self.on_frame_callback = None
- self.on_audio_callback = None
-
- logger.info(f"RealtimeVideoPlayer 初始化,监听目录: {watch_dir}")
-
- def set_callbacks(self, on_frame=None, on_audio=None):
- """设置回调函数"""
- self.on_frame_callback = on_frame
- self.on_audio_callback = on_audio
-
- def start(self):
- """启动监听和播放"""
- if self.is_running:
- logger.warning("RealtimeVideoPlayer 已经在运行中")
- return
-
- self.is_running = True
- self.stop_event.clear()
-
- # 启动监听线程
- self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
- self.watch_thread.start()
-
- # 启动播放线程
- self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
- self.play_thread.start()
-
- logger.info("RealtimeVideoPlayer 已启动")
-
- def stop(self):
- """停止监听和播放"""
- if not self.is_running:
- return
-
- self.is_running = False
- self.stop_event.set()
-
- # 清空队列
- while not self.frame_queue.empty():
- try:
- self.frame_queue.get_nowait()
- except:
- break
-
- while not self.audio_queue.empty():
- try:
- self.audio_queue.get_nowait()
- except:
- break
-
- if self.watch_thread:
- self.watch_thread.join(timeout=2)
- if self.play_thread:
- self.play_thread.join(timeout=2)
-
- logger.info("RealtimeVideoPlayer 已停止")
-
- def _watch_loop(self):
- """监听文件夹循环"""
- logger.info("开始监听文件夹...")
-
- while not self.stop_event.is_set():
- try:
- # 获取所有视频文件
- if not os.path.exists(self.watch_dir):
- time.sleep(0.5)
- continue
-
- video_files = []
- for ext in ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.flv']:
- video_files.extend(Path(self.watch_dir).glob(ext))
-
- # 过滤出未处理的文件
- new_videos = []
- for video_path in video_files:
- if video_path.name not in self.processed_files:
- # 确保文件已经完全写入
- if self._is_file_ready(video_path):
- new_videos.append(video_path)
-
- # 按修改时间排序,处理最新的
- if new_videos:
- new_videos.sort(key=lambda p: p.stat().st_mtime)
-
- for video_path in new_videos:
- if self.stop_event.is_set():
- break
-
- logger.info(f"📹 发现新视频: {video_path.name}")
- self._play_video(video_path)
- self.processed_files.add(video_path.name)
-
- # 限制已处理文件数量,避免内存占用过大
- if len(self.processed_files) > 1000:
- self.processed_files = set(list(self.processed_files)[-500:])
-
- time.sleep(0.5) # 每0.5秒检查一次
-
- except Exception as e:
- logger.error(f"监听文件夹时出错: {e}")
- time.sleep(1)
-
- logger.info("文件夹监听已停止")
-
- def _is_file_ready(self, file_path):
- """检查文件是否已经完全写入"""
- try:
- # 尝试打开文件,如果能打开说明写入完成
- with open(file_path, 'rb') as f:
- f.seek(0, 2) # 移动到文件末尾
- size = f.tell()
- return size > 0
- except:
- return False
-
- def _play_video(self, video_path):
- """播放单个视频文件"""
- try:
- logger.info(f"🎬 开始播放视频: {video_path.name}")
- self.is_playing = True
- self.current_video = str(video_path)
-
- # 打开视频
- cap = cv2.VideoCapture(str(video_path))
- if not cap.isOpened():
- logger.error(f"无法打开视频: {video_path}")
- self.is_playing = False
- return
-
- # 获取视频信息
- fps = cap.get(cv2.CAP_PROP_FPS)
- total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
- width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
- height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-
- logger.info(f"视频信息: {width}x{height}, FPS={fps}, 总帧数={total_frames}")
-
- # 读取帧
- frame_count = 0
- while not self.stop_event.is_set():
- ret, frame = cap.read()
- if not ret:
- break
-
- # 将帧放入队列
- if self.frame_queue.full():
- # 如果队列满了,丢弃最旧的帧
- try:
- self.frame_queue.get_nowait()
- except:
- pass
-
- self.frame_queue.put(frame)
- frame_count += 1
-
- # 控制帧率
- time.sleep(self.frame_interval)
-
- cap.release()
- self.is_playing = False
- logger.info(f"✅ 视频播放完成: {video_path.name} ({frame_count} 帧)")
-
- except Exception as e:
- logger.error(f"播放视频时出错: {e}")
- self.is_playing = False
-
- def _play_loop(self):
- """播放帧循环"""
- logger.info("开始播放帧循环...")
-
- while not self.stop_event.is_set():
- try:
- # 从队列获取帧
- try:
- frame = self.frame_queue.get(timeout=0.1)
-
- # 调用回调函数
- if self.on_frame_callback:
- self.on_frame_callback(frame)
-
- except queue.Empty:
- continue
-
- except Exception as e:
- logger.error(f"播放帧时出错: {e}")
- time.sleep(0.1)
-
- logger.info("播放帧循环已停止")
-
- def get_status(self):
- """获取播放器状态"""
- return {
- 'is_running': self.is_running,
- 'is_playing': self.is_playing,
- 'current_video': self.current_video,
- 'queue_size': self.frame_queue.qsize(),
- 'processed_files': len(self.processed_files)
- }
|