VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
subscriber-inl.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24#pragma once
25
26#include <memory>
27#include <string>
28#include <utility>
29
31#include "../base/logger.h"
32#include "../impl/url.h"
33#include "../serializer.h"
34#include "../subscriber.h"
35
36namespace vlink {
37
38template <typename MsgT, SecurityType SecT>
40 InitType type) {
41 return std::make_unique<Subscriber<MsgT, SecT>>(url_str, type);
42}
43
44template <typename MsgT, SecurityType SecT>
46 InitType type) {
47 return std::make_shared<Subscriber<MsgT, SecT>>(url_str, type);
48}
49
50template <typename MsgT, SecurityType SecT>
51template <typename ConfT, typename>
52inline Subscriber<MsgT, SecT>::Subscriber(const ConfT& conf, InitType type) {
53 static_assert(ConfT::get_allow_impl_type() & kImplType, "Conf does not support subscriber mode.");
54
55 if VUNLIKELY (!conf.parse(kImplType) || !conf.is_valid()) {
56 VLOG_F(conf, " subscriber configuration is invalid or could not be parsed.");
57 return;
58 }
59
60 this->impl_ = conf.create_subscriber();
61
62 if VUNLIKELY (!this->impl_) {
63 VLOG_F(conf, " subscriber implementation not available for this transport.");
64 return;
65 }
66
67 this->impl_->transport_type = conf.get_transport_type();
69 this->impl_->schema_type = Serializer::get_schema_type<kMsgType, MsgT>();
70 this->impl_->is_cdr_type = Serializer::is_cdr_type<MsgT>();
71
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->impl_->url = conf.get_str();
74 }
75
76 if constexpr (SecT == SecurityType::kWithSecurity) {
77 this->impl_->is_security_type = true;
78 this->enable_security();
79 }
80
81 if (type == InitType::kWithInit) {
82 this->init(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
83 }
84}
85
86template <typename MsgT, SecurityType SecT>
87inline Subscriber<MsgT, SecT>::Subscriber(const std::string& url_str, InitType type)
88 : Subscriber<MsgT, SecT>(Url(url_str), type) {}
89
90template <typename MsgT, SecurityType SecT>
92 if constexpr (Traits::IsSharedPtr<MsgT>()) {
93 if constexpr (std::is_base_of_v<IntraDataType, typename MsgT::element_type>) {
94 static_assert(SecT != SecurityType::kWithSecurity, "IntraData must without security.");
95
96 if (this->impl_->transport_type == TransportType::kIntra) {
97 return listen_intra(std::move(callback));
98 }
99 }
100 }
101
102 return listen_bytes([this, callback = std::move(callback)](const Bytes& data) {
103#ifndef VLINK_DISABLE_PROFILER
104 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
105#endif
106
107 if constexpr (std::is_same_v<MsgT, Bytes>) {
108 (void)this;
109
110 callback(data);
111 } else {
112 thread_local auto msg = this->template get_default_value<MsgT>();
113
114 if VUNLIKELY (!Serializer::deserialize<kMsgType>(data, msg, this->impl_->transport_type)) {
115 VLOG_T("Subscriber deserialize failed, url: ", this->impl_->url, ".");
116 return;
117 }
118
119 callback(msg);
120 }
121 });
122}
123
124template <typename MsgT, SecurityType SecT>
125inline void Subscriber<MsgT, SecT>::set_manual_unloan(bool manual_unloan) {
126 this->impl_->set_manual_unloan(manual_unloan);
127 this->is_manual_unloan_ = manual_unloan;
128}
129
130template <typename MsgT, SecurityType SecT>
132 this->impl_->set_latency_and_lost_enabled(enable);
133}
134
135template <typename MsgT, SecurityType SecT>
137 return this->impl_->is_latency_and_lost_enabled();
138}
139
140template <typename MsgT, SecurityType SecT>
142 return this->impl_->get_latency();
143}
144
145template <typename MsgT, SecurityType SecT>
147 return this->impl_->get_lost();
148}
149
150template <typename MsgT, SecurityType SecT>
152 if VUNLIKELY (this->has_inited_) {
153 this->impl_->deinit_ext();
154 this->impl_->impl_type = kGetter;
155 this->impl_->init_ext();
156 } else {
157 this->impl_->impl_type = kGetter;
158 }
159}
160
161template <typename MsgT, SecurityType SecT>
162inline bool Subscriber<MsgT, SecT>::listen_bytes(NodeImpl::MsgCallback&& callback) {
163 if VUNLIKELY (!this->has_inited_) {
164 VLOG_F("Subscriber::listen_bytes() called before init().");
165 }
166
167 if VUNLIKELY (this->impl_->is_listened) {
168 VLOG_F("Subscriber has already been listened, url: ", this->impl_->url, ".");
169 }
170
171 bool ret = this->impl_->listen([this, callback = std::move(callback)](const Bytes& data) {
172 if constexpr (SecT == SecurityType::kWithSecurity) {
173 Bytes sec_data;
174
175 if VUNLIKELY (!this->security_->decrypt(data, sec_data)) {
176 VLOG_T("Subscriber decrypt failed, url: ", this->impl_->url, ".");
177 return;
178 }
179
180 this->invoke_callback(callback, sec_data);
181 } else {
182 this->impl_->try_record(ActionType::kSubscribe, data);
183
184 this->invoke_callback(callback, data);
185 }
186 });
187
188 this->impl_->is_listened = ret;
189
190 return ret;
191}
192
193template <typename MsgT, SecurityType SecT>
194inline bool Subscriber<MsgT, SecT>::listen_intra(NodeImpl::IntraMsgCallback&& callback) {
195 if VUNLIKELY (!this->has_inited_) {
196 VLOG_F("Subscriber::listen_intra() called before init().");
197 }
199 if VUNLIKELY (this->impl_->is_listened) {
200 VLOG_F("Subscriber has already been listened, url: ", this->impl_->url, ".");
201 }
202
203 bool ret = this->impl_->listen([this, callback = std::move(callback)](const IntraData& intra_data) {
204#ifndef VLINK_DISABLE_PROFILER
205 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
206#endif
207
208 if constexpr (Traits::IsSharedPtr<MsgT>()) {
209#if defined(NDEBUG) || defined(__ANDROID__)
210 auto intra_msg = std::static_pointer_cast<typename MsgT::element_type>(intra_data);
211#else
212 auto intra_msg = std::dynamic_pointer_cast<typename MsgT::element_type>(intra_data);
213#endif
214
215 if VLIKELY (intra_msg) {
216 this->invoke_callback(callback, intra_msg);
217 } else {
218 VLOG_T("Subscriber get intra data failed, url: ", this->impl_->url, ".");
219 }
220 }
221 });
222
223 this->impl_->is_listened = ret;
224
225 return ret;
226}
227
228} // namespace vlink
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely true.
Definition macros.h:297
Compile-time type-detection and serialisation utilities for VLink messages.
Type-safe event-model subscriber for VLink topics.
URL-based transport configuration dispatcher for VLink nodes.