VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
subscriber.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file subscriber.h
26 * @brief Type-safe event-model subscriber for VLink topics.
27 *
28 * @details
29 * @c Subscriber<MsgT, SecT> is the read side of the VLink event model.
30 * It registers a callback that is invoked with each deserialized @c MsgT
31 * message delivered from any matching @c Publisher on the same URL.
32 *
33 * @par Event Model Data Flow
34 * @code
35 *   Publisher<T>                 Transport Back-end               Subscriber<T>
36 *       |-- publish(msg) -------->      |                               |
37 *       |   serialize(msg)              |-- bytes delivery --------->   |
38 *       |                               |                               |--> callback(msg)
39 *       |                               |                               |    deserialize(bytes)
40 * @endcode
41 *
42 * @par Supported Message Types
43 * | Category | Type | Serializer |
44 * | --------------- | --------------------------- | ---------------- |
45 * | Raw bytes | @c Bytes | kBytesType |
46 * | Protobuf | @c MyProto (MessageLite) | kProtoType |
47 * | Protobuf ptr | @c MyProto* (Arena-managed) | kProtoPtrType |
48 * | FlatBuffers | @c MyTableT (NativeTable) | kFlatTableType |
49 * | FlatBuffers | @c MyTable* (Table ptr) | kFlatPtrType |
50 * | CDR (DDS only) | @c MyCdrType | kCdrType |
51 * | Standard layout | POD struct / trivial type | kStandardType |
52 * | String | @c std::string | kStringType |
53 * | Custom | @c T (has operator>>/<<) | kCustomType |
54 *
55 * @par Basic Usage
56 * @code
57 * Subscriber<MyMsg> sub("dds://vehicle/speed");
58 * sub.listen([](const MyMsg& msg) {
59 * std::cout << "speed: " << msg.value << std::endl;
60 * });
61 * @endcode
62 *
63 * @par Zero-copy Intra Transport
64 * When @c MsgT is a shared pointer type whose @c element_type derives from
65 * @c IntraDataType (generated via @c VLINK_INTRA_DATA_DECLARE) and the URL
66 * uses @c intra://, the shared pointer is forwarded zero-copy (no
67 * serialization):
68 * @code
69 * VLINK_INTRA_DATA_DECLARE(MyProtoMsg, MyIntra)
70 * Subscriber<MyIntra> sub("intra://my_topic");
71 * sub.listen([](const MyIntra& data) { ... });
72 * @endcode
73 *
74 * @par Latency and Sample-loss Tracking
75 * @code
76 * Subscriber<MyMsg> sub("dds://my_topic");
77 * sub.set_latency_and_lost_enabled(true);
78 * // ... after receiving messages:
79 * int64_t latency_us = sub.get_latency();
80 * SampleLostInfo lost = sub.get_lost();
81 * std::cout << "lost: " << lost.lost << " / " << lost.total << std::endl;
82 * @endcode
83 *
84 * @note Calling @c listen() more than once is a fatal error. The subscriber
85 * must be initialised before @c listen() is called.
86 *
87 * @tparam MsgT Message type. Must satisfy @c Serializer::is_supported().
88 * @tparam SecT Security mode; defaults to @c SecurityType::kWithoutSecurity.
89 */
90
91#pragma once
92
93#include <functional>
94#include <memory>
95#include <string>
96
98#include "./node.h"
99
100namespace vlink {
101
102/**
103 * @class Subscriber
104 * @brief Type-safe subscriber for the VLink event communication model.
105 *
106 * @tparam MsgT Message type.
107 * @tparam SecT Security mode.
108 */
109template <typename MsgT, SecurityType SecT = SecurityType::kWithoutSecurity>
110class Subscriber : public Node<SubscriberImpl, SecT> {
111 public:
112 /** @brief Unique-pointer alias for heap allocation. */
113 using UniquePtr = std::unique_ptr<Subscriber<MsgT, SecT>>;
114
115 /** @brief Shared-pointer alias for heap allocation. */
116 using SharedPtr = std::shared_ptr<Subscriber<MsgT, SecT>>;
117
118 /** @brief User-facing callback type for received messages. */
119 using MsgCallback = std::function<void(const MsgT&)>;
120
121 /** @brief Node role identifier (@c kSubscriber). */
122 static constexpr ImplType kImplType = kSubscriber;
123
124 /** @brief Serializer type resolved at compile time from @c MsgT. */
126
127 static_assert(Serializer::is_supported(kMsgType), "<MsgT> is not a supported Serializer type.");
128
129 /**
130 * @brief Creates a @c Subscriber on the heap wrapped in a @c unique_ptr.
131 *
132 * @param url_str Topic URL string (e.g. @c "dds://vehicle/speed").
133 * @param type @c kWithInit to call @c init() immediately (default).
134 * @return @c UniquePtr owning the new subscriber.
135 */
136 [[nodiscard]] static UniquePtr create_unique(const std::string& url_str, InitType type = InitType::kWithInit);
137
138 /**
139 * @brief Creates a @c Subscriber on the heap wrapped in a @c shared_ptr.
140 *
141 * @param url_str Topic URL string.
142 * @param type @c kWithInit to call @c init() immediately (default).
143 * @return @c SharedPtr owning the new subscriber.
144 */
145 [[nodiscard]] static SharedPtr create_shared(const std::string& url_str, InitType type = InitType::kWithInit);
146
147 /**
148 * @brief Constructs a subscriber from a typed transport configuration object.
149 *
150 * @details
151 * Accepts any @c Conf-derived configuration (e.g. @c DdsConf, @c ShmConf).
152 * A compile-time @c static_assert verifies the configuration supports the
153 * subscriber role.
154 *
155 * @tparam ConfT @c Conf-derived configuration type.
156 * @param conf Populated configuration object.
157 * @param type @c kWithInit to call @c init() immediately (default).
158 */
159 // NOLINTNEXTLINE(modernize-use-constraints)
160 template <typename ConfT, typename = std::enable_if_t<std::is_base_of_v<Conf, ConfT>>>
161 explicit Subscriber(const ConfT& conf, InitType type = InitType::kWithInit);
162
163 /**
164 * @brief Constructs a subscriber from a URL string.
165 *
166 * @param url_str Topic URL (e.g. @c "shm://vehicle/speed").
167 * @param type @c kWithInit to call @c init() immediately (default).
168 */
169 explicit Subscriber(const std::string& url_str, InitType type = InitType::kWithInit);
170
171 /**
172 * @brief Registers the receive callback for incoming messages.
173 *
174 * @details
175 * The callback is invoked on every delivery, already deserialized into @c MsgT.
176 * For @c intra:// with an @c IntraDataType-derived shared pointer type
177 * (generated via @c VLINK_INTRA_DATA_DECLARE) the pointer is forwarded
178 * zero-copy. The callback runs on the transport thread unless the
179 * node is @c attach()ed to a @c MessageLoop.
180 *
181 * Calling @c listen() more than once is a fatal error.
182 *
183 * @warning The deserialized message object is @c thread_local and reused
184 * across invocations. Do not store references or pointers to the
185 * callback argument beyond the callback scope; copy the data if
186 * you need to retain it. This does not apply to @c Bytes or
187 * @c IntraData message types, which are passed by value/shared
188 * pointer.
189 *
190 * @note The subscriber must be initialised (either via @c kWithInit in the
191 * constructor or by calling @c init() explicitly) before @c listen()
192 * is called. Calling @c listen() on an uninitialised subscriber
193 * triggers a fatal error.
194 *
195 * @param callback @c void(const MsgT&) invoked for each received message.
196 * @return @c true if registered successfully; @c false on error.
197 */
198 bool listen(MsgCallback&& callback);
199
200 /**
201 * @brief Enables or disables manual-unloan mode for zero-copy receives.
202 *
203 * @details
204 * When enabled, the user must call @c return_loan() after consuming a received
205 * loaned buffer. Only meaningful on loan-capable transports (e.g. @c shm://).
206 *
207 * @param manual_unloan @c true to enable; @c false for automatic (default).
208 */
209 void set_manual_unloan(bool manual_unloan) override;
210
211 /**
212 * @brief Enables or disables per-message latency and sample-loss tracking.
213 *
214 * @param enable @c true to start tracking; @c false to stop.
215 */
216 void set_latency_and_lost_enabled(bool enable);
217
218 /**
219 * @brief Returns @c true if latency and sample-loss tracking is active.
220 *
221 * @return @c true if @c set_latency_and_lost_enabled(true) was called.
222 */
223 [[nodiscard]] bool is_latency_and_lost_enabled() const;
224
225 /**
226 * @brief Returns the most recently measured end-to-end message latency.
227 *
228 * @details
229 * Only meaningful when tracking is enabled. Measured from publication
230 * timestamp to receive timestamp.
231 *
232 * @return Latency in microseconds; @c 0 if tracking is disabled.
233 */
234 [[nodiscard]] int64_t get_latency() const;
235
236 /**
237 * @brief Returns cumulative sample delivery statistics.
238 *
239 * @details
240 * Only meaningful when tracking is enabled.
241 *
242 * @return @c SampleLostInfo with @c total expected and @c lost sample counts.
243 */
244 [[nodiscard]] SampleLostInfo get_lost() const;
245
246 /**
247 * @brief Changes this subscriber's role to @c kGetter (field-reader).
248 *
249 * @details
250 * Updates @c impl_->impl_type from @c kSubscriber to @c kGetter so that
251 * transport-specific field semantics (latest-value delivery) are applied.
252 * If called after @c init(), the extension is automatically reinitialised.
253 * Used internally by @c Getter.
254 */
255 void mark_as_getter();
256
257 private:
258 bool listen_bytes(NodeImpl::MsgCallback&& callback);
259
260 bool listen_intra(NodeImpl::IntraMsgCallback&& callback);
261};
262
263/**
264 * @class SecuritySubscriber
265 * @brief Convenience alias for @c Subscriber with message security enabled.
266 *
267 * @details
268 * Equivalent to @c Subscriber<MsgT, SecurityType::kWithSecurity>. Decrypts
269 * every incoming message using the configured security key or callbacks.
270 *
271 * @note Not supported on @c intra:// or @c dds:// CDR transport.
272 *
273 * @tparam MsgT Message type.
274 */
// Thin derived type that pins the security mode to SecurityType::kWithSecurity, so callers
// only spell the message type.
275template <typename MsgT>
276class SecuritySubscriber : public Subscriber<MsgT, SecurityType::kWithSecurity> {
277 public:
    // NOTE(review): listing line 278 -- the only body line of this class -- is absent from
    // this listing. Presumably it re-exports the base-class constructors; restore it from
    // the repository and confirm.
279};
280
281} // namespace vlink
282
Base CRTP template for all VLink communication nodes.
Abstract base class for all transport-specific subscriber implementations.