0%

C++线程知识简介

C++线程知识简介

目的

假设我们使用的是多核 CPU,且无法避免使用多线程。站在产品的稳定性和性能优化的角度,对线程知识进行简介。

范围

为了在多线程的编程环境中,更好发挥多核CPU的性能,并对多线程相关的缺陷加以了解并进行规避。着重于讲并行而非并发的情况。

并行(Parallel)与并发(Concurrent):

  • 并行: 单线程。
  • 并发: 多线程。

并发:

任务1 任务2
执行语句1
执行语句2
执行语句3
执行语句4

并行:

任务1 任务2
执行语句1 执行语句2

热身

  1. 一个数据如果一个时刻是只读的,那么在这个时刻该数据是线程安全的。
  2. 一个数据被多个线程同时读写,那么该数据是线程不安全的。
  3. 在同一个线程中,对一个普通互斥量加锁两次,会发生死锁。
  4. intunsignedchardouble 等基本类型,均为线程不安全的。
  5. 互斥量的创建、加锁和解锁操作本身,并不耗时。

场景

线程间共享数据

场景举例:

  1. 通信线程之间同步消息序列号。
  2. ATS线程写入列车信息,其他线程读取。
  • C++ STL 互斥量版本
1
2
3
4
std::mutex mtx;
mtx.lock();
....
mtx.unlock();
  • C++ STL 读写锁版本
1
2
3
4
5
6
7
8
9
10
11
12
13
std::shared_mutex mutex;
int sharedData = 0;

void readerThread()
{
std::shared_lock<std::shared_mutex> lock(mutex);
}

void writerThread()
{
std::unique_lock<std::shared_mutex> lock(mutex);
sharedData += 1;
}
写入线程 读取线程1 读取线程2
readerThread() readerThread()
writerThread()
  • C++ STL 原子变量版本
1
2
3
4
5
6
7
8
9
10
11
12
13
std::atomic<int> i;

/// 线程安全
void set()
{
i++;
}

/// 线程安全
int get()
{
return i;
}
  • Windows API 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
HANDLE hMutex;

// 创建互斥量
hMutex = CreateMutex(NULL, FALSE, NULL);
if (hMutex == NULL)
{
return 0;
}

// 加锁
WaitForSingleObject(hMutex, INFINITE);

...

// 解锁
ReleaseMutex(hMutex);

// 关闭互斥量
CloseHandle(hMutex);
  • Windows 临界区版本
1
2
3
4
5
6
7
8
9
10
CRITICAL_SECTION cs;
// 初始化临界区
InitializeCriticalSection(&cs);
// 加锁
EnterCriticalSection(&cs);
...
// 解锁
LeaveCriticalSection(&cs);
// 删除临界区
DeleteCriticalSection(&cs);

注: EnterCriticalSectionWaitForSingleObject 的区别:

  1. WaitForSingleObject 因为涉及到 用户态和内核态的切换,更慢。
  2. WaitForSingleObject 可以用于进程间的同步,而 EnterCriticalSection 不能。
  3. WaitForSingleObject 可以达到超时等待的效果,而 EnterCriticalSection 会一直等待。
  4. 在同一线程中多次调用 WaitForSingleObjectEnterCriticalSection 都不会产生死锁。
  • pthread 版本
1
2
3
4
5
6
7
8
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);

...

pthread_mutex_unlock(&mutex);
pthread_mutex_destroy(&mutex);
  • 注:pthread_mutex_t 默认是不可被同一线程加锁两次的,即不可重入。如果想要可以重入则需要设置属性。
1
2
3
4
5
6
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE); // 设置为可重入

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &mutex_attr); // 使用属性,初始化互斥量
  • gos 库版本
1
2
3
4
5
6
7
HANDLE mutex_ = gos_mutex_init();
gos_mutex(mutex_);

...

gos_unmutex(mutex_);
gos_mutex_free(mutex_);

注:

  1. gos 库版本的互斥量,在 window 下是可重入的, 在 linux 下是不可重入的。

后台运行周期性任务

1
2
3
4
5
6
7
8
class NTPClient
{
public:
/// 进行NTP同步, 后台线程周期运行
void ntp_sync();
/// 获取NTP是否成功, 业务线程调用
bool get_ntp_valid();
};
  • C++ STL 标准库版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
NTPClient client;

void ntp_sync_thread()
{
while (true)
{
client.ntp_sync();
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}

std::thread t(ntp_sync_thread);
/// 分离线程
t.detach();
  • pthread 版本:
1
2
3
4
5
6
7
8
9
pthread_attr_t      stAttr;

/// 设置 pthread 属性
pthread_attr_init(&stAttr);
pthread_attr_setdetachstate(&stAttr, PTHREAD_CREATE_DETACHED);

/// 初始化线程
pthread_t thread_id;
pthread_create(&thread_id, NULL, threadFunction, &client);
  • windows 版本:
1
2
3
4
5
6
7
/// 1. Windows API:
HANDLE hThread = CreateThread(NULL, 0, threadFunction, &client, 0, NULL);
// 分离线程
CloseHandle(hThread);

/// 2. Vistual C++ CRT 版本
uintptr_t hThread = _beginthreadex(NULL, 0, threadFunction, &client, 0, NULL);

为什么选择 _beginthreadex 而不是 CreateThread?

_beginthreadex 为每个线程分配自己的tiddata内存结构, 其中保存了 C 语言中的全局变量, 如 errno

参考资料: windows 多线程: CreateThread、_beginthread、_beginthreadex、AfxBeginThread 的区别

  • gos 库版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class NTPClientThread: public GThread
{
public :
NTPClientThread()
{
Start();
}

virtual GOS_THREAD_RET ThreadEntry(void* pPara)
{
while (true)
{
client.ntp_sync();
GThread::Sleep(10000);
}
return 0;
}

private:
NTPClient client;
};

后台运行耗时任务(一次性任务)

场景举例:

  1. 耗时函数放到后台运行,结果想要获取时再主动获取。
  2. 两个执行时间非常长的函数,并行执行可节省时间。
  3. 等待打印机打印的同时,继续执行其他任务。

实现思路:

  1. 启动一个线程,运行一个函数,函数运行结束,线程退出。
  2. 业务线程: 创建后台线程。
  3. 后台线程: 运行函数,函数结束后退出。
  4. 业务线程: 等待后台线程结束后,取得函数结果。
  • C++ STL 标准库版本
1
2
3
4
5
6
7
8
9
10
11
int i = 0;

std::thread t([&i](){
i = 1;
});

// 等待后台线程结束
t.join();

// 获取线程函数结果
std::cout << i << std::endl;
  • C++ STL 异步版本
1
2
3
4
5
6
7
8
9
10
11
12
13
int i = 0;

// 使用 std::async 在后台执行函数并获取结果
auto future = std::async(std::launch::async, [&i]() {
i = 1;
return i;
});

// 等待后台线程结束并获取结果
i = future.get();

// 输出线程函数结果
std::cout << i << std::endl;
  • Windows API 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int i = 0;

DWORD WINAPI f(LPVOID lpParam)
{
i = 1;
return 0;
}

HANDLE hThread = CreateThread(NULL, 0, f, NULL, 0, NULL);

// 等待后台线程结束
WaitForSingleObject(hThread, INFINITE);

// 关闭线程句柄
CloseHandle(hThread);

// 获取线程函数结果
std::cout << i << std::endl;
  • pthread 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int i = 0;

void f()
{
i = 1;
}

pthread_t t;
pthread_create(&t, NULL, f, NULL);

// 等待后台线程结束
pthread_join(t, NULL);

// 获取线程函数结果
std::cout << i << std::endl;

线程主动停止与资源释放

线程正在运行时,对线程进行销毁(free, delete),可能会访问到已经被释放的内存,导致程序崩溃。

因此,线程停止时需要等待线程结束后再释放资源。

对于 joinable 的线程,需要调用 join 函数等待线程结束后再释放资源。

但对于 detach 的线程,如何知晓线程函数执行完毕。

场景举例: 视频播放线程的主动停止。

1
2
3
4
5
6
7
8
9
class ThreadPlayAndRec : public GThread
{};

ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start();
...
p->Stop();
/// 通知线程结束后需要等待多久,线程函数才会结束
delete p;

解决办法: 设置结束标识位来判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
VOID ThreadPlayAndRec::Free()
{
m_ulThreadState = THREAD_STATE_CLOSING;
Stop();
while(1)
{
if (m_ulThreadState == THREAD_STATE_FREE)
{
break;
}

gos_sleep_1ms(1);
}

}

ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
...
p->Stop(); ///< 通知线程结束
p->Free(); ///< 阻塞等待线程结束
delete p;

温州S2项目对 GThread 的改动:

1
2
3
ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
delete p; ///< 等待线程结束后, 释放资源

线程唤醒(线程池)

场景举例: 线程池中的线程,在任务队列出现任务时,唤醒一个线程进行处理。

  • Linux 信号量举例
1
2
3
4
5
6
7
8
9
10
11
/// 初始化
sem_init()

/// 等待信号量唤醒
sem_wait()

/// 信号量唤醒
sem_post()

/// 回收资源
sem_destroy()
生产者线程 业务线程1 业务线程2
初始化 sem_init()
等待唤醒 sem_wait()
等待唤醒 sem_wait()
消息入列 sem_post()
被唤醒后, 取出消息并处理
回收资源 sem_destroy()
  • pthread 条件变量版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int flag = 0;

void* threadFunction(void* arg) {
pthread_mutex_lock(&mutex);
while (!flag) {
pthread_cond_wait(&cond, &mutex);
}
printf("Thread received signal\n");
pthread_mutex_unlock(&mutex);

...

return NULL;
}

int main() {
pthread_t thread;
pthread_create(&thread, NULL, threadFunction, NULL);

/// 主线程等待一段时间后发送信号
sleep(2);
pthread_mutex_lock(&mutex);
flag = 1;
/// 唤醒线程
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

pthread_join(thread, NULL);

return 0;
}
  • C++ STL 风格代码举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void producers()
{
std::unique_lock<std::mutex> lock(queue_mutex);
/// 这个封装好的函数放入任务列表中
tasks.emplace(&f);

/// 通知一个阻塞中的线程,任务队列中有任务了
condition.notify_one();
}

void consumer()
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
/// 阻塞等待直到函数返回 true
this->condition.wait(lock, [this] { return !this->tasks.empty(); });
/// 从任务队列中拿出来一个任务
task = this->tasks.front();
this->tasks.pop();
}
生产者线程 业务线程1 业务线程2
条件变量阻塞等待(wait())
条件变量阻塞等待(wait())
消息入列
唤醒线程(notify_one())
被唤醒(条件变量停止阻塞)
获取消息并执行业务

线程安全中所涉及的问题

死锁

  • 情况1:
线程1 线程2
获取互斥量1 获取互斥量2
获取互斥量2 获取互斥量1

解决办法:

  1. 一次只获取一个互斥量。
1
2
3
4
5
6
7
8
9
10
11

{
std::lock_guard<std::mutex> lock1(mutex1);
...
}

{
std::lock_guard<std::mutex> lock2(mutex2);
...
}

  1. 使用 STL 的语法
1
2
3
4
{
std::scoped_lock lock(mutex1, mutex2);
std::cout << "Main thread acquired locks" << std::endl;
}
  • 情况2:
线程1
获取互斥量1
获取互斥量1

解决办法:

  1. 使用带有可重入属性的互斥量。
1
2
3
4
5
6
7
8
9
10
11
/// STL
std::recursive_mutex mutex;

/// pthread
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);

/// window API
EnterCriticalSection(&cs);
...
LeaveCriticalSection(&cs);
  • 情况3:
线程1 线程2
等待线程2 join 等待线程1 join

解决办法:

  1. 在同一线程创建其他线程,也在同一线程进行 join。

ABA问题

线程1 线程2
查询余额, 并存储进变量 i 查询余额, 并存储进变量 i
if(i >= 50) if(i >= 50)
i = i - 50; i = i - 50;
更新余额为 50
更新余额为 50

解决办法:

  1. 串行执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|          线程1           |          线程2           |
| :----------------------: | :----------------------: |
| 获取互斥量 mtx1 | 获取互斥量 mtx1 |
| 获取互斥量成功 | |
| 查询余额, 并存储进变量 i | |
| if(i >= 50) | |
| i = i - 50; | |
| 更新余额为 50 | |
| 释放互斥量 | |
| | 获取互斥量成功 |
| | 查询余额, 并存储进变量 i |
| | if(i >= 100) |
| | i = i - 50; |
| | 更新余额为 50 |

2. 使用 CAS(Compare And Swap) 的方法

```c++
| 线程1 | 线程2 |
| :--------------------------------------------------: | :--------------------------------------------------: |
| 查询余额, 并存储进变量 i | 查询余额, 并存储进变量 i |
| if(i == 100) | if(i == 100) |
| i = i - 50; | i = i - 50; |
| 如果当前值为 100 则更新约为 50(compare_and_exchange) | |
| 更新成功,事务结束 | |
| | 如果当前值为 100 则更新约为 50(compare_and_exchange) |
| | 更新失败,事务结束 |

初始化单例

C++11之前以下全局变量,线程不安全。

1
ThreadPlayAndRec g_ThreadPlayAndRec; ///< 全局变量,在程序启动时线程不安全
  • 错误的做法1:
1
2
3
4
5
6
ThreadPlayAndRec* p = NULL;

if(!p)
{
p = new ThreadPlayAndRec();
}
线程1 线程2
if(!p) if(!p)
new ThreadPlayAndRec(); new ThreadPlayAndRec();
p 被赋值 p 被赋值
线程1 线程2
if(!p)
new ThreadPlayAndRec(); if(!p)
p 被赋值 new ThreadPlayAndRec();
p 被赋值
  • 错误的做法2:
1
2
3
4
5
6
7
8
9
10
11
ThreadPlayAndRec* p = NULL;
std::mutex mtx;

if(!p) // 第一次检查
{
std::lock_guard<std::mutex> guard(mtx);
if(!p) // 第二次检查
{
p = new ThreadPlayAndRec();
}
}

正常流程:

线程1 线程2
if(!p) 第一次检查 if(!p) 第一次检查
尝试获取互斥量 尝试获取互斥量
获取互斥量成功
if(!p) 再次判断是否为空
new ThreadPlayAndRec();
p 被赋值
释放互斥量
获取互斥量成功
if(!p) 再次判断是否为空
p 不为空,流程结束

错误流程:

线程1 线程2
if(!p) 第一次检查
尝试获取互斥量
获取互斥量成功
if(!p) 再次判断是否为空
new ThreadPlayAndRec();
p 被赋值 if(!p) 第一次检查

最后一步产生了读写竞争。

  • 解决办法:
  1. 在 C++11 标准下使用 全局/静态变量。
  2. 使用 C++11 中提供的 std::call_once 保证初始化函数只被调用一次。
1
2
3
std::once_flag flag;
ThreadPlayAndRec* p = NULL;
std::call_once(flag, []() { p = new ThreadPlayAndRec(); });
  1. 使用互斥量
1
2
3
4
5
6
7
8
ThreadPlayAndRec* p = NULL;
std::mutex mtx;

std::lock_guard<std::mutex> guard(mtx);
if(!p)
{
p = new ThreadPlayAndRec();
}
  1. 在 C++11 之前,使用 Linux API 或者 Windows API 函数。

linux:

1
2
3
4
5
6
7
8
9
pthread_once_t once_control = PTHREAD_ONCE_INIT;

void init_function()
{
std::cout << "Initialization function executed" << std::endl;
}

/// 只会执行一次
pthread_once(&once_control, init_function);

windows:

1
2
3
4
5
6
7
EnterCriticalSection(&cs);
if (!initialized)
{
init_function();
initialized = true;
}
LeaveCriticalSection(&cs);
  1. 不管, 成本最低也是最适合我们的方法。

  2. 使用 gos::singleton, 其实现思路是 C++11 之前使用 double-check lock 的方法, C++11 之后使用 std::call_once

多个线程同时开始

测试线程需要被测试函数同时并行执行。

  • C++ STL 条件变量实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::mutex mutex_;
std::condition_variable cv_;
bool ready_ = false;

std::jthread j([this]() mutable
{
std::unique_lock<std::mutex> mutex(mutex_);
/// 等待同步开始
cv_.wait(mutex, [this] { return ready_; });
/// 执行函数
f();
}));

/// 锁定互斥量,以修改 ready_
mutex_.lock();
ready_ = true;
/// 发送给所有的线程,可以开始运行
cv_.notify_all();
mutex_.unlock();

隐藏的多线程问题

  • 场景1: 在通信线程,直接使用修改界面的语句。
    注: 界面有自己的线程调度,如果在其他线程中直接修改界面,可能会导致界面崩溃。

  • 场景2: 在回调函数中使用共享变量(全局、静态或类成员变量)。
    注: 回调函数一定在其他线程中执行,如果在回调函数中使用共享变量,可能会导致线程不安全。