VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
message_loop.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file message_loop.h
26 * @brief Single-threaded event loop with three queue types, timer management and task scheduling.
27 *
28 * @details
29 * @c MessageLoop is the primary task dispatcher in VLink. It owns a task queue and an
30 * associated timer registry. Tasks posted via @c post_task() are executed serially on
31 * the loop's thread. Timers registered with @c Timer::attach() fire their callbacks as
32 * regular queue tasks.
33 *
34 * Queue types:
35 *
36 * | Type | Queue implementation | Max tasks | Notes |
37 * | ----------------- | ------------------------------ | ---------------- | ------------------------------ |
38 * | @c kNormalType | Mutex-protected std::queue | 10000 | Default; no priority support |
39 * | @c kLockfreeType | MpmcQueue (lock-free MPMC) | 10000 | Fastest single-producer path |
40 * | @c kPriorityType | Priority queue | 10000 | Supports task priority levels |
41 *
42 * Dispatch strategies:
43 *
44 * | Strategy | Behaviour |
45 * | --------------------- | ----------------------------------------------------------------------- |
46 * | @c kOptimizationStrategy | Yields CPU when queue is empty; balances latency vs CPU usage |
47 * | @c kPopStrategy | Busy-waits by repeatedly polling the queue (lowest latency) |
48 * | @c kBlockStrategy | Blocks on a condition variable when the queue is empty (lowest CPU) |
49 *
50 * Run modes:
51 * - @c run() -- blocks the calling thread until @c quit() is called.
52 * - @c async_run() -- launches a new background thread and returns immediately.
53 * - @c spin() -- calls @c spin_once() in a loop on the calling thread until @c quit() is called.
54 * - @c spin_once() -- processes one batch of pending tasks (optionally blocking); suitable for use in an existing event loop.
55 *
56 * Task execution with scheduling:
57 * @c exec_task() wraps a callback in a @c Schedule::Config (delay, priority, timeouts) and
58 * posts it. It returns a @c Schedule::Status or @c Schedule::RetStatus that can be chained
59 * with @c on_then / @c on_else / @c on_catch / @c on_schedule_timeout callbacks.
60 *
61 * @note
62 * - Maximum task queue depth is 10000 (@c kMaxTaskSize); posts beyond this are rejected (@c post_task() returns @c false).
63 * - Maximum active timer count is 100 (@c kMaxTimerSize).
64 * - @c invoke_task() dispatches a callable and returns a @c std::future for the result.
65 * Blocking on the future from the same thread as the loop will deadlock.
66 *
67 * @par Example
68 * @code
69 * vlink::MessageLoop loop;
70 * loop.async_run();
71 *
72 * loop.post_task([] { do_work(); });
73 *
74 * // Blocking invoke from another thread:
75 * auto fut = loop.invoke_task([]() -> int { return compute(); });
76 * int result = fut.get(); // waits for the loop to process the task
77 *
78 * loop.quit();
79 * loop.wait_for_quit();
80 * @endcode
81 */
82
83#pragma once
84
85#include <functional>
86#include <future>
87#include <limits>
88#include <memory>
89#include <string>
90#include <type_traits>
91#include <utility>
92
93#include "./schedule.h"
94#include "./timer.h"
95
96namespace vlink {
97
98/**
99 * @class MessageLoop
100 * @brief Single-threaded serial task dispatcher with integrated timer support.
101 *
102 * @details
103 * All tasks and timer callbacks run on a single thread. Thread-safe posting of
104 * tasks is allowed from any thread via @c post_task().
105 */
107 public:
108 /**
109 * @brief Callback type for tasks and event handlers.
110 */
111 using Callback = std::function<void()>;
112
113 /**
114 * @brief Queue implementation type.
115 *
116 * @details
117 * Selects the internal task queue algorithm. See class documentation for a comparison table.
118 */
119 enum Type : uint8_t {
120 kNormalType = 0, ///< Mutex-protected FIFO queue (default)
121 kLockfreeType = 1, ///< Lock-free MPMC queue
122 kPriorityType = 2, ///< Priority-ordered queue
123 };
124
125 /**
126 * @brief Idle strategy controlling CPU and latency trade-offs.
127 *
128 * @details
129 * See class documentation for a comparison table.
130 */
131 enum Strategy : uint8_t {
132 kOptimizationStrategy = 0, ///< Balance latency and CPU via yield
133 kPopStrategy = 1, ///< Busy-poll (lowest latency, highest CPU)
134 kBlockStrategy = 2, ///< Block on condition variable (lowest CPU)
135 };
136
137 /**
138 * @brief Pre-defined task priority levels for @c kPriorityType loops.
139 *
140 * @details
141 * Higher numeric values are dispatched first. Custom priority values may be used
142 * between @c kLowestPriority and @c kHighestPriority.
143 */
144 enum Priority : uint16_t {
145 kNoPriority = 0, ///< No priority (FIFO order)
146 kLowestPriority = 1, ///< Lowest real priority
147 kTimerPriority = 50, ///< Used internally for timer callbacks
148 kNormalPriority = 100, ///< Default task priority
149 kHighestPriority = std::numeric_limits<uint16_t>::max() ///< Highest available priority
150 };
151
152 /**
153 * @brief Constructs a @c MessageLoop with @c kNormalType queue.
154 */
156
157 /**
158 * @brief Constructs a @c MessageLoop with the specified queue type.
159 *
160 * @param type Queue implementation to use.
161 */
162 explicit MessageLoop(Type type);
163
164 /**
165 * @brief Destructor. Calls @c quit(true) and waits for the background thread (if any).
166 */
167 virtual ~MessageLoop();
168
169 /**
170 * @brief Sets a human-readable name for this loop (visible in profiling tools).
171 *
172 * @param name Name string.
173 */
174 void set_name(const std::string& name);
175
176 /**
177 * @brief Returns the name set via @c set_name().
178 *
179 * @return Reference to the name string.
180 */
181 [[nodiscard]] const std::string& get_name() const;
182
183 /**
184 * @brief Returns the queue type this loop was constructed with.
185 *
186 * @return Queue type.
187 */
188 [[nodiscard]] Type get_type() const;
189
190 /**
191 * @brief Returns the current idle dispatch strategy.
192 *
193 * @return Current strategy.
194 */
195 [[nodiscard]] Strategy get_strategy() const;
196
197 /**
198 * @brief Changes the idle dispatch strategy.
199 *
200 * @param strategy New strategy. Takes effect on the next idle cycle.
201 */
202 void set_strategy(Strategy strategy);
203
204 /**
205 * @brief Registers a callback invoked once when the loop thread starts.
206 *
207 * @param callback Called from the loop thread before the first task is processed.
208 */
210
211 /**
212 * @brief Registers a callback invoked once when the loop thread exits.
213 *
214 * @param callback Called from the loop thread after the last task has been processed.
215 */
217
218 /**
219 * @brief Registers a callback invoked each time the task queue becomes empty.
220 *
221 * @param callback Called from the loop thread on each idle cycle.
222 */
224
225 /**
226 * @brief Runs the message loop on the calling thread (blocking).
227 *
228 * @details
229 * Processes tasks and fires timers until @c quit() is called.
230 *
231 * @return @c true if the loop ran and exited normally; @c false if already running.
232 */
233 bool run();
234
235 /**
236 * @brief Starts the message loop on a new background thread (non-blocking).
237 *
238 * @details
239 * Returns immediately. The background thread runs until @c quit() is called.
240 *
241 * @return @c true if the thread was started; @c false if already running.
242 */
243 bool async_run();
244
245 /**
246 * @brief Runs the loop continuously in a spin mode (blocking; no background thread).
247 *
248 * @details
249 * Calls @c spin_once() repeatedly until @c quit() is called.
250 *
251 * @return @c true on normal exit.
252 */
253 bool spin();
254
255 /**
256 * @brief Processes one batch of pending tasks and timers.
257 *
258 * @details
259 * Intended for integration into an existing event loop.
260 *
261 * @param block If @c true and the queue is empty, blocks until a task arrives.
262 * If @c false, returns immediately if the queue is empty. Default: @c true.
263 * @return @c true if at least one task was processed; @c false if the loop should quit.
264 */
265 bool spin_once(bool block = true);
266
267 /**
268 * @brief Requests the loop to exit cleanly.
269 *
270 * @details
271 * Signals the loop to stop after finishing the current task. If @p force is @c true,
272 * remaining queued tasks are discarded.
273 *
274 * @param force If @c true, discard pending tasks. Default: @c false.
275 * @return @c true on success.
276 */
277 bool quit(bool force = false);
278
279 /**
280 * @brief Waits until the task queue is drained.
281 *
282 * @param ms Maximum wait time in milliseconds. @c Timer::kInfinite for unlimited. Default: @c kInfinite.
283 * @param check If @c true, also verify the loop is in the idle state. Default: @c true.
284 * @return @c true if the queue drained within the timeout.
285 */
286 bool wait_for_idle(int ms = Timer::kInfinite, bool check = true);
287
288 /**
289 * @brief Waits until the loop has fully exited (after @c quit() was called).
290 *
291 * @param ms Maximum wait time in milliseconds. @c Timer::kInfinite for unlimited. Default: @c kInfinite.
292 * @param check If @c true, also verify the loop thread has joined. Default: @c true.
293 * @return @c true if the loop exited within the timeout.
294 */
295 bool wait_for_quit(int ms = Timer::kInfinite, bool check = true);
296
297 /**
298 * @brief Posts a task to the queue for execution on the loop thread.
299 *
300 * @details
301 * Thread-safe. Returns @c false if the loop is shutting down or the queue is full (@c kMaxTaskSize).
302 *
303 * @param callback Task to execute.
304 * @return @c true if the task was successfully enqueued.
305 */
306 bool post_task(Callback&& callback);
307
308 /**
309 * @brief Posts a task with an explicit priority (requires @c kPriorityType loop).
310 *
311 * @details
312 * Tasks with higher priority values are dispatched before lower-priority tasks.
313 * For @c kNormalType and @c kLockfreeType loops, priority is ignored and the task is
314 * enqueued in FIFO order.
315 *
316 * @param callback Task to execute.
317 * @param priority Dispatch priority.
318 * @return @c true if enqueued successfully.
319 */
320 bool post_task_with_priority(Callback&& callback, uint16_t priority);
321
322 /**
323 * @brief Posts a scheduled task and returns a @c Schedule::Status for chaining callbacks.
324 *
325 * @details
326 * This overload is for callbacks returning @c void.
327 * The @c Schedule::Config can specify a delay, priority, schedule timeout and execution timeout.
328 * Chain @c on_schedule_timeout, @c on_execution_timeout or @c on_catch on the returned status.
329 *
330 * @tparam CallbackT Callable type returning @c void.
331 * @param config Scheduling configuration.
332 * @param callback Callable to execute.
333 * @return @c Schedule::Status for chaining.
334 */
335 // NOLINTNEXTLINE(modernize-use-constraints)
336 template <typename CallbackT, typename = std::enable_if_t<!std::is_convertible_v<CallbackT, Schedule::RetCallback>>>
337 Schedule::Status exec_task(const Schedule::Config& config, CallbackT&& callback);
338
339 /**
340 * @brief Posts a scheduled task and returns a @c Schedule::RetStatus for chaining callbacks.
341 *
342 * @details
343 * This overload is for callbacks returning @c bool. Chain @c on_then (fires if @c true)
344 * and @c on_else (fires if @c false) on the returned status.
345 *
346 * @tparam CallbackT Callable type returning @c bool.
347 * @param config Scheduling configuration.
348 * @param callback Callable to execute.
349 * @return @c Schedule::RetStatus for chaining.
350 */
351 // NOLINTNEXTLINE(modernize-use-constraints)
352 template <typename CallbackT, typename = std::enable_if_t<std::is_convertible_v<CallbackT, Schedule::RetCallback>>>
353 Schedule::RetStatus exec_task(const Schedule::Config& config, CallbackT&& callback);
354
355 /**
356 * @brief Wakes the loop thread if it is sleeping (e.g., in @c kBlockStrategy).
357 *
358 * @return @c true if the wakeup signal was sent.
359 */
360 bool wakeup();
361
362 /**
363 * @brief Resets the lock-free queue to its initial capacity.
364 *
365 * @details
366 * Only applicable to @c kLockfreeType loops. Useful after a large burst of tasks
367 * to reclaim internal capacity bookkeeping.
368 */
370
371 /**
372 * @brief Returns @c true if the loop is currently running (started and not quit).
373 *
374 * @return @c true if the loop is active.
375 */
376 [[nodiscard]] bool is_running() const;
377
378 /**
379 * @brief Returns @c true if @c quit() has been called and the loop is winding down.
380 *
381 * @return @c true if the loop is in the process of quitting.
382 */
383 [[nodiscard]] bool is_ready_to_quit() const;
384
385 /**
386 * @brief Returns @c true if the loop is currently executing a task.
387 *
388 * @return @c true if a task callback is in progress on the loop thread.
389 */
390 [[nodiscard]] bool is_busy() const;
391
392 /**
393 * @brief Returns the number of tasks currently in the queue.
394 *
395 * @return Pending task count.
396 */
397 [[nodiscard]] size_t get_task_count() const;
398
399 /**
400 * @brief Returns the maximum queue depth.
401 *
402 * @return @c kMaxTaskSize (10000) by default.
403 */
404 [[nodiscard]] virtual size_t get_max_task_count() const;
405
406 /**
407 * @brief Returns the maximum number of timers that can be attached to this loop.
408 *
409 * @return @c kMaxTimerSize (100) by default.
410 */
411 [[nodiscard]] virtual size_t get_max_timer_count() const;
412
413 /**
414 * @brief Returns the maximum allowed task execution time in milliseconds.
415 *
416 * @details
417 * When a task exceeds this duration, @c on_task_timeout() is called.
418 * Returns 0 to disable timeout checking.
419 *
420 * @return Maximum execution time in ms.
421 */
422 [[nodiscard]] virtual uint32_t get_max_elapsed_time() const;
423
424 /**
425 * @brief Returns @c true if the calling thread is the same as the loop thread.
426 *
427 * @details
428 * Used internally to detect if a task is calling back into the loop synchronously.
429 * For @c MultiLoop, returns @c true if the caller is any of the worker threads.
430 *
431 * @return @c true if called from the loop's own thread.
432 */
433 [[nodiscard]] virtual bool is_in_same_thread() const;
434
435 /**
436 * @brief Dispatches a callable to the loop thread and returns a @c std::future for the result.
437 *
438 * @details
439 * Thread-safe. The future becomes ready after the callable is executed on the loop thread.
440 *
441 * @warning Do not call @c .get() on the future from the same thread as the loop; doing so will
442 * deadlock.
443 *
444 * @tparam FunctionT Callable type.
445 * @tparam ArgsT Argument types.
446 * @tparam ResultT Return type (deduced).
447 * @param function Callable to dispatch.
448 * @param args Arguments forwarded to the callable.
449 * @return @c std::future<ResultT> that becomes ready when the task completes.
450 */
451 template <class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>>
452 [[nodiscard]] std::future<ResultT> invoke_task(FunctionT&& function, ArgsT&&... args);
453
454 /**
455 * @brief Dispatches a callable with an explicit priority and returns a @c std::future.
456 *
457 * @details
458 * Same as @c invoke_task() but the task is enqueued at @p priority level.
459 * Requires a @c kPriorityType loop for priority to take effect.
460 *
461 * @tparam FunctionT Callable type.
462 * @tparam ArgsT Argument types.
463 * @tparam ResultT Return type (deduced).
464 * @param function Callable to dispatch.
465 * @param priority Dispatch priority.
466 * @param args Arguments forwarded to the callable.
467 * @return @c std::future<ResultT>.
468 */
469 template <class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>>
470 [[nodiscard]] std::future<ResultT> invoke_task_with_priority(FunctionT&& function, uint16_t priority,
471 ArgsT&&... args);
472
473 protected:
474 /**
475 * @brief Called from the loop thread just before the first task is processed.
476 *
477 * @details
478 * Override in subclasses to perform per-thread initialisation.
479 */
480 virtual void on_begin();
481
482 /**
483 * @brief Called from the loop thread just after the last task has been processed.
484 *
485 * @details
486 * Override in subclasses to perform per-thread cleanup.
487 */
488 virtual void on_end();
489
490 /**
491 * @brief Called from the loop thread each time the queue becomes empty.
492 *
493 * @details
494 * Override in subclasses to perform idle work (e.g., statistics updates).
495 */
496 virtual void on_idle();
497
498 /**
499 * @brief Called before each task is executed.
500 *
501 * @details
502 * Provides the task callback and the monotonic start timestamp (in milliseconds).
503 * Override to implement per-task tracing or accounting.
504 *
505 * @param callback The task about to be executed.
506 * @param start_time Millisecond timestamp at which the task was dequeued.
507 */
508 virtual void on_task_changed(Callback&& callback, uint32_t start_time);
509
510 /**
511 * @brief Called when a task's execution time exceeds @c get_max_elapsed_time().
512 *
513 * @details
514 * Override to log or handle slow tasks.
515 *
516 * @param callback The task that timed out.
517 * @param elapsed_time Actual execution time in milliseconds.
518 */
519 virtual void on_task_timeout(Callback&& callback, uint32_t elapsed_time);
520
521 private:
522 static uint64_t get_current_nano_time();
523
524 bool add_timer(Timer* timer);
525
526 bool remove_timer(Timer* timer);
527
528 bool push_task(Callback&& callback, uint16_t priority);
529
530 void push_normal_task(Callback&& callback);
531
532 bool push_lockfree_task(Callback&& callback);
533
534 void push_priority_task(Callback&& callback, uint16_t priority);
535
536 void do_consume();
537
538 bool process_normal_task(bool block);
539
540 bool process_lockfree_task(bool block);
541
542 bool process_priority_task(bool block);
543
544 bool process_timer_task(int64_t& next_sleep_time);
545
546 friend Timer;
547 std::unique_ptr<struct MessageLoopImpl> impl_;
548
550};
551
552////////////////////////////////////////////////////////////////
553/// Details
554////////////////////////////////////////////////////////////////
555
556template <typename CallbackT, typename>
557Schedule::Status MessageLoop::exec_task(const Schedule::Config& config, CallbackT&& callback) {
558 Schedule::Callback wrapper_callback;
559
560 // NOLINTNEXTLINE(bugprone-move-forwarding-reference)
561 auto status = Schedule::process(config, std::move(callback), wrapper_callback);
562
563 bool post_ret = false;
564
565 if (config.delay_ms > 0) {
566 post_ret = Timer::call_once(this, config.delay_ms, std::move(wrapper_callback), config.priority);
567 } else {
568 if (get_type() == kPriorityType && config.priority != kNoPriority) {
569 post_ret = post_task_with_priority(std::move(wrapper_callback), config.priority);
570 } else {
571 post_ret = post_task(std::move(wrapper_callback));
572 }
573 }
574
575 if VUNLIKELY (!post_ret) {
576 status.set_valid(false);
577 }
578
579 return status;
580}
581
582template <typename CallbackT, typename>
583Schedule::RetStatus MessageLoop::exec_task(const Schedule::Config& config, CallbackT&& callback) {
584 Schedule::Callback wrapper_callback;
585
586 // NOLINTNEXTLINE(bugprone-move-forwarding-reference)
587 auto status = Schedule::process_with_ret(config, std::move(callback), wrapper_callback);
588
589 bool post_ret = false;
590
591 if (config.delay_ms > 0) {
592 post_ret = Timer::call_once(this, config.delay_ms, std::move(wrapper_callback), config.priority);
593 } else {
594 if (get_type() == kPriorityType && config.priority != kNoPriority) {
595 post_ret = post_task_with_priority(std::move(wrapper_callback), config.priority);
596 } else {
597 post_ret = post_task(std::move(wrapper_callback));
598 }
599 }
600
601 if VUNLIKELY (!post_ret) {
602 status.set_valid(false);
603 }
604
605 return status;
606}
607
608template <class FunctionT, class... ArgsT, typename ResultT>
609inline std::future<ResultT> MessageLoop::invoke_task(FunctionT&& function, ArgsT&&... args) {
610 auto task = std::make_shared<std::packaged_task<ResultT()>>(
611 std::bind(std::forward<FunctionT>(function), std::forward<ArgsT>(args)...)); // NOLINT(modernize-avoid-bind)
612
613 std::future<ResultT> res = task->get_future();
614
615 using TaskFunction = bool (MessageLoop::*)(Callback&&);
616
617 std::invoke(static_cast<TaskFunction>(&MessageLoop::post_task), this, [task]() { (*task.get())(); });
618
619 return res;
620}
621
622template <class FunctionT, class... ArgsT, typename ResultT>
623inline std::future<ResultT> MessageLoop::invoke_task_with_priority(FunctionT&& function, uint16_t priority,
624 ArgsT&&... args) {
625 auto task = std::make_shared<std::packaged_task<ResultT()>>(
626 std::bind(std::forward<FunctionT>(function), std::forward<ArgsT>(args)...)); // NOLINT(modernize-avoid-bind)
627
628 std::future<ResultT> res = task->get_future();
629
630 using TaskFunction = bool (MessageLoop::*)(Callback&&, uint16_t);
631
632 std::invoke(
633 static_cast<TaskFunction>(&MessageLoop::post_task_with_priority), this, [task]() { (*task.get())(); }, priority);
634
635 return res;
636}
637
638} // namespace vlink
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
定义 macros.h:302
#define VLINK_EXPORT
定义 macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
RAII task scheduling wrapper with delay, priority, timeouts and result chaining.
Event-loop-driven periodic/one-shot timer with configurable priority.