0%

阅读C++线程池源码

progschj/thread_pool

Github上这个库(progschj/thread_pool)的点赞最多,学习一下。

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ThreadPool {
public:
ThreadPool(size_t);
/// 任务入列
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::invoke_result<F(Args...)>::type>;
~ThreadPool();

private:
/// 所有的工作线程
std::vector<std::thread> workers;
/// 任务队列
std::queue<std::function<void()> > tasks;

/// 用于同步的互斥锁和条件变量
std::mutex queue_mutex;
std::condition_variable condition;
bool stop; ///< 用于判断所有线程是否需要结束
};

构造函数和消费者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/// @name     ThreadPool
/// @brief 用于创建若干个线程,并规定消费者函数
///
/// @param threads [in] 要创建的线程数量
///
/// @return NONE
///
/// @date 2020-06-27 16:17:50
/// @warning 线程安全
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;

{
/// 获取同步锁
std::unique_lock<std::mutex> lock(this->queue_mutex);
/// 阻塞等待获取任务,直到任务队列不为空
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
/// 如果stop标志位为true,且任务列表都执行完毕后,该线程退出
if (this->stop && this->tasks.empty()) return;
/// 从任务队列中拿出来一个任务
task = std::move(this->tasks.front());
this->tasks.pop();
} ///< 这里释放锁

/// 执行该任务函数
task();
}
});
}

析构函数

1
2
3
4
5
6
7
8
9
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
/// 在stop位, 置为true后通知所有线程执行一次,然后等待所有线程处理完任务后join()
condition.notify_all();
for (std::thread& worker : workers) worker.join();
}

生产者函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// @name     enqueue
/// @brief 用于添加任务函数到任务队列中
///
/// @param f [in] 任务函数
/// @param args [in] 任务函数的入参列表
///
/// @return 取决于任务函数的返回值
///
/// @date 2020-06-27 16:06:30
/// @warning 线程安全
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::invoke_result<F(Args...)>::type> {
using return_type = typename std::invoke_result<F(Args...)>::type;

/// 这里封装一个异步的线程并执行刚刚传入的函数,这个函数通过bind改类型为void()
auto task = std::make_shared<std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
/// 创建一个这个函数的未来的值, 这个未来值不获取就不会进行计算
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

/// 假如说没有让这个线程停止则继续,否则抛出异常阻止线程池结束后在入列
if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
/// 这个封装好的函数放入任务列表中
tasks.emplace([task]() { (*task)(); });
}
/// 通知一个阻塞中的线程,任务队列中有任务了
condition.notify_one();
return res;
}