消息总线(Message Bus)是一种基于发布-订阅(Publish-Subscribe)模式的通信机制,用于实现系统中各模块之间的松耦合通信。
核心概念:
主要优势:
uORB(Micro Object Request Broker)是 PX4 飞控系统中的轻量级消息总线实现,后被 RT-Thread 等系统借鉴并移植。
uORB 的特性包括:
典型使用示例(RT-Thread 下):
// 发布者代码 orb_advert_t att_pub = orb_advertise(ORB_ID(attitude), &att); orb_publish(ORB_ID(attitude), att_pub, &att); // 订阅者代码 int att_sub = orb_subscribe(ORB_ID(attitude)); orb_copy(ORB_ID(attitude), att_sub, &att);
FreeRTOS 本身并未内置消息总线机制,仅提供基础的进程间通信(IPC)组件:
因此,若需实现发布-订阅模型,必须自行构建上层逻辑。
| 方案 | 复杂度 | 性能 | 解耦程度 | 内存占用 | 适用场景 | 推荐度 |
|---|---|---|---|---|---|---|
| 方案一:消息队列 | ? | ????? | ?? | 低 | 简单点对点通信 | ??? |
| 方案二:事件组 | ? | ???? | ? | 极低 | 状态同步 | ?? |
| 方案三:流缓冲区 | ?? | ????? | ?? | 中 | 数据流传输 | ?? |
| 方案四:自实现发布-订阅 | ??? | ???? | ????? | 中 | 复杂系统 | ????? |
| 方案五:第三方库 | ?? | ??? | ????? | 高 | 快速开发 | ??? |
| 方案 | RAM | Flash | 代码量 |
|---|---|---|---|
| 消息队列 | ~1KB | ~0.5KB | 10 行 |
| 事件组 | ~0.5KB | ~0.3KB | 10 行 |
| 流缓冲区 | ~2KB | ~0.5KB | 15 行 |
| 自实现 PubSub | ~5KB | ~3KB | 300 行 |
| MicroROS | ~64KB | ~128KB | 0 行(库) |
利用 FreeRTOS 提供的原生消息队列功能进行点对点通信。每个消息主题对应一个独立的消息队列,发布者向队列写入数据,订阅者从中读取。
xQueueCreate
架构图:
发布者任务 订阅者任务1 订阅者任务2
| | |
|---> Queue1 ----------->| |
| |
|---> Queue2 ----------------------------------->|
特点:
// ============================================
// 定义消息类型
// ============================================
typedef struct {
uint32_t timestamp;
float roll;
float pitch;
float yaw;
} attitude_msg_t;
typedef struct {
uint32_t timestamp;
float lat;
float lon;
float alt;
} gps_msg_t;
// ============================================
// 创建消息队列(每个主题一个队列)
// ============================================
QueueHandle_t attitude_queue;
QueueHandle_t gps_queue;
QueueHandle_t sensor_queue;
void init_message_queues(void) {
// 创建姿态消息队列(深度10,元素大小为 attitude_msg_t)
attitude_queue = xQueueCreate(10, sizeof(attitude_msg_t));
通过事件标志位通知机制实现轻量级状态广播。适用于简单的状态同步场景,如任务就绪、中断触发等。
优点:内存占用极低,响应迅速;缺点:无法传递结构化数据,仅适合布尔型状态通知。
基于 FreeRTOS 的流缓冲区(Stream Buffer)实现连续数据流的高效传输,尤其适合传感器数据、日志流等不定长数据。
优势:支持变长消息,高吞吐量,底层优化良好;限制:需手动管理解析逻辑,不适合结构复杂的主题路由。
在 FreeRTOS 基础上封装一套完整的发布-订阅中间件,包含主题注册、订阅管理、回调分发等功能。
关键设计要素:
虽然开发成本较高,但能显著提升系统的模块化和可扩展性。
引入成熟的消息总线库,如 MicroROS、Eclipse Paho、MQTT-SN 或其他嵌入式 Pub/Sub 框架。
优势:功能完整,跨平台兼容,节省开发时间;代价:资源消耗大,可能超出小型 MCU 承载能力。
综合评估各项指标:
以下为基于消息队列的主题通信简化实现:
// 初始化所有队列
void message_bus_init(void) {
attitude_queue = xQueueCreate(10, sizeof(attitude_msg_t));
gps_queue = xQueueCreate(5, sizeof(gps_msg_t));
if (!attitude_queue || !gps_queue) {
// 错误处理
}
}
// 发布姿态消息
bool publish_attitude(const attitude_msg_t *msg) {
return xQueueSend(attitude_queue, msg, portMAX_DELAY) == pdTRUE;
}
// 订阅并读取最新姿态
bool subscribe_attitude(attitude_msg_t *out) {
return xQueueReceive(attitude_queue, out, portMAX_DELAY) == pdTRUE;
}
// 初始化消息队列
void init_message_queues(void) {
// 创建用于GPS数据传输的队列,容量为5个GPS消息
gps_queue = xQueueCreate(5, sizeof(gps_msg_t));
// 创建传感器数据队列,支持20个传感器消息存储
sensor_queue = xQueueCreate(20, sizeof(sensor_msg_t));
}
xQueueCreate
// ============================================
// 姿态估计任务(发布者)
// ============================================
void attitude_estimator_task(void *pvParameters) {
attitude_msg_t msg;
for(;;) {
// 更新时间戳并计算当前姿态角
msg.timestamp = xTaskGetTickCount();
msg.roll = calculate_roll();
msg.pitch = calculate_pitch();
msg.yaw = calculate_yaw();
// 向所有订阅该消息的队列发送副本(非阻塞方式)
xQueueSend(attitude_queue_ctrl, &msg, 0);
xQueueSend(attitude_queue_oled, &msg, 0);
xQueueSend(attitude_queue_log, &msg, 0);
// 控制发送频率:每10ms执行一次,即100Hz
vTaskDelay(pdMS_TO_TICKS(10));
}
}
// ============================================
// 姿态控制任务(订阅者1)
// ============================================
void attitude_controller_task(void *pvParameters) {
attitude_msg_t msg;
for(;;) {
// 阻塞式接收姿态消息,确保获取最新数据
if (xQueueReceive(attitude_queue_ctrl, &msg, portMAX_DELAY) == pdTRUE) {
// 利用接收到的姿态信息进行飞行控制运算
control_attitude(msg.roll, msg.pitch, msg.yaw);
}
}
}
// ============================================
// OLED显示任务(订阅者2)
// ============================================
void oled_display_task(void *pvParameters) {
attitude_msg_t msg;
for(;;) {
// 非阻塞读取姿态队列,若无数据则立即返回
if (xQueueReceive(attitude_queue_oled, &msg, 0) == pdTRUE) {
// 将姿态数据显示在OLED屏幕上
display_attitude(msg.roll, msg.pitch, msg.yaw);
}
// 显示刷新周期设为100ms,即10Hz
vTaskDelay(pdMS_TO_TICKS(100));
}
}
// ============================================
// 多订阅者机制说明
// ============================================
问题描述:
上述初始设计中仅使用单一消息队列,导致一旦某个任务从队列中取走消息后,其他任务将无法再接收到该数据。这使得多个订阅者无法同时获得同一份发布信息,限制了系统的并发处理能力。
解决方案:
为解决此问题,采用“一对多”消息分发策略——为每个需要接收消息的任务分别创建独立的消息队列。发布者在生成数据后,向每一个订阅者的专属队列发送一份数据拷贝,从而实现多任务同时订阅同一主题的效果。
// 修改后的多订阅者队列定义
QueueHandle_t attitude_queue_ctrl; // 专供姿态控制器使用的队列
QueueHandle_t attitude_queue_oled; // OLED显示任务专用队列
QueueHandle_t attitude_queue_log; // 日志记录任务使用的队列
// 队列初始化函数
void init_message_queues(void) {
attitude_queue_ctrl = xQueueCreate(10, sizeof(attitude_msg_t)); // 控制器:较高实时性要求
attitude_queue_oled = xQueueCreate(5, sizeof(attitude_msg_t)); // 显示任务:更新较慢
attitude_queue_log = xQueueCreate(20, sizeof(attitude_msg_t)); // 日志任务:大缓存应对突发写入
}
// 发布者任务 - IMU
void imu_task(void *pvParameters) {
attitude_msg_t msg;
for(;;) {
// 获取姿态数据
get_attitude(&msg.roll, &msg.pitch, &msg.yaw);
// 更新事件标志:姿态已更新
xEventGroupSetBits(sensor_events, EVENT_ATTITUDE_UPDATED);
vTaskDelay(pdMS_TO_TICKS(10));
}
}
// ============================================
// 订阅者 1 - 姿态控制器
// ============================================
void attitude_controller_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 等待姿态更新事件(阻塞方式)
bits = xEventGroupWaitBits(
sensor_events,
EVENT_ATTITUDE_UPDATED,
pdTRUE, // 清除事件位
pdFALSE, // 不需要所有位都置位
portMAX_DELAY
);
if (bits & EVENT_ATTITUDE_UPDATED) {
// 执行姿态控制逻辑
control_attitude_from_sensor();
}
}
}
// ============================================
// 订阅者 2 - OLED 显示
// ============================================
void oled_display_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 超时等待姿态更新事件
bits = xEventGroupWaitBits(
sensor_events,
EVENT_ATTITUDE_UPDATED,
pdTRUE, // 清除事件位
pdFALSE, // OR 条件
pdMS_TO_TICKS(100)
);
if (bits & EVENT_ATTITUDE_UPDATED) {
update_oled_with_new_attitude();
}
// 周期性刷新显示
refresh_display();
}
}
// ============================================
// 订阅者 3 - 日志记录
// ============================================
void logger_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
bits = xEventGroupWaitBits(
sensor_events,
EVENT_ATTITUDE_UPDATED,
pdTRUE,
pdFALSE,
portMAX_DELAY
);
if (bits & EVENT_ATTITUDE_UPDATED) {
log_current_attitude();
}
}
}
优点:
缺点:
适用场景:
事件组本身内存消耗:
xEventGroupCreate
整体架构内存分布:
发布者任务 事件组 订阅者任务
| [24 bits] |
|--- Set Bit 0 ---> [●○○○...] |
| | |
| |--- Wait Bits -------->|
结论:事件组方案在内存使用上具有显著优势,无论订阅者数量如何增长,核心事件存储空间保持恒定,无额外队列复制开销。
优点:
缺点:
适用场景:
单个队列所占内存:
队列控制块:~80 字节
队列存储空间:队列深度 × 消息大小
示例:
attitude_queue = xQueueCreate(10, sizeof(attitude_msg_t));
// attitude_msg_t = 16 字节(4 + 4 + 4 + 4)
// 总内存 = 80 + 10 × 16 = 240 字节
多订阅者情况下的总内存消耗:
3 个订阅者,每个队列深度 10:
总内存 = 3 × 240 = 720 字节
如果有 10 个订阅者:
总内存 = 10 × 240 = 2400 字节
结论:随着订阅者数量上升,内存浪费呈线性增长。每个队列均独立缓存完整消息副本,导致资源利用率急剧下降。
// ============================================
// IMU 数据发布任务
// ============================================
void imu_task(void *pvParameters) {
for(;;) {
// 获取 IMU 传感器数据
read_imu_data();
// 触发对应事件标志位
xEventGroupSetBits(sensor_events, EVENT_IMU_UPDATED);
// 延迟10毫秒,实现100Hz运行频率
vTaskDelay(pdMS_TO_TICKS(10));
}
}
xQueueCreate
// ============================================
// GPS 数据发布任务
// ============================================
void gps_task(void *pvParameters) {
for(;;) {
// 读取当前 GPS 信息
read_gps_data();
// 设置事件组中的 GPS 更新标志
xEventGroupSetBits(sensor_events, EVENT_GPS_UPDATED);
// 每100毫秒执行一次(10Hz)
vTaskDelay(pdMS_TO_TICKS(100));
}
}
// ============================================
// 姿态估计算法任务(等待单一事件)
// ============================================
void attitude_estimator_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 阻塞等待 IMU 数据更新事件
bits = xEventGroupWaitBits(
sensor_events, // 使用的事件组
EVENT_IMU_UPDATED, // 目标事件位
pdTRUE, // 等待后自动清除标志位
pdFALSE, // 不要求所有位都置位(OR模式)
portMAX_DELAY // 无限期等待
);
if (bits & EVENT_IMU_UPDATED) {
// 检测到 IMU 更新,开始姿态解算
estimate_attitude();
}
}
}
// ============================================
// 多传感器融合任务(等待多个事件同时发生,AND模式)
// ============================================
void sensor_fusion_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 等待 IMU 和磁力计均完成更新
bits = xEventGroupWaitBits(
sensor_events,
EVENT_IMU_UPDATED | EVENT_MAG_UPDATED, // 同时等待两个事件
pdTRUE, // 执行后清除事件位
pdTRUE, // 必须全部满足(AND条件)
portMAX_DELAY
);
if (bits & (EVENT_IMU_UPDATED | EVENT_MAG_UPDATED)) {
// 当两个传感器数据都已就绪,启动融合算法
sensor_fusion();
}
}
}
// ============================================
// 数据记录任务(监听任一传感器更新,OR模式)
// ============================================
void data_logger_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 只要 GPS、IMU 或气压计任意一个更新即触发
bits = xEventGroupWaitBits(
sensor_events,
EVENT_GPS_UPDATED | EVENT_IMU_UPDATED | EVENT_BARO_UPDATED,
pdTRUE, // 处理后清除事件标志
pdFALSE, // 任意一个事件满足即可(OR逻辑)
portMAX_DELAY
);
// 分别判断哪个传感器发生了更新并记录
if (bits & EVENT_GPS_UPDATED) {
log_gps_data();
}
if (bits & EVENT_IMU_UPDATED) {
log_imu_data();
}
if (bits & EVENT_BARO_UPDATED) {
log_baro_data();
}
}
}
// ============================================
// 共享数据结构定义
// ============================================
typedef struct {
float roll;
float pitch;
float yaw;
} attitude_data_t;
// 全局变量用于共享姿态数据
attitude_data_t g_attitude_data;
// 互斥锁,确保对共享数据的安全访问
SemaphoreHandle_t g_attitude_mutex;
// 初始化函数:创建互斥量
void init_shared_data(void) {
g_attitude_mutex = xSemaphoreCreateMutex();
}
// ============================================
// 发布者任务:姿态估计与事件触发
// ============================================
void attitude_estimator_task(void *pvParameters) {
attitude_data_t local_data;
for(;;) {
// 执行姿态计算
local_data.roll = calculate_roll();
local_data.pitch = calculate_pitch();
local_data.yaw = calculate_yaw();
// 获取互斥锁,安全写入共享数据
xSemaphoreTake(g_attitude_mutex, portMAX_DELAY);
g_attitude_data = local_data;
xSemaphoreGive(g_attitude_mutex);
// 设置事件标志,通知其他任务数据已更新
xEventGroupSetBits(sensor_events, EVENT_ATTITUDE_UPDATED);
// 每10ms执行一次
vTaskDelay(pdMS_TO_TICKS(10));
}
}
// ============================================
// 订阅者任务:监听事件并使用最新姿态数据
// ============================================
void attitude_controller_task(void *pvParameters) {
EventBits_t bits;
attitude_data_t local_data;
for(;;) {
// 阻塞等待姿态更新事件(无限等待)
bits = xEventGroupWaitBits(
sensor_events,
EVENT_ATTITUDE_UPDATED,
pdTRUE, // 触发后清除标志位
pdFALSE, // 只需任一事件发生(OR逻辑)
portMAX_DELAY
);
if (bits & EVENT_ATTITUDE_UPDATED) {
// 加锁读取共享数据,防止竞争
xSemaphoreTake(g_attitude_mutex, portMAX_DELAY);
local_data = g_attitude_data;
xSemaphoreGive(g_attitude_mutex);
// 使用获取的姿态信息进行控制运算
control_attitude(local_data.roll, local_data.pitch, local_data.yaw);
}
}
}
xQueueCreate
// ============================================
// 显示任务:非阻塞轮询事件并刷新OLED
// ============================================
void oled_display_task(void *pvParameters) {
EventBits_t bits;
for(;;) {
// 非阻塞方式查询事件状态(超时时间为0)
bits = xEventGroupWaitBits(
sensor_events,
EVENT_ATTITUDE_UPDATED,
pdTRUE, // 等待成功后自动清除该位
pdFALSE, // OR模式,任意一位匹配即可
0 // 不等待,立即返回结果
);
if (bits & EVENT_ATTITUDE_UPDATED) {
// 收到更新通知,刷新显示屏内容
update_oled_display();
}
// 固定周期延时,实现约10Hz的刷新频率
vTaskDelay(pdMS_TO_TICKS(100));
}
}
// ============================================
// 传感器事件处理:检测气压计更新
// ============================================
if (bits & EVENT_BARO_UPDATED) {
log_baro_data();
}
// ============================================
// 方案说明:事件组结合共享数据机制
// ============================================
核心问题:
事件组本身仅能传递状态标志,无法携带实际数据。
解决方案:
采用“事件组 + 全局共享数据 + 互斥锁”的组合方式。
通过事件组实现高效的异步通知,利用全局变量传递具体数据,并借助互斥量保证多任务环境下的数据一致性与安全性。
发布者任务 订阅者任务1 订阅者任务2
| | |
|---> Queue1 ----------->| |
| |
|---> Queue2 ----------------------------------->|
// ============================================
// 优缺点分析
// ============================================
优势特点:
适合状态同步:适用于传感器数据就绪、系统运行状态变更等场景。
性能优异:事件操作的开销非常小,响应迅速。
事件组控制块:~8 字节
事件位:24 位(3 字节)
总内存:~8 字节(极小!)
消息队列(深度 10):~240 字节
事件组:~8 字节
节省:240 - 8 = 232 字节(96% 节省!)
结论:事件组在低功耗、小内存的嵌入式系统中表现突出,是一种高效的同步机制。
利用 FreeRTOS 提供的流缓冲区功能(Stream Buffer),实现连续数据流的可靠传输。
xStreamBufferCreate
架构示意图:
发布者任务 流缓冲区(FIFO) 订阅者任务
| [============] |
|--- Write --> [data data...] --- Read --->|
核心特性:
// ============================================
// 创建流缓冲区
// ============================================
StreamBufferHandle_t sensor_stream;
void init_stream(void) {
// 创建流缓冲区(大小 1024 字节,触发阈值 1 字节)
sensor_stream = xStreamBufferCreate(1024, 1);
}
// ============================================
// 发布者任务 - 传感器数据流
// ============================================
typedef struct {
uint32_t timestamp;
float accel_x;
float accel_y;
float accel_z;
float gyro_x;
float gyro_y;
float gyro_z;
} sensor_data_t;
void sensor_task(void *pvParameters) {
sensor_data_t data;
for(;;) {
// 读取传感器数据
data.timestamp = xTaskGetTickCount();
read_sensor(&data);
// 发布数据流(非阻塞)
size_t sent = xStreamBufferSend(
sensor_stream,
&data,
sizeof(data),
0 // 不等待
);
if(sent != sizeof(data)) {
// 缓冲区满,数据丢失
error_handler("Stream buffer full");
}
vTaskDelay(pdMS_TO_TICKS(10)); // 100Hz
}
}
// ============================================
// 订阅者任务 - 数据处理
// ============================================
void processing_task(void *pvParameters) {
sensor_data_t data;
for(;;) {
// 接收数据流(阻塞等待)
size_t received = xStreamBufferReceive(
sensor_stream,
&data,
sizeof(data),
portMAX_DELAY // 永久等待
);
if(received == sizeof(data)) {
// 处理传感器数据
process_sensor_data(&data);
}
}
}
// ============================================
// 从中断上下文发送数据(如 UART RX)
// ============================================
void UART_RX_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
uint8_t byte = UART_ReadByte();
// 从中断发送数据
xStreamBufferSendFromISR(
sensor_stream,
&byte,
1,
&xHigherPriorityTaskWoken
);
}
优点:
缺点:
适用场景:
通过自行设计一个完整的发布-订阅式消息总线系统,实现类似RT-Thread中uORB的功能,提供高内聚、低耦合的任务间通信方案。
系统架构如下所示:
消息总线(PubSub)
|
+------------------+------------------+
| | |
主题表 订阅者表 消息缓存
[attitude] [task1, task2] [最新消息]
[gps] [task3] [最新消息]
[sensor] [task1, task4] [最新消息]
| | |
+------------------+------------------+
|
+------------------+------------------+
| | |
发布者任务 订阅者任务1 订阅者任务2
核心数据结构说明:
// 主题信息结构体
typedef struct {
const char *name; // 主题名称
uint16_t msg_size; // 消息大小(字节)
void *last_msg; // 存储最新消息的缓存区
msg_metadata_t metadata; // 消息元数据(如时间戳、序列号)
subscriber_t subscribers[]; // 动态数组:存储所有订阅者
SemaphoreHandle_t mutex; // 互斥信号量,保障线程安全
} topic_t;
// 订阅者信息结构体
typedef struct {
TaskHandle_t task; // 关联的任务句柄
QueueHandle_t queue; // 可选的消息队列,用于异步接收
callback_t callback; // 可选的回调函数,事件触发时调用
} subscriber_t;
主要特性:
由于整体实现代码较长(约500行),以下分为头文件与源文件两部分展示。
头文件:pubsub.h
// ============================================
// pubsub.h - 发布订阅消息总线头文件
// ============================================
#ifndef PUBSUB_H
#define PUBSUB_H
#include "FreeRTOS.h"
#include "semphr.h"
#include "task.h"
#include "queue.h"
// ============================================
// 配置参数
// ============================================
#define MAX_SUBSCRIBERS 10 // 每个主题最多支持的订阅者数量
#define MAX_TOPICS 20 // 系统最大主题数量
// ============================================
// 消息元数据定义
// ============================================
typedef struct {
uint32_t timestamp; // 时间戳(基于系统tick)
uint32_t sequence; // 消息递增序列号
uint16_t size; // 实际数据大小
} msg_metadata_t;
// ============================================
// 订阅者回调函数类型定义
// ============================================
typedef void (*subscriber_callback_t)(const void *data, msg_metadata_t *meta);
// ============================================
// 订阅者信息结构体
// ============================================
typedef struct {
TaskHandle_t task; // 所属任务句柄
subscriber_callback_t callback; // 回调函数指针(可为空)
QueueHandle_t queue; // 消息队列句柄(可选)
} subscriber_t;
// ============================================
// 主题信息结构定义
// ============================================
typedef struct {
const char *name; // 主题的名称标识
uint16_t msg_size; // 单条消息所占内存大小
void *last_msg; // 缓存最近一次发布的消息内容
msg_metadata_t metadata; // 消息相关的元数据信息
subscriber_t subscribers[MAX_SUBSCRIBERS]; // 当前主题的所有订阅者数组
uint8_t subscriber_count; // 当前已注册的订阅者数量
SemaphoreHandle_t mutex; // 用于线程安全访问的互斥锁
uint8_t advertised; // 标记该主题是否已被发布声明
} topic_t;
// ============================================
// 订阅者结构体定义
// ============================================
typedef struct {
union {
QueueHandle_t queue; // 若使用队列方式接收,存储队列句柄
subscriber_callback_t callback; // 若使用回调方式,则保存回调函数指针
} endpoint;
uint8_t type; // 订阅类型:队列或回调
uint8_t active; // 指示该订阅者是否处于激活状态
} subscriber_t;
// ============================================
// 主题句柄类型别名
// ============================================
typedef topic_t* topic_handle_t;
// ============================================
// 核心API函数声明
// ============================================
/**
* @brief 初始化整个发布/订阅系统
* 需要在系统启动时调用,完成内部资源的初始化
*/
void pubsub_init(void);
/**
* @brief 声明一个可发布的主题(由发布者调用)
* @param name 主题唯一名称
* @param msg_size 消息数据结构的字节大小
* @return 成功返回主题句柄,失败则返回 NULL
*/
topic_handle_t pubsub_advertise(const char *name, uint16_t msg_size);
/**
* @brief 向指定主题发布一条消息
* @param topic 当前要发布的主题句柄
* @param data 指向待发送消息数据的常量指针
* @return 成功返回0,失败返回-1
*/
int pubsub_publish(topic_handle_t topic, const void *data);
/**
* @brief 以队列方式订阅某个主题
* @param name 主题名称
* @param queue_length 消息队列的最大长度
* @return 返回创建的消息队列句柄;若失败则返回 NULL
*/
QueueHandle_t pubsub_subscribe_queue(const char *name, uint8_t queue_length);
/**
* @brief 使用回调机制订阅某一主题
* @param name 要订阅的主题名称
* @param callback 用户定义的消息处理回调函数
* @return 成功返回0,失败返回-1
*/
int pubsub_subscribe_callback(const char *name, subscriber_callback_t callback);
/**
* @brief 非阻塞地获取某主题的最新消息(适用于轮询场景)
* @param name 主题名称
* @param data 输出参数:用于存放拷贝消息的缓冲区
* @param meta 可选参数:接收元数据信息,允许传入 NULL
* @return 成功返回0,失败返回-1
*/
int pubsub_copy(const char *name, void *data, msg_metadata_t *meta);
/**
* @brief 取消对某一主题的订阅
* @param name 要取消订阅的主题名称
* @return 成功返回0,失败返回-1
*/
int pubsub_unsubscribe(const char *name);
/**
* @brief 打印指定主题的运行统计信息(主要用于调试和监控)
* @param name 主题名称
*/
void pubsub_print_stats(const char *name);
#endif // PUBSUB_H
// ============================================
// pubsub.c - 发布订阅消息总线核心实现
// ============================================
#include "pubsub.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
// ============================================
// 全局变量定义
// ============================================
static topic_t topics[MAX_TOPICS]; // 存储所有主题的数组
static uint8_t topic_count = 0; // 当前已注册的主题总数
static SemaphoreHandle_t global_mutex; // 用于线程安全操作的全局互斥锁
// ============================================
// 内部辅助函数
// ============================================
/**
* @brief 根据名称查找指定主题
*
* 遍历当前主题表,匹配给定的主题名称。
* 若找到则返回对应主题的指针,否则返回 NULL。
*/
static topic_t* find_topic(const char *name) {
for (uint8_t i = 0; i < topic_count; i++) {
if (strcmp(topics[i].name, name) == 0) {
return &topics[i];
}
}
return NULL;
}
// ============================================
// 核心 API 实现
// ============================================
/**
* @brief 初始化发布/订阅系统
*
* 创建全局互斥锁,并重置主题表及相关状态。
*/
void pubsub_init(void) {
global_mutex = xSemaphoreCreateMutex(); // 初始化保护共享资源的锁
memset(topics, 0, sizeof(topics)); // 清空主题存储区
topic_count = 0; // 重置主题计数器
}
/**
* @brief 注册一个新主题(发布者端调用)
*
* 若主题已存在,则返回现有句柄;否则创建新主题并初始化其各项参数。
*/
topic_handle_t pubsub_advertise(const char *name, uint16_t msg_size) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 检查主题是否已经注册
topic_t *topic = find_topic(name);
if (topic != NULL) {
xSemaphoreGive(global_mutex);
return topic; // 已存在,直接返回
}
// 判断主题数量是否已达上限
if (topic_count >= MAX_TOPICS) {
xSemaphoreGive(global_mutex);
return NULL;
}
// 分配新的主题槽位
topic = &topics[topic_count++];
topic->name = name;
topic->msg_size = msg_size;
topic->last_msg = pvPortMalloc(msg_size); // 动态分配消息缓存空间
topic->mutex = xSemaphoreCreateMutex(); // 为该主题创建独立锁
topic->subscriber_count = 0;
topic->advertised = 1;
topic->metadata.sequence = 0;
topic->metadata.timestamp = 0;
topic->metadata.size = msg_size;
// 初始化订阅者列表和消息缓存
memset(topic->subscribers, 0, sizeof(topic->subscribers));
memset(topic->last_msg, 0, msg_size);
xSemaphoreGive(global_mutex);
return topic;
}
/**
* @brief 向指定主题发布数据
*
* 更新主题的元信息(时间戳、序列号),复制数据到缓存,
* 并通知所有订阅者有新消息到达。
*/
int pubsub_publish(topic_handle_t topic, const void *data) {
if (topic == NULL || data == NULL) {
return -1;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
// 更新消息元数据
topic->metadata.timestamp = xTaskGetTickCount();
topic->metadata.sequence++;
// 将新消息写入缓存
memcpy(topic->last_msg, data, topic->msg_size);
// xQueueCreate
// 唤醒所有订阅者处理最新消息
for (int i = 0; i < topic->subscriber_count; i++) {
if (topic->subscribers[i] != NULL) {
xSemaphoreGive(topic->subscribers[i]);
}
}
xSemaphoreGive(topic->mutex);
return 0;
}
// 遍历所有订阅者并分发数据
for (uint8_t i = 0; i < topic->subscriber_count; i++) {
subscriber_t *sub = &topic->subscribers[i];
if (!sub->active) continue;
// 若订阅者使用队列接收,则将数据发送至其队列中(非阻塞方式)
if (sub->queue != NULL) {
xQueueSend(sub->queue, data, 0);
}
// 若订阅者设置了回调函数,则直接调用该函数进行通知
if (sub->callback != NULL) {
sub->callback(data, &topic->metadata);
}
}
xSemaphoreGive(topic->mutex);
return 0;
// 创建基于队列的订阅:返回一个队列句柄,用于接收指定主题的消息
QueueHandle_t pubsub_subscribe_queue(const char *name, uint8_t queue_length) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 查找对应名称的主题
topic_t *topic = find_topic(name);
if (topic == NULL) {
xSemaphoreGive(global_mutex);
return NULL;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
// 检查当前订阅者数量是否已达上限
if (topic->subscriber_count >= MAX_SUBSCRIBERS) {
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return NULL;
}
// 分配新的订阅者条目
subscriber_t *sub = &topic->subscribers[topic->subscriber_count++];
sub->task = xTaskGetCurrentTaskHandle();
sub->queue = xQueueCreate(queue_length, topic->msg_size);
sub->callback = NULL;
sub->active = 1;
QueueHandle_t queue = sub->queue;
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return queue;
}
// 注册基于回调的订阅:提供一个回调函数,在有新消息时被调用
int pubsub_subscribe_callback(const char *name, subscriber_callback_t callback) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 定位目标主题
topic_t *topic = find_topic(name);
if (topic == NULL) {
xSemaphoreGive(global_mutex);
return -1;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
// 确保未超过最大订阅者限制
if (topic->subscriber_count >= MAX_SUBSCRIBERS) {
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return -1;
}
// 添加新的回调型订阅者
subscriber_t *sub = &topic->subscribers[topic->subscriber_count++];
sub->task = xTaskGetCurrentTaskHandle();
sub->queue = NULL;
sub->callback = callback;
sub->active = 1;
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return 0;
}
// 从指定主题复制最新消息到用户提供的缓冲区
int pubsub_copy(const char *name, void *data, msg_metadata_t *meta) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 获取主题信息
topic_t *topic = find_topic(name);
if (topic == NULL || !topic->advertised) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 查找主题
topic_t *topic = find_topic(name);
if (topic == NULL) {
xSemaphoreGive(global_mutex);
return -1;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
// 拷贝最新消息内容
memcpy(data, topic->last_msg, topic->msg_size);
// 若需获取元数据,则一并拷贝
if (meta != NULL) {
memcpy(meta, &topic->metadata, sizeof(msg_metadata_t));
}
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return 0;
/**
* 取消订阅指定主题
* @param name 主题名称
* @return 成功返回0,失败返回-1
*/
int pubsub_unsubscribe(const char *name) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
// 定位对应的主题
topic_t *topic = find_topic(name);
if (topic == NULL) {
xSemaphoreGive(global_mutex);
return -1;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
TaskHandle_t current_task = xTaskGetCurrentTaskHandle();
// 遍历订阅者列表,查找当前任务
for (uint8_t i = 0; i < topic->subscriber_count; i++) {
if (topic->subscribers[i].task == current_task) {
// 若存在关联队列,则删除
if (topic->subscribers[i].queue != NULL) {
vQueueDelete(topic->subscribers[i].queue);
}
// 标记该订阅为非激活状态
topic->subscribers[i].active = 0;
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return 0;
}
}
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
return -1;
}
/**
* 打印指定主题的统计信息
* @param name 主题名称
*/
void pubsub_print_stats(const char *name) {
xSemaphoreTake(global_mutex, portMAX_DELAY);
topic_t *topic = find_topic(name);
if (topic == NULL) {
printf("Topic '%s' not found\n", name);
xSemaphoreGive(global_mutex);
return;
}
xSemaphoreTake(topic->mutex, portMAX_DELAY);
printf("=== Topic: %s ===\n", topic->name);
printf("Message size: %u bytes\n", topic->msg_size);
printf("Sequence: %u\n", topic->metadata.sequence);
printf("Last update: %u ticks\n", topic->metadata.timestamp);
printf("Subscribers: %u\n", topic->subscriber_count);
for (uint8_t i = 0; i < topic->subscriber_count; i++) {
if (topic->subscribers[i].active) {
printf(" [%u] Task: %p, Queue: %p, Callback: %p\n",
i,
topic->subscribers[i].task,
topic->subscribers[i].queue,
topic->subscribers[i].callback);
}
}
xSemaphoreGive(topic->mutex);
xSemaphoreGive(global_mutex);
}
xQueueCreate
// ============================================ // 定义自定义消息结构体 // ============================================
typedef struct {
float roll;
float pitch;
float yaw;
} attitude_msg_t;
// ============================================
// 姿态估计任务 - 发布者
// ============================================
void attitude_estimator_task(void *pvParameters) {
topic_handle_t attitude_topic = pubsub_advertise("attitude", sizeof(attitude_msg_t));
attitude_msg_t msg;
for(;;) {
// 获取当前姿态角数据
msg.roll = calculate_roll();
msg.pitch = calculate_pitch();
msg.yaw = calculate_yaw();
// 向主题发布更新后的姿态信息
pubsub_publish(attitude_topic, &msg);
// 控制发布频率为100Hz(每10ms一次)
vTaskDelay(pdMS_TO_TICKS(10));
}
}
// ============================================
// 姿态控制任务 - 队列订阅方式
// ============================================
void attitude_controller_task(void *pvParameters) {
// 创建队列订阅,缓冲深度设为5条消息
QueueHandle_t queue = pubsub_subscribe_queue("attitude", 5);
attitude_msg_t msg;
for(;;) {
// 从队列中接收姿态数据(永久阻塞等待新消息)
if (xQueueReceive(queue, &msg, portMAX_DELAY) == pdTRUE) {
// 利用接收到的姿态进行控制运算
control_attitude(msg.roll, msg.pitch, msg.yaw);
}
}
}
// ============================================
// 日志记录任务 - 回调订阅方式
// ============================================
void logger_task(void *pvParameters) {
// 注册回调函数以监听"attitude"主题
pubsub_subscribe_callback("attitude", attitude_callback);
for(;;) {
// 主任务可执行其他操作或休眠
// 消息到达时会自动触发回调处理
vTaskDelay(pdMS_TO_TICKS(1000));
}
}
// ============================================
// 回调处理函数 - 接收并记录姿态数据
// ============================================
void attitude_callback(const void *data, msg_metadata_t *meta) {
const attitude_msg_t *msg = (const attitude_msg_t *)data;
// 输出姿态值到日志系统
log_attitude(msg->roll, msg->pitch, msg->yaw);
// 打印附加的元信息
printf("Seq: %u, Time: %u ms\n",
meta->sequence,
meta->timestamp);
}
// ============================================
// OLED显示任务 - 轮询订阅方式
// ============================================
void oled_display_task(void *pvParameters) {
attitude_msg_t msg;
msg_metadata_t meta;
for(;;) {
// 尝试复制最新发布的姿态数据(非阻塞调用)
if (pubsub_copy("attitude", &msg, &meta) == 0) {
// 更新显示屏上的姿态角度
display_attitude(msg.roll, msg.pitch, msg.yaw);
// 展示消息序列号等元数据
display_sequence(meta.sequence);
}
// 每隔一段时间尝试获取一次最新状态
vTaskDelay(pdMS_TO_TICKS(100)); // 示例:10Hz刷新率
}
}
// ============================================xQueueCreate
QueueHandle_t att_queue = pubsub_subscribe_queue("attitude", 5);
attitude_msg_t msg;
for(;;) {
if(xQueueReceive(att_queue, &msg, portMAX_DELAY) == pdTRUE) {
control_attitude(msg.roll, msg.pitch, msg.yaw);
}
}
pubsub_copy发布消息性能表现:
pubsub_publish() 执行时间:
- 互斥锁操作:~50 cycles
- 内存拷贝:~100 cycles(16 字节消息)
- 队列发送(3 个订阅者):~150 cycles
- 总计:~300 cycles
在 168MHz STM32F407 上:
300 cycles ≈ 1.8 微秒
订阅消息处理性能:
队列方式:
- xQueueReceive():~50 cycles
- 总计:~50 cycles ≈ 0.3 微秒
轮询方式:
- pubsub_copy():~150 cycles
- 总计:~150 cycles ≈ 0.9 微秒
整体内存占用情况:
主题表(20 个主题):
- topic_t × 20 ≈ 2KB
每个主题:
- 消息缓存:msg_size(如 16 字节)
- 订阅者列表:10 × 20 字节 = 200 字节
- 互斥锁:80 字节
- 总计:~300 字节 + msg_size
每个订阅者(队列方式):
- 队列控制块:80 字节
- 队列存储:queue_depth × msg_size(如 5 × 16 = 80 字节)
- 总计:~160 字节
示例(3 个主题,每个 3 个订阅者):
- 主题表:3 × 300 = 900 字节
- 订阅者队列:9 × 160 = 1440 字节
- 总计:~2.3KB
简介:
MIcroROS 是 ROS 2 针对微控制器推出的轻量化版本,内建完整的 DDS(Data Distribution Service)发布-订阅通信框架,专为资源受限设备优化。
官方网站:https://micro.ros.org/
主要特性:
示例代码:
#include <rcl/rcl.h>
#include <std_msgs/msg/int32.h>
#include <geometry_msgs/msg/twist.h>
// ============================================
// 发布者任务
// ============================================
rcl_publisher_t publisher;
std_msgs__msg__Int32 msg;
void publisher_task(void *pvParameters) {
// 初始化发布者实例
rcl_publisher_init(&publisher, &node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
"topic_name", &publisher_ops);
for(;;) {
msg.data = 42;
rcl_publish(&publisher, &msg, NULL);
vTaskDelay(pdMS_TO_TICKS(100));
}
}
// ============================================
// 订阅者回调函数
// ============================================
void subscription_callback(const void *msgin) {
const std_msgs__msg__Int32 *msg = (const std_msgs__msg__Int32 *)msgin;
printf("Received: %d\n", msg->data);
}
rcl_subscription_t subscription;
void subscriber_task(void *pvParameters) {
// 初始化订阅者
rcl_subscription_init(&subscription, &node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
"topic_name", &subscription_ops);
for(;;) {
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
rcl_wait(&wait_set, RCL_MS_TO_NS(100));
if(rcl_take(&subscription, &msg, NULL) == RCL_RET_OK) {
subscription_callback(&msg);
}
}
}
简介:MQTT 客户端库,适用于本地消息总线或网络通信场景。
GitHub:https://github.com/knolleary/pubsubclient
示例代码:
#include <PubSubClient.h>
PubSubClient client;
// ============================================
// 发布
// ============================================
void publish_attitude(float roll, float pitch, float yaw) {
char payload[64];
snprintf(payload, sizeof(payload), "%.2f,%.2f,%.2f", roll, pitch, yaw);
client.publish("attitude", (uint8_t*)payload, strlen(payload));
}
// ============================================
// 订阅
// ============================================
void callback(char* topic, byte* payload, unsigned int length) {
if(strcmp(topic, "attitude") == 0) {
// 解析数据
float roll, pitch, yaw;
sscanf((char*)payload, "%f,%f,%f", &roll, &pitch, &yaw);
// 处理数据
control_attitude(roll, pitch, yaw);
}
}
void setup() {
client.setCallback(callback);
client.subscribe("attitude");
}
优点:
缺点:
适用场景:
优势特点:
局限性:
典型应用场景:
简介:采用 Protocol Buffers 进行数据序列化,并结合自定义消息总线实现高效通信。
GitHub:https://github.com/nanopb/nanopb
示例代码:
// attitude.proto
syntax = "proto3";
message Attitude {
float roll = 1;
float pitch = 2;
float yaw = 3;
uint32 timestamp = 4;
}
#include "attitude.pb.h"
#include "pb_encode.h"
#include "pb_decode.h"
// ============================================
// 发布
// ============================================
void publish_attitude(float roll, float pitch, float yaw) {
Attitude msg = Attitude_init_zero;
msg.roll = roll;
msg.pitch = pitch;
msg.yaw = yaw;
msg.timestamp = xTaskGetTickCount();
uint8_t buffer[128];
pb_ostream_t stream = pb_ostream_from_buffer(buffer, sizeof(buffer));
pb_encode(&stream, Attitude_fields, &msg);
// 发布到消息总线
pubsub_publish("attitude", buffer, stream.bytes_written);
}
// ============================================
// 订阅
// ============================================
// ============================================
void attitude_callback(const void *data, size_t size) {
Attitude msg = Attitude_init_zero;
pb_istream_t stream = pb_istream_from_buffer(data, size);
if (pb_decode(&stream, Attitude_fields, &msg)) {
control_attitude(msg.roll, msg.pitch, msg.yaw);
}
}
| 方案 | 发布时间(cycles) | 订阅时间(cycles) | 总延迟(cycles / μs) |
|---|---|---|---|
| 消息队列 | ~50 | ~50 | ~100 cycles (0.6 μs) |
| 事件组 | ~30 | ~40 | ~70 cycles (0.4 μs) |
| 流缓冲区 | ~60 | ~60 | ~120 cycles (0.7 μs) |
| 自实现 PubSub | ~300 | ~50 | ~350 cycles (2.1 μs) |
| MicroROS | ~5000 | ~5000 | ~10000 cycles (60 μs) |
测试环境:STM32F407 @ 168MHz,传输消息长度为 16 字节
| 方案 | RAM(单主题) | Flash 占用 | 备注说明 |
|---|---|---|---|
| 消息队列 | ~240 字节 | ~0.5KB | 每个订阅者独立分配一个队列 |
| 事件组 | ~8 字节 | ~0.3KB | 仅用于传递状态标志 |
| 流缓冲区 | ~1KB | ~0.5KB | 缓冲区大小可灵活配置 |
| 自实现 PubSub | ~500 字节 | ~3KB | 包含主题注册与管理功能 |
| MicroROS | ~64KB | ~128KB | 集成完整的 ROS 2 协议栈 |
测试条件:运行 10 个发布主题,每个主题有 3 个订阅者,发布频率为 100Hz
| 方案 | CPU 占用率 | 备注 |
|---|---|---|
| 消息队列 | ~0.5% | 资源消耗极低 |
| 事件组 | ~0.3% | 最低开销 |
| 流缓冲区 | ~0.6% | 整体较低 |
| 自实现 PubSub | ~1.5% | 中等水平 |
| MicroROS | ~5% | 开销相对较高 |
| 应用场景 | 推荐方案 | 选择依据 |
|---|---|---|
| 飞控系统 | 自实现 PubSub | 解耦性强、支持多订阅者、实时性能优异 |
| IoT 终端设备 | 消息队列 | 实现简单、资源消耗低 |
| 机器人平台 | MicroROS | 原生支持 ROS 2 生态体系 |
| 工业自动化控制 | 自实现 PubSub | 可靠性高、具备良好实时响应能力 |
| 状态同步 | 事件组 | 轻量化设计,适合标志位传递 |
| 连续数据流处理 | 流缓冲区 | 高效传输,适用于持续数据流 |
flight_controller/
├── pubsub.h # 消息总线头文件
├── pubsub.c # 消息总线实现
├── messages.h # 消息定义
├── imu_task.c # IMU 任务
├── attitude_est.c # 姿态估计任务
├── attitude_ctrl.c # 姿态控制任务
├── logger.c # 日志任务
└── main.c # 主函数
#ifndef MESSAGES_H
#define MESSAGES_H
#include <stdint.h>
// IMU 数据结构
typedef struct {
float accel_x;
float accel_y;
float accel_z;
float gyro_x;
float gyro_y;
float gyro_z;
} imu_msg_t;
// 姿态信息结构
typedef struct {
float roll;
float pitch;
float yaw;
} attitude_msg_t;
// GPS 定位数据结构
typedef struct {
double lat;
double lon;
float alt;
float speed;
} gps_msg_t;
// 控制指令结构
typedef struct {
typedef struct {
float throttle;
float roll_cmd;
float pitch_cmd;
float yaw_cmd;
} control_msg_t;
#endif
main.c:
#include "FreeRTOS.h"
#include "task.h"
#include "pubsub.h"
#include "messages.h"
// 任务函数声明
void imu_task(void *pvParameters);
void attitude_estimator_task(void *pvParameters);
void attitude_controller_task(void *pvParameters);
void logger_task(void *pvParameters);
int main(void) {
// 初始化底层硬件
HAL_Init();
SystemClock_Config();
// 消息通信机制初始化
pubsub_init();
// 创建系统任务,分配栈空间与优先级
xTaskCreate(imu_task, "IMU", 512, NULL, 4, NULL);
xTaskCreate(attitude_estimator_task, "ATT_EST", 512, NULL, 3, NULL);
xTaskCreate(attitude_controller_task, "ATT_CTRL", 512, NULL, 2, NULL);
xTaskCreate(logger_task, "LOGGER", 1024, NULL, 1, NULL);
// 启动任务调度
vTaskStartScheduler();
// 防止程序跑出主循环
for(;;);
}
FreeRTOS 内核并未内置消息总线功能,开发者需自行设计通信架构。对于功能较为简单的应用,可采用 FreeRTOS 自带的消息队列进行模块间通信;而在模块较多、交互复杂的项目中,推荐使用自定义的发布-订阅(PubSub)机制。
自主实现的 PubSub 方案在解耦程度、运行效率和代码维护性之间达到了良好平衡,特别适用于对实时性和结构清晰度有较高要求的系统。相比之下,事件组更适合用于多任务间的同步控制,而不适合传递实际数据内容。
若项目属于机器人领域且资源充足,MicroROS 是一个强大选择,但其对内存和处理器性能消耗较大,不适合资源受限的场景。
针对飞控类系统:
面向通用嵌入式项目:
| 项目规模 | 推荐通信方案 |
|---|---|
| 小型(< 5 模块) | 消息队列 |
| 中型(5-15 模块) | 自实现发布-订阅(PubSub) |
| 大型(> 15 模块) | MicroROS |
扫码加好友,拉您进群



收藏
