1// Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2// Distributed under the MIT License (http://opensource.org/licenses/MIT)
3
4#pragma once
5
6#ifndef SPDLOG_HEADER_ONLY
7 #include <spdlog/details/thread_pool.h>
8#endif
9
10#include <cassert>
11#include <spdlog/common.h>
12
13namespace spdlog {
14namespace details {
15
16SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items,
17 size_t threads_n,
18 std::function<void()> on_thread_start,
19 std::function<void()> on_thread_stop)
20 : q_(q_max_items) {
21 if (threads_n == 0 || threads_n > 1000) {
22 throw_spdlog_ex(
23 msg: "spdlog::thread_pool(): invalid threads_n param (valid "
24 "range is 1-1000)");
25 }
26 for (size_t i = 0; i < threads_n; i++) {
27 threads_.emplace_back(args: [this, on_thread_start, on_thread_stop] {
28 on_thread_start();
29 this->thread_pool::worker_loop_();
30 on_thread_stop();
31 });
32 }
33}
34
35SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items,
36 size_t threads_n,
37 std::function<void()> on_thread_start)
38 : thread_pool(q_max_items, threads_n, on_thread_start, [] {}) {}
39
40SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n)
41 : thread_pool(
42 q_max_items, threads_n, [] {}, [] {}) {}
43
44// message all threads to terminate gracefully join them
45SPDLOG_INLINE thread_pool::~thread_pool() {
46 SPDLOG_TRY {
47 for (size_t i = 0; i < threads_.size(); i++) {
48 post_async_msg_(new_msg: async_msg(async_msg_type::terminate), overflow_policy: async_overflow_policy::block);
49 }
50
51 for (auto &t : threads_) {
52 t.join();
53 }
54 }
55 SPDLOG_CATCH_STD
56}
57
58void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
59 const details::log_msg &msg,
60 async_overflow_policy overflow_policy) {
61 async_msg async_m(std::move(worker_ptr), async_msg_type::log, msg);
62 post_async_msg_(new_msg: std::move(async_m), overflow_policy);
63}
64
65std::future<void> SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
66 async_overflow_policy overflow_policy) {
67 std::promise<void> promise;
68 std::future<void> future = promise.get_future();
69 post_async_msg_(new_msg: async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)),
70 overflow_policy);
71 return future;
72}
73
74size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }
75
76void SPDLOG_INLINE thread_pool::reset_overrun_counter() { q_.reset_overrun_counter(); }
77
78size_t SPDLOG_INLINE thread_pool::discard_counter() { return q_.discard_counter(); }
79
80void SPDLOG_INLINE thread_pool::reset_discard_counter() { q_.reset_discard_counter(); }
81
82size_t SPDLOG_INLINE thread_pool::queue_size() { return q_.size(); }
83
84void SPDLOG_INLINE thread_pool::post_async_msg_(async_msg &&new_msg,
85 async_overflow_policy overflow_policy) {
86 if (overflow_policy == async_overflow_policy::block) {
87 q_.enqueue(item: std::move(new_msg));
88 } else if (overflow_policy == async_overflow_policy::overrun_oldest) {
89 q_.enqueue_nowait(item: std::move(new_msg));
90 } else {
91 assert(overflow_policy == async_overflow_policy::discard_new);
92 q_.enqueue_if_have_room(item: std::move(new_msg));
93 }
94}
95
96void SPDLOG_INLINE thread_pool::worker_loop_() {
97 while (process_next_msg_()) {
98 }
99}
100
101// process next message in the queue
102// return true if this thread should still be active (while no terminate msg
103// was received)
104bool SPDLOG_INLINE thread_pool::process_next_msg_() {
105 async_msg incoming_async_msg;
106 q_.dequeue(popped_item&: incoming_async_msg);
107
108 switch (incoming_async_msg.msg_type) {
109 case async_msg_type::log: {
110 incoming_async_msg.worker_ptr->backend_sink_it_(msg: incoming_async_msg);
111 return true;
112 }
113 case async_msg_type::flush: {
114 incoming_async_msg.worker_ptr->backend_flush_();
115 incoming_async_msg.flush_promise.set_value();
116 return true;
117 }
118
119 case async_msg_type::terminate: {
120 return false;
121 }
122
123 default: {
124 assert(false);
125 }
126 }
127
128 return true;
129}
130
131} // namespace details
132} // namespace spdlog
133