webrtc.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. ###############################################################################
  2. # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
  3. # email: lipku@foxmail.com
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ###############################################################################
  17. import asyncio
  18. import json
  19. import logging
  20. import threading
  21. import time
  22. from typing import Tuple, Dict, Optional, Set, Union
  23. from av.frame import Frame
  24. from av.packet import Packet
  25. from av import AudioFrame
  26. import fractions
  27. import numpy as np
AUDIO_PTIME = 0.020  # seconds of audio per packet (20 ms packetization)
VIDEO_CLOCK_RATE = 90000  # ticks per second used for video timestamps
VIDEO_PTIME = 0.040  # seconds per video frame: 1 / 25, i.e. 25 fps (not 30)
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)  # duration of one video tick
SAMPLE_RATE = 16000  # audio samples per second; also the audio timestamp clock
AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)  # duration of one audio tick
  34. #from aiortc.contrib.media import MediaPlayer, MediaRelay
  35. #from aiortc.rtcrtpsender import RTCRtpSender
  36. from aiortc import (
  37. MediaStreamTrack,
  38. )
# Configure the root logger with the library defaults as an import-time side effect.
logging.basicConfig()
# Standard per-module logger. The definitions visible in this file log through
# `mylogger` (imported below) rather than through this object.
logger = logging.getLogger(__name__)
  41. from logger import logger as mylogger
  42. class PlayerStreamTrack(MediaStreamTrack):
  43. """
  44. A video track that returns an animated flag.
  45. """
  46. def __init__(self, player, kind):
  47. super().__init__() # don't forget this!
  48. self.kind = kind
  49. self._player = player
  50. self._queue = asyncio.Queue(maxsize=100)
  51. self.timelist = [] #记录最近包的时间戳
  52. self.current_frame_count = 0
  53. if self.kind == 'video':
  54. self.framecount = 0
  55. self.lasttime = time.perf_counter()
  56. self.totaltime = 0
  57. _start: float
  58. _timestamp: int
  59. async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
  60. if self.readyState != "live":
  61. raise Exception
  62. if self.kind == 'video':
  63. if hasattr(self, "_timestamp"):
  64. #self._timestamp = (time.time()-self._start) * VIDEO_CLOCK_RATE
  65. self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
  66. self.current_frame_count += 1
  67. wait = self._start + self.current_frame_count * VIDEO_PTIME - time.time()
  68. # wait = self.timelist[0] + len(self.timelist)*VIDEO_PTIME - time.time()
  69. if wait>0:
  70. await asyncio.sleep(wait)
  71. # if len(self.timelist)>=100:
  72. # self.timelist.pop(0)
  73. # self.timelist.append(time.time())
  74. else:
  75. self._start = time.time()
  76. self._timestamp = 0
  77. self.timelist.append(self._start)
  78. mylogger.info('video start:%f',self._start)
  79. return self._timestamp, VIDEO_TIME_BASE
  80. else: #audio
  81. if hasattr(self, "_timestamp"):
  82. #self._timestamp = (time.time()-self._start) * SAMPLE_RATE
  83. self._timestamp += int(AUDIO_PTIME * SAMPLE_RATE)
  84. self.current_frame_count += 1
  85. wait = self._start + self.current_frame_count * AUDIO_PTIME - time.time()
  86. # wait = self.timelist[0] + len(self.timelist)*AUDIO_PTIME - time.time()
  87. if wait>0:
  88. await asyncio.sleep(wait)
  89. # if len(self.timelist)>=200:
  90. # self.timelist.pop(0)
  91. # self.timelist.pop(0)
  92. # self.timelist.append(time.time())
  93. else:
  94. self._start = time.time()
  95. self._timestamp = 0
  96. self.timelist.append(self._start)
  97. mylogger.info('audio start:%f',self._start)
  98. return self._timestamp, AUDIO_TIME_BASE
  99. async def recv(self) -> Union[Frame, Packet]:
  100. # frame = self.frames[self.counter % 30]
  101. self._player._start(self)
  102. # if self.kind == 'video':
  103. # frame = await self._queue.get()
  104. # else: #audio
  105. # if hasattr(self, "_timestamp"):
  106. # wait = self._start + self._timestamp / SAMPLE_RATE + AUDIO_PTIME - time.time()
  107. # if wait>0:
  108. # await asyncio.sleep(wait)
  109. # if self._queue.qsize()<1:
  110. # #frame = AudioFrame(format='s16', layout='mono', samples=320)
  111. # audio = np.zeros((1, 320), dtype=np.int16)
  112. # frame = AudioFrame.from_ndarray(audio, layout='mono', format='s16')
  113. # frame.sample_rate=16000
  114. # else:
  115. # frame = await self._queue.get()
  116. # else:
  117. # frame = await self._queue.get()
  118. frame,eventpoint = await self._queue.get()
  119. pts, time_base = await self.next_timestamp()
  120. frame.pts = pts
  121. frame.time_base = time_base
  122. if eventpoint and self._player is not None:
  123. self._player.notify(eventpoint)
  124. if frame is None:
  125. self.stop()
  126. raise Exception
  127. if self.kind == 'video':
  128. self.totaltime += (time.perf_counter() - self.lasttime)
  129. self.framecount += 1
  130. self.lasttime = time.perf_counter()
  131. if self.framecount==100:
  132. mylogger.info(f"------actual avg final fps:{self.framecount/self.totaltime:.4f}")
  133. self.framecount = 0
  134. self.totaltime=0
  135. return frame
  136. def stop(self):
  137. super().stop()
  138. # Drain & delete remaining frames
  139. while not self._queue.empty():
  140. item = self._queue.get_nowait()
  141. del item
  142. if self._player is not None:
  143. self._player._stop(self)
  144. self._player = None
  145. def player_worker_thread(
  146. quit_event,
  147. loop,
  148. container,
  149. audio_track,
  150. video_track
  151. ):
  152. container.render(quit_event,loop,audio_track,video_track)
  153. class HumanPlayer:
  154. def __init__(
  155. self, nerfreal, format=None, options=None, timeout=None, loop=False, decode=True
  156. ):
  157. self.__thread: Optional[threading.Thread] = None
  158. self.__thread_quit: Optional[threading.Event] = None
  159. # examine streams
  160. self.__started: Set[PlayerStreamTrack] = set()
  161. self.__audio: Optional[PlayerStreamTrack] = None
  162. self.__video: Optional[PlayerStreamTrack] = None
  163. self.__audio = PlayerStreamTrack(self, kind="audio")
  164. self.__video = PlayerStreamTrack(self, kind="video")
  165. self.__container = nerfreal
  166. def notify(self,eventpoint):
  167. if self.__container is not None:
  168. self.__container.notify(eventpoint)
  169. @property
  170. def audio(self) -> MediaStreamTrack:
  171. """
  172. A :class:`aiortc.MediaStreamTrack` instance if the file contains audio.
  173. """
  174. return self.__audio
  175. @property
  176. def video(self) -> MediaStreamTrack:
  177. """
  178. A :class:`aiortc.MediaStreamTrack` instance if the file contains video.
  179. """
  180. return self.__video
  181. def _start(self, track: PlayerStreamTrack) -> None:
  182. self.__started.add(track)
  183. if self.__thread is None:
  184. self.__log_debug("Starting worker thread")
  185. self.__thread_quit = threading.Event()
  186. self.__thread = threading.Thread(
  187. name="media-player",
  188. target=player_worker_thread,
  189. args=(
  190. self.__thread_quit,
  191. asyncio.get_event_loop(),
  192. self.__container,
  193. self.__audio,
  194. self.__video
  195. ),
  196. )
  197. self.__thread.start()
  198. def _stop(self, track: PlayerStreamTrack) -> None:
  199. self.__started.discard(track)
  200. if not self.__started and self.__thread is not None:
  201. self.__log_debug("Stopping worker thread")
  202. self.__thread_quit.set()
  203. self.__thread.join()
  204. self.__thread = None
  205. if not self.__started and self.__container is not None:
  206. #self.__container.close()
  207. self.__container = None
  208. def __log_debug(self, msg: str, *args) -> None:
  209. mylogger.debug(f"HumanPlayer {msg}", *args)