赞
踩
这个程序是实现对无人机控制的主要程序,需要重点分析。可以实现对无人机的位置,速度,加速度的控制。
主要节点:
sys.argv[1]+'_'+sys.argv[2]+"_communication"
#根据传入参数进行设置。
话题订阅:
/"self.vehicle_type+'_'+self.vehicle_id+"/mavros/local_position/pose(geometry_msgs/PoseStamped)
# 订阅mavros发布的位姿
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd(std_msgs/String)
# 订阅外部程序发布的状态控制命令
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_flu(geometry_msgs/Pose)
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_enu(geometry_msgs/Pose)
# 订阅外部程序发布的位置控制命令
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_flu(geometry_msgs/Twist)
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_enu(geometry_msgs/Twist)
# 订阅外部程序发布的速度控制命令
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_flu(geometry_msgs/Twist)
/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_enu(geometry_msgs/Twist)
# 订阅外部程序发布的加速度控制命令
话题发布:
self.vehicle_type+'_'+self.vehicle_id+"/mavros/setpoint_raw/local(mavros_msgs/PositionTarget)
# 向mavros发布计算后的目标指令
服务:
self.vehicle_type+'_'+self.vehicle_id+"/mavros/cmd/arming(mavros_msgs/CommandBool)
# 从mavros获取无人机是否上锁
self.vehicle_type+'_'+self.vehicle_id+"/mavros/set_mode(mavros_msgs/SetMode)
# 从mavros获取无人机飞行模式
全部代码:
import rospy from mavros_msgs.msg import PositionTarget from mavros_msgs.srv import CommandBool, SetMode from geometry_msgs.msg import PoseStamped, Pose, Twist from std_msgs.msg import String from pyquaternion import Quaternion import sys rospy.init_node(sys.argv[1]+'_'+sys.argv[2]+"_communication") rate = rospy.Rate(30) class Communication: def __init__(self, vehicle_type, vehicle_id): self.vehicle_type = vehicle_type self.vehicle_id = vehicle_id self.current_position = None self.current_yaw = 0 self.hover_flag = 0 self.target_motion = PositionTarget() self.arm_state = False self.motion_type = 0 self.flight_mode = None self.mission = None self.last_cmd = None ''' ros subscribers ''' self.local_pose_sub = rospy.Subscriber(self.vehicle_type+'_'+self.vehicle_id+"/mavros/local_position/pose", PoseStamped, self.local_pose_callback,queue_size=1) self.cmd_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd",String,self.cmd_callback,queue_size=3) self.cmd_pose_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_flu", Pose, self.cmd_pose_flu_callback,queue_size=1) self.cmd_pose_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_enu", Pose, self.cmd_pose_enu_callback,queue_size=1) self.cmd_vel_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_flu", Twist, self.cmd_vel_flu_callback,queue_size=1) self.cmd_vel_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_enu", Twist, self.cmd_vel_enu_callback,queue_size=1) self.cmd_accel_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_flu", Twist, self.cmd_accel_flu_callback,queue_size=1) self.cmd_accel_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_enu", Twist, self.cmd_accel_enu_callback,queue_size=1) ''' ros publishers ''' self.target_motion_pub = rospy.Publisher(self.vehicle_type+'_'+self.vehicle_id+"/mavros/setpoint_raw/local", PositionTarget, queue_size=1) ''' ros services ''' self.armService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/cmd/arming", CommandBool) self.flightModeService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/set_mode", SetMode) print(self.vehicle_type+'_'+self.vehicle_id+": "+"communication initialized") def start(self): ''' main ROS thread ''' while not rospy.is_shutdown(): self.target_motion_pub.publish(self.target_motion) rate.sleep() def local_pose_callback(self, msg): self.current_position = msg.pose.position self.current_yaw = self.q2yaw(msg.pose.orientation) def construct_target(self, x=0, y=0, z=0, vx=0, vy=0, vz=0, afx=0, afy=0, afz=0, yaw=0, yaw_rate=0): target_raw_pose = PositionTarget() target_raw_pose.coordinate_frame = self.coordinate_frame target_raw_pose.position.x = x target_raw_pose.position.y = y target_raw_pose.position.z = z target_raw_pose.velocity.x = vx target_raw_pose.velocity.y = vy target_raw_pose.velocity.z = vz target_raw_pose.acceleration_or_force.x = afx target_raw_pose.acceleration_or_force.y = afy target_raw_pose.acceleration_or_force.z = afz target_raw_pose.yaw = yaw target_raw_pose.yaw_rate = yaw_rate if(self.motion_type == 0): target_raw_pose.type_mask = PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ \ + PositionTarget.IGNORE_YAW_RATE if(self.motion_type == 1): target_raw_pose.type_mask = PositionTarget.IGNORE_PX + PositionTarget.IGNORE_PY + PositionTarget.IGNORE_PZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ \ + PositionTarget.IGNORE_YAW if(self.motion_type == 2): target_raw_pose.type_mask = PositionTarget.IGNORE_PX + PositionTarget.IGNORE_PY + PositionTarget.IGNORE_PZ \ + PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ \ + PositionTarget.IGNORE_YAW return target_raw_pose def cmd_pose_flu_callback(self, msg): self.coordinate_frame = 9 self.motion_type = 0 yaw = self.q2yaw(msg.orientation) self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=yaw) def cmd_pose_enu_callback(self, msg): self.coordinate_frame = 1 self.motion_type = 0 yaw = self.q2yaw(msg.orientation) self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=yaw) def cmd_vel_flu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 8 self.motion_type = 1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.linear.y,vz=msg.linear.z,yaw_rate=msg.angular.z) def cmd_vel_enu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 1 self.motion_type = 1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.linear.y,vz=msg.linear.z,yaw_rate=msg.angular.z) def cmd_accel_flu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 8 self.motion_type = 2 self.target_motion = self.construct_target(ax=msg.linear.x,ay=msg.linear.y,az=msg.linear.z,yaw_rate=msg.angular.z) def cmd_accel_enu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 1 self.motion_type = 2 self.target_motion = self.construct_target(ax=msg.linear.x,ay=msg.linear.y,az=msg.linear.z,yaw_rate=msg.angular.z) def hover_state_transition(self,x,y,z,w): if abs(x) > 0.02 or abs(y) > 0.02 or abs(z) > 0.02 or abs(w) > 0.005: self.hover_flag = 0 self.flight_mode = 'OFFBOARD' elif not self.flight_mode == "HOVER": self.hover_flag = 1 self.flight_mode = 'HOVER' self.hover() def cmd_callback(self, msg): if msg.data == self.last_cmd or msg.data == '' or msg.data == 'stop controlling': return elif msg.data == 'ARM': self.arm_state =self.arm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data == 'DISARM': self.arm_state = not self.disarm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data[:-1] == "mission" and not msg.data == self.mission: self.mission = msg.data print(self.vehicle_type+'_'+self.vehicle_id+": "+msg.data) else: self.flight_mode = msg.data self.flight_mode_switch() self.last_cmd = msg.data def q2yaw(self, q): if isinstance(q, Quaternion): rotate_z_rad = q.yaw_pitch_roll[0] else: q_ = Quaternion(q.w, q.x, q.y, q.z) rotate_z_rad = q_.yaw_pitch_roll[0] return rotate_z_rad def arm(self): if self.armService(True): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": arming failed!") return False def disarm(self): if self.armService(False): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": disarming failed!") return False def hover(self): self.coordinate_frame = 1 self.motion_type = 0 self.target_motion = self.construct_target(x=self.current_position.x,y=self.current_position.y,z=self.current_position.z,yaw=self.current_yaw) print(self.vehicle_type+'_'+self.vehicle_id+":"+self.flight_mode) def flight_mode_switch(self): if self.flight_mode == 'HOVER': self.hover_flag = 1 self.hover() elif self.flightModeService(custom_mode=self.flight_mode): print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode) return True else: print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode+"failed") return False if __name__ == '__main__': communication = Communication(sys.argv[1],sys.argv[2]) communication.start()
import rospy
from mavros_msgs.msg import PositionTarget
from mavros_msgs.srv import CommandBool, SetMode
from geometry_msgs.msg import PoseStamped, Pose, Twist
from std_msgs.msg import String
from pyquaternion import Quaternion
import sys
rospy.init_node(sys.argv[1]+'_'+sys.argv[2]+"_communication")
rate = rospy.Rate(30)
传入的参数作为前缀区分多机时的通讯节点
def __init__(self, vehicle_type, vehicle_id): self.vehicle_type = vehicle_type self.vehicle_id = vehicle_id self.current_position = None self.current_yaw = 0 self.hover_flag = 0 self.target_motion = PositionTarget() self.arm_state = False self.motion_type = 0 self.flight_mode = None self.mission = None self.last_cmd = None ''' ros subscribers ''' self.local_pose_sub = rospy.Subscriber(self.vehicle_type+'_'+self.vehicle_id+"/mavros/local_position/pose", PoseStamped, self.local_pose_callback,queue_size=1) self.cmd_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd",String,self.cmd_callback,queue_size=3) self.cmd_pose_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_flu", Pose, self.cmd_pose_flu_callback,queue_size=1) self.cmd_pose_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_enu", Pose, self.cmd_pose_enu_callback,queue_size=1) self.cmd_vel_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_flu", Twist, self.cmd_vel_flu_callback,queue_size=1) self.cmd_vel_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_enu", Twist, self.cmd_vel_enu_callback,queue_size=1) self.cmd_accel_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_flu", Twist, self.cmd_accel_flu_callback,queue_size=1) self.cmd_accel_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_accel_enu", Twist, self.cmd_accel_enu_callback,queue_size=1) ''' ros publishers ''' self.target_motion_pub = rospy.Publisher(self.vehicle_type+'_'+self.vehicle_id+"/mavros/setpoint_raw/local", PositionTarget, queue_size=1) ''' ros services ''' self.armService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/cmd/arming", CommandBool) self.flightModeService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/set_mode", SetMode) print(self.vehicle_type+'_'+self.vehicle_id+": "+"communication initialized")
def local_pose_callback(self, msg):
self.current_position = msg.pose.position
self.current_yaw = self.q2yaw(msg.pose.orientation)
运用q2yaw函数进行变换,将四元数变为欧拉角。
def cmd_pose_flu_callback(self, msg): self.coordinate_frame = 9 self.motion_type = 0 yaw = self.q2yaw(msg.orientation) self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=yaw) def cmd_pose_enu_callback(self, msg): self.coordinate_frame = 1 self.motion_type = 0 yaw = self.q2yaw(msg.orientation) self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=yaw) def cmd_vel_flu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 8 self.motion_type = 1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.linear.y,vz=msg.linear.z,yaw_rate=msg.angular.z) def cmd_vel_enu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 1 self.motion_type = 1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.linear.y,vz=msg.linear.z,yaw_rate=msg.angular.z) def cmd_accel_flu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 8 self.motion_type = 2 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.linear.y,vz=msg.linear.z,yaw_rate=msg.angular.z) def cmd_accel_enu_callback(self, msg): self.hover_state_transition(msg.linear.x, msg.linear.y, msg.linear.z, msg.angular.z) if self.hover_flag == 0: self.coordinate_frame = 1 self.motion_type = 2 self.target_motion = self.construct_target(ax=msg.linear.x,ay=msg.linear.y,az=msg.linear.z,yaw_rate=msg.angular.z)
以上这几个回调函数主要用到两个函数hover_state_transition函数(悬停状态转换)和construct_target函数(构建目标),首先解释hover_state_transition函数
def hover_state_transition(self,x,y,z,w):
if abs(x) > 0.02 or abs(y) > 0.02 or abs(z) > 0.02 or abs(w) > 0.005:
self.hover_flag = 0
self.flight_mode = 'OFFBOARD'
elif not self.flight_mode == "HOVER":
self.hover_flag = 1
self.flight_mode = 'HOVER'
self.hover()
这个函数用于悬停状态的转换,当变量x、y、z绝对值大于0.02或w绝对值大于0.005时,转换为OFFBOARD离线模式,当不小于于此值时设置为HOVER悬停模式。
接下来解释construct_target函数
def construct_target(self, x=0, y=0, z=0, vx=0, vy=0, vz=0, afx=0, afy=0, afz=0, yaw=0, yaw_rate=0): target_raw_pose = PositionTarget() target_raw_pose.coordinate_frame = self.coordinate_frame target_raw_pose.position.x = x target_raw_pose.position.y = y target_raw_pose.position.z = z target_raw_pose.velocity.x = vx target_raw_pose.velocity.y = vy target_raw_pose.velocity.z = vz target_raw_pose.acceleration_or_force.x = afx target_raw_pose.acceleration_or_force.y = afy target_raw_pose.acceleration_or_force.z = afz target_raw_pose.yaw = yaw target_raw_pose.yaw_rate = yaw_rate if(self.motion_type == 0): target_raw_pose.type_mask = PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ \ + PositionTarget.IGNORE_YAW_RATE if(self.motion_type == 1): target_raw_pose.type_mask = PositionTarget.IGNORE_PX + PositionTarget.IGNORE_PY + PositionTarget.IGNORE_PZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ \ + PositionTarget.IGNORE_YAW if(self.motion_type == 2): target_raw_pose.type_mask = PositionTarget.IGNORE_PX + PositionTarget.IGNORE_PY + PositionTarget.IGNORE_PZ \ + PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ \ + PositionTarget.IGNORE_YAW return target_raw_pose
主要通过此函数将订阅的命令话题数据转化为发布话题数据。了解此函数需先认识PositionTarget消息类型
# Message for SET_POSITION_TARGET_LOCAL_NED # # Some complex system requires all feautures that mavlink # message provide. See issue #402. std_msgs/Header header uint8 coordinate_frame #坐标系选择 uint8 FRAME_LOCAL_NED = 1 uint8 FRAME_LOCAL_OFFSET_NED = 7 uint8 FRAME_BODY_NED = 8 uint8 FRAME_BODY_OFFSET_NED = 9 uint16 type_mask #表示要控制变量的忽略码字。 uint16 IGNORE_PX = 1 # Position ignore flags uint16 IGNORE_PY = 2 uint16 IGNORE_PZ = 4 uint16 IGNORE_VX = 8 # Velocity vector ignore flags uint16 IGNORE_VY = 16 uint16 IGNORE_VZ = 32 uint16 IGNORE_AFX = 64 # Acceleration/Force vector ignore flags uint16 IGNORE_AFY = 128 uint16 IGNORE_AFZ = 256 uint16 FORCE = 512 # Force in af vector flag uint16 IGNORE_YAW = 1024 uint16 IGNORE_YAW_RATE = 2048 # 共12位,如果要忽略多位,只需要将对应的码相加即可。 # 如速度控制的码字应该是111,111,000,111等于2*2048-1-8-16-32=4039。 # 速度控制加上偏航角控制应该是:101,111,000,111等于3015 # 位置控制的码字应该是111,111,111,000,等于4088. geometry_msgs/Point position # 位置 geometry_msgs/Vector3 velocity # 线速度 geometry_msgs/Vector3 acceleration_or_force # 线加速度 float32 yaw # 角速度 float32 yaw_rate # 角加速度
construct_target函数将订阅到的命令数据转换为PositionTarget类型的消息发布出去。从而实现对无人机位置,速度,加速度的控制。
def cmd_callback(self, msg): if msg.data == self.last_cmd or msg.data == '' or msg.data == 'stop controlling': return elif msg.data == 'ARM': self.arm_state =self.arm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data == 'DISARM': self.arm_state = not self.disarm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data[:-1] == "mission" and not msg.data == self.mission: self.mission = msg.data print(self.vehicle_type+'_'+self.vehicle_id+": "+msg.data) else: self.flight_mode = msg.data self.flight_mode_switch() self.last_cmd = msg.data
cmd_callback函数主要用于设置无人机状态,以下为设置各个状态的函数
def arm(self): if self.armService(True): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": arming failed!") return False # 通过设置服务设置无人机解锁 def disarm(self): if self.armService(False): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": disarming failed!") return False # 通过设置服务设置无人机上锁 def hover(self): self.coordinate_frame = 1 self.motion_type = 0 self.target_motion = self.construct_target(x=self.current_position.x,y=self.current_position.y,z=self.current_position.z,yaw=self.current_yaw) print(self.vehicle_type+'_'+self.vehicle_id+":"+self.flight_mode) # 设置无人机悬停在上次位置 def flight_mode_switch(self): if self.flight_mode == 'HOVER': self.hover_flag = 1 self.hover() elif self.flightModeService(custom_mode=self.flight_mode): print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode) return True else: print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode+"failed") return False # 通过设置服务更改无人机飞行模式。
循环发布定位目标消息。
def start(self):
'''
main ROS thread
'''
while not rospy.is_shutdown():
self.target_motion_pub.publish(self.target_motion)
rate.sleep()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。