VLink 提供完整的消息录制与回放功能,支持将通信消息持久化到文件, 并在离线状态下以任意速率重新播放。这一能力类似于 ROS 的 rosbag, 可用于调试、数据分析、仿真回灌等场景。
**相关文档**:CLI 录制/回放工具 vlink-bag 的详细用法参见 13-cli-tools.md;可视化回放器参见 14-viewer.md;录制相关环境变量参见 21-environment-vars.md。
概念与架构
Bag 录制与回放架构
录制与回放完整流程
录制与回放流程
文件格式支持
| 格式 | 扩展名 | 后端实现 | 压缩算法 |
| VDB | .vdb / .vdbx | DatabaseWriter/Reader (SQLite) | LZAV(唯一实际算法) |
| VCAP | .vcap / .vcapx | McapWriter/Reader | Zstandard(唯一实际算法) |
BagWriter::create() 和 BagReader::create() 按文件扩展名自动选择实现: .vcap / .vcapx 走 MCAP,其他扩展名一律走 SQLite。两种后端共用统一的 BagWriter / BagReader 抽象接口和 Config 结构。
BagWriter — 录制接口
概述
vlink::BagWriter 继承自 MessageLoop,所有写入操作在内部循环线程上异步执行。 push() 方法线程安全、非阻塞,适合在通信回调中直接调用。
创建 Writer
writer->async_run();
Abstract base class for VLink bag file recording with split, compression and global writer support.
@ kCompressAuto
Automatic algorithm selection.
Definition bag_writer.h:109
static std::shared_ptr< BagWriter > create(const std::string &path, const Config &config={})
Creates a concrete BagWriter instance for path.
Configuration for recording behaviour, splitting, compression, and limits.
Definition bag_writer.h:122
int64_t max_task_depth
Max pending write tasks in the queue.
Definition bag_writer.h:138
std::string tag_name
Optional tag name stored in the bag header.
Definition bag_writer.h:123
int64_t split_by_time
Split file every N milliseconds. 0 = disabled.
Definition bag_writer.h:133
CompressType compress
Compression algorithm.
Definition bag_writer.h:124
int64_t split_by_size
Split file when it reaches this size (bytes).
Definition bag_writer.h:132
bool wal_mode
Enable SQLite WAL mode for crash resilience.
Definition bag_writer.h:125
录制消息
writer->push(
"dds://sensors/lidar",
"demo.proto.PointCloud",
payload
);
int64_t ts = get_my_timestamp_us();
frame_data, &ts);
debug_data, nullptr, true);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
Definition bytes.h:113
@ kProtobuf
Decode using the Protocol Buffers stack.
Definition types.h:188
@ kRaw
Treat the payload as opaque/raw bytes.
Definition types.h:186
@ kPublish
Message published by a Publisher node.
Definition types.h:168
压缩类型
CompressType 枚举(bag_writer.h):
| 枚举 | 值 |
| kCompressNone | 0 |
| kCompressAuto | 1 |
| kCompressZstd | 2 |
| kCompressLz4 | 3 |
| kCompressLzav | 4 |
**各后端的实际行为**(源码参见 database_writer.cc:184、mcap_writer.cc:163):
| 后端 | 启用压缩条件 | 实际使用算法 | 其他枚举值 |
| SQLite(<tt>.vdb / .vdbx) | kCompressAuto 或 kCompressLzav | 仅 LZAV | kCompressZstd / kCompressLz4 / kCompressNone 一律不压缩 |
| MCAP(<tt>.vcap / .vcapx) | kCompressAuto 或 kCompressZstd | 仅 Zstandard | kCompressLz4 / kCompressLzav / kCompressNone 一律不压缩;若编译时未启用 ENABLE_ZSTD 也不压缩 |
**枚举名不代表后端实际支持**:文档里不要写"SQLite 支持 zstd/lz4"或"MCAP 支持 LZAV"。
**其他压缩相关参数**:
- compress_start_size(默认 128 字节):小于此大小的 payload 不压缩。
- compress_level:SQLite 后端仅区分 > 3(LZAV 高压缩比模式)与 <= 3(普通模式); MCAP 后端映射到 mcap::CompressionLevel(0=Default、1=Fastest、2=Fast、3=Default、4=Slow、5=Slowest)。
- ignore_compress_urls:集合中的 URL 永不压缩,即使启用了压缩。
文件分割
writer->register_split_callback(
[](int index, const std::string& filename) {
VLOG_I(
"split #", index,
" -> ", filename);
},
false
);
#define VLOG_I(...)
Definition logger.h:850
bool split_name_by_time
Append timestamp to split file names.
Definition bag_writer.h:127
分割文件命名规则:
- 主文件:recording.vdb
- 分割 1:recording_1.vdb(或 recording_1_20260318_120000.vdb)
- 分割 2:recording_2.vdb
Schema 嵌入
writer->register_schema_callback(
return get_schema_for_type(ser_type, schema_type);
});
schema.
name =
"sensors.LidarPoint";
schema.
data = proto_file_descriptor_bytes;
writer->push_schema(schema);
SchemaType
Coarse runtime schema family used by discovery, bag metadata, and proxy routing.
Definition types.h:184
Carries one serialized schema blob for runtime registration or embedding.
Definition types.h:246
SchemaType schema_type
Coarse runtime schema family derived from encoding.
Definition types.h:249
Bytes data
Raw serialized schema bytes (e.g. FileDescriptorSet or BFBS).
Definition types.h:250
std::string encoding
Schema encoding identifier (e.g. "protobuf" or "flatbuffers").
Definition types.h:248
std::string name
Schema subject name, typically a fully-qualified message or table type.
Definition types.h:247
Config 参数完整说明
| 参数 | 默认值 | 说明 |
| tag_name | 空 | 录制标签,存储在文件头 |
| compress | kCompressNone | 压缩算法 |
| wal_mode | false | SQLite WAL 模式,提高崩溃恢复能力 |
| enable_limit | false | 启用行数/字节数上限 |
| split_name_by_time | false | 分割文件名附加时间戳 |
| sync_mode | false | 同步写盘(更安全但更慢) |
| optimize_on_exit | false | 关闭时执行 VACUUM/优化 |
| max_row_count | 50 亿 | 最大消息行数(超出后分割) |
| max_bytes_size | 512 GiB | 最大文件字节数 |
| split_by_size | 1 GiB | 按大小分割阈值 |
| split_by_time | 0(禁用) | 按时间分割(毫秒) |
| cache_size | 4 MiB | SQLite 页缓存大小 |
| begin_time | 0 | 录制起始时间戳(毫秒),0 表示立即开始 |
| compress_start_size | 128 bytes | 小于此大小不压缩 |
| compress_level | 3 | 压缩级别(算法相关) |
| max_task_depth | 20000 | 最大排队写入任务数 |
| max_memory_size | 2 GiB | 最大内存缓存大小 |
| start_timestamp | 0 | 覆盖 bag 起始时间戳(毫秒),0 使用系统时间 |
| ignore_compress_urls | 空集合 | 这些 URL 的消息永不压缩 |
全局 Writer(环境变量激活)
# 必选:设置后首次调用 global_get() 会用默认 Config 自动 create() 并 async_run()
export VLINK_BAG_PATH=/data/auto_record.vdb
# 可选:当某个 Writer 的 Config::tag_name 为空时,写入的 tag_name 会回退到此值
# (未设置时默认为字符串 "Empty")
export VLINK_BAG_TAG=my_session
if (gw) {
}
static BagWriter * global_get()
Returns the process-global BagWriter activated by the VLINK_BAG_PATH environment variable.
全局 Writer 由进程级静态变量持有,析构时自动 flush。注意全局 Writer 的 Config 固定为默认值(不读取 VLINK_BAG_TAG 作为 Config::tag_name); VLINK_BAG_TAG 仅作为所有 Writer 在 Config::tag_name 为空时的兜底。
查找已创建的 Writer
if (existing) {
existing->push(...);
}
static std::shared_ptr< BagWriter > filter_get(const std::string &path)
Returns an existing writer for path, or creates and starts a new one.
BagReader — 回放接口
概述
vlink::BagReader 继承自 MessageLoop,回放在内部循环线程上驱动。 可配置回放速率、时间范围、循环次数和 URL 过滤。
创建 Reader
false);
true,
true);
Abstract base class for VLink bag file playback with time-based seeking and rate control.
static std::shared_ptr< BagReader > create(const std::string &path, bool read_only=true, bool try_to_fix=false)
Creates a concrete BagReader for path, selecting the implementation by extension.
读取 Bag 信息
打开后立即可读取文件元数据(无需启动回放):
const auto& info = reader->get_info();
VLOG_I(
"file: ", info.file_name);
VLOG_I(
"duration: ", info.total_duration / 1000,
" seconds");
VLOG_I(
"messages: ", info.message_count);
VLOG_I(
"version: ", info.version);
VLOG_I(
"compression: ", info.compression_type);
for (const auto& meta : info.url_metas) {
" count=", meta.count,
" freq=", meta.freq, " Hz",
" size=", meta.size / 1024, " KB",
" ser=", meta.ser_type);
}
注册回调
reader->register_output_callback(
[](int64_t timestamp, const std::string& url,
VLOG_I(
"ts=", timestamp,
" url=", url,
});
const char* names[] = {"stopped", "paused", "playing"};
VLOG_I(
"playback status: ", names[(
int)s]);
});
reader->register_ready_callback([] {
});
reader->register_finish_callback([](bool interrupted) {
VLOG_I(
"playback finished, interrupted=", interrupted);
});
Status
Playback state of the reader.
Definition bag_reader.h:109
size_t size() const noexcept
Returns the number of usable bytes (excluding the prefix offset region).
Definition bytes.h:868
ActionType
Identifies the type of message action for recording purposes.
Definition types.h:162
启动回放
reader->async_run();
cfg.
filter_urls = {
"dds://sensors/lidar",
"dds://sensors/camera"};
reader->play(cfg);
Playback configuration passed to play().
Definition bag_reader.h:181
bool skip_blank
If true, skip silent gaps between messages.
Definition bag_reader.h:186
int64_t begin_time
Playback start timestamp (ms). 0 = from beginning.
Definition bag_reader.h:182
std::unordered_set< std::string > filter_urls
Whitelist of URLs to play. Empty = all URLs.
Definition bag_reader.h:190
int times
Number of loops. kInfinite (-1) = loop forever.
Definition bag_reader.h:184
double rate
Playback rate multiplier. 1.0 = real time.
Definition bag_reader.h:185
int64_t end_time
Playback end timestamp (ms). 0 = until end.
Definition bag_reader.h:183
回放控制
reader->pause();
reader->resume();
reader->pause_to_next();
reader->jump(5 * 1000LL, 1.0, 1,
true);
reader->stop();
int64_t current_ts = reader->get_timestamp();
int64_t elapsed_real = reader->get_real_timestamp();
bool is_jumping = reader->is_jumping();
速率控制示例
static constexpr int kInfinite
Sentinel value for the Config::times field to indicate endless loop playback.
Definition bag_reader.h:98
int64_t force_delay
Override inter-message delay (ms). -1 = use timestamps.
Definition bag_reader.h:187
bool auto_quit
If true, quit the loop thread when playback ends.
Definition bag_reader.h:189
时间范围过滤
文件完整性与修复
auto check_future = reader->check();
bool ok = check_future.get();
auto reindex_future = reader->reindex();
bool reindexed = reindex_future.get();
auto fix_future = reader->fix(false);
bool fixed = fix_future.get();
auto rebuild_future = reader->fix(true);
bool rebuilt = rebuild_future.get();
Proto Schema 检测
auto schemas = reader->detect_schema();
for (const auto& s : schemas) {
}
std::string ser = reader->get_ser_type("dds://sensors/lidar");
McapWriter / McapReader — MCAP 格式
MCAP(Message Capture Archive Protocol)是面向时间序列消息的索引化二进制格式, 可被 Foxglove Studio 直接打开。VLink 的 MCAP 支持需要编译时启用 ENABLE_ZSTD 才能启用压缩。
McapWriter
auto writer = std::make_shared<vlink::McapWriter>("/data/recording.vcap", config);
writer->async_run();
@ kCompressZstd
Zstandard compression.
Definition bag_writer.h:110
Concrete BagWriter implementation for the MCAP bag file format.
McapReader
auto reader = std::make_shared<vlink::McapReader>("/data/recording.vcap");
reader->register_output_callback([](int64_t ts, const std::string& url,
});
reader->async_run();
reader->play(cfg);
Concrete BagReader implementation for the MCAP bag file format.
MCAP 格式特点:
- 文件头包含 Schema 和 Channel 元数据,支持离线自省。
- 支持随机访问(索引化)。
- 可被 Foxglove Studio 直接打开可视化。
- 与 .vdb 共用同一个 Config 结构和 play() / register_output_callback() 接口; 压缩算法固定为 Zstandard。
BagReaderProcessor — 多文件时序合并
功能概述
vlink::BagReaderProcessor 是一个时序排序处理器,用于同时读取**多个 BagReader** (如录制时按大小或时间分割产生的多个文件),将来自不同文件的消息按时间戳排序后, 以正确的时序顺序输出到统一的回调。
典型使用场景:
- 分片录制文件的有序回放(如按 1 GiB 分割的多个 .vdb 文件)
- 多传感器分别录制后的时间对齐合并
- 多源数据流的离线时序重建
原理
BagReaderProcessor Flow
内部维护一个基于 std::deque 的排序队列和一个独立的处理线程。 多个 Reader 的回调线程通过 push() 将消息送入队列,处理器在缓冲窗口 (min_cache_time)满足后,按时间戳顺序逐条输出到 OutputCallback。
工作流程:
- 多个 BagReader 各自在独立线程中读取消息
- 每条消息通过 push() 进入 BagReaderProcessor 的内部队列(线程安全)
- 处理线程检查队列首尾时间差是否超过 min_cache_time
- 满足条件后,按原始推送时间间隔依次输出,还原录制时的时序节奏
- 当缓存大小达到 max_cache_size 上限时,push() 会阻塞等待消费
Config 配置
struct Config final {
int64_t min_cache_time{500};
int64_t max_cache_size{1024UL * 1024UL * 256};
};
| 参数 | 默认值 | 说明 |
| min_cache_time | 500 ms | 队列首尾时间差达到此值后才开始输出,用于吸收乱序 |
| max_cache_size | 256 MiB | 缓存字节上限,超过时 push() 阻塞等待消费 |
min_cache_time 的选取建议:
- 设置过小(如 100 ms)可能导致来自不同文件的消息未完全排序就被输出
- 设置过大(如 5000 ms)会增加内存占用和输出延迟
- 通常 500 ms 可满足大多数场景
API 说明
| 方法 | 说明 |
| BagReaderProcessor(const Config& config = Config()) | 构造并启动内部处理线程 |
| ~BagReaderProcessor() | 析构,刷新剩余缓存消息并停止处理线程 |
| register_output_callback(OutputCallback&& cb) | 注册时序排序后的输出回调,仅支持一个 |
| push(int64_t timestamp, const string& url, ActionType action, const Bytes& data) | 推入一条消息,线程安全,可能阻塞 |
OutputCallback 签名:
using OutputCallback = std::function<void(
int64_t timestamp,
const std::string& url,
ActionType action_type,
const Bytes& data
)>;
基本使用示例
int main() {
processor.register_output_callback(
[](int64_t ts, const std::string& url,
VLOG_I(
"[ordered] ts=", ts,
" url=", url,
});
reader_a->register_output_callback(
[&](int64_t ts, const std::string& url,
processor.push(ts, url, action, data);
});
reader_b->register_output_callback(
[&](int64_t ts, const std::string& url,
processor.push(ts, url, action, data);
});
reader_a->async_run();
reader_b->async_run();
reader_a->play(cfg);
reader_b->play(cfg);
reader_a->wait_for_quit();
reader_b->wait_for_quit();
return 0;
}
Time-ordered message buffer for smoothing bag playback across split files.
Time-sorted message relay that buffers and orders messages before delivery.
Definition bag_reader_processor.h:76
static void init(const std::string &app_name="", const std::string &log_path="") noexcept
Initialises the logger singleton.
Global singleton logger with three output styles and pluggable backends.
Configuration for the time-ordered message cache.
Definition bag_reader_processor.h:91
int64_t max_cache_size
Maximum cache size in bytes (256 MiB).
Definition bag_reader_processor.h:93
int64_t min_cache_time
Minimum cache time in milliseconds before flushing.
Definition bag_reader_processor.h:92
多传感器分割文件合并回放
int main() {
std::vector<std::string> files = {
"/data/drive_0.vdb",
"/data/drive_1.vdb",
"/data/drive_2.vdb"
};
[&](int64_t ts, const std::string& url,
pub.publish(data);
});
std::vector<std::shared_ptr<vlink::BagReader>> readers;
for (const auto& file : files) {
reader->register_output_callback(
[&](int64_t ts, const std::string& url,
processor.
push(ts, url, action, data);
});
reader->async_run();
readers.push_back(reader);
}
for (auto& reader : readers) {
reader->play(cfg);
}
for (auto& reader : readers) {
reader->wait_for_quit();
}
return 0;
}
void register_output_callback(OutputCallback &&output_callback)
Registers the callback that receives time-ordered messages.
void push(int64_t timestamp, const std::string &url, ActionType action_type, const Bytes &data)
Pushes a message into the time-ordered cache.
Type-safe publisher for the VLink event communication model.
Definition publisher.h:102
注意事项
- push() 是线程安全的,可从多个 Reader 的回调线程并发调用
- 当缓存达到 max_cache_size 上限时,push() 会阻塞直到消费线程释放空间
- 析构时会自动刷新队列中的剩余消息并停止处理线程
- 仅支持注册一个 OutputCallback,后续注册会替换前一个
- OutputCallback 在内部处理线程中调用,回调内不应执行长耗时操作
支持的序列化格式
| ser_type 字符串示例 | 序列化格式 | 说明 |
| "demo.proto.PointCloud" | Protocol Buffers | 具体消息类型名,schema_type 应为 kProtobuf |
| "demo.fbs.CameraFrame" | FlatBuffers | 具体表类型名,schema_type 应为 kFlatbuffers |
| "cdr" | CDR(DDS 格式) | DDS 传输原生格式 |
| "raw" | POD / 原始字节 | 无序列化,直接存储,schema_type 通常为 kRaw |
| "string" | std::string | UTF-8 字符串,schema_type 通常为 kRaw |
| "custom" | 自定义 | 自定义负载;若无 protobuf/fbs 家族信息,schema_type 通常为 kRaw |
完整的序列化格式列表参见 06-serialization.md。
对 bag/proxy/viewer/webviz/monitor 这一整条运行时链路来说,schema_type 是显式路由信息。 只有确实拿不到 schema 家族时才应使用 kUnknown;对 raw / text / json / 自定义字节流,应该优先写入 kRaw。
录制时 ser_type 原样存入文件,回放时原样提供给 OutputCallback, 应用层根据此字段选择对应的反序列化方式。
与 VLink 通信 API 集成
在 VLink 节点内录制时,将 BagWriter 注入通信回调是最简洁的模式:
writer->async_run();
sub.listen([&](const LidarPoint& msg) {
process_lidar(msg);
});
Type-safe subscriber for the VLink event communication model.
Definition subscriber.h:110
bool serialize(const T &src, Bytes &des, TransportType transport, uint8_t offset)
Serializes src into des with explicit type and transport hints.
Definition serializer-inl.h:355
回放时反向操作:
reader->register_output_callback(
[&](int64_t ts, const std::string& url,
if (url == "dds://sensors/lidar") {
LidarPoint msg;
pub.publish(msg);
}
}
});
reader->async_run();
reader->play(cfg);
bool deserialize(const Bytes &src, T &des, TransportType transport)
Deserializes src bytes into des with explicit type and transport hints.
Definition serializer-inl.h:488
与 CLI 工具 vlink-bag 的关联
vlink-bag 是命令行工具,底层用的正是本章讨论的 BagWriter / BagReader / McapWriter / McapReader API。完整参数见 13-cli-tools.md。
八个子命令:record / play / info / clone / check / reindex / fix / tag。
# 录制(录所有发现到的 URL)
vlink-bag record /data/recording.vdb
# 录制指定 URL
vlink-bag record /data/sensors.vdb -u dds://sensors/lidar dds://sensors/camera
# 带压缩录制(SQLite 下启用 LZAV,MCAP 下启用 Zstd)
vlink-bag record /data/recording.vdb -p
# 回放(默认实时速率)
vlink-bag play /data/recording.vdb
# 2x 加速回放
vlink-bag play /data/recording.vdb -r 2.0
# 其余子命令
vlink-bag info /data/recording.vdb
vlink-bag check /data/recording.vdb
vlink-bag reindex /data/recording.vdb
vlink-bag fix /data/recording.vdb
vlink-bag clone /data/recording.vdb /data/copy.vdb
vlink-bag tag /data/recording.vdb new_tag_name
通过环境变量快速开启进程级录制(详见上节"全局 Writer"):
export VLINK_BAG_PATH=/data/auto_record.vdb
./my_vlink_app
完整录制示例
int main() {
writer->register_split_callback(
[](int idx, const std::string& fname) {
VLOG_I(
"new split file: ", fname);
}, false);
writer->async_run();
writer->quit();
});
int seq = 0;
while (writer->is_running()) {
std::memset(data.
data(), seq & 0xFF, 256);
seq++;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
writer->wait_for_quit();
VLOG_I(
"recording saved, splits=", writer->get_split_index());
return 0;
}
@ kCompressLzav
LZAV built-in compression.
Definition bag_writer.h:112
uint8_t * data() noexcept
Returns a pointer to the start of the user data region (after the prefix offset).
Definition bytes.h:860
static Bytes create(size_t size, uint8_t offset=0) noexcept
Creates an owned Bytes buffer of the given size.
VLINK_EXPORT void register_terminate_signal(std::function< void(int)> &&callback, bool is_async=false, bool pass_through=false) noexcept
Registers a callback for graceful termination signals (SIGTERM, SIGINT, etc.).
int64_t compress_level
Compression level (codec-specific).
Definition bag_writer.h:137
Platform-agnostic system utilities for process, thread, network and signal management.
完整回放示例
int main(int argc, char* argv[]) {
if (argc < 2) {
VLOG_F(
"usage: ", argv[0],
" <bag_file>");
}
const auto& info = reader->get_info();
VLOG_I(
"file: ", info.file_name);
VLOG_I(
"duration: ", info.total_duration / 1000,
" s");
VLOG_I(
"messages: ", info.message_count);
for (const auto& m : info.url_metas) {
m.count, " msgs @ ",
m.freq, " Hz, ser=", m.ser_type);
}
}
});
reader->register_output_callback(
[](int64_t ts, const std::string& url,
VLOG_D(
"ts=", ts,
" url=", url,
});
reader->async_run();
reader->play(cfg);
reader->wait_for_quit();
return 0;
}
@ kStoped
Stopped (not playing).
Definition bag_reader.h:110
#define VLOG_D(...)
Definition logger.h:848
#define VLOG_F(...)
Definition logger.h:856