VLink 2.0.0
A high-performance communication middleware
vlink::MessageLoop Class Reference

Single-threaded serial task dispatcher with integrated timer support.

#include <message_loop.h>


Public Types

enum  Type : uint8_t { kNormalType = 0, kLockfreeType = 1, kPriorityType = 2 }
 Queue implementation type.
enum  Strategy : uint8_t { kOptimizationStrategy = 0, kPopStrategy = 1, kBlockStrategy = 2 }
 Idle strategy controlling CPU and latency trade-offs.
enum  Priority : uint16_t {
  kNoPriority = 0, kLowestPriority = 1, kTimerPriority = 50, kNormalPriority = 100,
  kHighestPriority = std::numeric_limits<uint16_t>::max()
}
 Pre-defined task priority levels for kPriorityType loops.
using Callback = std::function<void()>
 Callback type for tasks and event handlers.

Public Member Functions

 MessageLoop ()
 Constructs a MessageLoop with kNormalType queue.
 MessageLoop (Type type)
 Constructs a MessageLoop with the specified queue type.
virtual ~MessageLoop ()
 Destructor. Calls quit(true) and waits for the background thread (if any).
void set_name (const std::string &name)
 Sets a human-readable name for this loop (visible in profiling tools).
const std::string & get_name () const
 Returns the name set via set_name().
Type get_type () const
 Returns the queue type this loop was constructed with.
Strategy get_strategy () const
 Returns the current idle dispatch strategy.
void set_strategy (Strategy strategy)
 Changes the idle dispatch strategy.
void register_begin_handler (Callback &&callback)
 Registers a callback invoked once when the loop thread starts.
void register_end_handler (Callback &&callback)
 Registers a callback invoked once when the loop thread exits.
void register_idle_handler (Callback &&callback)
 Registers a callback invoked each time the task queue becomes empty.
bool run ()
 Runs the message loop on the calling thread (blocking).
bool async_run ()
 Starts the message loop on a new background thread (non-blocking).
bool spin ()
 Runs the loop continuously in a spin mode (blocking; no background thread).
bool spin_once (bool block=true)
 Processes one batch of pending tasks and timers.
bool quit (bool force=false)
 Requests the loop to exit cleanly.
bool wait_for_idle (int ms=Timer::kInfinite, bool check=true)
 Waits until the task queue is drained.
bool wait_for_quit (int ms=Timer::kInfinite, bool check=true)
 Waits until the loop has fully exited (after quit() was called).
bool post_task (Callback &&callback)
 Posts a task to the queue for execution on the loop thread.
bool post_task_with_priority (Callback &&callback, uint16_t priority)
 Posts a task with an explicit priority (requires kPriorityType loop).
template<typename CallbackT, typename = std::enable_if_t<!std::is_convertible_v<CallbackT, Schedule::RetCallback>>>
Schedule::Status exec_task (const Schedule::Config &config, CallbackT &&callback)
 Posts a scheduled task and returns a Schedule::Status for chaining callbacks.
template<typename CallbackT, typename = std::enable_if_t<std::is_convertible_v<CallbackT, Schedule::RetCallback>>>
Schedule::RetStatus exec_task (const Schedule::Config &config, CallbackT &&callback)
 Posts a scheduled task and returns a Schedule::RetStatus for chaining callbacks.
bool wakeup ()
 Wakes the loop thread if it is sleeping (e.g., in kBlockStrategy).
void reset_lockfree_capacity ()
 Resets the lock-free queue to its initial capacity.
bool is_running () const
 Returns true if the loop is currently running (started and not quit).
bool is_ready_to_quit () const
 Returns true if quit() has been called and the loop is winding down.
bool is_busy () const
 Returns true if the loop is currently executing a task.
size_t get_task_count () const
 Returns the number of tasks currently in the queue.
virtual size_t get_max_task_count () const
 Returns the maximum queue depth.
virtual size_t get_max_timer_count () const
 Returns the maximum number of timers that can be attached to this loop.
virtual uint32_t get_max_elapsed_time () const
 Returns the maximum allowed task execution time in milliseconds.
virtual bool is_in_same_thread () const
 Returns true if the calling thread is the same as the loop thread.
template<class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>>
std::future< ResultT > invoke_task (FunctionT &&function, ArgsT &&... args)
 Dispatches a callable to the loop thread and returns a std::future for the result.
template<class FunctionT, class... ArgsT, typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>>
std::future< ResultT > invoke_task_with_priority (FunctionT &&function, uint16_t priority, ArgsT &&... args)
 Dispatches a callable with an explicit priority and returns a std::future.

Protected Member Functions

virtual void on_begin ()
 Called from the loop thread just before the first task is processed.
virtual void on_end ()
 Called from the loop thread just after the last task has been processed.
virtual void on_idle ()
 Called from the loop thread each time the queue becomes empty.
virtual void on_task_changed (Callback &&callback, uint32_t start_time)
 Called before each task is executed.
virtual void on_task_timeout (Callback &&callback, uint32_t elapsed_time)
 Called when a task's execution time exceeds get_max_elapsed_time().

Detailed Description

Single-threaded serial task dispatcher with integrated timer support.

All tasks and timer callbacks run on a single thread. Tasks may be posted from any thread via post_task(), which is thread-safe.
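
The single-thread model described above can be pictured with a small stand-in class. This is only a sketch under stated assumptions: `MiniLoop` and `demo_serial_dispatch` are hypothetical names, the queue is a plain mutex-protected deque, and the destructor drains pending tasks rather than discarding them as quit(true) is documented to do. It is not VLink's implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for illustration only; not VLink's implementation.
class MiniLoop {
 public:
  using Callback = std::function<void()>;

  MiniLoop() : worker_([this] { run(); }) {}

  ~MiniLoop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quit_ = true;
    }
    cv_.notify_one();
    worker_.join();  // The worker drains the remaining tasks before exiting.
  }

  // Safe to call from any thread.
  bool post_task(Callback callback) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (quit_) return false;
      queue_.push_back(std::move(callback));
    }
    cv_.notify_one();
    return true;
  }

 private:
  void run() {
    for (;;) {
      Callback task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return quit_ || !queue_.empty(); });
        if (queue_.empty()) return;  // Quit was requested and nothing is left.
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();  // Run outside the lock so posting threads are not blocked.
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Callback> queue_;
  bool quit_ = false;
  std::thread worker_;  // Declared last so the members above exist first.
};

// Posts from four threads; every task still runs on the one loop thread.
std::vector<int> demo_serial_dispatch() {
  std::vector<int> seen;  // Written only by the loop thread until it is joined.
  {
    MiniLoop loop;
    std::vector<std::thread> producers;
    for (int i = 0; i < 4; ++i) {
      producers.emplace_back([&loop, &seen, i] {
        loop.post_task([&seen, i] { seen.push_back(i); });
      });
    }
    for (auto& producer : producers) producer.join();
  }  // ~MiniLoop drains the queue and joins the worker.
  assert(seen.size() == 4);
  return seen;
}
```

Because only the loop thread touches `seen` while the loop is alive, the tasks need no locking of their own; that is the main benefit of serial dispatch.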

Member Typedef Documentation

◆ Callback

using vlink::MessageLoop::Callback = std::function<void()>

Callback type for tasks and event handlers.

Member Enumeration Documentation

◆ Priority

enum vlink::MessageLoop::Priority : uint16_t

Pre-defined task priority levels for kPriorityType loops.

Higher numeric values are dispatched first. Custom priority values may be used between kLowestPriority and kHighestPriority.

Enumerator
kNoPriority 

No priority (FIFO order).

kLowestPriority 

Lowest real priority.

kTimerPriority 

Used internally for timer callbacks.

kNormalPriority 

Default task priority.

kHighestPriority 

Highest available priority.
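
To illustrate "higher numeric values are dispatched first", here is a minimal sketch of a priority-ordered queue built on std::priority_queue. The names `PrioritizedTask`, `DispatchLater`, `PriorityTaskQueue` and `demo_priority_order` are hypothetical, and the FIFO tie-break between tasks of equal priority is an assumption; this page does not state how VLink orders them.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a priority-ordered task queue; not VLink's code.
struct PrioritizedTask {
  uint16_t priority;
  uint64_t sequence;  // Insertion counter, used as a FIFO tie-breaker.
  std::function<void()> callback;
};

struct DispatchLater {
  // Returns true when `a` should be dispatched after `b`.
  bool operator()(const PrioritizedTask& a, const PrioritizedTask& b) const {
    if (a.priority != b.priority) return a.priority < b.priority;
    return a.sequence > b.sequence;
  }
};

class PriorityTaskQueue {
 public:
  void push(uint16_t priority, std::function<void()> callback) {
    heap_.push(PrioritizedTask{priority, next_sequence_++, std::move(callback)});
  }

  void run_all() {
    while (!heap_.empty()) {
      // top() returns a const reference, so copy the callback before popping.
      std::function<void()> callback = heap_.top().callback;
      heap_.pop();
      callback();
    }
  }

 private:
  std::priority_queue<PrioritizedTask, std::vector<PrioritizedTask>, DispatchLater> heap_;
  uint64_t next_sequence_ = 0;
};

std::string demo_priority_order() {
  std::string order;
  PriorityTaskQueue queue;
  queue.push(100, [&order] { order += 'n'; });    // value of kNormalPriority
  queue.push(1, [&order] { order += 'l'; });      // value of kLowestPriority
  queue.push(65535, [&order] { order += 'h'; });  // value of kHighestPriority
  queue.push(100, [&order] { order += 'm'; });    // same level as 'n', posted later
  queue.run_all();
  assert(order.size() == 4);
  return order;  // "hnml"
}
```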

◆ Strategy

enum vlink::MessageLoop::Strategy : uint8_t

Idle strategy controlling CPU and latency trade-offs.

See class documentation for a comparison table.

Enumerator
kOptimizationStrategy 

Balance latency and CPU via yield.

kPopStrategy 

Busy-poll (lowest latency, highest CPU).

kBlockStrategy 

Block on condition variable (lowest CPU).
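
The trade-off between the three strategies can be sketched as three ways of waiting for the same flag. `IdleStrategy`, `WorkSignal` and `demo_idle_strategy` are hypothetical names chosen for this example; how VLink actually implements each strategy is not shown on this page, so treat this as a generic illustration of yield, busy-poll and condition-variable waiting.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical names mirroring the three documented strategies.
enum class IdleStrategy { kYield, kBusyPoll, kBlock };

struct WorkSignal {
  std::mutex mutex;
  std::condition_variable cv;
  std::atomic<bool> has_work{false};

  void notify() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      has_work.store(true, std::memory_order_release);
    }
    cv.notify_one();
  }

  // Returns once has_work is true, waiting in the way the strategy dictates.
  void wait(IdleStrategy strategy) {
    switch (strategy) {
      case IdleStrategy::kBusyPoll:
        // Lowest latency, but occupies a core for the whole wait.
        while (!has_work.load(std::memory_order_acquire)) {
        }
        break;
      case IdleStrategy::kYield:
        // Still polls, but offers the core to other threads between checks.
        while (!has_work.load(std::memory_order_acquire)) std::this_thread::yield();
        break;
      case IdleStrategy::kBlock: {
        // Sleeps in the kernel until notified; lowest CPU, highest wake-up cost.
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return has_work.load(std::memory_order_acquire); });
        break;
      }
    }
  }
};

bool demo_idle_strategy(IdleStrategy strategy) {
  WorkSignal work;
  std::thread producer([&work] {
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    work.notify();
  });
  work.wait(strategy);
  producer.join();
  assert(work.has_work.load());
  return true;
}
```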

◆ Type

enum vlink::MessageLoop::Type : uint8_t

Queue implementation type.

Selects the internal task queue algorithm. See class documentation for a comparison table.

Enumerator
kNormalType 

Mutex-protected FIFO queue (default).

kLockfreeType 

Lock-free MPMC queue.

kPriorityType 

Priority-ordered queue.

Constructor & Destructor Documentation

◆ MessageLoop() [1/2]

vlink::MessageLoop::MessageLoop ( )

Constructs a MessageLoop with kNormalType queue.

◆ MessageLoop() [2/2]

vlink::MessageLoop::MessageLoop ( Type type)
explicit

Constructs a MessageLoop with the specified queue type.

Parameters
type: Queue implementation to use.

◆ ~MessageLoop()

virtual vlink::MessageLoop::~MessageLoop ( )
virtual

Destructor. Calls quit(true) and waits for the background thread (if any).

Member Function Documentation

◆ async_run()

bool vlink::MessageLoop::async_run ( )

Starts the message loop on a new background thread (non-blocking).

Returns immediately. The background thread runs until quit() is called.

Returns
true if the thread was started; false if already running.

◆ exec_task() [1/2]

template<typename CallbackT, typename>
Schedule::RetStatus vlink::MessageLoop::exec_task ( const Schedule::Config & config,
CallbackT && callback )

Posts a scheduled task and returns a Schedule::RetStatus for chaining callbacks.

This overload is for callbacks returning bool. Chain on_then (fires if true) and on_else (fires if false) on the returned status.

Template Parameters
CallbackT: Callable type returning bool.
Parameters
config: Scheduling configuration.
callback: Callable to execute.
Returns
Schedule::RetStatus for chaining.

◆ exec_task() [2/2]

template<typename CallbackT, typename>
Schedule::Status vlink::MessageLoop::exec_task ( const Schedule::Config & config,
CallbackT && callback )

Posts a scheduled task and returns a Schedule::Status for chaining callbacks.

This overload is for callbacks returning void. The Schedule::Config can specify a delay, priority, schedule timeout and execution timeout. Chain on_schedule_timeout, on_execution_timeout or on_catch on the returned status.

Template Parameters
CallbackT: Callable type returning void.
Parameters
config: Scheduling configuration.
callback: Callable to execute.
Returns
Schedule::Status for chaining.
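
The two exec_task() overloads are distinguished by whether the callable is convertible to Schedule::RetCallback. The sketch below reproduces only that selection mechanism. It assumes RetCallback behaves like `std::function<bool()>`; the `Status` and `RetStatus` structs and the `run_now` function are hypothetical stand-ins that run the callable immediately instead of scheduling it.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>
#include <utility>

// Assumption: stands in for Schedule::RetCallback.
using RetCallback = std::function<bool()>;

// Hypothetical result types; the real Schedule types offer chaining instead.
struct Status {
  bool has_result = false;
};
struct RetStatus {
  bool has_result = true;
  bool result = false;
};

// A void() callable cannot initialise std::function<bool()>, a bool() one can.
static_assert(!std::is_convertible_v<void (*)(), RetCallback>);
static_assert(std::is_convertible_v<bool (*)(), RetCallback>);

// Selected when the callable is NOT convertible to RetCallback.
template <typename CallbackT,
          typename = std::enable_if_t<!std::is_convertible_v<CallbackT, RetCallback>>>
Status run_now(CallbackT&& callback) {
  std::forward<CallbackT>(callback)();
  return Status{};
}

// Selected when the callable IS convertible to RetCallback.
// The differing return type is what makes this a distinct overload.
template <typename CallbackT,
          typename = std::enable_if_t<std::is_convertible_v<CallbackT, RetCallback>>>
RetStatus run_now(CallbackT&& callback) {
  RetStatus status;
  status.result = std::forward<CallbackT>(callback)();
  return status;
}

bool demo_overload_selection() {
  Status plain = run_now([] {});
  RetStatus valued = run_now([] { return true; });
  assert(!plain.has_result);
  assert(valued.has_result);
  return valued.result;
}
```

The test is made against the bool-returning callback type because a bool-returning callable also converts to `std::function<void()>`, so checking for the void form could not tell the two apart.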

◆ get_max_elapsed_time()

virtual uint32_t vlink::MessageLoop::get_max_elapsed_time ( ) const
nodiscard virtual

Returns the maximum allowed task execution time in milliseconds.

When a task exceeds this duration, on_task_timeout() is called. Returns 0 to disable timeout checking.

Returns
Maximum execution time in ms.

Reimplemented in vlink::DiscoveryReporter, vlink::DiscoveryViewer, vlink::ProxyAPI, and vlink::ProxyServer.

◆ get_max_task_count()

virtual size_t vlink::MessageLoop::get_max_task_count ( ) const
nodiscard virtual

Returns the maximum queue depth.

Returns
kMaxTaskSize (10000) by default.

Reimplemented in vlink::DatabaseReader, vlink::DatabaseWriter, vlink::DiscoveryReporter, vlink::DiscoveryViewer, vlink::McapReader, vlink::McapWriter, vlink::ProxyAPI, and vlink::ProxyServer.

◆ get_max_timer_count()

virtual size_t vlink::MessageLoop::get_max_timer_count ( ) const
nodiscard virtual

Returns the maximum number of timers that can be attached to this loop.

Returns
kMaxTimerSize (100) by default.

◆ get_name()

const std::string & vlink::MessageLoop::get_name ( ) const
nodiscard

Returns the name set via set_name().

Returns
Reference to the name string.

◆ get_strategy()

Strategy vlink::MessageLoop::get_strategy ( ) const
nodiscard

Returns the current idle dispatch strategy.

Returns
Current strategy.

◆ get_task_count()

size_t vlink::MessageLoop::get_task_count ( ) const
nodiscard

Returns the number of tasks currently in the queue.

Returns
Pending task count.

◆ get_type()

Type vlink::MessageLoop::get_type ( ) const
nodiscard

Returns the queue type this loop was constructed with.

Returns
Queue type.

◆ invoke_task()

template<class FunctionT, class... ArgsT, typename ResultT>
std::future< ResultT > vlink::MessageLoop::invoke_task ( FunctionT && function,
ArgsT &&... args )
inline nodiscard

Dispatches a callable to the loop thread and returns a std::future for the result.

Thread-safe. The future becomes ready after the callable is executed on the loop thread.

Warning
Do not call .get() on the future from the same thread as the loop; doing so will deadlock.
Template Parameters
FunctionT: Callable type.
ArgsT: Argument types.
ResultT: Return type (deduced).
Parameters
function: Callable to dispatch.
args: Arguments forwarded to the callable.
Returns
std::future<ResultT> that becomes ready when the task completes.
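
One common way to obtain a future from a queued callable is std::packaged_task; the sketch below shows that technique. It is an assumption that VLink works similarly. `TaskQueue`, `invoke_on` and `demo_invoke` are hypothetical names, and a plain deque drained by a helper thread plays the role of the loop.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for the loop's task queue.
using TaskQueue = std::deque<std::function<void()>>;

template <class FunctionT, class... ArgsT,
          typename ResultT = std::invoke_result_t<FunctionT, ArgsT...>>
std::future<ResultT> invoke_on(TaskQueue& queue, FunctionT&& function, ArgsT&&... args) {
  // std::function requires a copyable callable, so the move-only
  // packaged_task is shared through a std::shared_ptr.
  auto task = std::make_shared<std::packaged_task<ResultT()>>(
      [fn = std::forward<FunctionT>(function),
       bound = std::make_tuple(std::forward<ArgsT>(args)...)]() mutable -> ResultT {
        return std::apply(std::move(fn), std::move(bound));
      });
  std::future<ResultT> future = task->get_future();
  queue.emplace_back([task] { (*task)(); });
  return future;
}

int demo_invoke() {
  TaskQueue queue;
  std::future<int> sum = invoke_on(queue, [](int a, int b) { return a + b; }, 2, 3);

  // A separate thread plays the loop thread and drains the queue.
  std::thread loop_thread([&queue] {
    while (!queue.empty()) {
      std::function<void()> task = std::move(queue.front());
      queue.pop_front();
      task();
    }
  });

  // Safe: this thread is not the one that has to run the task.
  int result = sum.get();
  loop_thread.join();
  assert(result == 5);
  return result;
}
```

The deadlock warning follows directly from this shape: if the thread that must run the queued task is itself blocked in get(), the task never executes and the future never becomes ready.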

◆ invoke_task_with_priority()

template<class FunctionT, class... ArgsT, typename ResultT>
std::future< ResultT > vlink::MessageLoop::invoke_task_with_priority ( FunctionT && function,
uint16_t priority,
ArgsT &&... args )
inline nodiscard

Dispatches a callable with an explicit priority and returns a std::future.

Same as invoke_task(), but the task is enqueued at the given priority. Requires a kPriorityType loop for the priority to take effect.

Template Parameters
FunctionT: Callable type.
ArgsT: Argument types.
ResultT: Return type (deduced).
Parameters
function: Callable to dispatch.
priority: Dispatch priority.
args: Arguments forwarded to the callable.
Returns
std::future<ResultT>.

◆ is_busy()

bool vlink::MessageLoop::is_busy ( ) const
nodiscard

Returns true if the loop is currently executing a task.

Returns
true if a task callback is in progress on the loop thread.

◆ is_in_same_thread()

virtual bool vlink::MessageLoop::is_in_same_thread ( ) const
nodiscard virtual

Returns true if the calling thread is the same as the loop thread.

Used internally to detect if a task is calling back into the loop synchronously. For MultiLoop, returns true if the caller is any of the worker threads.

Returns
true if called from the loop's own thread.

Reimplemented in vlink::MultiLoop.

◆ is_ready_to_quit()

bool vlink::MessageLoop::is_ready_to_quit ( ) const
nodiscard

Returns true if quit() has been called and the loop is winding down.

Returns
true if the loop is in the process of quitting.

◆ is_running()

bool vlink::MessageLoop::is_running ( ) const
nodiscard

Returns true if the loop is currently running (started and not quit).

Returns
true if the loop is active.

◆ on_begin()

virtual void vlink::MessageLoop::on_begin ( )
protected virtual

Called from the loop thread just before the first task is processed.

Override in subclasses to perform per-thread initialisation.

Reimplemented in vlink::DatabaseReader, vlink::DatabaseWriter, vlink::DiscoveryReporter, vlink::DiscoveryViewer, vlink::McapReader, vlink::McapWriter, vlink::MultiLoop, vlink::ProxyAPI, and vlink::ProxyServer.

◆ on_end()

virtual void vlink::MessageLoop::on_end ( )
protected virtual

Called from the loop thread just after the last task has been processed.

Override in subclasses to perform per-thread cleanup.

Reimplemented in vlink::DatabaseReader, vlink::DatabaseWriter, vlink::DiscoveryReporter, vlink::DiscoveryViewer, vlink::McapReader, vlink::McapWriter, vlink::MultiLoop, vlink::ProxyAPI, and vlink::ProxyServer.

◆ on_idle()

virtual void vlink::MessageLoop::on_idle ( )
protected virtual

Called from the loop thread each time the queue becomes empty.

Override in subclasses to perform idle work (e.g., statistics updates).

◆ on_task_changed()

virtual void vlink::MessageLoop::on_task_changed ( Callback && callback,
uint32_t start_time )
protected virtual

Called before each task is executed.

Provides the task callback and the monotonic start timestamp (in milliseconds). Override to implement per-task tracing or accounting.

Parameters
callback: The task about to be executed.
start_time: Millisecond timestamp at which the task was dequeued.

Reimplemented in vlink::MultiLoop.

◆ on_task_timeout()

virtual void vlink::MessageLoop::on_task_timeout ( Callback && callback,
uint32_t elapsed_time )
protected virtual

Called when a task's execution time exceeds get_max_elapsed_time().

Override to log or handle slow tasks.

Parameters
callback: The task that timed out.
elapsed_time: Actual execution time in milliseconds.
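
The relationship between get_max_elapsed_time() and on_task_timeout() can be sketched as a timing wrapper around each task. `run_with_watchdog` and `demo_watchdog` are hypothetical helpers, and the sketch assumes the check happens after the task returns; whether VLink measures during or after execution is not stated here.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper: runs `task`, then calls `on_timeout` with the measured
// time if the task ran longer than `max_elapsed_ms`. A limit of 0 disables
// the check, matching the documented meaning of a 0 return value.
bool run_with_watchdog(const std::function<void()>& task, uint32_t max_elapsed_ms,
                       const std::function<void(uint32_t)>& on_timeout) {
  using Clock = std::chrono::steady_clock;  // Monotonic, immune to wall-clock jumps.
  const Clock::time_point start = Clock::now();
  task();
  const auto elapsed_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
  if (max_elapsed_ms != 0 && elapsed_ms > static_cast<long long>(max_elapsed_ms)) {
    on_timeout(static_cast<uint32_t>(elapsed_ms));
    return true;
  }
  return false;
}

// Runs a task that sleeps for 30 ms and returns the reported time (0 if none).
uint32_t demo_watchdog(uint32_t limit_ms) {
  uint32_t reported = 0;
  const bool timed_out = run_with_watchdog(
      [] { std::this_thread::sleep_for(std::chrono::milliseconds(30)); }, limit_ms,
      [&reported](uint32_t elapsed) { reported = elapsed; });
  assert(timed_out == (reported != 0));
  return reported;
}
```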

◆ post_task()

bool vlink::MessageLoop::post_task ( Callback && callback)

Posts a task to the queue for execution on the loop thread.

Thread-safe. Returns false if the loop is shutting down or the queue is full (kMaxTaskSize).

Parameters
callback: Task to execute.
Returns
true if the task was successfully enqueued.
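
The two documented failure cases (loop shutting down, queue full) can be modelled with a small bounded queue. `BoundedTaskQueue` and its member names are hypothetical, and the capacity used in the usage note is illustrative rather than the documented kMaxTaskSize.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical bounded queue; not VLink's implementation.
class BoundedTaskQueue {
 public:
  explicit BoundedTaskQueue(std::size_t max_tasks) : max_tasks_(max_tasks) {
    assert(max_tasks > 0);
  }

  // Returns false instead of blocking when the task cannot be accepted.
  bool post(std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (shutting_down_ || tasks_.size() >= max_tasks_) return false;
    tasks_.push_back(std::move(callback));
    return true;
  }

  void begin_shutdown() {
    std::lock_guard<std::mutex> lock(mutex_);
    shutting_down_ = true;
  }

  // Runs every queued task on the calling thread and returns how many ran.
  std::size_t drain() {
    std::deque<std::function<void()>> batch;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      batch.swap(tasks_);
    }
    for (auto& task : batch) task();
    return batch.size();
  }

 private:
  std::mutex mutex_;
  std::deque<std::function<void()>> tasks_;
  const std::size_t max_tasks_;
  bool shutting_down_ = false;
};
```

With a capacity of 2, a third post() is rejected until drain() frees space, and every post() after begin_shutdown() is rejected. Callers should therefore check the return value rather than assume the task will run.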

◆ post_task_with_priority()

bool vlink::MessageLoop::post_task_with_priority ( Callback && callback,
uint16_t priority )

Posts a task with an explicit priority (requires kPriorityType loop).

Tasks with higher priority values are dispatched before lower-priority tasks. For kNormalType and kLockfreeType loops, priority is ignored and the task is enqueued in FIFO order.

Parameters
callback: Task to execute.
priority: Dispatch priority.
Returns
true if enqueued successfully.

◆ quit()

bool vlink::MessageLoop::quit ( bool force = false)

Requests the loop to exit cleanly.

Signals the loop to stop after finishing the current task. If force is true, remaining queued tasks are discarded.

Parameters
force: If true, discard pending tasks. Default: false.
Returns
true on success.

◆ register_begin_handler()

void vlink::MessageLoop::register_begin_handler ( Callback && callback)

Registers a callback invoked once when the loop thread starts.

Parameters
callback: Called from the loop thread before the first task is processed.

◆ register_end_handler()

void vlink::MessageLoop::register_end_handler ( Callback && callback)

Registers a callback invoked once when the loop thread exits.

Parameters
callback: Called from the loop thread after the last task has been processed.

◆ register_idle_handler()

void vlink::MessageLoop::register_idle_handler ( Callback && callback)

Registers a callback invoked each time the task queue becomes empty.

Parameters
callback: Called from the loop thread on each idle cycle.

◆ reset_lockfree_capacity()

void vlink::MessageLoop::reset_lockfree_capacity ( )

Resets the lock-free queue to its initial capacity.

Only applicable to kLockfreeType loops. Useful after a large burst of tasks to reclaim internal capacity bookkeeping.

◆ run()

bool vlink::MessageLoop::run ( )

Runs the message loop on the calling thread (blocking).

Processes tasks and fires timers until quit() is called.

Returns
true if the loop ran and exited normally; false if already running.

◆ set_name()

void vlink::MessageLoop::set_name ( const std::string & name)

Sets a human-readable name for this loop (visible in profiling tools).

Parameters
name: Name string.

◆ set_strategy()

void vlink::MessageLoop::set_strategy ( Strategy strategy)

Changes the idle dispatch strategy.

Parameters
strategy: New strategy. Takes effect on the next idle cycle.

◆ spin()

bool vlink::MessageLoop::spin ( )

Runs the loop continuously in a spin mode (blocking; no background thread).

Calls spin_once() repeatedly until quit() is called.

Returns
true on normal exit.

◆ spin_once()

bool vlink::MessageLoop::spin_once ( bool block = true)

Processes one batch of pending tasks and timers.

Intended for integration into an existing event loop.

Parameters
block: If true and the queue is empty, blocks until a task arrives. If false, returns immediately if the queue is empty. Default: true.
Returns
true if at least one task was processed; false if the loop should quit.

◆ wait_for_idle()

bool vlink::MessageLoop::wait_for_idle ( int ms = Timer::kInfinite,
bool check = true )

Waits until the task queue is drained.

Parameters
ms: Maximum wait time in milliseconds. Timer::kInfinite for unlimited. Default: kInfinite.
check: If true, also verify the loop is in the idle state. Default: true.
Returns
true if the queue drained within the timeout.

◆ wait_for_quit()

bool vlink::MessageLoop::wait_for_quit ( int ms = Timer::kInfinite,
bool check = true )

Waits until the loop has fully exited (after quit() was called).

Parameters
ms: Maximum wait time in milliseconds. Timer::kInfinite for unlimited. Default: kInfinite.
check: If true, also verify the loop thread has joined. Default: true.
Returns
true if the loop exited within the timeout.

◆ wakeup()

bool vlink::MessageLoop::wakeup ( )

Wakes the loop thread if it is sleeping (e.g., in kBlockStrategy).

Returns
true if the wakeup signal was sent.

The documentation for this class was generated from the following file: