阅读C++线程池源码

1 分钟阅读

progschj/thread_pool

Github上这个库(progschj/thread_pool)的点赞最多,学习一下。

接口定义

class ThreadPool {
 public:
  ThreadPool(size_t);
  /// 任务入列
  template <class F, class... Args>
  auto enqueue(F&& f, Args&&... args)
      -> std::future<typename std::invoke_result<F(Args...)>::type>;
  ~ThreadPool();

 private:
  /// 所有的工作线程
  std::vector<std::thread> workers;
  /// 任务队列
  std::queue<std::function<void()> > tasks;

  /// 用于同步的互斥锁和条件变量
  std::mutex queue_mutex;
  std::condition_variable condition;
  bool stop;  ///< 用于判断所有线程是否需要结束
};

构造函数和消费者实现

/// @name     ThreadPool
/// @brief    用于创建若干个线程,并规定消费者函数
///
/// @param    threads   [in]    要创建的线程数量
///
/// @return   NONE
///
/// @date     2020-06-27 16:17:50
/// @warning  线程安全
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
  for (size_t i = 0; i < threads; ++i)
    workers.emplace_back([this] {
      for (;;) {
        std::function<void()> task;

        {
          /// 获取同步锁
          std::unique_lock<std::mutex> lock(this->queue_mutex);
          /// 阻塞等待获取任务,直到任务队列不为空
          this->condition.wait(
              lock, [this] { return this->stop || !this->tasks.empty(); });
          /// 如果stop标志位为true,且任务列表都执行完毕后,该线程退出
          if (this->stop && this->tasks.empty()) return;
          /// 从任务队列中拿出来一个任务
          task = std::move(this->tasks.front());
          this->tasks.pop();
        }  ///< 这里释放锁

        /// 执行该任务函数
        task();
      }
    });
}

析构函数

inline ThreadPool::~ThreadPool() {
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
  }
  /// 在stop位, 置为true后通知所有线程执行一次,然后等待所有线程处理完任务后join()
  condition.notify_all();
  for (std::thread& worker : workers) worker.join();
}

生产者函数

/// @name     enqueue
/// @brief    用于添加任务函数到任务队列中
///
/// @param    f     [in]    任务函数
/// @param    args  [in]    任务函数的入参列表
///
/// @return   取决于任务函数的返回值
///
/// @date     2020-06-27 16:06:30
/// @warning  线程安全
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::invoke_result<F(Args...)>::type> {
  using return_type = typename std::invoke_result<F(Args...)>::type;

  /// 这里封装一个异步的线程并执行刚刚传入的函数,这个函数通过bind改类型为void()
  auto task = std::make_shared<std::packaged_task<return_type()> >(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  /// 创建一个这个函数的未来的值, 这个未来值不获取就不会进行计算
  std::future<return_type> res = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex);

    /// 假如说没有让这个线程停止则继续,否则抛出异常阻止线程池结束后在入列
    if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
    /// 这个封装好的函数放入任务列表中
    tasks.emplace([task]() { (*task)(); });
  }
  /// 通知一个阻塞中的线程,任务队列中有任务了
  condition.notify_one();
  return res;
}