// Expands to the GCC/Clang function attribute `no_instrument_function`. The macro is appended
// to declarations in this header (e.g. `void pop(T &v) noexcept VLINK_NO_INSTRUMENT`).
// NOTE(review): the #if/#else/#endif around these two definitions is not visible here —
// presumably this variant is selected for compilers that understand the attribute; confirm.
95#define VLINK_NO_INSTRUMENT __attribute__((no_instrument_function))
// Fallback variant: expands to nothing so annotated declarations still compile.
97#define VLINK_NO_INSTRUMENT
// When the toolchain advertises aligned operator new (__cpp_aligned_new), AlignedAllocator
// is simply an alias for std::allocator.
// NOTE(review): this relies on std::allocator honouring alignof(ChunkT) for over-aligned
// types in that configuration — confirm for every supported standard library.
281#if defined(__cpp_aligned_new)
282 template <
typename ChunkT>
283 using AlignedAllocator = std::allocator<ChunkT>;
// Hand-written allocator that returns storage aligned to alignof(ChunkT).
// NOTE(review): presumably the #else branch of the __cpp_aligned_new test; the preprocessor
// lines and several statements of this struct are not visible in this excerpt.
285 template <
typename ChunkT>
286 struct AlignedAllocator {
287 using value_type = ChunkT;
// Allocates uninitialised storage for `n` objects of ChunkT.
289 ChunkT* allocate(std::size_t n) {
// Reject counts for which sizeof(ChunkT) * n would overflow std::size_t.
290 if (n > std::numeric_limits<std::size_t>::max() /
sizeof(ChunkT)) {
291 throw std::bad_array_new_length();
// _aligned_malloc path. NOTE(review): presumably guarded by a Windows/MSVC check that is
// not visible here — confirm.
295 auto* p =
static_cast<ChunkT*
>(_aligned_malloc(
sizeof(ChunkT) * n,
alignof(ChunkT)));
// NOTE(review): presumably reached when the allocation above returned null; the test
// itself is not visible.
298 throw std::bad_alloc();
// posix_memalign path: any non-zero return code is reported as std::bad_alloc.
303 if (posix_memalign(
reinterpret_cast<void**
>(&p),
alignof(ChunkT),
sizeof(ChunkT) * n) != 0) {
304 throw std::bad_alloc();
// Releases storage obtained from allocate(); the element count parameter is unnamed and
// therefore unused. The body is not visible here.
311 void deallocate(ChunkT* p, std::size_t) {
/// Short names for the memory orderings passed to the atomic loads and stores in this file.
static constexpr std::memory_order kMemoryOrderAcquire{std::memory_order_acquire};
static constexpr std::memory_order kMemoryOrderRelease{std::memory_order_release};
static constexpr std::memory_order kMemoryOrderRelaxed{std::memory_order_relaxed};

/// Alignment, in bytes, applied to the hot atomics and to each slot (used with alignas).
static constexpr size_t kInterferenceSize{64U};
/// Threshold the spin counter is compared against in the blocking wait loops.
static constexpr size_t kFirstSpinTimes{32U};
328 [[nodiscard]]
constexpr size_t idx(
size_t i)
const noexcept {
return i % capacity_; }
330 [[nodiscard]]
constexpr size_t turn(
size_t i)
const noexcept {
return i / capacity_; }
// An odd `turn` is the value producers store right after construct() (turn * 2 + 1), so the
// low bit being set means the slot currently holds an element.
// NOTE(review): the enclosing function is not visible — presumably the slot destructor,
// which would have to destroy that element; confirm.
334 if ((turn.load(kMemoryOrderAcquire) & 1U) != 0U) {
// Placement-constructs a T inside `storage`, perfectly forwarding `args` to T's constructor.
// Declared noexcept, so an exception escaping T's constructor terminates the program.
339 template <
typename... Args>
340 void construct(Args&&... args)
noexcept {
341 new (storage.data()) T(std::forward<Args>(args)...);
344 void destroy() noexcept {
reinterpret_cast<T*
>(storage.data())->~T(); }
346 [[nodiscard]] T&& move() noexcept {
return std::move(*
reinterpret_cast<T*
>(storage.data())); }
// Per-slot sequence counter, aligned to kInterferenceSize. Producers store turn * 2 + 1 after
// constructing an element; consumers store turn * 2 + 2 after taking it.
348 alignas(kInterferenceSize) std::atomic<size_t> turn{0U};
// Raw bytes, sized and aligned for one T, in which construct() creates the element.
349 alignas(
alignof(T)) std::array<uint8_t,
sizeof(T)> storage;
// Producer counter: emplace() claims a slot with head_.fetch_add(1), try_emplace() with a
// compare-exchange. Aligned to kInterferenceSize.
352 alignas(kInterferenceSize) std::atomic<size_t> head_{0U};
// Slot array, obtained from allocator_.allocate(capacity_ + 1) in the constructor.
354 Chunk* chunk_{
nullptr};
// Mutex used together with cv_not_empty_ / cv_not_full_ by the wait_* and notify paths.
357 mutable std::mutex cv_mtx_;
// Consumer counter, advanced by pop() / try_pop(). Aligned to kInterferenceSize, so it
// starts on a different 64-byte boundary than head_.
359 alignas(kInterferenceSize) std::atomic<size_t> tail_{0U};
// Allocator used for the slot array. Where the compiler reports support for
// [[no_unique_address]], the member is declared with that attribute.
// NOTE(review): `defined(__has_cpp_attribute) && __has_cpp_attribute(...)` inside a single
// #if is not portable — a preprocessor without __has_cpp_attribute still has to parse the
// second operand and fails on it; the portable form nests the two tests. The matching
// #else/#endif lines are not visible in this excerpt.
364#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
365 AlignedAllocator<Chunk> allocator_ [[no_unique_address]];
// Declaration without the attribute (presumably the #else branch).
367 AlignedAllocator<Chunk> allocator_;
// Shutdown flag in its own kInterferenceSize-aligned struct, explicitly padded to that size.
370 struct alignas(kInterferenceSize) QuitFlag {
// Set to true by notify_to_quit(); read with acquire loads by the queue operations.
371 std::atomic_bool value{
false};
// Filler bytes that extend the struct from the flag up to kInterferenceSize.
372 char padding[kInterferenceSize -
sizeof(std::atomic_bool)];
// Compile-time layout checks. The messages say "Slot"; the type being checked is Chunk.
// Each slot must start exactly on a kInterferenceSize boundary.
384 static_assert(
alignof(Chunk) == kInterferenceSize,
385 "Slot must be aligned to cache line boundary to prevent false sharing");
// Each slot must occupy a whole number of kInterferenceSize units, so consecutive array
// elements never share one.
386 static_assert(
sizeof(Chunk) % kInterferenceSize == 0,
387 "Slot size must be a multiple of cache line size to prevent "
388 "false sharing between adjacent slots");
// The same whole-multiple requirement for the queue object itself.
389 static_assert(
sizeof(
MpmcQueue) % kInterferenceSize == 0,
390 "Queue size must be a multiple of cache line size to "
391 "prevent false sharing between adjacent queues");
// Rejects an invalid capacity. NOTE(review): the guarding condition is not visible here;
// the message suggests it is `capacity < 1`.
394 throw std::invalid_argument(
"capacity < 1U");
// Storage for capacity_ + 1 slots is requested, but only the first capacity_ are constructed
// below. NOTE(review): presumably the spare slot is tail padding against false sharing with
// whatever follows the array — confirm.
397 chunk_ = allocator_.allocate(capacity_ + 1);
// Defensive check that the returned address really is a multiple of alignof(Chunk); if not,
// the storage is handed back and the failure is reported as std::bad_alloc.
399 if VUNLIKELY (
reinterpret_cast<size_t>(chunk_) %
alignof(Chunk) != 0U) {
400 allocator_.deallocate(chunk_, capacity_ + 1);
401 throw std::bad_alloc();
// Default-construct every usable slot in place.
404 for (
size_t i = 0U; i < capacity_; ++i) {
405 new (&chunk_[i]) Chunk();
// Walks the capacity_ constructed slots. The loop body is not visible here; the destructor
// is documented as destroying any elements still in the queue.
411 for (
size_t i = 0U; i < capacity_; ++i) {
// Return the slot array using the same element count that was passed to allocate().
415 allocator_.deallocate(chunk_, capacity_ + 1);
// Blocking in-place insertion (fragments; several lines of the body are not visible).
// BehaviorT presumably selects whether the condition-variable notification at the end is sent.
419template <typename MpmcQueue<T>::Behavior BehaviorT,
typename... Args>
// Quit checks before claiming a slot (the statements they guard are not visible).
421 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
427 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// Claim a producer ticket and locate the slot it maps to.
432 const auto head = head_.fetch_add(1U);
433 auto& chunk = chunk_[idx(head)];
// Wait until the slot's turn equals turn(head) * 2, the even value that marks the slot as
// writable for this pass.
437 while (turn(head) * 2U != chunk.turn.load(kMemoryOrderAcquire)) {
// On shutdown while waiting, the slot's turn is still set to the odd value.
// NOTE(review): no construct() is visible on this path, so a consumer that later observes
// this turn would see a slot marked as filled without an element — confirm this is intended.
438 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
440 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
// The first kFirstSpinTimes iterations take this branch (branch bodies not visible).
444 if (++spin < kFirstSpinTimes) {
// Build the element in the slot, then publish it with a release store of the odd turn value.
450 chunk.construct(std::forward<Args>(args)...);
451 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
// Wake one waiter on cv_not_empty_ while holding cv_mtx_.
454 std::lock_guard lock(cv_mtx_);
455 cv_not_empty_.notify_one();
// Non-blocking in-place insertion (fragments; several lines of the body are not visible).
460template <typename MpmcQueue<T>::Behavior BehaviorT,
typename... Args>
// Quit check on entry (the guarded statement is not visible).
462 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// Snapshot the producer counter; unlike emplace(), nothing is claimed yet.
466 auto head = head_.load(kMemoryOrderAcquire);
469 auto& chunk = chunk_[idx(head)];
// The slot is writable for this pass when its turn equals turn(head) * 2.
471 if VLIKELY (turn(head) * 2U == chunk.turn.load(kMemoryOrderAcquire)) {
// Claim the slot only if head_ still has the snapshot value; on failure
// compare_exchange_strong refreshes `head` with the current value.
472 if (head_.compare_exchange_strong(head, head + 1U)) {
// Build the element, then publish it with a release store of the odd turn value.
473 chunk.construct(std::forward<Args>(args)...);
474 chunk.turn.store((turn(head) * 2U) + 1U, kMemoryOrderRelease);
// Wake one waiter on cv_not_empty_ while holding cv_mtx_.
477 std::lock_guard lock(cv_mtx_);
478 cv_not_empty_.notify_one();
// Slot not writable: remember the snapshot and re-read head_.
// NOTE(review): presumably an unchanged head_ means the queue is full and the call gives
// up; the comparison is not visible here.
484 const auto prev_head = head;
485 head = head_.load(kMemoryOrderAcquire);
494template <typename MpmcQueue<T>::Behavior BehaviorT,
typename P>
500template <typename MpmcQueue<T>::Behavior BehaviorT,
typename P>
// Blocking removal (fragments; several lines of the body are not visible).
506template <typename MpmcQueue<T>::Behavior BehaviorT>
// Claim a consumer ticket and locate the slot it maps to.
508 auto const tail = tail_.fetch_add(1U);
509 auto& chunk = chunk_[idx(tail)];
// Wait until the slot's turn equals turn(tail) * 2 + 1, the odd value a producer stores
// after constructing an element.
513 while (turn(tail) * 2U + 1U != chunk.turn.load(kMemoryOrderAcquire)) {
// On shutdown while waiting, the slot's turn is advanced to the next even value.
514 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
515 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
// The first kFirstSpinTimes iterations take this branch (branch bodies not visible).
519 if (++spin < kFirstSpinTimes) {
// Hand the slot to the next pass by storing the next even turn value with release.
// The statements that take the element out of the slot are not visible here.
527 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
// Wake one waiter on cv_not_full_ while holding cv_mtx_.
530 std::lock_guard lock(cv_mtx_);
531 cv_not_full_.notify_one();
// Non-blocking removal (fragments; several lines of the body are not visible).
536template <typename MpmcQueue<T>::Behavior BehaviorT>
// Quit check on entry (the guarded statement is not visible).
538 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// Snapshot the consumer counter; nothing is claimed yet.
542 auto tail = tail_.load(kMemoryOrderAcquire);
548 auto& chunk = chunk_[idx(tail)];
// The slot holds an element for this pass when its turn equals turn(tail) * 2 + 1.
550 if VLIKELY (turn(tail) * 2U + 1U == chunk.turn.load(kMemoryOrderAcquire)) {
// Claim the slot only if tail_ still has the snapshot value; on failure
// compare_exchange_strong refreshes `tail` with the current value.
551 if (tail_.compare_exchange_strong(tail, tail + 1U)) {
// Release the slot for the next pass. The statements that take the element out of the
// slot are not visible here.
554 chunk.turn.store((turn(tail) * 2U) + 2U, kMemoryOrderRelease);
// Wake one waiter on cv_not_full_ while holding cv_mtx_.
557 std::lock_guard lock(cv_mtx_);
558 cv_not_full_.notify_one();
// Slot not ready: remember the snapshot and re-read tail_.
// NOTE(review): presumably an unchanged tail_ means the queue is empty and the call gives
// up; the comparison is not visible here.
564 const auto prev_tail = tail;
565 tail = tail_.load(kMemoryOrderAcquire);
572 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// Saturating difference `h - t`: yields 0 instead of wrapping around when a racy snapshot
// has the tail counter ahead of the head counter.
// Declared constexpr rather than as a mutable function-local `static`: the closure is
// stateless, so it needs no static storage and must never be reassigned.
constexpr auto safe_diff = [](size_t h, size_t t) noexcept -> size_t {
  return (h >= t) ? (h - t) : 0U;
};
// Upper bound on how many times the counter snapshot is retried.
591 static constexpr size_t kMaxRetry = 50U;
592 size_t retry_cnt = 0;
// Initial snapshot: tail first, then head.
594 size_t t = tail_.load(kMemoryOrderAcquire);
595 size_t h = head_.load(kMemoryOrderAcquire);
597 while (retry_cnt < kMaxRetry) {
// Re-read tail after head. NOTE(review): presumably the snapshot is accepted when tail did
// not move in between; the comparison is not visible here.
598 size_t t2 = tail_.load(kMemoryOrderAcquire);
601 return safe_diff(h, t);
// Refresh head for the next attempt.
605 h = head_.load(kMemoryOrderAcquire);
// Result after the retry loop (presumably reached when the retries are exhausted).
611 return safe_diff(h, t);
// Single read of each counter with no retry. NOTE(review): presumably the path taken when
// `real` is false — confirm.
613 auto h = head_.load(kMemoryOrderAcquire);
614 auto t = tail_.load(kMemoryOrderAcquire);
616 return safe_diff(h, t);
// empty(): true when size(real) is zero.
622 return size(real) == 0;
// is_full(): true when size(real) has reached capacity_.
627 return size(real) >= capacity_;
// The waits below run with cv_mtx_ held, as the condition-variable API requires.
636 std::unique_lock lock(cv_mtx_);
// A timeout of exactly 0 ms means "wait without a time limit": block until the queue is
// non-empty (checked with empty(true)) or shutdown has been requested.
640 if (timeout == std::chrono::milliseconds(0)) {
641 cv_not_empty_.wait(lock, [
this]() {
return !
empty(
true) || quit_flag_.value.load(kMemoryOrderAcquire); });
// Otherwise wait for at most `timeout` with the same predicate; `ret` receives the
// predicate's final value as returned by wait_for.
643 ret = cv_not_empty_.wait_for(lock, timeout,
644 [
this]() {
return !
empty(
true) || quit_flag_.value.load(kMemoryOrderAcquire); });
// Shutdown is checked again after the wait (the guarded statement is not visible).
647 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// The waits below run with cv_mtx_ held, as the condition-variable API requires.
660 std::unique_lock lock(cv_mtx_);
// A timeout of exactly 0 ms means "wait without a time limit": block until the queue is no
// longer full (checked with is_full(true)) or shutdown has been requested.
664 if (timeout == std::chrono::milliseconds(0)) {
665 cv_not_full_.wait(lock, [
this]() {
return !
is_full(
true) || quit_flag_.value.load(kMemoryOrderAcquire); });
// Otherwise wait for at most `timeout` with the same predicate; `ret` receives the
// predicate's final value as returned by wait_for.
667 ret = cv_not_full_.wait_for(lock, timeout,
668 [
this]() {
return !
is_full(
true) || quit_flag_.value.load(kMemoryOrderAcquire); });
// Shutdown is checked again after the wait (the guarded statement is not visible).
671 if VUNLIKELY (quit_flag_.value.load(kMemoryOrderAcquire)) {
// Taking cv_mtx_ before setting the flag means a waiter cannot evaluate its predicate and
// then go to sleep in between the store and the notifications below.
680 std::lock_guard lock(cv_mtx_);
// Publish the shutdown request; the queue operations read it with acquire loads.
682 quit_flag_.value.store(
true, kMemoryOrderRelease);
// Wake every thread blocked in wait_not_empty() and wait_not_full().
684 cv_not_empty_.notify_all();
685 cv_not_full_.notify_all();
void pop(T &v) noexcept VLINK_NO_INSTRUMENT
Pops a value by move and blocks until an element is available.
Definition mpmc_queue.h:507
bool empty(bool real=false) const noexcept VLINK_NO_INSTRUMENT
Returns true if the queue appears to be empty.
Definition mpmc_queue.h:621
bool wait_not_empty(std::chrono::milliseconds timeout=std::chrono::milliseconds(0)) noexcept VLINK_NO_INSTRUMENT
Blocks until the queue is not empty (or a timeout elapses).
Definition mpmc_queue.h:631
bool try_push(P &&v) noexcept VLINK_NO_INSTRUMENT
Pushes a value without blocking; returns false if the queue is full.
Definition mpmc_queue.h:501
~MpmcQueue() noexcept VLINK_NO_INSTRUMENT
Destructor. Destroys any elements still in the queue.
Definition mpmc_queue.h:410
MpmcQueue(size_t capacity) VLINK_NO_INSTRUMENT
Constructs a MpmcQueue with the given fixed capacity.
Definition mpmc_queue.h:383
Behavior
Controls whether condition-variable notifications are sent on push/pop.
Definition mpmc_queue.h:119
@ kNoBehavior
Definition mpmc_queue.h:119
@ kConditionBehavior
Definition mpmc_queue.h:119
void push(P &&v) noexcept VLINK_NO_INSTRUMENT
Pushes a value (by forwarding) and blocks until a slot is available.
Definition mpmc_queue.h:495
bool try_pop(T &v) noexcept VLINK_NO_INSTRUMENT
Pops a value without blocking; returns false if the queue is empty.
Definition mpmc_queue.h:537
bool wait_not_full(std::chrono::milliseconds timeout=std::chrono::milliseconds(0)) noexcept VLINK_NO_INSTRUMENT
Blocks until the queue has space (or a timeout elapses).
Definition mpmc_queue.h:655
void notify_to_quit() noexcept VLINK_NO_INSTRUMENT
Signals all blocked wait_not_empty() / wait_not_full() callers to exit.
Definition mpmc_queue.h:679
size_t size(bool real=false) const noexcept VLINK_NO_INSTRUMENT
Returns an approximation of the current number of elements.
Definition mpmc_queue.h:587
void emplace(Args &&... args) noexcept VLINK_NO_INSTRUMENT
In-place constructs an element and blocks until a slot is available.
Definition mpmc_queue.h:420
bool is_full(bool real=false) const noexcept VLINK_NO_INSTRUMENT
Returns true if the queue appears to be full.
Definition mpmc_queue.h:626
bool try_emplace(Args &&... args) noexcept VLINK_NO_INSTRUMENT
In-place constructs an element without blocking.
Definition mpmc_queue.h:461
size_t capacity() const noexcept VLINK_NO_INSTRUMENT
Returns the fixed capacity of the queue.
Definition mpmc_queue.h:582
POSIX monotonic-clock condition variable replacing std::condition_variable.
Platform-independent macro definitions for the VLink library.
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
Definition macros.h:302
#define VLIKELY(...)
Shorthand alias for VLINK_LIKELY. Hints that the expression is likely to be true.
Definition macros.h:297
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
#define VLINK_NO_INSTRUMENT
Definition mpmc_queue.h:97
VLINK_EXPORT void yield_cpu() noexcept
Emits a CPU pause/yield hint to reduce bus contention in busy-wait loops.
Definition utils.h:412
ConditionVariable condition_variable
Definition condition_variable.h:686
Platform-agnostic system utilities for process, thread, network and signal management.