C++线程知识简介
C++线程知识简介
目的
假设我们使用的是多核 CPU,且无法避免使用多线程。站在产品的稳定性和性能优化的角度,对线程知识进行简介。
范围
为了在多线程的编程环境中,更好发挥多核CPU的性能,并对多线程相关的缺陷加以了解并进行规避。着重于讲并行
而非并发
的情况。
并行(Parallel
)与并发(Concurrent
):
- 并行: 单线程。
- 并发: 多线程。
并发:
任务1 | 任务2 |
---|---|
执行语句1 | |
执行语句2 | |
执行语句3 | |
执行语句4 |
并行:
任务1 | 任务2 |
---|---|
执行语句1 | 执行语句2 |
热身
- 一个数据如果一个时刻是只读的,那么在这个时刻该数据是线程安全的。
- 一个数据被多个线程同时读写,那么该数据是线程不安全的。
- 在同一个线程中,对一个普通互斥量加锁两次,会发生死锁。
int
、unsigned
、char
、double
等基本类型,均为线程不安全的。- 互斥量的创建、加锁和解锁操作本身,并不耗时。
场景
线程间共享数据
场景举例:
- 通信线程之间同步消息序列号。
- ATS线程写入列车信息,其他线程读取。
- C++ STL 互斥量版本
std::mutex mtx;
mtx.lock();
....
mtx.unlock();
- C++ STL 读写锁版本
std::shared_mutex mutex;
int sharedData = 0;
void readerThread()
{
std::shared_lock<std::shared_mutex> lock(mutex);
}
void writerThread()
{
std::unique_lock<std::shared_mutex> lock(mutex);
sharedData += 1;
}
写入线程 | 读取线程1 | 读取线程2 |
---|---|---|
readerThread() | readerThread() | |
writerThread() |
- C++ STL 原子变量版本
std::atomic<int> i;
/// 线程安全
void set()
{
i++;
}
/// 线程安全
int get()
{
return i;
}
- Windows API 版本
HANDLE hMutex;
// 创建互斥量
hMutex = CreateMutex(NULL, FALSE, NULL);
if (hMutex == NULL)
{
return 0;
}
// 加锁
WaitForSingleObject(hMutex, INFINITE);
...
// 解锁
ReleaseMutex(hMutex);
// 关闭互斥量
CloseHandle(hMutex);
- Windows 临界区版本
CRITICAL_SECTION cs;
// 初始化临界区
InitializeCriticalSection(&cs);
// 加锁
EnterCriticalSection(&cs);
...
// 解锁
LeaveCriticalSection(&cs);
// 删除临界区
DeleteCriticalSection(&cs);
注: EnterCriticalSection
和 WaitForSingleObject
的区别:
WaitForSingleObject
因为涉及到 用户态和内核态的切换,更慢。WaitForSingleObject
可以用于进程间的同步,而EnterCriticalSection
不能。WaitForSingleObject
可以达到超时等待的效果,而EnterCriticalSection
会一直等待。- 在同一线程中多次调用
WaitForSingleObject
和EnterCriticalSection
都不会产生死锁。
- pthread 版本
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);
...
pthread_mutex_unlock(&mutex);
pthread_mutex_destroy(&mutex);
- 注:
pthread_mutex_t
默认是不可被同一线程加锁两次的,即不可重入。如果想要可以重入则需要设置属性。
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE); // 设置为可重入
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &mutex_attr); // 使用属性,初始化互斥量
- gos 库版本
HANDLE mutex_ = gos_mutex_init();
gos_mutex(mutex_);
...
gos_unmutex(mutex_);
gos_mutex_free(mutex_);
注:
- gos 库版本的互斥量,在
window
下是可重入的, 在linux
下是不可重入的。
后台运行周期性任务
class NTPClient
{
public:
/// 进行NTP同步, 后台线程周期运行
void ntp_sync();
/// 获取NTP是否成功, 业务线程调用
bool get_ntp_valid();
};
- C++ STL 标准库版本
NTPClient client;
void ntp_sync_thread()
{
while (true)
{
client.ntp_sync();
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
std::thread t(ntp_sync_thread);
/// 分离线程
t.detach();
- pthread 版本:
pthread_attr_t stAttr;
/// 设置 pthread 属性
pthread_attr_init(&stAttr);
pthread_attr_setdetachstate(&stAttr, PTHREAD_CREATE_DETACHED);
/// 初始化线程
pthread_t thread_id;
pthread_create(&thread_id, NULL, threadFunction, &client);
- windows 版本:
/// 1. Windows API:
HANDLE hThread = CreateThread(NULL, 0, threadFunction, &client, 0, NULL);
// 分离线程
CloseHandle(hThread);
/// 2. Vistual C++ CRT 版本
uintptr_t hThread = _beginthreadex(NULL, 0, threadFunction, &client, 0, NULL);
为什么选择 _beginthreadex
而不是 CreateThread
?
_beginthreadex
为每个线程分配自己的tiddata
内存结构, 其中保存了 C 语言中的全局变量, 如 errno
。
参考资料: windows 多线程: CreateThread、_beginthread、_beginthreadex、AfxBeginThread 的区别
- gos 库版本
class NTPClientThread: public GThread
{
public :
NTPClientThread()
{
Start();
}
virtual GOS_THREAD_RET ThreadEntry(void* pPara)
{
while (true)
{
client.ntp_sync();
GThread::Sleep(10000);
}
return 0;
}
private:
NTPClient client;
};
后台运行耗时任务(一次性任务)
场景举例:
- 耗时函数放到后台运行,结果想要获取时再主动获取。
- 两个执行时间非常长的函数,并行执行可节省时间。
- 等待打印机打印的同时,继续执行其他任务。
实现思路:
- 启动一个线程,运行一个函数,函数运行结束,线程退出。
- 业务线程: 创建后台线程。
- 后台线程: 运行函数,函数结束后退出。
- 业务线程: 等待后台线程结束后,取得函数结果。
- C++ STL 标准库版本
int i = 0;
std::thread t([&i](){
i = 1;
});
// 等待后台线程结束
t.join();
// 获取线程函数结果
std::cout << i << std::endl;
- C++ STL 异步版本
int i = 0;
// 使用 std::async 在后台执行函数并获取结果
auto future = std::async(std::launch::async, [&i]() {
i = 1;
return i;
});
// 等待后台线程结束并获取结果
i = future.get();
// 输出线程函数结果
std::cout << i << std::endl;
- Windows API 版本
int i = 0;
DWORD WINAPI f(LPVOID lpParam)
{
i = 1;
return 0;
}
HANDLE hThread = CreateThread(NULL, 0, f, NULL, 0, NULL);
// 等待后台线程结束
WaitForSingleObject(hThread, INFINITE);
// 关闭线程句柄
CloseHandle(hThread);
// 获取线程函数结果
std::cout << i << std::endl;
- pthread 版本
int i = 0;
void f()
{
i = 1;
}
pthread_t t;
pthread_create(&t, NULL, f, NULL);
// 等待后台线程结束
pthread_join(t, NULL);
// 获取线程函数结果
std::cout << i << std::endl;
线程主动停止与资源释放
线程正在运行时,对线程进行销毁(free, delete),可能会访问到已经被释放的内存,导致程序崩溃。
因此,线程停止时需要等待线程结束后再释放资源。
对于 joinable
的线程,需要调用 join
函数等待线程结束后再释放资源。
但对于 detach
的线程,如何知晓线程函数执行完毕。
场景举例: 视频播放线程的主动停止。
class ThreadPlayAndRec : public GThread
{};
ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start();
...
p->Stop();
/// 通知线程结束后需要等待多久,线程函数才会结束
delete p;
解决办法: 设置结束标识位来判断。
VOID ThreadPlayAndRec::Free()
{
m_ulThreadState = THREAD_STATE_CLOSING;
Stop();
while(1)
{
if (m_ulThreadState == THREAD_STATE_FREE)
{
break;
}
gos_sleep_1ms(1);
}
}
ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
...
p->Stop(); ///< 通知线程结束
p->Free(); ///< 阻塞等待线程结束
delete p;
温州S2项目对 GThread 的改动:
ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
delete p; ///< 等待线程结束后, 释放资源
线程唤醒(线程池)
场景举例: 线程池中的线程,在任务队列出现任务时,唤醒一个线程进行处理。
- Linux 信号量举例
/// 初始化
sem_init()
/// 等待信号量唤醒
sem_wait()
/// 信号量唤醒
sem_post()
/// 回收资源
sem_destroy()
生产者线程 | 业务线程1 | 业务线程2 |
---|---|---|
初始化 sem_init() | ||
等待唤醒 sem_wait() | ||
等待唤醒 sem_wait() | ||
消息入列 sem_post() | ||
被唤醒后, 取出消息并处理 | ||
回收资源 sem_destroy() |
- pthread 条件变量版本
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int flag = 0;
void* threadFunction(void* arg) {
pthread_mutex_lock(&mutex);
while (!flag) {
pthread_cond_wait(&cond, &mutex);
}
printf("Thread received signal\n");
pthread_mutex_unlock(&mutex);
...
return NULL;
}
int main() {
pthread_t thread;
pthread_create(&thread, NULL, threadFunction, NULL);
/// 主线程等待一段时间后发送信号
sleep(2);
pthread_mutex_lock(&mutex);
flag = 1;
/// 唤醒线程
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
pthread_join(thread, NULL);
return 0;
}
- C++ STL 风格代码举例
void producers()
{
std::unique_lock<std::mutex> lock(queue_mutex);
/// 这个封装好的函数放入任务列表中
tasks.emplace(&f);
/// 通知一个阻塞中的线程,任务队列中有任务了
condition.notify_one();
}
void consumer()
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
/// 阻塞等待直到函数返回 true
this->condition.wait(lock, [this] { return !this->tasks.empty(); });
/// 从任务队列中拿出来一个任务
task = this->tasks.front();
this->tasks.pop();
}
生产者线程 | 业务线程1 | 业务线程2 |
---|---|---|
条件变量阻塞等待(wait() ) |
||
条件变量阻塞等待(wait() ) |
||
消息入列 | ||
唤醒线程(notify_one() ) |
||
被唤醒(条件变量停止阻塞) | ||
获取消息并执行业务 |
线程安全中所涉及的问题
死锁
- 情况1:
线程1 | 线程2 |
---|---|
获取互斥量1 | 获取互斥量2 |
获取互斥量2 | 获取互斥量1 |
解决办法:
- 一次只获取一个互斥量。
{
std::lock_guard<std::mutex> lock1(mutex1);
...
}
{
std::lock_guard<std::mutex> lock2(mutex2);
...
}
- 使用 STL 的语法
{
std::scoped_lock lock(mutex1, mutex2);
std::cout << "Main thread acquired locks" << std::endl;
}
- 情况2:
线程1 |
---|
获取互斥量1 |
获取互斥量1 |
解决办法:
- 使用带有可重入属性的互斥量。
/// STL
std::recursive_mutex mutex;
/// pthread
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
/// window API
EnterCriticalSection(&cs);
...
LeaveCriticalSection(&cs);
- 情况3:
线程1 | 线程2 |
---|---|
等待线程2 join | 等待线程1 join |
解决办法:
- 在同一线程创建其他线程,也在同一线程进行 join。
ABA问题
线程1 | 线程2 |
---|---|
查询余额, 并存储进变量 i | 查询余额, 并存储进变量 i |
if(i >= 50) | if(i >= 50) |
i = i - 50; | i = i - 50; |
更新余额为 50 | |
更新余额为 50 |
解决办法:
- 串行执行
| 线程1 | 线程2 |
| :----------------------: | :----------------------: |
| 获取互斥量 mtx1 | 获取互斥量 mtx1 |
| 获取互斥量成功 | |
| 查询余额, 并存储进变量 i | |
| if(i >= 50) | |
| i = i - 50; | |
| 更新余额为 50 | |
| 释放互斥量 | |
| | 获取互斥量成功 |
| | 查询余额, 并存储进变量 i |
| | if(i >= 100) |
| | i = i - 50; |
| | 更新余额为 50 |
2. 使用 CAS(Compare And Swap) 的方法
```c++
| 线程1 | 线程2 |
| :--------------------------------------------------: | :--------------------------------------------------: |
| 查询余额, 并存储进变量 i | 查询余额, 并存储进变量 i |
| if(i == 100) | if(i == 100) |
| i = i - 50; | i = i - 50; |
| 如果当前值为 100 则更新约为 50(compare_and_exchange) | |
| 更新成功,事务结束 | |
| | 如果当前值为 100 则更新约为 50(compare_and_exchange) |
| | 更新失败,事务结束 |
初始化单例
C++11之前以下全局变量,线程不安全。
ThreadPlayAndRec g_ThreadPlayAndRec; ///< 全局变量,在程序启动时线程不安全
- 错误的做法1:
ThreadPlayAndRec* p = NULL;
if(!p)
{
p = new ThreadPlayAndRec();
}
线程1 | 线程2 |
---|---|
if(!p) | if(!p) |
new ThreadPlayAndRec(); | new ThreadPlayAndRec(); |
p 被赋值 | p 被赋值 |
线程1 | 线程2 |
---|---|
if(!p) | |
new ThreadPlayAndRec(); | if(!p) |
p 被赋值 | new ThreadPlayAndRec(); |
p 被赋值 |
- 错误的做法2:
ThreadPlayAndRec* p = NULL;
std::mutex mtx;
if(!p) // 第一次检查
{
std::lock_guard<std::mutex> guard(mtx);
if(!p) // 第二次检查
{
p = new ThreadPlayAndRec();
}
}
正常流程:
线程1 | 线程2 |
---|---|
if(!p) 第一次检查 | if(!p) 第一次检查 |
尝试获取互斥量 | 尝试获取互斥量 |
获取互斥量成功 | |
if(!p) 再次判断是否为空 | |
new ThreadPlayAndRec(); | |
p 被赋值 | |
释放互斥量 | |
获取互斥量成功 | |
if(!p) 再次判断是否为空 | |
p 不为空,流程结束 |
错误流程:
线程1 | 线程2 |
---|---|
if(!p) 第一次检查 | |
尝试获取互斥量 | |
获取互斥量成功 | |
if(!p) 再次判断是否为空 | |
new ThreadPlayAndRec(); | |
p 被赋值 | if(!p) 第一次检查 |
最后一步产生了读写竞争。
- 解决办法:
- 在 C++11 标准下使用 全局/静态变量。
- 使用 C++11 中提供的
std::call_once
保证初始化函数只被调用一次。
std::once_flag flag;
ThreadPlayAndRec* p = NULL;
std::call_once(flag, []() { p = new ThreadPlayAndRec(); });
- 使用互斥量
ThreadPlayAndRec* p = NULL;
std::mutex mtx;
std::lock_guard<std::mutex> guard(mtx);
if(!p)
{
p = new ThreadPlayAndRec();
}
- 在 C++11 之前,使用
Linux API
或者Windows API
函数。
linux:
pthread_once_t once_control = PTHREAD_ONCE_INIT;
void init_function()
{
std::cout << "Initialization function executed" << std::endl;
}
/// 只会执行一次
pthread_once(&once_control, init_function);
windows:
EnterCriticalSection(&cs);
if (!initialized)
{
init_function();
initialized = true;
}
LeaveCriticalSection(&cs);
-
不管, 成本最低也是最适合我们的方法。
-
使用
gos::singleton
, 其实现思路是 C++11 之前使用 double-check lock 的方法, C++11 之后使用std::call_once
。
多个线程同时开始
测试线程需要被测试函数同时并行执行。
- C++ STL 条件变量实现
std::mutex mutex_;
std::condition_variable cv_;
bool ready_ = false;
std::jthread j([this]() mutable
{
std::unique_lock<std::mutex> mutex(mutex_);
/// 等待同步开始
cv_.wait(mutex, [this] { return ready_; });
/// 执行函数
f();
}));
/// 锁定互斥量,以修改 ready_
mutex_.lock();
ready_ = true;
/// 发送给所有的线程,可以开始运行
cv_.notify_all();
mutex_.unlock();
隐藏的多线程问题
-
场景1: 在通信线程,直接使用修改界面的语句。 注: 界面有自己的线程调度,如果在其他线程中直接修改界面,可能会导致界面崩溃。
-
场景2: 在回调函数中使用共享变量(全局、静态或类成员变量)。 注: 回调函数一定在其他线程中执行,如果在回调函数中使用共享变量,可能会导致线程不安全。