传感器数据获取
实现步骤¶
准备工作¶
- 安装固件
recv_send.bin
- 启动小车会自动开启wifi,名称是
smart_car
,密码是12345678
,电脑连接这个wifi
代码实现¶
- 搭建ui界面
- 连接小车服务器
- 开启新线程接收小车服务器数据并解析
- 更新界面数据
接收协议¶
小车目前会返回红外传感器数据及超声波传感器数据
接收的数据遵循如下协议:
一帧完整数据
帧头 | 类型 | 数据长度 | 数据位 | 校验位 |
---|---|---|---|---|
0xff 0xff | 0x01 | 0x01 | 0x00 | 0x02 |
注意:
数据长度不包含校验位
指令类型
类型 | 类型标识 |
---|---|
0x01 | 红外传感器 |
0x02 | 超声波传感器 |
数据长度
类型 | 数据长度 |
---|---|
红外传感器 | 0x01 |
超声波传感器 | 0x04 |
代码实现¶
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout,QFormLayout, QGroupBox, QPushButton,QLineEdit,QCheckBox,QLabel
from PyQt5.QtCore import pyqtSignal
import socket
import sys
import threading
import struct
class MainWindow(QWidget):
def __init__(self):
super(MainWindow, self).__init__()
# -------------- 界面 -------------- #
self.init_ui()
# -------------- socket初始化 -------------- #
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# -------------- 数据结果变量 -------------- #
# 红外传感器数据
self.infrared_data = []
# 超声波传感器数据
self.dis = 0
# -------------- 控制界面关闭停止任务 -------------- #
self.window_has_closed = False
def init_ui(self):
# 整体布局
whole_layout = QVBoxLayout()
self.setLayout(whole_layout)
# 分组布局
connect_gb = QGroupBox('连接小车')
infrared_gb = QGroupBox('红外传感器状态')
ultrasonic_gb = QGroupBox('超声波传感器测距')
# 添加控件
whole_layout.addWidget(connect_gb)
whole_layout.addWidget(infrared_gb)
whole_layout.addWidget(ultrasonic_gb)
# 布局
connect_layout = QHBoxLayout(connect_gb)
infrared_layout = QHBoxLayout(infrared_gb)
ultrasonic_layout = QFormLayout(ultrasonic_gb)
# -------------- 小车连接 -------------- #
self.ip_edit = QLineEdit()
self.port_edit = QLineEdit()
self.ip_edit.setPlaceholderText('请输入小车ip')
self.port_edit.setPlaceholderText('请输入小车端口')
self.ip_edit.setText('192.168.10.1')
self.port_edit.setText('7070')
connect_btn = QPushButton('连接')
connect_layout.addWidget(self.ip_edit)
connect_layout.addWidget(self.port_edit)
connect_layout.addWidget(connect_btn)
# -------------- 红外传感器 -------------- #
self.ck1 = QCheckBox('灯1')
self.ck2 = QCheckBox('灯2')
self.ck3 = QCheckBox('灯3')
self.ck4 = QCheckBox('灯4')
self.ck5 = QCheckBox('灯5')
self.ck6 = QCheckBox('灯6')
self.ck7 = QCheckBox('灯7')
self.ck1.setDisabled(True)
self.ck2.setDisabled(True)
self.ck3.setDisabled(True)
self.ck4.setDisabled(True)
self.ck5.setDisabled(True)
self.ck6.setDisabled(True)
self.ck7.setDisabled(True)
infrared_layout.addWidget(self.ck1)
infrared_layout.addWidget(self.ck2)
infrared_layout.addWidget(self.ck3)
infrared_layout.addWidget(self.ck4)
infrared_layout.addWidget(self.ck5)
infrared_layout.addWidget(self.ck6)
infrared_layout.addWidget(self.ck7)
# -------------- 超声波传感器 -------------- #
self.dis_label = QLabel()
ultrasonic_layout.addRow('距离:',self.dis_label)
# -------------- 信号和槽连接 -------------- #
connect_btn.clicked.connect(self.connect_to_server)
def connect_to_server(self):
ip = self.ip_edit.text()
port = int(self.port_edit.text())
self.soc.connect((ip, port))
# 开启线程接收传感器数据
threading.Thread(target=self.recv_sensor_data).start()
def recv_sensor_data(self):
# 帧头数据个数
count = 0
# 任务类型
type = 0
# 数据长度
length = -1
# 已经读取的数据长度
read_length = 0
# 转换之后的数据
recv_data_trans = 4 * [0]
# 接收的原始数据
recv_data_original = 4 * [b'0']
while not self.window_has_closed:
print('read data')
original_data = self.soc.recv(1)
# 转换成数据
data = struct.unpack('<b', original_data)[0]
if count < 2 and data != -128:
continue
# -------------- 读取帧头数据 -------------- #
if count < 2 and data == -128:
print("recv head\n")
count += 1
continue
# -------------- 读取类型 -------------- #
if type == 0:
# 标识
type = data
count += 1
print(f"recv type type={type}")
continue
# -------------- 读取数据长度 -------------- #
if length == -1:
length = data
print(f"recv length length={length}")
continue
# -------------- 读取数据 -------------- #
# 读取数据
if read_length < length:
print(f"recv mini data read_length={read_length}")
recv_data_original[read_length] = original_data
recv_data_trans[read_length] = data
read_length += 1
continue
# -------------- 验证数据是否正确 -------------- #
total = 0
total += type
total += length
for i in range(length):
total += recv_data_trans[i]
print('type:', type, ' data:', data, ' total:', total, ' total & 0xff', total & 0xff)
# 结果错误
if ((data & 0xff) ^ (total & 0xff)) != 0:
print(
f"check error,check:{data},total:{total},total&0xff:{total & 0xff},{recv_data_trans[0]},{recv_data_trans[1]},{recv_data_trans[2]},{recv_data_trans[3]}")
count = 0
type = 0
length = -1
read_length = 0
continue
# -------------- 接收数据正确 处理数据 -------------- #
# 重置数据
count = 0
# 通过临时变量保存type 并重置type
cur_type = type
type = 0
length = -1
read_length = 0
print('type ', cur_type)
# -------------- 根据类型修改数据 -------------- #
# 红外传感器
if cur_type == 1:
# 清空
self.infrared_data.clear()
sensor_data = recv_data_trans[0]
s1 = sensor_data & 0x01
s2 = sensor_data >> 1 & 0x01
s3 = sensor_data >> 2 & 0x01
s4 = sensor_data >> 3 & 0x01
s5 = sensor_data >> 4 & 0x01
s6 = sensor_data >> 5 & 0x01
s7 = sensor_data >> 6 & 0x01
self.infrared_data.append(sensor_data & 0x01)
self.infrared_data.append(sensor_data >> 1 & 0x01)
self.infrared_data.append(sensor_data >> 2 & 0x01)
self.infrared_data.append(sensor_data >> 3 & 0x01)
self.infrared_data.append(sensor_data >> 4 & 0x01)
self.infrared_data.append(sensor_data >> 5 & 0x01)
self.infrared_data.append(sensor_data >> 6 & 0x01)
print('红外数据:', self.infrared_data)
self.update_ui(1)
continue
# 超声波传感器
if cur_type == 2:
dis_bit = recv_data_original[0] + recv_data_original[1] + recv_data_original[2] + recv_data_original[3]
# self.dis = int.from_bytes(dis_bit, 'little')
self.dis = struct.unpack('<i',dis_bit)
print('距离:', self.dis)
self.update_ui(2)
def update_ui(self,type):
'''
接收到数据 更新界面
:param type: 类型 1:更新红外传感器数据 2:更新超声波传感器数据
:return:
'''
if type==1:
self.ck1.setChecked(self.infrared_data[0])
self.ck2.setChecked(self.infrared_data[1])
self.ck3.setChecked(self.infrared_data[2])
self.ck4.setChecked(self.infrared_data[3])
self.ck5.setChecked(self.infrared_data[4])
self.ck6.setChecked(self.infrared_data[5])
self.ck7.setChecked(self.infrared_data[6])
return
if type==2:
self.dis_label.setText(str(self.dis))
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
window.show()
exec_ = app.exec_()
window.window_has_closed = True
sys.exit(exec_)