VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
bag_writer.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file bag_writer.h
26 * @brief Abstract base class for VLink bag file recording with split, compression and global writer support.
27 *
28 * @details
29 * @c BagWriter is an abstract @c MessageLoop-based recorder that captures VLink messages
30 * (URL + serialisation type + payload) to a bag file. Concrete implementations are
31 * @c DatabaseWriter (SQLite-backed) and @c McapWriter (MCAP-format).
32 *
33 * Key features:
34 * - Asynchronous recording via the inherited @c MessageLoop queue.
35 * - Pluggable compression: none, auto, Zstd, LZ4, LZAV.
36 * - File splitting by size or by time, with optional time-stamped names.
37 * - WAL (Write-Ahead Log) mode for crash resilience.
38 * - Global singleton writer activated by the @c VLINK_BAG_PATH environment variable.
39 * - Schema embedding for offline introspection.
40 *
41 * @par Creating a writer
42 * @code
43 * auto writer = vlink::BagWriter::create("/data/log.vdb");
44 * writer->async_run();
45 * writer->push("dds://my/topic", "demo.proto.PointCloud", vlink::SchemaType::kProtobuf,
46 * vlink::ActionType::kPublish, data);
47 * @endcode
48 *
49 * @par Global writer
50 * @code
51 * // Set VLINK_BAG_PATH=/data/log.vdb before launching the process.
52 * // Then retrieve the global instance anywhere:
53 * auto* gw = vlink::BagWriter::global_get();
54 * if (gw) {
55 * gw->push("intra://my/topic", "raw", vlink::SchemaType::kRaw, vlink::ActionType::kPublish, data);
56 * }
57 * @endcode
58 *
59 * @note
60 * - @c create() selects the concrete implementation based on the file extension
61 * (@c .vcap / @c .vcapx -> McapWriter, otherwise -> DatabaseWriter).
62 * - @c push() is thread-safe. Unless @c immediate is set, it only enqueues the message and returns without blocking; the write itself is done on the loop thread.
63 * - The @c immediate flag bypasses the task queue and writes synchronously (use with care).
64 */
65
66#pragma once
67
68#include <chrono>
69#include <cstdint>
70#include <functional>
71#include <memory>
72#include <string>
73#include <string_view>
74#include <unordered_set>
75
76#include "../base/bytes.h"
77#include "../base/macros.h"
79#include "../impl/types.h"
80
81namespace vlink {
82
84
85/**
86 * @class BagWriter
87 * @brief Abstract asynchronous message recorder backed by a @c MessageLoop event queue.
88 *
89 * @details
90 * Must be constructed via @c create() (for managed lifetime) or directly (for custom ownership).
91 * After construction call @c async_run() to start the recording loop, then use @c push() to
92 * record messages.
93 */
95 public:
/**
 * @brief Compression algorithm applied to every recorded payload.
 *
 * @details
 * The enumeration is stored in a single byte and its values are spelled out explicitly.
 *
 * | Enumerator      | Algorithm    | Notes                                   |
 * | --------------- | ------------ | --------------------------------------- |
 * | kCompressNone   | None         | Payload bytes are stored unchanged      |
 * | kCompressAuto   | Auto select  | Picks the best algorithm per payload    |
 * | kCompressZstd   | Zstandard    | Good ratio, moderate speed              |
 * | kCompressLz4    | LZ4          | Fast compression and decompression      |
 * | kCompressLzav   | LZAV         | Fast, lightweight, built in             |
 */
enum CompressType : std::uint8_t {
  /// Store payloads without compression.
  kCompressNone = 0,
  /// Let the writer pick an algorithm automatically.
  kCompressAuto = 1,
  /// Zstandard.
  kCompressZstd = 2,
  /// LZ4.
  kCompressLz4 = 3,
  /// LZAV (built-in codec).
  kCompressLzav = 4,
};
114
/**
 * @struct Config
 * @brief Recording options: durability, splitting, compression and resource limits.
 *
 * @details
 * Unless a field states otherwise, sizes are expressed in bytes and durations in milliseconds.
 */
struct Config final {
  /// Optional tag name stored in the bag header.
  std::string tag_name;
  // NOTE(review): the rendered listing this header was recovered from skips one source
  // line here (123 -> 125), so a member declared at this point is not shown (possibly a
  // CompressType field). Restore it from the original header.

  /// Turn on SQLite WAL mode for crash resilience.
  bool wal_mode{false};
  /// Enforce the max_row_count / max_bytes_size limits below.
  bool enable_limit{false};
  /// Append a timestamp to the names of split files.
  bool split_name_by_time{false};
  /// Write to disk synchronously.
  bool sync_mode{false};
  /// Run VACUUM / optimisation when the file is closed.
  bool optimize_on_exit{false};

  /// Row cap before splitting; honoured only when enable_limit is set.
  std::int64_t max_row_count{5'000'000'000LL};
  /// File-size cap before splitting (512 GiB); honoured only when enable_limit is set.
  std::int64_t max_bytes_size{512LL << 30};
  /// Split the file once it reaches this many bytes (1 GiB).
  std::int64_t split_by_size{1LL << 30};
  /// Split the file every N milliseconds; 0 disables time-based splitting.
  std::int64_t split_by_time{0};
  /// Recording start timestamp in milliseconds; 0 means "now".
  std::int64_t begin_time{0};
  /// SQLite page-cache size in bytes (4 MiB).
  std::int64_t cache_size{4LL << 20};
  /// Minimum payload size in bytes for compression to be applied.
  std::int64_t compress_start_size{128};
  /// Compression level; its meaning depends on the codec.
  std::int64_t compress_level{3};
  /// Maximum number of write tasks allowed to be pending in the queue.
  std::int64_t max_task_depth{20000};
  /// Maximum in-memory cache size in bytes (2 GiB).
  std::int64_t max_memory_size{2LL << 30};
  /// Overrides the bag start timestamp (milliseconds since the epoch).
  std::int64_t start_timestamp{0};
  /// URLs whose payloads are never compressed.
  std::unordered_set<std::string> ignore_compress_urls;

  // User-provided rather than `= default` (hence the NOLINT). Presumably this lets
  // `const Config& config = {}` be used as a default argument inside the enclosing class.
  // A defaulted constructor of a nested class with default member initializers is
  // rejected there by GCC/Clang until the enclosing class is complete.
  Config() {}  // NOLINT(modernize-use-equals-default)
};
145
/**
 * @brief Notification invoked when the writer rolls over to a new split file.
 *
 * @details
 * Receives the zero-based index of the split and the path of the newly created file.
 * Whether it runs before or after that file is opened is chosen by the @c before
 * argument given to @c register_split_callback().
 */
using SplitCallback = std::function<void(int split_index, const std::string& split_filename)>;
155
156 /**
157 * @brief Callback that resolves a serialisation type string to a @c SchemaData.
158 *
159 * @details
160 * When a new URL with an unknown @c ser_type appears, this callback is invoked to
161 * retrieve the corresponding schema for embedding in the bag. The extra
162 * @c schema_type hint lets callers distinguish schema families that share
163 * the same concrete type name.
164 */
165 using SchemaCallback = std::function<SchemaData(const std::string& ser_type, SchemaType schema_type)>;
166
/**
 * @brief Wall-clock instant on @c std::chrono::system_clock with millisecond resolution.
 *
 * @details
 * Despite its name, this alias is a @c std::chrono::time_point, not a clock type. It holds
 * a system-clock instant counted in whole milliseconds and is used when generating
 * file-name timestamps (for example, by @c get_format_date()).
 */
using SystemClock = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>;
171
172 /**
173 * @brief Creates a concrete @c BagWriter instance for @p path.
174 *
175 * @details
176 * Selects the implementation based on the file extension:
177 * - @c .vcap / @c .vcapx -- @c McapWriter (MCAP format)
178 * - All other extensions -- @c DatabaseWriter (SQLite)
179 * The returned writer has not yet started its event loop; call @c async_run().
180 *
181 * @param path Output file path.
182 * @param config Recording configuration. Defaults to @c Config{}.
183 * @return Shared pointer to the new writer.
184 */
185 [[nodiscard]] static std::shared_ptr<BagWriter> create(const std::string& path, const Config& config = {});
186
187 /**
188 * @brief Returns an existing writer for @p path, or creates and starts a new one.
189 *
190 * @details
191 * Searches the global writer registry. If a writer matching @p path is alive,
192 * returns a shared pointer to it. Otherwise creates a new writer for @p path,
193 * calls @c async_run() on it, registers it in the global registry, and returns it.
194 * The writer is automatically removed from the registry when the last shared
195 * pointer to it is released.
196 *
197 * @param path Output file path.
198 * @return Shared pointer to the writer (never @c nullptr).
199 */
200 [[nodiscard]] static std::shared_ptr<BagWriter> filter_get(const std::string& path);
201
202 /**
203 * @brief Returns the process-global @c BagWriter activated by the @c VLINK_BAG_PATH environment variable.
204 *
205 * @details
206 * The global writer is created automatically on first access if @c VLINK_BAG_PATH is set.
207 * Returns @c nullptr if the environment variable is not set.
208 *
209 * @return Raw pointer to the global writer, or @c nullptr.
210 */
212
213 /**
214 * @brief Constructs a @c BagWriter for @p path with the given @p config.
215 *
216 * @details
217 * Opens or creates the output file. Must call @c async_run() before writing.
218 *
219 * @param path Output file path.
220 * @param config Recording configuration.
221 */
222 explicit BagWriter(const std::string& path, const Config& config = {});
223
224 /**
225 * @brief Destructor -- stops the recording loop and flushes pending writes.
226 */
227 virtual ~BagWriter(); // NOLINT(modernize-use-override)
228
229 /**
230 * @brief Registers a callback invoked when a file split occurs.
231 *
232 * @param callback Called with (split_index, new_filename) on each split.
233 * @param before If @c true, the callback fires before the new file is opened;
234 * if @c false, it fires after.
235 */
236 virtual void register_split_callback(SplitCallback&& callback, bool before) = 0;
237
238 /**
239 * @brief Registers a callback that resolves serialisation type strings to @c SchemaData.
240 *
241 * @details
242 * Called when a @c push() introduces a URL with an unknown @c ser_type.
243 *
244 * @param callback Function mapping ser_type string to @c SchemaData.
245 */
246 virtual void register_schema_callback(SchemaCallback&& callback) = 0;
247
248 /**
249 * @brief Embeds a @c SchemaData into the bag for later offline introspection.
250 *
251 * @param schema_data Schema descriptor to store.
252 * @param immediate If @c true, merges synchronously; otherwise enqueues.
253 * @return @c false only when the immediate merge fails.
254 */
255 virtual bool push_schema(const SchemaData& schema_data, bool immediate = false) = 0;
256
257 /**
258 * @brief Records one message to the bag.
259 *
260 * @details
261 * The message is enqueued on the recording loop and written asynchronously.
262 * If @p microseconds_timestamp is @c nullptr, the current system time is used.
263 *
264 * @param url VLink URL of the topic (e.g., @c "dds://my/topic").
265 * @param ser_type Serialisation type string (e.g., @c "demo.proto.PointCloud", @c "raw").
266 * @param schema_type Coarse schema family for the payload.
267 * @param action_type Action type (@c kPublish, @c kRequest, etc.).
268 * @param data Serialized payload bytes.
269 * @param microseconds_timestamp Optional pointer to a custom timestamp (microseconds).
270 * @param immediate If @c true, writes synchronously bypassing the queue.
271 * @return Sequence number (monotonically increasing) of the recorded message,
272 * or a negative value on error.
273 */
274 virtual int64_t push(const std::string& url, const std::string& ser_type, SchemaType schema_type,
275 ActionType action_type, const Bytes& data, int64_t* microseconds_timestamp = nullptr,
276 bool immediate = false) = 0;
277
278 /**
279 * @brief Returns @c true if the writer is actively recording to disk.
280 *
281 * @return @c true if the backing file is open and being written.
282 */
283 [[nodiscard]] virtual bool is_dumping() const = 0;
284
285 /**
286 * @brief Returns @c true if the writer is in split-file mode.
287 *
288 * @return @c true when @c Config::split_by_size or @c Config::split_by_time is non-zero.
289 */
290 [[nodiscard]] virtual bool is_split_mode() const = 0;
291
292 /**
293 * @brief Returns the zero-based index of the current split file.
294 *
295 * @details
296 * Returns 0 if split mode is not active.
297 *
298 * @return Current split file index.
299 */
300 [[nodiscard]] virtual int get_split_index() const = 0;
301
302 /**
303 * @brief Sets the expected message loss ratio for a given URL.
304 *
305 * @details
306 * Stored as metadata in the bag. Used for post-processing diagnostics to
307 * distinguish intentional drops from unexpected loss.
308 *
309 * @param url Topic URL.
310 * @param loss Loss ratio in the range [0.0, 1.0].
311 */
312 virtual void set_url_loss(const std::string& url, double loss) = 0;
313
314 protected:
315 void get_url_meta(const std::string& url, const std::string& ser, int& url_index, int& ser_index) const;
316
317 void get_url_meta(int url_index, int ser_index, std::string& url, std::string& ser) const;
318
319 static const std::string& get_default_tag_name();
320
321 static const std::string& get_default_app_name();
322
324
326
327 static std::string_view convert_action(ActionType type);
328
329 static std::string get_format_date(SystemClock* current = nullptr, bool file_format = false);
330
331 private:
332 std::unique_ptr<struct BagWriterImpl> impl_;
333
335};
336
337} // namespace vlink
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.
Platform-independent macro definitions for the VLink library.
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
Single-threaded event loop with three queue types, timer management and task scheduling.
Core type definitions shared across all VLink node implementations.