progschj/thread_pool
GitHub 上这个库(progschj/thread_pool)的 Star 数最多,学习一下它的实现。
接口定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// ---------------------------------------------------------------------------
// Interface definition
// ---------------------------------------------------------------------------

// Fixed-size pool of worker threads fed from a single shared task queue.
//
// Tasks are submitted through enqueue(), which hands back a std::future for
// the task's result. Workers dequeue tasks in FIFO order; with more than one
// worker, tasks may still finish in a different order than they were queued.
// The destructor lets the workers drain every task that is still queued and
// then joins them.
class ThreadPool {
public:
    // Starts `threads` workers that immediately block waiting for tasks.
    ThreadPool(std::size_t threads);

    // Every worker captures `this`, so the pool must never change address.
    // (Copying was already impossible because of the std::mutex member.)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Queues the call f(args...) and returns a future for its result.
    //
    // The return type is spelled with std::invoke_result_t<F, Args...>.
    // std::invoke_result takes the callable and the argument types as
    // separate template arguments; the function-type form F(Args...) belongs
    // to std::result_of and yields the wrong type (or none) here.
    //
    // Throws std::runtime_error if the pool is already being destroyed.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>>;

    // Signals shutdown, wakes all workers and joins them.
    ~ThreadPool();

private:
    std::vector<std::thread> workers;         // worker threads, joined in the destructor
    std::queue<std::function<void()>> tasks;  // pending tasks, guarded by queue_mutex

    std::mutex queue_mutex;             // protects `tasks` and `stop`
    std::condition_variable condition;  // signalled on new task and on shutdown
    bool stop;                          // set once by the destructor, guarded by queue_mutex
};

// ---------------------------------------------------------------------------
// Constructor and consumer (worker loop) implementation
// ---------------------------------------------------------------------------

inline ThreadPool::ThreadPool(std::size_t threads) : stop(false) {
    workers.reserve(threads);
    for (std::size_t i = 0; i < threads; ++i) {
        workers.emplace_back([this] {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    // Sleep until there is work or shutdown was requested;
                    // the predicate form also absorbs spurious wake-ups.
                    condition.wait(lock, [this] { return stop || !tasks.empty(); });
                    // Exit only once the queue is empty, so tasks queued
                    // before shutdown still get executed.
                    if (stop && tasks.empty()) return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                // Run outside the lock so other workers can dequeue meanwhile.
                task();
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Destructor
// ---------------------------------------------------------------------------

inline ThreadPool::~ThreadPool() {
    {
        // `stop` is written under the mutex so a worker that is between
        // evaluating its wait predicate and going to sleep cannot miss it.
        std::lock_guard<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) worker.join();
}

// ---------------------------------------------------------------------------
// Producer function
// ---------------------------------------------------------------------------

template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<std::invoke_result_t<F, Args...>> {
    using return_type = std::invoke_result_t<F, Args...>;

    // std::packaged_task is move-only while std::function needs a copyable
    // callable, so the task lives behind a shared_ptr that the queued lambda
    // captures by value.
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    // Take the future before publishing the task to the workers.
    std::future<return_type> res = task->get_future();
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    // Exactly one task was added, so waking a single worker is enough.
    condition.notify_one();
    return res;
}
|