【Python】获取 Arduino 上陀螺仪及压力传感器数据

本文内容具有一定的时效性,无法保证其在您阅读时依旧正确。开发板为 Arduino UNO。

Arduino 部分

Arduino 部分的代码,主要实现了:

  • 读取与校验 JY61P 陀螺仪传感器的数据。
  • 读取压力传感器的数据。
  • 通过串口将数据发送至 Python。

JY61P 参数

主要参数参数值
使用芯片ICM-42605
电压3.3-5V
电流<25mA
通信方式串口TTL/IIC通信
输出数据三轴(加速度+角速度+角度)+ 四元数
陀螺仪范围±250/500/1000/2000°/s(可设置)
加速度范围±2/4/8/16g(可设置)
角度范围X/Z轴:±180° Y轴:±90°
角度精度静态:0.05° 动态:0.1°
回传速度0.1-200Hz(可设置)
波特率4800-921600(IIC速率达400K)

读取陀螺仪的数据

数据协议

此部分内容也可以自行阅读《JY61P姿态角度传感器说明书》。

说明

  1. 数据是按照16进制方式发送的,不是 ASCII 码。
  2. 每个数据分低字节和高字节依次传送,二者组合成一个有符号的 short 类型的数据。

加速度

数据编号数据内容含义
00x55包头
10x51加速度标识
2AxLX分量低字节
3AxHX分量高字节
4AyLY分量低字节
5AyHY分量高字节
6AzLZ分量低字节
7AzHZ分量高字节
8TL温度低字节
9TH温度高字节
10SUM校验和
C++
// 加速度计算公式(g 为重力加速度;单位:米每二次方秒) double a_x = (((short)AxH << 8) | AxL) / 32768.0 * 16 * g; double a_y = (((short)AyH << 8) | AyL) / 32768.0 * 16 * g; double a_z = (((short)AzH << 8) | AzL) / 32768.0 * 16 * g; // 温度计算公式(单位:摄氏度) double T = (((short)TH << 8) | TL) / 100.0; // 校验和(如果下面的等式成立,则校验成功) SUM == (0x55 + 0x51 + AxL + AxH + AyL + AyH + AzL + AzH + TL + TH)

角速度

数据编号数据内容含义
00x55包头
10x52角速度标识
2WxLX分量低字节
3WxHX分量高字节
4WyLY分量低字节
5WyHY分量高字节
6WzLZ分量低字节
7WzHZ分量高字节
8TL温度低字节
9TH温度高字节
10SUM校验和
C++
// 角速度计算公式(单位:角度每秒) double w_x = (((short)WxH << 8) | WxL) / 32768.0 * 2000; double w_y = (((short)WyH << 8) | WyL) / 32768.0 * 2000; double w_z = (((short)WzH << 8) | WzL) / 32768.0 * 2000; // 温度计算公式(单位:摄氏度) double T = (((short)TH << 8) | TL) / 100.0; // 校验和(如果下面的等式成立,则校验成功) SUM == (0x55 + 0x52 + WxL + WxH + WyL + WyH + WzL + WzH + TL + TH)

角度

数据编号数据内容含义
00x55包头
10x53角度标识
2RollLX分量(滚转角)低字节
3RollHX分量(滚转角)高字节
4PitchLY分量(俯仰角)低字节
5PitchHY分量(俯仰角)高字节
6YawLZ分量(偏航角)低字节
7YawHZ分量(偏航角)高字节
8TL温度低字节
9TH温度高字节
10SUM校验和
C++
// 加速度计算公式(单位:角度) double Roll = (((short)RollH << 8) | RollL) / 32768.0 * 180; double Pitch = (((short)PitchH << 8) | PitchL) / 32768.0 * 180; double Yaw = (((short)YawH << 8) | YawL) / 32768.0 * 180; // 温度计算公式(单位:摄氏度) double T = (((short)TH << 8) | TL) / 100.0; // 校验和(如果下面的等式成立,则校验成功) SUM == (0x55 + 0x53 + RollL + RollH + PitchL + PitchH + YawL + YawH + TL + TH)

辅助类型

C++
struct Vector3 { short X; /* 向量的X分量 */ short Y; /* 向量的Y分量 */ short Z; /* 向量的Z分量 */ }; struct GyroData { Vector3 Acceleration; /* 加速度 */ Vector3 AngularVelocity; /* 角速度 */ Vector3 Rotation; /* 旋转 */ }; typedef void (*GyroDataHandler)(GyroData data);

陀螺仪类

陀螺仪类实现了对陀螺仪数据的读取与校验。

C++
/** * RX数据缓冲区的长度 */ #define RX_BUFFER_LEN 11 class Gyro { public: /** * 创建一个陀螺仪传感器的实例 */ Gyro(void); /** * 更新陀螺仪数据 * @param[in] ucData 从串口中读取的一个字节 * @note 这个方法需要在loop函数中不断调用 */ void Update(unsigned char ucData); /** * 设置用来处理接收完成的陀螺仪数据的函数 * @param[in] handler 陀螺仪数据的处理函数 */ void OnFinishReceivingData(GyroDataHandler handler); private: GyroData dataCache; /* 陀螺仪数据缓存 */ GyroDataHandler dataHandler; /* 陀螺仪数据处理函数 */ bool isDataValid; /* dataCache的数据是否合法 */ unsigned char ucRxSum; /* 从RX读取的字节的和 */ unsigned char ucRxCount; /* 从RX读取的字节数量 */ unsigned char ucRxBuffer[RX_BUFFER_LEN]; /* RX数据缓冲区 */ };

实现陀螺仪类的构造方法,对字段进行初始化。

C++
Gyro::Gyro(void) { this->dataHandler = NULL; this->isDataValid = true; this->ucRxSum = 0; this->ucRxCount = 0; // memset(&this->dataCache, 0, sizeof(GyroData)); // memset(this->ucRxBuffer, 0, sizeof(unsigned char) * RX_BUFFER_LEN); }

实现核心方法 Gyro::Update,处理传入的一个字节的数据,根据协议,完成数据的读取和校验。

在三组数据读取完成后,如果数据全部校验成功,将会调用设置的回调,即 dataHandler 字段所指向的函数。

C++
void Gyro::Update(unsigned char ucData) { this->ucRxBuffer[this->ucRxCount++] = ucData; if (ucRxBuffer[0] != 0x55) { this->ucRxCount = 0; return; } if (this->ucRxCount < RX_BUFFER_LEN) { this->ucRxSum += ucData; } else { // 进入到该分支时,ucData是SUM的值,被写入了缓冲区的[10] bool isValid = (this->ucRxSum == ucData); switch (this->ucRxBuffer[1]) { case 0x51: this->isDataValid &= isValid; memcpy(&this->dataCache.Acceleration, &ucRxBuffer[2], sizeof(Vector3)); break; case 0x52: this->isDataValid &= isValid; memcpy(&this->dataCache.AngularVelocity, &ucRxBuffer[2], sizeof(Vector3)); break; case 0x53: memcpy(&this->dataCache.Rotation, &ucRxBuffer[2], sizeof(Vector3)); if (this->dataHandler && this->isDataValid && isValid) { // 为 dataCache 创建一个防御性副本 this->dataHandler(this->dataCache); // 触发事件 } this->isDataValid = true; break; } this->ucRxSum = 0; this->ucRxCount = 0; } }

提供一个设置 dataHandler 字段的公共接口。

C++
void Gyro::OnFinishReceivingData(GyroDataHandler handler) { this->dataHandler = handler; }

读取压力传感器数据

定义压力传感器类,根据我的需要,设置了6个压力传感器。

C++
/** * 压力传感器的数量 */ #define PRESSURE_SENSOR_COUNT 6 class Pressures { public: /** * 创建一组压力传感器的实例 * @param[in] sensorPins 一个数组,保存所有连接了压力传感器的引脚的编号 */ Pressures(uint8_t *sensorPins); /** * 从压力传感器中读取数据 * @param[in] buffer 保存数据的缓冲区,其长度应该与压力传感器的数量一致 */ void ReadData(unsigned short *buffer) const; private: uint8_t sensorPins[PRESSURE_SENSOR_COUNT]; /* 所有连接了压力传感器的引脚的编号 */ };

实现构造方法,按顺序保存所有压力传感器使用的引脚编号。

C++
Pressures::Pressures(uint8_t *sensorPins) { memcpy(this->sensorPins, sensorPins, PRESSURE_SENSOR_COUNT * sizeof(uint8_t)); }

提供读取数据的接口。遍历所有的引脚编号,读取模拟量并保存到缓冲区中。
将每一个压力传感器的值转换为16位无符号整数,可以在不丢失数据的前提下,方便后续在 Python 中读取。

C++
void Pressures::ReadData(unsigned short *buffer) const { for (int i = 0; i < PRESSURE_SENSOR_COUNT; i++) { *buffer++ = (unsigned short)analogRead(this->sensorPins[i]); } }

定义 Python 数据包

发送给 Python 的数据包格式由标识数据、陀螺仪数据、压力传感器数据组成,其定义如下:

C++
struct PyPackage { unsigned char Flag0; /* 第一个标识 */ unsigned char Flag1; /* 第二个标识 */ GyroData GyroData; /* 陀螺仪数据 */ unsigned short PressureData[PRESSURE_SENSOR_COUNT]; /* 压力传感器数据 */ };

我使用如下两个值来作为标识数据:

C++
/** * 数据包的第一个标识 */ #define PACKAGE_FLAG_0 0x55 /** * 数据包的第二个标识 */ #define PACKAGE_FLAG_1 0x59

整合

C++
#define BAUDRATE 9600 #define DELAY 0 Gyro *gyro; Pressures *pressures; void SendPackage(GyroData data);

setup 函数中初始化,主要工作如下:

  1. 初始化陀螺仪对象。
  2. SendPackage 设置为陀螺仪数据接收成功的回调函数。
  3. 初始化压力传感器对象。
  4. 以特定的波特率初始化串口。
C++
void setup() { gyro = new Gyro(); gyro->OnFinishReceivingData(SendPackage); uint8_t sensors[PRESSURE_SENSOR_COUNT] = {A0, A1, A2, A3, A4, A5}; pressures = new Pressures(sensors); Serial.begin(BAUDRATE); }

然后,在 loop 中更新陀螺仪的数据,再做适当的延迟。

C++
void loop() { while (Serial.available()) { gyro->Update(Serial.read()); } delay(DELAY); }

最后,实现 Python 数据包的发送。

C++
void SendPackage(GyroData data) { PyPackage package; package.Flag0 = PACKAGE_FLAG_0; package.Flag1 = PACKAGE_FLAG_1; package.GyroData = data; pressures->ReadData(package.PressureData); Serial.write((char *)&package, sizeof(PyPackage)); }

到这里,Arduino 部分已经完成!接下来就是在 Python 中接收数据。

Python 部分

Python 部分的代码,主要实现了 Arduino 数据包的接收和解析。

二进制流

定义 CStream 类,从字节数组中解析数据。

声明基本方法

声明 __init____len__ 方法。
__init__ 方法接受一个字节序列 cbytes 作为参数,后续的解析将基于这个序列中的数据来进行。

class CStream(object):
  def __init__(self, cbytes: bytes):
    self.__cbytes = cbytes
    self.__index = 0

  def __len__(self):
    return len(self.__cbytes)

根据之前在 Arduino 中编写的代码,我们知道:数据包包体(不包括前两个标识字节)中的每一个数(包括向量的每一个分量)都由两个字节组成,而且使用小字节序(Little-Endian)

因此,我们声明一个方法,从字节序列中向后读取两个字节,并按低位字节和高位字节的顺序返回。

def __readTwoBytes(self):
  low = self.__cbytes[self.__index]
  high = self.__cbytes[self.__index + 1]
  self.__index += 2
  return low, high

读取整数

首先,读取整数的低位字节和高位字节,分别记为 lowhigh

因为从 Arduino 发送的整数都由两个字节组成,而 Python 的 int 类型占用的字节数大于2,所以当整数有符号时, (high << 8) | low 运算会将它的符号位当做数值位处理,导致结果不一定是正确的。

读取有符号整数时,先忽略高位字节中的符号位,进行移位运算:

v = ((high & 0x7F) << 8) | low

对原始数据的符号做判断:

sign = ((high >> 7) & 0x1)
# sign == 0 表示正数
# sign == 1 表示负数

如果是负数的话,还需要额外的运算。负数的储存形式是:相应正数按位取反再加1。使用它的逆运算就可以得到原始数值:

-((~(v - 1)) & 0x7FFF)

读取无符号整数时,直接进行移位运算即可。

最后,整理成一个方法:

def readInt(self, signed=True):
  low, high = self.__readTwoBytes()

  if signed:
    v = ((high & 0x7F) << 8) | low
    return v if (high & 0x80) == 0 else -((~(v - 1)) & 0x7FFF)
  else:
    return (high << 8) | low

注:有符号整数的读取也可以使用 ctypes 模块中的类来完成。

读取浮点数

从数据协议中可以发现:绝大部分的整数都会除以32768(2的15次方,即16位有符号整数的最大值)然后转换为浮点数,再进行其他运算。故可以封装一个方法来完成这步操作。

def readFloat(self):
  return self.readInt(signed=True) / 32768.0

读取向量

def readIntVec(self, elementDecorator, dimension=3, signed=True):
  return [
    elementDecorator(i, self.readInt(signed=signed))
    for i in range(dimension)
  ]

def readFloatVec(self, elementDecorator, dimension=3):
  return [
    elementDecorator(i, self.readFloat())
    for i in range(dimension)
  ]

定义数据包

根据 Arduino 端的代码来进行定义。因为标识数据只是在接收数据包时使用的,所以不需要增加这个字段。

class ArduinoPackage(object):
  def __init__(self, acceleration, angularVelocity, rotation, leftPressures, rightPressures):
    self.acceleration = acceleration
    self.angularVelocity = angularVelocity
    self.rotation = rotation
    self.leftPressures = leftPressures
    self.rightPressures = rightPressures

接收数据包

定义 ArduinoReader 类,从串口中接收数据包。

声明基本方法

对字段做初始化,并提供上下文管理器和一些公共方法。

from serial import Serial # 需要安装 pyserial 模块

class ArduinoReader(object):
  def __init__(self, **kwargs):
    port = kwargs.get('port', None)
    baudrate = kwargs.get('baudrate', 9600)
    timeout = kwargs.get('timeout', 1)
    
    self.__serial = Serial(port=port, baudrate=baudrate, timeout=timeout)
    self.gravity = kwargs.get('gravity', 9.8)
    self.packageFlag0 = kwargs.get('packageFlag0', 0x55)
    self.packageFlag1 = kwargs.get('packageFlag1', 0x59)
    self.packageBodySize = kwargs.get('packageBodySize', 30)

  def open(self):
    self.__serial.open()

  def close(self):
    self.__serial.close()

  def __enter__(self):
    self.__serial.__enter__()
    return self

  def __exit__(self, extype, value, trace):
    self.__serial.__exit__(extype, value, trace)

  @property
  def canRead(self):
    return self.__serial.is_open

检验标识数据

阻塞当前线程,不断从串口中读取两个字节,与预设的标识字节做比较,相等时从方法中返回,准备读取包体。

def __readSingleByte(self):
  data = self.__serial.read(1)
  return data[0] if len(data) == 1 else None

def __checkPackageFlags(self):
  previous = self.__readSingleByte()

  while True:
    current = self.__readSingleByte()
    # 前面两个字节是标识位
    if previous == self.packageFlag0 and current == self.packageFlag1:
      return
    else:
      previous = current

解析为 Python 对象

从串口中读取一定数量的字节,利用 CStream 解析字节序列中的内容,最后创建一个 ArduinoPackage 对象并返回。

def __makePackage(self, cbytes):
  if len(cbytes) != self.packageBodySize:
    return None

  stream = CStream(cbytes)

  accel = stream.readFloatVec(lambda i, v: round(v * 16 * self.gravity, 4))
  angularV = stream.readFloatVec(lambda i, v: round(v * 2000, 4))
  rotation = stream.readFloatVec(lambda i, v: round(v * 180, 4))
  
  left = stream.readIntVec(lambda i, v: (v), signed=False)
  right = stream.readIntVec(lambda i, v: (v), signed=False)

  return ArduinoPackage(accel, angularV, rotation, left, right)

def read(self):
  if not self.canRead:
    return None

  self.__checkPackageFlags()
  cbytes = self.__serial.read(self.packageBodySize)
  return self.__makePackage(cbytes)

使用

from arduino_reader import ArduinoReader

arduinoConfig = {
  "port": "COM6",
  "baudrate": 9600,
  "timeout": 1,
  "gravity": 9.8,
  "packageFlag0": 0x55,
  "packageFlag1": 0x59,
  "packageBodySize": 30
}

with ArduinoReader(**arduinoConfig) as reader:
  while True:
    package = reader.read()
    # ...


点击此处可投喂作者 (っ´▽`)っ

感谢!您的名字将被记录于「」~

微信赞赏码

微信