38template <
typename ReqT,
typename RespT, SecurityType SecT>
40 const std::string& url_str,
InitType type) {
41 return std::make_unique<Client<ReqT, RespT, SecT>>(url_str, type);
44template <
typename ReqT,
typename RespT, SecurityType SecT>
46 const std::string& url_str,
InitType type) {
47 return std::make_shared<Client<ReqT, RespT, SecT>>(url_str, type);
50template <
typename ReqT,
typename RespT, SecurityType SecT>
51template <
typename ConfT,
typename>
53 static_assert(ConfT::get_allow_impl_type() &
kImplType,
"Conf does not support client mode.");
56 VLOG_F(conf,
" client configuration is invalid or could not be parsed.");
60 this->
impl_ = conf.create_client();
63 VLOG_F(conf,
" client implementation not available for this transport.");
67 this->
impl_->transport_type = conf.get_transport_type();
73 if (!this->
impl_->ser_type.empty() || !resp_ser_type.empty()) {
74 this->
impl_->ser_type +=
";" + resp_ser_type;
79 constexpr auto kReqSchemaType = Serializer::get_schema_type<kReqType, ReqT>();
80 constexpr auto kRespSchemaType = Serializer::get_schema_type<kRespType, RespT>();
82 if constexpr (
kHasResp && kReqSchemaType != kRespSchemaType) {
85 this->
impl_->schema_type = kReqSchemaType;
92 if constexpr (std::is_same_v<ConfT, Url>) {
93 this->
impl_->url = conf.get_str();
97 this->
impl_->is_security_type =
true;
106template <
typename ReqT,
typename RespT, SecurityType SecT>
108 :
Client<ReqT, RespT, SecT>(
Url(url_str), type) {}
110template <
typename ReqT,
typename RespT, SecurityType SecT>
113 std::lock_guard lock(future_mtx_);
117template <
typename ReqT,
typename RespT, SecurityType SecT>
119 this->
impl_->detect_connected(std::move(callback));
122template <
typename ReqT,
typename RespT, SecurityType SecT>
125 VLOG_W(
"Client: Timeout value is 0, using infinite wait instead.");
129 return this->
impl_->wait_for_connected(timeout);
132template <
typename ReqT,
typename RespT, SecurityType SecT>
134 return this->
impl_->is_connected();
137template <
typename ReqT,
typename RespT, SecurityType SecT>
140 VLOG_W(
"Client: Timeout value is 0, using infinite wait instead.");
144#ifndef VLINK_DISABLE_PROFILER
148 static_assert(
kHasResp,
"Invoke requires a response type.");
152 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
153 ret = call_bytes(req, [&resp](
const Bytes& resp_data) { resp = resp_data; }, timeout);
161 req_data = this->
impl_->loan(ser_size);
170 VLOG_T(
"Client serialize failed, url: ", this->
impl_->url,
".");
174 this->
impl_->return_loan(req_data);
183 [
this, &resp](
const Bytes& resp_data) {
185 VLOG_T(
"Client deserialize failed, url: ", this->
impl_->url,
".");
194template <
typename ReqT,
typename RespT, SecurityType SecT>
197 VLOG_W(
"Client: Timeout value is 0, using infinite wait instead.");
201#ifndef VLINK_DISABLE_PROFILER
208 return std::make_optional<RespT>(resp);
214template <
typename ReqT,
typename RespT, SecurityType SecT>
216#ifndef VLINK_DISABLE_PROFILER
220 static_assert(
kHasResp,
"Invoke requires a response type.");
224 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
225 ret = call_bytes(req, [callback = std::move(callback)](
const Bytes& resp_data) { callback(resp_data); });
233 req_data = this->
impl_->loan(ser_size);
242 VLOG_T(
"Client serialize failed, url: ", this->impl_->url,
".");
246 this->
impl_->return_loan(req_data);
253 ret = call_bytes(req_data, [
this, callback = std::move(callback)](
const Bytes& resp_data) {
254 thread_local auto resp = this->
template get_default_value<RespT>();
257 VLOG_T(
"Client deserialize failed, url: ", this->impl_->url,
".");
268template <
typename ReqT,
typename RespT, SecurityType SecT>
270#ifndef VLINK_DISABLE_PROFILER
274 static_assert(
kHasResp,
"async_invoke requires a response type.");
276 auto pro = std::make_shared<std::promise<RespT>>();
277 auto future = pro->get_future();
280 int64_t target_seq = 0;
283 std::lock_guard lock(future_mtx_);
284 target_seq = future_seq_++;
285 future_map_.emplace(target_seq, pro);
288 auto cleanup_on_error = [
this, target_seq, pro](
const std::string& error_str) {
289 std::lock_guard lock(future_mtx_);
290 future_map_.erase(target_seq);
294 }
catch (std::exception&) {
295 pro->set_exception(std::current_exception());
299 if constexpr (std::is_same_v<ReqT, Bytes> && std::is_same_v<RespT, Bytes>) {
300 ret = call_bytes(req, [
this, target_seq](
const Bytes& resp_data) {
301 std::lock_guard lock(future_mtx_);
302 auto it = future_map_.find(target_seq);
303 if (it != future_map_.end()) {
304 it->second->set_value(resp_data);
305 future_map_.erase(it);
312 if (this->is_support_loan_) {
314 req_data = this->impl_->loan(ser_size);
316 if VUNLIKELY (ser_size != 0 && req_data.empty()) {
317 cleanup_on_error(
"Client async_invoke error (Failed to Loan)");
324 VLOG_T(
"Client serialize failed, url: ", this->impl_->url,
".");
327 if (this->is_support_loan_) {
328 this->impl_->return_loan(req_data);
332 cleanup_on_error(
"Client async_invoke error (Failed to serialize req)");
336 ret = call_bytes(req_data, [
this, target_seq](
const Bytes& resp_data) {
337 bool convert_success =
false;
339 thread_local auto resp = this->
template get_default_value<RespT>();
342 convert_success =
true;
344 VLOG_T(
"Client deserialize failed, url: ", this->impl_->url,
".");
347 std::lock_guard lock(future_mtx_);
349 auto it = future_map_.find(target_seq);
351 if (it != future_map_.end()) {
352 if (convert_success) {
353 it->second->set_value(resp);
357 }
catch (std::exception&) {
358 it->second->set_exception(std::current_exception());
362 future_map_.erase(it);
368 cleanup_on_error(
"Client async_invoke error (Failed to call)");
374template <
typename ReqT,
typename RespT, SecurityType SecT>
376#ifndef VLINK_DISABLE_PROFILER
380 static_assert(!
kHasResp,
"Send not supported; use invoke() for request-response.");
384 if constexpr (std::is_same_v<ReqT, Bytes>) {
385 ret = call_bytes(req);
393 req_data = this->
impl_->loan(ser_size);
402 VLOG_T(
"Client serialize failed, url: ", this->
impl_->url,
".");
406 this->
impl_->return_loan(req_data);
413 ret = call_bytes(req_data);
419template <
typename ReqT,
typename RespT, SecurityType SecT>
421 std::chrono::milliseconds timeout) {
425 if VUNLIKELY (!this->security_->encrypt(req_data, req_sec_data)) {
426 VLOG_T(
"Client encrypt failed, url: ", this->impl_->url,
".");
430 return this->impl_->call(
432 [
this, callback = std::move(callback)](
const Bytes& resp_data) {
435 if VUNLIKELY (!this->security_->decrypt(resp_data, resp_sec_data)) {
436 VLOG_T(
"Client decrypt failed, url: ", this->impl_->url,
".");
441 this->invoke_callback(callback, resp_sec_data);
448 return this->impl_->call(
450 [
this, callback = std::move(callback)](
const Bytes& resp_data) {
454 this->invoke_callback(callback, resp_data);
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
定义 bytes.h:113
bool empty() const noexcept
Returns true if the buffer is empty (no data pointer and size == 0).
定义 bytes.h:880
void detect_connected(ConnectCallback &&callback)
Registers a callback invoked when the server connection state changes.
定义 client-inl.h:118
std::unique_ptr< Client< ReqT, RespT, SecT > > UniquePtr
Unique-pointer alias.
定义 client.h:126
static SharedPtr create_shared(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Client on the heap wrapped in a shared_ptr.
定义 client-inl.h:45
bool wait_for_connected(std::chrono::milliseconds timeout=Timeout::kDefaultInterval)
Blocks until a server is available or the timeout expires.
定义 client-inl.h:123
std::shared_ptr< Client< ReqT, RespT, SecT > > SharedPtr
Shared-pointer alias.
定义 client.h:129
Client(const ConfT &conf, InitType type=InitType::kWithInit)
Constructs a client from a typed transport configuration object.
定义 client-inl.h:52
static UniquePtr create_unique(const std::string &url_str, InitType type=InitType::kWithInit)
Creates a Client on the heap wrapped in a unique_ptr.
定义 client-inl.h:39
std::function< void(const RespT &)> RespCallback
Callback type for async response delivery.
定义 client.h:135
static constexpr bool kHasResp
true when RespT is not EmptyType (client expects a response).
定义 client.h:141
bool invoke(const ReqT &req, RespT &resp, std::chrono::milliseconds timeout=Timeout::kDefaultInterval)
Sends a request and blocks until the response is received.
定义 client-inl.h:138
std::future< RespT > async_invoke(const ReqT &req)
Sends a request and returns a std::future for the response.
定义 client-inl.h:269
~Client() override
定义 client-inl.h:111
bool is_connected() const
Returns true if a server is currently available.
定义 client-inl.h:133
static constexpr ImplType kImplType
Node role identifier (kClient).
定义 client.h:138
bool send(const ReqT &req)
Sends a fire-and-forget request with no response.
定义 client-inl.h:375
NodeImpl::ConnectCallback ConnectCallback
Callback type fired when server connection state changes.
定义 client.h:132
RAII scope guard that brackets a CpuProfiler active interval.
定义 cpu_profiler_guard.h:67
Indicates a general runtime failure.
定义 exception.h:86
std::function< void(const Bytes &)> MsgCallback
Callback delivering a raw serialised message to a SubscriberImpl or GetterImpl.
定义 node_impl.h:177
std::unique_ptr< ClientImpl > impl_
定义 node.h:558
virtual bool init()
定义 node-inl.h:39
bool is_support_loan_
定义 node.h:562
virtual bool deinit()
定义 node-inl.h:57
TypeT get_default_value()
定义 node-inl.h:356
void enable_security()
定义 node-inl.h:332
Type-safe method-model client (caller side) for VLink RPC.
RAII guard that automatically calls CpuProfiler::begin() and CpuProfiler::end().
Global singleton logger with three output styles and pluggable backends.
#define VLOG_F(...)
定义 logger.h:856
#define VLOG_W(...)
定义 logger.h:852
#define VLOG_T(...)
定义 logger.h:846
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
定义 macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely to be true.
定义 macros.h:297
bool deserialize(const Bytes &src, T &des, TransportType transport)
Deserializes src bytes into des with explicit type and transport hints.
定义 serializer-inl.h:488
size_t get_serialized_size(const T &src) noexcept
Returns the exact serialised byte size for a given src value.
定义 serializer-inl.h:295
constexpr bool is_cdr_type() noexcept
Returns true if T is a FastDDS CDR-serialisable type.
定义 serializer-inl.h:648
std::string get_serialized_type() noexcept
Returns the serialisation type name string for T with explicit TypeT.
定义 serializer-inl.h:229
bool serialize(const T &src, Bytes &des, TransportType transport, uint8_t offset)
Serializes src into des with explicit type and transport hints.
定义 serializer-inl.h:355
InitType
Controls whether a node is initialised immediately at construction.
定义 types.h:132
@ kWithInit
Initialise immediately in the constructor.
定义 types.h:134
@ kUnknown
Decode category is not known.
定义 types.h:185
@ kClientResponse
RPC response received by a Client node.
定义 types.h:165
@ kClientRequest
RPC request sent by a Client node.
定义 types.h:164
@ kWithSecurity
Encrypted and authenticated transport.
定义 types.h:150
Compile-time type-detection and serialisation utilities for VLink messages.
static constexpr std::chrono::milliseconds kInfinite
Wait indefinitely (negative timeout).
定义 types.h:203
URL-based Conf dispatcher that routes to the correct transport backend.
定义 url.h:161
URL-based transport configuration dispatcher for VLink nodes.