VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
graph_task.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file graph_task.h
26 * @brief DAG-based task graph with condition branching, cycle detection, and DOT export.
27 *
28 * @details
29 * @c GraphTask implements a directed acyclic graph (DAG) of work units. Tasks define
30 * their dependencies by calling @c precede() (this task must finish before the target)
31 * or @c succeed() (the target must finish before this task). The entire graph is then
32 * submitted to any @c MessageLoop, @c MultiLoop or @c ThreadPool via @c execute().
33 *
34 * Task types:
35 *
36 * | Factory | Callback signature | Use case |
37 * | ------------------------------ | ------------------------ | ----------------------------------- |
38 * | @c create(callback) | @c void() | Regular work task |
39 * | @c create_condition(callback) | @c int() | Branch selector (returns branch id) |
40 *
41 * Condition task:
42 * A condition task returns an integer selecting which successor sub-graph to activate.
43 * All other successors are skipped. This enables if/switch style DAG branching.
44 *
45 * Execution policies:
46 *
47 * | Policy | Behaviour |
48 * | ----------------- | ----------------------------------------------------------------- |
49 * | @c kPolicyOnce | Task runs exactly once per @c execute() call (default) |
50 * | @c kPolicyMultiple| Task may run multiple times in one @c execute() pass |
51 * | @c kPolicyWaitAll | Task waits for all predecessors before running (default DAG rule) |
52 *
53 * Operator syntax for building graphs:
54 * @code
55 * auto A = vlink::GraphTask::create("A", []{ step_a(); });
56 * auto B = vlink::GraphTask::create("B", []{ step_b(); });
57 * auto C = vlink::GraphTask::create("C", []{ step_c(); });
58 *
59 * A --> B --> C; // parsed as A-- > B-- > C: A depends on B, B depends on C (execution order: C, B, A)
60 * @endcode
61 *
62 * @note
63 * - @c execute() traverses the reachable sub-graph and dispatches all ready tasks to the engine.
64 * - @c has_cycle() detects cycles using DFS; cycles cause undefined behaviour at runtime.
65 * - @c export_to_dot() produces a Graphviz DOT string for visualisation.
66 *
67 * @par Example
68 * @code
69 * vlink::MultiLoop engine(4);
70 * engine.async_run();
71 *
72 * auto load = vlink::GraphTask::create("load", [] { load_data(); });
73 * auto proc = vlink::GraphTask::create("proc", [] { process(); });
74 * auto save = vlink::GraphTask::create("save", [] { save_data(); });
75 *
76 * // A --> B is parsed as (A--) > B and calls A->succeed(B), meaning B must complete before A.
77 * // So save --> proc --> load produces execution order: load, proc, save.
78 * save --> proc --> load;
79 *
80 * assert(!save->has_cycle());
81 * save->execute(&engine);
82 * @endcode
83 */
84
85#pragma once
86
87#include <functional>
88#include <memory>
89#include <string>
90#include <unordered_set>
91#include <utility>
92#include <vector>
93
94#include "./macros.h"
95#include "./traits.h"
96
97namespace vlink {
98
99/**
100 * @class GraphTask
101 * @brief Node in a directed acyclic task graph supporting condition branching and parallel execution.
102 *
103 * @details
104 * Must be created via one of the static factory methods (@c create / @c create_condition).
105 * Inherits from @c std::enable_shared_from_this to safely pass @c shared_ptr to internal callbacks.
106 */
107class VLINK_EXPORT GraphTask final : public std::enable_shared_from_this<GraphTask> {
108 public:
 // NOTE(review): uint8_t / uint16_t / uint32_t used throughout this class are declared in
 // <cstdint>, which this header does not include directly; presumably it is pulled in
 // through macros.h or traits.h; confirm.
109 /**
110 * @brief Execution state of the task within a single @c execute() pass.
111 */
112 enum Status : uint8_t {
113 kStatusInActive = 0, ///< Not yet submitted or cancelled
114 kStatusPending = 1, ///< Waiting for predecessors to complete
115 kStatusRunning = 2, ///< Currently executing
116 kStatusDone = 3, ///< Execution completed
117 };
118
119 /**
120 * @brief Execution policy controlling how many times and when the task runs.
121 */
122 enum Policy : uint8_t {
123 kPolicyOnce = 0, ///< Run exactly once per execute() call (default)
124 kPolicyMultiple = 1, ///< Allow multiple invocations in a single pass
125 kPolicyWaitAll = 2, ///< Wait for ALL predecessors before running
126 };
127
128 /**
129 * @brief Callback type for regular (void-returning) tasks.
130 */
131 using Callback = std::function<void()>;
132
133 /**
134 * @brief Callback type for condition tasks; returns the branch index to activate.
135 */
136 using ConditionCallback = std::function<int()>;
137
138 /**
139 * @brief Callback for status change notifications.
140 *
141 * @details
142 * Called whenever the task's @c Status changes.
143 * First argument is the task name; second is the new status.
144 */
145 using StatusCallback = std::function<void(const std::string&, Status)>;
146
147 /**
148 * @brief Creates a regular (void) task node.
149 *
150 * @param callback Work function to execute.
151 * @param condition_number Number of successor branches available. 0 = no branches.
152 * @return Shared pointer to the new task.
153 */
154 [[nodiscard]] static std::shared_ptr<GraphTask> create(Callback&& callback, int condition_number = 0);
155
156 /**
157 * @brief Creates a named regular (void) task node.
158 *
159 * @param name Task name (used in DOT export and status callbacks).
160 * @param callback Work function to execute.
161 * @param condition_number Number of successor branches available. 0 = no branches.
162 * @return Shared pointer to the new task.
163 */
164 [[nodiscard]] static std::shared_ptr<GraphTask> create(const std::string& name, Callback&& callback,
165 int condition_number = 0);
166
167 /**
168 * @brief Creates a condition task that returns a branch index.
169 *
170 * @details
171 * The integer returned by @p callback selects which successor sub-graph to activate.
172 * Return values outside [0, condition_number) skip all successors.
173 *
174 * @param callback Condition function returning the branch index.
175 * @param condition_number Number of possible branches.
176 * @return Shared pointer to the new condition task.
177 */
178 [[nodiscard]] static std::shared_ptr<GraphTask> create_condition(ConditionCallback&& callback,
179 int condition_number = 0);
180
181 /**
182 * @brief Creates a named condition task.
183 *
184 * @param name Task name.
185 * @param callback Condition function returning the branch index.
186 * @param condition_number Number of possible branches.
187 * @return Shared pointer to the new condition task.
188 */
189 [[nodiscard]] static std::shared_ptr<GraphTask> create_condition(const std::string& name,
190 ConditionCallback&& callback,
191 int condition_number = 0);
192
193 /**
194 * @brief Submits this task (and all reachable successors) to a graph execution engine.
195 *
196 * @details
197 * Traverses the reachable sub-graph, identifies tasks whose predecessors have all
198 * completed, and posts them to @p graph_engine using either @c post_task() or
199 * @c post_task_with_priority() (if available).
200 *
201 * @tparam GraphEngineT Any type providing @c post_task(Callback) and optionally
202 * @c post_task_with_priority(Callback, uint16_t).
203 * @c MessageLoop, @c MultiLoop and @c ThreadPool all satisfy this.
204 * @param graph_engine Pointer to the execution engine.
 * It is dereferenced without a null check whenever a task is posted,
 * so callers must pass a valid engine.
 *
 * @note In the template definition below the class, the task on which @c execute() is
 * called is invoked without calling @c wait(); every other traversed task calls
 * its own @c wait() before it is invoked.
 * @note NOTE(review): the file-level example calls @c execute() on the task it describes
 * as running last, which does not obviously match "this task and all reachable
 * successors"; confirm which direction the traversal follows.
205 */
206 template <class GraphEngineT>
207 void execute(GraphEngineT* graph_engine);
208
209 /**
210 * @brief Cancels this task; sets its status to @c kStatusInActive.
211 *
212 * @details
213 * A cancelled task will not be executed even if all its predecessors complete.
214 */
215 void cancel();
216
217 /**
218 * @brief Declares that this task must complete before @p task starts.
219 *
220 * @details
221 * Equivalent to @c task->succeed(this_task).
222 *
223 * @param task Successor task.
224 */
225 void precede(const std::shared_ptr<GraphTask>& task);
226
227 /**
228 * @brief Declares that @p task must complete before this task starts.
229 *
230 * @details
231 * Equivalent to @c task->precede(this_task).
232 *
233 * @param task Predecessor task.
234 */
235 void succeed(const std::shared_ptr<GraphTask>& task);
236
237 /**
238 * @brief Registers a callback invoked whenever this task's status changes.
239 *
240 * @param callback Called with (name, new_status) on every status transition.
 *
 * NOTE(review): no declaration follows this comment in the visible listing (listing
 * line 242 is absent). Presumably a setter taking a @c StatusCallback belongs here;
 * confirm against the original header.
241 */
243
244 /**
245 * @brief Sets the task name used in DOT export and status callbacks.
246 *
247 * @param name Task name string.
248 */
249 void set_name(const std::string& name);
250
251 /**
252 * @brief Sets a group name for visual grouping in DOT export.
253 *
254 * @param name Group name string.
255 */
256 void set_group_name(const std::string& name);
257
258 /**
259 * @brief Sets the number of condition branches this task can select.
260 *
261 * @param condition_number Branch count (used for condition tasks).
262 */
263 void set_condition_number(int condition_number);
264
265 /**
266 * @brief Sets the task dispatch priority (used by priority-aware engines).
267 *
268 * @param priority Priority value.
269 */
270 void set_priority(uint16_t priority);
271
272 /**
273 * @brief Sets the maximum recursion depth to guard against infinitely deep graphs.
274 *
275 * @details
276 * If a task graph exceeds this depth during execution, a @c kFatal log is emitted.
277 * The default value is 10000.
278 *
279 * @param depth Maximum depth.
280 */
281 void set_max_recursion_depth(uint32_t depth);
282
283 /**
284 * @brief Sets the execution policy.
285 *
286 * @param policy See @c Policy enum.
287 */
288 void set_policy(Policy policy);
289
290 /**
291 * @brief Returns the task name.
292 *
293 * @return Task name string.
294 */
295 [[nodiscard]] std::string get_name() const;
296
297 /**
298 * @brief Returns the group name.
299 *
300 * @return Group name string.
301 */
302 [[nodiscard]] std::string get_group_name() const;
303
304 /**
305 * @brief Returns the number of condition branches.
306 *
307 * @return Condition number.
308 */
309 [[nodiscard]] int get_condition_number() const;
310
311 /**
312 * @brief Returns the dispatch priority.
313 *
314 * @return Priority value.
315 */
316 [[nodiscard]] uint16_t get_priority() const;
317
318 /**
319 * @brief Returns the maximum recursion depth.
320 *
321 * @return Max recursion depth. Default is 10000.
322 */
323 [[nodiscard]] uint32_t get_max_recursion_depth() const;
324
325 /**
326 * @brief Returns the execution policy.
327 *
328 * @return Policy value.
329 */
330 [[nodiscard]] Policy get_policy() const;
331
332 /**
333 * @brief Returns the current execution status of this task.
334 *
335 * @return Status value.
336 */
337 [[nodiscard]] Status get_status() const;
338
339 /**
340 * @brief Removes a previously added predecessor dependency.
341 *
342 * @param task The predecessor to remove.
343 */
344 void remove_precede_task(const std::shared_ptr<GraphTask>& task);
345
346 /**
347 * @brief Removes a previously added successor dependency.
348 *
349 * @param task The successor to remove.
350 */
351 void remove_succeed_task(const std::shared_ptr<GraphTask>& task);
352
353 /**
354 * @brief Returns the list of predecessor tasks (this task depends on them).
355 *
356 * @return Vector of weak pointers to predecessor tasks.
357 */
358 [[nodiscard]] std::vector<std::weak_ptr<GraphTask>> get_precede_task_list() const;
359
360 /**
361 * @brief Returns the list of successor tasks (they depend on this task).
362 *
363 * @return Vector of weak pointers to successor tasks.
364 */
365 [[nodiscard]] std::vector<std::weak_ptr<GraphTask>> get_succeed_task_list() const;
366
367 /**
368 * @brief Returns @c true if this task was created with @c create_condition().
369 *
370 * @return @c true for condition tasks.
371 */
372 [[nodiscard]] bool is_condition_task() const;
373
374 /**
375 * @brief Detects whether the reachable sub-graph contains any cycles.
376 *
377 * @details
378 * Uses DFS with a recursion stack to find back edges. A cycle-free graph is required
379 * for correct @c execute() behaviour.
380 *
381 * @return @c true if a cycle is detected.
382 */
383 [[nodiscard]] bool has_cycle() const;
384
385 /**
386 * @brief Exports the reachable sub-graph as a Graphviz DOT string.
387 *
388 * @details
389 * The DOT output can be passed to @c dot -Tpng to produce a dependency diagram.
390 *
391 * @return DOT language string representing the task graph.
392 */
393 [[nodiscard]] std::string export_to_dot() const;
394
395 protected:
 // Visitor signature: receives one task of the graph at a time. The lambda that execute()
 // hands to process_and_traverse() has exactly this parameter list.
396 using FindTaskCallback = std::function<void(const std::shared_ptr<GraphTask>&)>;
397
 // Constructors are not public: the class documentation requires instances to be obtained
 // through the static create() / create_condition() factories. One constructor exists per
 // factory overload (unnamed / named, regular / condition callback).
398 explicit GraphTask(Callback&& callback, int condition_number);
399
400 explicit GraphTask(const std::string& name, Callback&& callback, int condition_number);
401
402 explicit GraphTask(ConditionCallback&& callback, int condition_number);
403
404 explicit GraphTask(const std::string& name, ConditionCallback&& callback, int condition_number);
405
 // NOTE(review): listing lines 406 and 408 are absent here. execute() calls
 // process_and_traverse(), which is not declared anywhere in the visible class; presumably
 // its declaration (likely taking a FindTaskCallback) was on one of these lines. Confirm
 // against the original header.
407
409
410 private:
 // Called by execute() with once == true. A non-negative return value is forwarded to
 // notify(); a negative return value means notify() is not called.
 // NOTE(review): presumably runs the stored callback and returns the selected branch;
 // confirm in the implementation file.
411 int invoke(bool once);
412
 // Called by execute() on every traversed task except the one execute() was invoked on,
 // immediately before invoke().
 // NOTE(review): presumably blocks until the task's predecessors have finished; confirm.
413 void wait();
414
 // Called by execute() with the non-negative value returned by invoke().
 // NOTE(review): presumably signals dependent tasks for the given branch; confirm.
415 void notify(int condition_number);
416
 // NOTE(review): presumably stores the new status and fires the status callback; confirm.
417 void update_status(Status status);
418
 // NOTE(review): presumably the recursive DFS step behind has_cycle(), with `visited`
 // and `recursion_stack` as the bookkeeping sets that has_cycle() documents; confirm.
419 bool detect_cycle(const GraphTask* task, std::unordered_set<const GraphTask*>& visited,
420 std::unordered_set<const GraphTask*>& recursion_stack) const;
421
 // NOTE(review): purpose not visible from this header; the name suggests pruning expired
 // entries from a task's dependency lists. Confirm.
422 static void clear_invalid_task(const std::shared_ptr<GraphTask>& task);
423
 // Private state. GraphTaskImpl is only named here, not defined in the visible header.
424 std::unique_ptr<struct GraphTaskImpl> impl_;
425
 // NOTE(review): listing line 426 is absent. The page footer references
 // VLINK_DISALLOW_COPY_AND_ASSIGN, which appears nowhere else in the visible listing, so
 // presumably it was invoked here for GraphTask. Confirm against the original header.
427};
428
/// Shorthand for the shared pointer type returned by the GraphTask factories; the
/// graph-building operators defined later in this header take and return this type.
429using GraphTaskPtr = std::shared_ptr<GraphTask>;
430
431////////////////////////////////////////////////////////////////
432/// Details
433////////////////////////////////////////////////////////////////
434
435template <class GraphEngineT>
436inline void GraphTask::execute(GraphEngineT* graph_engine) {
437 auto self = shared_from_this();
438
439 process_and_traverse([self, graph_engine](const std::shared_ptr<GraphTask>& task) {
440 constexpr bool kHaspriority = VLINK_HAS_MEMBER(GraphEngineT, post_task_with_priority);
441 [[maybe_unused]] constexpr uint8_t kPriorityType = 2;
442
443 auto task_func = [self, task]() {
444 if (task.get() != self.get()) {
445 task->wait();
446 }
447
448 int ret = task->invoke(true);
449
450 if (ret >= 0) {
451 task->notify(ret);
452 }
453 };
454
455 if constexpr (kHaspriority) {
456 if constexpr (VLINK_HAS_MEMBER(GraphEngineT, get_type)) {
457 if (graph_engine->get_type() == kPriorityType) {
458 graph_engine->post_task_with_priority(std::move(task_func), task->get_priority());
459 } else {
460 graph_engine->post_task(std::move(task_func));
461 }
462 } else {
463 graph_engine->post_task(std::move(task_func));
464 }
465 } else {
466 graph_engine->post_task(std::move(task_func));
467 }
468 });
469}
470
/**
 * @brief Postfix @c -- on a task pointer: returns @p task unchanged.
 *
 * @details
 * The task is not modified. The overload exists so that the arrow spelling
 * @c A-- > B (written @c A --> B) is a valid expression.
 */
471[[maybe_unused]] static inline GraphTaskPtr& operator--(GraphTaskPtr& task, int) { return task; }
472
/**
 * @brief Second half of the @c A-- > B arrow: calls @c task->succeed(target_task).
 *
 * @details
 * Returns @p target_task (not @p task), so a chain such as @c A-- > B-- > C applies
 * @c A->succeed(B) first and then @c B->succeed(C).
 *
 * @param task Left-hand task; @c succeed() is called on it.
 * @param target_task Right-hand task, passed to @c succeed() and returned.
 * @return Reference to @p target_task.
 */
473[[maybe_unused]] static inline GraphTaskPtr& operator>(GraphTaskPtr& task, GraphTaskPtr& target_task) {
474 task->succeed(target_task);
475 return target_task;
476}
477
/**
 * @brief First half of the @c A < --B arrow: calls @c task->precede(target_task).
 *
 * @details
 * Returns @p target_task (not @p task), so a chain such as @c A < --B < --C applies
 * @c A->precede(B) first and then @c B->precede(C).
 *
 * @param task Left-hand task; @c precede() is called on it.
 * @param target_task Right-hand task, passed to @c precede() and returned.
 * @return Reference to @p target_task.
 */
478[[maybe_unused]] static inline GraphTaskPtr& operator<(GraphTaskPtr& task, GraphTaskPtr& target_task) {
479 task->precede(target_task);
480 return target_task;
481}
482
/**
 * @brief Prefix @c -- on a task pointer: returns @p task unchanged.
 *
 * @details
 * The task is not modified. The overload exists so that the arrow spelling
 * @c A < --B (written @c A <-- B) is a valid expression.
 */
483[[maybe_unused]] static inline GraphTaskPtr& operator--(GraphTaskPtr& task) { return task; }
484
485} // namespace vlink
Platform-independent macro definitions for the VLink library.
#define VLINK_EXPORT
Definition macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
Compile-time type-trait utilities used internally by VLink.
#define VLINK_HAS_MEMBER(T, member)
Macro Definitions.
Definition traits.h:350