VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
abstract_factory.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file abstract_factory.h
26 * @brief Topic-keyed factory and multi-implementation callback registry for VLink nodes.
27 *
28 * @details
29 * This header provides two cooperating templates used internally by every VLink
30 * node type (Publisher, Subscriber, Client, Server, Setter, Getter) to multiplex
31 * callbacks across multiple concurrent transport implementations sharing the same
32 * logical topic:
33 *
34 * @par AbstractObject<FilterT>
35 * A per-topic object that holds:
36 * - A set of active @c NodeImpl* instances registered on this topic.
37 * - Per-impl callback maps for all six callback types (server-connect, sub-connect,
38 * req/resp, msg, intra-msg, status).
39 * - Traversal helpers that iterate over all registered callbacks while holding
40 * a @c std::recursive_mutex, with early-exit support via @c ignore_called().
41 *
42 * @par AbstractFactory<FilterT>
43 * A map-based factory keyed on @c FilterT (typically @c std::string topic name)
44 * that creates and caches @c AbstractObject<FilterT> instances. Objects are stored
45 * as @c std::weak_ptr so they are automatically destroyed when no @c NodeImpl holds
46 * a reference.
47 *
48 * @par Usage Model
49 * @code
50 * // All Publisher<T> nodes on "dds://my_topic" share one AbstractObject:
51 * auto obj = factory.get_object<MyObject>("dds://my_topic");
52 * obj->add_impl(impl_ptr);
53 * obj->register_msg_callback(impl_ptr, [](NodeImpl*, const MsgCallback& cb) {
54 * cb(bytes);
55 * });
56 *
57 * // When a message arrives, dispatch to all registered impls:
58 * obj->traverse_msg_callback([&](NodeImpl* impl, const MsgCallback& cb) {
59 * cb(msg_data);
60 * });
61 * @endcode
62 *
63 * @note All public methods on @c AbstractObject are thread-safe; they acquire
64 * the internal @c std::recursive_mutex before modifying or reading state.
65 *
66 * @tparam FilterT The key type used to look up objects in the factory
67 * (e.g. @c std::string for topic URLs).
68 */
69
70#pragma once
71
72#include <algorithm>
73#include <atomic>
74#include <map>
75#include <memory>
76#include <mutex>
77#include <string>
78#include <unordered_map>
79#include <unordered_set>
80#include <utility>
81
82#include "../base/logger.h"
83#include "./node_impl.h"
84
85namespace vlink {
86
87/**
88 * @class AbstractObject
89 * @brief Per-topic registry of @c NodeImpl instances and their associated callbacks.
90 *
91 * @details
92 * Maintains an unordered set of active @c NodeImpl* pointers and six separate
93 * callback maps (server-connect, subscriber-connect, req/resp, msg, intra-msg,
94 * status). All mutations and traversals are protected by a @c std::recursive_mutex
95 * to allow safe use from multiple threads.
96 *
97 * @tparam FilterT Key type used by the owning @c AbstractFactory to look up this
98 * object (typically a topic URL string).
99 */
100template <typename FilterT>
102 public:
103 using ImplList = std::unordered_set<NodeImpl*>; ///< Set of registered impl pointers.
104
105 using ConnectCallbackMap = std::unordered_map<NodeImpl*, NodeImpl::ConnectCallback>; ///< Per-impl connect callbacks.
107 std::unordered_map<NodeImpl*, NodeImpl::ReqRespCallback>; ///< Per-impl req/resp callbacks.
108 using MsgCallbackMap = std::unordered_map<NodeImpl*, NodeImpl::MsgCallback>; ///< Per-impl message callbacks.
110 std::unordered_map<NodeImpl*, NodeImpl::IntraMsgCallback>; ///< Per-impl intra-msg callbacks.
111 using StatusCallbackMap = std::unordered_map<NodeImpl*, NodeImpl::StatusCallback>; ///< Per-impl status callbacks.
112
114 std::function<void(NodeImpl*, const NodeImpl::ConnectCallback&)>; ///< Traversal visitor for connect callbacks.
116 std::function<void(NodeImpl*, const NodeImpl::ReqRespCallback&)>; ///< Traversal visitor for req/resp callbacks.
118 std::function<void(NodeImpl*, const NodeImpl::MsgCallback&)>; ///< Traversal visitor for message callbacks.
119 using FindIntraMsgCallback = std::function<void(
120 NodeImpl*, const NodeImpl::IntraMsgCallback&)>; ///< Traversal visitor for intra-msg callbacks.
122 std::function<void(NodeImpl*, const NodeImpl::StatusCallback&)>; ///< Traversal visitor for status callbacks.
123
124 /**
125 * @brief Registers a @c NodeImpl instance with this topic object.
126 *
127 * @details
128 * Inserts @p impl into the active implementation set and updates the cached
129 * @c first_impl_ pointer. Thread-safe.
130 *
131 * @param impl Non-owning pointer to the @c NodeImpl to register.
132 * @return @c true if @p impl was newly inserted; @c false if it was already
133 * present.
134 */
135 bool add_impl(NodeImpl* impl);
136
137 /**
138 * @brief Unregisters a @c NodeImpl instance and removes all its callbacks.
139 *
140 * @details
141 * Erases @p impl from the active set and removes its entries from all six
142 * callback maps. Thread-safe.
143 *
144 * @param impl Non-owning pointer to the @c NodeImpl to unregister.
145 * @return @c true if @p impl was found and removed; @c false otherwise.
146 */
147 bool remove_impl(NodeImpl* impl);
148
149 /**
150 * @brief Returns the most recently added @c NodeImpl pointer.
151 *
152 * @details
153 * The "first" impl is set to the last value passed to @c add_impl(). If the
154 * current first impl is removed via @c remove_impl(), it is reassigned to an
155 * arbitrary remaining impl, or @c nullptr if the set is empty.
156 *
157 * @return Pointer to the most recently registered @c NodeImpl, or @c nullptr.
158 */
159 [[nodiscard]] NodeImpl* get_first_impl() const;
160
161 /**
162 * @brief Returns @c true if @p impl is currently registered with this object.
163 *
164 * @param impl Pointer to check.
165 * @return @c true if @p impl is in the active set; @c false otherwise.
166 */
167 [[nodiscard]] bool is_contains_impl(NodeImpl* impl) const;
168
169 /**
170 * @brief Returns @c true if at least one @c NodeImpl is currently registered.
171 *
172 * @return @c true if the active implementation set is non-empty.
173 */
174 [[nodiscard]] bool has_impl() const;
175
176 /**
177 * @brief Registers a server-side connect-change callback for @p impl.
178 *
179 * @details
180 * Stores the callback in the server-connect map keyed by @p impl. A subsequent
181 * call to @c traverse_server_connect_callback() will invoke this callback.
182 *
183 * @param impl The @c NodeImpl this callback belongs to.
184 * @param callback Callable @c void(bool) invoked on client-presence changes.
185 * @return @c true if the callback was inserted; @c false if one was
186 * already registered for @p impl.
187 */
189
190 /**
191 * @brief Registers a subscriber-side connect-change callback for @p impl.
192 *
193 * @details
194 * Stores the callback in the subscriber-connect map keyed by @p impl.
195 *
196 * @param impl The @c NodeImpl this callback belongs to.
197 * @param callback Callable @c void(bool) invoked on subscriber-presence changes.
198 * @return @c true if the callback was inserted; @c false if already set.
199 */
201
202 /**
203 * @brief Registers a request/response callback for @p impl.
204 *
205 * @param impl The @c NodeImpl this callback belongs to.
206 * @param callback Callable invoked for each incoming RPC request.
207 * @return @c true if inserted; @c false if already registered.
208 */
210
211 /**
212 * @brief Registers a serialised-message receive callback for @p impl.
213 *
214 * @param impl The @c NodeImpl this callback belongs to.
215 * @param callback Callable @c void(const Bytes&) invoked on each message.
216 * @return @c true if inserted; @c false if already registered.
217 */
219
220 /**
221 * @brief Registers an in-process zero-copy message callback for @p impl.
222 *
223 * @param impl The @c NodeImpl this callback belongs to.
224 * @param callback Callable @c void(const IntraData&) invoked on each intra message.
225 * @return @c true if inserted; @c false if already registered.
226 */
228
229 /**
230 * @brief Registers a transport-status callback for @p impl.
231 *
232 * @param impl The @c NodeImpl this callback belongs to.
233 * @param callback Callable invoked on transport status changes.
234 * @return @c true if inserted; @c false if already registered.
235 */
237
238 /**
239 * @brief Returns @c true if no server-connect callbacks are registered.
240 *
241 * @return @c true when the server-connect callback map is empty.
242 */
243 [[nodiscard]] bool server_connect_map_is_empty() const;
244
245 /**
246 * @brief Returns @c true if no subscriber-connect callbacks are registered.
247 *
248 * @return @c true when the subscriber-connect callback map is empty.
249 */
250 [[nodiscard]] bool sub_connect_map_is_empty() const;
251
252 /**
253 * @brief Returns @c true if no request/response callbacks are registered.
254 *
255 * @return @c true when the req/resp callback map is empty.
256 */
257 [[nodiscard]] bool req_resp_map_is_empty() const;
258
259 /**
260 * @brief Returns @c true if no message callbacks are registered.
261 *
262 * @return @c true when the message callback map is empty.
263 */
264 [[nodiscard]] bool msg_map_is_empty() const;
265
266 /**
267 * @brief Returns @c true if no status callbacks are registered.
268 *
269 * @return @c true when the status callback map is empty.
270 */
271 [[nodiscard]] bool status_map_is_empty() const;
272
273 /**
274 * @brief Invokes @p callback for each registered server-connect callback.
275 *
276 * @details
277 * Iterates over all entries in the server-connect map while holding the mutex.
278 * The visitor receives the @c NodeImpl* and the stored @c ConnectCallback.
279 * Iteration can be short-circuited by calling @c ignore_called() inside the
280 * visitor.
281 *
282 * @param callback Visitor called as @c callback(impl, stored_callback) for each entry.
283 */
285
286 /**
287 * @brief Invokes @p callback for each registered subscriber-connect callback.
288 *
289 * @param callback Visitor called for each subscriber-connect entry.
290 */
292
293 /**
294 * @brief Invokes @p callback for each registered request/response callback.
295 *
296 * @param callback Visitor called for each req/resp entry.
297 */
299
300 /**
301 * @brief Invokes @p callback for each registered serialised-message callback.
302 *
303 * @param callback Visitor called for each message callback entry.
304 */
305 void traverse_msg_callback(const FindMsgCallback& callback);
306
307 /**
308 * @brief Invokes @p callback for each registered in-process message callback.
309 *
310 * @param callback Visitor called for each intra-msg entry.
311 */
313
314 /**
315 * @brief Invokes @p callback for each registered status callback.
316 *
317 * @param callback Visitor called for each status entry.
318 */
319 void traverse_status_callback(const FindStatusCallback& callback);
320
321 protected:
323
324 ~AbstractObject() override;
325
326 [[nodiscard]] bool has_called() const;
327
328 void ignore_called();
329
330 private:
331 template <typename CallbackMapT, typename CallbackT>
332 void traverse_internal_callback(const CallbackMapT& map, const CallbackT& callback);
333
334 bool has_called_{false};
335 bool ignore_called_{false};
336 ImplList impl_list_;
337 mutable std::recursive_mutex mtx_;
338 ConnectCallbackMap server_connect_callback_map_;
339 ConnectCallbackMap sub_connect_callback_map_;
340 ReqRespCallbackMap req_resp_callback_map_;
341 MsgCallbackMap msg_callback_map_;
342 IntraMsgCallbackMap intra_msg_callback_map_;
343 StatusCallbackMap status_callback_map_;
344 NodeImpl* first_impl_{nullptr};
345
347};
348
349/**
350 * @class AbstractFactory
351 * @brief Topic-keyed factory that creates and caches @c AbstractObject instances.
352 *
353 * @details
354 * Maintains a @c std::map<FilterT, std::weak_ptr<Object>> so that multiple
355 * @c NodeImpl instances sharing the same topic key reuse the same
356 * @c AbstractObject. Objects are reference-counted: the entry is automatically
357 * removed from the map when the last @c shared_ptr to the object is destroyed,
358 * preventing stale entries from accumulating.
359 *
360 * @note This class is not copy-constructible or copy-assignable.
361 *
362 * @tparam FilterT The key type used to identify topics (e.g. @c std::string).
363 */
364template <typename FilterT>
366 using Object = AbstractObject<FilterT>;
367 using Map = std::map<FilterT, std::weak_ptr<Object>>;
368 using Set = std::unordered_set<Object*>;
369
370 public:
371 /**
372 * @brief Returns @c true if @p ptr is a live object tracked by this factory.
373 *
374 * @details
375 * Checks the internal set of raw pointers to verify that @p ptr points to an
376 * object that was created by this factory and has not yet been destroyed.
377 *
378 * @param ptr Raw pointer to check.
379 * @return @c true if the object is currently alive; @c false otherwise.
380 */
381 [[nodiscard]] bool has_object(Object* ptr) const;
382
383 /**
384 * @brief Retrieves or creates the @c AbstractObject for the given @p filter key.
385 *
386 * @details
387 * If an object already exists for @p filter and is still alive (the @c weak_ptr
388 * is valid), the existing @c shared_ptr is returned. Otherwise a new
389 * @c ObjectT is heap-allocated, wrapped in a @c shared_ptr with a custom deleter
390 * that removes the entry from the internal map on destruction, and cached.
391 *
392 * @tparam ObjectT Concrete subclass of @c AbstractObject<FilterT> to instantiate.
393 *
394 * @param filter Topic key used to look up or create the object.
395 * @return A @c shared_ptr<ObjectT> for the given @p filter.
396 */
397 template <typename ObjectT>
398 [[nodiscard]] std::shared_ptr<ObjectT> get_object(const FilterT& filter);
399
400 protected:
401 /**
402 * @brief Protected default constructor.
403 */
405
406 /**
407 * @brief Protected virtual destructor.
408 */
410
411 private:
412 Set set_;
413 Map map_;
414 mutable std::mutex mtx_;
415
417};
418
419////////////////////////////////////////////////////////////////
420/// Details
421////////////////////////////////////////////////////////////////
422
423template <typename FilterT>
425 std::lock_guard lock(mtx_);
426
427 first_impl_ = impl;
428
429 return impl_list_.emplace(impl).second;
430}
431
432template <typename FilterT>
434 std::lock_guard lock(mtx_);
435
436 if VUNLIKELY (impl_list_.erase(impl) == 0) {
437 return false;
438 }
439
440 if (first_impl_ == impl) {
441 first_impl_ = impl_list_.empty() ? nullptr : *impl_list_.begin();
442 }
443
444 server_connect_callback_map_.erase(impl);
445 sub_connect_callback_map_.erase(impl);
446 req_resp_callback_map_.erase(impl);
447 msg_callback_map_.erase(impl);
448 intra_msg_callback_map_.erase(impl);
449 status_callback_map_.erase(impl);
450 return true;
451}
452
453template <typename FilterT>
455 std::lock_guard lock(mtx_);
456 return first_impl_;
457}
458
459template <typename FilterT>
461 std::lock_guard lock(mtx_);
462 return impl_list_.find(impl) != impl_list_.end();
463}
464
465template <typename FilterT>
467 std::lock_guard lock(mtx_);
468 return !impl_list_.empty();
469}
470
471template <typename FilterT>
473 NodeImpl::ConnectCallback&& callback) {
474 std::lock_guard lock(this->mtx_);
475 return server_connect_callback_map_.try_emplace(impl, std::move(callback)).second;
476}
477
478template <typename FilterT>
480 NodeImpl::ConnectCallback&& callback) {
481 std::lock_guard lock(this->mtx_);
482 return sub_connect_callback_map_.try_emplace(impl, std::move(callback)).second;
483}
484
485template <typename FilterT>
487 std::lock_guard lock(this->mtx_);
488 return req_resp_callback_map_.try_emplace(impl, std::move(callback)).second;
489}
490
491template <typename FilterT>
493 std::lock_guard lock(this->mtx_);
494 return msg_callback_map_.try_emplace(impl, std::move(callback)).second;
495}
496
497template <typename FilterT>
499 NodeImpl::IntraMsgCallback&& callback) {
500 std::lock_guard lock(this->mtx_);
501 return intra_msg_callback_map_.try_emplace(impl, std::move(callback)).second;
502}
503
504template <typename FilterT>
506 std::lock_guard lock(this->mtx_);
507 return status_callback_map_.try_emplace(impl, std::move(callback)).second;
508}
509
510template <typename FilterT>
512 std::lock_guard lock(this->mtx_);
513 return server_connect_callback_map_.empty();
514}
515
516template <typename FilterT>
518 std::lock_guard lock(this->mtx_);
519 return sub_connect_callback_map_.empty();
520}
521
522template <typename FilterT>
524 std::lock_guard lock(this->mtx_);
525 return req_resp_callback_map_.empty();
526}
527
528template <typename FilterT>
530 std::lock_guard lock(this->mtx_);
531 return msg_callback_map_.empty();
532}
533
534template <typename FilterT>
536 std::lock_guard lock(this->mtx_);
537 return status_callback_map_.empty();
538}
539
540template <typename FilterT>
542 this->traverse_internal_callback(server_connect_callback_map_, callback);
543}
544
545template <typename FilterT>
547 this->traverse_internal_callback(sub_connect_callback_map_, callback);
548}
549
550template <typename FilterT>
552 this->traverse_internal_callback(req_resp_callback_map_, callback);
553}
554
555template <typename FilterT>
557 this->traverse_internal_callback(msg_callback_map_, callback);
558}
559
560template <typename FilterT>
562 this->traverse_internal_callback(intra_msg_callback_map_, callback);
563}
564
565template <typename FilterT>
567 this->traverse_internal_callback(status_callback_map_, callback);
568}
569
570template <typename FilterT>
572
573template <typename FilterT>
575
576template <typename FilterT>
578 return has_called_;
579}
580
581template <typename FilterT>
583 ignore_called_ = true;
584}
585
586template <typename FilterT>
587template <typename CallbackMapT, typename CallbackT>
588inline void AbstractObject<FilterT>::traverse_internal_callback(const CallbackMapT& map, const CallbackT& callback) {
589 std::lock_guard lock(mtx_);
590
591 this->ignore_called_ = false;
592 this->has_called_ = false;
593
594 for (const auto& [impl, target_callback] : map) {
595 callback(impl, target_callback);
596
597 if VUNLIKELY (this->ignore_called_) {
598 this->ignore_called_ = false;
599 } else {
600 this->has_called_ = true;
601 }
602 }
603}
604
605template <typename FilterT>
606inline bool AbstractFactory<FilterT>::has_object(Object* ptr) const {
607 std::lock_guard lock(mtx_);
608 return set_.count(ptr) > 0;
609}
610
611template <typename FilterT>
612template <typename ObjectT>
613inline std::shared_ptr<ObjectT> AbstractFactory<FilterT>::get_object(const FilterT& filter) {
614 static_assert(std::is_base_of_v<Object, ObjectT>, "ObjectT must be derived from AbstractObject");
615 std::shared_ptr<ObjectT> obj;
616 {
617 std::unique_lock lock(mtx_);
618
619 const auto& deleter = [this, filter](ObjectT* obj) {
620 {
621 std::lock_guard lock(mtx_);
622 set_.erase(obj);
623 map_.erase(filter);
624 }
625
626 delete obj;
627 };
628
629 auto iter = map_.find(filter);
630 if (iter == map_.end()) {
631 lock.unlock();
632 auto* obj_ptr = new ObjectT(filter);
633 lock.lock();
634
635 auto [it, inserted] = map_.try_emplace(filter, std::weak_ptr<Object>());
636 if (inserted) {
637 obj = std::shared_ptr<ObjectT>(obj_ptr, deleter);
638 it->second = obj;
639 set_.emplace(obj_ptr);
640 } else {
641 // Another thread inserted while we were unlocked; discard our object.
642 delete obj_ptr;
643 obj = std::static_pointer_cast<ObjectT>(it->second.lock());
644 }
645 } else {
646 obj = std::static_pointer_cast<ObjectT>(iter->second.lock());
647 }
648 return obj;
649 }
650}
651
652template <typename FilterT>
654
655template <typename FilterT>
657
658} // namespace vlink
Global singleton logger with three output styles and pluggable backends.
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
Abstract transport node base classes used by all VLink node implementations.