|
VLink 2.0.0
A high-performance communication middleware
|
Abstract asynchronous message recorder backed by a MessageLoop event queue.
更多...
#include <bag_writer.h>
类 | |
| struct | Config |
| Configuration for recording behaviour, splitting, compression, and limits. 更多... | |
Public 类型 | |
| enum | CompressType : uint8_t { kCompressNone = 0 , kCompressAuto = 1 , kCompressZstd = 2 , kCompressLz4 = 3 , kCompressLzav = 4 } |
| Compression algorithm applied to each recorded payload. 更多... | |
| using | SplitCallback = std::function<void(int split_index, const std::string& split_filename)> |
| Callback fired when a split occurs. | |
| using | SchemaCallback = std::function<SchemaData(const std::string& ser_type, SchemaType schema_type)> |
Callback that resolves a serialisation type string to a SchemaData. | |
| using | SystemClock = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds> |
| System clock type used for file-name timestamp generation. | |
| Public 类型 继承自 vlink::MessageLoop | |
| enum | Type : uint8_t { kNormalType = 0 , kLockfreeType = 1 , kPriorityType = 2 } |
| Queue implementation type. 更多... | |
| enum | Strategy : uint8_t { kOptimizationStrategy = 0 , kPopStrategy = 1 , kBlockStrategy = 2 } |
| Idle strategy controlling CPU and latency trade-offs. 更多... | |
| enum | Priority : uint16_t { kNoPriority = 0 , kLowestPriority = 1 , kTimerPriority = 50 , kNormalPriority = 100 , kHighestPriority = std::numeric_limits<uint16_t>::max() } |
Pre-defined task priority levels for kPriorityType loops. 更多... | |
| using | Callback = std::function<void()> |
| Callback type for tasks and event handlers. | |
Public 成员函数 | |
| BagWriter (const std::string &path, const Config &config={}) | |
Constructs a BagWriter for path with the given config. | |
| virtual | ~BagWriter () |
| Destructor – stops the recording loop and flushes pending writes. | |
| virtual void | register_split_callback (SplitCallback &&callback, bool before)=0 |
| Registers a callback invoked when a file split occurs. | |
| virtual void | register_schema_callback (SchemaCallback &&callback)=0 |
Registers a callback that resolves serialisation type strings to SchemaData. | |
| virtual bool | push_schema (const SchemaData &schema_data, bool immediate=false)=0 |
Embeds a SchemaData into the bag for later offline introspection. | |
| virtual int64_t | push (const std::string &url, const std::string &ser_type, SchemaType schema_type, ActionType action_type, const Bytes &data, int64_t *microseconds_timestamp=nullptr, bool immediate=false)=0 |
| Records one message to the bag. | |
| virtual bool | is_dumping () const =0 |
Returns true if the writer is actively recording to disk. | |
| virtual bool | is_split_mode () const =0 |
Returns true if the writer is in split-file mode. | |
| virtual int | get_split_index () const =0 |
| Returns the zero-based index of the current split file. | |
| virtual void | set_url_loss (const std::string &url, double loss)=0 |
| Sets the expected message loss ratio for a given URL. | |
| Public 成员函数 继承自 vlink::MessageLoop | |
| MessageLoop () | |
Constructs a MessageLoop with kNormalType queue. | |
| MessageLoop (Type type) | |
Constructs a MessageLoop with the specified queue type. | |
| virtual | ~MessageLoop () |
Destructor. Calls quit(true) and waits for the background thread (if any). | |
| void | set_name (const std::string &name) |
| Sets a human-readable name for this loop (visible in profiling tools). | |
| const std::string & | get_name () const |
Returns the name set via set_name(). | |
| Type | get_type () const |
| Returns the queue type this loop was constructed with. | |
| Strategy | get_strategy () const |
| Returns the current idle dispatch strategy. | |
| void | set_strategy (Strategy strategy) |
| Changes the idle dispatch strategy. | |
| void | register_begin_handler (Callback &&callback) |
| Registers a callback invoked once when the loop thread starts. | |
| void | register_end_handler (Callback &&callback) |
| Registers a callback invoked once when the loop thread exits. | |
| void | register_idle_handler (Callback &&callback) |
| Registers a callback invoked each time the task queue becomes empty. | |
| bool | run () |
| Runs the message loop on the calling thread (blocking). | |
| bool | async_run () |
| Starts the message loop on a new background thread (non-blocking). | |
| bool | spin () |
| Runs the loop continuously in a spin mode (blocking; no background thread). | |
| bool | spin_once (bool block=true) |
| Processes one batch of pending tasks and timers. | |
| bool | quit (bool force=false) |
| Requests the loop to exit cleanly. | |
| bool | wait_for_idle (int ms=Timer::kInfinite, bool check=true) |
| Waits until the task queue is drained. | |
| bool | wait_for_quit (int ms=Timer::kInfinite, bool check=true) |
Waits until the loop has fully exited (after quit() was called). | |
| bool | post_task (Callback &&callback) |
| Posts a task to the queue for execution on the loop thread. | |
| bool | post_task_with_priority (Callback &&callback, uint16_t priority) |
Posts a task with an explicit priority (requires kPriorityType loop). | |
| template<typename CallbackT, typename = std::enable_if_t<!std::is_convertible_v<CallbackT, Schedule::RetCallback>>> | |
| Schedule::Status | exec_task (const Schedule::Config &config, CallbackT &&callback) |
Posts a scheduled task and returns a Schedule::Status for chaining callbacks. | |
| template<typename CallbackT, typename = std::enable_if_t<std::is_convertible_v<CallbackT, Schedule::RetCallback>>> | |
| Schedule::RetStatus | exec_task (const Schedule::Config &config, CallbackT &&callback) |
Posts a scheduled task and returns a Schedule::RetStatus for chaining callbacks. | |
| bool | wakeup () |
Wakes the loop thread if it is sleeping (e.g., in kBlockStrategy). | |
| void | reset_lockfree_capacity () |
| Resets the lock-free queue to its initial capacity. | |
| bool | is_running () const |
Returns true if the loop is currently running (started and not quit). | |
| bool | is_ready_to_quit () const |
Returns true if quit() has been called and the loop is winding down. | |
| bool | is_busy () const |
Returns true if the loop is currently executing a task. | |
| size_t | get_task_count () const |
| Returns the number of tasks currently in the queue. | |
| virtual size_t | get_max_task_count () const |
| Returns the maximum queue depth. | |
| virtual size_t | get_max_timer_count () const |
| Returns the maximum number of timers that can be attached to this loop. | |
| virtual uint32_t | get_max_elapsed_time () const |
| Returns the maximum allowed task execution time in milliseconds. | |
| virtual bool | is_in_same_thread () const |
Returns true if the calling thread is the same as the loop thread. | |
| template<class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>> | |
| std::future< ResultT > | invoke_task (FunctionT &&function, ArgsT &&... args) |
Dispatches a callable to the loop thread and returns a std::future for the result. | |
| template<class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>> | |
| std::future< ResultT > | invoke_task_with_priority (FunctionT &&function, uint16_t priority, ArgsT &&... args) |
Dispatches a callable with an explicit priority and returns a std::future. | |
静态 Public 成员函数 | |
| static std::shared_ptr< BagWriter > | create (const std::string &path, const Config &config={}) |
Creates a concrete BagWriter instance for path. | |
| static std::shared_ptr< BagWriter > | filter_get (const std::string &path) |
Returns an existing writer for path, or creates and starts a new one. | |
| static BagWriter * | global_get () |
Returns the process-global BagWriter activated by the VLINK_BAG_PATH environment variable. | |
Protected 成员函数 | |
| void | get_url_meta (const std::string &url, const std::string &ser, int &url_index, int &ser_index) const |
| void | get_url_meta (int url_index, int ser_index, std::string &url, std::string &ser) const |
| Protected 成员函数 继承自 vlink::MessageLoop | |
| virtual void | on_begin () |
| Called from the loop thread just before the first task is processed. | |
| virtual void | on_end () |
| Called from the loop thread just after the last task has been processed. | |
| virtual void | on_idle () |
| Called from the loop thread each time the queue becomes empty. | |
| virtual void | on_task_changed (Callback &&callback, uint32_t start_time) |
| Called before each task is executed. | |
| virtual void | on_task_timeout (Callback &&callback, uint32_t elapsed_time) |
Called when a task's execution time exceeds get_max_elapsed_time(). | |
静态 Protected 成员函数 | |
| static const std::string & | get_default_tag_name () |
| static const std::string & | get_default_app_name () |
| static SchemaPluginInterface * | get_schema_interface () |
| static int32_t | get_default_timezone_diff () |
| static std::string_view | convert_action (ActionType type) |
| static std::string | get_format_date (SystemClock *current=nullptr, bool file_format=false) |
Abstract asynchronous message recorder backed by a MessageLoop event queue.
Must be constructed via create() (for managed lifetime) or directly (for custom ownership). After construction call async_run() to start the recording loop, then use push() to record messages.
| using vlink::BagWriter::SchemaCallback = std::function<SchemaData(const std::string& ser_type, SchemaType schema_type)> |
Callback that resolves a serialisation type string to a SchemaData.
When a new URL with an unknown ser_type appears, this callback is invoked to retrieve the corresponding schema for embedding in the bag. The extra schema_type hint lets callers distinguish schema families that share the same concrete type name.
| using vlink::BagWriter::SplitCallback = std::function<void(int split_index, const std::string& split_filename)> |
Callback fired when a split occurs.
Called with the zero-based split index and the path of the newly created file. The before parameter of register_split_callback() controls whether the callback fires before or after the new file is opened.
| using vlink::BagWriter::SystemClock = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds> |
System clock type used for file-name timestamp generation.
| enum vlink::BagWriter::CompressType : uint8_t |
Compression algorithm applied to each recorded payload.
| Kind | Algorithm | Notes |
|---|---|---|
| kCompressNone | No compress | Raw bytes stored as-is |
| kCompressAuto | Auto select | Picks best algorithm per payload |
| kCompressZstd | Zstandard | Good ratio, moderate speed |
| kCompressLz4 | LZ4 | Fast compression/decompression |
| kCompressLzav | LZAV | Fast, lightweight, built-in |
| 枚举值 | |
|---|---|
| kCompressNone | No compression. |
| kCompressAuto | Automatic algorithm selection. |
| kCompressZstd | Zstandard compression. |
| kCompressLz4 | LZ4 compression. |
| kCompressLzav | LZAV built-in compression. |
|
explicit |
Constructs a BagWriter for path with the given config.
Opens or creates the output file. Must call async_run() before writing.
| path | Output file path. |
| config | Recording configuration. |
|
virtual |
Destructor – stops the recording loop and flushes pending writes.
|
staticprotected |
|
staticnodiscard |
Creates a concrete BagWriter instance for path.
Selects the implementation based on the file extension:
.vcap / .vcapx – McapWriter (MCAP format)DatabaseWriter (SQLite) The returned writer has not yet started its event loop; call async_run().| path | Output file path. |
| config | Recording configuration. Defaults to Config{}. |
|
staticnodiscard |
Returns an existing writer for path, or creates and starts a new one.
Searches the global writer registry. If a writer matching path is alive, returns a shared pointer to it. Otherwise creates a new writer for path, calls async_run() on it, registers it in the global registry, and returns it. The writer is automatically removed from the registry when the last shared pointer to it is released.
| path | Output file path. |
nullptr).
|
staticprotected |
|
staticprotected |
|
staticprotected |
|
staticprotected |
|
staticprotected |
|
nodiscardpure virtual |
Returns the zero-based index of the current split file.
Returns 0 if split mode is not active.
在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
protected |
|
protected |
|
static |
Returns the process-global BagWriter activated by the VLINK_BAG_PATH environment variable.
The global writer is created automatically on first access if VLINK_BAG_PATH is set. Returns nullptr if the environment variable is not set.
nullptr.
|
nodiscardpure virtual |
Returns true if the writer is actively recording to disk.
true if the backing file is open and being written. 在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
nodiscardpure virtual |
Returns true if the writer is in split-file mode.
true when Config::split_by_size or Config::split_by_time is non-zero. 在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
pure virtual |
Records one message to the bag.
The message is enqueued on the recording loop and written asynchronously. If microseconds_timestamp is nullptr, the current system time is used.
| url | VLink URL of the topic (e.g., "dds://my/topic"). |
| ser_type | Serialisation type string (e.g., "demo.proto.PointCloud", "raw"). |
| schema_type | Coarse schema family for the payload. |
| action_type | Action type (kPublish, kRequest, etc.). |
| data | Serialized payload bytes. |
| microseconds_timestamp | Optional pointer to a custom timestamp (microseconds). |
| immediate | If true, writes synchronously bypassing the queue. |
在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
pure virtual |
Embeds a SchemaData into the bag for later offline introspection.
| schema_data | Schema descriptor to store. |
| immediate | If true, merges synchronously; otherwise enqueues. |
false only when the immediate merge fails. 在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
pure virtual |
Registers a callback that resolves serialisation type strings to SchemaData.
Called when a push() introduces a URL with an unknown ser_type.
| callback | Function mapping ser_type string to SchemaData. |
在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
pure virtual |
Registers a callback invoked when a file split occurs.
| callback | Called with (split_index, new_filename) on each split. |
| before | If true, the callback fires before the new file is opened; if false, it fires after. |
在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.
|
pure virtual |
Sets the expected message loss ratio for a given URL.
Stored as metadata in the bag. Used for post-processing diagnostics to distinguish intentional drops from unexpected loss.
| url | Topic URL. |
| loss | Loss ratio in the range [0.0, 1.0]. |
在 vlink::DatabaseWriter , 以及 vlink::McapWriter 内被实现.