| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- #!/usr/bin/env python3
- import time
- import threading
- import rclpy
- from rclpy.node import Node
- from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
- from std_msgs.msg import Header, Int32
- from interface_protocol.msg import BodyVelCmd
- from interface_protocol.msg import MotionStateRequest
- from interface_protocol.msg import MotionState
- # 定义全局消息
- g_vel_msg = BodyVelCmd()
- # 初始化 linear_velocity 数组为2个元素,分别对应 x, y 轴速度
- g_vel_msg.linear_velocity = [0.0, 0.0]
- g_vel_msg.yaw_velocity = 0.0
- # 记录开始时间和发送持续时间
- g_start_time = None
- g_duration = 3.0 # 发送速度持续时间(秒)
- class VelocityTestNode(Node):
- def __init__(self):
- super().__init__("vel_test_node")
- self.publisher_ = self.create_publisher(
- BodyVelCmd,
- "/motion/body_vel_cmd", # 发布速度命令话题
- 10,
- )
- self.state_publisher_ = self.create_publisher(
- MotionStateRequest,
- "/motion/set_motion_state",# 发布状态命令话题
- 10,
- )
- self.state_sub_ = self.create_subscription(
- Int32,
- "/state_translate",# 订阅状态命令话题
- self.state_callback,
- 10,
- )
- # 创建运动状态订阅者
- state_qos = QoSProfile(
- depth=1,
- reliability=ReliabilityPolicy.BEST_EFFORT,
- durability=DurabilityPolicy.VOLATILE,
- )
- self.motion_state_sub_ = self.create_subscription(
- MotionState,
- "/motion/motion_state",
- self.motion_state_callback,
- state_qos,
- )
- # 内部状态变量
- self.current_motion_state = ""
- self.available_transition_motions = []
- self.timer_period = 0.02
- self.timer = self.create_timer(self.timer_period, self.timer_callback)
- self.get_logger().info("动作执行节点已启动,正在等待 /state_translate 话题")
- def motion_state_callback(self, msg: MotionState):
- """
- 回调函数,缓存运动状态信息
- """
- self.current_motion_state = msg.current_motion_task
- self.available_transition_motions = msg.available_transition_motions
- def state_callback(self, msg):
- state_translate = msg.data
- self.get_logger().info(f"收到 /state_translate: {state_translate}")
- # 在独立线程执行状态操作,防止阻塞主要的回调线程(防止影响 timer 发布速度及由于同时在此使用 spin 等造成死锁)
- threading.Thread(target=self.execute_state, args=(state_translate,)).start()
- def execute_state(self, state_translate):
- if state_translate == 1:
- self.get_logger().info("执行状态 1: 起立") #起立
- publish_states_sequence(self, ["passive", "pd_sitground", "lower_body_balance"], delay=2.5)
- elif state_translate == 2:
- self.get_logger().info("执行状态 2: 前进") #前进
- set_velocities(vx=0.6, vy=0.0, vyaw=0.0, duration=2.0)
- elif state_translate == 3:
- self.get_logger().info("执行状态 3: 左转") #左转
- set_velocities(vx=0.0, vy=0.0, vyaw=0.6, duration=1.5)
- elif state_translate == 4:
- self.get_logger().info("执行状态 4: 右转") #右转
- set_velocities(vx=0.0, vy=0.0, vyaw=-0.6, duration=1.5)
- elif state_translate == 5:
- self.get_logger().info("执行状态 5: 左移") #左移
- set_velocities(vx=0.0, vy=0.8, vyaw=0.0, duration=2.0)
- elif state_translate == 6:
- self.get_logger().info("执行状态 6: 右移") #右移
- set_velocities(vx=0.0, vy=-0.8, vyaw=0.0, duration=2.0)
- elif state_translate == 7:
- self.get_logger().info("执行状态 7: 坐下") #坐下
- # publish_states_sequence(self, ["rl_amp", "rl_floor_sitting", "passive","idle"], delay=3.5)
- publish_states_sequence(self, ["rl_amp", "rl_floor_sitting"], delay=4.0)
- elif state_translate == 8:
- self.get_logger().info("执行状态 8: 向后转") #后转
- set_velocities(vx=0.0, vy=0.0, vyaw=-0.6, duration=3.5)
- else:
- self.get_logger().info(f"未知状态: {state_translate}")
- def timer_callback(self):
- global g_vel_msg, g_start_time, g_duration
-
- # 记录第一次发送命令的开始时间
- if g_start_time is None:
- g_start_time = self.get_clock().now()
- # 计算已逝去的时间 (秒)
- elapsed_time = (self.get_clock().now() - g_start_time).nanoseconds / 1e9
- # 如果超出了目标时间,则将所有的速度设为 0
- if elapsed_time > g_duration:
- g_vel_msg.linear_velocity = [0.0, 0.0]
- g_vel_msg.yaw_velocity = 0.0
-
- g_vel_msg.header = Header()
- g_vel_msg.header.stamp = self.get_clock().now().to_msg()
- g_vel_msg.header.frame_id = "body"
- self.publisher_.publish(g_vel_msg)
- def set_velocities(vx=0.0, vy=0.0, vyaw=0.0, duration=3.0):
- """单独控制及更新 x, y 线速度和偏航角速度,并定义发送持续时间"""
- global g_vel_msg, g_duration, g_start_time
- g_vel_msg.linear_velocity[0] = vx
- g_vel_msg.linear_velocity[1] = vy
- g_vel_msg.yaw_velocity = vyaw
- g_duration = duration
- g_start_time = None # 重置使得计时器重新开始计时
- def publish_states_sequence(node, states, delay=3.0):
- for state in states:
- # 检查目标运动状态是否可用
- if not node.available_transition_motions:
- node.get_logger().warn("可用过渡运动列表为空,等待获取运动状态...")
- # 等待获取运动状态信息
- for i in range(10):
- if node.available_transition_motions:
- break
- time.sleep(0.5)
- if not node.available_transition_motions:
- node.get_logger().error("无法获取可用过渡运动列表,跳过状态切换")
- return
-
- if state not in node.available_transition_motions:
- node.get_logger().error(f"运动状态 '{state}' 不在可用过渡列表中,跳过状态切换")
- node.get_logger().info(f"当前状态: {node.current_motion_state}")
- node.get_logger().info(f"可用过渡: {node.available_transition_motions}")
- return
-
- # 发布状态切换请求
- msg = MotionStateRequest()
- msg.target_motion_name = state
- node.state_publisher_.publish(msg)
- node.get_logger().info(f"发布状态命令: {state}")
-
- # 因为我们此时在独立线程中,直接 sleep 即可,不会阻塞原节点的 spin() 和 timer 调用
- time.sleep(delay)
- def main(args=None):
- rclpy.init(args=args)
- vel_test_node = VelocityTestNode()
- try:
- vel_test_node.get_logger().info("动作执行节点已启动,正在等待 /state_translate 话题")
- rclpy.spin(vel_test_node)
- except KeyboardInterrupt:
- vel_test_node.get_logger().info("动作执行节点已停止")
- finally:
- vel_test_node.destroy_node()
- if rclpy.ok():
- rclpy.shutdown()
- if __name__ == "__main__":
- main()
|