C++线程知识简介

6 分钟阅读

C++线程知识简介

目的

假设我们使用的是多核 CPU,且无法避免使用多线程。站在产品的稳定性和性能优化的角度,对线程知识进行简介。

范围

为了在多线程的编程环境中,更好发挥多核CPU的性能,并对多线程相关的缺陷加以了解并进行规避。着重于讲并行而非并发的情况。

并行(Parallel)与并发(Concurrent):

  • 并行: 单线程。
  • 并发: 多线程。

并发:

任务1 任务2
执行语句1  
  执行语句2
执行语句3  
  执行语句4

并行:

任务1 任务2
执行语句1 执行语句2

热身

  1. 一个数据如果一个时刻是只读的,那么在这个时刻该数据是线程安全的。
  2. 一个数据被多个线程同时读写,那么该数据是线程不安全的。
  3. 在同一个线程中,对一个普通互斥量加锁两次,会发生死锁。
  4. intunsignedchardouble 等基本类型,均为线程不安全的。
  5. 互斥量的创建、加锁和解锁操作本身,并不耗时。

场景

线程间共享数据

场景举例:

  1. 通信线程之间同步消息序列号。
  2. ATS线程写入列车信息,其他线程读取。
  • C++ STL 互斥量版本
std::mutex mtx;
mtx.lock();
....
mtx.unlock();
  • C++ STL 读写锁版本
std::shared_mutex mutex;
int sharedData = 0;

void readerThread()
{
    std::shared_lock<std::shared_mutex> lock(mutex);
}

void writerThread()
{
    std::unique_lock<std::shared_mutex> lock(mutex);
    sharedData += 1;
}
写入线程 读取线程1 读取线程2
  readerThread() readerThread()
writerThread()    
  • C++ STL 原子变量版本
std::atomic<int> i;

/// 线程安全
void set()
{
    i++;
}

/// 线程安全
int get()
{
    return i;
}
  • Windows API 版本
HANDLE hMutex;

// 创建互斥量
hMutex = CreateMutex(NULL, FALSE, NULL);
if (hMutex == NULL)
{
    return 0;
}

// 加锁
WaitForSingleObject(hMutex, INFINITE);

...

// 解锁
ReleaseMutex(hMutex);

// 关闭互斥量
CloseHandle(hMutex);
  • Windows 临界区版本
CRITICAL_SECTION cs;
// 初始化临界区
InitializeCriticalSection(&cs);
// 加锁
EnterCriticalSection(&cs);
...
// 解锁
LeaveCriticalSection(&cs);
// 删除临界区
DeleteCriticalSection(&cs);

注: EnterCriticalSectionWaitForSingleObject 的区别:

  1. WaitForSingleObject 因为涉及到 用户态和内核态的切换,更慢。
  2. WaitForSingleObject 可以用于进程间的同步,而 EnterCriticalSection 不能。
  3. WaitForSingleObject 可以达到超时等待的效果,而 EnterCriticalSection 会一直等待。
  4. 在同一线程中多次调用 WaitForSingleObjectEnterCriticalSection 都不会产生死锁。
  • pthread 版本
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);

...

pthread_mutex_unlock(&mutex);
pthread_mutex_destroy(&mutex);
  • 注:pthread_mutex_t 默认是不可被同一线程加锁两次的,即不可重入。如果想要可以重入则需要设置属性。
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE); // 设置为可重入

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &mutex_attr); // 使用属性,初始化互斥量
  • gos 库版本
HANDLE mutex_ = gos_mutex_init();
gos_mutex(mutex_);

...

gos_unmutex(mutex_);
gos_mutex_free(mutex_);

注:

  1. gos 库版本的互斥量,在 window 下是可重入的, 在 linux 下是不可重入的。

后台运行周期性任务

class NTPClient
{
public:
    /// 进行NTP同步, 后台线程周期运行
    void ntp_sync();
    /// 获取NTP是否成功, 业务线程调用
    bool get_ntp_valid();
};
  • C++ STL 标准库版本
NTPClient client;

void ntp_sync_thread()
{
    while (true)
    {
        client.ntp_sync();
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }
}

std::thread t(ntp_sync_thread);
/// 分离线程
t.detach();
  • pthread 版本:
pthread_attr_t      stAttr;

/// 设置 pthread 属性
pthread_attr_init(&stAttr);
pthread_attr_setdetachstate(&stAttr, PTHREAD_CREATE_DETACHED);

/// 初始化线程
pthread_t thread_id;
pthread_create(&thread_id, NULL, threadFunction, &client);
  • windows 版本:
/// 1. Windows API:
HANDLE hThread = CreateThread(NULL, 0, threadFunction, &client, 0, NULL);
// 分离线程
CloseHandle(hThread);

/// 2. Vistual C++ CRT 版本
uintptr_t hThread = _beginthreadex(NULL, 0, threadFunction, &client, 0, NULL);

为什么选择 _beginthreadex 而不是 CreateThread?

_beginthreadex 为每个线程分配自己的tiddata内存结构, 其中保存了 C 语言中的全局变量, 如 errno

参考资料: windows 多线程: CreateThread、_beginthread、_beginthreadex、AfxBeginThread 的区别

  • gos 库版本
class NTPClientThread: public GThread
{
public :
    NTPClientThread()
    {
        Start();
    }

    virtual GOS_THREAD_RET ThreadEntry(void* pPara)
    {
        while (true)
        {
            client.ntp_sync();
            GThread::Sleep(10000);
        }
        return 0;
    }

private:
    NTPClient client;
};

后台运行耗时任务(一次性任务)

场景举例:

  1. 耗时函数放到后台运行,结果想要获取时再主动获取。
  2. 两个执行时间非常长的函数,并行执行可节省时间。
  3. 等待打印机打印的同时,继续执行其他任务。

实现思路:

  1. 启动一个线程,运行一个函数,函数运行结束,线程退出。
  2. 业务线程: 创建后台线程。
  3. 后台线程: 运行函数,函数结束后退出。
  4. 业务线程: 等待后台线程结束后,取得函数结果。
  • C++ STL 标准库版本
int i = 0;

std::thread t([&i](){
    i = 1;
});

// 等待后台线程结束
t.join();

// 获取线程函数结果
std::cout << i << std::endl;
  • C++ STL 异步版本
int i = 0;

// 使用 std::async 在后台执行函数并获取结果
auto future = std::async(std::launch::async, [&i]() {
    i = 1;
    return i;
});

// 等待后台线程结束并获取结果
i = future.get();

// 输出线程函数结果
std::cout << i << std::endl;
  • Windows API 版本
int i = 0;

DWORD WINAPI f(LPVOID lpParam)
{
    i = 1;
    return 0;
}

HANDLE hThread = CreateThread(NULL, 0, f, NULL, 0, NULL);

// 等待后台线程结束
WaitForSingleObject(hThread, INFINITE);

// 关闭线程句柄
CloseHandle(hThread);

// 获取线程函数结果
std::cout << i << std::endl;
  • pthread 版本
int i = 0;

void f()
{
    i = 1;
}

pthread_t t;
pthread_create(&t, NULL, f, NULL);

// 等待后台线程结束
pthread_join(t, NULL);

// 获取线程函数结果
std::cout << i << std::endl;

线程主动停止与资源释放

线程正在运行时,对线程进行销毁(free, delete),可能会访问到已经被释放的内存,导致程序崩溃。

因此,线程停止时需要等待线程结束后再释放资源。

对于 joinable 的线程,需要调用 join 函数等待线程结束后再释放资源。

但对于 detach 的线程,如何知晓线程函数执行完毕。

场景举例: 视频播放线程的主动停止。

class ThreadPlayAndRec : public GThread
{};

ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start();
...
p->Stop();
/// 通知线程结束后需要等待多久,线程函数才会结束
delete p;

解决办法: 设置结束标识位来判断。

VOID ThreadPlayAndRec::Free()
{
    m_ulThreadState = THREAD_STATE_CLOSING;
    Stop();
    while(1)
    {
        if (m_ulThreadState == THREAD_STATE_FREE)
        {
            break;
        }

        gos_sleep_1ms(1);
    }

}

ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
...
p->Stop(); ///< 通知线程结束
p->Free(); ///< 阻塞等待线程结束
delete p;

温州S2项目对 GThread 的改动:

ThreadPlayAndRec* p = new ThreadPlayAndRec();
p->Start(); ///< 通知线程开始
delete p; ///< 等待线程结束后, 释放资源

线程唤醒(线程池)

场景举例: 线程池中的线程,在任务队列出现任务时,唤醒一个线程进行处理。

  • Linux 信号量举例
/// 初始化
sem_init()

/// 等待信号量唤醒
sem_wait()

/// 信号量唤醒
sem_post()

/// 回收资源
sem_destroy()
生产者线程 业务线程1 业务线程2
初始化 sem_init()    
  等待唤醒 sem_wait()  
    等待唤醒 sem_wait()
消息入列 sem_post()    
  被唤醒后, 取出消息并处理  
回收资源 sem_destroy()    
  • pthread 条件变量版本
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int flag = 0;

void* threadFunction(void* arg) {
    pthread_mutex_lock(&mutex);
    while (!flag) {
        pthread_cond_wait(&cond, &mutex);
    }
    printf("Thread received signal\n");
    pthread_mutex_unlock(&mutex);

    ...

    return NULL;
}

int main() {
    pthread_t thread;
    pthread_create(&thread, NULL, threadFunction, NULL);

    /// 主线程等待一段时间后发送信号
    sleep(2);
    pthread_mutex_lock(&mutex);
    flag = 1;
    /// 唤醒线程
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    pthread_join(thread, NULL);

    return 0;
}
  • C++ STL 风格代码举例
void producers()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    /// 这个封装好的函数放入任务列表中
    tasks.emplace(&f);

    /// 通知一个阻塞中的线程,任务队列中有任务了
    condition.notify_one();
}

void consumer()
{
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    /// 阻塞等待直到函数返回 true
    this->condition.wait(lock, [this] { return !this->tasks.empty(); });
    /// 从任务队列中拿出来一个任务
    task = this->tasks.front();
    this->tasks.pop();
}
生产者线程 业务线程1 业务线程2
  条件变量阻塞等待(wait())  
    条件变量阻塞等待(wait())
消息入列    
唤醒线程(notify_one())    
  被唤醒(条件变量停止阻塞)  
  获取消息并执行业务  

线程安全中所涉及的问题

死锁

  • 情况1:
线程1 线程2
获取互斥量1 获取互斥量2
获取互斥量2 获取互斥量1

解决办法:

  1. 一次只获取一个互斥量。

{
std::lock_guard<std::mutex> lock1(mutex1);
...
}

{
std::lock_guard<std::mutex> lock2(mutex2);
...
}

  1. 使用 STL 的语法
{
    std::scoped_lock lock(mutex1, mutex2);
    std::cout << "Main thread acquired locks" << std::endl;
}
  • 情况2:
线程1
获取互斥量1
获取互斥量1

解决办法:

  1. 使用带有可重入属性的互斥量。
/// STL
std::recursive_mutex mutex;

/// pthread
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);

/// window API
EnterCriticalSection(&cs);
...
LeaveCriticalSection(&cs);
  • 情况3:
线程1 线程2
等待线程2 join 等待线程1 join

解决办法:

  1. 在同一线程创建其他线程,也在同一线程进行 join。

ABA问题

线程1 线程2
查询余额, 并存储进变量 i 查询余额, 并存储进变量 i
if(i >= 50) if(i >= 50)
i = i - 50; i = i - 50;
更新余额为 50  
  更新余额为 50

解决办法:

  1. 串行执行
|          线程1           |          线程2           |
| :----------------------: | :----------------------: |
|     获取互斥量 mtx1      |     获取互斥量 mtx1      |
|      获取互斥量成功      |                          |
| 查询余额, 并存储进变量 i |                          |
|       if(i >= 50)        |                          |
|       i = i - 50;        |                          |
|      更新余额为 50       |                          |
|        释放互斥量        |                          |
|                          |      获取互斥量成功      |
|                          | 查询余额, 并存储进变量 i |
|                          |       if(i >= 100)       |
|                          |       i = i - 50;        |
|                          |      更新余额为 50       |

2. 使用 CAS(Compare And Swap) 的方法

```c++
|                        线程1                         |                        线程2                         |
| :--------------------------------------------------: | :--------------------------------------------------: |
|               查询余额, 并存储进变量 i               |               查询余额, 并存储进变量 i               |
|                     if(i == 100)                     |                     if(i == 100)                     |
|                     i = i - 50;                      |                     i = i - 50;                      |
| 如果当前值为 100 则更新约为 50(compare_and_exchange) |                                                      |
|                  更新成功,事务结束                  |                                                      |
|                                                      | 如果当前值为 100 则更新约为 50(compare_and_exchange) |
|                                                      |                  更新失败,事务结束                  |

初始化单例

C++11之前以下全局变量,线程不安全。

ThreadPlayAndRec g_ThreadPlayAndRec; ///< 全局变量,在程序启动时线程不安全
  • 错误的做法1:
ThreadPlayAndRec* p = NULL;

if(!p)
{
    p = new ThreadPlayAndRec();
}
线程1 线程2
if(!p) if(!p)
new ThreadPlayAndRec(); new ThreadPlayAndRec();
p 被赋值 p 被赋值
线程1 线程2
if(!p)  
new ThreadPlayAndRec(); if(!p)
p 被赋值 new ThreadPlayAndRec();
  p 被赋值
  • 错误的做法2:
ThreadPlayAndRec* p = NULL;
std::mutex mtx;

if(!p) // 第一次检查
{
    std::lock_guard<std::mutex> guard(mtx);
    if(!p) // 第二次检查
    {
        p = new ThreadPlayAndRec();
    }
}

正常流程:

线程1 线程2
if(!p) 第一次检查 if(!p) 第一次检查
尝试获取互斥量 尝试获取互斥量
获取互斥量成功  
if(!p) 再次判断是否为空  
new ThreadPlayAndRec();  
p 被赋值  
释放互斥量  
  获取互斥量成功
  if(!p) 再次判断是否为空
  p 不为空,流程结束

错误流程:

线程1 线程2
if(!p) 第一次检查  
尝试获取互斥量  
获取互斥量成功  
if(!p) 再次判断是否为空  
new ThreadPlayAndRec();  
p 被赋值 if(!p) 第一次检查

最后一步产生了读写竞争。

  • 解决办法:
  1. 在 C++11 标准下使用 全局/静态变量。
  2. 使用 C++11 中提供的 std::call_once 保证初始化函数只被调用一次。
std::once_flag flag;
ThreadPlayAndRec* p = NULL;
std::call_once(flag, []() { p = new ThreadPlayAndRec(); });
  1. 使用互斥量
ThreadPlayAndRec* p = NULL;
std::mutex mtx;

std::lock_guard<std::mutex> guard(mtx);
if(!p)
{
    p = new ThreadPlayAndRec();
}
  1. 在 C++11 之前,使用 Linux API 或者 Windows API 函数。

linux:

pthread_once_t once_control = PTHREAD_ONCE_INIT;

void init_function()
{
    std::cout << "Initialization function executed" << std::endl;
}

/// 只会执行一次
pthread_once(&once_control, init_function);

windows:

EnterCriticalSection(&cs);
if (!initialized)
{
    init_function();
    initialized = true;
}
LeaveCriticalSection(&cs);
  1. 不管, 成本最低也是最适合我们的方法。

  2. 使用 gos::singleton, 其实现思路是 C++11 之前使用 double-check lock 的方法, C++11 之后使用 std::call_once

多个线程同时开始

测试线程需要被测试函数同时并行执行。

  • C++ STL 条件变量实现
std::mutex mutex_;
std::condition_variable cv_;
bool ready_ = false;

std::jthread j([this]() mutable
                {
                    std::unique_lock<std::mutex> mutex(mutex_);
                    /// 等待同步开始
                    cv_.wait(mutex, [this] { return ready_; });
                    /// 执行函数
                    f();
                }));

/// 锁定互斥量,以修改 ready_
mutex_.lock();
ready_ = true;
/// 发送给所有的线程,可以开始运行
cv_.notify_all();
mutex_.unlock();

隐藏的多线程问题

  • 场景1: 在通信线程,直接使用修改界面的语句。 注: 界面有自己的线程调度,如果在其他线程中直接修改界面,可能会导致界面崩溃。

  • 场景2: 在回调函数中使用共享变量(全局、静态或类成员变量)。 注: 回调函数一定在其他线程中执行,如果在回调函数中使用共享变量,可能会导致线程不安全。

更新时间: