C++线程知识简介
目的
假设我们使用的是多核 CPU,且无法避免使用多线程。站在产品的稳定性和性能优化的角度,对线程知识进行简介。
范围
为了在多线程的编程环境中,更好发挥多核CPU的性能,并对多线程相关的缺陷加以了解并进行规避。着重于讲并行
而非并发
的情况。
并行(Parallel
)与并发(Concurrent
):
并发:
任务1 |
任务2 |
执行语句1 |
|
|
执行语句2 |
执行语句3 |
|
|
执行语句4 |
并行:
热身
- 一个数据如果一个时刻是只读的,那么在这个时刻该数据是线程安全的。
- 一个数据被多个线程同时读写,那么该数据是线程不安全的。
- 在同一个线程中,对一个普通互斥量加锁两次,会发生死锁。
int
、unsigned
、char
、double
等基本类型,均为线程不安全的。
- 互斥量的创建、加锁和解锁操作本身,并不耗时。
场景
线程间共享数据
场景举例:
- 通信线程之间同步消息序列号。
- ATS线程写入列车信息,其他线程读取。
1 2 3 4
| std::mutex mtx; mtx.lock(); .... mtx.unlock();
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| std::shared_mutex mutex; int sharedData = 0;
void readerThread() { std::shared_lock<std::shared_mutex> lock(mutex); }
void writerThread() { std::unique_lock<std::shared_mutex> lock(mutex); sharedData += 1; }
|
写入线程 |
读取线程1 |
读取线程2 |
|
readerThread() |
readerThread() |
writerThread() |
|
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| std::atomic<int> i;
void set() { i++; }
int get() { return i; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| HANDLE hMutex;
hMutex = CreateMutex(NULL, FALSE, NULL); if (hMutex == NULL) { return 0; }
WaitForSingleObject(hMutex, INFINITE);
...
ReleaseMutex(hMutex);
CloseHandle(hMutex);
|
1 2 3 4 5 6 7 8 9 10
| CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
EnterCriticalSection(&cs); ...
LeaveCriticalSection(&cs);
DeleteCriticalSection(&cs);
|
注: EnterCriticalSection
和 WaitForSingleObject
的区别:
WaitForSingleObject
因为涉及到 用户态和内核态的切换,更慢。
WaitForSingleObject
可以用于进程间的同步,而 EnterCriticalSection
不能。
WaitForSingleObject
可以达到超时等待的效果,而 EnterCriticalSection
会一直等待。
- 在同一线程中多次调用
WaitForSingleObject
和 EnterCriticalSection
都不会产生死锁。
1 2 3 4 5 6 7 8
| pthread_mutex_t mutex; pthread_mutex_init(&mutex, NULL); pthread_mutex_lock(&mutex);
...
pthread_mutex_unlock(&mutex); pthread_mutex_destroy(&mutex);
|
- 注:
pthread_mutex_t
默认是不可被同一线程加锁两次的,即不可重入。如果想要可以重入则需要设置属性。
1 2 3 4 5 6
| pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_t mutex; pthread_mutex_init(&mutex, &mutex_attr);
|
1 2 3 4 5 6 7
| HANDLE mutex_ = gos_mutex_init(); gos_mutex(mutex_);
...
gos_unmutex(mutex_); gos_mutex_free(mutex_);
|
注:
- gos 库版本的互斥量,在
window
下是可重入的, 在 linux
下是不可重入的。
后台运行周期性任务
1 2 3 4 5 6 7 8
| class NTPClient { public: void ntp_sync(); bool get_ntp_valid(); };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| NTPClient client;
void ntp_sync_thread() { while (true) { client.ntp_sync(); std::this_thread::sleep_for(std::chrono::seconds(10)); } }
std::thread t(ntp_sync_thread);
t.detach();
|
1 2 3 4 5 6 7 8 9
| pthread_attr_t stAttr;
pthread_attr_init(&stAttr); pthread_attr_setdetachstate(&stAttr, PTHREAD_CREATE_DETACHED);
pthread_t thread_id; pthread_create(&thread_id, NULL, threadFunction, &client);
|
1 2 3 4 5 6 7
| HANDLE hThread = CreateThread(NULL, 0, threadFunction, &client, 0, NULL);
CloseHandle(hThread);
uintptr_t hThread = _beginthreadex(NULL, 0, threadFunction, &client, 0, NULL);
|
为什么选择 _beginthreadex
而不是 CreateThread
?
_beginthreadex
为每个线程分配自己的tiddata
内存结构, 其中保存了 C 语言中的全局变量, 如 errno
。
参考资料: windows 多线程: CreateThread、_beginthread、_beginthreadex、AfxBeginThread 的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class NTPClientThread: public GThread { public : NTPClientThread() { Start(); }
virtual GOS_THREAD_RET ThreadEntry(void* pPara) { while (true) { client.ntp_sync(); GThread::Sleep(10000); } return 0; }
private: NTPClient client; };
|
后台运行耗时任务(一次性任务)
场景举例:
- 耗时函数放到后台运行,结果想要获取时再主动获取。
- 两个执行时间非常长的函数,并行执行可节省时间。
- 等待打印机打印的同时,继续执行其他任务。
实现思路:
- 启动一个线程,运行一个函数,函数运行结束,线程退出。
- 业务线程: 创建后台线程。
- 后台线程: 运行函数,函数结束后退出。
- 业务线程: 等待后台线程结束后,取得函数结果。
1 2 3 4 5 6 7 8 9 10 11
| int i = 0;
std::thread t([&i](){ i = 1; });
t.join();
std::cout << i << std::endl;
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| int i = 0;
auto future = std::async(std::launch::async, [&i]() { i = 1; return i; });
i = future.get();
std::cout << i << std::endl;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int i = 0;
DWORD WINAPI f(LPVOID lpParam) { i = 1; return 0; }
HANDLE hThread = CreateThread(NULL, 0, f, NULL, 0, NULL);
WaitForSingleObject(hThread, INFINITE);
CloseHandle(hThread);
std::cout << i << std::endl;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| int i = 0;
void f() { i = 1; }
pthread_t t; pthread_create(&t, NULL, f, NULL);
pthread_join(t, NULL);
std::cout << i << std::endl;
|
线程主动停止与资源释放
线程正在运行时,对线程进行销毁(free, delete),可能会访问到已经被释放的内存,导致程序崩溃。
因此,线程停止时需要等待线程结束后再释放资源。
对于 joinable
的线程,需要调用 join
函数等待线程结束后再释放资源。
但对于 detach
的线程,如何知晓线程函数执行完毕。
场景举例: 视频播放线程的主动停止。
1 2 3 4 5 6 7 8 9
| class ThreadPlayAndRec : public GThread {};
ThreadPlayAndRec* p = new ThreadPlayAndRec(); p->Start(); ... p->Stop();
delete p;
|
解决办法: 设置结束标识位来判断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| VOID ThreadPlayAndRec::Free() { m_ulThreadState = THREAD_STATE_CLOSING; Stop(); while(1) { if (m_ulThreadState == THREAD_STATE_FREE) { break; }
gos_sleep_1ms(1); }
}
ThreadPlayAndRec* p = new ThreadPlayAndRec(); p->Start(); ... p->Stop(); p->Free(); delete p;
|
温州S2项目对 GThread 的改动:
1 2 3
| ThreadPlayAndRec* p = new ThreadPlayAndRec(); p->Start(); delete p;
|
线程唤醒(线程池)
场景举例: 线程池中的线程,在任务队列出现任务时,唤醒一个线程进行处理。
1 2 3 4 5 6 7 8 9 10 11
| sem_init()
sem_wait()
sem_post()
sem_destroy()
|
生产者线程 |
业务线程1 |
业务线程2 |
初始化 sem_init() |
|
|
|
等待唤醒 sem_wait() |
|
|
|
等待唤醒 sem_wait() |
消息入列 sem_post() |
|
|
|
被唤醒后, 取出消息并处理 |
|
回收资源 sem_destroy() |
|
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int flag = 0;
void* threadFunction(void* arg) { pthread_mutex_lock(&mutex); while (!flag) { pthread_cond_wait(&cond, &mutex); } printf("Thread received signal\n"); pthread_mutex_unlock(&mutex);
...
return NULL; }
int main() { pthread_t thread; pthread_create(&thread, NULL, threadFunction, NULL);
sleep(2); pthread_mutex_lock(&mutex); flag = 1; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);
pthread_join(thread, NULL);
return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void producers() { std::unique_lock<std::mutex> lock(queue_mutex); tasks.emplace(&f);
condition.notify_one(); }
void consumer() { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return !this->tasks.empty(); }); task = this->tasks.front(); this->tasks.pop(); }
|
生产者线程 |
业务线程1 |
业务线程2 |
|
条件变量阻塞等待(wait() ) |
|
|
|
条件变量阻塞等待(wait() ) |
消息入列 |
|
|
唤醒线程(notify_one() ) |
|
|
|
被唤醒(条件变量停止阻塞) |
|
|
获取消息并执行业务 |
|
线程安全中所涉及的问题
死锁
线程1 |
线程2 |
获取互斥量1 |
获取互斥量2 |
获取互斥量2 |
获取互斥量1 |
解决办法:
- 一次只获取一个互斥量。
1 2 3 4 5 6 7 8 9 10 11
| { std::lock_guard<std::mutex> lock1(mutex1); ... }
{ std::lock_guard<std::mutex> lock2(mutex2); ... }
|
- 使用 STL 的语法
1 2 3 4
| { std::scoped_lock lock(mutex1, mutex2); std::cout << "Main thread acquired locks" << std::endl; }
|
解决办法:
- 使用带有可重入属性的互斥量。
1 2 3 4 5 6 7 8 9 10 11
| std::recursive_mutex mutex;
pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
EnterCriticalSection(&cs); ... LeaveCriticalSection(&cs);
|
线程1 |
线程2 |
等待线程2 join |
等待线程1 join |
解决办法:
- 在同一线程创建其他线程,也在同一线程进行 join。
ABA问题
线程1 |
线程2 |
查询余额, 并存储进变量 i |
查询余额, 并存储进变量 i |
if(i >= 50) |
if(i >= 50) |
i = i - 50; |
i = i - 50; |
更新余额为 50 |
|
|
更新余额为 50 |
解决办法:
- 串行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| | 线程1 | 线程2 | | :----------------------: | :----------------------: | | 获取互斥量 mtx1 | 获取互斥量 mtx1 | | 获取互斥量成功 | | | 查询余额, 并存储进变量 i | | | if(i >= 50) | | | i = i - 50; | | | 更新余额为 50 | | | 释放互斥量 | | | | 获取互斥量成功 | | | 查询余额, 并存储进变量 i | | | if(i >= 100) | | | i = i - 50; | | | 更新余额为 50 |
2. 使用 CAS(Compare And Swap) 的方法
```c++ | 线程1 | 线程2 | | :--------------------------------------------------: | :--------------------------------------------------: | | 查询余额, 并存储进变量 i | 查询余额, 并存储进变量 i | | if(i == 100) | if(i == 100) | | i = i - 50; | i = i - 50; | | 如果当前值为 100 则更新约为 50(compare_and_exchange) | | | 更新成功,事务结束 | | | | 如果当前值为 100 则更新约为 50(compare_and_exchange) | | | 更新失败,事务结束 |
|
初始化单例
C++11之前以下全局变量,线程不安全。
1
| ThreadPlayAndRec g_ThreadPlayAndRec;
|
1 2 3 4 5 6
| ThreadPlayAndRec* p = NULL;
if(!p) { p = new ThreadPlayAndRec(); }
|
线程1 |
线程2 |
if(!p) |
if(!p) |
new ThreadPlayAndRec(); |
new ThreadPlayAndRec(); |
p 被赋值 |
p 被赋值 |
线程1 |
线程2 |
if(!p) |
|
new ThreadPlayAndRec(); |
if(!p) |
p 被赋值 |
new ThreadPlayAndRec(); |
|
p 被赋值 |
1 2 3 4 5 6 7 8 9 10 11
| ThreadPlayAndRec* p = NULL; std::mutex mtx;
if(!p) { std::lock_guard<std::mutex> guard(mtx); if(!p) { p = new ThreadPlayAndRec(); } }
|
正常流程:
线程1 |
线程2 |
if(!p) 第一次检查 |
if(!p) 第一次检查 |
尝试获取互斥量 |
尝试获取互斥量 |
获取互斥量成功 |
|
if(!p) 再次判断是否为空 |
|
new ThreadPlayAndRec(); |
|
p 被赋值 |
|
释放互斥量 |
|
|
获取互斥量成功 |
|
if(!p) 再次判断是否为空 |
|
p 不为空,流程结束 |
错误流程:
线程1 |
线程2 |
if(!p) 第一次检查 |
|
尝试获取互斥量 |
|
获取互斥量成功 |
|
if(!p) 再次判断是否为空 |
|
new ThreadPlayAndRec(); |
|
p 被赋值 |
if(!p) 第一次检查 |
最后一步产生了读写竞争。
- 在 C++11 标准下使用 全局/静态变量。
- 使用 C++11 中提供的
std::call_once
保证初始化函数只被调用一次。
1 2 3
| std::once_flag flag; ThreadPlayAndRec* p = NULL; std::call_once(flag, []() { p = new ThreadPlayAndRec(); });
|
- 使用互斥量
1 2 3 4 5 6 7 8
| ThreadPlayAndRec* p = NULL; std::mutex mtx;
std::lock_guard<std::mutex> guard(mtx); if(!p) { p = new ThreadPlayAndRec(); }
|
- 在 C++11 之前,使用
Linux API
或者 Windows API
函数。
linux:
1 2 3 4 5 6 7 8 9
| pthread_once_t once_control = PTHREAD_ONCE_INIT;
void init_function() { std::cout << "Initialization function executed" << std::endl; }
pthread_once(&once_control, init_function);
|
windows:
1 2 3 4 5 6 7
| EnterCriticalSection(&cs); if (!initialized) { init_function(); initialized = true; } LeaveCriticalSection(&cs);
|
不管, 成本最低也是最适合我们的方法。
使用 gos::singleton
, 其实现思路是 C++11 之前使用 double-check lock 的方法, C++11 之后使用 std::call_once
。
多个线程同时开始
测试线程需要被测试函数同时并行执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| std::mutex mutex_; std::condition_variable cv_; bool ready_ = false;
std::jthread j([this]() mutable { std::unique_lock<std::mutex> mutex(mutex_); cv_.wait(mutex, [this] { return ready_; }); f(); }));
mutex_.lock(); ready_ = true;
cv_.notify_all(); mutex_.unlock();
|
隐藏的多线程问题