VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
node_impl.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file node_impl.h
26 * @brief Abstract transport node base classes used by all VLink node implementations.
27 *
28 * @details
29 * This header defines two base classes that form the backbone of the VLink
30 * transport abstraction:
31 *
32 * - @c AbstractNode -- minimal interface for retrieving the underlying native
33 * transport handle (e.g. a DDS DataWriter pointer).
34 * - @c NodeImpl -- the main base class for @c PublisherImpl, @c SubscriberImpl,
35 * @c ServerImpl, @c ClientImpl, @c SetterImpl, and @c GetterImpl. It
36 * provides common infrastructure:
37 *
38 * | Feature | API |
39 * | ---------------------- | ------------------------------------------------- |
40 * | Transport initialise | @c init() / @c deinit() |
41 * | Zero-copy loan | @c loan() / @c return_loan() |
42 * | Suspend / resume | @c suspend() / @c resume() |
43 * | Interrupt | @c interrupt() / @c reset_interrupted() |
44 * | Property store | @c set_property() / @c get_property() |
45 * | Message loop attach | @c attach() / @c detach() / @c get_message_loop() |
46 * | Status callback | @c register_status_handler() / @c call_status() |
47 * | Data recording | @c set_record_path() / @c try_record() |
48 * | Discovery registration | @c init_ext() / @c deinit_ext() |
49 * | Version check | @c check_version() |
50 * | Global init | @c global_init() -- called once per process |
51 *
52 * @par Callback Types
53 * @c NodeImpl defines the standardised callback signatures used throughout all
54 * transport backends:
55 *
56 * | Type | Signature | Used by |
57 * | ----------------- | --------------------------------------- | -------------------------- |
58 * | ConnectCallback | @c void(bool) | PublisherImpl, ClientImpl, ServerImpl |
59 * | StatusCallback | @c void(const Status::BasePtr&) | DDS status events |
60 * | SyncCallback | @c void() | SetterImpl sync |
61 * | ReqRespCallback | @c void(uint64_t, const Bytes&, Bytes*) | ServerImpl listen |
62 * | MsgCallback | @c void(const Bytes&) | SubscriberImpl, GetterImpl |
63 * | IntraMsgCallback | @c void(const IntraData&) | intra:// subscriber |
64 */
65
66#pragma once
67
68#include <any>
69#include <atomic>
70#include <functional>
71#include <memory>
72#include <string>
73
74#include "../base/bytes.h"
75#include "../extension/status.h"
76#include "../impl/conf.h"
77#include "../impl/types.h"
78#include "./intra_data.h"
79#include "./ssl_options.h"
80
81namespace vlink {
82
83/**
84 * @class AbstractNode
85 * @brief Minimal interface for accessing the underlying native transport handle.
86 *
87 * @details
88 * Concrete transport backends (e.g. @c DdsPublisherImpl) inherit from both
89 * @c AbstractNode and their corresponding @c NodeImpl subclass.
90 * @c get_native_handle() returns an @c std::any wrapping the backend-specific
91 * handle, allowing advanced users to access transport internals without
92 * breaking the VLink API boundary.
93 *
94 * @note The base implementation returns an empty @c std::any.
95 */
97 public:
98 /**
99 * @brief Returns the underlying native transport handle wrapped in @c std::any.
100 *
101 * @details
102 * Transport-specific subclasses override this to return, for example, a
103 * DDS @c DataWriter or @c DataReader pointer. Callers must @c std::any_cast
104 * to the correct type. The base implementation returns an empty @c std::any.
105 *
106 * @return Backend-specific handle, or empty @c std::any in the base class.
107 */
108 virtual std::any get_native_handle() const;
109
110 protected:
112 virtual ~AbstractNode();
113
114 private:
116};
117
118/**
119 * @class NodeImpl
120 * @brief Abstract base for all VLink transport backend node implementations.
121 *
122 * @details
123 * Every concrete transport backend (e.g. @c DdsPublisherImpl, @c ShmSubscriberImpl)
124 * ultimately derives from @c NodeImpl. The class provides:
125 * - The pure-virtual @c init() / @c deinit() lifecycle interface.
126 * - Common property storage shared between the @c Conf layer and the node.
127 * - An optional @c MessageLoop attachment for callback dispatch.
128 * - An @c interrupt() mechanism to unblock blocking operations.
129 * - Hooks for data recording (@c try_record) and discovery reporting
130 * (@c init_ext / @c deinit_ext).
131 * - A static @c global_init() that must be called before any node is created
132 * (invoked automatically by the constructor, but safe to call multiple times).
133 *
134 * @note @c suspend() and @c resume() are optional -- the default implementations
135 * log a warning and return @c false if not overridden.
136 *
137 * @note @c register_status_handler() and @c call_status() are only supported on
138 * DDS-family transports (@c kDds, @c kDdsc, @c kDdsr, @c kDdst). Calling
139 * them on other transport types logs a warning and is a no-op.
140 */
142 public:
143 /**
144 * @brief Callback invoked when the peer connection state changes.
145 *
146 * @param bool @c true when connected; @c false when disconnected.
147 */
148 using ConnectCallback = std::function<void(bool)>;
149
150 /**
151 * @brief Callback invoked on DDS status events (e.g. deadline missed).
152 *
153 * @param ptr Polymorphic status object; downcast to the concrete type.
154 */
155 using StatusCallback = std::function<void(const Status::BasePtr& ptr)>;
156
157 /**
158 * @brief Callback invoked when a @c SetterImpl sync completes.
159 */
160 using SyncCallback = std::function<void()>;
161
162 /**
163 * @brief Callback for @c ServerImpl request/response processing.
164 *
165 * @details
166 * Parameters: @c (req_id, request_bytes, response_bytes_ptr).
167 * The handler writes its response into @c *response_bytes_ptr; if
168 * @c response_bytes_ptr is @c nullptr the server is in fire-and-forget mode.
169 */
170 using ReqRespCallback = std::function<void(uint64_t, const Bytes&, Bytes*)>;
171
172 /**
173 * @brief Callback delivering a raw serialised message to a @c SubscriberImpl or @c GetterImpl.
174 *
175 * @param bytes Received payload; lifetime is scoped to the callback.
176 */
177 using MsgCallback = std::function<void(const Bytes&)>;
178
179 /**
180 * @brief Callback delivering an in-process @c IntraData message.
181 *
182 * @details
183 * Only used on @c intra:// transport. The @c IntraData is a @c shared_ptr
184 * to the payload type, so no copy occurs.
185 */
186 using IntraMsgCallback = std::function<void(const IntraData&)>;
187
188 /**
189 * @brief Initialises the underlying transport channel.
190 *
191 * @details
192 * Called by the Node<> template after all properties have been set.
193 * Concrete implementations create DDS entities, SHM channels, etc.
194 */
195 virtual void init() = 0;
196
197 /**
198 * @brief Tears down the underlying transport channel.
199 *
200 * @details
201 * Called by the Node<> template destructor. Releases all transport
202 * resources (e.g. DDS entities, SHM segments).
203 */
204 virtual void deinit() = 0;
205
206 /**
207 * @brief Temporarily suspends message delivery without tearing down the channel.
208 *
209 * @details
210 * The base implementation logs a warning and returns @c false.
211 * Only a subset of transport backends support this operation.
212 *
213 * @return @c true if suspended successfully; @c false if unsupported.
214 */
215 virtual bool suspend();
216
217 /**
218 * @brief Resumes message delivery after a @c suspend() call.
219 *
220 * @details
221 * The base implementation logs a warning and returns @c false.
222 *
223 * @return @c true if resumed successfully; @c false if unsupported.
224 */
225 virtual bool resume();
226
227 /**
228 * @brief Returns @c true when the node is currently suspended.
229 *
230 * @details
231 * The base implementation logs a warning and returns @c false.
232 *
233 * @return @c true if the node is suspended; @c false otherwise or if unsupported.
234 */
235 [[nodiscard]] virtual bool is_suspend() const;
236
237 /**
238 * @brief Signals any blocking operations to unblock and return immediately.
239 *
240 * @details
241 * Sets the internal interrupted flag and, in derived classes
242 * (e.g. @c PublisherImpl, @c ClientImpl), also notifies waiting
243 * condition variables to release threads blocked in @c wait_for_*.
244 * Call @c reset_interrupted() before resuming normal operations.
245 */
246 virtual void interrupt();
247
248 /**
249 * @brief Returns @c true if the transport supports zero-copy loaning.
250 *
251 * @details
252 * Zero-copy loan is a DDS / SHM feature that lets the publisher write
253 * directly into pre-allocated shared memory. The base returns @c false.
254 *
255 * @return @c true if @c loan() / @c return_loan() are supported.
256 */
257 [[nodiscard]] virtual bool is_support_loan() const;
258
259 /**
260 * @brief Borrows a write buffer of @p size bytes from the transport.
261 *
262 * @details
263 * Returns an empty @c Bytes in the base implementation. Transports
264 * that support zero-copy override this to return a @c Bytes view into
265 * pre-allocated shared memory.
266 *
267 * @param size Required buffer size in bytes.
268 * @return Borrowed @c Bytes, or empty on failure.
269 */
270 [[nodiscard]] virtual Bytes loan(int64_t size);
271
272 /**
273 * @brief Returns a previously loaned buffer to the transport.
274 *
275 * @details
276 * Must be called if @c loan() returned a valid buffer and the caller
277 * decided not to publish (e.g. serialisation failed). The base returns
278 * @c false.
279 *
280 * @param bytes Buffer previously returned by @c loan().
281 * @return @c true on success; @c false if unsupported.
282 */
283 virtual bool return_loan(const Bytes& bytes);
284
285 /**
286 * @brief Configures manual unloan mode for zero-copy transports.
287 *
288 * @details
289 * When @p manual_unloan is @c true the transport does not automatically
290 * release the loaned buffer after publish; the caller must call
291 * @c return_loan() explicitly. The base is a no-op.
292 *
293 * @param manual_unloan @c true to enable manual release; @c false for automatic.
294 */
295 virtual void set_manual_unloan(bool manual_unloan);
296
297 /**
298 * @brief Returns a pointer to the associated @c Conf configuration object.
299 *
300 * @details
301 * Concrete backends override this to return their typed @c Conf subclass
302 * (e.g. @c DdsConf). The base returns @c nullptr.
303 *
304 * @return Pointer to the owning @c Conf, or @c nullptr in the base.
305 */
306 [[nodiscard]] virtual const struct Conf* get_conf() const;
307
308 /**
309 * @brief Returns a pointer to the @c AbstractNode peer (if any).
310 *
311 * @details
312 * Used to access the native handle when the impl is split from the node.
313 * The base returns @c nullptr.
314 *
315 * @return Pointer to the @c AbstractNode, or @c nullptr.
316 */
317 [[nodiscard]] virtual const AbstractNode* get_abstract_node() const;
318
319 /**
320 * @brief Retrieves a transport-specific status object.
321 *
322 * @details
323 * Only supported on DDS-family transports. The base logs a warning and
324 * returns @c Status::Unknown.
325 *
326 * @param type The type of status to retrieve (e.g. deadline missed).
327 * @return Polymorphic status object; never @c nullptr.
328 */
329 [[nodiscard]] virtual Status::BasePtr get_status(Status::Type type) const;
330
331 /**
332 * @brief Checks whether @p version matches the runtime VLink library version.
333 *
334 * @details
335 * Compares @p version against @c VLINK_VERSION_MAJOR/MINOR/PATCH. On the
336 * first mismatch a warning is logged (once per process). Version checks are
337 * advisory; mismatches do not prevent node creation.
338 *
339 * @param version Compile-time version embedded by the application.
340 * @return @c true if versions match; @c false otherwise.
341 */
342 virtual bool check_version(const Version& version);
343
344 /**
345 * @brief Attaches the node to a @c MessageLoop for callback dispatch.
346 *
347 * @details
348 * Once attached, @c call_status() posts callbacks onto the loop thread.
349 * Returns @c false if a loop is already attached.
350 *
351 * @param message_loop Loop to attach to; must not be @c nullptr.
352 * @return @c true on success; @c false if already attached.
353 */
354 virtual bool attach(class MessageLoop* message_loop);
355
356 /**
357 * @brief Detaches the node from its @c MessageLoop.
358 *
359 * @details
360 * Waits for the loop to become idle (if the call is from a different thread)
361 * before clearing the pointer, ensuring no callbacks are in-flight after
362 * this call returns.
363 *
364 * @return @c true on success; @c false if no loop was attached.
365 */
366 virtual bool detach();
367
368 /**
369 * @brief Returns the @c MessageLoop this node is attached to.
370 *
371 * @return Pointer to the loop, or @c nullptr if not attached.
372 */
373 [[nodiscard]] class MessageLoop* get_message_loop() const;
374
375 /**
376 * @brief Returns a typed pointer to the conf by downcasting to @c T.
377 *
378 * @details
379 * Convenience wrapper around @c get_conf() for transport backends that
380 * need to access their own @c Conf subclass.
381 *
382 * @tparam T Concrete @c Conf subclass to cast to.
383 * @return @c const T* pointer, or @c nullptr if @c get_conf() returns null.
384 */
385 template <typename T>
386 [[nodiscard]] const T* get_target_conf() const;
387
388 /**
389 * @brief Registers a callback for DDS status events.
390 *
391 * @details
392 * Only effective on DDS-family transports (@c kDds, @c kDdsc, @c kDdsr,
393 * @c kDdst). A warning is logged if called on other transport types.
394 * If a @c MessageLoop is attached, the callback is dispatched on that thread.
395 *
396 * @param callback Handler invoked with each status change.
397 */
399
400 /**
401 * @brief Returns @c true if a status handler has been registered.
402 *
403 * @details
404 * Only effective on DDS-family transports; logs a warning and returns
405 * @c false for other transport types.
406 *
407 * @return @c true if a non-null status callback is registered.
408 */
409 [[nodiscard]] bool has_register_status() const;
410
411 /**
412 * @brief Dispatches a status event to the registered status handler.
413 *
414 * @details
415 * If a @c MessageLoop is attached the callback is posted onto the loop thread;
416 * otherwise it is called directly. Only effective on DDS-family transports.
417 *
418 * @param ptr Status object to deliver.
419 */
421
422 /**
423 * @brief Sets a named transport property on this node.
424 *
425 * @details
426 * Properties are key/value strings (e.g. @c "dds.ip" = @c "192.168.1.1").
427 * Must be called before @c init() to take effect. The property map is
428 * protected by a shared mutex for thread safety.
429 *
430 * @param prop Property name.
431 * @param value Property value.
432 */
433 void set_property(const std::string& prop, const std::string& value);
434
435 /**
436 * @brief Retrieves a named transport property.
437 *
438 * @param prop Property name.
439 * @return Property value, or empty string if not set.
440 */
441 [[nodiscard]] std::string get_property(const std::string& prop) const;
442
443 /**
444 * @brief Returns a snapshot of all properties set on this node.
445 *
446 * @return A copy of the internal @c PropertiesMap.
447 */
449
450 /**
451 * @brief Enables or disables discovery reporting for this node.
452 *
453 * @details
454 * When @c false the node is hidden from @c DiscoveryReporter / @c DiscoveryViewer.
455 * Proxy-internal channels use this to suppress themselves from topology views.
456 * Must be called before @c init_ext().
457 *
458 * @param enable @c true to report to discovery (default); @c false to suppress.
459 */
460 void set_discovery_enabled(bool enable);
461
462 /**
463 * @brief Returns @c true if discovery reporting is enabled for this node.
464 *
465 * @return Current discovery-enabled state.
466 */
467 [[nodiscard]] bool get_discovery_enabled() const;
468
469 /**
470 * @brief Sets the file path for per-node message recording.
471 *
472 * @details
473 * When non-empty a @c BagWriter instance is obtained (or shared with other
474 * nodes writing to the same path) and @c try_record() will capture messages.
475 * Pass an empty string to disable per-node recording.
476 *
477 * @param path File path for the bag; empty disables recording.
478 */
479 void set_record_path(const std::string& path);
480
481 /**
482 * @brief Merges SSL/TLS options into the node property map.
483 *
484 * @details
485 * Acquires the helper mutex and calls @c SslOptions::parse_to() to
486 * write the non-default fields of @p options as @c ssl.* entries in
487 * @c helper_->property_map. The transport factory reads these entries
488 * during connection setup to configure TLS.
489 *
490 * @param options The SSL/TLS configuration to merge.
491 *
492 * @see SslOptions::parse_to(), Node::set_ssl_options()
493 */
494 void set_ssl_options(const SslOptions& options);
495
496 /**
497 * @brief Records a message to the global and/or per-node bag writers.
498 *
499 * @details
500 * Queries the global @c BagWriter (if one is active) and the per-node writer
501 * set by @c set_record_path(). CDR DDS messages and intra messages are
502 * excluded by default.
503 *
504 * @param action_type The role this message is being recorded for.
505 * @param data Raw serialised message bytes.
506 */
507 void try_record(ActionType action_type, const Bytes& data);
508
509 /**
510 * @brief Clears the interrupted flag set by @c interrupt().
511 *
512 * @details
513 * Must be called before re-using a blocking operation after it has been
514 * interrupted, e.g. before calling @c wait_for_subscribers() again.
515 */
517
518 /**
519 * @brief Returns @c true if @c interrupt() has been called and not yet reset.
520 *
521 * @return @c true when the interrupted flag is set.
522 */
523 [[nodiscard]] bool is_interrupted() const;
524
525 /**
526 * @brief Registers the node with the global @c DiscoveryReporter.
527 *
528 * @details
529 * Called at the end of @c init() by all Node<> template specialisations.
530 * Optionally starts the @c CpuProfiler if global profiling is enabled.
531 * Skips registration for CDR, security, and (by default) intra nodes.
532 */
533 void init_ext();
534
535 /**
536 * @brief Deregisters the node from the global @c DiscoveryReporter.
537 *
538 * @details
539 * Called at the start of @c deinit() by all Node<> template specialisations.
540 * Restarts the @c CpuProfiler if global profiling was running.
541 */
543
544 /**
545 * @brief Initialises process-wide VLink singletons.
546 *
547 * @details
548 * Ensures the @c Logger, memory pool, global @c BagWriter, and global
549 * @c DiscoveryReporter are initialised. Safe to call multiple times; only
550 * the first call has an effect. The constructor calls this automatically.
551 */
552 static void global_init();
553
554 std::atomic_bool has_suspend{false}; ///< Atomic suspend state flag (currently unused by default impls).
555
556 std::string url; ///< Full URL string of this node (e.g. @c "dds://my/topic").
557 std::string ser_type; ///< Serialisation type string (e.g. @c "demo.proto.PointCloud").
558 ImplType impl_type{kUnknownImplType}; ///< Role of this implementation node.
559 SchemaType schema_type{SchemaType::kUnknown}; ///< Coarse schema family reported to discovery and bag/proxy paths.
560 TransportType transport_type{TransportType::kUnknown}; ///< Transport backend of this implementation node.
561 bool is_cdr_type{false}; ///< @c true when using DDS native CDR serialisation.
562 bool is_security_type{false}; ///< @c true when security-authenticated transport is enabled.
563 bool is_discovery_enabled{true}; ///< Whether this node is reported to the discovery layer.
564 std::unique_ptr<class CpuProfiler> profiler; ///< Optional per-node CPU profiler (only when global profiling is on).
565
566 protected:
567 explicit NodeImpl(ImplType type);
568
569 virtual ~NodeImpl();
570
571 private:
572 std::unique_ptr<struct NodeImplHelper> helper_;
573
575};
576
577////////////////////////////////////////////////////////////////
578/// Details
579////////////////////////////////////////////////////////////////
580
581template <typename T>
582inline const T* NodeImpl::get_target_conf() const {
583 return static_cast<const T*>(get_conf());
584}
585
586} // namespace vlink
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.
Abstract transport configuration base class and associated helper macros.
Zero-serialisation in-process message container for intra:// transport.
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
Transport-layer SSL/TLS configuration for VLink communication backends.
DDS-compatible status type hierarchy for VLink publisher and subscriber callbacks.
Core type definitions shared across all VLink node implementations.