38template <
typename MsgT, SecurityType SecT>
41 return std::make_unique<Publisher<MsgT, SecT>>(url_str, type);
44template <
typename MsgT, SecurityType SecT>
47 return std::make_shared<Publisher<MsgT, SecT>>(url_str, type);
50template <
typename MsgT, SecurityType SecT>
51template <
typename ConfT,
typename>
53 static_assert(ConfT::get_allow_impl_type() &
kImplType,
"Conf does not support publisher mode.");
56 VLOG_F(conf,
" publisher configuration is invalid or could not be parsed.");
60 this->
impl_ = conf.create_publisher();
63 VLOG_F(conf,
" publisher implementation not available for this transport.");
67 this->
impl_->transport_type = conf.get_transport_type();
69 this->
impl_->schema_type = Serializer::get_schema_type<kMsgType, MsgT>();
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->
impl_->url = conf.get_str();
77 this->
impl_->is_security_type =
true;
86template <
typename MsgT, SecurityType SecT>
90template <
typename MsgT, SecurityType SecT>
92 return this->
impl_->detect_subscribers(std::move(callback));
95template <
typename MsgT, SecurityType SecT>
98 VLOG_W(
"Publisher: Timeout value is 0, using infinite wait instead.");
102 return this->
impl_->wait_for_subscribers(timeout);
105template <
typename MsgT, SecurityType SecT>
107 return this->
impl_->has_subscribers();
110template <
typename MsgT, SecurityType SecT>
112#ifndef VLINK_DISABLE_PROFILER
117 if (!this->
impl_->has_subscribers()) {
123 if constexpr (std::is_base_of_v<IntraDataType, typename MsgT::element_type>) {
127 return write_intra(msg);
132 if constexpr (std::is_same_v<MsgT, Bytes>) {
133 return write_bytes(msg);
141 msg_data = this->
impl_->loan(ser_size);
143 if VUNLIKELY (ser_size != 0 && msg_data.empty()) {
150 VLOG_T(
"Publisher serialize failed, url: ", this->impl_->url,
".");
153 if (this->is_support_loan_) {
161 bool ret = write_bytes(msg_data);
167template <
typename MsgT, SecurityType SecT>
171#ifndef VLINK_DISABLE_PROFILER
176 if (!this->
impl_->has_subscribers()) {
184template <
typename MsgT, SecurityType SecT>
187 this->
impl_->deinit_ext();
189 this->
impl_->init_ext();
195template <
typename MsgT, SecurityType SecT>
196inline bool Publisher<MsgT, SecT>::write_bytes(
const Bytes& data) {
200 if VUNLIKELY (!this->security_->encrypt(data, sec_data)) {
201 VLOG_T(
"Publisher encrypt failed, url: ", this->impl_->url,
".");
205 return this->impl_->write(sec_data);
209 return this->impl_->write(data);
213template <
typename MsgT, SecurityType SecT>
214inline bool Publisher<MsgT, SecT>::write_intra(
const IntraData& intra_data) {
215 return this->impl_->write(intra_data);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
Definition bytes.h:113
static Bytes shallow_copy(uint8_t *data, size_t size) noexcept
Creates a non-owning Bytes alias pointing to an external mutable buffer.
RAII scope guard that brackets a CpuProfiler active interval.
Definition cpu_profiler_guard.h:67
std::unique_ptr< PublisherImpl > impl_
Definition node.h:558
virtual bool init()
Definition node-inl.h:39
bool is_support_loan_
Definition node.h:562
std::atomic_bool has_inited_
Definition node.h:556
void enable_security()
Definition node-inl.h:332
void mark_as_setter()
Changes this publisher's role to kSetter (field-writer).
Definition publisher-inl.h:185
NodeImpl::ConnectCallback ConnectCallback
Callback type fired when subscriber presence changes.
Definition publisher.h:111
void detect_subscribers(ConnectCallback &&callback)
Registers a callback invoked when subscriber presence changes.
Definition publisher-inl.h:91
Publisher(const ConfT &conf, InitType type=InitType::kWithInit)
Constructs a publisher from a typed transport configuration object.
Definition publisher-inl.h:52
bool publish(const MsgT &msg, bool force=false)
Serialises and publishes msg to all current subscribers.
Definition publisher-inl.h:111
bool has_subscribers() const
Returns true if at least one subscriber is currently present.
Definition publisher-inl.h:106
static constexpr ImplType kImplType
Node role identifier (kPublisher).
Definition publisher.h:114
bool publish_fbb(const void *fbb, bool force=false)
Publishes a pre-finished FlatBufferBuilder buffer directly.
Definition publisher-inl.h:168
bool wait_for_subscribers(std::chrono::milliseconds timeout=Timeout::kDefaultInterval)
Blocks until at least one subscriber is present or the timeout expires.
Definition publisher-inl.h:96
std::unique_ptr< Publisher< MsgT, SecT > > UniquePtr
Unique-pointer alias for heap allocation.
Definition publisher.h:105
std::shared_ptr< Publisher< MsgT, SecT > > SharedPtr
Shared-pointer alias for heap allocation.
Definition publisher.h:108
static SharedPtr create_shared(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Publisher on the heap wrapped in a shared_ptr.
Definition publisher-inl.h:45
static UniquePtr create_unique(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Publisher on the heap wrapped in a unique_ptr.
Definition publisher-inl.h:39
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_W(...)
Definition logger.h:852
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
size_t get_serialized_size(const T &src) noexcept
Returns the exact serialised byte size for a given src value.
Definition serializer-inl.h:295
constexpr bool is_cdr_type() noexcept
Returns true if T is a FastDDS CDR-serialisable type.
Definition serializer-inl.h:648
std::string get_serialized_type() noexcept
Returns the serialisation type name string for T with explicit TypeT.
Definition serializer-inl.h:229
bool serialize(const T &src, Bytes &des, TransportType transport, uint8_t offset)
Serializes src into des with explicit type and transport hints.
Definition serializer-inl.h:355
InitType
Controls whether a node is initialised immediately at construction.
Definition types.h:132
@ kWithInit
Initialise immediately in the constructor.
Definition types.h:134
@ kPublish
Message published by a Publisher node.
Definition types.h:168
@ kIntra
In-process queue (intra://).
Definition types.h:109
@ kSetter
Field setter (update latest value).
Definition types.h:95
std::shared_ptr< IntraDataType > IntraData
Shared-ownership handle for an IntraDataType payload.
Definition intra_data.h:96
@ kWithSecurity
Encrypted and authenticated transport.
Definition types.h:150
Type-safe event-model publisher for VLink topics.
Compile-time type-detection and serialisation utilities for VLink messages.
Definition serializer-inl.h:123
static constexpr std::chrono::milliseconds kInfinite
Wait indefinitely (negative timeout).
Definition types.h:203
Detects whether type T is a std::shared_ptr specialization.
Definition traits.h:224
URL-based Conf dispatcher that routes to the correct transport backend.
Definition url.h:161
URL-based transport configuration dispatcher for VLink nodes.