方法模型是 VLink 三种通信模型之一,对应 RPC(远程过程调用)语义:Client 发送请求, Server 处理后返回响应。方法模型支持**多个 Client 对一个 Server**(N:1)的请求-响应通信,同时也支持无需响应的 fire-and-forget 单向模式。 每次请求/响应是一对一配对关系,并支持超时控制。Node 基类的通用 API(init / deinit / attach / set_property 等)请参阅 节点基类与生命周期。
目录
- 概念与架构
- Client API
- Server API
- 五种调用模式详解
- 超时处理
- 错误处理
- wait_for_connected 用法
- 完整使用示例
- 并发调用场景
- 模型选择
概念与架构
方法模型数据流
方法模型数据流
关键特性
- **N:1**:多个 Client 可连接同一个 Server,每个请求对应一个响应
- **类型安全**:请求类型 ReqT 和响应类型 RespT 在编译时固定
- **多种调用模式**:同步阻塞、optional 返回、异步回调、future 异步
- **fire-and-forget**:RespT 省略时(默认为 EmptyType)只发不收
- **超时控制**:所有阻塞调用均支持超时,默认使用 Timeout::kDefaultInterval
- **连接感知**:Client 可感知 Server 的上线/下线状态
与其他模型的关系
三种通信模型对比
Client API
类模板声明
template <typename ReqT,
typename RespT = Traits::EmptyType,
SecurityType SecT = SecurityType::kWithoutSecurity>
class Client : public Node<ClientImpl, SecT>;
当 RespT 为默认的 Traits::EmptyType 时,Client 仅发送请求,不等待响应, 即 fire-and-forget 模式,此时只有 send() 方法可用。
编译期常量与类型别名
using UniquePtr = std::unique_ptr<Client<ReqT, RespT, SecT>>;
using SharedPtr = std::shared_ptr<Client<ReqT, RespT, SecT>>;
using ConnectCallback = NodeImpl::ConnectCallback;
using RespCallback = std::function<void(const RespT&)>;
static constexpr ImplType kImplType = kClient;
static constexpr bool kHasResp = !std::is_same_v<RespT, Traits::EmptyType>;
static constexpr Serializer::Type kReqType = Serializer::get_type_of<ReqT>();
static constexpr Serializer::Type kRespType = Serializer::get_type_of<RespT>();
工厂方法
[[nodiscard]] static UniquePtr create_unique(const std::string& url_str,
InitType type = InitType::kWithInit);
[[nodiscard]] static SharedPtr create_shared(const std::string& url_str,
InitType type = InitType::kWithInit);
构造函数
explicit Client(const std::string& url_str,
InitType type = InitType::kWithInit);
template <typename ConfT>
explicit Client(const ConfT& conf,
InitType type = InitType::kWithInit);
连接感知
void detect_connected(ConnectCallback&& callback);
bool wait_for_connected(std::chrono::milliseconds timeout = Timeout::kDefaultInterval);
[[nodiscard]] bool is_connected() const;
调用方法
[[nodiscard]] bool invoke(const ReqT& req, RespT& resp,
std::chrono::milliseconds timeout = Timeout::kDefaultInterval);
[[nodiscard]] std::optional<RespT> invoke(const ReqT& req,
std::chrono::milliseconds timeout = Timeout::kDefaultInterval);
bool invoke(const ReqT& req, RespCallback&& callback);
[[nodiscard]] std::future<RespT> async_invoke(const ReqT& req);
bool send(const ReqT& req);
继承自 Node 的公共 API
Node 基类继承的公共 API(init / deinit / attach / interrupt / set_security_key 等)请参阅 节点基类与生命周期。
Server API
类模板声明
template <typename ReqT,
typename RespT = Traits::EmptyType,
SecurityType SecT = SecurityType::kWithoutSecurity>
class Server : public Node<ServerImpl, SecT>;
回调类型定义
using ReqCallback = std::function<void(const ReqT&)>;
using ReqRespCallback = std::function<void(const ReqT&, RespT&)>;
using ReqAsyncRespCallback = std::function<void(uint64_t req_id, const ReqT&)>;
工厂方法与构造函数
[[nodiscard]] static UniquePtr create_unique(const std::string& url_str,
InitType type = InitType::kWithInit);
[[nodiscard]] static SharedPtr create_shared(const std::string& url_str,
InitType type = InitType::kWithInit);
explicit Server(const std::string& url_str,
InitType type = InitType::kWithInit);
template <typename ConfT>
explicit Server(const ConfT& conf,
InitType type = InitType::kWithInit);
监听方法
bool listen(ReqCallback&& callback);
bool listen(ReqRespCallback&& callback);
bool listen_for_reply(ReqAsyncRespCallback&& callback);
注意:listen() 和 listen_for_reply() 只能调用一次,重复调用是 fatal error。
异步响应发送
bool reply(uint64_t req_id, const RespT& resp);
安全别名
template <typename ReqT, typename RespT = Traits::EmptyType>
class SecurityServer : public Server<ReqT, RespT, SecurityType::kWithSecurity>;
template <typename ReqT, typename RespT = Traits::EmptyType>
class SecurityClient : public Client<ReqT, RespT, SecurityType::kWithSecurity>;
五种调用模式详解
模式对比总览
| 模式 | 方法签名 | 是否阻塞 | 超时支持 | 适用场景 |
| 同步(输出参数) | invoke(req, resp&, timeout) -> bool | 是 | 是 | 简单同步调用,结果明确 |
| 同步(optional) | invoke(req, timeout) -> optional<Resp> | 是 | 是 | 链式调用,无需声明临时变量 |
| 异步(回调) | invoke(req, RespCallback) | 否 | 否 | 事件驱动架构,不阻塞主线程 |
| 异步(future) | async_invoke(req) -> future<Resp> | 否 | 可用 future.wait_for | 并发调用,统一等待多个结果 |
| 仅发送 | send(req) -> bool | 否 | 否 | fire-and-forget,无需响应 |
模式一:同步调用(输出参数)
Client<Req, Resp> client("dds://my_service");
client.wait_for_connected();
Req req;
req.set_param(42);
Resp resp;
bool ok = client.invoke(req, resp, std::chrono::seconds(3));
if (ok) {
std::cout << "result: " << resp.result() << std::endl;
} else {
std::cerr << "invoke timeout or failed" << std::endl;
}
模式二:同步调用(optional 返回)
if (auto r = client.invoke(req, std::chrono::seconds(3))) {
std::cout << "result: " << r->result() << std::endl;
} else {
std::cerr << "invoke timeout or failed" << std::endl;
}
模式三:异步调用(回调)
bool ok = client.invoke(req, [](const Resp& resp) {
std::cout << "async result: " << resp.result() << std::endl;
});
if (!ok) {
std::cerr << "failed to send request" << std::endl;
}
模式四:异步调用(future)
auto future = client.async_invoke(req);
do_other_work();
if (future.wait_for(std::chrono::seconds(3)) == std::future_status::ready) {
try {
Resp resp = future.get();
std::cout << "result: " << resp.result() << std::endl;
} catch (const std::exception& e) {
std::cerr << "error: " << e.what() << std::endl;
}
} else {
std::cerr << "future timeout" << std::endl;
}
模式五:仅发送(fire-and-forget)
当 RespT 为 Traits::EmptyType(默认值)时,Client 仅发送请求,不等待任何响应。 此模式通过 send() 方法调用。
Client<Req> client("dds://my_notification");
client.wait_for_connected();
Req req;
req.set_event_type(1);
bool ok = client.send(req);
if (!ok) {
std::cerr << "failed to send request" << std::endl;
}
**注意**:当 Client 声明了 RespT(非 EmptyType)时,send() 方法不可用, 编译器会报错。反之,fire-and-forget 模式下 invoke() 和 async_invoke() 不可用。
超时处理
超时默认值
VLink 在 include/vlink/impl/types.h 中定义两个 std::chrono::milliseconds 常量(struct Timeout):
- Timeout::kDefaultInterval = 5'000ms(5 秒)—— 所有阻塞方法的默认值。
- Timeout::kInfinite = -1ms —— 负值表示无限等待。
源码中 timeout == 0 会打印警告并按无限等待处理,应避免传 0。
超时单位
所有超时参数均为 std::chrono::milliseconds,推荐使用字面量:
using namespace std::chrono_literals;
client.wait_for_connected(5s);
client.invoke(req, resp, 500ms);
client.invoke(req, resp, 3000ms);
client.invoke(req, resp, std::chrono::milliseconds(1000));
超时处理最佳实践
if (!client.wait_for_connected(10s)) {
VLOG_W(
"Server not available within 10s, aborting.");
return -1;
}
Resp resp;
if (!client.invoke(req, resp, 3s)) {
VLOG_W(
"Invoke timed out after 3s.");
return -1;
}
#define VLOG_W(...)
定义 logger.h:852
中断阻塞等待
可从其他线程调用 interrupt() 立即中断所有阻塞等待:
Client<Req, Resp> client("dds://my_service");
std::thread t([&client]() {
std::this_thread::sleep_for(2s);
client.interrupt();
});
bool ok = client.wait_for_connected(30s);
错误处理
invoke() 返回 false 的原因
| 原因 | 说明 |
| 请求序列化失败 | 消息数据无效或序列化器返回错误 |
| 传输层发送失败 | 底层 IPC/DDS/SHM 写入失败 |
| 响应超时 | Server 未在超时时间内返回响应 |
| 响应反序列化失败 | Server 返回的字节流无法解析为 RespT |
| 节点未初始化 | 在 init() 前调用(fatal log 会打印) |
| Server 已断开 | Server 在请求发出后下线 |
async_invoke() 的异常处理
async_invoke() 失败时不返回 false,而是在 future 中设置异常:
auto future = client.async_invoke(req);
try {
Resp resp = future.get();
process(resp);
std::cerr << "async_invoke failed: " << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << "unexpected error: " << e.what() << std::endl;
}
Indicates a general runtime failure.
定义 exception.h:86
服务端错误处理
Server 的回调可以通过不填充 resp(或填充错误码)来表达处理失败:
Server<Req, Resp> server("dds://my_service");
server.listen([](const Req& req, Resp& resp) {
if (!req.is_valid()) {
resp.set_error_code(-1);
resp.set_error_msg("invalid request");
return;
}
resp.set_result(compute(req));
resp.set_error_code(0);
});
wait_for_connected 用法
在发起调用前通常需要等待 Server 上线,有三种方式:
方式一:阻塞等待(最简单)
Client<Req, Resp> client("dds://my_service");
if (!client.wait_for_connected(10s)) {
std::cerr << "Server did not start within 10s." << std::endl;
return -1;
}
client.invoke(req, resp);
方式二:非阻塞检查
Client<Req, Resp> client("dds://my_service");
while (!client.is_connected()) {
std::this_thread::sleep_for(100ms);
if (should_exit) {
return -1;
}
}
client.invoke(req, resp);
方式三:事件回调(推荐用于服务发现)
Client<Req, Resp> client("dds://my_service");
client.detect_connected([&client](bool connected) {
if (connected) {
std::cout << "Server is online, can invoke now." << std::endl;
Req req;
Resp resp;
client.invoke(req, resp);
} else {
std::cout << "Server went offline." << std::endl;
}
});
配合 MessageLoop 的正确用法
MessageLoop loop;
Client<Req, Resp> client("dds://my_service");
client.attach(&loop);
client.detect_connected([&client](bool connected) {
if (connected) {
do_something_with_client(client);
}
});
loop.run();
完整使用示例
示例一:helloworld(Protobuf 同步 RPC)
这是一个典型的加法服务,来自 VLink 自带的 helloworld 示例:
#include "helloworld.pb.h"
int main() {
server.listen([](const Helloworld::Request& req, Helloworld::Response& resp) {
int sum = req.left() + req.right();
resp.set_sum(sum);
printf("[Server] %d + %d = %d\n", req.left(), req.right(), sum);
});
return 0;
}
Single-threaded serial task dispatcher with integrated timer support.
定义 message_loop.h:106
bool run()
Runs the message loop on the calling thread (blocking).
bool quit(bool force=false)
Requests the loop to exit cleanly.
Type-safe server for the VLink method (RPC) communication model.
定义 server.h:108
Single-threaded event loop with three queue types, timer management and task scheduling.
VLINK_EXPORT void register_terminate_signal(std::function< void(int)> &&callback, bool is_async=false, bool pass_through=false) noexcept
Registers a callback for graceful termination signals (SIGTERM, SIGINT, etc.).
Platform-agnostic system utilities for process, thread, network and signal management.
#include "helloworld.pb.h"
using namespace std::chrono_literals;
int main() {
if (!client.wait_for_connected(5s)) {
printf("[Client] Server not ready.\n");
return -1;
}
Helloworld::Request req;
req.set_left(10);
req.set_right(32);
Helloworld::Response resp;
if (!client.invoke(req, resp, 3s)) {
printf("[Client] Invoke failed (timeout).\n");
return -1;
}
printf("[Client] 10 + 32 = %d\n", resp.sum());
return 0;
}
Type-safe client for the VLink method (RPC) communication model.
定义 client.h:123
示例二:异步服务器(listen_for_reply)
适合处理耗时任务(如文件读写、数据库查询)时将响应推迟到任务完成后发送:
#include <thread>
#include <queue>
#include <mutex>
#include "task.pb.h"
struct PendingTask {
uint64_t req_id;
Task::Request request;
};
std::queue<PendingTask> task_queue;
std::mutex queue_mutex;
void worker_thread() {
while (true) {
PendingTask task;
{
std::lock_guard lock(queue_mutex);
if (task_queue.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
task = task_queue.front();
task_queue.pop();
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Task::Response resp;
resp.set_result("processed: " + task.request.data());
resp.set_ok(true);
g_server->
reply(task.req_id, resp);
printf("[Worker] replied to req_id=%lu\n", task.req_id);
}
}
int main() {
g_server = &server;
std::lock_guard lock(queue_mutex);
task_queue.push({req_id, req});
printf("[Server] queued req_id=%lu\n", req_id);
});
std::thread worker(worker_thread);
std::this_thread::sleep_for(std::chrono::seconds(60));
return 0;
}
bool reply(uint64_t req_id, const RespT &resp)
Sends an asynchronous response for a previously received request.
定义 server-inl.h:229
bool listen_for_reply(ReqAsyncRespCallback &&callback)
Registers an asynchronous request callback (reply sent later).
定义 server-inl.h:201
示例三:fire-and-forget(无响应 RPC)
适合单向通知类场景,Client 不需要等待任何确认:
#include "notify.pb.h"
server.listen([](const Notify::Event& evt) {
printf("[Server] received event: type=%d msg=%s\n",
evt.type(), evt.message().c_str());
});
client.wait_for_connected(5s);
Notify::Event evt;
evt.set_type(1);
evt.set_message("system started");
bool ok = client.send(evt);
printf("[Client] send %s\n", ok ? "ok" : "failed");
示例四:并发 future 调用
#include <vector>
#include <future>
#include "math.pb.h"
using namespace std::chrono_literals;
int main() {
client.wait_for_connected(5s);
std::vector<std::future<Math::Response>> futures;
for (int i = 0; i < 10; ++i) {
Math::Request req;
req.set_value(i);
futures.push_back(client.async_invoke(req));
}
for (int i = 0; i < 10; ++i) {
try {
if (futures[i].wait_for(3s) == std::future_status::ready) {
Math::Response resp = futures[i].get();
printf("[Client] result[%d] = %d\n", i, resp.result());
} else {
printf("[Client] request[%d] timed out\n", i);
}
} catch (const std::exception& e) {
printf("[Client] request[%d] failed: %s\n", i, e.what());
}
}
return 0;
}
示例五:安全 RPC
server.set_security_key("shared-secret-key");
server.listen([](const Auth::Request& req, Auth::Response& resp) {
resp.set_token("valid-token-" + req.username());
});
client.set_security_key("shared-secret-key");
Convenience alias for Client with message security enabled.
定义 client.h:324
Convenience alias for Server with message security enabled.
定义 server.h:263
完整安全加密配置请参阅 安全加密。
示例六:SOME/IP 服务(车载场景)(Beta)
**注意**:someip:// 为 Beta 后端,API 可能变化。生产环境推荐使用 dds:// 或 ddsc://。
using namespace std::chrono_literals;
server.listen([](const Speed::Request& req, Speed::Response& resp) {
resp.set_speed_kmh(120.5);
});
client.wait_for_connected(5s);
Speed::Request req;
if (auto r = client.invoke(req, 1s)) {
printf("speed: %.1f km/h\n", r->speed_kmh());
}
Transport configuration for the someip:// SOME/IP (vsomeip) backend.
并发调用场景
Client 的线程安全性
同一个 Client 对象可以从多个线程并发调用,VLink 内部使用互斥锁保护 future 映射:
client.wait_for_connected(5s);
auto thread_func = [&client](int id) {
Req req;
req.set_id(id);
Resp resp;
if (client.invoke(req, resp, 3s)) {
printf("[Thread %d] result=%d\n", id, resp.result());
}
};
std::thread t1(thread_func, 1);
std::thread t2(thread_func, 2);
std::thread t3(thread_func, 3);
t1.join();
t2.join();
t3.join();
高并发推荐使用 async_invoke
std::vector<std::future<Resp>> futures;
for (auto& req : batch_requests) {
futures.push_back(client.async_invoke(req));
}
for (auto& f : futures) {
Resp resp = f.get();
process(resp);
}
Server 的回调线程模型
Server 的回调默认在传输线程上执行。若有共享状态,需要加锁或绑定 MessageLoop:
std::mutex state_mutex;
int shared_counter = 0;
server.listen([&](const Req& req, Resp& resp) {
std::lock_guard lock(state_mutex);
shared_counter++;
resp.set_count(shared_counter);
});
server.attach(&loop);
server.listen([&](const Req& req, Resp& resp) {
shared_counter++;
resp.set_count(shared_counter);
});
模型选择
- 通知多个接收方、不需要确认 -> Event 模型
- 查询结果 / 触发操作并确认 -> Method 模型
- 最新值同步(类似属性/寄存器语义)-> Field 模型
三种模型的完整对比表请见 Event 模型 第 1 节。
相关文档