小车ros驱动开发
小车ros设计¶
智能小车开发主要包含感知、规划和执行三个部分.涉及到大量的传感器和运动规划和控制功能.
感知和执行都是控制下位机
可以定义ros的驱动节点来控制下位机小车执行相应的操作
定义传感器节点来获取小车的传感器数据
自定义msg和srv¶
自定义msg
节点间通过topic传递传感器数据,需要自定义红外传感器数据msg及超声波传感器数据msg
红外传感器数据msg
int8[] data
超声波传感器数据msg
uint32 data
自定义srv
小车按照特定占空比运行时,需要自定义srv在调用service时传递left和right占空比
int8 left
int8 right
---
小车传感器节点¶
传感器节点负责接收小车传感器数据并通过topic发布这些数据
import rclpy
from rclpy.node import Node
import socket
from car_msg_srv.msg import InfraredData, UltrasonicData
import threading
import struct
class MyNode(Node):
def __init__(self):
super(MyNode, self).__init__('sensor_node')
# -------------- socket初始化 -------------- #
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.connect(('192.168.10.1', 7070))
# -------------- 定义topic发布者 -------------- #
self.infrared_pub = self.create_publisher(InfraredData, 'infrared_topic', 10)
self.ultrasonic_pub = self.create_publisher(UltrasonicData, 'ultrasonic_topic', 10)
# -------------- 传感器数据 -------------- #
self.infrared_data = 7*[0]
self.ultrasonic_data = -1
# -------------- 开启线程接收数据解析 -------------- #
threading.Thread(target=self.recv_data).start()
def pub_infrared(self):
data = InfraredData()
data.data = self.infrared_data
self.infrared_pub.publish(data)
def pub_ultrasonic(self):
data = UltrasonicData()
data.data = self.ultrasonic_data
self.ultrasonic_pub.publish(data)
def recv_data(self):
# head count
count = 0
# type
type = 0
# length
length = -1
# read_length
read_length = 0
# data_original
data_original = 4 * [b'0']
# data_trans
data_trans = 4 * [0]
while True:
original_data = self.soc.recv(1)
# 转换成数据
data = struct.unpack('<b', original_data)[0]
# -------------- 读取帧头数据 -------------- #
# head
# 防止读取数据不完整
if count < 2 and data != -128:
continue
# 读取头
if count < 2 and data == -128:
count += 1
continue
# -------------- 读取类型 -------------- #
# type
if type == 0:
type = data
continue
# -------------- 读取数据长度 -------------- #
# length
if length == -1:
length = data
continue
# -------------- 读取数据 -------------- #
# data
if read_length < length:
data_original[read_length] = original_data
data_trans[read_length] = data
read_length += 1
continue
# -------------- 验证数据是否正确 -------------- #
# check
total = 0
total += type
total += length
for i in range(length):
total += data_trans[i]
# data==total
if ((data&0xff) ^ (total&0xff))!=0:
print(f'check error,data:{data},total:{total}')
count = 0
type = 0
length = -1
read_length = 0
continue
# -------------- 接收数据正确 处理数据 -------------- #
cur_type = type
count = 0
type = 0
length = -1
read_length = 0
print('check success')
# -------------- 根据类型修改数据 -------------- #
# # 红外传感器
if cur_type==0x01:
trans_ = data_trans[0]
self.infrared_data[0] = trans_&0x01
self.infrared_data[1] = trans_>>1&0x01
self.infrared_data[2] = trans_>>2&0x01
self.infrared_data[3] = trans_>>3&0x01
self.infrared_data[4] = trans_>>4&0x01
self.infrared_data[5] = trans_>>5&0x01
self.infrared_data[6] = trans_>>6&0x01
print(f'hongwai {self.infrared_data}')
self.pub_infrared()
continue
# 超声波传感器
if cur_type==0x02:
dis_original = data_original[0]+data_original[1]+data_original[2]+data_original[3]
self.urasonic_data = struct.unpack('<i',dis_original)[0]
print(f'dis {self.urasonic_data}')
self.pub_ultrasonic()
continue
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node)
node.soc.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
小车传感器节点测试¶
接收传感器节点topic,获取传感器数据
import rclpy
from rclpy.node import Node
from car_msg_srv.msg import InfraredData, UltrasonicData
class MyNode(Node):
def __init__(self):
super(MyNode, self).__init__('sensor_test_node')
self.create_subscription(InfraredData,'infrared_topic',self.infrared_back,10)
self.create_subscription(UltrasonicData,'ultrasonic_topic',self.ultrasonic_back,10)
def infrared_back(self,data):
self.get_logger().info(f'{data.data}')
def ultrasonic_back(self,data):
self.get_logger().info(f'{data.data}')
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
小车控制节点¶
发送接收client端控制需求,发送指令到小车下位机实现控制
import rclpy
import socket
import struct
from rclpy.node import Node
from car_msg_srv.srv import CarMove
from std_srvs.srv import Empty
class MyNode(Node):
def __init__(self):
super(MyNode, self).__init__('control_node')
# -------------- socket初始化 -------------- #
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.connect(('192.168.10.1', 8080))
# -------------- 常用数据准备 -------------- #
self.init_data()
# -------------- 服务定义 -------------- #
self.led_open_service = self.create_service(Empty, 'led_open', self.open_led)
self.led_close_service = self.create_service(Empty, 'led_close', self.close_led)
self.led_toggle_service = self.create_service(Empty, 'led_toggle', self.toggle_led)
self.car_up_service = self.create_service(Empty, 'car_up', self.move_up)
self.car_down_service = self.create_service(Empty, 'car_down', self.move_down)
self.car_left_service = self.create_service(Empty, 'car_left', self.move_left)
self.car_right_service = self.create_service(Empty, 'car_right', self.move_right)
self.car_stop_service = self.create_service(Empty, 'car_stop', self.move_stop)
self.car_move_service = self.create_service(CarMove, 'car_move', self.move)
def init_data(self):
# 标识位
head = -128
self.head_data = struct.pack('<b', head)
# 常用的类型
self.led_open = 0x01
self.led_close = 0x02
self.led_toggle = 0x03
self.car_up = 0x11
self.car_back = 0x12
self.car_left = 0x13
self.car_right = 0x14
self.car_stop = 0x15
self.car_move = 0x16
self.led_open_data = struct.pack('<b', self.led_open)
self.led_close_data = struct.pack('<b', self.led_close)
self.led_toggle_data = struct.pack('<b', self.led_toggle)
self.car_up_data = struct.pack('<b', self.car_up)
self.car_back_data = struct.pack('<b', self.car_back)
self.car_left_data = struct.pack('<b', self.car_left)
self.car_right_data = struct.pack('<b', self.car_right)
self.car_stop_data = struct.pack('<b', self.car_stop)
self.car_move_data = struct.pack('<b', self.car_move)
def send_instructions(self, type, type_data, length, data=()):
'''
发送请求小车
:param type: 请求类型
:param type_data: 请求类型需要发送的数据
:param length: 数据长度
:param data: 数据(元组类型)
:return:
'''
if length != len(data):
print('传递的数据长度不一致')
return
# 数据发送的数据
data_data = None
# 数据长度
length_data = struct.pack('<b', length)
# 0x01 0x02
# 总数
data_total = 0
for ele in data:
data_total += ele
if not data_data:
data_data = struct.pack('<b', ele)
continue
data_data += struct.pack('<b', ele)
# 校验位 注意:这里是无符号的校验位 方便服务器处理
check = (type + length + data_total) & 0xff
# 校验位发送的数据(注意)
check_data = struct.pack('<B', check)
# 发送数据
# 需要发送的数据
if data_data:
# 发送的数据不为空
send_data = self.head_data + self.head_data + type_data + length_data + data_data + check_data
else:
# 发送的数据不为空
send_data = self.head_data + self.head_data + type_data + length_data + check_data
# 发送数据
self.soc.send(send_data)
def open_led(self, req, res):
print('receive request open')
self.send_instructions(self.led_open, self.led_open_data, 0)
return res
def close_led(self, req, res):
print('receive request close')
self.send_instructions(self.led_close, self.led_close_data, 0)
return res
def toggle_led(self, req, res):
print('receive request toggle')
self.send_instructions(self.led_toggle, self.led_toggle_data, 0)
return res
def move_up(self, req, res):
print('receive request up')
self.send_instructions(self.car_up, self.car_up_data, 0)
return res
def move_down(self, req, res):
print('receive request down')
self.send_instructions(self.car_back, self.car_back_data, 0)
return res
def move_left(self, req, res):
print('receive request left')
self.send_instructions(self.car_left, self.car_left_data, 0)
return res
def move_right(self, req, res):
print('receive request right')
self.send_instructions(self.car_right, self.car_right_data, 0)
return res
def move_stop(self, req, res):
print('receive request stop')
self.send_instructions(self.car_stop, self.car_stop_data, 0)
return res
def move(self, req, res):
print('receive request move')
self.send_instructions(self.car_move, self.car_move_data, 2, (req.left,req.right))
return res
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
小车控制节点测试¶
连接小车控制节点,发布控制需求给小车控制节点
import rclpy
import time
from rclpy.node import Node
from car_msg_srv.srv import CarMove
from std_srvs.srv import Empty
class MyNode(Node):
def __init__(self):
super(MyNode, self).__init__('sensor_test_node')
self.led_open_client = self.create_client(Empty, 'led_open')
self.led_close_client = self.create_client(Empty, 'led_close')
self.led_toggle_client = self.create_client(Empty, 'led_toggle')
self.car_up_client = self.create_client(Empty, 'car_up')
self.car_down_client = self.create_client(Empty, 'car_down')
self.car_left_client = self.create_client(Empty, 'car_left')
self.car_right_client = self.create_client(Empty, 'car_right')
self.car_stop_client = self.create_client(Empty, 'car_stop')
self.car_move_client = self.create_client(CarMove, 'car_move')
req = Empty.Request()
time.sleep(2)
self.led_open_client.call_async(req)
time.sleep(2)
self.led_close_client.call_async(req)
time.sleep(2)
self.led_toggle_client.call_async(req)
time.sleep(2)
self.car_up_client.call_async(req)
time.sleep(2)
self.car_down_client.call_async(req)
time.sleep(2)
self.car_left_client.call_async(req)
time.sleep(2)
self.car_right_client.call_async(req)
time.sleep(2)
self.car_stop_client.call_async(req)
time.sleep(2)
move_req = CarMove.Request()
move_req.left = 70
move_req.right = 70
self.car_move_client.call_async(move_req)
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()