跳转至

小车ros驱动开发

小车ros设计

智能小车开发主要包含感知、规划和执行三个部分.涉及到大量的传感器和运动规划和控制功能.

感知和执行都是控制下位机

可以定义ros的驱动节点来控制下位机小车执行相应的操作

定义传感器节点来获取小车的传感器数据

自定义msg和srv

自定义msg

节点间通过topic传递传感器数据,需要自定义红外传感器数据msg及超声波传感器数据msg

红外传感器数据msg

int8[] data

超声波传感器数据msg

uint32 data

自定义srv

小车按照特定占空比运行时,需要自定义srv在调用service时传递left和right占空比

int8 left
int8 right
---

小车传感器节点

传感器节点负责接收小车传感器数据并通过topic发布这些数据

import rclpy
from rclpy.node import Node
import socket
from car_msg_srv.msg import InfraredData, UltrasonicData
import threading
import struct


class MyNode(Node):
    def __init__(self):
        super(MyNode, self).__init__('sensor_node')
        # -------------- socket初始化 -------------- #
        self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.soc.connect(('192.168.10.1', 7070))
        # -------------- 定义topic发布者 -------------- #
        self.infrared_pub = self.create_publisher(InfraredData, 'infrared_topic', 10)
        self.ultrasonic_pub = self.create_publisher(UltrasonicData, 'ultrasonic_topic', 10)
        # -------------- 传感器数据 -------------- #
        self.infrared_data = 7*[0]
        self.ultrasonic_data = -1
        # -------------- 开启线程接收数据解析 -------------- #
        threading.Thread(target=self.recv_data).start()

    def pub_infrared(self):
        data = InfraredData()
        data.data = self.infrared_data
        self.infrared_pub.publish(data)

    def pub_ultrasonic(self):
        data = UltrasonicData()
        data.data = self.ultrasonic_data
        self.ultrasonic_pub.publish(data)

    def recv_data(self):
        # head count
        count = 0
        # type
        type = 0
        # length
        length = -1
        # read_length
        read_length = 0
        # data_original
        data_original = 4 * [b'0']
        # data_trans
        data_trans = 4 * [0]
        while True:
            original_data = self.soc.recv(1)
            # 转换成数据
            data = struct.unpack('<b', original_data)[0]
            # -------------- 读取帧头数据 -------------- #
            # head
            # 防止读取数据不完整
            if count < 2 and data != -128:
                continue
            # 读取头
            if count < 2 and data == -128:
                count += 1
                continue
            # -------------- 读取类型 -------------- #
            # type
            if type == 0:
                type = data
                continue
            # -------------- 读取数据长度 -------------- #
            # length
            if length == -1:
                length = data
                continue
            # -------------- 读取数据 -------------- #
            # data
            if read_length < length:
                data_original[read_length] = original_data
                data_trans[read_length] = data
                read_length += 1
                continue

            # -------------- 验证数据是否正确 -------------- #
            # check
            total = 0
            total += type
            total += length
            for i in range(length):
                total += data_trans[i]
            # data==total
            if ((data&0xff) ^ (total&0xff))!=0:
                print(f'check error,data:{data},total:{total}')
                count = 0
                type = 0
                length = -1
                read_length = 0
                continue
            # -------------- 接收数据正确 处理数据 -------------- #
            cur_type = type

            count = 0
            type = 0
            length = -1
            read_length = 0
            print('check success')
            # -------------- 根据类型修改数据 -------------- #
            # # 红外传感器
            if cur_type==0x01:
                trans_ = data_trans[0]
                self.infrared_data[0] = trans_&0x01
                self.infrared_data[1] = trans_>>1&0x01
                self.infrared_data[2] = trans_>>2&0x01
                self.infrared_data[3] = trans_>>3&0x01
                self.infrared_data[4] = trans_>>4&0x01
                self.infrared_data[5] = trans_>>5&0x01
                self.infrared_data[6] = trans_>>6&0x01
                print(f'hongwai {self.infrared_data}')
                self.pub_infrared()
                continue
            # 超声波传感器
            if cur_type==0x02:
                dis_original = data_original[0]+data_original[1]+data_original[2]+data_original[3]
                self.urasonic_data = struct.unpack('<i',dis_original)[0]
                print(f'dis {self.urasonic_data}')
                self.pub_ultrasonic()
                continue


def main():
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.soc.close()
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

小车传感器节点测试

接收传感器节点topic,获取传感器数据

import rclpy
from rclpy.node import Node
from car_msg_srv.msg import InfraredData, UltrasonicData


class MyNode(Node):
    def __init__(self):
        super(MyNode, self).__init__('sensor_test_node')
        self.create_subscription(InfraredData,'infrared_topic',self.infrared_back,10)
        self.create_subscription(UltrasonicData,'ultrasonic_topic',self.ultrasonic_back,10)

    def infrared_back(self,data):
        self.get_logger().info(f'{data.data}')

    def ultrasonic_back(self,data):
        self.get_logger().info(f'{data.data}')

def main():
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

小车控制节点

发送接收client端控制需求,发送指令到小车下位机实现控制

import rclpy
import socket
import struct
from rclpy.node import Node
from car_msg_srv.srv import CarMove
from std_srvs.srv import Empty


class MyNode(Node):
    def __init__(self):
        super(MyNode, self).__init__('control_node')
        # -------------- socket初始化 -------------- #
        self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.soc.connect(('192.168.10.1', 8080))
        # -------------- 常用数据准备 -------------- #
        self.init_data()
        # -------------- 服务定义 -------------- #
        self.led_open_service = self.create_service(Empty, 'led_open', self.open_led)
        self.led_close_service = self.create_service(Empty, 'led_close', self.close_led)
        self.led_toggle_service = self.create_service(Empty, 'led_toggle', self.toggle_led)
        self.car_up_service = self.create_service(Empty, 'car_up', self.move_up)
        self.car_down_service = self.create_service(Empty, 'car_down', self.move_down)
        self.car_left_service = self.create_service(Empty, 'car_left', self.move_left)
        self.car_right_service = self.create_service(Empty, 'car_right', self.move_right)
        self.car_stop_service = self.create_service(Empty, 'car_stop', self.move_stop)
        self.car_move_service = self.create_service(CarMove, 'car_move', self.move)

    def init_data(self):
        # 标识位
        head = -128
        self.head_data = struct.pack('<b', head)
        # 常用的类型
        self.led_open = 0x01
        self.led_close = 0x02
        self.led_toggle = 0x03
        self.car_up = 0x11
        self.car_back = 0x12
        self.car_left = 0x13
        self.car_right = 0x14
        self.car_stop = 0x15
        self.car_move = 0x16
        self.led_open_data = struct.pack('<b', self.led_open)
        self.led_close_data = struct.pack('<b', self.led_close)
        self.led_toggle_data = struct.pack('<b', self.led_toggle)
        self.car_up_data = struct.pack('<b', self.car_up)
        self.car_back_data = struct.pack('<b', self.car_back)
        self.car_left_data = struct.pack('<b', self.car_left)
        self.car_right_data = struct.pack('<b', self.car_right)
        self.car_stop_data = struct.pack('<b', self.car_stop)
        self.car_move_data = struct.pack('<b', self.car_move)

    def send_instructions(self, type, type_data, length, data=()):
        '''
        发送请求小车
        :param type: 请求类型
        :param type_data: 请求类型需要发送的数据
        :param length: 数据长度
        :param data: 数据(元组类型)
        :return:
        '''
        if length != len(data):
            print('传递的数据长度不一致')
            return
        # 数据发送的数据
        data_data = None
        # 数据长度
        length_data = struct.pack('<b', length)
        # 0x01 0x02
        # 总数
        data_total = 0
        for ele in data:
            data_total += ele
            if not data_data:
                data_data = struct.pack('<b', ele)
                continue
            data_data += struct.pack('<b', ele)
        # 校验位  注意:这里是无符号的校验位  方便服务器处理
        check = (type + length + data_total) & 0xff
        # 校验位发送的数据(注意)
        check_data = struct.pack('<B', check)
        # 发送数据
        # 需要发送的数据
        if data_data:
            # 发送的数据不为空
            send_data = self.head_data + self.head_data + type_data + length_data + data_data + check_data
        else:
            # 发送的数据不为空
            send_data = self.head_data + self.head_data + type_data + length_data + check_data
        # 发送数据
        self.soc.send(send_data)

    def open_led(self, req, res):
        print('receive request open')
        self.send_instructions(self.led_open, self.led_open_data, 0)
        return res

    def close_led(self, req, res):
        print('receive request close')
        self.send_instructions(self.led_close, self.led_close_data, 0)
        return res

    def toggle_led(self, req, res):
        print('receive request toggle')
        self.send_instructions(self.led_toggle, self.led_toggle_data, 0)
        return res

    def move_up(self, req, res):
        print('receive request up')
        self.send_instructions(self.car_up, self.car_up_data, 0)
        return res

    def move_down(self, req, res):
        print('receive request down')
        self.send_instructions(self.car_back, self.car_back_data, 0)
        return res

    def move_left(self, req, res):
        print('receive request left')
        self.send_instructions(self.car_left, self.car_left_data, 0)
        return res

    def move_right(self, req, res):
        print('receive request right')
        self.send_instructions(self.car_right, self.car_right_data, 0)
        return res

    def move_stop(self, req, res):
        print('receive request stop')
        self.send_instructions(self.car_stop, self.car_stop_data, 0)
        return res

    def move(self, req, res):
        print('receive request move')
        self.send_instructions(self.car_move, self.car_move_data, 2, (req.left,req.right))
        return res


def main():
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

小车控制节点测试

连接小车控制节点,发布控制需求给小车控制节点

import rclpy
import time
from rclpy.node import Node
from car_msg_srv.srv import CarMove
from std_srvs.srv import Empty


class MyNode(Node):
    def __init__(self):
        super(MyNode, self).__init__('sensor_test_node')
        self.led_open_client = self.create_client(Empty, 'led_open')
        self.led_close_client = self.create_client(Empty, 'led_close')
        self.led_toggle_client = self.create_client(Empty, 'led_toggle')
        self.car_up_client = self.create_client(Empty, 'car_up')
        self.car_down_client = self.create_client(Empty, 'car_down')
        self.car_left_client = self.create_client(Empty, 'car_left')
        self.car_right_client = self.create_client(Empty, 'car_right')
        self.car_stop_client = self.create_client(Empty, 'car_stop')
        self.car_move_client = self.create_client(CarMove, 'car_move')

        req = Empty.Request()
        time.sleep(2)
        self.led_open_client.call_async(req)
        time.sleep(2)
        self.led_close_client.call_async(req)
        time.sleep(2)
        self.led_toggle_client.call_async(req)
        time.sleep(2)
        self.car_up_client.call_async(req)
        time.sleep(2)
        self.car_down_client.call_async(req)
        time.sleep(2)
        self.car_left_client.call_async(req)
        time.sleep(2)
        self.car_right_client.call_async(req)
        time.sleep(2)
        self.car_stop_client.call_async(req)
        time.sleep(2)
        move_req = CarMove.Request()
        move_req.left = 70
        move_req.right = 70
        self.car_move_client.call_async(move_req)


def main():
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()