VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
publisher-inl.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24#pragma once
25
26#include <memory>
27#include <string>
28#include <utility>
29
31#include "../base/logger.h"
32#include "../impl/url.h"
33#include "../publisher.h"
34#include "../serializer.h"
35
36namespace vlink {
37
38template <typename MsgT, SecurityType SecT>
40 InitType type) {
41 return std::make_unique<Publisher<MsgT, SecT>>(url_str, type);
42}
43
44template <typename MsgT, SecurityType SecT>
46 InitType type) {
47 return std::make_shared<Publisher<MsgT, SecT>>(url_str, type);
48}
49
50template <typename MsgT, SecurityType SecT>
51template <typename ConfT, typename>
52inline Publisher<MsgT, SecT>::Publisher(const ConfT& conf, InitType type) {
53 static_assert(ConfT::get_allow_impl_type() & kImplType, "Conf does not support publisher mode.");
54
55 if VUNLIKELY (!conf.parse(kImplType) || !conf.is_valid()) {
56 VLOG_F(conf, " publisher configuration is invalid or could not be parsed.");
57 return;
58 }
59
60 this->impl_ = conf.create_publisher();
61
62 if VUNLIKELY (!this->impl_) {
63 VLOG_F(conf, " publisher implementation not available for this transport.");
64 return;
65 }
66
67 this->impl_->transport_type = conf.get_transport_type();
69 this->impl_->schema_type = Serializer::get_schema_type<kMsgType, MsgT>();
70 this->impl_->is_cdr_type = Serializer::is_cdr_type<MsgT>();
71
72 if constexpr (std::is_same_v<ConfT, Url>) {
73 this->impl_->url = conf.get_str();
74 }
75
76 if constexpr (SecT == SecurityType::kWithSecurity) {
77 this->impl_->is_security_type = true;
78 this->enable_security();
79 }
80
81 if (type == InitType::kWithInit) {
82 this->init(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
83 }
84}
85
86template <typename MsgT, SecurityType SecT>
87inline Publisher<MsgT, SecT>::Publisher(const std::string& url_str, InitType type)
88 : Publisher<MsgT, SecT>(Url(url_str), type) {}
89
90template <typename MsgT, SecurityType SecT>
92 return this->impl_->detect_subscribers(std::move(callback));
93}
94
95template <typename MsgT, SecurityType SecT>
96inline bool Publisher<MsgT, SecT>::wait_for_subscribers(std::chrono::milliseconds timeout) {
97 if VUNLIKELY (timeout.count() == 0) {
98 VLOG_W("Publisher: Timeout value is 0, using infinite wait instead.");
99 timeout = Timeout::kInfinite;
100 }
101
102 return this->impl_->wait_for_subscribers(timeout);
103}
104
105template <typename MsgT, SecurityType SecT>
107 return this->impl_->has_subscribers();
108}
109
110template <typename MsgT, SecurityType SecT>
111inline bool Publisher<MsgT, SecT>::publish(const MsgT& msg, bool force) {
112#ifndef VLINK_DISABLE_PROFILER
113 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
114#endif
115
116 if (!force) {
117 if (!this->impl_->has_subscribers()) {
118 return false;
119 }
120 }
121
122 if constexpr (Traits::IsSharedPtr<MsgT>()) {
123 if constexpr (std::is_base_of_v<IntraDataType, typename MsgT::element_type>) {
124 static_assert(SecT != SecurityType::kWithSecurity, "IntraData must without security.");
125
126 if (this->impl_->transport_type == TransportType::kIntra) {
127 return write_intra(msg);
129 }
130 }
131
132 if constexpr (std::is_same_v<MsgT, Bytes>) {
133 return write_bytes(msg);
134 } else {
135 Bytes msg_data;
136
137 if constexpr (SecT != SecurityType::kWithSecurity) {
138 if (this->is_support_loan_) {
139 size_t ser_size = Serializer::get_serialized_size<kMsgType>(msg);
140
141 msg_data = this->impl_->loan(ser_size);
142
143 if VUNLIKELY (ser_size != 0 && msg_data.empty()) {
144 return false;
145 }
146 }
147 }
148
149 if VUNLIKELY (!Serializer::serialize<kMsgType>(msg, msg_data, this->impl_->transport_type)) {
150 VLOG_T("Publisher serialize failed, url: ", this->impl_->url, ".");
151
152 if constexpr (SecT != SecurityType::kWithSecurity) {
153 if (this->is_support_loan_) {
154 this->impl_->return_loan(msg_data);
155 }
156 }
157
158 return false;
159 }
160
161 bool ret = write_bytes(msg_data);
162
163 return ret;
164 }
165}
166
167template <typename MsgT, SecurityType SecT>
168bool Publisher<MsgT, SecT>::publish_fbb(const void* fbb, bool force) {
169 const auto* fbb_ptr = static_cast<const flatbuffers::FlatBufferBuilder*>(fbb);
170
171#ifndef VLINK_DISABLE_PROFILER
172 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
173#endif
174
175 if (!force) {
176 if (!this->impl_->has_subscribers()) {
177 return false;
178 }
179 }
181 return write_bytes(Bytes::shallow_copy(fbb_ptr->GetBufferPointer(), fbb_ptr->GetSize()));
182}
183
184template <typename MsgT, SecurityType SecT>
186 if VUNLIKELY (this->has_inited_) {
187 this->impl_->deinit_ext();
188 this->impl_->impl_type = kSetter;
189 this->impl_->init_ext();
190 } else {
191 this->impl_->impl_type = kSetter;
192 }
194
195template <typename MsgT, SecurityType SecT>
196inline bool Publisher<MsgT, SecT>::write_bytes(const Bytes& data) {
197 if constexpr (SecT == SecurityType::kWithSecurity) {
198 Bytes sec_data;
199
200 if VUNLIKELY (!this->security_->encrypt(data, sec_data)) {
201 VLOG_T("Publisher encrypt failed, url: ", this->impl_->url, ".");
202 return false;
204
205 return this->impl_->write(sec_data);
206 } else {
207 this->impl_->try_record(ActionType::kPublish, data);
208
209 return this->impl_->write(data);
210 }
211}
212
213template <typename MsgT, SecurityType SecT>
214inline bool Publisher<MsgT, SecT>::write_intra(const IntraData& intra_data) {
215 return this->impl_->write(intra_data);
216}
217
218} // namespace vlink
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_W(...)
Definition logger.h:852
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
Type-safe event-model publisher for VLink topics.
Compile-time type-detection and serialisation utilities for VLink messages.
Definition serializer-inl.h:123
URL-based transport configuration dispatcher for VLink nodes.