1// Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2// Distributed under the MIT License (http://opensource.org/licenses/MIT)
3
4#pragma once
5
6// multi producer-multi consumer blocking queue.
7// enqueue(..) - will block until room found to put the new message.
8// enqueue_nowait(..) - will return immediately with false if no room left in
9// the queue.
10// dequeue_for(..) - will block until the queue is not empty or timeout have
11// passed.
12
13#include <spdlog/details/circular_q.h>
14
15#include <atomic>
16#include <condition_variable>
17#include <mutex>
18
19namespace spdlog {
20namespace details {
21
22template <typename T>
23class mpmc_blocking_queue {
24public:
25 using item_type = T;
26 explicit mpmc_blocking_queue(size_t max_items)
27 : q_(max_items) {}
28
29#ifndef __MINGW32__
30 // try to enqueue and block if no room left
31 void enqueue(T &&item) {
32 {
33 std::unique_lock<std::mutex> lock(queue_mutex_);
34 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
35 q_.push_back(std::move(item));
36 }
37 push_cv_.notify_one();
38 }
39
40 // enqueue immediately. overrun oldest message in the queue if no room left.
41 void enqueue_nowait(T &&item) {
42 {
43 std::unique_lock<std::mutex> lock(queue_mutex_);
44 q_.push_back(std::move(item));
45 }
46 push_cv_.notify_one();
47 }
48
49 void enqueue_if_have_room(T &&item) {
50 bool pushed = false;
51 {
52 std::unique_lock<std::mutex> lock(queue_mutex_);
53 if (!q_.full()) {
54 q_.push_back(std::move(item));
55 pushed = true;
56 }
57 }
58
59 if (pushed) {
60 push_cv_.notify_one();
61 } else {
62 ++discard_counter_;
63 }
64 }
65
66 // dequeue with a timeout.
67 // Return true, if succeeded dequeue item, false otherwise
68 bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
69 {
70 std::unique_lock<std::mutex> lock(queue_mutex_);
71 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
72 return false;
73 }
74 popped_item = std::move(q_.front());
75 q_.pop_front();
76 }
77 pop_cv_.notify_one();
78 return true;
79 }
80
81 // blocking dequeue without a timeout.
82 void dequeue(T &popped_item) {
83 {
84 std::unique_lock<std::mutex> lock(queue_mutex_);
85 push_cv_.wait(lock, [this] { return !this->q_.empty(); });
86 popped_item = std::move(q_.front());
87 q_.pop_front();
88 }
89 pop_cv_.notify_one();
90 }
91
92#else
93 // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
94 // so release the mutex at the very end each function.
95
96 // try to enqueue and block if no room left
97 void enqueue(T &&item) {
98 std::unique_lock<std::mutex> lock(queue_mutex_);
99 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
100 q_.push_back(std::move(item));
101 push_cv_.notify_one();
102 }
103
104 // enqueue immediately. overrun oldest message in the queue if no room left.
105 void enqueue_nowait(T &&item) {
106 std::unique_lock<std::mutex> lock(queue_mutex_);
107 q_.push_back(std::move(item));
108 push_cv_.notify_one();
109 }
110
111 void enqueue_if_have_room(T &&item) {
112 bool pushed = false;
113 std::unique_lock<std::mutex> lock(queue_mutex_);
114 if (!q_.full()) {
115 q_.push_back(std::move(item));
116 pushed = true;
117 }
118
119 if (pushed) {
120 push_cv_.notify_one();
121 } else {
122 ++discard_counter_;
123 }
124 }
125
126 // dequeue with a timeout.
127 // Return true, if succeeded dequeue item, false otherwise
128 bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
129 std::unique_lock<std::mutex> lock(queue_mutex_);
130 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
131 return false;
132 }
133 popped_item = std::move(q_.front());
134 q_.pop_front();
135 pop_cv_.notify_one();
136 return true;
137 }
138
139 // blocking dequeue without a timeout.
140 void dequeue(T &popped_item) {
141 std::unique_lock<std::mutex> lock(queue_mutex_);
142 push_cv_.wait(lock, [this] { return !this->q_.empty(); });
143 popped_item = std::move(q_.front());
144 q_.pop_front();
145 pop_cv_.notify_one();
146 }
147
148#endif
149
150 size_t overrun_counter() {
151 std::unique_lock<std::mutex> lock(queue_mutex_);
152 return q_.overrun_counter();
153 }
154
155 size_t discard_counter() { return discard_counter_.load(m: std::memory_order_relaxed); }
156
157 size_t size() {
158 std::unique_lock<std::mutex> lock(queue_mutex_);
159 return q_.size();
160 }
161
162 void reset_overrun_counter() {
163 std::unique_lock<std::mutex> lock(queue_mutex_);
164 q_.reset_overrun_counter();
165 }
166
167 void reset_discard_counter() { discard_counter_.store(i: 0, m: std::memory_order_relaxed); }
168
169private:
170 std::mutex queue_mutex_;
171 std::condition_variable push_cv_;
172 std::condition_variable pop_cv_;
173 spdlog::details::circular_q<T> q_;
174 std::atomic<size_t> discard_counter_{0};
175};
176} // namespace details
177} // namespace spdlog
178