vel_test.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!/usr/bin/env python3
  2. import time
  3. import threading
  4. import rclpy
  5. from rclpy.node import Node
  6. from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
  7. from std_msgs.msg import Header, Int32
  8. from interface_protocol.msg import BodyVelCmd
  9. from interface_protocol.msg import MotionStateRequest
  10. from interface_protocol.msg import MotionState
  11. # 定义全局消息
  12. g_vel_msg = BodyVelCmd()
  13. # 初始化 linear_velocity 数组为2个元素,分别对应 x, y 轴速度
  14. g_vel_msg.linear_velocity = [0.0, 0.0]
  15. g_vel_msg.yaw_velocity = 0.0
  16. # 记录开始时间和发送持续时间
  17. g_start_time = None
  18. g_duration = 3.0 # 发送速度持续时间(秒)
  19. class VelocityTestNode(Node):
  20. def __init__(self):
  21. super().__init__("vel_test_node")
  22. self.publisher_ = self.create_publisher(
  23. BodyVelCmd,
  24. "/motion/body_vel_cmd", # 发布速度命令话题
  25. 10,
  26. )
  27. self.state_publisher_ = self.create_publisher(
  28. MotionStateRequest,
  29. "/motion/set_motion_state",# 发布状态命令话题
  30. 10,
  31. )
  32. self.state_sub_ = self.create_subscription(
  33. Int32,
  34. "/state_translate",# 订阅状态命令话题
  35. self.state_callback,
  36. 10,
  37. )
  38. # 创建运动状态订阅者
  39. state_qos = QoSProfile(
  40. depth=1,
  41. reliability=ReliabilityPolicy.BEST_EFFORT,
  42. durability=DurabilityPolicy.VOLATILE,
  43. )
  44. self.motion_state_sub_ = self.create_subscription(
  45. MotionState,
  46. "/motion/motion_state",
  47. self.motion_state_callback,
  48. state_qos,
  49. )
  50. # 内部状态变量
  51. self.current_motion_state = ""
  52. self.available_transition_motions = []
  53. self.timer_period = 0.02
  54. self.timer = self.create_timer(self.timer_period, self.timer_callback)
  55. self.get_logger().info("动作执行节点已启动,正在等待 /state_translate 话题")
  56. def motion_state_callback(self, msg: MotionState):
  57. """
  58. 回调函数,缓存运动状态信息
  59. """
  60. self.current_motion_state = msg.current_motion_task
  61. self.available_transition_motions = msg.available_transition_motions
  62. def state_callback(self, msg):
  63. state_translate = msg.data
  64. self.get_logger().info(f"收到 /state_translate: {state_translate}")
  65. # 在独立线程执行状态操作,防止阻塞主要的回调线程(防止影响 timer 发布速度及由于同时在此使用 spin 等造成死锁)
  66. threading.Thread(target=self.execute_state, args=(state_translate,)).start()
  67. def execute_state(self, state_translate):
  68. if state_translate == 1:
  69. self.get_logger().info("执行状态 1: 起立") #起立
  70. publish_states_sequence(self, ["passive", "pd_sitground", "lower_body_balance"], delay=2.5)
  71. elif state_translate == 2:
  72. self.get_logger().info("执行状态 2: 前进") #前进
  73. set_velocities(vx=0.6, vy=0.0, vyaw=0.0, duration=2.0)
  74. elif state_translate == 3:
  75. self.get_logger().info("执行状态 3: 左转") #左转
  76. set_velocities(vx=0.0, vy=0.0, vyaw=0.6, duration=1.5)
  77. elif state_translate == 4:
  78. self.get_logger().info("执行状态 4: 右转") #右转
  79. set_velocities(vx=0.0, vy=0.0, vyaw=-0.6, duration=1.5)
  80. elif state_translate == 5:
  81. self.get_logger().info("执行状态 5: 左移") #左移
  82. set_velocities(vx=0.0, vy=0.8, vyaw=0.0, duration=2.0)
  83. elif state_translate == 6:
  84. self.get_logger().info("执行状态 6: 右移") #右移
  85. set_velocities(vx=0.0, vy=-0.8, vyaw=0.0, duration=2.0)
  86. elif state_translate == 7:
  87. self.get_logger().info("执行状态 7: 坐下") #坐下
  88. # publish_states_sequence(self, ["rl_amp", "rl_floor_sitting", "passive","idle"], delay=3.5)
  89. publish_states_sequence(self, ["rl_amp", "rl_floor_sitting"], delay=4.0)
  90. elif state_translate == 8:
  91. self.get_logger().info("执行状态 8: 向后转") #后转
  92. set_velocities(vx=0.0, vy=0.0, vyaw=-0.6, duration=3.5)
  93. else:
  94. self.get_logger().info(f"未知状态: {state_translate}")
  95. def timer_callback(self):
  96. global g_vel_msg, g_start_time, g_duration
  97. # 记录第一次发送命令的开始时间
  98. if g_start_time is None:
  99. g_start_time = self.get_clock().now()
  100. # 计算已逝去的时间 (秒)
  101. elapsed_time = (self.get_clock().now() - g_start_time).nanoseconds / 1e9
  102. # 如果超出了目标时间,则将所有的速度设为 0
  103. if elapsed_time > g_duration:
  104. g_vel_msg.linear_velocity = [0.0, 0.0]
  105. g_vel_msg.yaw_velocity = 0.0
  106. g_vel_msg.header = Header()
  107. g_vel_msg.header.stamp = self.get_clock().now().to_msg()
  108. g_vel_msg.header.frame_id = "body"
  109. self.publisher_.publish(g_vel_msg)
  110. def set_velocities(vx=0.0, vy=0.0, vyaw=0.0, duration=3.0):
  111. """单独控制及更新 x, y 线速度和偏航角速度,并定义发送持续时间"""
  112. global g_vel_msg, g_duration, g_start_time
  113. g_vel_msg.linear_velocity[0] = vx
  114. g_vel_msg.linear_velocity[1] = vy
  115. g_vel_msg.yaw_velocity = vyaw
  116. g_duration = duration
  117. g_start_time = None # 重置使得计时器重新开始计时
  118. def publish_states_sequence(node, states, delay=3.0):
  119. for state in states:
  120. # 检查目标运动状态是否可用
  121. if not node.available_transition_motions:
  122. node.get_logger().warn("可用过渡运动列表为空,等待获取运动状态...")
  123. # 等待获取运动状态信息
  124. for i in range(10):
  125. if node.available_transition_motions:
  126. break
  127. time.sleep(0.5)
  128. if not node.available_transition_motions:
  129. node.get_logger().error("无法获取可用过渡运动列表,跳过状态切换")
  130. return
  131. if state not in node.available_transition_motions:
  132. node.get_logger().error(f"运动状态 '{state}' 不在可用过渡列表中,跳过状态切换")
  133. node.get_logger().info(f"当前状态: {node.current_motion_state}")
  134. node.get_logger().info(f"可用过渡: {node.available_transition_motions}")
  135. return
  136. # 发布状态切换请求
  137. msg = MotionStateRequest()
  138. msg.target_motion_name = state
  139. node.state_publisher_.publish(msg)
  140. node.get_logger().info(f"发布状态命令: {state}")
  141. # 因为我们此时在独立线程中,直接 sleep 即可,不会阻塞原节点的 spin() 和 timer 调用
  142. time.sleep(delay)
  143. def main(args=None):
  144. rclpy.init(args=args)
  145. vel_test_node = VelocityTestNode()
  146. try:
  147. vel_test_node.get_logger().info("动作执行节点已启动,正在等待 /state_translate 话题")
  148. rclpy.spin(vel_test_node)
  149. except KeyboardInterrupt:
  150. vel_test_node.get_logger().info("动作执行节点已停止")
  151. finally:
  152. vel_test_node.destroy_node()
  153. if rclpy.ok():
  154. rclpy.shutdown()
  155. if __name__ == "__main__":
  156. main()