VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
  1. 基础库

VLink 的 base 基础库提供了一套轻量、高性能的底层工具集,供通信核心与上层应用共同使用。 所有组件均以 C++17 编写,无第三方强制依赖,可独立集成。

相关文档:


11.1 组件总览

组件 头文件 功能简述
Logger base/logger.h 全局单例日志器,四种输出风格,可插拔后端
Bytes base/bytes.h 128 字节固定大小缓冲区,SBO + 五种所有权
MessageLoop base/message_loop.h 单线程事件循环,集成定时器与优先级队列
Timer base/timer.h 事件循环驱动的周期/单次定时器
WheelTimer base/wheel_timer.h 哈希时间轮,O(1) 插入/删除大量超时
ElapsedTimer base/elapsed_timer.h 高精度计时,支持墙钟和 CPU 活跃时间
DeadlineTimer base/deadline_timer.h 绝对截止时间的无锁超时检测
ThreadPool base/thread_pool.h 固定线程数的通用并行任务池
Process base/process.h 跨平台子进程管理,管道 I/O,异步回调
MultiLoop base/multi_loop.h 多线程事件循环,继承 MessageLoop,并行执行
MpmcQueue base/mpmc_queue.h 无锁有界 MPMC 环形队列
ObjectPool base/object_pool.h 线程安全的通用对象池,RAII 自动归还
SpinLock base/spin_lock.h 自适应自旋锁及 RAII 守卫
Semaphore base/semaphore.h 进程内计数信号量,支持超时
SysSemaphore base/sys_semaphore.h 跨进程命名计数信号量
SysSharemem base/sys_sharemem.h 跨进程命名共享内存区域
Schedule base/schedule.h 带延迟、优先级、超时的任务调度包装器
GraphTask base/graph_task.h 有向无环图任务调度,支持条件分支
CpuProfiler base/cpu_profiler.h CPU 利用率分析器
CpuProfilerGuard base/cpu_profiler_guard.h CpuProfiler 的 RAII 自动守护
FastStream base/fast_stream.h 高性能输出流(Logger 内部引擎)
Format base/format.h 轻量无堆分配的 {} 占位符格式化器
Utils base/utils.h 平台无关的进程、线程、信号工具函数

11.2 日志系统 Logger

概述

vlink::Logger 是一个全局单例日志器,通过 Logger::init() 初始化,通过宏在任意位置写入日志。 它同时支持控制台 Sink 和文件 Sink,每个 Sink 可独立配置最低日志级别。

日志级别

枚举 用途
0 kTrace 详细跟踪,用于排查内部逻辑
1 kDebug 开发调试信息
2 kInfo 正常运行信息
3 kWarn 异常但可恢复的情况
4 kError 可恢复错误
5 kFatal 写日志后抛出 Exception::RuntimeError
6 kOff 关闭对应 Sink

当消息级别 >= kDetailLevel(默认 kWarn)时,宏自动在消息前追加 {filename:line} 定位信息。

四种输出风格

风格 宏前缀 示例 说明
流式(VLOG) VLOG_I VLOG_I("x=", x) 使用 FastStream,零堆分配
格式化(MLOG) MLOG_I MLOG_I("x={}", x) 使用 VLink format::format_to_n
C 风格(CLOG) CLOG_I CLOG_I("x=d", x) 使用 std::snprintf
RAII 流(SLOG) SLOG_I SLOG_I << "x=" << x WrapperStream 析构时自动提交

每种风格对应六个级别的短宏(_T/_D/_I/_W/_E/_F)。每条消息最大 4096 字节(kLocalBufferSize),超出将被截断。

编译期过滤

// 在包含头文件之前定义,低于该级别的调用在编译期零开销消除
#define VLINK_LOG_LEVEL 2 // 只保留 Info 及以上
#define VLINK_LOG_DETAIL_LEVEL 3 // Warn 以上才附加文件行号
// 禁用短宏别名(VLOG_I 等),只保留 VLINK_LOG_I 形式
#define VLINK_LOG_DISABLE_SHORT

自定义后端

// 注册自定义控制台处理器
[](vlink::Logger::Level level, std::string_view msg) {
my_console_output(level, msg);
});
// 注册自定义文件处理器
[](vlink::Logger::Level level, std::string_view msg) {
my_file_output(level, msg);
});

回溯环形缓冲区

// 启用回溯,保留最后 100 条消息
// 崩溃前刷出所有保留消息
// 关闭回溯

完整使用示例

int main() {
// 初始化:程序名 + 日志文件路径(可选)
vlink::Logger::init("my_app", "/var/log/my_app.log");
// 仅将 Info 以上输出到控制台
// 文件记录 Debug 以上
int node_id = 42;
double temp = 78.5;
// 流式风格
VLOG_I("node started, id=", node_id);
// 格式化风格
MLOG_W("temperature is {} C, threshold exceeded", temp);
// C 风格
CLOG_E("errno=%d msg=%s", errno, strerror(errno));
// RAII 流风格(可跨行)
SLOG_D << "values: " << node_id << " temp=" << temp;
// Fatal 会抛出异常
try {
VLOG_F("unrecoverable condition: ", "disk full");
} catch (const std::exception& e) {
// e.what() 包含日志消息
}
return 0;
}
Global singleton logger with three output styles and pluggable backends.
#define CLOG_E(...)
定义 logger.h:866
#define VLOG_F(...)
定义 logger.h:856
#define SLOG_D
定义 logger.h:884
#define VLOG_I(...)
定义 logger.h:850
#define MLOG_W(...)
定义 logger.h:876

11.3 字节缓冲区 Bytes

Bytes 五种所有权模式

概述

vlink::Bytes 是 VLink 的核心数据载体,总大小固定为 **128 字节**。 96 字节以内的数据直接存储在对象内部(小缓冲优化 SBO),无需堆分配。 超出部分从内存池或系统堆分配。

所有权模型

创建方式 拥有内存 复制行为 典型用途
Bytes::create(n) 深拷贝 新建分配
Bytes::shallow_copy() 指针别名 零拷贝包装外部缓冲区
Bytes::deep_copy() 深拷贝 拥有外部数据的副本
Bytes::loan_internal() 否(借用) 指针别名 Iceoryx 零拷贝借用(shm 后端)
Bytes::shallow_copy_ptr() 指针别名 包装不透明指针(size == 0)

内存布局

Bytes 内存布局

offset 字段为协议头预留前缀区域,data() = real_data() + offset()

常用操作

// 创建 64 字节缓冲区(<= 96,使用 SBO,无堆分配)
auto buf = vlink::Bytes::create(64);
std::memcpy(buf.data(), payload, 64);
// 创建带 8 字节前缀区的缓冲区(用于协议头)
auto buf2 = vlink::Bytes::create(100, /*offset=*/8);
// real_data() 指向原始起始,data() 偏移 8 字节
// 零拷贝包装外部只读缓冲区
auto view = vlink::Bytes::shallow_copy(ext_ptr, ext_size);
// view.is_owner() == false,不会释放 ext_ptr
// 深拷贝
auto owned = vlink::Bytes::deep_copy(ext_ptr, ext_size);
// owned.is_owner() == true
// 从 std::string 构造
auto str_buf = vlink::Bytes::from_string("hello world");
// 初始化列表构造
vlink::Bytes raw_bytes = {0x01, 0x02, 0x03, 0x04};
// 基本访问
uint8_t* p = buf.data();
size_t n = buf.size();
bool is_owner = buf.is_owner();
bool is_empty = buf.empty();
// 迭代器(for 范围循环)
for (uint8_t byte : buf) {
process(byte);
}
// 转换
std::string s = buf.to_string();
std::string_view sv = buf.to_string_view();
std::vector<uint8_t> v = buf.to_raw_data();
// 调整大小
buf.resize(128); // 如需扩容则重新分配
buf.shrink_to(32); // 仅缩减逻辑大小,不重新分配
// 清空(释放所有权内存)
buf.clear();
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.

压缩支持(LZAV)

// 压缩(使用 LZAV 算法)
auto compressed = vlink::Bytes::compress_data(raw.data(), raw.size());
// 检测是否已压缩(校验 4 字节头部魔数 + 4 字节尾部魔数)
if (vlink::Bytes::is_compress_data(compressed.data(), compressed.size())) {
compressed.data(), compressed.size());
}
// 高压缩比模式(更慢)
auto hi_compressed = vlink::Bytes::compress_data(raw.data(), raw.size(),
/*high_ratio=*/true);

compress_data() 对输入大小上限为 kMaxCompressCacheSize(1 MiB), 超出会返回空 Bytes

工具函数

// Base64 编解码
std::string b64 = vlink::Bytes::encode_to_base64(buf);
// CRC-32 校验
uint32_t crc = vlink::Bytes::get_crc_32(buf);
// Hex 字符串
std::string hex = vlink::Bytes::convert_to_hex_str(buf.data(), buf.size());
// 字节序反转
auto reversed = vlink::Bytes::reverse_order(buf);
// 内存池(Linux PMR,减少分配开销)
vlink::Bytes::init_memory_pool(); // 应用启动时调用一次
// ... 使用 Bytes ...
vlink::Bytes::release_memory_pool(); // 应用退出前调用

11.4 消息循环 MessageLoop

vlink::MessageLoop 是 VLink 中的核心任务调度器,也是定时器、Schedule 等机制的基础。 它实现了一个**单线程串行事件循环**:所有任务在同一个线程上顺序执行, 回调内部无需加锁即可安全访问共享状态。

核心概念

MessageLoop 核心概念
  • **单线程串行**:所有投递到同一个 MessageLoop 的任务严格串行,不会并发。
  • **线程安全投递**:post_task() 本身是线程安全的,可从任意线程调用。
  • **集成定时器**:Timer attach 到 MessageLoop 后,定时回调与普通任务共用同一线程。
  • **最大队列深度**:默认 10000(kMaxTaskSize),超出时 post_task() 返回 false
  • **最大定时器数**:默认 100(kMaxTimerSize),超出时 Timer::attach() 失败。

队列类型

类型(Type) 内部实现 最大任务数 特点
kNormalType mutex + std::queue 10000 默认,FIFO 无优先级
kLockfreeType 无锁 MpmcQueue 10000 单生产者路径最快
kPriorityType 优先级队列 10000 支持任务优先级,数值大先执行

空闲策略

策略(Strategy) 行为 适用场景
kOptimizationStrategy yield 平衡延迟与 CPU 使用率 默认,通用
kPopStrategy 忙轮询,持续检测队列(最低延迟) 实时性要求极高
kBlockStrategy 条件变量阻塞等待任务(最低 CPU) 低频任务、省电场景

运行模式

run() – 阻塞运行

在**调用线程**上运行事件循环,阻塞直到 quit() 被调用。 适合主线程驱动的模型。

loop.register_begin_handler([] { VLOG_I("loop started"); });
loop.register_end_handler([] { VLOG_I("loop stopped"); });
// 在另一个线程中(或定时器中)调用 quit()
std::thread stopper([&] {
std::this_thread::sleep_for(std::chrono::seconds(5));
loop.quit();
});
loop.run(); // 阻塞 ~5 秒
stopper.join();

async_run() – 后台线程运行

立即在**新后台线程**上启动循环,调用线程不阻塞。 这是最常用的模式。

loop.async_run(); // 立即返回,后台线程已启动
loop.post_task([] { do_work(); });
loop.quit();
loop.wait_for_quit(); // 等待后台线程退出

spin() / spin_once() – 手动驱动

MessageLoop 嵌入已有事件循环,手动驱动。

// spin() 阻塞循环(不启动后台线程)
// loop.spin();
// spin_once() 处理一批任务后返回
while (app_running) {
loop.spin_once(/*block=*/false); // 非阻塞
do_other_work();
}

基本任务投递

post_task()

loop.async_run();
// 从任意线程安全投递
loop.post_task([] {
// 在循环线程上执行
VLOG_I("hello from loop thread");
});
// 携带上下文
std::string data = "sensor_data";
loop.post_task([data]() {
process(data);
});

post_task_with_priority()

仅对 kPriorityType 类型的循环有效,其他类型退化为 FIFO。

ploop.async_run();
// 高优先级任务优先执行
ploop.post_task_with_priority(
[] { handle_critical_alert(); },
ploop.post_task_with_priority(
[] { handle_normal_msg(); },
// 预定义优先级常量见下表

MessageLoop 预定义优先级常量:

常量 说明
kNoPriority 0 无优先级,按 FIFO 顺序执行
kLowestPriority 1 最低优先级
kTimerPriority 50 内部定时器默认使用
kNormalPriority 100 标准应用任务
kHighestPriority 65535 最高优先级,紧急/实时任务

注意:优先级值越大越先执行。仅 kPriorityType 队列支持优先级调度,其他队列类型忽略优先级参数,退化为 FIFO 顺序。 QoS 扩展字段 Qos::Additions::Priority 是独立于 MessageLoop 优先级的另一套枚举,详见 08-qos.md

invoke_task() – 带返回值

从**外部线程**投递任务并等待结果,通过 std::future 获取返回值。

loop.async_run();
// 获取循环线程内的计算结果
auto future = loop.invoke_task([]() -> int {
return expensive_compute();
});
// 阻塞等待(在外部线程上调用 .get() 是安全的)
int result = future.get();
VLOG_I("result=", result);

**警告**: 绝对不要在循环**自身的线程**上调用 future.get()。 这会导致死锁:任务等待被执行,而线程在等待任务,互相阻塞。 使用 is_in_same_thread() 检测当前是否在循环线程内。

// 安全调用示例
if (!loop.is_in_same_thread()) {
auto fut = loop.invoke_task([]{ return get_state(); });
auto state = fut.get(); // 安全
} else {
// 已经在循环线程内,直接调用
auto state = get_state();
}

与定时器集成

Timer 绑定到 MessageLoop 后,定时回调与普通任务共用**同一线程**。

loop.async_run();
// 每 1000 ms 执行一次
vlink::Timer heartbeat(&loop, 1000, vlink::Timer::kInfinite, [&]() {
VLOG_I("heartbeat tick");
loop.post_task([] { send_heartbeat(); });
});
heartbeat.start();
// 延迟 500 ms 后执行一次
vlink::Timer::call_once(&loop, 500, [] {
VLOG_I("delayed init");
});

延迟执行通过 exec_taskSchedule::Config::delay_msTimer::call_once 实现;周期执行直接用 Timer(见上文示例)。

exec_task – 带调度配置的任务

exec_task() 是比 post_task() 更强大的投递接口,支持链式延续回调。

void 回调

loop.exec_task(
/*delay_ms=*/100,
/*priority=*/50,
/*schedule_timeout_ms=*/500, // 500ms 内未开始执行则触发
/*execution_timeout_ms=*/200 // 执行超过 200ms 则触发
},
[] {
long_running_op();
})
.on_schedule_timeout([] {
VLOG_W("task was not scheduled in time");
})
.on_execution_timeout([] {
VLOG_W("task execution took too long");
})
.on_catch([](std::exception& e) {
VLOG_E("exception: ", e.what());
});
#define VLOG_E(...)
定义 logger.h:854
#define VLOG_W(...)
定义 logger.h:852

bool 回调(带结果链)

loop.exec_task(
[]() -> bool {
return try_connect_to_server();
})
.on_then([]() -> bool {
// 连接成功,开始认证
return do_auth();
})
.on_then([]() -> bool {
// 认证成功,订阅主题
subscribe_all();
return true;
})
.on_else([] {
VLOG_W("connection or auth failed, will retry");
schedule_retry();
});

生命周期管理

loop.async_run();
// 检查状态
bool running = loop.is_running();
bool quitting = loop.is_ready_to_quit();
bool busy = loop.is_busy();
size_t queued = loop.get_task_count();
// 等待队列清空(最多等 1000 ms)
loop.wait_for_idle(1000);
// 请求退出(等待当前任务完成)
loop.quit();
// 强制退出(丢弃剩余任务)
loop.quit(/*force=*/true);
// 等待后台线程完全退出
loop.wait_for_quit(/*ms=*/2000); // 最多等 2 秒

生命周期回调钩子

// 在循环线程启动、第一个任务执行前调用
VLOG_I("loop thread started");
});
// 在循环线程退出前调用
VLOG_I("loop thread stopping");
});
// 每次任务队列变空时调用(频率可能很高)
// 不要在此做重操作
});

也可以通过继承重载虚函数,实现更细粒度的监控:

class MyLoop : public vlink::MessageLoop {
protected:
void on_begin() override {
}
void on_end() override {
VLOG_I("loop ended");
}
void on_idle() override {
// 队列空闲时的定期统计
}
// 每个任务执行前调用(start_time 为毫秒时间戳)
void on_task_changed(Callback&& cb, uint32_t start_time) override {
// 可用于任务追踪
}
// 任务执行时间超过 get_max_elapsed_time() 时触发
void on_task_timeout(Callback&& cb, uint32_t elapsed_ms) override {
VLOG_W("slow task: ", elapsed_ms, " ms");
}
};

在通信回调中使用 MessageLoop

VLink 的通信回调(Subscriber、Server 等)在传输层的**内部线程**上触发。 将收到的消息 post 到自己的 MessageLoop 是串行化处理的标准模式:

my_loop.async_run();
vlink::Subscriber<MyMsg> sub("dds://my/topic");
sub.listen([&](const MyMsg& msg) {
// 此回调在 DDS 内部线程上触发
// 拷贝数据后 post 到业务循环
auto copy = msg;
my_loop.post_task([copy = std::move(copy)]() {
// 在 my_loop 线程上安全处理
process_message(copy);
});
});

多 MessageLoop 场景

每个模块可以拥有独立的 MessageLoop,实现关注点分离:

vlink::MessageLoop sensor_loop; // 传感器数据处理
vlink::MessageLoop control_loop; // 控制逻辑
vlink::MessageLoop log_loop; // 日志异步写入
sensor_loop.async_run();
control_loop.async_run();
log_loop.async_run();
// 传感器数据 -> 控制循环
sensor_loop.post_task([&] {
SensorData data = read_sensor();
control_loop.post_task([data]() {
apply_control(data);
});
});

不同循环间通过 post_task() 传递消息是线程安全的。

与 ThreadPool 配合

MessageLoop 串行、ThreadPool 并行,常见模式是把 CPU 密集型任务下发到 ThreadPool,结果通过 post_task() 回到 MessageLoop 进行串行更新:

vlink::ThreadPool compute_pool(4);
ui_loop.async_run();
// 在 UI 循环触发异步计算
ui_loop.post_task([&] {
compute_pool.post_task([&] {
auto result = heavy_compute(); // 并行计算
// 计算完成后回到 UI 循环
ui_loop.post_task([result]() {
update_ui(result); // 串行更新
});
});
});

线程安全说明

操作 是否线程安全 说明
post_task() 可从任意线程并发调用
post_task_with_priority() 同上
invoke_task() 是,但有注意 不能在同一循环线程上 .get()
quit() 可从任意线程调用
is_running() / is_busy() 状态读取
run() / async_run() 只能从构造循环的线程调用一次
循环线程内访问共享状态 安全 串行,无数据竞争
多个循环线程访问同一共享状态 不安全 需要额外同步(mutex/原子量)

注意事项与常见陷阱

死锁

// 错误:在循环线程内等待 future
loop.post_task([&] {
auto fut = loop.invoke_task([] { return 1; });
int v = fut.get(); // 死锁!循环线程在等自己
});
// 正确:仅从外部线程等待 future
if (!loop.is_in_same_thread()) {
auto fut = loop.invoke_task([] { return 1; });
int v = fut.get(); // 安全
}

递归调用

post_task() 可以在循环线程内调用(从正在执行的任务中投递新任务),这是安全的:

loop.post_task([&] {
// 合法:投递下一个任务
loop.post_task([&] {
do_next_step();
});
});

但不要在任务内调用 wait_for_idle(),这会导致阻塞自身。

队列满

bool ok = loop.post_task([] { ... });
if (!ok) {
VLOG_W("queue full, task dropped");
// 考虑限流或增大 max_task_count
}

通过继承重载 get_max_task_count() 调整队列上限:

class MyLoop : public vlink::MessageLoop {
public:
size_t get_max_task_count() const override { return 50000; }
};

长时间任务阻塞循环

MessageLoop 是单线程的。一个耗时很长的任务会阻塞所有其他任务(包括定时器)。 对于耗时操作,应使用 ThreadPool 异步执行,完成后 post 结果。

完整示例:定时任务 + 异步回调串行化

// 生命周期钩子
VLOG_I("main-loop started");
});
loop.async_run();
// 周期性状态上报(每 2 秒)
vlink::Timer status_timer(&loop, 2000, vlink::Timer::kInfinite, [&] {
VLOG_I("status: queued=", loop.get_task_count());
});
status_timer.start();
// 3 秒后执行一次性初始化
vlink::Timer::call_once(&loop, 3000, [] {
VLOG_I("delayed init done");
});
// 带超时的异步任务
loop.exec_task(
vlink::Schedule::Config{/*delay_ms=*/500, 0, 1000, 200},
[]() -> bool {
return connect_to_remote();
})
.on_then([]() -> bool {
subscribe_topics();
return true;
})
.on_else([] { schedule_reconnect(); })
.on_execution_timeout([] { VLOG_W("connect timed out"); });
// 10 秒后优雅退出
vlink::Timer::call_once(&loop, 10000, [&] {
loop.quit();
});
VLOG_I("shutdown complete");
Single-threaded event loop with three queue types, timer management and task scheduling.
RAII task scheduling wrapper with delay, priority, timeouts and result chaining.
Event-loop-driven periodic/one-shot timer with configurable priority.

11.5 定时器

定时器类型对比

Timer – 事件循环定时器

概述

vlink::TimerMessageLoop 集成,回调在循环线程上串行触发,无需额外同步。 当 interval_ms 传入 0 时,间隔会回退到内部的 kMinInterval 保护值。

构造与使用

// 每 500 ms 重复触发(kInfinite = 无限次)
vlink::Timer timer(&loop, 500, vlink::Timer::kInfinite, []() {
VLOG_I("tick");
});
timer.start();
// 仅触发 3 次
vlink::Timer once_timer(&loop, 1000, 3, []() {
VLOG_I("count down");
});
once_timer.start();
// 单次触发(fire-and-forget,无需管理 Timer 生命周期)
vlink::Timer::call_once(&loop, 200, []() {
VLOG_I("delayed once");
});
loop.run(); // 阻塞直到 quit()

严格模式

timer.set_strict(true);
// 当循环线程繁忙、错过了某几个 tick 时,
// 严格模式会在下一次迭代立即补发错过的回调,维持调度精度。

主要 API

timer.start(); // 启动定时器
timer.stop(); // 停止定时器
timer.restart(); // 重置计数并重新启动
timer.set_interval(100); // 修改间隔(ms),下次 start/restart 生效
timer.set_loop_count(10); // 修改触发次数
timer.set_callback([]{ ... }); // 替换回调
timer.set_priority(100); // 设置优先级(仅 kPriorityType 循环有效)
timer.set_strict(true); // 启用严格(追赶)模式
timer.is_active(); // 是否正在运行
timer.get_invoke_count(); // 已触发次数
timer.get_remain_loop_count(); // 剩余次数(kInfinite 表示无限)
timer.get_priority(); // 获取当前优先级
timer.attach(&loop); // 绑定到 MessageLoop
timer.detach(); // 从 MessageLoop 解绑
// 静态方法:单次触发(fire-and-forget)
// 完整签名:
// static bool call_once(MessageLoop* loop, uint32_t interval_ms,
// Callback&& callback, uint16_t priority = 0);
vlink::Timer::call_once(&loop, 200, []{ ... });
vlink::Timer::call_once(&loop, 200, []{ ... }, /*priority=*/50);

WheelTimer – 哈希时间轮定时器

概述

vlink::WheelTimer 使用哈希时间轮算法管理**大量并发超时**(数十万级别), 插入、删除均为 O(1),适合连接保活、会话超时等场景。

时间轮参数:

  • slots:时间槽数量,决定最大无折叠精度范围(slots * interval_ms
  • interval_ms:每个槽的时间(毫秒),所有定时器分辨率

超过 slots * interval_ms 的超时通过轮次计数器处理,仍为 O(1)。

使用示例

// 256 个槽,每槽 10 ms -> 最大精度范围 2.56 s(超出使用轮次)
vlink::WheelTimer wheel(256, 10);
wheel.start();
// 单次超时,1000 ms 后触发
auto key = wheel.add(1000, [](vlink::WheelTimer::Key k) {
VLOG_I("timeout key=", k);
});
// 周期性重复,每 500 ms 触发一次
auto repeat_key = wheel.add(500, [](vlink::WheelTimer::Key k) {
VLOG_I("heartbeat key=", k);
}, /*repeat_ms=*/500);
// 取消定时器
wheel.remove(key);
// 查询剩余时间(近似值,受槽边界影响)
uint32_t remaining = wheel.get_remaining_time(repeat_key);
// 暂停/恢复
wheel.pause();
wheel.resume();
// 限制追赶槽数(防止系统休眠后的回调风暴)
wheel.set_catchup_limit(10);
wheel.stop();
Hash-wheel timer for managing large numbers of concurrent timeouts efficiently.

**注意**:回调在 WheelTimer 内部工作线程上调用,需要线程安全操作。 通常将结果 post 到 MessageLoop 处理。

ElapsedTimer – 高精度计时器

概述

vlink::ElapsedTimer 测量从 start() 调用开始的经过时间,支持两种时钟源和三种精度。

时钟源(Method) 描述
kCpuTimestamp 单调墙钟(Linux CLOCK_MONOTONIC_RAW
kCpuActiveTime 进程 CPU 活跃时间(user + kernel)
精度(Accuracy) 单位
kMilli 毫秒
kMicro 微秒
kNano 纳秒

使用示例

// 默认:墙钟 + 毫秒
t.start();
do_work();
int64_t ms = t.get(); // 经过的毫秒数,未启动则返回 -1
// 微秒精度的 CPU 时间
cpu_t.start();
heavy_compute();
int64_t cpu_us = cpu_t.get();
// restart() 原子地返回经过时间并重置起点
int64_t delta = t.restart();
// 停止
t.stop(); // get() 此后返回 -1
t.is_active(); // false
// 静态工具:获取当前时间戳(无需创建对象)
High-resolution elapsed-time measurement with configurable clock source and precision.

性能测量惯用法

bench.start();
for (int i = 0; i < 10000; i++) {
do_operation();
}
int64_t total_us = bench.get();
VLOG_I("avg latency = ", (total_us / 10000), " us");

DeadlineTimer – 绝对截止时间定时器

概述

vlink::DeadlineTimer 存储一个绝对到期时间戳(原子 64 位),用于**无锁超时检测**。 多线程并发读取 has_expired()remaining_time() 无需加锁。

// 构造即设置 200 ms 截止时间
while (!dt.has_expired()) {
process_events();
}
int64_t left = dt.remaining_time(); // 0 表示已过期或无效
// 重置为 500 ms 后到期
dt.set_deadline(500);
// 设置绝对时间戳
dt.set_deadline_abs(abs + 1000);
// 清空截止时间:is_valid() 返回 false,has_expired() 也返回 false
dt.reset();
An absolute-deadline timer for lightweight, lock-free timeout tracking.

remaining_time() 返回 0 同时可能意味着"无效"或"已过期"; 在做精确判断时应结合 is_valid() 使用。


11.6 线程池 ThreadPool

线程池架构

概述

vlink::ThreadPool 维护固定数量的工作线程,用于**并行执行**任务。 与 MessageLoop 的区别:无定时器支持,任务可并发执行,适合 CPU 密集型工作。

队列类型与空闲策略

类型(Type) 内部队列 说明
kNormalType mutex + std::queue 默认,通用
kLockfreeType 无锁 MPMC 队列 低竞争延迟
策略(Strategy) 行为
kOptimizationStrategy yield 平衡延迟与 CPU
kPopStrategy 忙轮询,最低延迟最高 CPU
kBlockStrategy 条件变量阻塞,最低 CPU

使用示例

// 8 个工作线程
pool.set_name("compute-pool");
// 提交无返回值任务
pool.post_task([] {
heavy_work();
});
// 提交有返回值任务,获取 future
auto fut = pool.invoke_task([]() -> int {
return compute_answer();
});
int result = fut.get(); // 等待结果
// 查询队列深度
size_t pending = pool.get_task_count();
// 检测是否在工作线程内(防止死锁)
if (!pool.is_in_work_thread()) {
auto inner_fut = pool.invoke_task([]{ return 42; });
inner_fut.get(); // 安全:调用者在外部线程
}
// 关闭(等待当前任务完成)
pool.shutdown();
General-purpose thread pool for parallel task execution.

**警告**:在线程池的工作线程内调用 invoke_task(...).get() 会死锁! 使用 is_in_work_thread() 检测,或改用 post_task()


11.7 进程管理 Process

vlink::Process 是 VLink 基础库提供的跨平台子进程管理类,API 设计参考了 Qt 的 QProcess, 用于启动、监控、通信和终止子进程。它支持 stdout/stderr 的管道捕获、stdin 写入、 异步回调通知,以及同步执行辅助方法,可在 Linux、macOS、Windows 和 QNX 上使用。

概述

在自动驾驶和具身智能场景中,经常需要从主进程启动并管理外部程序(如传感器驱动、数据采集工具、 诊断程序等)。Process 封装了底层的 fork/exec(POSIX)和 CreateProcess(Windows)调用, 提供统一的 C++ 接口完成以下工作:

  • 启动子进程并传递命令行参数
  • 配置子进程的环境变量和工作目录
  • 通过管道捕获子进程的标准输出和标准错误
  • 向子进程的标准输入写入数据
  • 注册异步回调,在数据可读、状态变化或进程退出时获得通知
  • 优雅终止(SIGTERM)或强制杀死(SIGKILL)子进程
  • 提供静态方法 execute()start_detached() 用于简化的同步执行和分离启动

头文件

Cross-platform child process management with I/O piping and async callbacks.

生命周期状态

Process 内部维护一个三态状态机:

状态 数值 说明
kNotRunningState 0 未启动或已退出
kStartingState 1 start() 已调用,正在等待 exec 完成
kRunningState 2 子进程正在运行

状态转换流程:

Process 状态机

通过 get_state() 查询当前状态,通过 is_running() 快速判断是否正在运行。

退出状态

枚举 数值 说明
kNormalExitStatus 0 子进程正常退出(exit() 或 main 返回)
kCrashExitStatus 1 子进程被信号杀死或崩溃

错误码

枚举 数值 说明
kNoError 0 无错误
kUnknownError 1 未知错误
kStartError 2 启动失败(如可执行文件不存在)
kCrashedError 3 子进程崩溃
kTimedOutError 4 等待操作超时
kWriteError 5 写入 stdin 失败
kReadError 6 读取 stdout/stderr 失败
kBufferOverflowError 7 输出超出 max_buffer_size 限制

I/O 通道模式

Process::Mode 控制子进程的 stdout 和 stderr 如何路由。必须在 start() 之前通过 set_process_mode() 设置。

模式 数值 stdout stderr 适用场景
kSeparateMode 0 缓冲到管道 缓冲到管道 需要分别读取 stdout/stderr
kMergedMode 1 缓冲到管道 合并到 stdout 管道 不区分 stdout/stderr
kForwardedMode 2 转发到父进程 转发到父进程 子进程输出直接显示在终端
kForwardedOutputMode 3 转发到父进程 缓冲到管道 stdout 直接显示,stderr 捕获
kForwardedErrorMode 4 缓冲到管道 转发到父进程 stdout 捕获,stderr 直接显示

注意事项:

  • kForwardedMode 下,read_all_output()read_all_error() 不会返回数据, 因为输出被直接转发到了父进程的终端。
  • kMergedMode 下,所有子进程输出都通过 read_all_output() 读取, read_all_error() 不会有数据。
  • 默认模式为 kSeparateMode
// 现在 stdout 和 stderr 分别缓冲到不同的管道中

核心 API

构造与析构

Process(); // 构造,不启动子进程
~Process(); // 析构,等待子进程退出(最多 5 秒),超时则强杀

析构函数的行为:

  1. 如果子进程仍在运行,先发送 terminate()(SIGTERM)
  2. 等待 kDestructorWaitTimeoutMs(5000 ms)
  3. 如果仍未退出,发送 kill()(SIGKILL)
  4. 再等待 1000 ms 后清理资源

Process 不可拷贝、不可移动。

start()

void start(const std::string& program, const std::vector<std::string>& arguments = {});

异步启动子进程。调用后进程状态从 kNotRunningState 转为 kStartingState, 成功后转为 kRunningState。如果已有进程在运行,则设置 kStartError 并返回。

proc.start("/bin/echo", {"hello", "world"});
proc.wait_for_started(3000);

start_command()

void start_command(const std::string& command);

将一个完整的命令行字符串按空白字符拆分为程序名和参数列表,然后调用 start()。 支持双引号、单引号和转义字符。

proc.start_command("/bin/echo hello world");

等待

bool wait_for_started(int msecs = kDefaultWaitTimeoutMs); // 默认 3000 ms
bool wait_for_finished(int msecs = kDefaultWaitTimeoutMs); // 默认 3000 ms
bool wait_for_ready_read(int msecs = kDefaultWaitTimeoutMs); // 默认 3000 ms
  • wait_for_started():阻塞等待子进程进入 kRunningState。传入 kInfinite(-1)表示无限等待。
  • wait_for_finished():阻塞等待子进程退出。
  • wait_for_ready_read():阻塞等待管道中有新数据可读。

终止子进程

void terminate(); // POSIX: SIGTERM, Windows: TerminateProcess()
void kill(); // POSIX: SIGKILL, Windows: TerminateProcess(9)
void close(bool force_kill_on_timeout = false); // 先 terminate,超时后可选 kill
proc.start("/bin/sleep", {"60"});
proc.wait_for_started(3000);
proc.close(true); // 先 SIGTERM,超时后 SIGKILL

状态查询

State get_state() const; // 当前生命周期状态
Error get_error() const; // 最近的错误码
int get_exit_code() const; // 退出码(进程退出后有效)
ExitStatus get_exit_status() const; // 退出方式(正常/崩溃)
bool is_running() const; // 是否正在运行
int64_t get_process_id() const; // 操作系统进程 ID(未运行时为 -1)

异步回调

所有回调均在内部监控线程中被调用,访问共享数据时需要注意线程安全。 回调必须在 start() 之前注册。

// 子进程退出时触发
MLOG_I("Process exited with code: {}", code);
} else {
VLOG_E("Process crashed!");
}
});
// stdout 有新数据时触发
std::string line;
while (proc.can_read_line_stdout()) {
proc.read_line_stdout(line);
VLOG_I("stdout: ", line);
}
});
// stderr 有新数据时触发
proc.register_ready_read_stderr_callback([&proc]() { ... });
// 进程状态变化时触发
switch (state) {
VLOG_I("Process starting...");
break;
VLOG_I("Process running");
break;
VLOG_I("Process stopped");
break;
}
});
// 发生错误时触发
#define MLOG_I(...)
定义 logger.h:874

读写操作

读取 stdout

方法 说明
bytes_available_stdout() 返回 stdout 缓冲区中可读字节数
can_read_line_stdout() 是否有完整的一行(以换行符结尾)
read_line_stdout(std::string& line) 读取一行(含换行符),从缓冲区移除
read_stdout(std::vector<uint8_t>& buf, size_t) 读取指定字节数到字节数组
read_all_output(std::string& str) 读取全部 stdout 数据到字符串
read_all_output(std::vector<uint8_t>& buf) 读取全部 stdout 数据到字节数组

读取 stderr

方法 说明
bytes_available_stderr() 返回 stderr 缓冲区中可读字节数
can_read_line_stderr() 是否有完整的一行
read_line_stderr(std::string& line) 读取一行
read_stderr(std::vector<uint8_t>& buf, size_t) 读取指定字节数
read_all_error(std::string& str) 读取全部 stderr 数据到字符串
read_all_error(std::vector<uint8_t>& buf) 读取全部 stderr 数据到字节数组

读取全部输出(stdout + stderr 合并)

方法 说明
read_all(std::string& str) 读取 stdout + stderr 合并到字符串
read_all(std::vector<uint8_t>& buf) 读取 stdout + stderr 合并到字节数组

写入 stdin

size_t write(const std::vector<uint8_t>& buffer, int timeout_ms = kDefaultWriteTimeoutMs);
size_t write(const std::string& str, int timeout_ms = kDefaultWriteTimeoutMs);

向子进程的标准输入写入数据。返回实际写入的字节数。默认超时 5000 ms。

proc.start("/bin/cat");
proc.wait_for_started(3000);
proc.write("hello_stdin\n");
proc.close_write_channel(); // 子进程(cat)收到 EOF 后退出
proc.wait_for_finished(3000);
std::string output;
proc.read_all_output(output);
// output 包含 "hello_stdin\n"

同步辅助方法

Process::execute()

static int execute(const std::string& program,
const std::vector<std::string>& arguments = {},
int timeout_ms = kDefaultExecuteTimeoutMs); // 默认 30000 ms

静态方法,同步执行一个外部程序并等待其完成。返回退出码,超时或启动失败返回 -1。

int code = vlink::Process::execute("/bin/sh", {"-c", "exit 42"}, 5000);
// code == 42

Process::start_detached()

static bool start_detached(const std::string& program,
const std::vector<std::string>& arguments = {});

静态方法,启动一个完全分离的子进程。分离的进程与父进程无关,父进程退出不影响它。

POSIX 实现:双 fork() + setsid(),孙进程被 init 收养,stdin/stdout/stderr 重定向到 /dev/null

bool ok = vlink::Process::start_detached("/usr/bin/my_daemon", {"--config", "/etc/my.conf"});

缓冲区管理

void set_max_buffer_size(size_t size); // 默认 16 MB
size_t get_max_buffer_size() const;

当子进程输出超过此限制时,多余数据被丢弃,并设置 kBufferOverflowError

环境变量与工作目录

// 环境变量
proc.set_inherit_environment(true); // 继承父进程环境变量(默认 false)
proc.set_environment({{"MY_VAR", "hello"}, {"DEBUG", "1"}});
// 工作目录
proc.set_working_directory("/tmp");

内部常量

常量 说明
kInfinite -1 无限等待标记
kDefaultWaitTimeoutMs 3000 wait_for_started/finished 默认超时
kDefaultWriteTimeoutMs 5000 write() 默认超时
kDefaultExecuteTimeoutMs 30000 execute() 默认超时
kDestructorWaitTimeoutMs 5000 析构函数等待子进程退出的超时
默认缓冲区大小(内部) 16 MB stdout/stderr 缓冲区上限
管道读取块大小(内部) 8192 每次从管道读取的最大字节数
监控线程轮询间隔(内部) 50 ms poll/waitpid 的超时时间

跨平台注意事项

  • **Linux / macOS**:使用 fork() + execvp()/execve(),管道使用 pipe() + dup2(),非阻塞 O_NONBLOCKpoll() 检测数据就绪。
  • **Windows**:使用 CreateProcessW() + CreatePipe(),重叠 I/O 读取管道,WaitForSingleObject() 等待退出。
  • **QNX**:与 Linux 类似使用 POSIX API,sysconf(_SC_OPEN_MAX) 获取最大 fd 数。
  • Process 不可拷贝、不可移动,同一对象同一时间只能管理一个子进程。

完整示例

捕获子进程输出

#include <iostream>
#include <string>
int main() {
proc.start("/bin/echo", {"hello", "vlink"});
proc.wait_for_finished(3000);
std::string output;
proc.read_all_output(output);
std::cout << "Output: " << output << std::endl;
// 输出: hello vlink
return 0;
}

异步监听子进程输出

#include <iostream>
#include <string>
int main() {
std::string line;
while (proc.can_read_line_stdout()) {
proc.read_line_stdout(line);
std::cout << "Received: " << line;
}
});
std::cout << "Process finished, exit code: " << code << std::endl;
});
proc.start("/bin/sh", {"-c", "echo line1; echo line2; echo line3"});
proc.wait_for_finished(5000);
return 0;
}

与子进程交互(读写 stdin/stdout)

#include <iostream>
#include <string>
int main() {
proc.start("/bin/cat");
proc.wait_for_started(3000);
proc.write("hello from parent\n");
proc.write("another line\n");
proc.wait_for_finished(3000);
std::string output;
proc.read_all_output(output);
std::cout << output;
// 输出:
// hello from parent
// another line
return 0;
}

11.8 并发原语

并发组件对比表

任务调度组件对比

特性 MessageLoop ThreadPool MultiLoop
执行模型 单线程串行 固定线程池并行 事件循环 + 线程池并行
任务顺序保证 严格 FIFO 无保证 无保证
定时器支持 支持 不支持 支持(继承自 MessageLoop)
生命周期管理 run/quit/async_run 构造即启动/shutdown async_run/quit(继承)
队列类型 Normal/Lockfree/Priority Normal/Lockfree Normal/Lockfree/Priority
API 兼容性 基类 独立 与 MessageLoop 完全兼容
适用场景 有序任务派发 纯计算并行 需要定时器又需要并行处理的场景

同步原语对比

特性 SpinLock std::mutex Semaphore
等待方式 自旋(用户态) 内核态阻塞 条件变量阻塞
适用临界区长度 极短(几条指令) 任意 不限
上下文切换
公平性 无保证 由 OS 调度 FIFO
递归支持 不支持(会死锁) 可用 recursive_mutex 不适用
CPU 占用 高(忙等待) 低(休眠) 低(休眠)

MultiLoop 多线程循环

MultiLoop 继承自 MessageLoop,保持了完全相同的 post_task() / exec_task() / invoke_task() API。核心区别在于 on_task_changed() 方法被重写:任务不再在事件循环线程上直接执行,而是被转发到内部的 ThreadPool 上并行执行。

架构示意:

post_task()
调用方 ---------> MessageLoop 队列 ---------> on_task_changed()
|
ThreadPool (N 个工作线程)
/ | | \
thread0 t1 t2 thread(N-1)

关键特性:

  • N 个工作线程共享同一个任务队列
  • 任务不保证执行顺序
  • is_in_same_thread() 对任何工作线程返回 true
  • on_begin()on_end() 在每个工作线程上各调用一次
  • 定时器回调在某个工作线程上触发(非确定性)
  • 析构函数等待所有工作线程结束

构造参数

// 默认 4 个工作线程,kNormalType 队列
// 指定线程数
// 指定线程数和队列类型
vlink::MultiLoop loop(4, MessageLoop::kLockfreeType);

核心 API

loop.async_run();
loop.post_task([] { do_work(); });
loop.post_task_with_priority([] { urgent_work(); }, MessageLoop::kHighestPriority);
auto fut = loop.invoke_task([]() -> int { return compute(); });
int result = fut.get();
loop.wait_for_idle(3000);
loop.quit();
loop.wait_for_quit(2000);

完整示例

#include <atomic>
#include <iostream>
int main() {
loop.async_run();
std::atomic<int> counter{0};
constexpr int kTasks = 100;
for (int i = 0; i < kTasks; ++i) {
loop.post_task([&counter]() {
counter.fetch_add(1, std::memory_order_relaxed);
});
}
loop.wait_for_idle(5000);
std::cout << "Completed " << counter.load() << " tasks" << std::endl;
loop.quit();
loop.wait_for_quit(2000);
return 0;
}
Multi-threaded event loop backed by a pool of worker threads sharing one task queue.

适用场景

  • 需要定时器功能同时又需要任务并行执行的场景
  • 希望复用 MessageLoop API 但获得多线程吞吐量
  • 传感器数据处理流水线,既有周期性采集又有并行计算需求

注意事项

  • 任务回调中的共享状态必须由调用方自行保护(SpinLock、mutex 等)
  • 不要在任务回调中调用 invoke_task() 并同步等待结果,否则可能因线程池耗尽而死锁
  • MultiLoop 的名称自动编号为 MultiLoop_0MultiLoop_1 等,可通过 set_name() 覆盖

MpmcQueue 无锁队列

算法原理

MpmcQueue<T> 是一个固定容量、无锁、缓存行对齐的 MPMC 环形缓冲区,基于 turn-counting(轮次计数) 算法实现。

核心思路:

  • 环形缓冲区的每个槽位(Chunk)包含一个原子轮次计数器 turn
  • 生产者通过原子递增 head_ 来声明一个槽位,然后等待 chunk.turn == turn(head) * 2(槽位为空)后写入
  • 消费者通过原子递增 tail_ 来声明一个槽位,然后等待 chunk.turn == turn(tail) * 2 + 1(槽位有数据)后读取
  • 写入完成后将 turn 设为 turn(head) * 2 + 1(标记为满)
  • 读取完成后将 turn 设为 turn(tail) * 2 + 2(标记为空,下一轮可用)

等待策略:

  • 前 32 次(kFirstSpinTimes)空转
  • 超过 32 次后调用 Utils::yield_cpu()(x86 上是 PAUSE 指令,ARM 上是 WFE/YIELD)

64 字节对齐防伪共享

伪共享(False Sharing)是多核并发编程中的常见性能陷阱:当两个不同的原子变量恰好位于同一条缓存行(通常 64 字节)上时,一个核心的写操作会导致另一个核心的缓存行失效,即使它们访问的是不同的变量。

MpmcQueue 通过以下手段避免伪共享:

内存布局(64 字节对齐):
[ head_ (atomic, 64B 对齐) | padding... ] <-- 独占缓存行
[ chunk_*, capacity_ | cv_mtx_ ]
[ tail_ (atomic, 64B 对齐) | padding... ] <-- 独占缓存行
[ cv_not_empty_, cv_not_full_ ]
[ allocator_ ]
[ quit_flag_ (64B 对齐) | padding... ] <-- 独占缓存行

每个 Chunk 槽位也是 64 字节对齐的:

Chunk[0]: [ turn (atomic) | storage (T 的原始字节) | padding ] 64B 对齐
Chunk[1]: [ turn (atomic) | storage (T 的原始字节) | padding ] 64B 对齐
...

API 概览

阻塞式操作

q.push(42); // 阻塞式入队(队列满时自旋等待)
q.emplace(42); // 阻塞式原位构造入队
int val;
q.pop(val); // 阻塞式出队(队列空时自旋等待)

非阻塞式操作

bool ok = q.try_push(42); // 队列满时立即返回 false
bool ok = q.try_emplace(42);
int val;
bool ok = q.try_pop(val); // 队列空时立即返回 false

条件变量通知模式

当与 kBlockStrategy 消息循环配合使用时,需要启用条件变量通知:

int val;
q.wait_not_empty();
q.wait_not_empty(std::chrono::milliseconds(100));
q.wait_not_full();

状态查询

size_t cap = q.capacity(); // 固定容量
size_t sz = q.size(); // 近似大小(快速但可能略有偏差)
size_t sz2 = q.size(true); // 更精确的大小(较慢,最多重试 50 次)
bool e = q.empty(); // 是否为空
bool f = q.is_full(); // 是否已满

优雅退出

q.notify_to_quit();
// 此后 try_push/try_pop 返回 false
// 此后 emplace/push 静默丢弃数据
// 此后 pop 不修改输出参数直接返回

Behavior 枚举

push 时行为 pop 时行为
kNoBehavior 无通知 无通知
kConditionBehavior 通知 cv_not_empty_ 通知 cv_not_full_

容量规划建议

  • 容量必须 >= 1,传入 0 会抛出 std::invalid_argument
  • 每个槽位占用至少 64 字节(一个缓存行),加上 sizeof(T) 并向上对齐到 64 的倍数
  • 经验公式:容量 = 预期突发峰值的 2-4 倍

完整示例:生产者-消费者

#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
int main() {
std::atomic<int> sum{0};
constexpr int kProducers = 4;
constexpr int kConsumers = 4;
constexpr int kItemsPerProducer = 1000;
constexpr int kTotal = kProducers * kItemsPerProducer;
std::atomic<int> consumed_count{0};
std::vector<std::thread> consumers;
for (int c = 0; c < kConsumers; ++c) {
consumers.emplace_back([&]() {
while (true) {
int val = 0;
if (consumed_count.load() >= kTotal) {
break;
}
if (q.try_pop(val)) {
sum.fetch_add(val, std::memory_order_relaxed);
consumed_count.fetch_add(1);
} else {
std::this_thread::yield();
}
}
});
}
std::vector<std::thread> producers;
for (int p = 0; p < kProducers; ++p) {
producers.emplace_back([&, p]() {
int base = p * kItemsPerProducer;
for (int i = 0; i < kItemsPerProducer; ++i) {
q.push(base + i);
}
});
}
for (auto& t : producers) { t.join(); }
while (consumed_count.load() < kTotal) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
for (auto& t : consumers) { t.join(); }
std::cout << "Total sum: " << sum.load() << std::endl;
return 0;
}
Lock-free bounded multi-producer multi-consumer queue with optional blocking behaviour.

ObjectPool 对象池

概述

ObjectPool<T> 是一个线程安全的通用对象池,通过回收和重用对象来减少堆分配开销。它特别适用于需要频繁创建和销毁对象的热路径,例如消息缓冲区、序列化上下文等。

三种获取方式

方法 返回类型 自动归还 使用场景
get() unique_ptr<T, PoolDeleter> 单一所有权,推荐默认使用
get_shared() shared_ptr<T> 需要共享所有权
borrow() T*(裸指针) 需要手动控制归还时机

对于 get()get_shared(),当智能指针析构时,对象自动归还到池中而非被删除。borrow() 返回裸指针,必须由调用方手动调用 give_back() 归还。

ResetPolicy 枚举

策略 获取时重置 归还时重置 适用场景
kPolicyNone 不可变或无状态对象
kPolicyRelease 归还前清理(默认策略)
kPolicyAcquire 使用前清理
kPolicyBoth 双侧清理

构造参数

auto pool = std::make_shared<vlink::ObjectPool<Buffer>>(
[]{ return std::make_unique<Buffer>(4096); }, // factory_callback
4, // initial_size: 预填充 4 个对象
16, // max_size: 最多 16 个对象(0 表示无限)
[](Buffer& b){ b.clear(); }, // reset_callback
vlink::ObjectPool<Buffer>::kPolicyRelease // policy: 归还时重置
);

注意:ObjectPool 必须通过 std::make_shared 创建,因为 PoolDeleter 持有 weak_ptr 引向池本身。

线程安全保证

所有公共方法通过内部互斥锁保护,可安全地从多个线程并发调用。但需要注意:

  • 工厂回调在锁外执行,因此工厂回调本身必须是线程安全的
  • 重置回调在锁外执行,同样需要线程安全
  • 如果重置回调抛出异常,借出计数会正确递减,不会泄漏

统计信息

auto s = pool->stats();
// s.pool_size -- 当前池中空闲对象数
// s.borrowed -- 当前被借出的对象数
// s.total_created -- 累计创建的对象总数
// s.max_size -- 最大允许对象数(0 表示无限)

完整示例

#include <iostream>
#include <thread>
#include <vector>
struct Buffer {
std::vector<uint8_t> data;
explicit Buffer(size_t sz) : data(sz) {}
void clear() { std::fill(data.begin(), data.end(), 0); }
};
int main() {
auto pool = std::make_shared<vlink::ObjectPool<Buffer>>(
[]{ return std::make_unique<Buffer>(4096); },
4, // 预填充 4 个
16, // 最多 16 个
[](Buffer& b){ b.clear(); },
);
// RAII 方式:get() 返回 unique_ptr,析构时自动归还
{
auto buf = pool->get();
buf->data[0] = 0x42;
}
// 共享方式:get_shared() 返回 shared_ptr
{
auto buf = pool->get_shared();
auto buf_copy = buf; // 引用计数 +1
}
// 手动方式:borrow/give_back
{
Buffer* raw = pool->borrow();
raw->data[0] = 0xFF;
pool->give_back(raw);
}
auto s = pool->stats();
std::cout << "Pool size: " << s.pool_size
<< ", Borrowed: " << s.borrowed
<< ", Total created: " << s.total_created << std::endl;
return 0;
}
Thread-safe generic object pool with configurable reset policy and RAII ownership.

池耗尽处理

max_size > 0 且所有对象都被借出时,get()get_shared()borrow() 会抛出 std::runtime_error。建议的处理策略:

  • 设置合理的 max_size 上限
  • 使用 try-catch 捕获耗尽异常并做降级处理
  • 通过 stats() 监控池状态,在接近耗尽前发出告警

SpinLock 自旋锁

适用场景

SpinLock 适用于极短的临界区(几条 CPU 指令),在这种场景下 std::mutex 的上下文切换开销远大于等待时间本身。

典型场景:

  • 原子地修改几个关联变量
  • 更新统计计数器
  • 保护简短的指针交换

不适合的场景:

  • 临界区包含 I/O 操作
  • 临界区执行时间不确定
  • 低优先级线程可能被高优先级线程抢占导致优先级反转

自适应退避策略

SpinLock 使用指数退避策略来减少总线竞争:

第 1 轮: 自旋 1 次后 yield_cpu()
第 2 轮: 自旋 2 次后 yield_cpu()
第 3 轮: 自旋 4 次后 yield_cpu()
...
第 N 轮: 自旋 min(2^N, 1024) 次后 yield_cpu()
超过 50000 次总自旋: 记录错误日志并 sleep 10 微秒(安全阀机制)

yield_cpu() 在不同平台上映射为不同的指令:

平台 指令 作用
x86/x64 PAUSE 降低流水线功耗,避免内存序违规
ARM WFE/YIELD 让出执行到事件到达
其他 sched_yield 系统级让出

与 std::mutex 对比

指标 SpinLock std::mutex
锁获取延迟 极低(纳秒级,无竞争时) 中等(涉及系统调用)
竞争时开销 CPU 自旋浪费 线程挂起/唤醒
适合持有时长 极短(纳秒到微秒) 任意
内存开销 64 字节(缓存行对齐) 平台相关(通常 40-80 字节)
递归锁定 死锁 可用 recursive_mutex
优先级反转 可能发生 部分 OS 有优先级继承

API

// 手动方式
lock.lock(); // 获取锁(自旋等待)
// 临界区...
lock.unlock(); // 释放锁
// 尝试获取(非阻塞)
if (lock.try_lock()) {
// 获取成功
lock.unlock();
}
// RAII 方式(推荐)
{
vlink::SpinLockGuard guard(lock);
// 临界区...
} // 自动释放

SpinLock 满足 C++ Lockable 具名要求,也可以与 std::lock_guard 配合使用:

{
std::lock_guard<vlink::SpinLock> guard(lock);
// 临界区...
}

完整示例

#include <thread>
#include <vector>
#include <iostream>
int main() {
int counter = 0;
constexpr int kThreads = 8;
constexpr int kOps = 10000;
std::vector<std::thread> workers;
for (int i = 0; i < kThreads; ++i) {
workers.emplace_back([&]() {
for (int j = 0; j < kOps; ++j) {
vlink::SpinLockGuard guard(lock);
++counter;
}
});
}
for (auto& w : workers) {
w.join();
}
std::cout << "Counter: " << counter << std::endl;
// 输出: Counter: 80000
return 0;
}
Adaptive, cache-line-aligned spin lock and RAII guard.

Semaphore 信号量

概述

Semaphore 是一个进程内计数信号量,提供经典的 P/V(acquire/release)语义。内部基于 std::mutex 和条件变量实现,条件变量使用 CLOCK_MONOTONIC 以避免 Linux 上 NTP 时钟跳变导致的问题。

API

vlink::Semaphore sem(0); // 初始计数为 0
// 获取 n 个许可(默认 1),可选超时
bool ok = sem.acquire(1); // 无限等待
bool ok = sem.acquire(1, 100); // 最多等待 100 毫秒
bool ok = sem.acquire(1, Semaphore::kInfinite); // 无限等待(等同于默认)
// 释放 n 个许可(默认 1),唤醒等待者
sem.release(1);
sem.release(3); // 一次释放 3 个许可
// 重置计数为 0
sem.reset(); // 不中断等待者
sem.reset(true); // 中断所有等待者(acquire 返回 false)
// 查询当前计数(仅用于调试)
size_t count = sem.get_count();

生产者-消费者模式

#include <thread>
#include <queue>
#include <mutex>
#include <iostream>
int main() {
std::queue<int> buffer;
std::mutex mtx;
std::thread producer([&]() {
for (int i = 0; i < 10; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
buffer.push(i);
}
sem.release();
}
});
std::thread consumer([&]() {
for (int i = 0; i < 10; ++i) {
sem.acquire();
int val;
{
std::lock_guard<std::mutex> lock(mtx);
val = buffer.front();
buffer.pop();
}
std::cout << "Consumed: " << val << std::endl;
}
});
producer.join();
consumer.join();
return 0;
}
In-process counting semaphore with optional timeout.

资源池限流

vlink::Semaphore pool_sem(4); // 最多 4 个并发访问
void access_resource() {
if (pool_sem.acquire(1, 5000)) { // 等待最多 5 秒
// 使用共享资源...
pool_sem.release();
} else {
// 超时处理
}
}

优雅关闭

std::thread worker([&]() {
while (sem.acquire(1)) {
// 处理任务...
}
// acquire 返回 false 表示被中断,退出循环
});
sem.reset(true); // 中断所有等待者
worker.join();

注意事项

  • acquire() 的超时使用毫秒为单位,kInfinite(-1)表示无限等待
  • get_count() 返回的是快照值,在并发环境下仅供调试参考
  • 这是进程内信号量;如需跨进程同步,请使用 SysSemaphore
  • reset(true) 是破坏性操作,仅在受控关闭时使用

并发原语选型决策树

需要任务调度?
|
+-- 需要串行执行 ---------> MessageLoop
|
+-- 需要并行执行
|
+-- 需要定时器 -------> MultiLoop
|
+-- 纯并行计算 -------> ThreadPool
需要线程间数据传输?
|
+-- 固定容量 + 高吞吐 ----> MpmcQueue
|
+-- 需要背压控制 ----------> MpmcQueue + wait_not_full/wait_not_empty
|
+-- 对象重用 + 减少分配 ---> ObjectPool
需要互斥保护?
|
+-- 极短临界区(纳秒级)---> SpinLock
|
+-- 一般临界区 ------------> std::mutex
|
+-- 需要条件等待 ----------> vlink::ConditionVariable(CLOCK_MONOTONIC,避免 NTP 跳变)
需要线程间信号通知?
|
+-- 简单信号 --------------> Semaphore
|
+-- 需要计数限流 ----------> Semaphore(初始计数 = 并发上限)

避免伪共享

伪共享是并发性能的隐形杀手。VLink 的 MpmcQueue 和 SpinLock 已经通过 64 字节对齐避免了内部伪共享。在编写自己的并发数据结构时,请遵循以下原则:

  1. 不同线程频繁访问的原子变量应当分别对齐到独立的缓存行
// 错误:两个原子变量可能共享缓存行
struct Bad {
std::atomic<int> counter_a;
std::atomic<int> counter_b;
};
// 正确:每个原子变量独占缓存行
struct Good {
alignas(64) std::atomic<int> counter_a;
alignas(64) std::atomic<int> counter_b;
};
  1. 数组中的元素如果被不同线程并发访问,每个元素应对齐到缓存行
  2. 只读数据和读写数据分离:将不变的配置字段与频繁更新的状态字段放在不同的缓存行上

组合使用示例

以下示例展示如何组合使用 MultiLoop、MpmcQueue 和 ObjectPool 构建一个高性能的消息处理管道:

#include <iostream>
#include <memory>
struct Message {
int id{0};
std::string payload;
void reset() { id = 0; payload.clear(); }
};
int main() {
auto msg_pool = std::make_shared<vlink::ObjectPool<Message>>(
[]{ return std::make_unique<Message>(); },
8, // 预填充 8 个
64, // 最多 64 个
[](Message& m){ m.reset(); },
);
loop.async_run();
for (int i = 0; i < 100; ++i) {
loop.post_task([&msg_pool, i]() {
auto msg = msg_pool->get();
msg->id = i;
msg->payload = "data_" + std::to_string(i);
// msg 离开作用域时自动归还到池中
});
}
loop.wait_for_idle(5000);
loop.quit();
loop.wait_for_quit(2000);
auto s = msg_pool->stats();
std::cout << "Pool stats - idle: " << s.pool_size
<< ", total created: " << s.total_created << std::endl;
return 0;
}

11.9 IPC 原语

VLink 在 vlink::base 层提供了两个跨进程 IPC 原语类:

头文件 功能
SysSemaphore include/vlink/base/sys_semaphore.h 命名计数信号量
SysSharemem include/vlink/base/sys_sharemem.h 命名共享内存区域

这两个类直接封装操作系统的 IPC 机制,为上层模块和用户代码提供最底层的进程间同步与数据共享能力。它们不依赖任何第三方库,仅使用 POSIX 或 Win32 原生 API。

SysSemaphore 系统信号量

基本概念

SysSemaphore 是一个命名的、跨进程的计数信号量。多个进程通过相同的名称访问同一个内核信号量对象,实现 P/V 操作(即 acquire/release)。

核心特点:

  • **命名语义**:通过操作系统命名空间标识,任何知道名称的进程都可以打开同一个信号量。
  • **计数信号量**:内部维护一个非负整数计数器。release() 增加计数,acquire() 减少计数;当计数为 0 时 acquire() 阻塞调用者。
  • **RAII 管理**:析构函数自动调用 detach() 关闭句柄。

生命周期

构造 --> attach(name) --> acquire()/release() --> detach(force) --> 析构
  1. **构造**:SysSemaphore(count) 创建对象,count 为初始计数(默认 0)。此时尚未关联任何 OS 信号量。
  2. **attach(name)**:创建或打开一个命名信号量。如果该名称的信号量在系统中不存在,则以指定初始计数创建;如果已存在,则直接打开(忽略构造时的初始计数)。
  3. **acquire(n, timeout_ms)**:P 操作,将计数减少 n。如果计数不足则阻塞,直到超时或其他线程/进程释放了足够的许可。
  4. **release(n)**:V 操作,将计数增加 n,唤醒最多 n 个阻塞在 acquire() 上的等待者。
  5. **detach(force)**:关闭句柄。若 force=true 则同时从 OS 命名空间中删除(sem_unlink);若 force=false 则仅关闭本进程的句柄。
  6. **析构**:自动调用 detach(false)(仅关闭句柄,不删除)。

API 参考

class SysSemaphore final {
public:
static constexpr int kInfinite{-1};
explicit SysSemaphore(size_t count = 0);
~SysSemaphore();
bool attach(const std::string& name);
bool detach(bool force = true);
bool acquire(size_t n = 1, int timeout_ms = kInfinite);
void release(size_t n = 1);
bool is_attached() const;
size_t get_count() const;
};
方法 返回值 说明
attach bool 成功返回 true;重复 attach 会返回 false 并打印错误日志
detach bool 未 attach 时返回 false
acquire bool 超时或错误返回 false;timeout_ms=0 为非阻塞尝试
release void n=0 时不做任何操作
is_attached bool 查询当前是否已关联 OS 信号量
get_count size_t 获取当前计数快照(仅用于诊断,值可能随即改变)

POSIX 命名规则

在 POSIX 系统(Linux、macOS)上,命名信号量的名称必须以 / 开头,例如 /vlink_ready/my_sem_01。名称中不能包含额外的 /。名称长度受 NAME_MAX 限制(通常 255 字节)。

在 Windows 上,信号量名称可以是任意非空字符串。

超时机制

timeout_ms 值 行为 底层 API (POSIX)
kInfinite (-1) 无限等待,直到获取到许可 sem_wait()
0 非阻塞尝试,立即返回 sem_timedwait(now)
> 0 最多等待指定毫秒数 sem_timedwait()

SysSharemem 系统共享内存

基本概念

SysSharemem 封装了操作系统命名共享内存区域。一个进程通过 create() 创建并映射一块共享内存,其他进程通过相同名称调用 attach() 将该内存映射到自己的地址空间,从而实现零拷贝的数据共享。

核心特点:

  • **命名语义**:与 SysSemaphore 类似,通过 OS 命名空间标识共享内存对象。
  • **显式创建/附加分离**:create() 分配并映射新区域,attach() 仅映射已存在的区域。
  • **访问模式控制**:支持只读 (kReadOnly) 和读写 (kReadWrite) 两种映射模式。
  • **RAII 管理**:析构函数自动调用 detach(false) 取消映射(但不删除 OS 对象)。

生命周期

创建者: 构造 --> create(name, size) --> data() 读写 --> detach(true) 删除
附加者: 构造 --> attach(name) --> data() 读写 --> detach(false) 仅取消映射

API 参考

class SysSharemem final {
public:
enum Mode : uint8_t {
kReadOnly = 0, // PROT_READ
kReadWrite = 1 // PROT_READ | PROT_WRITE
};
SysSharemem();
~SysSharemem();
bool create(const std::string& name, size_t size, Mode mode = kReadWrite);
bool attach(const std::string& name, Mode mode = kReadWrite);
bool detach(bool force = true);
bool is_attached() const;
void* data();
const void* data() const;
size_t size() const;
};
方法 返回值 说明
create bool 同名已存在时返回 false(O_EXCL 语义)
attach bool 对象不存在时返回 false
detach bool 未 attach 时返回 false
is_attached bool 数据指针、大小、句柄、名称均有效时返回 true
data() void* 未 attach 时返回 nullptr
data() const const void* 只读访问,未 attach 时返回 nullptr
size() size_t 返回映射区域大小(字节),未 attach 时返回 0

访问模式

Mode POSIX mmap 标志 Windows 标志 说明
kReadOnly PROT_READ FILE_MAP_READ 只读映射

| kReadWrite | PROT_READ | PROT_WRITE | FILE_MAP_ALL_ACCESS | 读写映射(默认) |

kReadOnly 模式下,通过 data() 返回的指针写入数据会触发段错误(SIGSEGV / Access Violation)。

跨进程同步模式

SysSemaphoreSysSharemem 配合使用是实现跨进程数据交换的经典模式:

创建者进程 消费者进程
| |
|-- SysSharemem::create("/data", 4096) |
|-- SysSemaphore(0).attach("/notify") |
| |
|-- 写入数据到 shm.data() |-- SysSharemem::attach("/data")
|-- sem.release() ---- 信号 ----> |-- SysSemaphore(0).attach("/notify")
| |-- sem.acquire() // 收到信号
| |-- 读取 shm.data()
| |

与 VLink 传输后端的关系

层级 组件 说明
用户 API 层 Publisher / Subscriber / Client面向业务的发布/订阅/RPC 接口
传输后端层 shm://(Iceoryx)、dds://(FastDDS) 等 封装具体传输协议,用户通过 URL 选择
OS IPC 原语层 SysSemaphoreSysSharemem 直接封装操作系统 API

平台 API 映射表

SysSemaphore 平台 API 映射

操作 Linux (POSIX) macOS (GCD) Windows (Win32)
创建/打开 sem_open() dispatch_semaphore_create() CreateSemaphore()
关闭句柄 sem_close() 释放 dispatch 对象 CloseHandle()
删除(unlink) sem_unlink() 不支持(GCD 信号量无命名空间) 不支持(句柄关闭即释放)
P 操作 sem_wait() dispatch_semaphore_wait() WaitForSingleObjectEx()
P 超时 sem_timedwait() dispatch_semaphore_wait(timeout) WaitForSingleObjectEx(ms)
V 操作 sem_post() dispatch_semaphore_signal() ReleaseSemaphore()
获取计数 sem_getvalue() 不支持 ReleaseSemaphore(0, &cnt)
命名规则 必须以 / 开头 名称被忽略(非命名信号量) 任意非空字符串

注意事项:

  • **macOS**:使用 GCD 的 dispatch_semaphore,不支持命名语义,无法真正跨进程使用。get_count() 也不可用。
  • **Windows**:detach(force) 中的 force 参数被忽略,Win32 信号量在所有句柄关闭后自动销毁。
  • **Linux**:命名信号量在 /dev/shm/ 目录下以 sem. 前缀的文件形式存在。

SysSharemem 平台 API 映射

操作 Linux (POSIX) Windows (Win32) Android
创建 shm_open() + ftruncate() CreateFileMapping() 不支持
打开 shm_open() OpenFileMapping() 不支持
映射 mmap(MAP_SHARED) MapViewOfFile() 不支持
取消映射 munmap() UnmapViewOfFile() 不支持
删除(unlink) shm_unlink() CloseHandle() 不支持
获取大小 fstat() VirtualQuery() 不支持
防止缩小 fcntl(F_ADD_SEALS) 不适用 不支持
命名规则 必须以 / 开头 任意非空字符串 不支持
权限 0600 (owner rw) / 0400 (r) 通过映射标志控制 不支持

注意事项:

  • **Android**:Bionic libc 不支持 shm_open()create()attach() 均返回 false。需使用 ashmemmemfd_create 替代。
  • **QNX**:detach() 时通过 fstat()st_nlink 判断附加计数,最后一个附加者 detach 时自动 unlink。

完整示例

生产者进程

#include <cstdio>
#include <cstring>
struct SharedMessage {
uint32_t seq;
uint32_t length;
char payload[256];
};
int main() {
if (!shm.create("/vlink_demo_shm", sizeof(SharedMessage))) {
std::fprintf(stderr, "Failed to create shared memory\n");
return 1;
}
if (!sem.attach("/vlink_demo_sem")) {
std::fprintf(stderr, "Failed to create semaphore\n");
shm.detach(true);
return 1;
}
auto* msg = static_cast<SharedMessage*>(shm.data());
msg->seq = 1;
const char* text = "Hello from producer!";
msg->length = static_cast<uint32_t>(std::strlen(text));
std::memcpy(msg->payload, text, msg->length + 1);
std::printf("Producer: wrote message seq=%u\n", msg->seq);
sem.release();
sem.detach(true);
shm.detach(true);
return 0;
}
Named, cross-process counting semaphore backed by the OS IPC layer.
Named cross-process shared memory region.

消费者进程

#include <cstdio>
int main() {
if (!shm.attach("/vlink_demo_shm", vlink::SysSharemem::kReadOnly)) {
std::fprintf(stderr, "Failed to attach shared memory\n");
return 1;
}
if (!sem.attach("/vlink_demo_sem")) {
std::fprintf(stderr, "Failed to attach semaphore\n");
shm.detach(false);
return 1;
}
std::printf("Consumer: waiting for data...\n");
if (!sem.acquire(1, 5000)) {
std::fprintf(stderr, "Consumer: timeout waiting for data\n");
sem.detach(false);
shm.detach(false);
return 1;
}
const auto* msg = static_cast<const SharedMessage*>(shm.data());
std::printf("Consumer: received message seq=%u, payload=\"%s\"\n",
msg->seq, msg->payload);
sem.detach(false);
shm.detach(false);
return 0;
}

IPC 原语注意事项

  • **资源泄漏**:POSIX 命名信号量和共享内存在进程崩溃后不会自动从命名空间中删除。应在程序启动时检查并清理遗留对象。
  • **析构行为**:两个类的析构函数均调用 detach(false),仅关闭句柄不删除 OS 对象。如需删除,必须手动调用 detach(true)
  • **并发访问**:共享内存本身不提供同步保护,必须配合 SysSemaphore 或进程间互斥锁使用。
  • **未初始化内存**:SysSharemem::create() 分配的区域不会自动清零,使用前应 std::memset(shm.data(), 0, shm.size())
  • **权限控制**:POSIX 下 create() 使用 0600 权限,不同用户需调整权限。
  • **大小限制**:受 /dev/shm tmpfs 大小限制。大型数据传输建议使用 VLink 的 shm:// 传输后端。
  • **不可拷贝**:SysSemaphoreSysSharemem 均不可拷贝,需通过指针或引用传递。

11.10 任务调度

Schedule – 任务调度包装器

概述

vlink::Schedule 是一个非构造工具结构体,通过 MessageLoop::exec_task() 使用。 它将回调包装在 Config 配置中,支持**延迟、优先级、调度超时和执行超时**, 并返回一个 RAII 句柄用于链式注册延续回调。

Config 字段

字段 含义
delay_ms 任务发布前的延迟(毫秒)
priority 调度优先级(用于 kPriorityType 循环)
schedule_timeout_ms 任务未在此时间内启动则触发超时
execution_timeout_ms 任务执行超过此时间则触发超时

使用示例

loop.async_run();
// void 回调,100 ms 延迟,执行超时 500 ms
loop.exec_task(vlink::Schedule::Config{100, 0, 0, 500},
[] { expensive_op(); })
.on_execution_timeout([] { VLOG_W("task took too long!"); })
.on_catch([](std::exception& e) { VLOG_E("exception: ", e.what()); });
// bool 返回值回调,支持链式 then/else
[]() -> bool { return try_connect(); })
.on_then([]() -> bool {
start_session();
return true;
})
.on_then([]() -> bool {
subscribe_topics();
return true;
})
.on_else([] { retry_later(); })
.on_schedule_timeout([] { VLOG_W("task not scheduled in time"); });

GraphTask – 有向无环图任务调度

DAG 任务调度示例

概述

vlink::GraphTask 实现 DAG(有向无环图)任务调度。 每个任务节点通过 precede() / succeed() 声明依赖关系, 然后调用 execute() 提交到任意兼容引擎(MessageLoopMultiLoopThreadPool)。

任务类型

工厂方法 回调签名 用途
create(callback) void() 普通工作任务
create_condition(cb) int() 条件分支(返回值选择分支)

依赖声明语法

GraphTask 重载了 -- >-- < 运算符用于声明依赖关系:

表达式 等价调用 含义
A -- > B A->succeed(B) B 是 A 的前驱(B 先执行)
A -- < B A->precede(B) A 是 B 的前驱(A 先执行)

也可以使用 precede() / succeed() 方法直接声明依赖。

执行策略

策略 含义
kPolicyOnce 每次 execute() 调用最多运行一次(默认)
kPolicyMultiple 单次 execute() 可运行多次
kPolicyWaitAll 等待所有前驱完成后才运行

使用示例

engine.async_run();
// 创建节点
auto load = vlink::GraphTask::create("load", [] { load_data(); });
auto proc = vlink::GraphTask::create("proc", [] { process(); });
auto save = vlink::GraphTask::create("save", [] { save_data(); });
auto clean = vlink::GraphTask::create("clean", [] { cleanup(); });
// 声明依赖:load -> proc -> save
// -> clean
// 注意:A-- >B 表示 "B 必须在 A 之前完成"(B 是 A 的前驱)
// 所以要表达 load->proc->save,需要反向书写:
save -- > proc -- > load;
clean -- > proc;
// 检测环
assert(!save->has_cycle());
// 监听状态变化
save->register_status_callback([](const std::string& name,
VLOG_D("task ", name, " status=", (int)s);
});
// 从终端节点提交执行(会自动遍历整个可达子图,从根节点开始执行)
save->execute(&engine);
// 条件分支示例
[]() -> int {
return is_valid() ? 0 : 1;
}, /*condition_number=*/2);
auto branch_a = vlink::GraphTask::create("branch_a", [] { handle_valid(); });
auto branch_b = vlink::GraphTask::create("branch_b", [] { handle_invalid(); });
// check 是两个分支的前驱:check 执行后根据返回值选择分支
branch_a -- > check; // 分支 0(check 返回 0 时执行 branch_a)
branch_b -- > check; // 分支 1(check 返回 1 时执行 branch_b)
// 从任一分支提交即可遍历整个图
branch_a->execute(&engine);
// DOT 可视化导出
std::string dot = save->export_to_dot();
DAG-based task graph with condition branching, cycle detection, and DOT export.
#define VLOG_D(...)
定义 logger.h:848

11.11 性能分析

VLink 内置轻量级性能分析工具,用于量化节点和通信操作的 CPU 利用率, 同时提供高性能日志格式化流。

CpuProfiler CPU 分析器

vlink::CpuProfiler 通过 begin() / end() 配对调用,测量 CPU 活跃时间 占总墙钟时间(wall-clock time)的百分比。

工作原理

内部维护两个 ElapsedTimer

计时器 时钟类型 用途
cpu_active_timer_ kCpuActiveTime 每次 begin/end 区间的 CPU 活跃时间
cpu_timestamp_timer_ kCpuTimestamp 从首次 begin 到当前的总墙钟时间

利用率计算公式:

utilisation (%) = (sum of active intervals / total elapsed time) * 100

全局开关

CpuProfiler 的激活受环境变量 VLINK_PROFILER_ENABLE 控制。 该值在首次调用 is_global_enabled() 时读取并缓存,整个进程生命周期内不再变更。

# 启用性能分析
export VLINK_PROFILER_ENABLE=1
# 禁用(默认)
export VLINK_PROFILER_ENABLE=0

API 说明

方法 说明
CpuProfiler() 构造,所有累加器初始化为零
begin() 标记活跃区间开始,重置活跃计时器;首次调用同时启动墙钟计时器
end() 标记活跃区间结束,累加本次活跃时间到 total_active_
get() 返回当前 CPU 利用率百分比 [0.0, 100.0],不重置任何状态
restart() 返回当前利用率并重置所有累加器,之后 get() 返回 0.0
is_global_enabled() (static) 返回环境变量 VLINK_PROFILER_ENABLE 是否为 "1"

注意事项:

  • begin()end() 内部通过 SpinLock 保护,可从任意线程调用
  • 允许连续多次 begin() 而不调用 end()(每次 begin 重置活跃计时器基线)
  • end() 在未调用 begin() 的情况下安全执行,负值会被忽略
  • 实例不可拷贝、不可赋值

基本用法

for (auto& item : work_items) {
profiler.begin();
process(item);
profiler.end();
}
double pct = profiler.get(); // 例如 42.5 表示 42.5%
double reset_pct = profiler.restart();
// 此后 profiler.get() == 0.0
Per-instance CPU utilisation profiler gated by a global environment variable.

CpuProfilerGuard RAII 守护

vlink::CpuProfilerGuard 是一个轻量 RAII 包装器: 构造时调用 profiler->begin(),析构时调用 profiler->end(), 保证即使发生异常也能正确关闭活跃区间。

方法 说明
CpuProfilerGuard(CpuProfiler* profiler) 构造并调用 begin();profiler 为 nullptr 时为空操作
~CpuProfilerGuard() 析构并调用 end();profiler 为 nullptr 时为空操作

注意事项:

  • 传入 nullptr 是安全的,构造和析构均为空操作
  • 不可拷贝、不可移动,仅用作栈上局部变量
  • 在热路径中建议先检查 is_global_enabled() 以跳过构造开销
void process_frame() {
vlink::CpuProfilerGuard guard(&profiler);
// ... 执行工作 ...
} // 离开作用域时自动调用 profiler.end()
// 在热路径中结合全局开关使用
void hot_path_callback(const SensorData& data) {
vlink::CpuProfiler::is_global_enabled() ? &profiler : nullptr);
process(data);
}
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().

与 Node 的集成

VLink 的所有通信节点(Publisher、Subscriber、Client、Server、Getter、Setter) 内部持有一个可选的 CpuProfiler 实例。当全局性能分析开启时,每次 publish、 receive、request、respond 等操作都会自动通过 CpuProfilerGuard 记录 CPU 活跃时间。

通过 Node::get_cpu_usage() 可获取该节点从创建至今的 CPU 利用率百分比:

vlink::Subscriber<SensorMsg> sub("dds://sensors/lidar");
// ... 运行一段时间后 ...
double cpu = sub.get_cpu_usage();
if (cpu >= 0) {
VLOG_I("subscriber CPU usage: ", cpu, "%");
} else {
VLOG_W("profiler not available (VLINK_PROFILER_ENABLE not set)");
}
返回值 含义
[0.0, 100.0] 正常的 CPU 利用率百分比
-1.0 性能分析器未挂载(未启用 VLINK_PROFILER_ENABLE)

启用步骤:

# 1. 设置环境变量
export VLINK_PROFILER_ENABLE=1
# 2. 启动应用
./my_vlink_app

无需修改代码,通信节点会自动开始采集 CPU 利用率数据。

FastStream 高性能流

vlink::FastStream 是 VLink Logger 内部使用的高性能输出流,继承自 std::ostream, 支持所有标准流格式化操作符,同时提供零拷贝的缓冲区交付机制。

设计要点

  • 内部由一个 StringBuf(继承自 std::streambuf)驱动,数据存储在 std::string
  • 默认初始容量 256 字节,自动增长,增长步幅上限为 8 KiB(kMaxExpandSize
  • take_view() 返回内部缓冲区的 std::string_view,实现零拷贝交付
  • write_raw() 绕过 std::ostream 格式化,直接写入原始字节,适合预格式化字符串
  • 非线程安全,Logger 内部采用 thread_local 实例

API 说明

方法 说明
FastStream() 构造,初始容量 256 字节
reset() 清空缓冲区内容并重置流状态,不释放内存
take_view() 返回缓冲区内容的 string_view,有效期至下次写入
append_to(std::string& target) 将缓冲区内容追加到外部字符串,不重置缓冲区
size() 返回当前缓冲区已写入字节数
capacity() 返回当前缓冲区分配容量(字节)
shrink_to_fit() 释放多余容量,最小保留 64 字节(kMinCapacity
write_raw(const char* data, size_t len) 直接写入原始字节,绕过 ostream 格式化,返回 *this
operator<< 兼容所有 std::ostream 格式化操作符

缓冲区管理

常量 说明
kDefaultCapacity 256 构造时的初始缓冲区容量(字节)
kMinCapacity 64 shrink_to_fit 后的最小容量
kMaxExpandSize 8192 单次增长的上限步幅(8 KiB)

用法示例

stream << "sensor_id=" << 42 << " value=" << 3.14;
std::string_view view = stream.take_view();
// view == "sensor_id=42 value=3.14"
write_to_sink(view); // 零拷贝交付
// 原始字节写入
stream.reset();
const char* header = "[INFO] ";
stream.write_raw(header, 7);
stream << "message content";
// 内存回收
stream.reset();
stream.shrink_to_fit();
A high-performance std::ostream backed by a resizable std::string buffer.

完整代码示例

手动性能分析

int main() {
vlink::Logger::init("profiler-demo");
VLOG_W("profiler is disabled, set VLINK_PROFILER_ENABLE=1 to enable");
}
// 方式 1: 手动 begin/end
profiler.begin();
heavy_computation();
profiler.end();
VLOG_I("CPU usage after computation: ", profiler.get(), "%");
// 方式 2: RAII guard
{
vlink::CpuProfilerGuard guard(&profiler);
another_computation();
}
VLOG_I("CPU usage after both: ", profiler.get(), "%");
double final_usage = profiler.restart();
VLOG_I("final usage: ", final_usage, "%");
VLOG_I("after restart: ", profiler.get(), "%"); // 0.0
return 0;
}

11.12 工具类

Format – 轻量格式化器

概述

vlink::format 是一个仅头文件的轻量 {} 占位符格式化器,专为日志热路径设计。 所有格式化写入栈分配缓冲区或用户提供的输出迭代器,**不触及堆**。

支持的类型

C++ 类型 输出示例
int / short / signed char 42
unsigned / unsigned short / ... 42
long / long long 123456789
bool true / false
char A
float / double 3.14
const char* / std::string / std::string_view hello
任意指针 T* 0x7ffe1234
任意枚举 底层整数值
其他自定义类型 编译期 static_assert 报错

占位符语法:

  • {} – 按顺序消耗参数
  • {0}, {1} – 显式位置索引
  • {{ / }} – 字面量花括号

使用示例

char buf[128];
// format_to_n:写入至多 n 个字符
buf, sizeof(buf) - 1, "x={} y={} name={}", 3, 4.5, "world");
buf[result.size] = '\0';
// buf == "x=3 y=4.5 name=world"
// 检查是否截断
if (result.truncated) {
// 实际内容超过缓冲区大小
}
// 固定数组重载
char arr[64];
auto r2 = vlink::format::format_to(arr, "value={}", 42);
// 输出迭代器重载
std::string out;
vlink::format::format_to(std::back_inserter(out), "n={}", 99);
Lightweight header-only {} placeholder formatter with no dynamic allocation.

Utils – 系统工具函数

概述

vlink::Utils 命名空间提供跨平台的系统级工具函数,覆盖进程、线程、网络、信号等方面。 所有函数均为 noexcept,错误以空字符串、false 或哨兵值(如 pid == -1)表示。

进程与应用信息

std::string path = vlink::Utils::get_app_path(); // 可执行文件完整路径
std::string dir = vlink::Utils::get_app_dir(); // 所在目录
std::string name = vlink::Utils::get_app_name(); // 可执行文件名
std::string host = vlink::Utils::get_host_name(); // 主机名
int32_t pid = vlink::Utils::get_pid(); // 进程 ID
std::string tmp = vlink::Utils::get_tmp_dir(); // 临时目录
std::string mid = vlink::Utils::get_machine_id();// 机器唯一 ID
Platform-agnostic system utilities for process, thread, network and signal management.

环境变量

std::string val = vlink::Utils::get_env("MY_VAR", "default");
vlink::Utils::set_env("MY_VAR", "value");

线程管理

// 设置当前线程名(Linux 最多 15 字符)
// 设置调度策略和优先级(需要 CAP_SYS_NICE)
// CPU 亲和性(bit mask:bit 0 = core 0)
vlink::Utils::set_thread_stick(0b0011); // 绑定到 core 0 和 core 1
// 获取原生线程 ID
// CPU 让步(最优机器指令:x86 PAUSE, ARM YIELD, RISC-V fence)

信号处理

// 注册终止信号处理器(SIGTERM + SIGINT)
[](int sig) {
VLOG_I("terminating, signal=", sig);
app.shutdown();
},
/*is_async=*/false,
/*pass_through=*/false);
// 注册崩溃信号处理器(SIGSEGV, SIGABRT 等)
});

网络工具

auto ipv4_list = vlink::Utils::get_all_ipv4_address(/*filter_available=*/true);
std::string iface = vlink::Utils::get_interface_name_by_ipv4("192.168.1.100");

其他工具

// 进程单例守护(防止重复启动)
VLOG_F("another instance is already running");
}
// 等待设备节点出现
vlink::Utils::wait_for_device("/dev/video0", /*timeout_ms=*/5000);
// CPU 和内存使用率
// 释放系统内存(malloc_trim)
// 时区偏差(秒)
int32_t tz = vlink::Utils::get_timezone_diff(); // UTC+8 -> 28800