事件模型是 VLink 三种通信模型之一,适用于**多对多**的异步消息发布与订阅场景。 多个发布者(Publisher)可以向同一个命名主题发布消息,所有订阅了该主题的订阅者 (Subscriber)都会异步收到消息副本。本章介绍 Event 模型的专属 API;Node 基类的通用 API(init / deinit / attach / set_property 等)请参阅 节点基类与生命周期。
目录
- 概念与架构
- 主题命名规则与 URL 中的 transport
- 消息类型支持
- Publisher API
- Subscriber API
- QoS 配置
- 完整使用示例
- 多订阅者场景
- 内存管理注意事项
- 性能调优建议
概念与架构
事件模型数据流
事件模型数据流
多订阅者扇出模式
多订阅者模式
关键特性
- **多对多**:多个 Publisher 可向同一主题发布,任意数量的 Subscriber 均可接收。
- **无历史保留(默认)**:消息发出后 Publisher 不缓存;是否可被后续订阅者看见取决于 QoS Durability。
- **编译期类型安全**:MsgT 通过模板参数固定,Serializer::get_type_of<MsgT>() 编译期推导编解码器。
- **传输切换**:更换 URL 前缀即可切换后端,业务代码无需改动。
- **异步回调**:Subscriber 的 listen() 注册后由传输层驱动回调,不阻塞发布方。
与方法模型、字段模型的区别
| 维度 | 事件模型(Event) | 方法模型(Method) | 字段模型(Field) |
| 通信方向 | 单向(Publisher -> Subscriber) | 双向(Client <-> Server)* | 双向(Setter <-> Getter) |
| 响应 | 无 | 有(请求/响应) | 有(最新值同步) |
| 消费者数量 | 多对多 | N:1(多Client对一Server) | 多对多 |
| 历史值保留 | 取决于 Durability QoS | 不适用 | 始终保留最新值 |
| 典型用途 | 传感器数据流、状态广播 | RPC 调用、服务请求 | 参数同步、配置下发 |
*注:方法模型的 fire-and-forget 模式(RespT 为 EmptyType 时)为单向通信(Client -> Server),无响应。
主题命名规则与 URL 中的 transport
URL 格式
<transport>://<topic_path>[?<query_params>]
支持的 Transport
稳定后端(推荐用于生产环境):
| Transport | 传输后端 | 通信范围 | 零拷贝 | 状态 |
| intra:// | 内置消息队列 | 进程内 | 是 ^1^ | 稳定 |
| shm:// | Iceoryx RouDi | 同机跨进程 | 是 ^2^ | 稳定 |
| dds:// | Fast-DDS RTPS | 跨机器 | 否 | 稳定 |
| ddsc:// | CycloneDDS | 跨机器 | 否 | 稳定 |
^1^ intra:// 的零拷贝通过 shared_ptr<IntraData> 子类实现(引用计数共享指针传递),无序列化开销。 ^2^ shm:// / shm2:// 的零拷贝通过 loan() / return_loan() 接口实现(共享内存借贷缓冲区),详见 节点基类与生命周期 – 零拷贝借贷。
Beta 后端(实验性,API 可能变化):
| Transport | 传输后端 | 通信范围 | 零拷贝 | 状态 |
| shm2:// | Iceoryx2 | 同机跨进程 | 是 | Beta |
| ddsr:// | RTI DDS | 跨机器 | 否 | Beta |
| ddst:// | TravoDDS(国产 DDS) | 跨机器 | 否 | Beta |
| zenoh:// | Zenoh | 跨机/云边 | 否 | Beta |
| someip:// | vsomeip | 车载以太网 | 否 | Beta |
| mqtt:// | MQTT | 跨机/物联网 | 否 | Beta |
| fdbus:// | FDBus IPC | 同机 | 否 | Beta |
| qnx:// | QNX IPC | 同机(QNX) | 否 | Beta |
主题路径规则
- 路径分隔符使用 /,例如 dds://vehicle/chassis/speed
- 同一传输后端下,Publisher 和 Subscriber 的 topic_path 必须完全一致才能匹配
- 跨传输后端不互通(dds://my_topic 与 ddsc://my_topic 是不同的通道)
查询参数(以 DDS 为例)
dds://vehicle/speed?domain=1&depth=10&qos=sensor
| 参数 | 说明 |
| domain | DDS Domain ID,默认先读取 VLINK_DDS_DOMAIN,未设置时为 0,并可通过 ?domain= 显式覆盖 |
| depth | DDS 历史深度,覆盖 QoS 的 history.depth 设置 |
| qos | 命名 QoS profile,需提前调用 DdsConf::register_qos() 注册 |
消息类型支持
VLink 通过 Serializer::get_type_of<T>() 在编译期自动推导序列化方式。共 14 种类型(含 kUnknownType)—— 详见 序列化。事件模型常用的类型:
| 类别 | 类型示例 | 序列化器 (值) |
| 原始字节 | vlink::Bytes | kBytesType (1) |
| 动态类型 | 含 is_vlink_dynamic_data() 成员的类 | kDynamicType (2) |
| 自定义 | 实现 operator>>(Bytes&) / operator<<(const Bytes&) | kCustomType (3) |
| CDR(DDS 专用) | MyCdrType(含 serialize/deserialize(Cdr&)) | kCdrType (4) |
| Protobuf 消息 | 继承 MessageLite(有 SerializeToArray) | kProtoType (5) |
| Protobuf 指针 | MyProto*(Arena 管理) | kProtoPtrType (6) |
| FlatBuffers 表 | MyTableT(NativeTable) | kFlatTableType (7) |
| FlatBuffers 指针 | const MyTable*(Subscriber 侧零拷贝读) | kFlatPtrType (8) |
| FlatBuffers 构建 | 含 fbb_ + Finish() 的结构 | kFlatBuilderType (9) |
| 字符串 | std::string | kStringType (10) |
| C 字符串 | const char* / 字符串字面量 | kCharsType (11) |
| 流序列化 | 支持 stringstream << / >> 且非更高优先类型 | kStreamType (12) |
| 标准布局(POD) | is_trivial && is_standard_layout 的 struct | kStandardType (13) |
| POD 指针 | 指向 trivial + standard_layout 类型的指针 | kStandardPtrType (14) |
注:CDR 仅在 DDS 系列后端有效,且**不支持**消息层加密。intra:// 下若 MsgT 的 element_type 继承 IntraDataType(由 VLINK_INTRA_DATA_DECLARE 生成),走 shared_ptr 零拷贝路径,不做序列化。
Publisher API
类模板声明
template <typename MsgT, SecurityType SecT = SecurityType::kWithoutSecurity>
class Publisher : public Node<PublisherImpl, SecT>;
Publisher<MsgT, SecT> 继承自 Node<PublisherImpl, SecT>,同时拥有 Node 基类 的所有通用 API 和 Publisher 专有的发布相关 API。
工厂方法
[[nodiscard]] static UniquePtr create_unique(const std::string& url_str,
InitType type = InitType::kWithInit);
[[nodiscard]] static SharedPtr create_shared(const std::string& url_str,
InitType type = InitType::kWithInit);
构造函数
explicit Publisher(const std::string& url_str,
InitType type = InitType::kWithInit);
template <typename ConfT>
explicit Publisher(const ConfT& conf,
InitType type = InitType::kWithInit);
InitType::kWithInit(默认)表示构造时立即调用 init(); InitType::kWithoutInit 表示延迟初始化,可在 init() 前调用配置方法。
发布方法
bool publish(const MsgT& msg, bool force = false);
bool publish_fbb(const void* fbb, bool force = false);
订阅者感知
void detect_subscribers(ConnectCallback&& callback);
bool wait_for_subscribers(std::chrono::milliseconds timeout = Timeout::kDefaultInterval);
[[nodiscard]] bool has_subscribers() const;
角色切换
继承自 Node 的公共 API
Node 基类继承的公共 API(init / deinit / attach / interrupt / set_security_key 等)请参阅 节点基类与生命周期。
Subscriber API
类模板声明
template <typename MsgT, SecurityType SecT = SecurityType::kWithoutSecurity>
class Subscriber : public Node<SubscriberImpl, SecT>;
工厂方法
[[nodiscard]] static UniquePtr create_unique(const std::string& url_str,
InitType type = InitType::kWithInit);
[[nodiscard]] static SharedPtr create_shared(const std::string& url_str,
InitType type = InitType::kWithInit);
构造函数
explicit Subscriber(const std::string& url_str,
InitType type = InitType::kWithInit);
template <typename ConfT>
explicit Subscriber(const ConfT& conf,
InitType type = InitType::kWithInit);
订阅方法
bool listen(MsgCallback&& callback);
零拷贝相关
void set_manual_unloan(bool manual_unloan) override;
延迟与丢样统计
void set_latency_and_lost_enabled(bool enable);
[[nodiscard]] bool is_latency_and_lost_enabled() const;
[[nodiscard]] int64_t get_latency() const;
[[nodiscard]] SampleLostInfo get_lost() const;
角色切换
继承自 Node 的公共 API
Node 基类继承的公共 API(init / deinit / attach / interrupt / set_security_key 等)请参阅 节点基类与生命周期。
QoS 配置
QoS(Quality of Service,服务质量)控制消息的可靠性、历史深度、持久化策略等。
设置方式
QoS 通过 URL 查询参数或传输配置对象(Conf)设置,Node 上不存在 set_qos() 方法。
方式一:通过 URL 查询参数
Type-safe publisher for the VLink event communication model.
定义 publisher.h:102
方式二:通过 Qos 对象注册命名 Profile 后在 Conf 中引用
vlink::DdsConf::register_qos("my_profile", my_qos);
vlink::DdsConf conf("my_topic");
conf.qos = "my_profile";
Transport configuration for the dds:// Fast-DDS RTPS backend.
Quality of Service (QoS) policy aggregate for VLink publishers and subscribers.
Kind kind
Durability kind.
定义 qos.h:145
@ kVolatile
No persistence beyond the DataWriter lifetime.
定义 qos.h:139
@ kKeepLast
Keep only the depth most recent samples.
定义 qos.h:118
int32_t depth
Number of samples to keep per instance (KeepLast only).
定义 qos.h:123
Kind kind
History retention kind.
定义 qos.h:122
Kind kind
Publish mode.
定义 qos.h:163
@ kASync
Asynchronous publish via background thread.
定义 qos.h:160
Kind kind
Delivery guarantee kind.
定义 qos.h:102
@ kReliable
Retransmit until acknowledged.
定义 qos.h:99
Aggregate Quality of Service policy for a VLink communication endpoint.
定义 qos.h:86
Durability durability
Sample persistence policy.
定义 qos.h:308
Reliability reliability
Delivery guarantee policy.
定义 qos.h:306
History history
Sample retention policy.
定义 qos.h:307
PublishMode publish_mode
Synchronous or asynchronous publishing.
定义 qos.h:309
方式三:使用预定义 QoS Profile
vlink::DdsConf::register_qos("sensor", vlink::QosProfile::kSensor);
vlink::DdsConf conf("sensor/data");
conf.qos = "sensor";
Pre-defined QoS profiles for common VLink communication patterns.
常用预定义 Profile
以下摘自 include/vlink/extension/qos_profile.h,共 13 个 QosProfile::k*;下表只列常用 7 个:
| Profile | Reliability | History | Durability | PubMode | 适用场景 |
| QosProfile::kEvent | Reliable | KeepLast(10) | Volatile | Sync | 离散控制事件 |
| QosProfile::kSensor | BestEffort | KeepLast(20) | Volatile | ASync | 高频传感器数据 |
| QosProfile::kField | Reliable | KeepLast(1) | TransientLocal | Sync | 最新值状态同步 |
| QosProfile::kParameter | Reliable | KeepLast(1000) | Volatile | Sync | 配置参数 |
| QosProfile::kLight | Reliable | KeepLast(1) | Volatile | ASync | 轻量快速消息 |
| QosProfile::kBest | Reliable | KeepLast(200) | Volatile | Sync | 高吞吐可靠传输 |
| QosProfile::kLarge | Reliable | KeepLast(500) | Volatile | Sync | 大负载传输 |
QoS 对 DDS 系列(dds://、ddsc://、ddsr://、ddst://)和 zenoh:// 有较完整支持;其他后端忽略不支持的字段。
完整 13 个 Profile、Qos 字段含义、兼容规则见 08-qos.md。
完整使用示例
示例一:基础 Protobuf 发布/订阅
#include "sensor.pb.h"
#include <chrono>
#include <thread>
using namespace std::chrono_literals;
void subscriber_main() {
sub.listen([](const sensor::SensorData& msg) {
std::cout << "[Sub] ts=" << msg.timestamp()
<< " value=" << msg.value()
<< " unit=" << msg.unit() << std::endl;
});
std::this_thread::sleep_for(60s);
}
void publisher_main() {
if (!pub.wait_for_subscribers(5s)) {
std::cerr << "No subscribers found within timeout." << std::endl;
return;
}
for (int i = 0; i < 100; ++i) {
sensor::SensorData msg;
msg.set_timestamp(i);
msg.set_value(25.0 + i * 0.1);
msg.set_unit("celsius");
if (!pub.publish(msg)) {
std::cerr << "publish failed at " << i << std::endl;
}
std::this_thread::sleep_for(100ms);
}
}
Type-safe subscriber for the VLink event communication model.
定义 subscriber.h:110
示例二:使用 MessageLoop 绑定(单线程模型)
#include "sensor.pb.h"
int main() {
sub.attach(&loop);
sub.listen([](const sensor::SensorData& msg) {
std::cout << "[Sub] value=" << msg.value() << std::endl;
});
int counter = 0;
timer.
start([&pub, &counter]() {
sensor::SensorData msg;
msg.set_timestamp(++counter);
msg.set_value(20.0 + counter % 10);
msg.set_unit("celsius");
pub.publish(msg);
});
return 0;
}
Single-threaded serial task dispatcher with integrated timer support.
定义 message_loop.h:106
bool run()
Runs the message loop on the calling thread (blocking).
bool quit(bool force=false)
Requests the loop to exit cleanly.
Event-loop-driven repeating or one-shot timer.
定义 timer.h:81
void set_interval(uint32_t interval_ms)
Changes the tick interval.
bool attach(class MessageLoop *message_loop)
Attaches the timer to a MessageLoop.
void start(Callback &&callback=nullptr)
Arms and starts the timer.
void set_loop_count(int32_t loop_count)
Changes the total number of ticks.
static constexpr int kInfinite
Sentinel loop count meaning repeat indefinitely.
定义 timer.h:91
Single-threaded event loop with three queue types, timer management and task scheduling.
VLINK_EXPORT void register_terminate_signal(std::function< void(int)> &&callback, bool is_async=false, bool pass_through=false) noexcept
Registers a callback for graceful termination signals (SIGTERM, SIGINT, etc.).
Event-loop-driven periodic/one-shot timer with configurable priority.
Platform-agnostic system utilities for process, thread, network and signal management.
示例三:POD 结构体发布(零序列化开销)
struct ImuData {
int64_t timestamp_us;
float accel_x, accel_y, accel_z;
float gyro_x, gyro_y, gyro_z;
};
ImuData imu{};
imu.timestamp_us = 12345678;
imu.accel_x = 0.1f;
pub.publish(imu);
sub.listen([](const ImuData& data) {
printf("IMU: ax=%.3f ay=%.3f az=%.3f\n",
data.accel_x, data.accel_y, data.accel_z);
});
示例四:零拷贝 shm:// loan 发布
struct BigStruct {
uint8_t payload[65536];
int64_t timestamp;
};
if (pub.is_support_loan()) {
Bytes buf = pub.loan(
sizeof(BigStruct));
auto* p =
new (buf.
data()) BigStruct{};
p->timestamp = 999;
pub.publish(buf);
}
}
BigStruct msg{};
msg.timestamp = 999;
pub2.publish(msg);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
定义 bytes.h:113
bool empty() const noexcept
Returns true if the buffer is empty (no data pointer and size == 0).
定义 bytes.h:880
uint8_t * data() noexcept
Returns a pointer to the start of the user data region (after the prefix offset).
定义 bytes.h:860
示例五:Bytes 类型(原始字节发布)
pub.publish(data);
sub.listen([](
const Bytes& bytes) {
printf(
"Received %zu bytes\n", bytes.
size());
});
size_t size() const noexcept
Returns the number of usable bytes (excluding the prefix offset region).
定义 bytes.h:868
static Bytes create(size_t size, uint8_t offset=0) noexcept
Creates an owned Bytes buffer of the given size.
安全别名
VLink 为事件模型提供安全加密的便捷别名:
template <typename MsgT>
template <typename MsgT>
Convenience alias for Publisher with message security enabled.
定义 publisher.h:275
Convenience alias for Subscriber with message security enabled.
定义 subscriber.h:276
示例六:安全加密发布订阅
pub.set_security_key("my-secret-key-256bit");
sub.set_security_key("my-secret-key-256bit");
sub.listen([](const MyMsg& msg) { });
完整安全加密配置请参阅 安全加密。
多订阅者场景
多个 Subscriber 可以订阅同一主题,每个都会独立收到消息副本:
#include "sensor.pb.h"
sub_logger.listen([](const sensor::SensorData& msg) {
printf("[Logger] speed=%.2f\n", msg.value());
});
sub_controller.listen([](const sensor::SensorData& msg) {
if (msg.value() > 120.0) {
printf("[Controller] Speed limit exceeded!\n");
}
});
sub_recorder.listen([](const sensor::SensorData& msg) {
printf("[Recorder] recording...\n");
});
sensor::SensorData msg;
msg.set_value(100.5);
pub.publish(msg);
多订阅者的 QoS 匹配注意事项
在 DDS 系列传输中,Publisher 和 Subscriber 的 QoS 策略必须兼容,否则连接不会 建立。常见的兼容规则:
| QoS 策略 | 兼容规则 |
| Reliability | Publisher kReliable 兼容 Subscriber kBestEffort 或 kReliable |
| Durability | Publisher 的 kind >= Subscriber 的 kind(Persistent > Transient > TransientLocal > Volatile) |
| History | 独立配置,无跨端约束 |
内存管理注意事项
1. 消息对象的生命周期
publish(msg) 在内部完成序列化后立即返回,调用后 msg 可以安全销毁或复用:
sensor::SensorData msg;
msg.set_value(1.0);
pub.publish(msg);
msg.set_value(2.0);
2. Loan Buffer 的生命周期
通过 loan() 获取的 buffer 由传输后端(共享内存)管理:
Bytes buf = pub.loan(
sizeof(MyStruct));
pub.publish(buf);
if (should_skip) {
pub.return_loan(buf);
}
3. 订阅者回调中的数据引用
回调参数 const MsgT& msg 的生命周期仅限于回调函数体内:
sub.listen([](const sensor::SensorData& msg) {
double v = msg.value();
auto copy = msg;
});
4. 手动 unloan 模式(shm:// 零拷贝接收)
手动归还模式下,必须拿到原始 loan 后归还。对于 Bytes 类型消息更直接:
sub.set_manual_unloan(true);
sub.listen([&sub](
const Bytes& data) {
sub.return_loan(data);
});
其他消息类型(如 POD、Proto)使用手动模式时,需自行从回调参数还原 Bytes 句柄;多数情况下使用默认自动归还即可。
5. 安全退出(safe_quit)
当节点在多线程环境中可能在回调执行期间被销毁时,开启安全退出:
pub.set_safety_quit(true);
性能调优建议
1. 选择合适的传输后端
| 场景 | 推荐传输 | 理由 | 状态 |
| 同进程内高频通信 | intra:// | 无序列化,无 IPC 开销 | 稳定 |
| 同机跨进程大负载(图像/点云) | shm:// | 零拷贝,极低延迟 | 稳定 |
| 同机跨进程小负载 | shm:// | 低延迟 IPC | 稳定 |
| 跨机器标准通信 | dds:// / ddsc:// | 标准 DDS,功能完整 | 稳定 |
| 跨机器高吞吐 | zenoh:// | 现代协议,内置压缩 | Beta |
| 车载以太网 SOA | someip:// | 符合 AUTOSAR 规范 | Beta |
2. QoS 策略优化
vlink::DdsConf::register_qos("sensor", vlink::QosProfile::kSensor);
vlink::DdsConf::register_qos("event", vlink::QosProfile::kEvent);
3. 序列化格式选择
| 格式 | 序列化速度 | 消息大小 | 适用场景 |
| POD struct | 极快 | 固定 | 简单数值数据、IMU、控制指令 |
| FlatBuffers | 快 | 较小 | 复合类型、需要零拷贝读取 |
| Protobuf | 中 | 小 | 通用消息、跨语言、字段可扩展 |
| Bytes | 极快 | 任意 | 自定义二进制协议、图像帧 |
| CDR | 快 | 中 | 与 DDS 原生类型互通 |
4. MessageLoop 线程模型
sub1.attach(&loop);
sub2.attach(&loop);
pub_timer.attach(&loop);
sub.listen([](const MyMsg& msg) {
std::lock_guard lock(global_mutex);
});
5. 减少不必要的订阅者检测
for (int i = 0; i < 1000; ++i) {
if (pub.has_subscribers()) {
pub.publish(msg);
}
}
bool has_sub = false;
pub.detect_subscribers([&has_sub](bool connected) {
has_sub = connected;
});
for (int i = 0; i < 1000; ++i) {
if (has_sub) {
pub.publish(msg);
}
}
6. 延迟调试
sub.set_latency_and_lost_enabled(true);
sub.listen([&sub](const SensorData& msg) {
int64_t lat_us = sub.get_latency();
printf(
"latency=%ldus lost=%" PRIu64
"/%" PRIu64
"\n", lat_us, lost.
lost, lost.
total);
});
Cumulative sample delivery statistics for a subscriber or getter.
定义 types.h:217
uint64_t total
Total number of samples expected (delivered + lost).
定义 types.h:218
uint64_t lost
Number of samples that were dropped or missed.
定义 types.h:219
相关文档