thread_pool 源码学习
源码定义
我们先概览一下spdlog-thread_pool定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class SPDLOG_API thread_pool{ public : using item_type = async_msg; using q_type = details::mpmc_blocking_queue<item_type>; thread_pool (size_t q_max_items, size_t threads_n, std::function<void ()> on_thread_start); thread_pool (size_t q_max_items, size_t threads_n); ~thread_pool (); thread_pool (const thread_pool &) = delete ; thread_pool &operator =(thread_pool &&) = delete ; void post_log (async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy) ; void post_flush (async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) ; size_t overrun_counter () ; private : q_type q_; std::vector<std::thread> threads_; void post_async_msg_ (async_msg &&new_msg, async_overflow_policy overflow_policy) ; void worker_loop_ () ; bool process_next_msg_ () ; };
基本成员函数
首先我们从thread_pll中最基本的五个成员函数开始看。
1 2 3 4 5 6 7 8 9 thread_pool (size_t q_max_items, size_t threads_n, std::function<void ()> on_thread_start); thread_pool (size_t q_max_items, size_t threads_n);~thread_pool (); thread_pool (const thread_pool &) = delete ;thread_pool &operator =(thread_pool &&) = delete ;
可以看到该类删除了拷贝构造,移动构造,标志该类不可以被拷贝和移动。
其中有两个构造函数,我们来详细看看它们的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 inline thread_pool (size_t q_max_items, size_t threads_n, std::function<void ()> on_thread_start) : q_(q_max_items) ///< 任务队列的最大数目 { if (threads_n == 0 || threads_n > 1000 ) { throw ( "spdlog::thread_pool(): invalid threads_n param (valid " "range is 1-1000)" ); } for (size_t i = 0 ; i < threads_n; i++) { threads_.emplace_back ([this , on_thread_start] { on_thread_start (); this ->thread_pool::worker_loop_ (); }); } } inline thread_pool::thread_pool (size_t q_max_items, size_t threads_n) : thread_pool(q_max_items, threads_n, [] { }) {}
接着我们来看一下析构函数执行了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ~thread_pool () { try { for (size_t i = 0 ; i < threads_.size (); i++) { post_async_msg_ (async_msg (async_msg_type::terminate), async_overflow_policy::block); } for (auto &t : threads_) { t.join (); } } catch (...) { } }
生产者逻辑
接着我们来看公有的两个接口函数的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void post_log (async_logger_ptr &&worker_ptr, const log_msg &msg, async_overflow_policy overflow_policy) { async_msg async_m (std::move(worker_ptr), async_msg_type::log, msg) ; post_async_msg_ (std::move (async_m), overflow_policy); } void post_flush (async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) { post_async_msg_ (async_msg (std::move (worker_ptr), async_msg_type::flush), overflow_policy); } size_t overrun_counter () { return q_.overrun_counter (); }
post_log
和 post_flush
执行了一个差不多的任务,就是写日志,这两个函数都调用了post_async_msg_()
来执行具体的任务们就来看看post_async_msg_()
到底执行了什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void post_async_msg_ (async_msg &&new_msg, async_overflow_policy overflow_policy) { if (overflow_policy == async_overflow_policy::block) { q_.enqueue (std::move (new_msg)); } else { q_.enqueue_nowait (std::move (new_msg)); } }
消费者逻辑
如上面的实现,我们知道这是一个生产者,从外部插入到本对象内的任务队列,等待消费者来处理这些消息
我们来看看消费者到底执行了什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 void worker_loop_ () { while (process_next_msg_ ()) { } } bool process_next_msg_ () { async_msg incoming_async_msg; bool dequeued = q_.dequeue_for (incoming_async_msg, std::chrono::seconds (10 )); if (!dequeued) { return true ; } switch (incoming_async_msg.msg_type) { case async_msg_type::log: { incoming_async_msg.worker_ptr->backend_sink_it_ (incoming_async_msg); return true ; } case async_msg_type::flush: { incoming_async_msg.worker_ptr->backend_flush_ (); return true ; } case async_msg_type::terminate: { return false ; } default : { assert (false ); } } return true ; }
上面代码的逻辑我们可以看到:首先由worker_loop()
函数来不停的执行消费者函数。
而消费者函数在不停地去任务队列中获取任务最后由backend_sink_it_()
和 backend_flush_()
两个函数来执行真正地任务。
任务队列
很简单的一个消费者和生产者的队列,但最核心的部分被一个任务队列mpmc_blocking_queue<async_msg>
给封装了,让我们继续深入来看看这个任务队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 template <typename T>class mpmc_blocking_queue { public : using item_type = T; explicit mpmc_blocking_queue (size_t max_items) : q_(max_items) { } void enqueue (T &&item) { { std::unique_lock<std::mutex> lock (queue_mutex_) ; pop_cv_.wait (lock, [this ] { return !this ->q_.full (); }); q_.push_back (std::move (item)); } push_cv_.notify_one (); } void enqueue_nowait (T &&item) { { std::unique_lock<std::mutex> lock (queue_mutex_) ; q_.push_back (std::move (item)); } push_cv_.notify_one (); } bool dequeue_for (T &popped_item, std::chrono::milliseconds wait_duration) { { std::unique_lock<std::mutex> lock (queue_mutex_) ; if (!push_cv_.wait_for (lock, wait_duration, [this ] { return !this ->q_.empty (); })) { return false ; } popped_item = std::move (q_.front ()); q_.pop_front (); } pop_cv_.notify_one (); return true ; } size_t overrun_counter () { std::unique_lock<std::mutex> lock (queue_mutex_) ; return q_.overrun_counter (); } private : std::mutex queue_mutex_; std::condition_variable push_cv_; std::condition_variable pop_cv_; circular_q<T> q_; };
我们来看看这个队列是怎么实现线程安全的。
其中q_
这个循环队列不是线程安全的,所以加上了一个queue_mutex
这个互斥锁用来同步所有成员函数的顺序并配合条件变量实现等待获取的功能。
spdlog-thread_pool
的实现逻辑很清晰,我们可以对比一下Github上另一个thread-pool: progschj/ThreadPool 的实现。
由于需要写入的任务很明确,就是处理异步日志,所以任务的队列直接写死了处理异步日志消息。而progschj/ThreadPool的实现则更加灵活。我们可以看看我的另一篇博客阅读progschj/thread_pool源码 对progschj/ThreadPool的介绍