VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
client-inl.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24#pragma once
25
26#include <memory>
27#include <string>
28#include <utility>
29
31#include "../base/logger.h"
32#include "../client.h"
33#include "../impl/url.h"
34#include "../serializer.h"
35
36namespace vlink {
37
38template <typename ReqT, typename RespT, SecurityType SecT>
40 const std::string& url_str, InitType type) {
41 return std::make_unique<Client<ReqT, RespT, SecT>>(url_str, type);
42}
43
44template <typename ReqT, typename RespT, SecurityType SecT>
46 const std::string& url_str, InitType type) {
47 return std::make_shared<Client<ReqT, RespT, SecT>>(url_str, type);
48}
49
50template <typename ReqT, typename RespT, SecurityType SecT>
51template <typename ConfT, typename>
52inline Client<ReqT, RespT, SecT>::Client(const ConfT& conf, InitType type) {
53 static_assert(ConfT::get_allow_impl_type() & kImplType, "Conf does not support client mode.");
54
55 if VUNLIKELY (!conf.parse(kImplType) || !conf.is_valid()) {
56 VLOG_F(conf, " client configuration is invalid or could not be parsed.");
57 return;
58 }
59
60 this->impl_ = conf.create_client();
61
62 if VUNLIKELY (!this->impl_) {
63 VLOG_F(conf, " client implementation not available for this transport.");
64 return;
65 }
66
67 this->impl_->transport_type = conf.get_transport_type();
69
70 if constexpr (kHasResp) {
71 const auto resp_ser_type = Serializer::get_serialized_type<kRespType, RespT>();
72
73 if (!this->impl_->ser_type.empty() || !resp_ser_type.empty()) {
74 this->impl_->ser_type += ";" + resp_ser_type;
75 }
76 }
77
78 {
79 constexpr auto kReqSchemaType = Serializer::get_schema_type<kReqType, ReqT>();
80 constexpr auto kRespSchemaType = Serializer::get_schema_type<kRespType, RespT>();
81
82 if constexpr (kHasResp && kReqSchemaType != kRespSchemaType) {
83 this->impl_->schema_type = SchemaType::kUnknown;
84 } else {
85 this->impl_->schema_type = kReqSchemaType;
86 }
87 }
88
89 this->impl_->is_cdr_type = Serializer::is_cdr_type<ReqT>();
90 this->impl_->is_resp_type = kHasResp;
91
92 if constexpr (std::is_same_v<ConfT, Url>) {
93 this->impl_->url = conf.get_str();
94 }
95
96 if constexpr (SecT == SecurityType::kWithSecurity) {
97 this->impl_->is_security_type = true;
98 this->enable_security();
99 }
100
101 if (type == InitType::kWithInit) {
102 this->init(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
103 }
104}
105
106template <typename ReqT, typename RespT, SecurityType SecT>
107inline Client<ReqT, RespT, SecT>::Client(const std::string& url_str, InitType type)
108 : Client<ReqT, RespT, SecT>(Url(url_str), type) {}
109
110template <typename ReqT, typename RespT, SecurityType SecT>
112 this->deinit();
113 std::lock_guard lock(future_mtx_);
114 future_map_.clear();
115}
116
117template <typename ReqT, typename RespT, SecurityType SecT>
119 this->impl_->detect_connected(std::move(callback));
120}
121
122template <typename ReqT, typename RespT, SecurityType SecT>
123inline bool Client<ReqT, RespT, SecT>::wait_for_connected(std::chrono::milliseconds timeout) {
124 if VUNLIKELY (timeout.count() == 0) {
125 VLOG_W("Client: Timeout value is 0, using infinite wait instead.");
126 timeout = Timeout::kInfinite;
127 }
128
129 return this->impl_->wait_for_connected(timeout);
130}
131
132template <typename ReqT, typename RespT, SecurityType SecT>
134 return this->impl_->is_connected();
135}
136
137template <typename ReqT, typename RespT, SecurityType SecT>
138inline bool Client<ReqT, RespT, SecT>::invoke(const ReqT& req, RespT& resp, std::chrono::milliseconds timeout) {
139 if VUNLIKELY (timeout.count() == 0) {
140 VLOG_W("Client: Timeout value is 0, using infinite wait instead.");
141 timeout = Timeout::kInfinite;
142 }
143
144#ifndef VLINK_DISABLE_PROFILER
145 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
146#endif
147
148 static_assert(kHasResp, "Invoke requires a response type.");
149
150 bool ret = false;
151
152 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
153 ret = call_bytes(req, [&resp](const Bytes& resp_data) { resp = resp_data; }, timeout);
154 } else {
155 Bytes req_data;
156
157 if constexpr (SecT != SecurityType::kWithSecurity) {
158 if (this->is_support_loan_) {
160
161 req_data = this->impl_->loan(ser_size);
162
163 if VUNLIKELY (ser_size != 0 && req_data.empty()) {
164 return false;
165 }
166 }
167 }
169 if VUNLIKELY (!Serializer::serialize<kReqType>(req, req_data, this->impl_->transport_type)) {
170 VLOG_T("Client serialize failed, url: ", this->impl_->url, ".");
171
172 if constexpr (SecT != SecurityType::kWithSecurity) {
173 if (this->is_support_loan_) {
174 this->impl_->return_loan(req_data);
175 }
176 }
177
178 return false;
179 }
180
181 ret = call_bytes(
182 req_data,
183 [this, &resp](const Bytes& resp_data) {
184 if VUNLIKELY (!Serializer::deserialize<kRespType>(resp_data, resp, this->impl_->transport_type)) {
185 VLOG_T("Client deserialize failed, url: ", this->impl_->url, ".");
186 }
187 },
188 timeout);
189 }
190
191 return ret;
192}
194template <typename ReqT, typename RespT, SecurityType SecT>
195inline std::optional<RespT> Client<ReqT, RespT, SecT>::invoke(const ReqT& req, std::chrono::milliseconds timeout) {
196 if VUNLIKELY (timeout.count() == 0) {
197 VLOG_W("Client: Timeout value is 0, using infinite wait instead.");
198 timeout = Timeout::kInfinite;
199 }
200
201#ifndef VLINK_DISABLE_PROFILER
202 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
203#endif
205 thread_local auto resp = this->template get_default_value<RespT>();
206
207 if VLIKELY (invoke(req, resp, timeout)) {
208 return std::make_optional<RespT>(resp);
209 }
210
211 return std::nullopt;
212}
213
214template <typename ReqT, typename RespT, SecurityType SecT>
215inline bool Client<ReqT, RespT, SecT>::invoke(const ReqT& req, RespCallback&& callback) {
216#ifndef VLINK_DISABLE_PROFILER
217 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
218#endif
219
220 static_assert(kHasResp, "Invoke requires a response type.");
221
222 bool ret = false;
223
224 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
225 ret = call_bytes(req, [callback = std::move(callback)](const Bytes& resp_data) { callback(resp_data); });
226 } else {
227 Bytes req_data;
228
229 if constexpr (SecT != SecurityType::kWithSecurity) {
230 if (this->is_support_loan_) {
231 size_t ser_size = Serializer::get_serialized_size<kReqType>(req);
232
233 req_data = this->impl_->loan(ser_size);
234
235 if VUNLIKELY (ser_size != 0 && req_data.empty()) {
236 return false;
237 }
238 }
239 }
240
241 if VUNLIKELY (!Serializer::serialize<kReqType>(req, req_data, this->impl_->transport_type)) {
242 VLOG_T("Client serialize failed, url: ", this->impl_->url, ".");
244 if constexpr (SecT != SecurityType::kWithSecurity) {
245 if (this->is_support_loan_) {
246 this->impl_->return_loan(req_data);
247 }
248 }
249
250 return false;
251 }
252
253 ret = call_bytes(req_data, [this, callback = std::move(callback)](const Bytes& resp_data) {
254 thread_local auto resp = this->template get_default_value<RespT>();
255
256 if VUNLIKELY (!Serializer::deserialize<kRespType>(resp_data, resp, this->impl_->transport_type)) {
257 VLOG_T("Client deserialize failed, url: ", this->impl_->url, ".");
258 return;
259 }
260
261 callback(resp);
262 });
263 }
264
265 return ret;
266}
267
268template <typename ReqT, typename RespT, SecurityType SecT>
269inline std::future<RespT> Client<ReqT, RespT, SecT>::async_invoke(const ReqT& req) {
270#ifndef VLINK_DISABLE_PROFILER
271 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
272#endif
273
274 static_assert(kHasResp, "async_invoke requires a response type.");
275
276 auto pro = std::make_shared<std::promise<RespT>>();
277 auto future = pro->get_future();
278
279 bool ret = false;
280 int64_t target_seq = 0;
281
282 {
283 std::lock_guard lock(future_mtx_);
284 target_seq = future_seq_++;
285 future_map_.emplace(target_seq, pro);
286 }
288 auto cleanup_on_error = [this, target_seq, pro](const std::string& error_str) {
289 std::lock_guard lock(future_mtx_);
290 future_map_.erase(target_seq);
291
292 try {
293 throw Exception::RuntimeError(error_str);
294 } catch (std::exception&) {
295 pro->set_exception(std::current_exception());
296 }
297 };
298
299 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
300 ret = call_bytes(req, [this, target_seq](const Bytes& resp_data) {
301 std::lock_guard lock(future_mtx_);
302 auto it = future_map_.find(target_seq);
303 if (it != future_map_.end()) {
304 it->second->set_value(resp_data);
305 future_map_.erase(it);
306 }
307 });
308 } else {
309 Bytes req_data;
310
311 if constexpr (SecT != SecurityType::kWithSecurity) {
312 if (this->is_support_loan_) {
313 size_t ser_size = Serializer::get_serialized_size<kReqType>(req);
314 req_data = this->impl_->loan(ser_size);
315
316 if VUNLIKELY (ser_size != 0 && req_data.empty()) {
317 cleanup_on_error("Client async_invoke error (Failed to Loan)");
318 return future;
319 }
320 }
321 }
322
323 if VUNLIKELY (!Serializer::serialize<kReqType>(req, req_data, this->impl_->transport_type)) {
324 VLOG_T("Client serialize failed, url: ", this->impl_->url, ".");
325
326 if constexpr (SecT != SecurityType::kWithSecurity) {
327 if (this->is_support_loan_) {
328 this->impl_->return_loan(req_data);
329 }
330 }
331
332 cleanup_on_error("Client async_invoke error (Failed to serialize req)");
333 return future;
334 }
335
336 ret = call_bytes(req_data, [this, target_seq](const Bytes& resp_data) {
337 bool convert_success = false;
338
339 thread_local auto resp = this->template get_default_value<RespT>();
340
341 if VLIKELY (Serializer::deserialize<kRespType>(resp_data, resp, this->impl_->transport_type)) {
342 convert_success = true;
343 } else {
344 VLOG_T("Client deserialize failed, url: ", this->impl_->url, ".");
345 }
346
347 std::lock_guard lock(future_mtx_);
348
349 auto it = future_map_.find(target_seq);
350
351 if (it != future_map_.end()) {
352 if (convert_success) {
353 it->second->set_value(resp);
354 } else {
355 try {
356 throw Exception::RuntimeError("Client async_invoke error (Failed to deserialize resp)");
357 } catch (std::exception&) {
358 it->second->set_exception(std::current_exception());
359 }
360 }
361
362 future_map_.erase(it);
363 }
364 });
365 }
366
367 if VUNLIKELY (!ret) {
368 cleanup_on_error("Client async_invoke error (Failed to call)");
369 }
370
371 return future;
372}
373
374template <typename ReqT, typename RespT, SecurityType SecT>
375inline bool Client<ReqT, RespT, SecT>::send(const ReqT& req) {
376#ifndef VLINK_DISABLE_PROFILER
377 CpuProfilerGuard profiler_guard(this->impl_->profiler.get());
378#endif
379
380 static_assert(!kHasResp, "Send not supported; use invoke() for request-response.");
381
382 bool ret = false;
383
384 if constexpr (std::is_same_v<ReqT, Bytes>) {
385 ret = call_bytes(req);
386 } else {
387 Bytes req_data;
388
389 if constexpr (SecT != SecurityType::kWithSecurity) {
390 if (this->is_support_loan_) {
391 size_t ser_size = Serializer::get_serialized_size<kReqType>(req);
392
393 req_data = this->impl_->loan(ser_size);
394
395 if VUNLIKELY (ser_size != 0 && req_data.empty()) {
396 return false;
397 }
398 }
399 }
400
401 if VUNLIKELY (!Serializer::serialize<kReqType>(req, req_data, this->impl_->transport_type)) {
402 VLOG_T("Client serialize failed, url: ", this->impl_->url, ".");
403
404 if constexpr (SecT != SecurityType::kWithSecurity) {
405 if (this->is_support_loan_) {
406 this->impl_->return_loan(req_data);
407 }
408 }
409
410 return false;
411 }
412
413 ret = call_bytes(req_data);
414 }
415
416 return ret;
417}
418
419template <typename ReqT, typename RespT, SecurityType SecT>
420inline bool Client<ReqT, RespT, SecT>::call_bytes(const Bytes& req_data, NodeImpl::MsgCallback&& callback,
421 std::chrono::milliseconds timeout) {
422 if constexpr (SecT == SecurityType::kWithSecurity) {
423 Bytes req_sec_data;
424
425 if VUNLIKELY (!this->security_->encrypt(req_data, req_sec_data)) {
426 VLOG_T("Client encrypt failed, url: ", this->impl_->url, ".");
427 return false;
428 }
429
430 return this->impl_->call(
431 req_sec_data,
432 [this, callback = std::move(callback)](const Bytes& resp_data) {
433 Bytes resp_sec_data;
434
435 if VUNLIKELY (!this->security_->decrypt(resp_data, resp_sec_data)) {
436 VLOG_T("Client decrypt failed, url: ", this->impl_->url, ".");
437 return;
438 }
439
440 if (callback) {
441 this->invoke_callback(callback, resp_sec_data);
442 }
443 },
444 timeout);
445 } else {
446 this->impl_->try_record(ActionType::kClientRequest, req_data);
447
448 return this->impl_->call(
449 req_data,
450 [this, callback = std::move(callback)](const Bytes& resp_data) {
451 if (callback) {
452 this->impl_->try_record(ActionType::kClientResponse, resp_data);
453
454 this->invoke_callback(callback, resp_data);
455 }
456 },
457 timeout);
458 }
459}
460
461} // namespace vlink
Type-safe method-model client (caller side) for VLink RPC.
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
定义 logger.h:856
#define VLOG_W(...)
定义 logger.h:852
#define VLOG_T(...)
定义 logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
定义 macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely true.
定义 macros.h:297
Compile-time type-detection and serialisation utilities for VLink messages.
URL-based transport configuration dispatcher for VLink nodes.