VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
getter-inl.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24#pragma once
25
26#include <memory>
27#include <string>
28#include <utility>
29
31#include "../base/logger.h"
32#include "../getter.h"
33#include "../impl/url.h"
34#include "../serializer.h"
35
36namespace vlink {
37
38template <typename ValueT, SecurityType SecT>
40 InitType type) {
41 return std::make_unique<Getter<ValueT, SecT>>(url_str, type);
42}
43
44template <typename ValueT, SecurityType SecT>
46 InitType type) {
47 return std::make_shared<Getter<ValueT, SecT>>(url_str, type);
48}
49
// Constructs a getter from a configuration object `conf`.
//
// `conf`  configuration used to create the underlying implementation; its
//         type must advertise getter support (checked by the static_assert).
// `type`  when equal to InitType::kWithInit, init() is called at the end of
//         construction.
//
// NOTE(review): the unnamed second template parameter is presumably a
// constraint declared with the class in getter.h — confirm there.
50template <typename ValueT, SecurityType SecT>
51template <typename ConfT, typename>
52inline Getter<ValueT, SecT>::Getter(const ConfT& conf, InitType type) {
   // Compile-time rejection of configuration types whose allowed impl-type
   // mask does not include kImplType.
53 static_assert(ConfT::get_allow_impl_type() & kImplType, "Conf does not support getter mode.");
54
   // Parse/validate failure is reported through VLOG_F; construction stops
   // here, before impl_ has been assigned.
55 if VUNLIKELY (!conf.parse(kImplType) || !conf.is_valid()) {
56 VLOG_F(conf, " getter configuration is invalid or could not be parsed.");
57 return;
58 }
59
60 this->impl_ = conf.create_getter();
61
   // create_getter() may yield a null implementation; report and stop.
62 if VUNLIKELY (!this->impl_) {
63 VLOG_F(conf, " getter implementation not available for this transport.");
64 return;
65 }
66
   // Record transport and serialisation metadata on the implementation.
67 this->impl_->transport_type = conf.get_transport_type();
69 this->impl_->schema_type = Serializer::get_schema_type<kValueType, ValueT>();
70 this->impl_->is_cdr_type = Serializer::is_cdr_type<ValueT>();
71
   // Only a Url configuration supplies the URL string stored on the impl.
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->impl_->url = conf.get_str();
74 }
75
   // Security-enabled instantiations flag the impl and enable security.
76 if constexpr (SecT == SecurityType::kWithSecurity) {
77 this->impl_->is_security_type = true;
78 this->enable_security();
79 }
80
   // Optional immediate initialisation requested by the caller.
81 if (type == InitType::kWithInit) {
82 this->init(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
83 }
84}
85
// Convenience constructor: wraps `url_str` in a Url and delegates to the
// configuration-based constructor, forwarding `type` unchanged.
86template <typename ValueT, SecurityType SecT>
87inline Getter<ValueT, SecT>::Getter(const std::string& url_str, InitType type)
88 : Getter<ValueT, SecT>(Url(url_str), type) {}
89
// Returns value_ by value as a std::optional<ValueT>.
// The read is performed while holding mtx_.
90template <typename ValueT, SecurityType SecT>
91inline std::optional<ValueT> Getter<ValueT, SecT>::get() const {
92 std::lock_guard lock(mtx_);
93 return value_;
94}
95
// Blocks until a value is available, the wait is interrupted, or `timeout`
// elapses.
//
// `timeout`  maximum time to wait. A count of 0 is replaced by
//            Timeout::kInfinite (with a warning); a negative count selects
//            the untimed wait below.
//
// Returns true if value_ already holds a value on entry, or if the wait
// ended because has_value_notification_ was set and the implementation does
// not report an interruption. Returns false on interruption or timeout.
96template <typename ValueT, SecurityType SecT>
97inline bool Getter<ValueT, SecT>::wait_for_value(std::chrono::milliseconds timeout) {
   // NOTE(review): Timeout::kInfinite is presumably negative so that it takes
   // the untimed branch further down — confirm against its definition.
98 if VUNLIKELY (timeout.count() == 0) {
99 VLOG_W("Getter: Timeout value is 0, using infinite wait instead.");
100 timeout = Timeout::kInfinite;
101 }
102
103 std::unique_lock lock(mtx_);
104
   // Clear the implementation's interrupted state before this wait begins.
105 this->impl_->reset_interrupted();
106
   // Fast path: a value is already stored, so there is nothing to wait for.
107 if (value_.has_value()) {
108 return true;
109 }
110
   // Reset the flag so that only a notification arriving after this point
   // satisfies the predicate.
111 has_value_notification_ = false;
112
   // The wait ends on a value notification or on interruption.
113 auto predicate = [this]() -> bool { return has_value_notification_ || this->impl_->is_interrupted(); };
114
115 if (timeout.count() < 0) {
   // Untimed wait: the predicate held, so the outcome depends only on
   // whether the wake-up was an interruption.
116 cv_.wait(lock, std::move(predicate));
117 return !this->impl_->is_interrupted();
118 } else {
   // Timed wait: wait_for yields false when the predicate is still false
   // after `timeout`; an interruption also produces false.
119 return cv_.wait_for(lock, timeout, std::move(predicate)) && !this->impl_->is_interrupted();
120 }
121}
122
123template <typename ValueT, SecurityType SecT>
125 if VUNLIKELY (this->impl_->is_listened) {
126 VLOG_F("Getter has already been listened.");
127 }
128
129 callback_ = std::move(callback);
130
131 this->impl_->is_listened = true;
132
133 return true;
134}
135
136template <typename ValueT, SecurityType SecT>
138 std::lock_guard lock(mtx_);
139 change_reporting_ = enable;
141
// Forwards the manual-unloan setting to the implementation and mirrors it in
// this object's is_manual_unloan_ member.
//
// `manual_unloan`  the value passed to impl_->set_manual_unloan() and stored
//                  locally.
142template <typename ValueT, SecurityType SecT>
143inline void Getter<ValueT, SecT>::set_manual_unloan(bool manual_unloan) {
144 this->impl_->set_manual_unloan(manual_unloan);
145 this->is_manual_unloan_ = manual_unloan;
146}
147
148template <typename ValueT, SecurityType SecT>
150 this->impl_->set_latency_and_lost_enabled(enable);
152
153template <typename ValueT, SecurityType SecT>
155 return this->impl_->is_latency_and_lost_enabled();
156}
157
// Returns the latency figure reported by the implementation
// (impl_->get_latency()), unchanged.
// NOTE(review): the unit of the returned int64_t is not visible here —
// confirm against the implementation.
158template <typename ValueT, SecurityType SecT>
159inline int64_t Getter<ValueT, SecT>::get_latency() const {
160 return this->impl_->get_latency();
161}
162
163template <typename ValueT, SecurityType SecT>
165 return this->impl_->get_lost();
166}
167
168template <typename ValueT, SecurityType SecT>
170 std::lock_guard lock(mtx_);
171 return change_reporting_;
172}
173
174template <typename ValueT, SecurityType SecT>
177 return false;
178 }
179
180 listen_bytes([this](const Bytes& data) {
181#ifndef VLINK_DISABLE_PROFILER
182 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
183#endif
184
186 std::lock_guard lock(mtx_);
187 if (change_reporting_) {
188 if (value_.has_value() && last_cache_ == data) {
189 return;
190 }
191
192 last_cache_ = data;
193 }
194 }
195
196 if constexpr (std::is_same_v<ValueT, Bytes>) {
197 if (callback_) {
198 callback_(data);
199 }
200
201 {
202 std::lock_guard lock(mtx_);
204 has_value_notification_ = true;
205
206 value_.emplace(data);
207 }
208
209 cv_.notify_all();
210
211 } else {
212 thread_local auto value = this->template get_default_value<ValueT>();
213
214 if VUNLIKELY (!Serializer::deserialize<kValueType>(data, value, this->impl_->transport_type)) {
215 VLOG_T("Getter deserialize failed, url: ", this->impl_->url, ".");
216 return;
217 }
218
219 if (callback_) {
220 callback_(value);
221 }
222
224 std::lock_guard lock(mtx_);
225
226 has_value_notification_ = true;
227
228 value_.emplace(value);
229 }
231 cv_.notify_all();
232 }
233 });
234
235 return true;
236}
238template <typename ValueT, SecurityType SecT>
241
242 cv_.notify_all();
243}
245template <typename ValueT, SecurityType SecT>
247 if VUNLIKELY (this->has_inited_) {
248 this->impl_->deinit_ext();
249 this->impl_->impl_type = kSubscriber;
250 this->impl_->init_ext();
251 } else {
252 this->impl_->impl_type = kSubscriber;
253 }
254}
255
// Registers `callback` with the implementation to receive raw message bytes.
//
// `callback`  handler moved into the lambda passed to impl_->listen(); it is
//             called through invoke_callback() for each delivered message.
//
// Does nothing if the getter has not been initialised (has_inited_ is false).
256template <typename ValueT, SecurityType SecT>
257inline void Getter<ValueT, SecT>::listen_bytes(NodeImpl::MsgCallback&& callback) {
258 if (!this->has_inited_) {
259 return;
260 }
261
262 this->impl_->listen([this, callback = std::move(callback)](const Bytes& data) {
263 if constexpr (SecT == SecurityType::kWithSecurity) {
   // Security-enabled path: decrypt into a local buffer first.
264 Bytes sec_data;
265
   // A message that fails to decrypt is logged and dropped; the callback
   // is not invoked for it.
266 if VUNLIKELY (!this->security_->decrypt(data, sec_data)) {
267 VLOG_T("Getter decrypt failed, url: ", this->impl_->url, ".");
268 return;
269 }
271 this->invoke_callback(callback, sec_data);
272 } else {
   // Plain path: pass the bytes to try_record() with ActionType::kGet,
   // then deliver them to the callback unchanged.
273 this->impl_->try_record(ActionType::kGet, data);
274
275 this->invoke_callback(callback, data);
276 }
277 });
278}
280} // namespace vlink
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Type-safe field-model reader for VLink topics.
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
定义 logger.h:856
#define VLOG_W(...)
定义 logger.h:852
#define VLOG_T(...)
定义 logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
定义 macros.h:302
Compile-time type-detection and serialisation utilities for VLink messages.
URL-based transport configuration dispatcher for VLink nodes.