VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
ack_manager.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file ack_manager.h
26 * @brief Request/acknowledgement synchronisation manager for blocking RPC calls.
27 *
28 * @details
29 * @c AckManager provides the blocking/notify mechanism used by the VLink method
30 * model (@c Client / @c Server) to implement synchronous request/response round-trips
31 * across transports. A caller obtains a request token from @c create_request() and
32 * passes it to @c process(), which sends the request and blocks until it is
33 * acknowledged. The transport callback acknowledges the request by calling
34 * @c notify() on the same token; a pending request can also be withdrawn via @c remove().
35 *
36 * @par Lifecycle
37 * @code
38 * Thread A (caller)                          Thread B (transport callback)
39 *
40 * request = create_request()
41 * process(request, timeout_ms, send_fn)
42 *   -> send_fn() publishes the request
43 *   -> blocks on condition variable
44 *                                             notify(request, fill_response_fn)
45 *                                               -> erase request from set
46 *                                               -> call fill_response_fn()
47 *                                               -> notify_one on the cv
48 *   -> wakes up, returns true
49 * @endcode
50 *
51 * @par Interruption
52 * @c clear() marks the manager as interrupted and wakes all waiting @c process()
53 * callers, causing them to return @c false. This is used during shutdown to
54 * unblock any in-flight RPC calls.
55 *
56 * @par Thread Safety
57 * All public methods are thread-safe. @c process() may be called from multiple
58 * threads simultaneously; each call tracks its own @c RequestPtr.
59 */
60
61#pragma once
62
63#include <functional>
64#include <memory>
65#include <mutex>
66#include <set>
67
69#include "../base/macros.h"
70
71namespace vlink {
72
73/**
74 * @class AckManager
75 * @brief Thread-safe request/acknowledgement synchronisation manager.
76 *
77 * @details
78 * Manages a set of in-flight requests, each represented by a @c RequestPtr.
79 * Used internally by @c ClientImpl implementations to implement blocking
80 * @c call() semantics.
81 */
83 private:
84 struct Request;
85
86 public:
87 /**
88 * @brief Callback invoked by @c process() to send the request over the transport.
89 *
90 * @details
91 * Called while the request is already registered in the pending set.
92 * Should return @c false if the send fails (e.g. no server connected),
93 * causing @c process() to remove the request and return @c false immediately.
94 */
95 using ProcessCallback = std::function<bool()>;
96
97 /**
98 * @brief Optional callback invoked inside @c notify() while holding the request lock.
99 *
100 * @details
101 * Provides the responder a chance to fill the response buffer before the
102 * waiting @c process() thread is woken. May be @c nullptr.
103 */
104 using NotifyCallback = std::function<void()>;
105
106 /**
107 * @brief Shared ownership handle for an in-flight request token.
108 *
109 * @details
110 * Returned by @c create_request() and passed to @c process(), @c notify(),
111 * and @c remove(). Requests are ordered by a monotonic sequence number.
112 */
113 using RequestPtr = std::shared_ptr<Request>;
114
115 /**
116 * @brief Default constructor.
117 */
118 AckManager() noexcept;
119
120 /**
121 * @brief Destructor.
122 */
123 ~AckManager() noexcept;
124
125 /**
126 * @brief Allocates a new in-flight request token with a unique sequence number.
127 *
128 * @details
129 * The returned @c RequestPtr must be passed to @c process() to register
130 * the request and block until the corresponding @c notify() call.
131 *
132 * @return A new @c RequestPtr with a monotonically increasing sequence number.
133 */
134 [[nodiscard]] RequestPtr create_request() noexcept;
135
136 /**
137 * @brief Registers the request, invokes the send callback, and blocks until acknowledged.
138 *
139 * @details
140 * Steps:
141 * -# Adds @p request to the pending set (returns @c false immediately if the
142 * manager is interrupted).
143 * -# Calls @p process_callback; if it returns @c false the request is removed
144 * and @c process() returns @c false.
145 * -# Blocks on a per-request condition variable:
146 * - If @p ms < 0: waits indefinitely.
147 * - If @p ms >= 0: waits for at most @p ms milliseconds.
148 * -# Returns @c true if woken by @c notify(); @c false on timeout or interruption.
149 *
150 * @param request Token returned by @c create_request().
151 * @param ms Wait timeout in milliseconds; negative = infinite.
152 * @param process_callback Callable that sends the request; returns @c false to abort.
153 * @return @c true if acknowledged via @c notify(); @c false if the manager is interrupted, the send callback fails, or the wait times out.
154 */
155 [[nodiscard]] bool process(RequestPtr request, int ms, ProcessCallback&& process_callback) noexcept;
156
157 /**
158 * @brief Acknowledges a pending request and optionally fills the response.
159 *
160 * @details
161 * Removes @p request from the pending set, calls @p notify_callback (if set)
162 * while holding the request lock, then signals the condition variable to wake
163 * the blocked @c process() call.
164 *
165 * Returns @c false when @p request is not found in the pending set (e.g. already
166 * timed out or removed).
167 *
168 * @param request Token of the request to acknowledge.
169 * @param notify_callback Optional callback to fill the response before waking the caller.
170 * @return @c true if the request was found and notified; @c false otherwise.
171 */
172 bool notify(RequestPtr request, NotifyCallback&& notify_callback = nullptr) noexcept;
173
174 /**
175 * @brief Removes a pending request without notifying the waiting caller.
176 *
177 * @details
178 * Use this to cancel a request before it is acknowledged, e.g. when the
179 * transport determines the request cannot be delivered.
180 *
181 * @param request Token to remove.
182 * @return @c true if the request was found and erased; @c false otherwise.
183 */
184 bool remove(RequestPtr request) noexcept;
185
186 /**
187 * @brief Interrupts all pending requests and wakes all blocked @c process() calls.
188 *
189 * @details
190 * Sets the interrupted flag so that new @c process() calls return @c false
191 * immediately, swaps out the pending set, and @c notify_all() on every
192 * pending request's condition variable. Called during node shutdown to
193 * avoid deadlocks.
194 */
195 void clear() noexcept;
196
197 private:
198 struct Request final {  // State of one in-flight request; handed around as RequestPtr (std::shared_ptr<Request>).
199 int64_t seq{0};  ///< Sequence number; the ordering key Compare uses when both handles are non-null.
200 std::mutex mtx;  ///< Per-request mutex. NOTE(review): listing line 201 is absent from this extract, presumably the per-request condition variable; confirm against the real header.
202
203 struct Compare final {  // Ordering functor used by the pending set (std::set<RequestPtr, Request::Compare>).
204 bool operator()(const RequestPtr& left, const RequestPtr& right) const noexcept {
205 if (!left || !right) {  // A null handle has no seq to read,
206 return left < right;  // so fall back to shared_ptr's own pointer ordering.
207 }
208
209 return left->seq < right->seq;  // Both handles valid: order by sequence number.
210 }
211 };
212 };
213
214 bool is_interrupted_{false};  ///< Interrupted flag; clear() is documented to set it so that later process() calls return false immediately.
215 int64_t request_seq_{0};  ///< NOTE(review): presumably the counter behind create_request()'s sequence numbers; confirm in the implementation file. <cstdint> is not among the visible includes; verify int64_t is provided transitively.
216 mutable std::mutex mtx_;  ///< NOTE(review): presumably guards is_interrupted_, request_seq_ and request_set_; confirm in the implementation file.
217 std::set<RequestPtr, Request::Compare> request_set_;  ///< Pending requests, kept ordered by Request::Compare.
218
220};
221
222} // namespace vlink
POSIX monotonic-clock condition variable replacing std::condition_variable.
Platform-independent macro definitions for the VLink library.
#define VLINK_EXPORT
Definition macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184