VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
bag_reader_processor.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file bag_reader_processor.h
26 * @brief Time-ordered message buffer for smoothing bag playback across split files.
27 *
28 * @details
29 * @c BagReaderProcessor accepts messages pushed from one or more @c BagReader instances
30 * and delivers them to an @c OutputCallback in ascending timestamp order. It uses a
31 * time-windowed cache to absorb out-of-order messages that can arise when reading
32 * consecutive split files concurrently.
33 *
34 * The processor maintains an internal sorted queue. Messages are held for at least
35 * @c Config::min_cache_time milliseconds before being flushed, allowing late-arriving
36 * messages from the current split to be sorted before delivery.
37 *
38 * @par Typical usage
39 * @code
40 * vlink::BagReaderProcessor processor;
41 * processor.register_output_callback([](int64_t ts, const std::string& url,
42 * vlink::ActionType action, const vlink::Bytes& data) {
43 * // process in order
44 * });
45 *
46 * // Feed from multiple readers:
47 * reader_a->register_output_callback([&](auto ts, auto& url, auto action, auto& data) {
48 * processor.push(ts, url, action, data);
49 * });
50 * reader_b->register_output_callback([&](auto ts, auto& url, auto action, auto& data) {
51 * processor.push(ts, url, action, data);
52 * });
53 * @endcode
54 */
55
56#pragma once
57
58#include <cstdint>
59#include <functional>
60#include <memory>
61#include <mutex>
62#include <string>
63
64#include "../base/bytes.h"
65#include "../impl/types.h"
66
67namespace vlink {
68
69/**
70 * @class BagReaderProcessor
71 * @brief Time-sorted message relay that buffers and orders messages before delivery.
72 *
73 * @details
74 * Thread-safe. Multiple threads may call @c push() concurrently.
75 */
77 public:
78 /**
79 * @brief Callback type fired for each message in timestamp order.
80 *
81 * @details
82 * Called from an internal processing thread after the cache window has elapsed.
83 */
85 std::function<void(int64_t timestamp, const std::string& url, ActionType action_type, const Bytes& data)>;
86
87 /**
88 * @struct Config
89 * @brief Configuration for the time-ordered message cache (hold time and byte budget).
90 */
91 struct Config final {
92 int64_t min_cache_time{500}; ///< Minimum time, in milliseconds, a message stays cached before flushing (default 500).
93 int64_t max_cache_size{1024LL * 1024LL * 256}; ///< Maximum cache size in bytes (default 256 MiB = 268435456).
94
95 Config() {} // NOLINT(modernize-use-equals-default) -- NOTE(review): presumably user-provided on purpose so that Config() can serve as a default argument inside the enclosing class; confirm before switching to "= default".
96 };
97
98 /**
99 * @brief Constructs the processor with the given @p config.
100 *
101 * @param config Cache time and size limits; defaults to @c Config() (500 ms, 256 MiB).
102 */
103 explicit BagReaderProcessor(const Config& config = Config());
104
105 /**
106 * @brief Destructor -- flushes remaining cached messages and stops the processing thread.
107 */
109
110 /**
111 * @brief Registers the callback that receives time-ordered messages.
112 *
113 * @details
114 * Only one callback may be registered. A subsequent call replaces the previous one.
115 *
116 * @param output_callback Callback invoked for each message in order.
117 */
119
120 /**
121 * @brief Pushes a message into the time-ordered cache.
122 *
123 * @details
124 * Thread-safe. The message is inserted into an internal sorted queue keyed by
125 * @p timestamp. Messages are delivered to the @c OutputCallback after the
126 * @c Config::min_cache_time window (configured in milliseconds) has elapsed.
127 *
128 * @param timestamp Message timestamp in microseconds; used as the ordering key.
129 * @param url Topic URL string.
130 * @param action_type Action type (@c ActionType) of the message.
131 * @param data Serialized payload bytes.
132 */
133 void push(int64_t timestamp, const std::string& url, ActionType action_type, const Bytes& data);
134
135 private:
136 bool on_check();  // NOTE(review): undocumented here; presumably reports whether cached messages are due for flushing -- confirm in the implementation.
137
138 void on_output(std::unique_lock<std::mutex>& lock);  // Receives the caller's lock by reference; NOTE(review): presumably releases it around callback delivery -- confirm.
139
140 void on_run();  // NOTE(review): presumably the body of the internal processing thread -- confirm.
141
142 void on_exec(bool at_end);  // NOTE(review): at_end presumably requests a final flush of all remaining messages -- confirm.
143
144 std::unique_ptr<struct BagReaderProcessorImpl> impl_;  // Owning pointer to the implementation state; BagReaderProcessorImpl is only named here via the elaborated type specifier.
145};
146
147} // namespace vlink
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.
#define VLINK_EXPORT
Definition macros.h:85
Core type definitions shared across all VLink node implementations.