VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
server-inl.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24#pragma once
25
26#include <memory>
27#include <string>
28#include <utility>
29
31#include "../base/logger.h"
32#include "../impl/url.h"
33#include "../serializer.h"
34#include "../server.h"
35
36namespace vlink {
37
38template <typename ReqT, typename RespT, SecurityType SecT>
40 const std::string& url_str, InitType type) {
41 return std::make_unique<Server<ReqT, RespT, SecT>>(url_str, type);
42}
43
44template <typename ReqT, typename RespT, SecurityType SecT>
46 const std::string& url_str, InitType type) {
47 return std::make_shared<Server<ReqT, RespT, SecT>>(url_str, type);
48}
49
50template <typename ReqT, typename RespT, SecurityType SecT>
51template <typename ConfT, typename>
52inline Server<ReqT, RespT, SecT>::Server(const ConfT& conf, InitType type) {
53 static_assert(ConfT::get_allow_impl_type() & kImplType, "Conf does not support server mode.");
54
55 if VUNLIKELY (!conf.parse(kImplType) || !conf.is_valid()) {
56 VLOG_F(conf, " server configuration is invalid or could not be parsed.");
57 return;
58 }
59
60 this->impl_ = conf.create_server();
61
62 if VUNLIKELY (!this->impl_) {
63 VLOG_F(conf, " server implementation not available for this transport.");
64 return;
65 }
66
67 this->impl_->transport_type = conf.get_transport_type();
69
70 if constexpr (kHasResp) {
71 const auto resp_ser_type = Serializer::get_serialized_type<kRespType, RespT>();
72
73 if (!this->impl_->ser_type.empty() || !resp_ser_type.empty()) {
74 this->impl_->ser_type += ";" + resp_ser_type;
75 }
76 }
77
78 {
79 constexpr auto kReqSchemaType = Serializer::get_schema_type<kReqType, ReqT>();
80 constexpr auto kRespSchemaType = Serializer::get_schema_type<kRespType, RespT>();
81
82 if constexpr (kHasResp && kReqSchemaType != kRespSchemaType) {
83 this->impl_->schema_type = SchemaType::kUnknown;
84 } else {
85 this->impl_->schema_type = kReqSchemaType;
86 }
87 }
88
89 this->impl_->is_cdr_type = Serializer::is_cdr_type<ReqT>();
90 this->impl_->is_resp_type = kHasResp;
91
92 if constexpr (std::is_same_v<ConfT, Url>) {
93 this->impl_->url = conf.get_str();
94 }
95
96 if constexpr (SecT == SecurityType::kWithSecurity) {
97 this->impl_->is_security_type = true;
98 this->enable_security();
99 }
100
101 if (type == InitType::kWithInit) {
102 this->init(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
103 }
104}
105
106template <typename ReqT, typename RespT, SecurityType SecT>
107inline Server<ReqT, RespT, SecT>::Server(const std::string& url_str, InitType type)
108 : Server<ReqT, RespT, SecT>(Url(url_str), type) {}
109
110template <typename ReqT, typename RespT, SecurityType SecT>
112 static_assert(!kHasResp, "Reply not supported; use listen(ReqRespCallback&&) instead.");
113
114 this->impl_->is_sync_type = true;
115
116 return listen_bytes([this, callback = std::move(callback)](uint64_t, const Bytes& req_data, Bytes*) {
117#ifndef VLINK_DISABLE_PROFILER
118 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
119#endif
120
121 if constexpr (std::is_same_v<ReqT, Bytes>) {
122 (void)this;
123
124 callback(req_data);
125 } else {
126 thread_local auto req = this->template get_default_value<ReqT>();
127
128 if VUNLIKELY (!Serializer::deserialize<kReqType>(req_data, req, this->impl_->transport_type)) {
129 VLOG_T("Server deserialize failed, url: ", this->impl_->url, ".");
130 return;
131 }
132
133 callback(req);
134 }
135 });
136}
137
138template <typename ReqT, typename RespT, SecurityType SecT>
140 static_assert(kHasResp, "Must have reply.");
141
142 this->impl_->is_sync_type = true;
143
144 return listen_bytes([this, callback = std::move(callback)](uint64_t req_id, const Bytes& req_data, Bytes* resp_data) {
145#ifndef VLINK_DISABLE_PROFILER
146 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
147#endif
148
149 if VUNLIKELY (!resp_data) {
150 VLOG_E("Server resp_data pointer is null.");
151 return;
152 }
154 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
155 (void)this;
156
157 callback(req_data, *resp_data);
158
159 reply_bytes<true>(req_id, *resp_data, true, resp_data);
160 } else {
161 thread_local auto req = this->template get_default_value<ReqT>();
162 thread_local auto resp = this->template get_default_value<RespT>();
163
164 if VUNLIKELY (!Serializer::deserialize<kReqType>(req_data, req, this->impl_->transport_type)) {
165 VLOG_T("Server deserialize failed, url: ", this->impl_->url, ".");
166 return;
167 }
168
169 callback(req, resp);
170
171 if constexpr (SecT != SecurityType::kWithSecurity) {
172 if (this->is_support_loan_) {
173 size_t ser_size = Serializer::get_serialized_size<kRespType>(resp);
174
175 *resp_data = this->impl_->loan(ser_size);
176
177 if VUNLIKELY (ser_size != 0 && resp_data->empty()) {
178 return;
179 }
180 }
181 }
182
183 if VUNLIKELY (!Serializer::serialize<kRespType>(resp, *resp_data, this->impl_->transport_type)) {
184 VLOG_T("Server serialize failed, url: ", this->impl_->url, ".");
185
186 if constexpr (SecT != SecurityType::kWithSecurity) {
187 if (this->is_support_loan_) {
188 this->impl_->return_loan(*resp_data);
189 }
190 }
191
192 return;
193 }
194
195 reply_bytes<true>(req_id, *resp_data, true, resp_data);
196 }
197 });
199
200template <typename ReqT, typename RespT, SecurityType SecT>
202 static_assert(kHasResp, "Must have reply.");
203
204 this->impl_->is_sync_type = false;
205
206 return listen_bytes([this, callback = std::move(callback)](uint64_t req_id, const Bytes& req_data, Bytes*) {
207#ifndef VLINK_DISABLE_PROFILER
208 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
209#endif
210
211 if constexpr (std::is_same_v<ReqT, Bytes>) {
212 (void)this;
213
214 callback(req_id, req_data);
215 } else {
216 thread_local auto req = this->template get_default_value<ReqT>();
217
218 if VUNLIKELY (!Serializer::deserialize<kReqType>(req_data, req, this->impl_->transport_type)) {
219 VLOG_T("Server deserialize failed, url: ", this->impl_->url, ".");
220 return;
221 }
222
223 callback(req_id, req);
225 });
226}
227
228template <typename ReqT, typename RespT, SecurityType SecT>
229inline bool Server<ReqT, RespT, SecT>::reply(uint64_t req_id, const RespT& resp) {
230 static_assert(kHasResp, "Reply requires a response type.");
231
232 if VUNLIKELY (!this->impl_->is_listened) {
233 VLOG_F("Server::reply() requires listen() to be called first.");
234 }
235
236 if VUNLIKELY (this->impl_->is_sync_type) {
237 VLOG_F("Server::reply() is not available in synchronous listen mode.");
238 }
240 if constexpr (std::is_same_v<RespT, Bytes>) {
241 return reply_bytes<false>(req_id, resp, false);
242 } else {
243 Bytes resp_data;
244
245 if constexpr (SecT != SecurityType::kWithSecurity) {
246 if (this->is_support_loan_) {
247 size_t ser_size = Serializer::get_serialized_size<kRespType>(resp);
248
249 resp_data = this->impl_->loan(ser_size);
250
251 if VUNLIKELY (ser_size != 0 && resp_data.empty()) {
252 return false;
253 }
254 }
255 }
256
257 if VUNLIKELY (!Serializer::serialize<kRespType>(resp, resp_data, this->impl_->transport_type)) {
258 VLOG_T("Server serialize failed, url: ", this->impl_->url, ".");
259
260 if constexpr (SecT != SecurityType::kWithSecurity) {
261 if (this->is_support_loan_) {
262 this->impl_->return_loan(resp_data);
263 }
264 }
265
266 return false;
267 }
268
269 bool ret = reply_bytes<false>(req_id, resp_data, false);
270
271 return ret;
272 }
273}
274
275template <typename ReqT, typename RespT, SecurityType SecT>
276inline bool Server<ReqT, RespT, SecT>::has_clients() const {
277 return this->impl_->has_clients();
278}
279
280template <typename ReqT, typename RespT, SecurityType SecT>
281inline bool Server<ReqT, RespT, SecT>::listen_bytes(NodeImpl::ReqRespCallback&& callback) {
282 if VUNLIKELY (!this->has_inited_) {
283 VLOG_F("Server::listen_bytes() called before init().");
284 }
285
286 if VUNLIKELY (this->impl_->is_listened) {
287 VLOG_F("Server has already been listened, url: ", this->impl_->url, ".");
288 }
289
290 bool ret = this->impl_->listen(
291 [this, callback = std::move(callback)](uint64_t req_id, const Bytes& req_data, Bytes* resp_data) {
292 if constexpr (SecT == SecurityType::kWithSecurity) {
293 Bytes sec_req_data;
294 if VUNLIKELY (!this->security_->decrypt(req_data, sec_req_data)) {
295 VLOG_T("Server decrypt failed, url: ", this->impl_->url, ".");
296 return;
297 }
298
299 this->invoke_callback(callback, req_id, sec_req_data, resp_data);
300 } else {
301 this->impl_->try_record(ActionType::kServerRequest, req_data);
302
303 this->invoke_callback(callback, req_id, req_data, resp_data);
304 }
305 });
306
307 this->impl_->is_listened = ret;
308
309 return ret;
310}
311
312template <typename ReqT, typename RespT, SecurityType SecT>
313template <bool HasPtrT>
314inline bool Server<ReqT, RespT, SecT>::reply_bytes(uint64_t req_id, const Bytes& resp_data, bool is_sync,
315 Bytes* resp_data_ptr) {
316 if VUNLIKELY (!this->has_inited_) {
317 VLOG_F("Server::reply_bytes() called before init().");
318 }
319
320 if constexpr (SecT == SecurityType::kWithSecurity) {
321 Bytes sec_resp_data;
322
323 if VUNLIKELY (!this->security_->encrypt(resp_data, sec_resp_data)) {
324 VLOG_T("Server encrypt failed, url: ", this->impl_->url, ".");
325 return false;
326 }
327
328 if constexpr (HasPtrT) {
329 *resp_data_ptr = sec_resp_data;
330 }
331
332 return this->impl_->reply(req_id, sec_resp_data, is_sync);
333 } else {
334 if constexpr (HasPtrT) {
335 *resp_data_ptr = resp_data;
336 }
337
338 this->impl_->try_record(ActionType::kServerResponse, resp_data);
339
340 return this->impl_->reply(req_id, resp_data, is_sync);
341 }
342}
343
344} // namespace vlink
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_E(...)
Definition logger.h:854
#define VLOG_F(...)
Definition logger.h:856
#define VLOG_T(...)
Definition logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
Compile-time type-detection and serialisation utilities for VLink messages.
Type-safe method-model server (handler side) for VLink RPC.
URL-based transport configuration dispatcher for VLink nodes.