0%

C++ 异步运算接口

std::async介绍

下面是一个很好的并行计算的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <future>
#include <iostream>
#include <string>

bool is_prime(int x)
{
for (int i = 2; i < x; i++)
{
if (x % i == 0)
return false;
}
return true;
}

int main()
{
/** is_prime(700020007)这个函数调用隐藏于主线程,异步执行 */
std::future<bool> fut = std::async(is_prime, 700020007);
std::cout << "please wait";
std::chrono::milliseconds span(100);
/** 这个异步调用函数等待100ms,如果没有计算完就继续等待 */
while (fut.wait_for(span) != std::future_status::ready)
std::cout << ".";
std::cout << std::endl;

/** 计算完毕后,获取函数返回值 */
std::cout << "final result: " << std::boolalpha << fut.get() << std::endl;

system("pause");
return 0;
}

std::async中的第一个参数是启动策略,它控制std::async的异步行为,我们可以用三种不同的启动策略来创建std::async
·std::launch::async
保证异步行为,即传递函数将在单独的线程中执行
·std::launch::deferred
当其他线程调用get()来访问共享状态时,将调用非异步行为
·std::launch::async | std::launch::deferred
默认行为。有了这个启动策略,它可以异步运行或不运行,这取决于系统的负载,但我们无法控制它。

见下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>

using namespace std::chrono;

std::string fetchDataFromDB(std::string recvData)
{
//确保函数要5秒才能执行完成
std::this_thread::sleep_for(seconds(5));

//处理创建数据库连接、获取数据等事情
return "DB_" + recvData;
}

std::string fetchDataFromFile(std::string recvData)
{
//确保函数要5秒才能执行完成
std::this_thread::sleep_for(seconds(5));

//处理获取文件数据
return "File_" + recvData;
}

int main()
{
//获取开始时间
system_clock::time_point start = system_clock::now();

/** 使用std::launch::async,来指定其异步执行 */
std::future<std::string> resultFromDB = std::async(std::launch::async,
fetchDataFromDB, "Data");

//从文件获取数据
std::string fileData = fetchDataFromFile("Data");

//从DB获取数据
//数据在future<std::string>对象中可获取之前,将一直阻塞
std::string dbData = resultFromDB.get();

//获取结束时间
auto end = system_clock::now();

auto diff = duration_cast<std::chrono::seconds>(end - start).count();
std::cout << "Total Time taken= " << diff << "Seconds" << std::endl;

//组装数据
std::string data = dbData + " :: " + fileData;

//输出组装的数据
std::cout << "Data = " << data << std::endl;

return 0;
}

std::promise介绍

std::promise的作用就是提供一个不同线程之间的数据同步机制,它可以存储一个某种类型的值,并将其传递给对应的future, 即使这个future不在同一个线程中也可以安全的访问到这个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>       // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future

void print_int (std::future<int>& fut)
{
std::cout << "Enter print_int: " << std::endl;
int x = fut.get(); ///< 在这里会等待外部std::promise变量set_value进来,否则会一致阻塞在这里
std::cout << "value: " << x << '\n';
}

int main ()
{
std::promise<int> prom; // 创建一个std::promise变量

std::future<int> fut = prom.get_future(); // 创建一个std::future变量

std::thread th1 (print_int, std::ref(fut)); // 创建一个线程执行函数print_int

std::this_thread::sleep_for(std::chrono::seconds(3));
prom.set_value (10); // 传值进入线程th1

th1.join();
system("pause");
return 0;
}

std::packaged_task介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>     // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to)
{
for (int i = from; i != to; --i)
{
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from - to;
}

int main ()
{
// 创建一个std::packaged_task对象
std::packaged_task<int(int, int)> tsk (countdown);
// 创建一个std::future对象,用于跨线程异步获取该线程返回的值
std::future<int> ret = tsk.get_future();
// 把线程对象移动进一个可运行的线程中
std::thread th (std::move(tsk), 10, 0);
// 让该线程从主线程中分离
th.detach();
// ...
// 利用std::future对象来获取已经分离开的线程运行是否结束的返回的值
int value = ret.get();
std::cout << "The countdown lasted for " << value << " seconds.\n";

system("pause");
return 0;
}

硬件支持的线程数量

由于硬件支持的并行线程数量有限,如果创建线程的数量比硬件支持的数量要多,那么CPU进行的上下文切换可能会浪费大量时间,所以了解硬件支持的线程数量是高效并行编程的重点。

使用std::thread::hardware_concurrency()来获取硬件支持的线程数量。

1
2
3
4
5
6
7
#include <iostream>
#include <thread>

int main() {
unsigned int n = std::thread::hardware_concurrency();
std::cout << n << " concurrent threads are supported.\n";
}

std::thread::yield介绍

关于std::thread::yield 和 std::sleep_for的比较

例子:

1
2
3
4
5
6
7
8
9
10
11
12
void worker_thread() {
while (true) {
std::function<void()> task;
if (work_queue.try_pop(task)) {
/// 获取到任务就运行
task();
} else {
/// 没有获取到就休息一下
std::this_thread::yield();
}
}
}