VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
bag_reader.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file bag_reader.h
26 * @brief Abstract base class for VLink bag file playback with time-based seeking and rate control.
27 *
28 * @details
29 * @c BagReader is an abstract @c MessageLoop-based player that reads VLink bag files and
30 * replays recorded messages through an @c OutputCallback. Concrete implementations are
31 * @c DatabaseReader (SQLite-backed) and @c McapReader (MCAP-format).
32 *
33 * Playback features:
34 * - Configurable playback rate (e.g., @c rate=2.0 for 2x speed).
35 * - Loop playback via the @c times field (@c kInfinite = -1 for endless loop).
36 * - Time-range filtering via @c begin_time and @c end_time.
37 * - Jump-to-timestamp seeking with optional forced play.
38 * - Per-URL output filtering via @c Config::filter_urls whitelist.
39 * - Background integrity check, reindex, and fix operations via @c std::future.
40 * - Plugin interface for custom URL/type mapping.
41 *
42 * @par Playback example
43 * @code
44 * auto reader = vlink::BagReader::create("/data/log.vdb");
45 * reader->register_output_callback([](int64_t ts, const std::string& url,
46 * vlink::ActionType action, const vlink::Bytes& data) {
47 * VLOG_I("ts=", ts, " url=", url, " size=", data.size());
48 * });
49 * reader->async_run();
50 *
51 * vlink::BagReader::Config cfg;
52 * cfg.rate = 1.0;
53 * cfg.times = vlink::BagReader::kInfinite;
54 * reader->play(cfg);
55 * @endcode
56 *
57 * @note
58 * - Call @c async_run() before @c play().
59 * - @c check(), @c reindex(), and @c fix() run on a background thread and return
60 * a @c std::future<bool> for result polling.
61 * - The file format is auto-detected from the extension by @c create()
62 * (@c .vcap / @c .vcapx -> McapReader, otherwise -> DatabaseReader).
63 */
64
65#pragma once
66
67#include <cstdint>
68#include <functional>
69#include <future>
70#include <memory>
71#include <string>
72#include <string_view>
73#include <unordered_map>
74#include <unordered_set>
75#include <vector>
76
77#include "../base/bytes.h"
78#include "../base/macros.h"
80#include "../impl/types.h"
82
83namespace vlink {
84
85/**
86 * @class BagReader
87 * @brief Abstract VLink bag file player with time control, seeking, and integrity tools.
88 *
89 * @details
90 * Inherits @c MessageLoop to drive playback on a dedicated thread.
91 * Concrete subclasses (@c DatabaseReader, @c McapReader) implement format-specific I/O.
92 */
94 public:
95 /**
96 * @brief Sentinel value for the @c Config::times field to indicate endless loop playback.
97 */
98 static constexpr int kInfinite{-1};
99
100 /**
101 * @brief Playback state of the reader.
102 *
103 * | State | Description |
104 * | -------- | --------------------------------------------- |
105 * | kStopped | Not playing; reset to beginning |
106 * | kPaused | Playback suspended; can be resumed |
107 * | kPlaying | Actively delivering messages to the callback |
108 */
109 enum Status : uint8_t {
110 kStoped = 0, ///< Stopped (not playing).
111 kPaused = 1, ///< Paused mid-playback.
112 kPlaying = 2, ///< Actively playing.
113 };
114
115 /**
116 * @struct Info
117 * @brief Metadata extracted from the bag file header and index.
118 *
119 * @details
120 * Available after construction. Contains file-level metadata and per-URL statistics.
121 */
122 struct Info final {
123 /**
124 * @struct UrlMeta
125 * @brief Per-URL statistics extracted from the bag index.
126 */
127 struct VLINK_EXPORT UrlMeta final {
128 bool valid{false}; ///< @c true if this UrlMeta was successfully populated.
129 int index{0}; ///< Numeric index of this URL in the bag's URL table.
130 std::string url; ///< Full VLink URL string.
131 std::string url_type; ///< Communication model type (e.g., "Event", "Method", "Field").
132 std::string ser_type; ///< Serialisation type string (e.g., "demo.proto.PointCloud", "raw").
133 SchemaType schema_type{SchemaType::kUnknown}; ///< Coarse schema family associated with this URL.
134 size_t count{0}; ///< Total number of messages recorded for this URL.
135 size_t size{0}; ///< Total compressed bytes recorded for this URL.
136 double freq{0}; ///< Average message frequency (Hz).
137 double loss{0}; ///< Declared message loss ratio [0.0, 1.0].
138
139 /**
140 * @brief Comparison operator for sorting UrlMeta entries.
141 *
142 * @details
143 * Sorts primarily by the URL's transport priority, then by URL string
144 * lexicographically, and finally by numeric index as a tie-breaker.
145 *
146 * @param target Right-hand side.
147 * @return @c true if @c *this should sort before @p target.
148 */
149 bool operator<(const UrlMeta& target) const noexcept;
150 };
151
152 std::string file_name; ///< Absolute path to the bag file.
153 std::string tag_name; ///< Tag name stored in the bag header.
154 std::string version; ///< Bag format version string.
155 std::string storage_type; ///< Storage backend (e.g., "sqlite", "mcap").
156 std::string compression_type; ///< Compression algorithm used (e.g., "lzav", "zstd").
157 std::string time_accuracy; ///< Timestamp resolution (e.g., "us", "ns").
158 std::string process_name; ///< Name of the recording process.
159 std::string date_time; ///< Recording start date/time string.
160 bool has_completed{false}; ///< @c true if the recording was cleanly finalized.
161 bool has_idx_elapsed{false}; ///< @c true if an elapsed-time index is present.
162 bool has_idx_url{false}; ///< @c true if a URL index is present.
163 bool has_schema{false}; ///< @c true if any schemas are embedded.
164 int32_t timezone{0}; ///< Timezone offset in minutes from UTC.
165 int64_t start_timestamp{0}; ///< Recording start timestamp (milliseconds since epoch).
166 int64_t blank_duration{0}; ///< Total blank (gap) duration (milliseconds).
167 int64_t total_duration{0}; ///< Total recording duration (milliseconds).
168 int64_t file_size{0}; ///< File size in bytes.
169 int64_t total_raw_size{0}; ///< Total uncompressed payload size (bytes).
170 int64_t message_count{0}; ///< Total number of messages across all URLs.
171 int64_t split_count{0}; ///< Number of file splits (0 if single file).
172 int64_t split_by_size{0}; ///< Split threshold by size (bytes).
173 int64_t split_by_time{0}; ///< Split threshold by time (milliseconds).
174 std::vector<UrlMeta> url_metas; ///< Per-URL statistics, one entry per recorded topic.
175 };
176
177 /**
178 * @struct Config
179 * @brief Playback configuration passed to @c play().
180 */
181 struct Config final {
182 int64_t begin_time{0}; ///< Playback start timestamp (ms). 0 = from beginning.
183 int64_t end_time{0}; ///< Playback end timestamp (ms). 0 = until end.
184 int times{1}; ///< Number of loops. @c kInfinite (-1) = loop forever.
185 double rate{1.0}; ///< Playback rate multiplier. 1.0 = real time.
186 bool skip_blank{false}; ///< If @c true, skip silent gaps between messages.
187 int64_t force_delay{-1}; ///< Override inter-message delay (ms). -1 = use timestamps.
188 bool auto_pause{false}; ///< If @c true, pause automatically at each message.
189 bool auto_quit{false}; ///< If @c true, quit the loop thread when playback ends.
190 std::unordered_set<std::string> filter_urls; ///< Whitelist of URLs to play. Empty = all URLs.
191 };
192
193 /**
194 * @brief Callback type fired for each replayed message.
195 *
196 * @details
197 * Called on the BagReader's loop thread. The @p data reference is valid only
198 * for the duration of the callback.
199 *
200 * @param microseconds_timestamp Message timestamp in microseconds.
201 * @param url Topic URL string.
202 * @param action_type Action type (kPublish, kRequest, etc.).
203 * @param data Serialized message payload.
204 */
205 using OutputCallback = std::function<void(int64_t microseconds_timestamp, const std::string& url,
206 ActionType action_type, const Bytes& data)>;
207
208 /**
209 * @brief Callback fired whenever the playback @c Status changes.
210 *
211 * @param status New playback status.
212 */
213 using StatusCallback = std::function<void(Status status)>;
214
215 /**
216 * @brief Callback fired when the reader has opened the file and is ready to start playing.
217 */
218 using ReadyCallback = std::function<void()>;
219
220 /**
221 * @brief Callback fired when playback has finished (or was interrupted).
222 *
223 * @param is_interrupted @c true if @c stop() was called before natural end.
224 */
225 using FinishCallback = std::function<void(bool is_interrupted)>;
226
227 /**
228 * @brief Creates a concrete @c BagReader for @p path, selecting the implementation by extension.
229 *
230 * @details
231 * - @c .vcap / @c .vcapx -- @c McapReader (MCAP format)
232 * - All other extensions -- @c DatabaseReader (SQLite)
233 *
234 * @param path Path to the bag file.
235 * @param read_only If @c true, open in read-only mode (no write operations).
236 * @param try_to_fix If @c true, attempt to repair a corrupt bag on open.
237 * @return Shared pointer to the new reader.
238 */
239 [[nodiscard]] static std::shared_ptr<BagReader> create(const std::string& path, bool read_only = true,
240 bool try_to_fix = false);
241
242 /**
243 * @brief Constructs the reader for @p path.
244 *
245 * @param path Path to the bag file.
246 * @param read_only Open in read-only mode.
247 * @param try_to_fix Attempt repair if the file is corrupt.
248 */
249 explicit BagReader(const std::string& path, bool read_only = true, bool try_to_fix = false);
250
251 /**
252 * @brief Destructor -- stops playback and releases file resources.
253 */
254 virtual ~BagReader(); // NOLINT(modernize-use-override)
255
256 /**
257 * @brief Attaches a @c BagReaderPluginInterface for custom URL/type conversion.
258 *
259 * @details
260 * The plugin's @c convert_url_meta() is called for each URL in the bag to allow
261 * remapping before messages are dispatched to @c OutputCallback.
262 *
263 * @param plugin_interface Plugin to bind. May be @c nullptr to detach.
264 */
265 virtual void bind_plugin_interface(const std::shared_ptr<BagReaderPluginInterface>& plugin_interface);
266
267 /**
268 * @brief Registers a callback fired whenever the playback status changes.
269 *
270 * @param status_callback Callback receiving the new @c Status value.
271 */
272 virtual void register_status_callback(StatusCallback&& status_callback);
273
274 /**
275 * @brief Registers a callback fired when the reader is ready to start playing.
276 *
277 * @param ready_callback Callback invoked once the file is open and parsed.
278 */
279 virtual void register_ready_callback(ReadyCallback&& ready_callback);
280
281 /**
282 * @brief Registers a callback fired when playback ends or is interrupted.
283 *
284 * @param finish_callback Callback receiving @c is_interrupted flag.
285 */
286 virtual void register_finish_callback(FinishCallback&& finish_callback);
287
288 /**
289 * @brief Registers the callback that receives replayed messages.
290 *
291 * @param output_callback Called for each message during playback.
292 */
293 virtual void register_output_callback(OutputCallback&& output_callback);
294
295 /**
296 * @brief Starts playback with the given @p config.
297 *
298 * @details
299 * Must be called after @c async_run(). The reader transitions to @c kPlaying.
300 *
301 * @param config Playback configuration.
302 */
303 virtual void play(const Config& config) = 0;
304
305 /**
306 * @brief Stops playback and resets the reader to the beginning.
307 *
308 * @details
309 * Transitions the reader to @c kStopped. The @c FinishCallback is fired with
310 * @c is_interrupted = @c true.
311 */
312 virtual void stop() = 0;
313
314 /**
315 * @brief Pauses playback at the current position.
316 *
317 * @details Transitions from @c kPlaying to @c kPaused.
318 */
319 virtual void pause() = 0;
320
321 /**
322 * @brief Resumes a paused playback from the current position.
323 *
324 * @details Transitions from @c kPaused to @c kPlaying.
325 */
326 virtual void resume() = 0;
327
328 /**
329 * @brief Advances one message while paused, then pauses again.
330 *
331 * @details Useful for single-stepping through a bag in debug sessions.
332 */
333 virtual void pause_to_next() = 0;
334
335 /**
336 * @brief Seeks to @p begin_time and resumes playback at @p rate with @p times loops.
337 *
338 * @param begin_time Seek target timestamp in milliseconds (relative to recording start).
339 * @param rate New playback rate multiplier.
340 * @param times Number of loops after the jump.
341 * @param force_to_play If @c true, forces play state even if currently paused.
342 */
343 virtual void jump(int64_t begin_time, double rate, int times, bool force_to_play = false) = 0;
344
345 /**
346 * @brief Verifies the integrity of the bag file asynchronously.
347 *
348 * @return @c std::future<bool> that resolves to @c true if the file is intact.
349 */
350 virtual std::future<bool> check() = 0;
351
352 /**
353 * @brief Rebuilds the index tables asynchronously.
354 *
355 * @return @c std::future<bool> that resolves to @c true on success.
356 */
357 virtual std::future<bool> reindex() = 0;
358
359 /**
360 * @brief Repairs a corrupt bag file asynchronously.
361 *
362 * @param rebuild If @c true, rebuilds the entire index from scratch.
363 * @return @c std::future<bool> that resolves to @c true if repair succeeded.
364 */
365 virtual std::future<bool> fix(bool rebuild = false) = 0;
366
367 /**
368 * @brief Updates the tag name stored in the bag's metadata.
369 *
370 * @param tag_name New tag name string.
371 */
372 virtual void tag(const std::string& tag_name) = 0;
373
374 /**
375 * @brief Returns the current playback position as a recording timestamp.
376 *
377 * @return Current message timestamp in milliseconds (recording time, relative to start).
378 */
379 [[nodiscard]] virtual int64_t get_timestamp() const = 0;
380
381 /**
382 * @brief Returns the current playback position in real elapsed time.
383 *
384 * @return Elapsed time since playback started (milliseconds).
385 */
386 [[nodiscard]] virtual int64_t get_real_timestamp() const = 0;
387
388 /**
389 * @brief Returns the current playback status.
390 *
391 * @return One of @c kStopped, @c kPaused, or @c kPlaying.
392 */
393 [[nodiscard]] virtual Status get_status() const = 0;
394
395 /**
396 * @brief Returns the bag file metadata and per-URL statistics.
397 *
398 * @return Const reference to the @c Info struct populated at open time.
399 */
400 [[nodiscard]] virtual const Info& get_info() const = 0;
401
402 /**
403 * @brief Scans the bag and returns all embedded schemas.
404 *
405 * @return Vector of @c SchemaData descriptors found in the bag.
406 */
407 [[nodiscard]] virtual std::vector<SchemaData> detect_schema() = 0;
408
409 /**
410 * @brief Returns the serialisation type string for a given @p url.
411 *
412 * @param url Topic URL to look up.
413 * @return Serialisation type (e.g., @c "demo.proto.PointCloud"), or an empty string if unknown.
414 */
415 [[nodiscard]] virtual std::string get_ser_type(const std::string& url) const = 0;
416
417 /**
418 * @brief Returns the coarse schema family for a given @p url.
419 *
420 * @param url Topic URL to look up.
421 * @return Schema family, or @c SchemaType::kUnknown if unavailable.
422 */
423 [[nodiscard]] virtual SchemaType get_schema_type(const std::string& url) const = 0;
424
425 /**
426 * @brief Returns @c true if the bag spans multiple split files.
427 *
428 * @return @c true when reading a split bag.
429 */
430 [[nodiscard]] virtual bool is_split_mode() const = 0;
431
432 /**
433 * @brief Returns the zero-based index of the current split file being read.
434 *
435 * @return Current split file index, or 0 for single-file bags.
436 */
437 [[nodiscard]] virtual int get_split_index() const = 0;
438
439 /**
440 * @brief Returns @c true if a jump-to-timestamp seek is currently in progress.
441 *
442 * @return @c true while seeking.
443 */
444 [[nodiscard]] virtual bool is_jumping() const = 0;
445
446 protected:
447 /**
448 * @brief Rebuilds URL metadata lookup maps after plugin remapping.
449 *
450 * @details
451 * Reader implementations call this after @c process_url_metas() mutates the
452 * per-URL metadata list, ensuring @c get_ser_type() and @c get_schema_type()
453 * both observe the remapped metadata instead of stale pre-plugin entries.
454 *
455 * @param url_metas Remapped URL metadata list.
456 * @param ser_map Output lookup map: URL -> serialisation type.
457 * @param schema_type_map Output lookup map: URL -> coarse schema family.
458 */
459 static void rebuild_url_meta_maps(const std::vector<Info::UrlMeta>& url_metas,
460 std::unordered_map<std::string, std::string>& ser_map,
461 std::unordered_map<std::string, SchemaType>& schema_type_map);
462
463 void process_output(int64_t timestamp, const std::string& url, ActionType action_type, const Bytes& data);
464
465 void process_url_metas(std::vector<Info::UrlMeta>& url_metas);
466
467 static ActionType convert_action(std::string_view str);
468
469 private:
470 std::unique_ptr<struct BagReaderImpl> impl_;
471
473};
474
475} // namespace vlink
Plugin interface for custom bag reader URL/type transformation and message processing.
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.
Platform-independent macro definitions for the VLink library.
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
Single-threaded event loop with three queue types, timer management and task scheduling.
Core type definitions shared across all VLink node implementations.