VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
publisher_impl.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file publisher_impl.h
26 * @brief Abstract base class for all transport-specific publisher implementations.
27 *
28 * @details
29 * @c PublisherImpl is the intermediate layer between the generic @c Publisher<T> template
30 * and a concrete transport backend (e.g. @c DdsPublisherImpl, @c ShmPublisherImpl).
31 * It inherits from @c NodeImpl and adds publish-side semantics:
32 *
33 * - Subscriber discovery and change notification via @c detect_subscribers() /
34 * @c update_subscribers().
35 * - Blocking wait for at least one subscriber with @c wait_for_subscribers().
36 * - Two write overloads: serialised @c Bytes for network transports and zero-copy
37 * @c IntraData for the @c intra:// backend.
38 *
39 * @par Subscriber Discovery Flow
40 * @code
41 * // Transport detects subscriber appearance or disappearance:
42 * publisher_impl->update_subscribers();
43 * // -> compares has_subscribers() against the cached state
44 * // -> if the state changed, notifies the condition variable
45 * // -> and fires the ConnectCallback registered via detect_subscribers()
46 * @endcode
47 *
48 * @note Concrete subclasses must implement @c has_subscribers() and
49 * @c write(const Bytes&). @c write(const IntraData&) is only overridden by
50 * the @c intra:// backend; all other transports inherit the base
51 * implementation, which only logs a warning and returns @c false.
52 */
53
54#pragma once
55
56#include <chrono>
57#include <memory>
58
59#include "./node_impl.h"
60
61namespace vlink {
62
63/**
64 * @class PublisherImpl
65 * @brief Transport-agnostic base for publisher node implementations.
66 *
67 * @details
68 * Provides the subscriber-detection infrastructure (condition variable + callback)
69 * used by @c Publisher<T>::wait_for_subscribers() and
70 * @c Publisher<T>::detect_subscribers(). Concrete backends override
71 * @c has_subscribers() to query the transport layer and call @c update_subscribers()
72 * whenever the subscriber count changes.
73 */
75 public:
76 /**
77 * @brief Destructor.
78 */
79 ~PublisherImpl() override;
80
81 /**
82 * @brief Interrupts the publisher, waking any blocked @c wait_for_subscribers() call.
83 *
84 * @details
85 * Calls @c NodeImpl::interrupt() to set the interrupted flag, then
86 * @c notify_all() on the internal condition variable so that any thread blocked
87 * in @c wait_for_subscribers() returns immediately with @c false.
88 */
89 void interrupt() override;
90
91 /**
92 * @brief Registers a callback to be fired when the subscriber presence changes.
93 *
94 * @details
95 * The @p callback is stored and invoked with @c true when the first subscriber
96 * appears and @c false when the last one disconnects. If subscribers are already
97 * present at registration time the callback is fired immediately with @c true
98 * before this function returns.
99 *
100 * @param callback Callable @c void(bool) to invoke on subscriber change; taken by rvalue reference, so pass a temporary or @c std::move() it.
101 */
102 virtual void detect_subscribers(ConnectCallback&& callback);
103
104 /**
105 * @brief Blocks until at least one subscriber is present or the timeout elapses.
106 *
107 * @details
108 * Returns immediately if @c has_subscribers() is already @c true. Otherwise
109 * waits on an internal condition variable that is notified by
110 * @c update_subscribers() and @c interrupt().
111 *
112 * - @p timeout < 0 (e.g. @c Timeout::kInfinite): waits indefinitely.
113 * - @p timeout >= 0: returns @c false if no subscriber arrives within the period.
114 *
115 * @param timeout Maximum time to wait; negative value means wait forever.
116 * @return @c true if a subscriber was detected; @c false on timeout
117 * or interruption.
118 */
119 virtual bool wait_for_subscribers(std::chrono::milliseconds timeout);
120
121 /**
122 * @brief Returns @c true when at least one subscriber is currently connected.
123 *
124 * @details
125 * Must be implemented by each concrete transport backend. @c wait_for_subscribers()
126 * calls it to check whether a subscriber is already present, and
127 * @c update_subscribers() compares its result against the cached state to detect a change.
128 *
129 * @return @c true if one or more subscribers are connected; @c false otherwise.
130 */
131 [[nodiscard]] virtual bool has_subscribers() const = 0;
132
133 /**
134 * @brief Publishes a serialised message to all connected subscribers.
135 *
136 * @details
137 * Must be implemented by each concrete transport backend. @p msg_data contains
138 * the fully serialised payload produced by @c Serializer::serialize().
139 *
140 * @param msg_data Serialised message bytes to transmit.
141 * @return @c true if the message was delivered (or queued) successfully;
142 * @c false on error.
143 */
144 virtual bool write(const Bytes& msg_data) = 0;
145
146 /**
147 * @brief Publishes an in-process zero-copy message.
148 *
149 * @details
150 * Used exclusively on the @c intra:// transport to pass @c IntraData directly
151 * to subscribers in the same process without serialisation. The default
152 * implementation logs a warning and returns @c false; only @c IntraPublisherImpl
153 * overrides this method.
154 *
155 * @param intra_data Shared pointer to the in-process message payload.
156 * @return @c true if the message was delivered; @c false if this
157 * transport does not support @c IntraData.
158 */
159 virtual bool write(const IntraData& intra_data);
160
161 /**
162 * @brief Notifies the subscriber-detection subsystem that subscriber presence may
163 * have changed.
164 *
165 * @details
166 * Called by the concrete transport backend whenever a subscriber connects or
167 * disconnects. Compares the current @c has_subscribers() result against the
168 * cached state; if it differs, the condition variable is notified and the
169 * registered @c ConnectCallback is fired.
170 *
171 * @note
172 * This method is intended to be called from the transport's internal discovery
173 * thread, not from user code.
174 */
176
177 protected:
178 /**
179 * @brief Protected constructor; initialises the publisher with @c kPublisher role.
180 */
182
183 private:
184 std::unique_ptr<struct PublisherImplHelper> helper_;
185
187};
188
189} // namespace vlink
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
Abstract transport node base classes used by all VLink node implementations.