VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
node.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file node.h
26 * @brief Base class template for all VLink communication nodes, parameterised on the transport implementation.
27 *
28 * @details
29 * @c Node<ImplT, SecT> is the common base class inherited by @c Publisher,
30 * @c Subscriber, @c Server, @c Client, @c Setter, and @c Getter. It owns
31 * the transport-specific implementation pointer (@c impl_), drives the node
32 * lifecycle, and provides the shared services described below.
33 *
34 * @par Architecture Overview
35 * @code
36 * +---------------------------------------------------+
37 * | User Application |
38 * | Publisher<T> Subscriber<T> Client<Req,Resp> |
39 * | Server<Req,Resp> Getter<T> Setter<T> |
40 * +-------------------+-------------------------------+
41 * | inherits
42 * +-------------------v-------------------------------+
43 * | Node<ImplT, SecT> |
44 * | lifecycle loans security properties profiler |
45 * +-------------------+-------------------------------+
46 * | owns (unique_ptr)
47 * +-------------------v-------------------------------+
48 * | ImplT (PublisherImpl / SubscriberImpl / ...) |
49 * +-------------------+-------------------------------+
50 * | creates / calls
51 * +-------------------v-------------------------------+
52 * | Transport Back-end |
53 * | intra shm shm2 dds ddsc zenoh ... |
54 * +---------------------------------------------------+
55 * @endcode
56 *
57 * @par Lifecycle
58 * | Step | Method | Notes |
59 * | ------------------ | --------------------- | ------------------------------------------- |
60 * | Construction | constructor | Parses URL, creates impl via Conf factory. |
61 * | Initialisation | @c init() | Calls impl init + init_ext, sets loan flag. |
62 * | Use | publish/listen/invoke | Normal operation. |
63 * | Interrupt | @c interrupt() | Unblocks any blocking wait immediately. |
64 * | De-initialisation | @c deinit() | Calls interrupt(), then impl deinit. |
65 * | Destruction | destructor | Calls @c deinit() automatically. |
66 *
67 * @par Deferred Initialisation
68 * @code
69 * Publisher<MyMsg> pub("dds://topic", InitType::kWithoutInit);
70 * pub.set_ser_type("my.custom.Type"); // configure before init
71 * pub.init();
72 * @endcode
73 *
74 * @par Security
75 * Enable per-message encryption by using @c SecurityType::kWithSecurity:
76 * @code
77 * SecurityPublisher<MyMsg> pub("shm://topic");
78 * pub.set_security_key("my-secret");
79 * @endcode
80 * @note @c intra:// and @c dds:// with CDR serialisation do NOT support security.
81 *
82 * @par Zero-copy Loans
83 * On @c shm:// (Iceoryx) and @c shm2:// (Iceoryx2) transports, a loaned
84 * buffer avoids extra copies:
85 * @code
86 * if (pub.is_support_loan()) {
87 * Bytes buf = pub.loan(sizeof(MyStruct));
88 * // fill buf ...
89 * pub.publish(buf); // loan is returned automatically
90 * }
91 * @endcode
92 *
93 * @tparam ImplT Concrete transport implementation (must inherit @c NodeImpl).
94 * @tparam SecT Security mode: @c kWithoutSecurity (default) or @c kWithSecurity.
95 */
96
97#pragma once
98
99#include <atomic>
100#include <memory>
101#include <mutex>
102#include <optional>
103#include <string>
104
105#include "./extension/security.h"
106#include "./impl/node_impl.h"
107
108namespace vlink {
109
110/**
111 * @class Node
112 * @brief Transport-agnostic base class template for all VLink communication nodes.
113 *
114 * @details
115 * All six VLink communication primitives inherit from this template.
116 * It manages the @c ImplT implementation pointer and provides the shared
117 * API surface described in the file-level documentation above.
118 *
119 * @tparam ImplT Concrete impl class (e.g. @c PublisherImpl, @c GetterImpl).
120 * @tparam SecT Security mode (@c kWithoutSecurity or @c kWithSecurity).
121 */
122template <typename ImplT, SecurityType SecT>
123class Node {
124 public:
125 /**
126 * @brief Callback type for node status-change notifications.
 *
 * NOTE(review): the declaration this comment documents (numbered line 128)
 * is missing from this listing -- restore it from node.h.
127 */
129
130 /**
131 * @brief Initialises the node and its transport back-end.
132 *
133 * @details
134 * Uses an atomic compare-exchange to guard against double-initialisation.
135 * On success: calls @c impl_->init(), @c impl_->init_ext(), and queries
136 * the transport for zero-copy loan support. Calling @c init() on an
137 * already-initialised node is a no-op that returns @c false.
138 *
139 * @return @c true on first successful initialisation; @c false otherwise.
140 */
141 virtual bool init();
142
143 /**
144 * @brief Shuts down the node and releases all transport resources.
145 *
146 * @details
147 * Uses an atomic compare-exchange to prevent double-deinit. Calls
148 * @c interrupt() first, then @c impl_->deinit() and @c impl_->deinit_ext().
149 * When safe-quit mode is active the deinit sequence runs under the quit mutex.
150 * The destructor calls @c deinit() automatically, so explicit calls are only
151 * needed for early shutdown.
152 *
153 * @return @c true on first successful deinit; @c false if not initialised.
154 */
155 virtual bool deinit();
156
157 /**
158 * @brief Unblocks any active blocking wait on this node.
159 *
160 * @details
161 * Signals the internal interrupted flag and wakes the condition variable so
162 * that calls such as @c wait_for_subscribers(), @c wait_for_connected(), and
163 * @c wait_for_value() return immediately with @c false. Subclasses such as
164 * @c Getter, @c Publisher, and @c Client override this to also wake their
165 * own blocking condition variables.
166 */
167 virtual void interrupt();
168
169 /**
170 * @brief Returns @c true if @c init() has been successfully called.
171 *
172 * @return @c true when the node is in the initialised state.
173 */
174 [[nodiscard]] bool has_inited() const;
175
176 /**
177 * @brief Returns @c true if the transport supports zero-copy loaned buffers.
178 *
179 * @details
180 * Currently the @c shm:// (Iceoryx) and @c shm2:// (Iceoryx2) back-ends
181 * return @c true here. When loans are supported, @c publish() / @c set()
182 * / @c reply() will automatically use them to avoid an extra memory copy.
183 *
184 * @return @c true if @c loan() / @c return_loan() are meaningful.
185 */
186 [[nodiscard]] bool is_support_loan() const;
187
188 /**
189 * @brief Allocates a loaned buffer from the transport memory pool.
190 *
191 * @details
192 * Returns a @c Bytes backed by transport-managed memory of @p size bytes.
193 * The caller must either pass it to a publish/write/reply call (which
194 * returns the loan automatically) or call @c return_loan() explicitly.
195 * Returns an empty @c Bytes on failure or if the transport has no loan pool.
196 *
197 * @param size Requested size in bytes (@c 0 is valid for empty messages).
198 * @return Loaned @c Bytes, or empty @c Bytes on failure.
199 */
200 [[nodiscard]] Bytes loan(int64_t size);
201
202 /**
203 * @brief Returns a previously loaned buffer back to the transport pool.
204 *
205 * @details
206 * Must be called if a loaned buffer obtained via @c loan() is not consumed by
207 * a publish/write/reply call. Failing to return a loan can exhaust the shared
208 * memory pool.
209 *
210 * @param bytes The loaned @c Bytes to return.
211 * @return @c true on success; @c false if the buffer is not a valid loan.
212 */
213 bool return_loan(const Bytes& bytes);
214
215 /**
216 * @brief Enables or disables manual-unloan mode for zero-copy receives.
217 *
218 * @details
219 * In manual-unloan mode the user is responsible for calling @c return_loan()
220 * after consuming received data. The base implementation logs a warning;
221 * only @c Subscriber and @c Getter override this method.
222 *
223 * @param manual_unloan @c true to enable; @c false for automatic (default).
224 */
225 virtual void set_manual_unloan(bool manual_unloan);
226
227 /**
228 * @brief Returns @c true if manual-unloan mode is active.
229 *
230 * @return @c true if @c set_manual_unloan(true) was called.
231 */
232 [[nodiscard]] virtual bool is_manual_unloan() const;
233
234 /**
235 * @brief Suspends message delivery on this node.
236 *
237 * @details
238 * Behaviour is transport-dependent: some back-ends buffer incoming messages
239 * while suspended; others drop them. Pair with @c resume() to re-enable.
240 *
241 * @return @c true if suspension succeeded; @c false on error.
242 */
243 bool suspend();
244
245 /**
246 * @brief Resumes message delivery after a @c suspend().
247 *
248 * @return @c true if resumption succeeded; @c false on error.
249 */
250 bool resume();
251
252 /**
253 * @brief Returns @c true if the node is currently suspended.
254 *
255 * @return @c true while @c suspend() is in effect.
256 */
257 [[nodiscard]] bool is_suspend() const;
258
259 /**
260 * @brief Attaches the node to a @c MessageLoop for callback dispatching.
261 *
262 * @details
263 * After attachment, incoming-message callbacks are posted to @p message_loop
264 * rather than invoked on the transport thread. This serialises delivery to
265 * the loop's thread, which is useful for single-threaded application code.
266 *
267 * @param message_loop Non-null pointer to the target @c MessageLoop.
268 * @return @c true on success; @c false if a @c MessageLoop is
269 * already attached.
270 */
271 bool attach(class MessageLoop* message_loop);
272
273 /**
274 * @brief Detaches the node from its current @c MessageLoop.
275 *
276 * @details
277 * After detachment, callbacks are again invoked on the transport thread.
278 *
279 * @return @c true on success; @c false if no loop was attached.
280 */
281 bool detach();
282
283 /**
284 * @brief Returns the @c MessageLoop this node is attached to.
285 *
286 * @return Pointer to the attached @c MessageLoop, or @c nullptr.
287 */
288 [[nodiscard]] class MessageLoop* get_message_loop() const;
289
290 /**
291 * @brief Returns the abstract node handle for graph introspection.
292 *
293 * @details
294 * The @c AbstractNode pointer can be used with @c AbstractFactory to query
295 * peer nodes in the same transport graph, or passed to the proxy monitoring
296 * API for runtime topology inspection.
297 *
298 * @return Non-owning pointer to the @c AbstractNode, or @c nullptr if the
299 * transport back-end does not expose an @c AbstractNode.
300 */
301 [[nodiscard]] const AbstractNode* get_abstract_node() const;
302
303 /**
304 * @brief Returns the current status object for the specified status type.
305 *
306 * @details
307 * Returns a polymorphic status shared pointer. The concrete type and set of
308 * available types depend on the active transport. If the transport does not
309 * support the requested @p type, a @c Status::Unknown instance is returned
310 * and a warning is logged.
311 *
312 * @param type Category of status to retrieve.
313 * @return Shared pointer to status data; returns @c Status::Unknown
314 * when the transport does not support the query.
315 */
316 [[nodiscard]] Status::BasePtr get_status(Status::Type type) const;
317
318 /**
319 * @brief Registers a handler called when the node's status changes.
320 *
321 * @details
322 * Only one handler can be registered; subsequent calls replace the previous
323 * one. The handler is invoked with a @c Status::BasePtr describing the new
324 * state (e.g. connected, disconnected, error).
325 *
326 * @param callback Callable @c void(Status::BasePtr) invoked on status changes.
 *
 * NOTE(review): the member declaration this comment documents (numbered
 * line 328) is missing from this listing -- restore it from node.h.
327 */
329
330 /**
331 * @brief Sets a transport-specific key-value property on the node.
332 *
333 * @details
334 * Provides an extensibility mechanism for back-end-specific tuning knobs
335 * that do not have dedicated API methods. Recognised keys depend on the
336 * active transport.
337 *
338 * @param prop Property key string.
339 * @param value Property value string.
340 */
341 void set_property(const std::string& prop, const std::string& value);
342
343 /**
344 * @brief Retrieves a transport-specific property value.
345 *
346 * @param prop Property key string.
347 * @return Property value string; empty if key is not recognised.
348 */
349 [[nodiscard]] std::string get_property(const std::string& prop) const;
350
351 /**
352 * @brief Returns the @c TransportType of the transport this node is bound to.
353 *
354 * @return Enumerator such as @c kDds, @c kShm, @c kIntra, etc.
355 */
356 [[nodiscard]] TransportType get_transport_type() const;
357
358 /**
359 * @brief Returns the URL string used to construct this node.
360 *
361 * @details
362 * Non-empty only when the node was constructed via a URL string or @c Url
363 * object; empty for @c ConfT-based construction.
364 *
365 * @return Reference to the URL (e.g. @c "dds://vehicle/speed").
366 */
367 [[nodiscard]] const std::string& get_url() const;
368
369 /**
370 * @brief Sets the symmetric encryption key for message security.
371 *
372 * @details
373 * Requires @c SecT == @c kWithSecurity (enforced by @c static_assert).
374 * Not supported for @c intra:// or @c dds:// CDR nodes (triggers fatal log).
375 *
376 * @param key Encryption key string; interpretation depends on the security impl.
377 */
378 void set_security_key(const std::string& key);
379
380 /**
381 * @brief Sets the filesystem path for message bag recording.
382 *
383 * @details
384 * Enables recording of each published/received message to a bag file.
385 * Not supported on @c intra:// or @c dds:// CDR nodes (triggers fatal log).
386 *
387 * @param path Path to the recording directory or file.
388 */
389 void set_record_path(const std::string& path);
390
391 /**
392 * @brief Installs custom encrypt and decrypt callbacks.
393 *
394 * @details
395 * Alternative to @c set_security_key() for custom cipher implementations.
396 * Requires @c SecT == @c kWithSecurity (enforced by @c static_assert).
397 *
398 * @param encrypt_callback @c bool(const Bytes& in, Bytes& out) -- encrypts a message, returns true on success.
399 * @param decrypt_callback @c bool(const Bytes& in, Bytes& out) -- decrypts a message, returns true on success.
400 */
401 void set_security_callbacks(Security::Callback&& encrypt_callback, Security::Callback&& decrypt_callback);
402
403 /**
404 * @brief Overrides the runtime wire metadata for this node.
405 *
406 * @details
407 * @p ser_type stores the concrete runtime type identifier, while
408 * @p schema_type stores the coarse decoder family used by discovery, proxy,
409 * and bag metadata. When @p schema_type is @c SchemaType::kUnknown (the
410 * default), the node does not explicitly override the current family; in
411 * that mode it keeps the existing protobuf / flatbuffers family unless the
412 * new @p ser_type itself clearly implies @c kRaw or @c kZeroCopy. Passing
413 * an empty @p ser_type clears both fields.
414 *
415 * If called after @c init(), the transport extension is restarted once so
416 * external metadata stays in sync.
417 *
418 * @param ser_type Concrete serialisation type identifier, or empty to clear the current metadata.
419 * @param schema_type Explicit coarse schema family to expose; default
420 * @c kUnknown preserves the current family unless
421 * @p ser_type implies a different one.
422 */
423 void set_ser_type(const std::string& ser_type, SchemaType schema_type = SchemaType::kUnknown);
424
425 /**
426 * @brief Returns the current serialisation type string.
427 *
428 * @return Reference to the type string stored in the impl.
429 */
430 [[nodiscard]] const std::string& get_ser_type() const;
431
432 /**
433 * @brief Returns the current coarse schema family.
434 *
435 * @return The schema family stored in the node implementation.
436 */
437 [[nodiscard]] SchemaType get_schema_type() const;
438
439 /**
440 * @brief Enables or disables peer-discovery on this node.
441 *
442 * @details
443 * Disabling discovery reduces CPU and network overhead for nodes that never
444 * need to locate peers. If called after @c init(), the transport extension
445 * is automatically reinitialised to apply the change.
446 *
447 * @param enable @c true (default) to enable discovery; @c false to disable.
448 */
449 void set_discovery_enabled(bool enable);
450
451 /**
452 * @brief Returns @c true if peer-discovery is currently enabled.
453 *
454 * @return @c true if discovery is active.
455 */
456 [[nodiscard]] bool get_discovery_enabled() const;
457
458 /**
459 * @brief Binds a Protobuf Arena for arena-allocated message objects.
460 *
461 * @details
462 * Required when @c MsgT is a raw Protobuf pointer type (e.g. @c MyProto*).
463 * The arena must outlive this node. Forgetting to bind an arena for
464 * proto-pointer types causes a fatal log on the first received message.
465 *
466 * @param proto_arena Pointer to a @c google::protobuf::Arena instance (as @c void*).
467 */
468 void bind_proto_arena(void* proto_arena);
469
470 /**
471 * @brief Returns the cumulative CPU usage of this node as a percentage.
472 *
473 * @details
474 * Returns the percentage of wall-clock time that this node has spent in
475 * active publish/receive operations since the profiler was started.
476 * Available only when the CPU profiler is built in (i.e. @c VLINK_DISABLE_PROFILER
477 * is not defined) and global profiling is enabled via the @c VLINK_PROFILER_ENABLE
478 * environment variable. Returns @c -1.0 if no profiler is attached to the impl.
479 *
480 * @return CPU usage percentage [0.0, 100.0], or @c -1.0 if unavailable.
481 */
482 [[nodiscard]] double get_cpu_usage() const;
483
484 /**
485 * @brief Returns @c true if safe-quit mode is currently active.
486 *
487 * @details
488 * Safe-quit mode holds a @c std::mutex around all user callbacks and around
489 * @c deinit(), preventing use-after-free races when a node is destroyed while
490 * a callback is in flight.
491 *
492 * @return @c true if the safe-quit mutex is engaged.
493 */
494 [[nodiscard]] bool get_safety_quit() const;
495
496 /**
497 * @brief Enables or disables safe-quit mode.
498 *
499 * @details
500 * When @p safety_quit is @c true, an internal @c std::mutex is allocated and
501 * locked around every callback invocation and around @c deinit(). Enable
502 * this when the node's lifetime is shorter than the callback's scope. There
503 * is a small synchronisation overhead; avoid enabling it on hot paths.
504 *
505 * @param safety_quit @c true to enable; @c false to disable (default).
506 */
507 void set_safety_quit(bool safety_quit);
508
509 /**
510 * @brief Configures transport-layer SSL/TLS encryption for this node.
511 *
512 * @details
513 * Merges the fields of @p options into the node's internal property map
514 * via @c SslOptions::parse_to(). The transport back-end reads the
515 * resulting @c ssl.* properties during @c init() to set up a TLS
516 * connection. This method must be called **before** @c init() for the
517 * settings to take effect.
518 *
519 * SSL is considered enabled when @c SslOptions::is_valid() returns
520 * @c true (i.e. at least @c ca_file or @c cert_file is non-empty).
521 * Not all back-ends support TLS; see the @c SslOptions file-level
522 * documentation for the per-backend compatibility table.
523 *
524 * This call is thread-safe; the property map is updated under a mutex.
525 *
526 * @par Example
527 * @code
528 * Publisher<MyMsg> pub("mqtt://sensor/data", InitType::kWithoutInit);
529 * SslOptions ssl;
530 * ssl.ca_file = "/etc/certs/ca.pem";
531 * ssl.cert_file = "/etc/certs/client.pem";
532 * ssl.key_file = "/etc/certs/client-key.pem";
533 * pub.set_ssl_options(ssl);
534 * pub.init();
535 * @endcode
536 *
537 * @param options The SSL/TLS configuration to apply.
538 *
539 * @see SslOptions, set_property()
540 */
541 void set_ssl_options(const SslOptions& options);
542
543 protected:
 /**
  * @brief Default constructor; protected, so a @c Node can only be created
  * through a derived class.
  */
544 Node();
545
 /**
  * @brief Virtual destructor; protected, so a node cannot be deleted through
  * a @c Node pointer from outside the hierarchy. The @c deinit() documentation
  * in this class states that the destructor calls @c deinit() automatically
  * (body not visible in this header).
  */
546 virtual ~Node();
547
 /**
  * @brief NOTE(review): presumably engages @c security_ for
  * @c kWithSecurity nodes -- confirm against node-inl.h.
  */
548 void enable_security();
549
 /**
  * @brief Helper for invoking a user callback with forwarded arguments.
  *
  * NOTE(review): presumably this is where the safe-quit mutex described on
  * @c set_safety_quit() is held around the call -- confirm against node-inl.h.
  *
  * @param callback Callable to invoke.
  * @param args     Arguments passed on to @p callback.
  */
550 template <typename CallbackT, typename... ArgsT>
551 void invoke_callback(const CallbackT& callback, ArgsT&&... args);
552
 /**
  * @brief NOTE(review): presumably returns a fallback value of type
  * @c TypeT for failure paths -- semantics not visible here; confirm
  * against node-inl.h.
  */
553 template <typename TypeT>
554 TypeT get_default_value();
555
 /** Initialised-state flag, starts @c false; presumably the target of the compare-exchange described on init()/deinit(). */
556 std::atomic_bool has_inited_{false};
557
 /** Owning pointer to the transport-specific implementation. */
558 std::unique_ptr<ImplT> impl_;
 /** Optional @c Security instance; presumably engaged by enable_security() -- TODO confirm. */
559 std::optional<Security> security_;
 /** Optional mutex; per the set_safety_quit() documentation it is allocated when safe-quit mode is enabled. */
560 std::optional<std::mutex> quit_mtx_;
 /** Arena pointer, initially null; presumably stored by bind_proto_arena(), whose docs require the arena to outlive the node. */
561 void* proto_arena_{nullptr};
 /** Loan-support flag, initially @c false; presumably filled in by init() and reported by is_support_loan(). */
562 bool is_support_loan_{false};
 /** Manual-unloan flag, initially @c false; presumably reported by is_manual_unloan(). */
563 bool is_manual_unloan_{false};
564
565 private:
 // NOTE(review): numbered line 566 is missing from this listing. The page's
 // macro tooltip suggests it is VLINK_DISALLOW_COPY_AND_ASSIGN(Node) -- confirm
 // against node.h.
567};
568
569} // namespace vlink
570
571#include "./internal/node-inl.h"
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
Abstract transport node base classes used by all VLink node implementations.
AES-128-CBC encryption/decryption with optional custom callback override.