VLink 2.0.0
A high-performance communication middleware
载入中...
搜索中...
未找到
mpmc_queue.h
浏览该文件的文档.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file mpmc_queue.h
26 * @brief Lock-free bounded multi-producer multi-consumer queue with optional blocking behaviour.
27 *
28 * @details
29 * @c MpmcQueue<T> is a fixed-capacity, cache-line-aligned, lock-free MPMC ring buffer
30 * based on a turn-counting algorithm. Each slot contains an atomic turn counter that
31 * tracks whether the slot is empty (ready for a producer) or full (ready for a consumer).
32 *
33 * Concurrency model:
34 * - Producers atomically increment @c head_ to claim a slot, then wait until
35 * @c chunk.turn == turn(head) * 2 (slot is empty) before constructing the value.
36 * - Consumers atomically increment @c tail_ to claim a slot, then wait until
37 * @c chunk.turn == turn(tail) * 2 + 1 (slot is full) before moving the value out.
38 * - All waits spin for @c kFirstSpinTimes (32) iterations before calling @c yield_cpu().
39 *
40 * Behaviour modes:
41 *
42 * | Behavior | Effect on emplace/push | Effect on pop |
43 * | --------------------- | ----------------------------------- | -------------------------------- |
44 * | @c kNoBehavior | No notification | No blocking |
45 * | @c kConditionBehavior | Signals @c cv_not_empty_ on push | Signals @c cv_not_full_ on pop |
46 *
47 * @c kConditionBehavior enables @c wait_not_empty() and @c wait_not_full() to wake correctly.
48 * Use it with @c kBlockStrategy message loops.
49 *
50 * Cache-line alignment:
51 * - @c head_ and @c tail_ are each aligned to 64 bytes to prevent false sharing.
52 * - Each @c Chunk slot is also 64-byte aligned.
53 * - The queue object itself is a multiple of 64 bytes.
54 *
55 * @note
56 * - @c emplace() / @c pop() block indefinitely (spinning) until a slot is available.
57 * For bounded producers, use @c try_emplace() / @c try_push() instead.
58 * - @c notify_to_quit() sets a quit flag and wakes all blocked @c wait_not_empty() /
59 * @c wait_not_full() calls. After calling this, further pushes are silently dropped.
60 * - Capacity must be >= 1; passing 0 throws @c std::invalid_argument.
61 * - The @c VLINK_NO_INSTRUMENT attribute suppresses GCC's @c -finstrument-functions on Linux.
62 *
63 * @par Example
64 * @code
65 * vlink::MpmcQueue<int> q(1024);
66 *
67 * // Producer thread:
68 * q.push<vlink::MpmcQueue<int>::kConditionBehavior>(42);
69 *
70 * // Consumer thread:
71 * q.wait_not_empty();
72 * int val;
73 * q.pop<vlink::MpmcQueue<int>::kConditionBehavior>(val);
74 * @endcode
75 */
76
77#pragma once
78
79#include <array>
80#include <atomic>
81#include <chrono>
82#include <cstddef>
83#include <limits>
84#include <memory>
85#include <mutex>
86#include <new>
87#include <stdexcept>
88#include <utility>
89
91#include "./macros.h"
92#include "./utils.h"
93
94#if defined(__linux__)
95#define VLINK_NO_INSTRUMENT __attribute__((no_instrument_function))
96#else
97#define VLINK_NO_INSTRUMENT
98#endif
99
100namespace vlink {
101
102/**
103 * @class MpmcQueue
104 * @brief Fixed-capacity, lock-free, cache-line-aligned MPMC ring buffer.
105 *
106 * @tparam T Element type. Must be moveable.
107 */
108template <typename T>
110 public:
111 /**
112 * @brief Controls whether condition-variable notifications are sent on push/pop.
113 *
114 * | Value | Behaviour |
115 * | ------------------- | ------------------------------------------------------ |
116 * | @c kNoBehavior | No notifications; used with busy-wait or polling |
117 * | @c kConditionBehavior| Notifies @c cv_not_empty_ / @c cv_not_full_ on change |
118 */
119 enum Behavior : uint8_t { kNoBehavior = 0, kConditionBehavior = 1 };
120
121 /**
122 * @brief Constructs a @c MpmcQueue with the given fixed capacity.
123 *
124 * @param capacity Maximum number of elements. Must be >= 1.
125 * @throws std::invalid_argument if @p capacity < 1.
126 * @throws std::bad_alloc if the internal chunk array cannot be allocated.
127 */
128 explicit MpmcQueue(size_t capacity) VLINK_NO_INSTRUMENT;
129
130 /**
131 * @brief Destructor. Destroys any elements still in the queue.
132 */
134
135 /**
136 * @brief In-place constructs an element and blocks until a slot is available.
137 *
138 * @details
139 * If @c BehaviorT == @c kConditionBehavior, notifies @c wait_not_empty() waiters after push.
140 * Silently drops the element if @c notify_to_quit() has been called.
141 *
142 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
143 * @tparam Args Constructor argument types.
144 * @param args Arguments forwarded to @c T's constructor.
145 */
146 template <Behavior BehaviorT = kNoBehavior, typename... Args>
147 void emplace(Args&&... args) noexcept VLINK_NO_INSTRUMENT;
148
149 /**
150 * @brief In-place constructs an element without blocking.
151 *
152 * @details
153 * Returns @c false immediately if the queue is full.
154 *
155 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
156 * @tparam Args Constructor argument types.
157 * @param args Arguments forwarded to @c T's constructor.
158 * @return @c true if the element was enqueued; @c false if the queue was full.
159 */
160 template <Behavior BehaviorT = kNoBehavior, typename... Args>
161 [[nodiscard]] bool try_emplace(Args&&... args) noexcept VLINK_NO_INSTRUMENT;
162
163 /**
164 * @brief Pushes a value (by forwarding) and blocks until a slot is available.
165 *
166 * @details
167 * Forwards to @c emplace<BehaviorT>(std::forward<P>(v)).
168 *
169 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
170 * @tparam P Value type (lvalue or rvalue reference).
171 * @param v Value to push.
172 */
173 template <Behavior BehaviorT = kNoBehavior, typename P>
174 void push(P&& v) noexcept VLINK_NO_INSTRUMENT;
175
176 /**
177 * @brief Pushes a value without blocking; returns @c false if the queue is full.
178 *
179 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
180 * @tparam P Value type.
181 * @param v Value to push.
182 * @return @c true if pushed; @c false if full.
183 */
184 template <Behavior BehaviorT = kNoBehavior, typename P>
185 [[nodiscard]] bool try_push(P&& v) noexcept VLINK_NO_INSTRUMENT;
186
187 /**
188 * @brief Pops a value by move and blocks until an element is available.
189 *
190 * @details
191 * If @c BehaviorT == @c kConditionBehavior, notifies @c wait_not_full() waiters after pop.
192 * Returns without modifying @p v if @c notify_to_quit() has been called.
193 *
194 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
195 * @param v Output: receives the popped element via move.
196 */
197 template <Behavior BehaviorT = kNoBehavior>
198 void pop(T& v) noexcept VLINK_NO_INSTRUMENT;
199
200 /**
201 * @brief Pops a value without blocking; returns @c false if the queue is empty.
202 *
203 * @tparam BehaviorT Notification behaviour. Default: @c kNoBehavior.
204 * @param v Output: receives the popped element via move.
205 * @return @c true if an element was popped; @c false if empty.
206 */
207 template <Behavior BehaviorT = kNoBehavior>
208 [[nodiscard]] bool try_pop(T& v) noexcept VLINK_NO_INSTRUMENT;
209
210 /**
211 * @brief Returns the fixed capacity of the queue.
212 *
213 * @return Capacity as passed to the constructor.
214 */
215 [[nodiscard]] size_t capacity() const noexcept VLINK_NO_INSTRUMENT;
216
217 /**
218 * @brief Returns an approximation of the current number of elements.
219 *
220 * @details
221 * If @p real is @c false (default), returns @c head - tail (fast but may be slightly stale).
222 * If @p real is @c true, retries up to 50 times until a stable snapshot is obtained.
223 *
224 * @param real If @c true, use a more accurate but slower measurement. Default: @c false.
225 * @return Approximate element count.
226 */
227 [[nodiscard]] size_t size(bool real = false) const noexcept VLINK_NO_INSTRUMENT;
228
229 /**
230 * @brief Returns @c true if the queue appears to be empty.
231 *
232 * @param real If @c true, use a more accurate measurement. Default: @c false.
233 * @return @c true if @c size(real) == 0.
234 */
235 [[nodiscard]] bool empty(bool real = false) const noexcept VLINK_NO_INSTRUMENT;
236
237 /**
238 * @brief Returns @c true if the queue appears to be full.
239 *
240 * @param real If @c true, use a more accurate measurement. Default: @c false.
241 * @return @c true if @c size(real) >= @c capacity().
242 */
243 [[nodiscard]] bool is_full(bool real = false) const noexcept VLINK_NO_INSTRUMENT;
244
245 /**
246 * @brief Blocks until the queue is not empty (or a timeout elapses).
247 *
248 * @details
249 * If @p timeout is @c std::chrono::milliseconds(0), blocks indefinitely.
250 * Returns immediately if the queue already has elements.
251 * Returns @c false if @c notify_to_quit() was called.
252 *
253 * @param timeout Wait duration. 0 = wait indefinitely. Default: 0.
254 * @return @c true if the queue became non-empty; @c false on quit.
255 */
256 bool wait_not_empty(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) noexcept VLINK_NO_INSTRUMENT;
257
258 /**
259 * @brief Blocks until the queue has space (or a timeout elapses).
260 *
261 * @details
262 * If @p timeout is @c std::chrono::milliseconds(0), blocks indefinitely.
263 * Returns immediately if the queue is not full.
264 * Returns @c false if @c notify_to_quit() was called.
265 *
266 * @param timeout Wait duration. 0 = wait indefinitely. Default: 0.
267 * @return @c true if space is available; @c false on quit.
268 */
269 bool wait_not_full(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) noexcept VLINK_NO_INSTRUMENT;
270
271 /**
272 * @brief Signals all blocked @c wait_not_empty() / @c wait_not_full() callers to exit.
273 *
274 * @details
275 * Sets the quit flag so that all subsequent @c emplace/push calls are silently dropped and
276 * all blocking @c pop / wait calls return immediately. Used during graceful shutdown.
277 */
278 void notify_to_quit() noexcept VLINK_NO_INSTRUMENT;
279
280 private:
281#if defined(__cpp_aligned_new)
282 template <typename ChunkT>
283 using AlignedAllocator = std::allocator<ChunkT>;
284#else
285 template <typename ChunkT>
286 struct AlignedAllocator {
287 using value_type = ChunkT;
288
289 ChunkT* allocate(std::size_t n) {
290 if (n > std::numeric_limits<std::size_t>::max() / sizeof(ChunkT)) {
291 throw std::bad_array_new_length();
292 }
293
294#ifdef _WIN32
295 auto* p = static_cast<ChunkT*>(_aligned_malloc(sizeof(ChunkT) * n, alignof(ChunkT)));
296
297 if (p == nullptr) {
298 throw std::bad_alloc();
299 }
300#else
301 ChunkT* p;
302
303 if (posix_memalign(reinterpret_cast<void**>(&p), alignof(ChunkT), sizeof(ChunkT) * n) != 0) {
304 throw std::bad_alloc();
305 }
306#endif
307
308 return p;
309 }
310
311 void deallocate(ChunkT* p, std::size_t) {
312#ifdef _WIN32
313 _aligned_free(p);
314#else
315 free(p);
316#endif
317 }
318 };
319#endif
320
321 static constexpr std::memory_order kMemoryOrderAcquire = std::memory_order_acquire;
322 static constexpr std::memory_order kMemoryOrderRelease = std::memory_order_release;
323 static constexpr std::memory_order kMemoryOrderRelaxed = std::memory_order_relaxed;
324
325 static constexpr size_t kInterferenceSize = 64U;
326 static constexpr size_t kFirstSpinTimes = 32U;
327
328 [[nodiscard]] constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }
329
330 [[nodiscard]] constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }
331
332 struct Chunk {
333 ~Chunk() noexcept {
334 if ((turn.load(kMemoryOrderAcquire) & 1U) != 0U) {
335 destroy();
336 }
337 }
338
339 template <typename... Args>
340 void construct(Args&&... args) noexcept {
341 new (storage.data()) T(std::forward<Args>(args)...);
342 }
343
344 void destroy() noexcept { reinterpret_cast<T*>(storage.data())->~T(); }
345
346 [[nodiscard]] T&& move() noexcept { return std::move(*reinterpret_cast<T*>(storage.data())); }
347
348 alignas(kInterferenceSize) std::atomic<size_t> turn{0U};
349 alignas(alignof(T)) std::array<uint8_t, sizeof(T)> storage;
350 };
351
352 alignas(kInterferenceSize) std::atomic<size_t> head_{0U};
353
354 Chunk* chunk_{nullptr};
355 size_t capacity_{0};
356
357 mutable std::mutex cv_mtx_;
358
359 alignas(kInterferenceSize) std::atomic<size_t> tail_{0U};
360
361 vlink::condition_variable cv_not_empty_;
362 vlink::condition_variable cv_not_full_;
363
364#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
365 AlignedAllocator<Chunk> allocator_ [[no_unique_address]];
366#else
367 AlignedAllocator<Chunk> allocator_;
368#endif
369
370 struct alignas(kInterferenceSize) QuitFlag {
371 std::atomic_bool value{false};
372 char padding[kInterferenceSize - sizeof(std::atomic_bool)];
373 } quit_flag_;
374
376};
377
378////////////////////////////////////////////////////////////////
379/// Details
380////////////////////////////////////////////////////////////////
381
382template <typename T>
383inline MpmcQueue<T>::MpmcQueue(size_t capacity) : capacity_(capacity) {
384 static_assert(alignof(Chunk) == kInterferenceSize,
385 "Slot must be aligned to cache line boundary to prevent false sharing");
386 static_assert(sizeof(Chunk) % kInterferenceSize == 0,
387 "Slot size must be a multiple of cache line size to prevent "
388 "false sharing between adjacent slots");
389 static_assert(sizeof(MpmcQueue) % kInterferenceSize == 0,
390 "Queue size must be a multiple of cache line size to "
391 "prevent false sharing between adjacent queues");
392
393 if VUNLIKELY (capacity_ < 1U) {
394 throw std::invalid_argument("capacity < 1U");
395 }
396
397 chunk_ = allocator_.allocate(capacity_ + 1);
398
399 if VUNLIKELY (reinterpret_cast<size_t>(chunk_) % alignof(Chunk) != 0U) {
400 allocator_.deallocate(chunk_, capacity_ + 1);
401 throw std::bad_alloc();
402 }
403
404 for (size_t i = 0U; i < capacity_; ++i) {
405 new (&chunk_[i]) Chunk();
406 }
407}
408
409template <typename T>
410inline MpmcQueue<T>::~MpmcQueue() noexcept {
411 for (size_t i = 0U; i < capacity_; ++i) {
412 chunk_[i].~Chunk();
413 }
414
415 allocator_.deallocate(chunk_, capacity_ + 1);
416}
417
418template <typename T>
419template <typename MpmcQueue<T>::Behavior BehaviorT, typename... Args>
420inline void MpmcQueue<T>::emplace(Args&&... args) noexcept {
421 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
422 return;
423 }
424
425 if constexpr (BehaviorT == kConditionBehavior) {
426 wait_not_full(std::chrono::milliseconds(0));
427 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
428 return;
429 }
430 }
431
432 const auto head = head_.fetch_add(1U);
433 auto& chunk = chunk_[idx(head)];
434
435 size_t spin = 0;
436
437 while (turn(head) * 2U != chunk.turn.load(kMemoryOrderAcquire)) {
438 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
439 chunk.construct();
440 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
441 return;
442 }
443
444 if (++spin < kFirstSpinTimes) {
445 } else {
447 }
448 }
449
450 chunk.construct(std::forward<Args>(args)...);
451 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
452
453 if constexpr (BehaviorT == kConditionBehavior) {
454 std::lock_guard lock(cv_mtx_);
455 cv_not_empty_.notify_one();
456 }
457}
458
459template <typename T>
460template <typename MpmcQueue<T>::Behavior BehaviorT, typename... Args>
461inline bool MpmcQueue<T>::try_emplace(Args&&... args) noexcept {
462 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
463 return false;
464 }
465
466 auto head = head_.load(kMemoryOrderAcquire);
467
468 for (;;) {
469 auto& chunk = chunk_[idx(head)];
470
471 if VLIKELY (turn(head) * 2U == chunk.turn.load(kMemoryOrderAcquire)) {
472 if (head_.compare_exchange_strong(head, head + 1U)) {
473 chunk.construct(std::forward<Args>(args)...);
474 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
475
476 if constexpr (BehaviorT == kConditionBehavior) {
477 std::lock_guard lock(cv_mtx_);
478 cv_not_empty_.notify_one();
479 }
480
481 return true;
482 }
483 } else {
484 const auto prev_head = head;
485 head = head_.load(kMemoryOrderAcquire);
486 if VUNLIKELY (head == prev_head) {
487 return false;
488 }
489 }
490 }
491}
492
493template <typename T>
494template <typename MpmcQueue<T>::Behavior BehaviorT, typename P>
495inline void MpmcQueue<T>::push(P&& v) noexcept {
496 emplace<BehaviorT>(std::forward<P>(v));
497}
498
499template <typename T>
500template <typename MpmcQueue<T>::Behavior BehaviorT, typename P>
501inline bool MpmcQueue<T>::try_push(P&& v) noexcept {
502 return try_emplace<BehaviorT>(std::forward<P>(v));
503}
504
505template <typename T>
506template <typename MpmcQueue<T>::Behavior BehaviorT>
507inline void MpmcQueue<T>::pop(T& v) noexcept {
508 auto const tail = tail_.fetch_add(1U);
509 auto& chunk = chunk_[idx(tail)];
510
511 size_t spin = 0;
512
513 while (turn(tail) * 2U + 1U != chunk.turn.load(kMemoryOrderAcquire)) {
514 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
515 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
516 return;
517 }
518
519 if (++spin < kFirstSpinTimes) {
520 } else {
522 }
523 }
524
525 v = chunk.move();
526 chunk.destroy();
527 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
528
529 if constexpr (BehaviorT == kConditionBehavior) {
530 std::lock_guard lock(cv_mtx_);
531 cv_not_full_.notify_one();
532 }
533}
534
535template <typename T>
536template <typename MpmcQueue<T>::Behavior BehaviorT>
537inline bool MpmcQueue<T>::try_pop(T& v) noexcept {
538 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
539 return false;
540 }
541
542 auto tail = tail_.load(kMemoryOrderAcquire);
543
544 // LCOV_EXCL_START
545 // GCOVR_EXCL_START
546
547 for (;;) {
548 auto& chunk = chunk_[idx(tail)];
549
550 if VLIKELY (turn(tail) * 2U + 1U == chunk.turn.load(kMemoryOrderAcquire)) {
551 if (tail_.compare_exchange_strong(tail, tail + 1U)) {
552 v = chunk.move();
553 chunk.destroy();
554 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
555
556 if constexpr (BehaviorT == kConditionBehavior) {
557 std::lock_guard lock(cv_mtx_);
558 cv_not_full_.notify_one();
559 }
560
561 return true;
562 }
563 } else {
564 const auto prev_tail = tail;
565 tail = tail_.load(kMemoryOrderAcquire);
566
567 if VUNLIKELY (tail == prev_tail) {
568 return false;
569 }
570 }
571
572 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
573 return false;
574 }
575 }
576
577 // GCOVR_EXCL_STOP
578 // LCOV_EXCL_STOP
579}
580
581template <typename T>
582inline size_t MpmcQueue<T>::capacity() const noexcept {
583 return capacity_;
584}
585
586template <typename T>
587inline size_t MpmcQueue<T>::size(bool real) const noexcept {
588 static auto safe_diff = [](size_t h, size_t t) -> size_t { return (h >= t) ? (h - t) : 0U; };
589
590 if (real) {
591 static constexpr size_t kMaxRetry = 50U;
592 size_t retry_cnt = 0;
593
594 size_t t = tail_.load(kMemoryOrderAcquire);
595 size_t h = head_.load(kMemoryOrderAcquire);
596
597 while (retry_cnt < kMaxRetry) {
598 size_t t2 = tail_.load(kMemoryOrderAcquire);
599
600 if (t == t2) {
601 return safe_diff(h, t);
602 }
603
604 t = t2;
605 h = head_.load(kMemoryOrderAcquire);
606 retry_cnt++;
607
609 }
610
611 return safe_diff(h, t);
612 } else {
613 auto h = head_.load(kMemoryOrderAcquire);
614 auto t = tail_.load(kMemoryOrderAcquire);
615
616 return safe_diff(h, t);
617 }
618}
619
620template <typename T>
621inline bool MpmcQueue<T>::empty(bool real) const noexcept {
622 return size(real) == 0;
623}
624
625template <typename T>
626bool MpmcQueue<T>::is_full(bool real) const noexcept {
627 return size(real) >= capacity_;
628}
629
630template <typename T>
631inline bool MpmcQueue<T>::wait_not_empty(std::chrono::milliseconds timeout) noexcept {
632 if (!empty(true)) {
633 return true;
634 }
635
636 std::unique_lock lock(cv_mtx_);
637
638 bool ret = true;
639
640 if (timeout == std::chrono::milliseconds(0)) {
641 cv_not_empty_.wait(lock, [this]() { return !empty(true) || quit_flag_.value.load(kMemoryOrderAcquire); });
642 } else {
643 ret = cv_not_empty_.wait_for(lock, timeout,
644 [this]() { return !empty(true) || quit_flag_.value.load(kMemoryOrderAcquire); });
645 }
646
647 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
648 return false;
649 }
650
651 return ret;
652}
653
654template <typename T>
655inline bool MpmcQueue<T>::wait_not_full(std::chrono::milliseconds timeout) noexcept {
656 if (!is_full(true)) {
657 return true;
658 }
659
660 std::unique_lock lock(cv_mtx_);
661
662 bool ret = true;
663
664 if (timeout == std::chrono::milliseconds(0)) {
665 cv_not_full_.wait(lock, [this]() { return !is_full(true) || quit_flag_.value.load(kMemoryOrderAcquire); });
666 } else {
667 ret = cv_not_full_.wait_for(lock, timeout,
668 [this]() { return !is_full(true) || quit_flag_.value.load(kMemoryOrderAcquire); });
669 }
670
671 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
672 return false;
673 }
674
675 return ret;
676}
677
678template <typename T>
679inline void MpmcQueue<T>::notify_to_quit() noexcept {
680 std::lock_guard lock(cv_mtx_);
681
682 quit_flag_.value.store(true, kMemoryOrderRelease);
683
684 cv_not_empty_.notify_all();
685 cv_not_full_.notify_all();
686}
687
688} // namespace vlink
POSIX monotonic-clock condition variable replacing std::condition_variable.
Platform-independent macro definitions for the VLink library.
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
定义 macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely true.
定义 macros.h:297
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
定义 macros.h:184
#define VLINK_NO_INSTRUMENT
定义 mpmc_queue.h:97
STL namespace
Platform-agnostic system utilities for process, thread, network and signal management.