VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
database_writer.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file database_writer.h
26 * @brief Concrete BagWriter implementation for the SQLite-backed VLink bag format.
27 *
28 * @details
29 * @c DatabaseWriter records VLink messages to a SQLite @c .vdb file. It extends
30 * @c BagWriter with transactional write caching (WAL mode, batch commit), optional
31 * VACUUM optimisation on exit, and in-place schema embedding for Protobuf introspection.
32 *
33 * Internally, messages are accumulated in a memory cache and committed in batches to
34 * reduce SQLite write overhead. Cache parameters are configurable via @c BagWriter::Config.
35 *
36 * @par Usage
37 * @code
38 * vlink::BagWriter::Config cfg;
39 * cfg.compress = vlink::BagWriter::kCompressLzav;
40 * cfg.wal_mode = true;
41 *
42 * auto writer = vlink::BagWriter::create("/data/recording.vdb", cfg);
43 * // or explicitly:
44 * auto writer = std::make_shared<vlink::DatabaseWriter>("/data/recording.vdb", cfg);
45 * writer->async_run();
46 * writer->push("dds://my/topic", "demo.proto.PointCloud", vlink::SchemaType::kProtobuf,
47 * vlink::ActionType::kPublish, data);
48 * @endcode
49 *
50 * @see BagWriter, McapWriter
51 */
52
53#pragma once
54
55#include <memory>
56#include <string>
57
58#include "./bag_writer.h"
59
60namespace vlink {
61
62/**
63 * @class DatabaseWriter
64 * @brief Concrete SQLite-backed bag file recorder with transactional write caching.
65 *
66 * @details
67 * All virtual methods from @c BagWriter are implemented. Prefer using
68 * @c BagWriter::create() for format-agnostic construction.
69 */
71 public:
72 /**
73 * @brief Constructs a @c DatabaseWriter for the given @p path.
74 *
75 * @param path Path to the output @c .vdb file. Created if it does not exist.
76 * @param config Recording configuration.
77 */
78 explicit DatabaseWriter(const std::string& path, const Config& config = {});
79
80 /**
81 * @brief Destructor -- commits remaining cached writes and closes the SQLite database.
82 */
83 ~DatabaseWriter() override;
84
85 /**
86 * @brief Registers a callback invoked when a file split occurs.
87 *
88 * @param callback Called with (split_index, new_filename) on each split.
89 * @param before If @c true, fires before the new file is opened; otherwise after.
90 */
91 void register_split_callback(SplitCallback&& callback, bool before) override;
92
93 /**
94 * @brief Registers a callback that resolves serialisation type strings to @c SchemaData.
95 *
96 * @param callback Function mapping (@c ser_type, @c schema_type) to @c SchemaData.
97 */
98 void register_schema_callback(SchemaCallback&& callback) override;
99
100 /**
101 * @brief Embeds a @c SchemaData into the SQLite bag for offline introspection.
102 *
103 * @param schema_data Schema descriptor to store.
104 * @param immediate If @c true, merges synchronously; otherwise enqueues.
105 * @return @c false only when the immediate merge fails.
106 */
107 bool push_schema(const SchemaData& schema_data, bool immediate = false) override;
108
109 /**
110 * @brief Records one message to the SQLite bag file.
111 *
112 * @param url VLink URL of the topic.
113 * @param ser_type Serialisation type string.
114 * @param schema_type Coarse schema family for the payload.
115 * @param action_type Action type (@c kPublish, @c kRequest, etc.).
116 * @param data Serialized payload bytes.
117 * @param microseconds_timestamp Optional custom timestamp in microseconds.
118 * @c nullptr means use the current system time.
119 * @param immediate If @c true, writes synchronously bypassing the queue.
120 * @return Sequence number of the recorded message, or a negative value on error.
121 */
122 int64_t push(const std::string& url, const std::string& ser_type, SchemaType schema_type, ActionType action_type,
123 const Bytes& data, int64_t* microseconds_timestamp = nullptr, bool immediate = false) override;
124
125 /**
126 * @brief Returns @c true if the writer is actively recording to disk.
127 */
128 [[nodiscard]] bool is_dumping() const override;
129
130 /**
131 * @brief Returns @c true if split-file mode is active.
132 */
133 [[nodiscard]] bool is_split_mode() const override;
134
135 /**
136 * @brief Returns the zero-based index of the current split file.
137 */
138 [[nodiscard]] int get_split_index() const override;
139
140 /**
141 * @brief Sets the expected message loss ratio for a given URL.
142 *
143 * @param url Topic URL.
144 * @param loss Loss ratio in the range [0.0, 1.0].
145 */
146 void set_url_loss(const std::string& url, double loss) override;
147
148 protected:
149 size_t get_max_task_count() const override;
150
151 void on_begin() override;
152
153 void on_end() override;
154
155 private:
156 void open(const std::string& path);
157
158 void close();
159
160 bool write(const std::string& url, const std::string& ser_type, SchemaType schema_type, ActionType action_type,
161 const Bytes& data, int64_t microseconds_timestamp);
162
163 bool write_filex(bool complete = true);
164
165 bool begin_cache();
166
167 bool sync_cache();
168
169 bool rollback_cache();
170
171 bool merge_schema(SchemaData& schema_data);
172
173 bool load_schema(const std::string& ser_type, SchemaType& schema_type, SchemaData& schema_data);
174
175 bool insert_schema(const SchemaData& schema_data);
176
177 std::unique_ptr<struct DatabaseWriterImpl> impl_;
178
180};
181
182} // namespace vlink
Abstract base class for VLink bag file recording with split, compression and global writer support.
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184