videoplayer.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. ###############################################################################
  2. # 实时视频播放模块 - 监听 Image_Analysis 文件夹并播放新视频
  3. ###############################################################################
  4. import os
  5. import time
  6. import cv2
  7. import numpy as np
  8. import threading
  9. import queue
  10. from pathlib import Path
  11. from logger import logger
  12. class RealtimeVideoPlayer:
  13. """实时监控文件夹并播放新到达的视频"""
  14. def __init__(self, watch_dir, fps=20):
  15. """
  16. 初始化视频播放器
  17. Args:
  18. watch_dir: 监听的文件夹路径
  19. fps: 帧率(默认20fps)
  20. """
  21. self.watch_dir = watch_dir
  22. self.fps = fps
  23. self.frame_interval = 1.0 / fps
  24. # 确保监听目录存在
  25. os.makedirs(watch_dir, exist_ok=True)
  26. # 状态管理
  27. self.is_running = False
  28. self.is_playing = False
  29. self.current_video = None
  30. # 帧队列
  31. self.frame_queue = queue.Queue(maxsize=100)
  32. self.audio_queue = queue.Queue(maxsize=100)
  33. # 已处理的文件
  34. self.processed_files = set()
  35. # 线程控制
  36. self.watch_thread = None
  37. self.play_thread = None
  38. self.stop_event = threading.Event()
  39. # 回调函数
  40. self.on_frame_callback = None
  41. self.on_audio_callback = None
  42. logger.info(f"RealtimeVideoPlayer 初始化,监听目录: {watch_dir}")
  43. def set_callbacks(self, on_frame=None, on_audio=None):
  44. """设置回调函数"""
  45. self.on_frame_callback = on_frame
  46. self.on_audio_callback = on_audio
  47. def start(self):
  48. """启动监听和播放"""
  49. if self.is_running:
  50. logger.warning("RealtimeVideoPlayer 已经在运行中")
  51. return
  52. self.is_running = True
  53. self.stop_event.clear()
  54. # 启动监听线程
  55. self.watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
  56. self.watch_thread.start()
  57. # 启动播放线程
  58. self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
  59. self.play_thread.start()
  60. logger.info("RealtimeVideoPlayer 已启动")
  61. def stop(self):
  62. """停止监听和播放"""
  63. if not self.is_running:
  64. return
  65. self.is_running = False
  66. self.stop_event.set()
  67. # 清空队列
  68. while not self.frame_queue.empty():
  69. try:
  70. self.frame_queue.get_nowait()
  71. except:
  72. break
  73. while not self.audio_queue.empty():
  74. try:
  75. self.audio_queue.get_nowait()
  76. except:
  77. break
  78. if self.watch_thread:
  79. self.watch_thread.join(timeout=2)
  80. if self.play_thread:
  81. self.play_thread.join(timeout=2)
  82. logger.info("RealtimeVideoPlayer 已停止")
  83. def _watch_loop(self):
  84. """监听文件夹循环"""
  85. logger.info("开始监听文件夹...")
  86. while not self.stop_event.is_set():
  87. try:
  88. # 获取所有视频文件
  89. if not os.path.exists(self.watch_dir):
  90. time.sleep(0.5)
  91. continue
  92. video_files = []
  93. for ext in ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.flv']:
  94. video_files.extend(Path(self.watch_dir).glob(ext))
  95. # 过滤出未处理的文件
  96. new_videos = []
  97. for video_path in video_files:
  98. if video_path.name not in self.processed_files:
  99. # 确保文件已经完全写入
  100. if self._is_file_ready(video_path):
  101. new_videos.append(video_path)
  102. # 按修改时间排序,处理最新的
  103. if new_videos:
  104. new_videos.sort(key=lambda p: p.stat().st_mtime)
  105. for video_path in new_videos:
  106. if self.stop_event.is_set():
  107. break
  108. logger.info(f"📹 发现新视频: {video_path.name}")
  109. self._play_video(video_path)
  110. self.processed_files.add(video_path.name)
  111. # 限制已处理文件数量,避免内存占用过大
  112. if len(self.processed_files) > 1000:
  113. self.processed_files = set(list(self.processed_files)[-500:])
  114. time.sleep(0.5) # 每0.5秒检查一次
  115. except Exception as e:
  116. logger.error(f"监听文件夹时出错: {e}")
  117. time.sleep(1)
  118. logger.info("文件夹监听已停止")
  119. def _is_file_ready(self, file_path):
  120. """检查文件是否已经完全写入"""
  121. try:
  122. # 尝试打开文件,如果能打开说明写入完成
  123. with open(file_path, 'rb') as f:
  124. f.seek(0, 2) # 移动到文件末尾
  125. size = f.tell()
  126. return size > 0
  127. except:
  128. return False
  129. def _play_video(self, video_path):
  130. """播放单个视频文件"""
  131. try:
  132. logger.info(f"🎬 开始播放视频: {video_path.name}")
  133. self.is_playing = True
  134. self.current_video = str(video_path)
  135. # 打开视频
  136. cap = cv2.VideoCapture(str(video_path))
  137. if not cap.isOpened():
  138. logger.error(f"无法打开视频: {video_path}")
  139. self.is_playing = False
  140. return
  141. # 获取视频信息
  142. fps = cap.get(cv2.CAP_PROP_FPS)
  143. total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
  144. width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  145. height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  146. logger.info(f"视频信息: {width}x{height}, FPS={fps}, 总帧数={total_frames}")
  147. # 读取帧
  148. frame_count = 0
  149. while not self.stop_event.is_set():
  150. ret, frame = cap.read()
  151. if not ret:
  152. break
  153. # 将帧放入队列
  154. if self.frame_queue.full():
  155. # 如果队列满了,丢弃最旧的帧
  156. try:
  157. self.frame_queue.get_nowait()
  158. except:
  159. pass
  160. self.frame_queue.put(frame)
  161. frame_count += 1
  162. # 控制帧率
  163. time.sleep(self.frame_interval)
  164. cap.release()
  165. self.is_playing = False
  166. logger.info(f"✅ 视频播放完成: {video_path.name} ({frame_count} 帧)")
  167. except Exception as e:
  168. logger.error(f"播放视频时出错: {e}")
  169. self.is_playing = False
  170. def _play_loop(self):
  171. """播放帧循环"""
  172. logger.info("开始播放帧循环...")
  173. while not self.stop_event.is_set():
  174. try:
  175. # 从队列获取帧
  176. try:
  177. frame = self.frame_queue.get(timeout=0.1)
  178. # 调用回调函数
  179. if self.on_frame_callback:
  180. self.on_frame_callback(frame)
  181. except queue.Empty:
  182. continue
  183. except Exception as e:
  184. logger.error(f"播放帧时出错: {e}")
  185. time.sleep(0.1)
  186. logger.info("播放帧循环已停止")
  187. def get_status(self):
  188. """获取播放器状态"""
  189. return {
  190. 'is_running': self.is_running,
  191. 'is_playing': self.is_playing,
  192. 'current_video': self.current_video,
  193. 'queue_size': self.frame_queue.qsize(),
  194. 'processed_files': len(self.processed_files)
  195. }