// Factory that heap-allocates a Server<ReqT, RespT, SecT> and returns it owned by a
// std::unique_ptr.
// NOTE(review): the line carrying the return type and function name is missing from this
// listing; presumably this is Server::create_unique() -- confirm against the header.
38template <
typename ReqT,
typename RespT, SecurityType SecT>
40 const std::string& url_str,
InitType type) {
// Both arguments are forwarded unchanged to the Server constructor.
41 return std::make_unique<Server<ReqT, RespT, SecT>>(url_str, type);
// Factory that heap-allocates a Server<ReqT, RespT, SecT> and returns it owned by a
// std::shared_ptr.
// NOTE(review): the line carrying the return type and function name is missing from this
// listing; presumably this is Server::create_shared() -- confirm against the header.
44template <
typename ReqT,
typename RespT, SecurityType SecT>
46 const std::string& url_str,
InitType type) {
// Both arguments are forwarded unchanged to the Server constructor.
47 return std::make_shared<Server<ReqT, RespT, SecT>>(url_str, type);
// Constructor template taking a typed configuration object `conf` (type ConfT).
// NOTE(review): the signature line and several guarding conditions are missing from this
// listing, so the comments below describe only the statements that are visible.
50template <
typename ReqT,
typename RespT, SecurityType SecT>
51template <
typename ConfT,
typename>
// Compile-time rejection of configuration types whose allowed-implementation mask does not
// include this node's kImplType.
53 static_assert(ConfT::get_allow_impl_type() &
kImplType,
"Conf does not support server mode.");
// Reported for an invalid/unparsable configuration (the triggering condition is not visible).
56 VLOG_F(conf,
" server configuration is invalid or could not be parsed.");
// The configuration object itself creates the server implementation stored in impl_.
60 this->
impl_ = conf.create_server();
// Reported when no implementation is available (the triggering condition is not visible;
// presumably a null impl_ -- TODO confirm).
63 VLOG_F(conf,
" server implementation not available for this transport.");
// Record the transport type reported by the configuration on the implementation.
67 this->
impl_->transport_type = conf.get_transport_type();
// If either the existing type string or the response type string is non-empty, append the
// response serialization type after a ';' separator.
73 if (!this->
impl_->ser_type.empty() || !resp_ser_type.empty()) {
74 this->
impl_->ser_type +=
";" + resp_ser_type;
// Schema types for the request and the response are resolved at compile time.
79 constexpr auto kReqSchemaType = Serializer::get_schema_type<kReqType, ReqT>();
80 constexpr auto kRespSchemaType = Serializer::get_schema_type<kRespType, RespT>();
// Compile-time branch for servers that have a response whose schema type differs from the
// request's. NOTE(review): the body of this branch is not visible in the listing.
82 if constexpr (
kHasResp && kReqSchemaType != kRespSchemaType) {
// NOTE(review): which branch this assignment belongs to cannot be determined from here.
85 this->
impl_->schema_type = kReqSchemaType;
// Only when the configuration type is exactly Url is its string form stored as the
// implementation's url.
92 if constexpr (std::is_same_v<ConfT, Url>) {
93 this->
impl_->url = conf.get_str();
// Marks the implementation as security-enabled. NOTE(review): the guarding condition is not
// visible; presumably it depends on SecT -- confirm.
97 this->
impl_->is_security_type =
true;
// Delegating constructor: wraps url_str in a Url object and forwards it, together with `type`,
// to another Server constructor. The body is empty.
// NOTE(review): the signature line is missing from this listing.
106template <
typename ReqT,
typename RespT, SecurityType SecT>
108 :
Server<ReqT, RespT, SecT>(
Url(url_str), type) {}
// Registers a request callback for a server without a response type.
// NOTE(review): the signature and most of the lambda body are missing from this listing; per
// the static_assert message the overload with a reply is listen(ReqRespCallback&&).
110template <
typename ReqT,
typename RespT, SecurityType SecT>
// This overload is only valid when the server has no response type.
112 static_assert(!
kHasResp,
"Reply not supported; use listen(ReqRespCallback&&) instead.");
// Flag the implementation as operating in synchronous mode.
114 this->
impl_->is_sync_type =
true;
// Adapts the user callback (moved into the closure) to the byte-level listen_bytes() interface.
// The request id and the response pointer parameters are left unnamed, i.e. unused here.
116 return listen_bytes([
this, callback = std::move(callback)](uint64_t,
const Bytes& req_data,
Bytes*) {
// The following section is compiled only when VLINK_DISABLE_PROFILER is not defined.
117#ifndef VLINK_DISABLE_PROFILER
// Compile-time branch for requests that are already raw Bytes.
121 if constexpr (std::is_same_v<ReqT, Bytes>) {
// Logged when deserializing the request fails (the deserialize call itself is not visible).
129 VLOG_T(
"Server deserialize failed, url: ", this->
impl_->url,
".");
// Registers a request/response callback: the callback fills the response in place and the
// response is then sent via reply_bytes<true>().
// NOTE(review): the signature and several intermediate lines are missing from this listing.
138template <
typename ReqT,
typename RespT, SecurityType SecT>
// This overload requires a response type.
140 static_assert(
kHasResp,
"Must have reply.");
// Flag the implementation as operating in synchronous mode.
142 this->
impl_->is_sync_type =
true;
// Adapts the user callback (moved into the closure) to the byte-level listen_bytes() interface.
144 return listen_bytes([
this, callback = std::move(callback)](uint64_t req_id,
const Bytes& req_data,
Bytes* resp_data) {
// The following section is compiled only when VLINK_DISABLE_PROFILER is not defined.
145#ifndef VLINK_DISABLE_PROFILER
// Logged for a null response pointer (the check itself is not visible in the listing).
150 VLOG_E(
"Server resp_data pointer is null.");
// Raw-bytes case: no (de)serialization, the callback works directly on the byte buffers.
154 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
157 callback(req_data, *resp_data);
// Send the response with is_sync = true, also passing the response pointer.
159 reply_bytes<true>(req_id, *resp_data,
true, resp_data);
// thread_local: one request object per thread, initialised on first use and reused afterwards.
161 thread_local auto req = this->
template get_default_value<ReqT>();
// Logged when deserializing the request fails (the deserialize call itself is not visible).
165 VLOG_T(
"Server deserialize failed, url: ", this->
impl_->url,
".");
// With loan support the response buffer is obtained from the implementation.
// NOTE(review): the definition of ser_size is not visible; presumably the serialized size of
// the response -- confirm.
172 if (this->is_support_loan_) {
175 *resp_data = this->impl_->loan(ser_size);
// Logged when serializing the response fails (the serialize call itself is not visible).
184 VLOG_T(
"Server serialize failed, url: ", this->impl_->url,
".");
// With loan support the buffer is handed back to the implementation via return_loan().
187 if (this->is_support_loan_) {
188 this->impl_->return_loan(*resp_data);
// Send the response with is_sync = true, also passing the response pointer.
195 reply_bytes<true>(req_id, *resp_data,
true, resp_data);
// Registers a callback that receives the request id together with the request; no response is
// produced inside the callback wrapper (the response pointer parameter is unnamed/unused).
// NOTE(review): the signature is missing from this listing; presumably this is
// listen_for_reply(), with the response sent later through reply() -- confirm.
200template <
typename ReqT,
typename RespT, SecurityType SecT>
// This overload requires a response type.
202 static_assert(
kHasResp,
"Must have reply.");
// Flag the implementation as NOT operating in synchronous mode.
204 this->
impl_->is_sync_type =
false;
// Adapts the user callback (moved into the closure) to the byte-level listen_bytes() interface.
206 return listen_bytes([
this, callback = std::move(callback)](uint64_t req_id,
const Bytes& req_data,
Bytes*) {
// The following section is compiled only when VLINK_DISABLE_PROFILER is not defined.
207#ifndef VLINK_DISABLE_PROFILER
// Raw-bytes requests are passed to the callback as received.
211 if constexpr (std::is_same_v<ReqT, Bytes>) {
214 callback(req_id, req_data);
// Logged when deserializing the request fails (the deserialize call itself is not visible).
219 VLOG_T(
"Server deserialize failed, url: ", this->
impl_->url,
".");
// Typed requests: the callback receives the request object `req` (its declaration is not
// visible in this listing).
223 callback(req_id, req);
// Sends a response for the request identified by req_id through reply_bytes<false>() with
// is_sync = false.
// NOTE(review): the signature and the conditions guarding the VLOG_F calls are missing from
// this listing.
228template <
typename ReqT,
typename RespT, SecurityType SecT>
// Replying is only valid when the server has a response type.
230 static_assert(
kHasResp,
"Reply requires a response type.");
// Reported when reply() is used before listen() (the check itself is not visible).
233 VLOG_F(
"Server::reply() requires listen() to be called first.");
// Reported when reply() is used in synchronous listen mode (the check itself is not visible).
237 VLOG_F(
"Server::reply() is not available in synchronous listen mode.");
// Raw-bytes responses are sent as they are, without serialization.
240 if constexpr (std::is_same_v<RespT, Bytes>) {
241 return reply_bytes<false>(req_id, resp,
false);
// With loan support the output buffer is obtained from the implementation.
// NOTE(review): the definition of ser_size is not visible; presumably the serialized size of
// the response -- confirm.
246 if (this->is_support_loan_) {
249 resp_data = this->impl_->loan(ser_size);
// Logged when serializing the response fails (the serialize call itself is not visible).
258 VLOG_T(
"Server serialize failed, url: ", this->impl_->url,
".");
// With loan support the buffer is handed back to the implementation via return_loan().
261 if (this->is_support_loan_) {
262 this->impl_->return_loan(resp_data);
// Send the serialized response; the result is kept in `ret` (its later use is not visible).
269 bool ret = reply_bytes<false>(req_id, resp_data,
false);
// Const query that returns whatever the underlying implementation's has_clients() reports.
// NOTE(review): the closing brace of this function is missing from this listing.
275template <
typename ReqT,
typename RespT, SecurityType SecT>
276inline bool Server<ReqT, RespT, SecT>::has_clients()
const {
277 return this->impl_->has_clients();
// Byte-level listen entry: wraps the supplied callback in a lambda, registers it on the
// implementation and stores the registration result in impl_->is_listened.
// NOTE(review): the signature, the guard conditions and the branch selecting between the
// decrypting and the plain path are missing from this listing.
280template <
typename ReqT,
typename RespT, SecurityType SecT>
// Reported when called before init() (the check itself is not visible).
283 VLOG_F(
"Server::listen_bytes() called before init().");
// A server that is already listening is reported via VLOG_F.
286 if VUNLIKELY (this->impl_->is_listened) {
287 VLOG_F(
"Server has already been listened, url: ", this->impl_->url,
".");
// Register the wrapper lambda on the implementation; the user callback is moved into it.
290 bool ret = this->impl_->listen(
291 [
this, callback = std::move(callback)](uint64_t req_id,
const Bytes& req_data,
Bytes* resp_data) {
// Decrypting path: the request is decrypted into sec_req_data; a failure is logged.
294 if VUNLIKELY (!this->security_->decrypt(req_data, sec_req_data)) {
295 VLOG_T(
"Server decrypt failed, url: ", this->impl_->url,
".");
// The callback is invoked with the decrypted request bytes.
299 this->invoke_callback(callback, req_id, sec_req_data, resp_data);
// Plain path: the callback is invoked with the request bytes as received.
303 this->invoke_callback(callback, req_id, req_data, resp_data);
// Remember whether registering the listener succeeded.
307 this->impl_->is_listened = ret;
// Byte-level reply: hands the response bytes (encrypted or as given) to impl_->reply().
// When HasPtrT is true the bytes that are sent are additionally copied to *resp_data_ptr.
// NOTE(review): the condition selecting between the encrypting and the plain path, and the
// declaration of sec_resp_data, are missing from this listing.
312template <
typename ReqT,
typename RespT, SecurityType SecT>
313template <
bool HasPtrT>
314inline bool Server<ReqT, RespT, SecT>::reply_bytes(uint64_t req_id,
const Bytes& resp_data,
bool is_sync,
315 Bytes* resp_data_ptr) {
// Reported when called before init() (the check itself is not visible).
317 VLOG_F(
"Server::reply_bytes() called before init().");
// Encrypting path: the response is encrypted into sec_resp_data; a failure is logged.
323 if VUNLIKELY (!this->security_->encrypt(resp_data, sec_resp_data)) {
324 VLOG_T(
"Server encrypt failed, url: ", this->impl_->url,
".");
// Expose the encrypted bytes to the caller through the output pointer, if one is used.
328 if constexpr (HasPtrT) {
329 *resp_data_ptr = sec_resp_data;
332 return this->impl_->reply(req_id, sec_resp_data, is_sync);
// Plain path: expose the unmodified response bytes through the output pointer, if one is used.
334 if constexpr (HasPtrT) {
335 *resp_data_ptr = resp_data;
340 return this->impl_->reply(req_id, resp_data, is_sync);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
Definition bytes.h:113
bool empty() const noexcept
Returns true if the buffer is empty (no data pointer and size == 0).
Definition bytes.h:880
RAII scope guard that brackets a CpuProfiler active interval.
Definition cpu_profiler_guard.h:67
std::function< void(uint64_t, const Bytes &, Bytes *)> ReqRespCallback
Callback for ServerImpl request/response processing.
Definition node_impl.h:170
std::unique_ptr< ServerImpl > impl_
Definition node.h:558
virtual bool init()
Definition node-inl.h:39
TypeT get_default_value()
Definition node-inl.h:356
void enable_security()
Definition node-inl.h:332
bool reply(uint64_t req_id, const RespT &resp)
Sends an asynchronous response for a previously received request.
Definition server-inl.h:229
std::unique_ptr< Server< ReqT, RespT, SecT > > UniquePtr
Unique-pointer alias.
Definition server.h:111
static constexpr ImplType kImplType
Node role identifier (kServer).
Definition server.h:132
static SharedPtr create_shared(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Server on the heap wrapped in a shared_ptr.
Definition server-inl.h:45
static UniquePtr create_unique(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Server on the heap wrapped in a unique_ptr.
Definition server-inl.h:39
std::function< void(const ReqT &, RespT &)> ReqRespCallback
Synchronous callback – response filled in-place inside the callback.
Definition server.h:120
Server(const ConfT &conf, InitType type=InitType::kWithInit)
Constructs a server from a typed transport configuration object.
Definition server-inl.h:52
static constexpr bool kHasResp
true when RespT is not EmptyType (server has a response).
Definition server.h:135
std::function< void(uint64_t, const ReqT &)> ReqAsyncRespCallback
Asynchronous callback – response sent later via reply(req_id, resp).
Definition server.h:129
std::shared_ptr< Server< ReqT, RespT, SecT > > SharedPtr
Shared-pointer alias.
Definition server.h:114
std::function< void(const ReqT &)> ReqCallback
Fire-and-forget callback – no response (RespT must be EmptyType).
Definition server.h:117
bool listen(ReqCallback &&callback)
Registers a fire-and-forget request callback (no response).
Definition server-inl.h:111
bool listen_for_reply(ReqAsyncRespCallback &&callback)
Registers an asynchronous request callback (reply sent later).
Definition server-inl.h:201
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_E(...)
Definition logger.h:854
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
Definition macros.h:302
bool deserialize(const Bytes &src, T &des, TransportType transport)
Deserializes src bytes into des with explicit type and transport hints.
Definition serializer-inl.h:488
size_t get_serialized_size(const T &src) noexcept
Returns the exact serialised byte size for a given src value.
Definition serializer-inl.h:295
constexpr bool is_cdr_type() noexcept
Returns true if T is a FastDDS CDR-serialisable type.
Definition serializer-inl.h:648
std::string get_serialized_type() noexcept
Returns the serialisation type name string for T with explicit TypeT.
Definition serializer-inl.h:229
bool serialize(const T &src, Bytes &des, TransportType transport, uint8_t offset)
Serializes src into des with explicit type and transport hints.
Definition serializer-inl.h:355
InitType
Controls whether a node is initialised immediately at construction.
Definition types.h:132
@ kWithInit
Initialise immediately in the constructor.
Definition types.h:134
@ kUnknown
Decode category is not known.
Definition types.h:185
@ kServerRequest
RPC request received by a Server node.
Definition types.h:166
@ kServerResponse
RPC response sent by a Server node.
Definition types.h:167
@ kWithSecurity
Encrypted and authenticated transport.
Definition types.h:150
Compile-time type-detection and serialisation utilities for VLink messages.
Type-safe method-model server (handler side) for VLink RPC.
URL-based Conf dispatcher that routes to the correct transport backend.
Definition url.h:161
URL-based transport configuration dispatcher for VLink nodes.