《C++ 并发编程实战》读书笔记(4)

1 分钟阅读

屏障

屏障的这些操作可以强制内存顺序约束,而无需修改任何数据,并且与使用memory_order_relaxed顺序约束的原子操作组合起来使用。屏障是全局操作,能在执行该屏障的线程里影响其他原子操作的顺序。 屏障一般也被称为内存障碍(memory barriers), 它们之所以这样命名, 是因为他们在代码中放置了一行代码,是的特定的操作无法穿越。 在独立变量上的松散操作通常可以自由地被编译器或硬件重新排序。屏障限制了这一自由,并且在之前并不存在的地方引入happens-beforesynchronizes-with关系。

松散操作可以使用平常排序

#include <atomic>
#include <thread>
#include <cassert>
#include <string>
#include <vector>

std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
  x.store(true, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_relaxed);
  y.store(true, std::memory_order_relaxed);
}

void read_y_then_x() {
  while (!y.load(std::memory_order_relaxed));
  std::atomic_thread_fence(std::memory_order_acquire);
  if (x.load(std::memory_order_relaxed)) ++z;
}

int main() {
  x = false;
  y = false;
  z = 0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
  b.join();
  assert(z.load() != 0);
}

并发数据结构

数据结构的线程安全的基本原理

  • 保证当数据结构不变性被别的线程破坏时的状态不被任何别的线程看到。
  • 注意避免数据结构结构所固有的竞争现象,通过为完整操作提供函数,而不是提供操作步骤。
  • 注意当出现例外时,数据结构是怎么样来保证不变性不被破坏。
  • 当使用数据结构时,通过限制锁的范围和避免使用嵌套锁,来降低产生死锁的机会。

第二个要考虑的是实现真正的并发读取

  • 锁的范围能否被限定,使得一个操作的部分可以在锁外被执行?
  • 数据结构的不同部分能否被不同的互斥元保护?
  • 能否所有操作需要同样级别的保护?
  • 数据结构的一个小改变能否在不影响操作语义情况下提高并发机会?

无锁数据结构:

对于有资格称为无锁的数据结构,就必须能够让多于一个线程可以并发的访问此数据结构。这些线程不需要做相同的操作,无所队列可以允许一个线程push的同时,另一个线程pop,到那时如果两个吸纳成同时试图插入新数据的时候,就会打破无锁队列。不仅如此,如果一个访问数据结构的线程在操作中途被调度器挂起的话别的线程必须仍然能够完成操作而无需等待挂起的线程。

无等待数据结构

无等待的数据结构是一种无锁的数据结构,并且有着额外的特性,每个访问数据结构的线程都可以在有限数量的步骤内完成它的操作,而不用管别的线程的行为,因为其他线程的冲突而可能卷入无限次重试的算法不是无等待。 为了确保每个线程都能够在有限步骤内完成它的操作,就必须保证每个操作都可以在一个操作周期内执行,并且一个线程执行的操作不会导致另一个线程上操作的失败。

无锁数据结构的优点与缺点

使用无锁数据结构的最主要的原因就是为了实现最大程度的并发。对于基于锁的容器,总是有可能一个线程必须阻塞,并在可以继续前等待另一个线程完成其操作。互斥元锁的目的就是通过互斥来阻止并发。 使用无锁数据结构的第二个原因是健壮性。当一个线程在持有锁的时候终止,哪个数据结构就永远被破坏了。但是如果一个线程在操作无所数据结构时终止了,就不会丢失任何数据。

因为不使用任何锁,因此无锁数据结构是不会发生死锁的,尽管有可能存在活锁. 当两个线程都试图修改数据结构,但是对于每个线程来说,另一个线程所作的修改都会要求此线程的操作重新被执行,因此这两个线程都会一直循环和不断尝试,在这种情况下就会发生活锁。除非某个线程先到达(通过协议,通过更快或完全靠运气), 不然此循环会一直继续下去。活锁通常是短暂的,因为它们取决于线程的精确调度,因此活锁会降低性能而不会导致长期问题。 这就是无锁数据结构的缺点,尽管它可以增加在数据结构上操作的并发能力,并且减少了线程等待的时间,但是它可能降低整体性能。首先,无锁代码使用的原子操作可能比非原子操作要慢很多。

template<typename T>
class lock_free_stack {
  private:
  struct node {
    T data;
    node* next;
    node(T const& data_) : data(data_) {}
  };
  std::atomic<node*> head;
  public:
  void push(T const& data){
    node* const new_node = new node(data);
    new_node->next = head;
    while(!head.compare_exchange_weak(new_node->next, new_node));
  }
  void pop(T& result){
    node* old_head = head.load();
    while(!head.compare_exchange_weak(old_head, old_head->next));
    result = old_head->data;
  }
}
template<typename T>
class lock_free_stack {
private:
  std::atomic<unsigned> threads_in_pop;
  std::atomic<node*> to_be_deleted;

  void try_reclaim(node* old_head) {
    if (threads_in_pop == 1) {
      node* nodes_to_delete = to_be_deleted.exchange(nullptr);
      if (!--threads_in_pop) {
        delete_nodes(nodes_to_delete);
      }
      else if (nodes_to_delete) {
        chanin_pending_nodes(nodes_to_delete);
      }
    }
  }
  static void delete_node(node* nodes) {
    while (nodes) {
      node* next = nodes->next;
      delete nodes;
      nodes = next;
    }
   }
public:
  std::shared_ptr<T> pop() {
    ++threads_in_pop;
    node* old_head = head.load();
    while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
    std::shared_ptr<T> res;
    if (old_head) {
      res.swap(old_head->data);
    }
    try_reclaim(old_head);
    return res;
  }
};