38template <
typename ValueT, SecurityType SecT>
41 return std::make_unique<Getter<ValueT, SecT>>(url_str, type);
44template <
typename ValueT, SecurityType SecT>
47 return std::make_shared<Getter<ValueT, SecT>>(url_str, type);
50template <
typename ValueT, SecurityType SecT>
51template <
typename ConfT,
typename>
53 static_assert(ConfT::get_allow_impl_type() &
kImplType,
"Conf does not support getter mode.");
56 VLOG_F(conf,
" getter configuration is invalid or could not be parsed.");
60 this->
impl_ = conf.create_getter();
63 VLOG_F(conf,
" getter implementation not available for this transport.");
67 this->
impl_->transport_type = conf.get_transport_type();
69 this->
impl_->schema_type = Serializer::get_schema_type<kValueType, ValueT>();
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->
impl_->url = conf.get_str();
77 this->
impl_->is_security_type =
true;
86template <
typename ValueT, SecurityType SecT>
88 :
Getter<ValueT, SecT>(
Url(url_str), type) {}
90template <
typename ValueT, SecurityType SecT>
92 std::lock_guard lock(mtx_);
96template <
typename ValueT, SecurityType SecT>
99 VLOG_W(
"Getter: Timeout value is 0, using infinite wait instead.");
103 std::unique_lock lock(mtx_);
105 this->
impl_->reset_interrupted();
107 if (value_.has_value()) {
111 has_value_notification_ =
false;
113 auto predicate = [
this]() ->
bool {
return has_value_notification_ || this->
impl_->is_interrupted(); };
115 if (timeout.count() < 0) {
116 cv_.wait(lock, std::move(predicate));
117 return !this->
impl_->is_interrupted();
119 return cv_.wait_for(lock, timeout, std::move(predicate)) && !this->
impl_->is_interrupted();
123template <
typename ValueT, SecurityType SecT>
126 VLOG_F(
"Getter has already been listened.");
129 callback_ = std::move(callback);
136template <
typename ValueT, SecurityType SecT>
138 std::lock_guard lock(mtx_);
139 change_reporting_ = enable;
142template <
typename ValueT, SecurityType SecT>
144 this->
impl_->set_manual_unloan(manual_unloan);
148template <
typename ValueT, SecurityType SecT>
150 this->
impl_->set_latency_and_lost_enabled(enable);
153template <
typename ValueT, SecurityType SecT>
155 return this->
impl_->is_latency_and_lost_enabled();
158template <
typename ValueT, SecurityType SecT>
160 return this->
impl_->get_latency();
163template <
typename ValueT, SecurityType SecT>
165 return this->
impl_->get_lost();
168template <
typename ValueT, SecurityType SecT>
170 std::lock_guard lock(mtx_);
171 return change_reporting_;
174template <
typename ValueT, SecurityType SecT>
180 listen_bytes([
this](
const Bytes& data) {
181#ifndef VLINK_DISABLE_PROFILER
186 std::lock_guard lock(mtx_);
187 if (change_reporting_) {
188 if (value_.has_value() && last_cache_ == data) {
196 if constexpr (std::is_same_v<ValueT, Bytes>) {
202 std::lock_guard lock(mtx_);
204 has_value_notification_ =
true;
206 value_.emplace(data);
212 thread_local auto value = this->
template get_default_value<ValueT>();
215 VLOG_T(
"Getter deserialize failed, url: ", this->impl_->url,
".");
224 std::lock_guard lock(mtx_);
226 has_value_notification_ =
true;
228 value_.emplace(value);
238template <
typename ValueT, SecurityType SecT>
245template <
typename ValueT, SecurityType SecT>
248 this->
impl_->deinit_ext();
250 this->
impl_->init_ext();
256template <
typename ValueT, SecurityType SecT>
262 this->impl_->listen([
this, callback = std::move(callback)](
const Bytes& data) {
266 if VUNLIKELY (!this->security_->decrypt(data, sec_data)) {
267 VLOG_T(
"Getter decrypt failed, url: ", this->impl_->url,
".");
275 this->invoke_callback(callback, data);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
定义 bytes.h:113
RAII scope guard that brackets a CpuProfiler active interval.
定义 cpu_profiler_guard.h:67
Getter(const ConfT &conf, InitType type=InitType::kWithInit)
Constructs a getter from a typed transport configuration object.
定义 getter-inl.h:52
bool init() override
Initialises the getter and registers the internal delivery callback.
定义 getter-inl.h:175
void set_change_reporting(bool enable)
Enables or disables change-reporting (suppress duplicate values).
定义 getter-inl.h:137
std::shared_ptr< Getter< ValueT, SecT > > SharedPtr
Shared-pointer alias.
定义 getter.h:111
int64_t get_latency() const
Returns the most recently measured end-to-end value latency.
定义 getter-inl.h:159
std::optional< ValueT > get() const
Returns the latest cached value, if one has been received.
定义 getter-inl.h:91
bool wait_for_value(std::chrono::milliseconds timeout=Timeout::kDefaultInterval)
Blocks until a value is received or the timeout expires.
定义 getter-inl.h:97
void set_manual_unloan(bool manual_unloan) override
Enables or disables manual-unloan mode for zero-copy receives.
定义 getter-inl.h:143
SampleLostInfo get_lost() const
Returns cumulative sample delivery statistics.
定义 getter-inl.h:164
bool get_change_reporting() const
Returns true if change-reporting mode is currently active.
定义 getter-inl.h:169
static SharedPtr create_shared(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Getter on the heap wrapped in a shared_ptr.
定义 getter-inl.h:45
bool listen(MsgCallback &&callback)
Registers a callback invoked whenever a new value arrives.
定义 getter-inl.h:124
void mark_as_subscriber()
Changes this getter's role to kSubscriber (event-receiver).
定义 getter-inl.h:246
void interrupt() override
Interrupts any blocking wait_for_value() call.
定义 getter-inl.h:239
std::unique_ptr< Getter< ValueT, SecT > > UniquePtr
Unique-pointer alias.
定义 getter.h:108
std::function< void(const ValueT &)> MsgCallback
User-facing callback type for value-change notifications.
定义 getter.h:114
static UniquePtr create_unique(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Getter on the heap wrapped in a unique_ptr.
定义 getter-inl.h:39
bool is_latency_and_lost_enabled() const
Returns true if latency and sample-loss tracking is active.
定义 getter-inl.h:154
void set_latency_and_lost_enabled(bool enable)
Enables or disables per-value latency and sample-loss tracking.
定义 getter-inl.h:149
static constexpr ImplType kImplType
Node role identifier (kGetter).
定义 getter.h:117
std::function< void(const Bytes &)> MsgCallback
Callback delivering a raw serialised message to a SubscriberImpl or GetterImpl.
定义 node_impl.h:177
virtual void interrupt()
Unblocks any active blocking wait on this node.
定义 node-inl.h:142
std::unique_ptr< GetterImpl > impl_
定义 node.h:558
void invoke_callback(const CallbackT &callback, ArgsT &&... args)
定义 node-inl.h:345
virtual bool init()
Initialises the node and its transport back-end.
定义 node-inl.h:39
bool is_manual_unloan_
定义 node.h:563
std::atomic_bool has_inited_
定义 node.h:556
void enable_security()
定义 node-inl.h:332
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Type-safe field-model reader for VLink topics.
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
定义 logger.h:856
#define VLOG_W(...)
定义 logger.h:852
#define VLOG_T(...)
定义 logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
定义 macros.h:302
bool deserialize(const Bytes &src, T &des, TransportType transport)
Deserializes src bytes into des with explicit type and transport hints.
定义 serializer-inl.h:488
constexpr bool is_cdr_type() noexcept
Returns true if T is a FastDDS CDR-serialisable type.
定义 serializer-inl.h:648
std::string get_serialized_type() noexcept
Returns the serialisation type name string for T with explicit TypeT.
定义 serializer-inl.h:229
InitType
Controls whether a node is initialised immediately at construction.
定义 types.h:132
@ kWithInit
Initialise immediately in the constructor.
定义 types.h:134
@ kGet
Field value read by a Getter node.
定义 types.h:171
@ kSubscriber
Event subscriber (receive broadcast).
定义 types.h:94
@ kWithSecurity
Encrypted and authenticated transport.
定义 types.h:150
Compile-time type-detection and serialisation utilities for VLink messages.
Cumulative sample delivery statistics for a subscriber or getter.
定义 types.h:217
static constexpr std::chrono::milliseconds kInfinite
Wait indefinitely (negative timeout).
定义 types.h:203
URL-based Conf dispatcher that routes to the correct transport backend.
定义 url.h:161
URL-based transport configuration dispatcher for VLink nodes.