0%

屏障

屏障的这些操作可以强制内存顺序约束,而无需修改任何数据,并且与使用memory_order_relaxed顺序约束的原子操作组合起来使用。屏障是全局操作,能在执行该屏障的线程里影响其他原子操作的顺序。
屏障一般也被称为内存障碍(memory barriers), 它们之所以这样命名, 是因为他们在代码中放置了一行代码,是的特定的操作无法穿越。
在独立变量上的松散操作通常可以自由地被编译器或硬件重新排序。屏障限制了这一自由,并且在之前并不存在的地方引入happens-beforesynchronizes-with关系。

松散操作可以使用平常排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <atomic>
#include <thread>
#include <cassert>
#include <string>
#include <vector>

std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
x.store(true, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_relaxed);
y.store(true, std::memory_order_relaxed);
}

void read_y_then_x() {
while (!y.load(std::memory_order_relaxed));
std::atomic_thread_fence(std::memory_order_acquire);
if (x.load(std::memory_order_relaxed)) ++z;
}

int main() {
x = false;
y = false;
z = 0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load() != 0);
}

并发数据结构

数据结构的线程安全的基本原理

  • 保证当数据结构不变性被别的线程破坏时的状态不被任何别的线程看到。
  • 注意避免数据结构结构所固有的竞争现象,通过为完整操作提供函数,而不是提供操作步骤。
  • 注意当出现例外时,数据结构是怎么样来保证不变性不被破坏。
  • 当使用数据结构时,通过限制锁的范围和避免使用嵌套锁,来降低产生死锁的机会。

第二个要考虑的是实现真正的并发读取

  • 锁的范围能否被限定,使得一个操作的部分可以在锁外被执行?
  • 数据结构的不同部分能否被不同的互斥元保护?
  • 能否所有操作需要同样级别的保护?
  • 数据结构的一个小改变能否在不影响操作语义情况下提高并发机会?

无锁数据结构:

对于有资格称为无锁的数据结构,就必须能够让多于一个线程可以并发的访问此数据结构。这些线程不需要做相同的操作,无所队列可以允许一个线程push的同时,另一个线程pop,到那时如果两个吸纳成同时试图插入新数据的时候,就会打破无锁队列。不仅如此,如果一个访问数据结构的线程在操作中途被调度器挂起的话别的线程必须仍然能够完成操作而无需等待挂起的线程。

无等待数据结构

无等待的数据结构是一种无锁的数据结构,并且有着额外的特性,每个访问数据结构的线程都可以在有限数量的步骤内完成它的操作,而不用管别的线程的行为,因为其他线程的冲突而可能卷入无限次重试的算法不是无等待。
为了确保每个线程都能够在有限步骤内完成它的操作,就必须保证每个操作都可以在一个操作周期内执行,并且一个线程执行的操作不会导致另一个线程上操作的失败。

无锁数据结构的优点与缺点

使用无锁数据结构的最主要的原因就是为了实现最大程度的并发。对于基于锁的容器,总是有可能一个线程必须阻塞,并在可以继续前等待另一个线程完成其操作。互斥元锁的目的就是通过互斥来阻止并发。
使用无锁数据结构的第二个原因是健壮性。当一个线程在持有锁的时候终止,哪个数据结构就永远被破坏了。但是如果一个线程在操作无所数据结构时终止了,就不会丢失任何数据。

因为不使用任何锁,因此无锁数据结构是不会发生死锁的,尽管有可能存在活锁.
当两个线程都试图修改数据结构,但是对于每个线程来说,另一个线程所作的修改都会要求此线程的操作重新被执行,因此这两个线程都会一直循环和不断尝试,在这种情况下就会发生活锁。除非某个线程先到达(通过协议,通过更快或完全靠运气), 不然此循环会一直继续下去。活锁通常是短暂的,因为它们取决于线程的精确调度,因此活锁会降低性能而不会导致长期问题。
这就是无锁数据结构的缺点,尽管它可以增加在数据结构上操作的并发能力,并且减少了线程等待的时间,但是它可能降低整体性能。首先,无锁代码使用的原子操作可能比非原子操作要慢很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename T>
class lock_free_stack {
private:
struct node {
T data;
node* next;
node(T const& data_) : data(data_) {}
};
std::atomic<node*> head;
public:
void push(T const& data){
node* const new_node = new node(data);
new_node->next = head;
while(!head.compare_exchange_weak(new_node->next, new_node));
}
void pop(T& result){
node* old_head = head.load();
while(!head.compare_exchange_weak(old_head, old_head->next));
result = old_head->data;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<typename T>
class lock_free_stack {
private:
std::atomic<unsigned> threads_in_pop;
std::atomic<node*> to_be_deleted;

void try_reclaim(node* old_head) {
if (threads_in_pop == 1) {
node* nodes_to_delete = to_be_deleted.exchange(nullptr);
if (!--threads_in_pop) {
delete_nodes(nodes_to_delete);
}
else if (nodes_to_delete) {
chanin_pending_nodes(nodes_to_delete);
}
}
}
static void delete_node(node* nodes) {
while (nodes) {
node* next = nodes->next;
delete nodes;
nodes = next;
}
}
public:
std::shared_ptr<T> pop() {
++threads_in_pop;
node* old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
std::shared_ptr<T> res;
if (old_head) {
res.swap(old_head->data);
}
try_reclaim(old_head);
return res;
}
};

synchronizes-with关系

synchronizes-with 关系是你只能在原子类型上的操作之间得到的东西。如果一个数据结构包含原子类型,并且在该数据结构上的操作会内部执行适当的原子操作,该数据结构上的操作(如锁定互斥元)可能会提供这种关系,但是从根本上说synchronizes-with关系只出自原子类型上的操作。

基本思想: 在一个变量x上的一个被适当标记的原子写操作w, 与在x上的一个被适当标记的,通过写入(W),或是由与执行最初的写操作W相同的线程在x上的后续原子写操作,或是由任意线程在x上一系列的原子的读-修改-写操作(fetch_add()compare_exchange_weak())来读取所存储的值的原子读操作同步,其中随后通过第一个线程读取的值是通过W写入的值。
换个说法:如果线程A存储一个值而线程B读取该值,那么线程A中存储和线程B中的载入之间存在一种synchronizes-with关系。

happens-before 关系

happens-before(发生于之前)关系是程序中操作顺序的基本构件,它指定了哪些操作看到其他操作的结果。对于单个线程,它是直观的,如果一个操作排在另一个操作之前,那么该操作就发生于另一个操作之前。这就意味着,如果一个操作(A)发生于另一个操作(B)之前的语句里,那么A就发生于B之前。

有时候,单条语句中的操作是有顺序的,例如使用内置的逗号操作符或者使用一个表达式的结果作为另一个表达式的参数。
但一般来说,单条语句中的操作是非顺序的,而且也没有sequenced-before(因此也没有happens-before).当然,一条语句的所有操作在下一句的所有操作之前发生。
对于多线程中,如果线程间的一个线程上的操作A发生于另一个线程上的操作B之前,那么A发生于B之前。

原子操作的内存顺序

有六种内存顺序选项可以应用到原子类型上的操作:

  • memory_order_relaxed
  • memory_order_consume
  • memory_order_acquire
  • memory_order_release
  • memory_order_acq_rel
  • memory_order_seq_cst

除非你为某个特定的操作做出指定,原子类型上的所有操作的内存顺序选项都是memory_order_seq_cst, 这是最严格的可用选项。

尽管有六种选项,它们其实代表了三种模型:

  • 顺序一致(sequentially consistent)顺序(memory_order_seq_cst)
  • 获得-释放(acquire_release)顺序(memory_order_consumememory_order_acquirememory_order_releasememory_order_acq_rel)
  • 松散(relaxed)顺序(memory_order_relaxed)
    这些不同的内存顺序模型在不同的CPU架构上可能有着不同的成本。
    例如:在基于具有通过处理器而非做更改者对操作的可见性进行良好的控制架构上的系统中,
    顺序一致的顺序相对于获取-释放顺序或松散顺序,可能会要求额外的同步指令。
    获取-释放相对于松散顺序,可能会要求额外的同步指令。

如果这些系统拥有很多处理器,这些额外的同步指令可能占据显著的时间量,从而降低该系统的整体性能。
另一方面,为了确保原子性,对于超出需要的获得=释放排序,使用x86或x86-64架构的CPU不会要求额外的指令,甚至对于载入操作,顺序一致顺序不需要任何特殊的处理,尽管在存储时会有一点额外的成本。

不同的内存顺序模型的可用性,允许高手们利用更细粒度的顺序关系来提升性能,在不太关键的情况下,当允许使用默认的顺序一致顺序时,它们是有优势的。

1. 顺序一致顺序

默认的顺序被命名为顺序一致(sequentially consistent), 因为这意味着程序的行为与一个简单的顺序的世界观时一致的。
如果所有原子类型实例上的操作时顺序一致的,多线程程序的行为,就好像是所有这些操作由单个线程以某种特定的顺序执行一致的,多线程程序的行为,就好像是所有这些操作由单个线程以某种特定的顺序进行执行一样。
这意味着如果你的代码在一个线程中有一个操作在另一个之前,其顺序必须对所有其他的线程可见。
从同步的观点来看,顺序一致的存储与读取该存储值的同一个变量的顺序一致载入是同步的。这提供了一种两个(或多个)线程操作的顺序约束,但顺序一致比它更加强大。
在使用顺序一致原子操作的系统中,所有在载入后完成的顺序一致原子操作,也必须出现在其他线程的存储之后。该约束并不会推荐使用具有松散内存顺序的原子操作,它们仍然可以看到操作处于不同的顺序,所以你必须在所有的线程上使用顺序一致的操作。
但易于理解就产生了代价,在一个带有许多处理器的弱顺序机器上,他可能导致显著的性能惩罚,因为操作的整体顺序必须与处理器之间保持一致,可能需要处理器之间密集且昂贵的同步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <atomic>
#include <thread>
#include <cassert>

std::atomic<bool> x, y;
std::atomic<int> z;

void write_x() {
x.store(true, std::memory_order_seq_cst);
}

void write_y() {
y.store(true, std::memory_order_seq_cst);
}

void read_x_then_y() {
while (!x.load(std::memory_order_seq_cst));
if (y.load(std::memory_order_seq_cst)) {
++z;
}
}

void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst));
if (x.load(std::memory_order_seq_cst)) {
++z;
}
}

int main() {
x = false;
y = false;
z = 0;
std::thread d(read_y_then_x);
std::thread c(read_x_then_y);
std::thread a(write_x);
std::thread b(write_y);

a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0); ///< 这里永不触发
}

顺序一致时最直观和直觉的排序,但也是最昂贵的内存顺序,因为它要求所有线程之间的全局同步。在多处理器系统中,这可能需要处理器之间相当密集和耗时的通信。

2. 非顺序一致的内存顺序

时间不再有单一的全局顺序,这意味着不同的线程可能看到相同的操作的不同方面。你不仅得考虑事情真正的并行发生,而且线程不必和事件的顺序一致。
即使线程正在运行完全相同的代码,由于其他线程中的操作没有明确的顺序约束,它们可能与时间的顺序不一致,因为不同的CPU缓存和内部缓冲区可能为相同的内存保存了不同的值。
在没有其他的顺序约束时,唯一的要求是所有的线程对每个独立变量的修改顺序达成一致。

3. 松散顺序

以松散顺序执行的原子类型上的操作不参与synchronzes-with关系。单线程中的同一个变量的操作仍然服从happens-before关系,但对于其他线程的顺序几乎没有任何要求。唯一的要求是,从同一个线程对单个原子变量的访问不能被重排,一旦给定的线程看到了原子变量的特定值,该线程之后的读取就不能获取该变量更早的值。
在没有任何线程同步的情况下,每个变量的修改顺序时使用memory_order_relaxed的线程之间唯一共享的东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <atomic>
#include <thread>
#include <cassert>

std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
x.store(true, std::memory_order_relaxed);
y.store(true, std::memory_order_relaxed);
}

void read_y_then_x() {
while (!y.load(std::memory_order_relaxed));
if (x.load(std::memory_order_relaxed)) {
++z;
}
}

int main() {
x = false;
y = false;

z = 0;
std::thread b(read_y_then_x);
std::thread a(write_x_then_y);

a.join();
b.join();

assert(z.load() != 0); ///< 可能会触发
}

这里的assert可以触发,因为x的载入能够读到false,即使y的载入读到了true,并且x的存储发生于y存储之前。x和y是不同的变量,所以关于每个操作所产生的值的可见性没有顺序保证。

不同变量的松散操作可以被自由地重排前提是它们服从所有约束下的happens-before关系(例如在同一个线程中)。
它们并不引入synchronizes-with关系。即便在存储操作中存在happens-before关系,但任一存储和任一载入之间却不存在,所以载入可以在顺序之外看到存储。

4. 获取-释放顺序

获取-释放顺序是松散顺序的进步,操作仍然没有总的顺序,但的确引入了一些同步。在这用顺序模型下,原子载入时获取操作(memory_order_acquire),原子存储时释放操作(memory_order_release), 原子的读-修改-写操作是获取、释放或两者兼备(memory_order_acq_rel)。同步在进行释放的线程和进行获取的线程之间是对偶的。释放操作与读取写入值的获取操作同步。这意味着,不同的线程仍然可以看到不同的排序,但这些顺序是受到限制的

5. 使用获取-释放顺序和MEMORY_ORDER_CONSUME的数据依赖

有两个处理数据历来的新的关系:依赖顺序在其之前(dependency-ordered-before)和带有对其的以来(carries-a-dependency-to)。
sequenced-before相似,carries-a-dependency-to严格适用于的单个线程之内,是操作间数据以来的基本模型。如果操作A的结果被用于操作B的操作数,那么A带有对B的依赖。如果操作A的结果是类似int的标量类型的值,那么如果A的结果存储一个变量中,并且该变量随后被用作操作B的操作数,此关系也是适用的。这种操作具有传递性,所以如果A带有对B的以来且B带有对C的依赖,那么A带有对C的依赖。

另一方面,depency-order-before的关系可以适用于线程之间。它是通过使用标记了memory_order_consume的原子载入操作引入的。
这是memory_order_acquire的一种特例,它限制了对直接依赖的数据同步。标记为memroy_order_releasememory_order_acq_relmemory_order_seq_cst的存储操作A的依赖顺序在标记为memory_order_acquire,那么这与synchronizes-with关系所得到的是相反的。如果操作B带有对操作C的某种依赖,那么A也是依赖顺序在C之前。

如果这对线程间happens-before关系没有影响,那么在同步目的上就无法为你带来任何好处,但它的却实现了:如果A依赖顺序在B之前,则A也是线程间发生于B之前。

这种内存顺序的一个重要用途,是在原子操作载入指向某数据的指针的场合。通过在载入上使用memory_order_consume以及在之前的存储上使用memory_order_release, 你可以确保所指向的数据得到正确的同步,无需在其他非依赖的数据上强加任何同步需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <atomic>
#include <thread>
#include <cassert>
#include <string>

struct X {
int i;
std::string s;
};

std::atomic<X*> p;
std::atomic<int> a;

void create_x() {
X* x = new X;
x->i = 42;
x->s = "hello";
a.store(99, std::memory_order_relaxed);
p.store(x, std::memory_order_release);
}

void use_x() {
X* x;
while (!(x = p.load(std::memory_order_consume))) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
assert(x->i == 42);
assert(x->s == "hello");
assert(a.load(std::memory_order_relaxed) == 99);
}

int main() {
std::thread t2(use_x);
std::thread t1(create_x);
t1.join();
t2.join();
}

见一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <variant>
#include <string>

struct MyVisitor
{
void operator()(double d) const {
std::cout << d << '\n';
}
void operator()(int i) const {
std::cout << i << '\n';
}
void operator()(const std::string& s) const {
std::cout << s << '\n';
}
};
int main()
{
std::variant<int, double, std::string> var1(42), var2(3.14), var3("visit");

std::visit(MyVisitor(), var1); // calls operator() for matching int type

std::visit(MyVisitor(), var2); // calls operator() for matching double type

std::visit(MyVisitor(), var3); // calls operator() for matching std::string type

return 0;

看官网给的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iomanip>
#include <iostream>
#include <string>
#include <type_traits>
#include <variant>
#include <vector>

// the variant to visit
using var_t = std::variant<int, long, double, std::string>;

// helper type for the visitor #4
template<class... Ts> struct overloaded : Ts... { using Ts::operator()...; };

int main() {
std::vector<var_t> vec = {10, 15l, 1.5, "hello"};
for (auto& v : vec) {
/// 1. void visitor, only called for side-effects(here, for I/O)
std::visit([](auto&& arg) {std::cout << arg; }, v);

/// 2. value-returning visitor, demonstrates the idiom of returning another variant
var_t w = std::visit([](auto&& arg)->var_t {return arg + arg; }, v);

/// 3. type-matching visitor: a lambda that that handles each type differently.
std::cout << ". After doubling, variant holds.";

std:visit([](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, int>)
std::cout << "int with value " << arg << '\n';
else if constexpr (std::is_same_v<T, long>)
std::cout << "long with value " << arg << '\n';
else if constexpr (std::is_same_v<T, double>)
std::cout << "double with value " << arg << '\n';
else if constexpr (std::is_same_v<T, std::string>)
std::cout << "std::string with value " << arg << '\n';
else
static_assert(always_false_v<T>, "non-exhaustive visitor!");
}, w);
}

for (auto& v : vec) {
/// 4. another type-matching visitor: a class with 3 overloaded operator()'s
std::visit(overloaded{
[](auto arg) {std::cout << arg << ' '; },
[](double arg) {std::cout << std::fixed << arg << ' '; },
[](const std::string arg) {std::cout << std::quoted(arg) << ' '; },
}, v);
}

system("pause");
}

其中最为核心的代码片段为:

1
2
3
4
5
6
7
8
9
template<class... Fs> struct overload : Fs... { using Fs::operator()...; };
template<class... Fs> overload(Fs...) -> overload<Fs...>;

std::visit(overload
{
[](int x){ ... },
[](long x){ ... },
[](auto...){ ... }
}, v);

使用宏封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iomanip>
#include <iostream>
#include <string>
#include <type_traits>
#include <variant>
#include <vector>

template<class... Fs> struct overload :Fs... {using Fs::operator()...; };
template<class... Fs> overload(Fs...)->overload<Fs...>;

template<class... Ts>
struct matcher {
std::tuple<Ts...> vs;
template<class... Vs> constexpr matcher(Vs&&... vs) : vs(std::forward<Vs>(vs)...) {}
template<class Fs> constexpr auto operator->*(Fs&& f) const {
auto curry = [&](auto&&... vs) {return std::visit(std::forward<Fs>(f), vs...); };
return std::apply(curry, std::move(vs));
}
};

template<class... Ts> matcher(Ts&&...)->matcher<Ts&&...>;
#define Match(...) matcher{__VA_ARGS__}->* overload

int main() {
std::vector<std::variant<int, double, std::string>> vec{1, 1.0, "ljslkfjskd"};
for (auto& it : vec) {
Match(it) {
[](auto&& x) {std::cout << "unknow type!" << std::endl; },
[](int x) {std::cout << "int: " << x << std::endl; },
[](double x) {std::cout << "double: " << x << std::endl; },
};
}
}

《C++ 并发编程实战》读书笔记(2)

等待一个具有超时条件的条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <condiiton_variable>
#include <mutex>
#include <chrono>
std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop(){
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while(!done) {
if(cv.wait_until(lk, timeout) == std::cv_status::timeout) break;
}
return done;
}

一个简单的启动线程的实现,不推荐实际使用, 因为创建一个线程时间成本很高。

1
2
3
4
5
6
7
8
9
10
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type>
spawn_task(F&& f, A&& a) {
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)> task(std::move(f));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task), std::move(a));
t.detach();
return res;
}

内存模型

C++程序中的所有数据均是对象(object)组成的。这并不是说你可以创建一个派生自int的新类,或是基本类型具有成员函数。这只是一句关于C++中数据的构造块的一种陈述。C++标准定义对象那个为存储区域,尽管它会为这些对象分配属性,如它们的类型和生存周期。
无论什么类型,对象均被存储与一个或多个内存位置中。每个内存位置要么是一个标量类型的对象,比如unsigned shortmy_class*, 要么是相邻位域的序列。如果使用位域,有非常重要的一点必须注意:虽然相邻的位域是不同的对象,但他们仍然算作相同的内存位置。

  • 每个变量都是一个对象,包括其他对象的成员。
  • 每个对象占据至少一个内存位置。
  • intchar这样的基本类型的变量恰好一个内存位置,不论其大小,即使它们相邻或是数组的一部分。
  • 相邻的位域是相同内存位置的一部分。

原子操作

原子操作是一个不可分割的操作。从系统中的任何一个线程中,你都无法观察到一个完成到一半的这种操作,他要么做完了要么没做完。

传统意义上,标准原子类型是不可复制且不可赋值的,因为它们没有拷贝构造函数和拷贝赋值运算符。但是,它们确实支持从相应的内置类型的赋值进行隐式转换并赋值。由于他是一个泛型类模板,操作只限为load()store()exchange()compare_exchange_weak()compare_exchange_strong()
在原子类型上的每一个操作均具有一个可选的内存顺序参数,它可以用来指定所需的内存顺序语义。顺序运算分为三种类型:

  • 存储(store)操作, 可以包括memory_order_relaxedmemory_order_releasememory_order_seq_cst顺序
  • 载入(load)操作,可以包括memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_seq_cst顺序。
  • 读-修改-写(read_modify_write)操作, 可以包括memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst顺序。
    所有操作的默认顺序为memory_order_seq_cst

使用std::atomic_flag实现一个自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
class spinlock_mutex
{
public:
spinlock_mutex():flag(ATOMIC_FLAG_INIT){}
void lock() {
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock(){
flag.clear(std::memory_order_release);
}
private:
std::atomic_flag flag;
};

根据当前值存储一个新值

这个新的操作被称为比较/交换,它以compare_exchange_weak()compare_exchange_strong()成员函数形式出现。
比较/交换操作是是使用原子类型编程的基石,它比较原子变量值和所提供的期望值,
如果两者相等,则存储提供的期望值。
如果两者不等,则期望值更新为原子变量的实际值。
比较/交换函数的返回类型为bool, 如果执行了存储即为true, 反之则为false

对于compare_exchange_weak(),
即使原始值等于期望值也可能出现存储不成功,在这种情况下变量的值是不变的, compare_exchange_weak()的返回值为false
这最有可能发生在缺少单个的比较并交换指令的机器上,此时处理器无法保证该操作被完成–这就是所谓的伪失败,因为失败的原因是时间的函数而不是变量的值。
由于compare_exchange_weak()可能会伪失败,它通常必须用在循环中。

另一方面,仅当实际值不等于expected值时compare_exchange_strong()才保证返回false。这样可以消除对循环的需求。

如果你想要改变变量,无论其初始值是多少, expected的更新就变得很有用,每次经过循环时,excepted被重新载入,因此如果没有其他线程在此期间修改其值,那么compare_exchange_weak()compare_exchange_strong()的调用在下一次循环中应该是成功的。

如果计算待存储的值很简单,为了避免在compare_exchange_weak()可能会伪失败的平台上的双重循环,(于是compare_exchange_strong包含一个循环), 则使用compare_exchange_weak()可能是有好处的。另一方面,
如果计算待存储的值本身是耗时的,当expected值没有变化时,使用compare_exchange_strong()来避免被迫重新计算待存储的值可能时有意义的。对于std::atomic<bool>而言这并不重要,毕竟只有两个值,但对于较大的原子类型这会有所不同。

比较/交换函数还有一点非同寻常,他们可以接受两个内存顺序参数。这就允许内存顺序的语义在成功和失败的情况下有所区别。对于一次成功调用具有memory_order_acq_rel语义而一次失败的调用有着memory_order_relaxed语义,这想必是极好的。一次失败的比较/交换并不进行存储,因此它无法具有memory_order_releasememory_order_acq_rel语义。因此再失败时禁止提供这些值作为顺序。你也不应为失败提供比成功更严格的内存顺序。如果你希望memory_order_acquire或者memory_order_seq_cst作为失败的语义,你也必须为成功指定这些语义。

如果你没有为失败指定一个顺序,就会假定它与成功是相同的,除了顺序的release部分被除去:memory_order_release变成memory_order_relaxed, memory_order_acq_rel变成memory_order_acquire。如果你都没有指定,他们它们通常默认为memory_order_seq_cst, 这为成功和失败都提供了完整的序列顺序。以下对compare_exchange_weak()的两个调用时等价的。

1
2
3
4
std::atomic<bool> b;
bool expected;
b.compare_exchange_weak(expected, true, memory_order_acq_rel, memory_order_acquire);
b.compare_exchange_weak(exprected, true, memory_order_acq_rel);

快速排序

快速排序并行版实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename ForwardIt>
inline void quick_sort(ForwardIt first, ForwardIt last) {
if(first == last) return;
decltype(std::distance(first, last)) temp = rand() % std::distance(first, last);
auto pivot = *std::next(first, temp);

auto middle1 = std::partition(std::execution::par, first, last, [pivot](const auto& em){
return em < pivot;
});

auto middle2 = std::partition(std::execution::par, middle1, last, [pivot](const auto& em){
return !(pivot < em);
});

quick_sort(first, middle1);
quick_sort(middle2, last);
}

使用要求:

  1. 必须是前进迭代器
  2. 元素必须是可比较的,或有operator<运算符重载

《C++ 并发编程实战》读书笔记(1)

在交换操作中使用std::lock()std::lock_guard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class some_big_object;

class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) : some_detail(sd) {}

friend void swap(X& lhs, X& rhs) {
if(&lhs == &rhs) {
return;
}
std::lock(lhs.m, rhs.m); ///< 这一行执行完后,两个锁都已经加锁了
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); ///< 获取这个锁便于函数执行完毕后再解锁
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail); ///< 交换数据
}
};

避免死锁的提示

  1. 避免嵌套锁。
    如果你已经持有一个锁,就别再获取其他锁, 原因很简单因为每个线程仅持有一个锁。如果需要获取多个锁,就使用std::lock()这样单个动作来执行。

  2. 在持有锁时,避免调用用户提供的代码。
    因为代码是程序员写的,你不知道它会做什么。如果用户提供的代码也在获取一个锁的话,可能导致死锁。

  3. 以固定的顺序获取锁
    如果你绝对需要获取两个或更多的锁,并且不能以std::lock的单个操作取得,次优的做法是在每个线程中以相同的顺序获取它们。
    见下面例子定义的层级锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class hierarchical_mutex {
public:
explicit hierarchical_mutex(const unsigned long value)
: hierarchy_value_(value), previous_hierarchy_value_(0) {}

/// @name lock
/// @brief 用来锁定自己的函数,
/// 锁定顺序依照本线程中小的数值先锁定,到锁定大的数值
/// 如果顺序反过来则会抛出异常
///
/// @author Lijiancong, pipinstall@163.com
/// @date 2020-02-17 13:22:50
/// @warning 线程安全
void lock() {
check_for_hierarchy_violation();
internal_mutex_.lock();
update_hierarchy_value();
}

void unlock() {
this_thread_hierarchy_value_ = previous_hierarchy_value_;
internal_mutex_.unlock();
}

bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex_.try_lock()) return false;
update_hierarchy_value();
return true;
}

private:
std::mutex internal_mutex_;

unsigned long const hierarchy_value_;
unsigned long previous_hierarchy_value_;
/// thread_local 变量会在每个线程都有一个实例
inline static thread_local unsigned long this_thread_hierarchy_value_ = 0;

void check_for_hierarchy_violation() {
if (this_thread_hierarchy_value_ >= hierarchy_value_) {
throw std::logic_error("mutex hierarchy violated");
}
}

void update_hierarchy_value() {
previous_hierarchy_value_ = this_thread_hierarchy_value_;
this_thread_hierarchy_value_ = hierarchy_value_;
}
};

在交换操作中使用std::lock()std::unique_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class some_big_object;

class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) : some_detail(sd) {}

firend void swap(X& lhs, X& rhs) {
if(&lhs == & rhs) return;
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock); ///< 获取并延迟(defer)锁定
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
std::lock(lock_a, lock_b); ///< 锁定两个锁
swap(lhs.some_detail, rhs.some_detail);
}
};

使用std::call_once的线程安全的类成员延迟初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class X {
private:
connection_info connection_details;
connection_handle connection;

std::once_flag connection_init_flag;

void open_connection() {
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_) : connection_details(conection_details_) {
}

void send_data(data_packet const& data) {
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}

data_packet receive_data() {
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};

c++11中,初始化被定义在只发生在一个线程上,并且其他线程不可以继续直到初始化完成,所以竞争条件仅仅在于哪个线程会执行初始化,而不会有更多别的问题。对于需要单一全局实例的场合,这可以用作std::call_once的替代品。

1
2
3
4
5
class my_class;
my_class& get_my_class_instance() {
static my_class instance;
return instance;
};

使用std::condition_variable 等待数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
template<typename T>
class threadsafe_queue{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() = default;
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}

void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

LRU的简单实现

最近最少使用缓存(LRU),该缓存会删除最近最少使用的项目。缓存应该从键映射到值(允许你插入和检索特定键对应的值),并在初始化时指定最大容量。当缓存被填满时,它应该删除最近最少使用的项目。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// 最近最少使用缓存(LRU),该缓存会删除最近最少使用的项目。缓存应该从键映射到值(允许你插入和检索特定键对应的值),并在初始化时指定最大容量。当缓存被填满时,它应该删除最近最少使用的项目。
class LRUCache {
public:
LRUCache(int capacity) : max_size(capacity) {
}
/// 获取数据 get(key) - 如果密钥 (key) 存在于缓存中,则获取密钥的值(总是正数),否则返回 -1。
int get(int key) {
auto res = map.find(key);
if(res != map.end()){
list.splice(list.begin(), list, res->second);
return res->second->second;
} else {
return -1;
}
}

/// 写入数据 put(key, value) - 如果密钥不存在,则写入其数据值。当缓存容量达到上限时,它应该在写入新数据之前删除最近最少使用的数据值,从而为新的数据值留出空间。
void put(int key, int value) {
auto res = map.find(key);
list.push_front(std::make_pair(key, value));
if(res != map.end()){
list.erase(res->second);
map.erase(res);
}
map[key] = list.begin();

if(map.size() > max_size){
auto last = list.end();
--last;
map.erase(last->first);
list.pop_back();
}
}

public:
std::unordered_map<int, std::list<std::pair<int, int>>::iterator> map;
std::list<std::pair<int, int>> list;
int max_size = 0;
};

跳表简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
struct Node {
Node(int i, Node *r, Node *d) : val(i), right(r), down(d) {}
int val;
Node *right;
Node *down;
};
class Skiplist {
public:
Skiplist() {}

bool search(int target) {
auto cur = head;
while (cur) {
while (cur->right && cur->right->val < target) {
cur = cur->right;
}
if (cur->right && cur->right->val == target) {
return true;
}
cur = cur->down;
}
return false;
}

void add(int num) {
int r_level = 1;
while (r_level <= max_level && (rand() & 1 == 0)) {
++r_level;
}
if (r_level > max_level) {
max_level = r_level;
head = new Node(num, nullptr, head);
}
Node *cur = head;
Node *last = nullptr;
for (int l = max_level; l >= 1; --l) {
while (cur->right && cur->right->val < num) {
cur = cur->right;
}
if (l <= r_level) {
cur->right = new Node(num, cur->right, nullptr);
if (last) {
last->down = cur->right;
}
last = cur->right;
}
cur = cur->down;
}
}

bool erase(int num) {
auto cur = head;
bool seen = false;
for (int l = max_level; l >= 1; --l) {
while (cur->right && cur->right->val < num) {
cur = cur->right;
}
if (cur->right && cur->right->val == num) {
seen = true;
Node *temp = cur->right;
cur->right = cur->right->right;
delete temp;
}
cur = cur->down;
}
return seen;
}

private:
int max_level = 0;
Node *head = nullptr;
};

C++ 回调函数示例

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <functional>
#include <iostream>

void print() { std::cout << "print()" << std::endl; }

void callback(std::function<void()> func) {
std::cout << "Enter: callback()" << std::endl;
func();
std::cout << "Leave: callback()" << std::endl;
}

int main() {
callback(print);
system("pause");
}

输出:

Enter: callback()

print()

Leave: callback()

接下来我们把这两个函数放入类中实现,在调用的时候绑定函数名和其对应实例就可以按以上例子方法调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <functional>
#include <iostream>

class operation {
public:
void print() { std::cout << "print()" << std::endl; }
};

class controller {
public:
void callback(std::function<void()> func) {
std::cout << "Enter: callback()" << std::endl;
func();
std::cout << "Leave: callback()" << std::endl;
}
};

int main() {
controller control;
operation op;
/// 绑定实例和对应的操作函数
auto f = std::bind(&operation::print, &op);
control.callback(f);
system("pause");
}

现在我们把绑定函数对象的过程封装起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <functional>
#include <iostream>

class operation {
public:
auto get_print_function() { return std::bind(&operation::print, this); }
void print() { std::cout << "print()" << std::endl; }
};

class controller {
public:
void callback(std::function<void()> func) {
std::cout << "Enter: callback()" << std::endl;
func();
std::cout << "Leave: callback()" << std::endl;
}
};

int main() {
controller control;
operation op;
control.callback(op.get_print_function());
system("pause");
}

数据结构

字符串

字符串是不可变字节(byte)序列,其本身是一个符合结构.

1
2
3
4
type stringStruct struct {
str unsafe.Pointer
len int
}

头部指针指向字节数组,但没有NULL结尾。默认以UTF-8编码存储Unicode字符,字面量里允许使用十六进制、八进制和UTF编码格式。

内置函数len返回字节数组长度,cap不接受字符串类型参数。

字符串默认值不是nil, 而是"".

使用for遍历字符串是,分byterune两种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main () {	// byte
s:="李建聪"
for i := 0; i < len(s); i++ {
fmt.Printf("%d: [%c]\n", i, s[i])
}


for i, c := range s { // rune: 返回数组索引号,以及Unicode字符
fmt.Printf("%d: [%c]\n", i, c)
}
}
///0: [æ]
///1: [
///2: []
///3: [å]
///4: [»]
///5: [º]
///6: [è]
///7: []
///8: [ª]
///0: [李]
///3: [建]
///6: [聪]

要修改字符串,须将其转换为可变类型([]rune[]byte), 待完成后再转换回来。但不管怎么转换,都须重新分配内存,并复制数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func pp(format string, ptr interface{}) {
p := reflect.ValueOf(ptr).Pointer()
h := (*uintptr)(unsafe.Pointer(p))
fmt.Printf(format, *h)
}

func main () { // byte
s:="hello, world!"
pp("s: %x\n", &s)

bs := []byte(s)
s2 := string(bs)

pp("string to []byte, bs:%x\n", &bs)
pp("[]byte to string, s2:%x\n", &s2)

rs := []rune(s)
s3 := string(rs)

pp("string to []byte, rs:%x\n", &rs)
pp("[]byte to string, s3:%x\n", &s3)
}
/// 输出:
/// s: 2bcc91
/// string to []byte, bs:c0000a20a0
/// []byte to string, s2:c0000a20b0
/// string to []byte, rs:c0000b80c0
/// []byte to string, s3:c0000a20d0

编译器会为了某些场合进行专门优化,避免额外分配和复制操作:

  • []byte转换为string key, 去map[string]查询的时候。
  • string转换为[]byte, 进行for range迭代时,直接取字节赋值给局部变量。

除了类型转换外,动态构建字符串也容易造成性能问题。
用加法操作符拼接字符串时,每次都须重新分配内存。如此,在构建超大字符串时,性能就显得极差。
改进思路时预分配i足够大的空间。常用方法是用string.Join函数,他会统计所有参数长度,并一次性完成内存分配操作。
另外
utf8.ValidString(s) 返回s是不是一个有效的字符串
utf8.RuneCountInString(s) 替代len返回unicode的字符数量