38template <
typename MsgT, SecurityType SecT>
41 return std::make_unique<Subscriber<MsgT, SecT>>(url_str, type);
44template <
typename MsgT, SecurityType SecT>
47 return std::make_shared<Subscriber<MsgT, SecT>>(url_str, type);
50template <
typename MsgT, SecurityType SecT>
51template <
typename ConfT,
typename>
53 static_assert(ConfT::get_allow_impl_type() &
kImplType,
"Conf does not support subscriber mode.");
56 VLOG_F(conf,
" subscriber configuration is invalid or could not be parsed.");
60 this->
impl_ = conf.create_subscriber();
63 VLOG_F(conf,
" subscriber implementation not available for this transport.");
67 this->
impl_->transport_type = conf.get_transport_type();
69 this->
impl_->schema_type = Serializer::get_schema_type<kMsgType, MsgT>();
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->
impl_->url = conf.get_str();
77 this->
impl_->is_security_type =
true;
86template <
typename MsgT, SecurityType SecT>
90template <
typename MsgT, SecurityType SecT>
93 if constexpr (std::is_base_of_v<IntraDataType, typename MsgT::element_type>) {
97 return listen_intra(std::move(callback));
102 return listen_bytes([
this, callback = std::move(callback)](
const Bytes& data) {
103#ifndef VLINK_DISABLE_PROFILER
107 if constexpr (std::is_same_v<MsgT, Bytes>) {
115 VLOG_T(
"Subscriber deserialize failed, url: ", this->
impl_->url,
".");
124template <
typename MsgT, SecurityType SecT>
126 this->
impl_->set_manual_unloan(manual_unloan);
130template <
typename MsgT, SecurityType SecT>
132 this->
impl_->set_latency_and_lost_enabled(enable);
135template <
typename MsgT, SecurityType SecT>
137 return this->
impl_->is_latency_and_lost_enabled();
140template <
typename MsgT, SecurityType SecT>
142 return this->
impl_->get_latency();
145template <
typename MsgT, SecurityType SecT>
147 return this->
impl_->get_lost();
150template <
typename MsgT, SecurityType SecT>
153 this->
impl_->deinit_ext();
155 this->
impl_->init_ext();
161template <
typename MsgT, SecurityType SecT>
164 VLOG_F(
"Subscriber::listen_bytes() called before init().");
168 VLOG_F(
"Subscriber has already been listened, url: ", this->
impl_->url,
".");
171 bool ret = this->
impl_->listen([
this, callback = std::move(callback)](
const Bytes& data) {
176 VLOG_T(
"Subscriber decrypt failed, url: ", this->
impl_->url,
".");
188 this->
impl_->is_listened = ret;
193template <
typename MsgT, SecurityType SecT>
196 VLOG_F(
"Subscriber::listen_intra() called before init().");
200 VLOG_F(
"Subscriber has already been listened, url: ", this->
impl_->url,
".");
203 bool ret = this->impl_->listen([
this, callback = std::move(callback)](
const IntraData& intra_data) {
204#ifndef VLINK_DISABLE_PROFILER
209#if defined(NDEBUG) || defined(__ANDROID__)
210 auto intra_msg = std::static_pointer_cast<typename MsgT::element_type>(intra_data);
212 auto intra_msg = std::dynamic_pointer_cast<typename MsgT::element_type>(intra_data);
218 VLOG_T(
"Subscriber get intra data failed, url: ", this->impl_->url,
".");
Versatile 128-byte byte buffer with small-buffer optimization (SBO), five ownership modes, and compression helpers.
Definition bytes.h:113
RAII scope guard that brackets a CpuProfiler active interval.
Definition cpu_profiler_guard.h:67
std::function< void(const IntraData &)> IntraMsgCallback
Callback delivering an in-process IntraData message.
Definition node_impl.h:186
std::function< void(const Bytes &)> MsgCallback
Callback delivering a raw serialised message to a SubscriberImpl or GetterImpl.
Definition node_impl.h:177
std::unique_ptr< SubscriberImpl > impl_
Definition node.h:558
void invoke_callback(const CallbackT &callback, ArgsT &&... args)
Definition node-inl.h:345
virtual bool init()
Definition node-inl.h:39
std::optional< Security > security_
Definition node.h:559
TypeT get_default_value()
Definition node-inl.h:356
bool is_manual_unloan_
Definition node.h:563
std::atomic_bool has_inited_
Definition node.h:556
void enable_security()
Definition node-inl.h:332
static constexpr ImplType kImplType
Node role identifier (kSubscriber).
Definition subscriber.h:122
bool listen(MsgCallback &&callback)
Registers the receive callback for incoming messages.
Definition subscriber-inl.h:91
static UniquePtr create_unique(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Subscriber on the heap wrapped in a unique_ptr.
Definition subscriber-inl.h:39
std::shared_ptr< Subscriber< MsgT, SecT > > SharedPtr
Shared-pointer alias for heap allocation.
Definition subscriber.h:116
void mark_as_getter()
Changes this subscriber's role to kGetter (field-reader).
Definition subscriber-inl.h:151
bool is_latency_and_lost_enabled() const
Returns true if latency and sample-loss tracking is active.
Definition subscriber-inl.h:136
static SharedPtr create_shared(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Subscriber on the heap wrapped in a shared_ptr.
Definition subscriber-inl.h:45
SampleLostInfo get_lost() const
Returns cumulative sample delivery statistics.
Definition subscriber-inl.h:146
std::function< void(const MsgT &)> MsgCallback
User-facing callback type for received messages.
Definition subscriber.h:119
int64_t get_latency() const
Returns the most recently measured end-to-end message latency.
Definition subscriber-inl.h:141
void set_latency_and_lost_enabled(bool enable)
Enables or disables per-message latency and sample-loss tracking.
Definition subscriber-inl.h:131
std::unique_ptr< Subscriber< MsgT, SecT > > UniquePtr
Unique-pointer alias for heap allocation.
Definition subscriber.h:113
void set_manual_unloan(bool manual_unloan) override
Enables or disables manual-unloan mode for zero-copy receives.
Definition subscriber-inl.h:125
Subscriber(const ConfT &conf, InitType type=InitType::kWithInit)
Constructs a subscriber from a typed transport configuration object.
Definition subscriber-inl.h:52
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
Definition macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely to be true.
Definition macros.h:297
bool deserialize(const Bytes &src, T &des, TransportType transport)
Deserializes src bytes into des with explicit type and transport hints.
Definition serializer-inl.h:488
constexpr bool is_cdr_type() noexcept
Returns true if T is a FastDDS CDR-serialisable type.
Definition serializer-inl.h:648
std::string get_serialized_type() noexcept
Returns the serialisation type name string for T with explicit TypeT.
Definition serializer-inl.h:229
InitType
Controls whether a node is initialised immediately at construction.
Definition types.h:132
@ kWithInit
Initialise immediately in the constructor.
Definition types.h:134
@ kSubscribe
Message received by a Subscriber node.
Definition types.h:169
@ kIntra
In-process queue (intra://).
Definition types.h:109
@ kGetter
Field getter (retrieve latest value).
Definition types.h:96
std::shared_ptr< IntraDataType > IntraData
Shared-ownership handle for an IntraDataType payload.
Definition intra_data.h:96
@ kWithSecurity
Encrypted and authenticated transport.
Definition types.h:150
Compile-time type-detection and serialisation utilities for VLink messages.
Cumulative sample delivery statistics for a subscriber or getter.
Definition types.h:217
Detects whether type T is a std::shared_ptr specialization.
Definition traits.h:224
URL-based Conf dispatcher that routes to the correct transport backend.
Definition url.h:161
Type-safe event-model subscriber for VLink topics.
URL-based transport configuration dispatcher for VLink nodes.