当前位置:   article > 正文

C++并发编程(七)无锁数据结构_c++ 无锁编程

c++ 无锁编程

目录

1.非阻塞型数据结构

1.1无锁数据结构

1.2免等数据结构(wait-free data structure)

1.3优点和缺点

2.无锁数据结构范例

2.1线程安全的无锁栈

2.2在无锁数据结构中管理节点

2.3运用风险指针检测无法回收的节点

2.4引用计数检测正在使用的节点

2.5无锁栈容器施加内存模型

3.线程安全的无锁队列

4.无锁数据结构的设计原则

小结


使用了互斥、条件变量或future操作进行同步操作的算法或数据结构,称为阻塞型算法或阻塞型数据结构。

操作系统会把阻塞的线程暂停,并将时间片分配给其它线程,等有线程执行了恰当操作(包括释放互斥、指挥条件变量或未future对象装填结果值),阻塞方被解除。

线程/函数类型:
无阻碍(obstruction-free):其它线程全部暂停,则目标线程将在有限步骤内完成自己的操作。

无锁(lock-free):如果多个线程共同操作同一份数据,那么在有限步骤内,其中某一线程能够完成自己的操作。

免等(wait-free):在某份数据上,每个线程经过有限步骤能完成自己的操作,即便该数据同时被其它多个线程操作。

无阻碍算法在大多时候并不实用,因为线程全部暂停的情况极少出现。

1.非阻塞型数据结构

  1. class spinlock_mutex
  2. {
  3. std::atomic_flag flag;
  4. public:
  5. spinlock_mutex() :flag(ATOMIC_FLAG_INIT) {}//初始化原子标志
  6. void lock()
  7. {
  8. while (flag.test_and_set(std::memory_order_acquire));//循环尝试锁住互斥,返回false说明线程已将标志设置为成立
  9. }
  10. void unlock()
  11. {
  12. flag.clear(std::memory_order_release);//标志位置0,解锁互斥
  13. }
  14. };

自旋锁中,lock()不断循环,直到test_and_set()返回false,代码在循环中“自旋”。

没有调用任何阻塞型函数,因此这一互斥保护数据的线程不会发生阻塞。

1.1无锁数据结构

如果某数据结构具有无锁特性,它应该支持多个线程同时访问,但每个线程执行的操作 不一定相同。

当前线程更新目标数据结构的过程中,其它线程可能同时改动数据结构,故需要比较-交换操作判定是否出现这种情形。若出现,则当前线程需要重新执行其它操作,再次进行比较-交换操作。因此,比较-交换操作通常含有循环。此时,若其它线程被暂停运行,比较-交换操作得以成功执行,则这份代码是无锁实现;否则,就成为自旋锁,虽不会阻塞,也不属于无锁实现。

无锁算法含有上述循环,可能令某一线程总在错误的时机执行操作,结果被迫终止并重新执行,导致其“受饿”(starvation)。免等数据结构和无锁数据结构能避免这一问题。

1.2免等数据结构(wait-free data structure)

如果一个免等数据结构被多个线程访问,不论其它线程发生了什么,每个线程都能在有限步骤内完成自己操作。

正确写出较为困难(容易沦为自旋锁),要确保各项操作在一次执行中顺利完成,且每个步骤都不会导致别的线程操作失败。

1.3优点和缺点

原因1(出发点):最大限度实现并发。无锁数据结构总存在某个线程能执行下一步操作。

原因2:代码健壮性。假设数据结构的写操作受锁保护,如果线程在持锁时间终止,那么数据结构仅完成了部分改动,且此后无从修补。若是无锁数据意外终止,则丢失的数据仅限于它持有的部分,其它数据仍完好。

缺点:无锁数据结构不含锁,不会出现死锁,但可能出现“活锁”(live lock)。若两个线程同时更改同一份数据结构,若它们所做的改动都导致对方从头开始操作,那双方会反复循环,不断重试,称为“活锁”。   这一缺点会降低代码效率:虽然提高了操作同一数据结构的并发度,缩短了单个线程因等待而消耗的时间,却也可能降低整体性能。对基于锁的数据结构,其原子操作仅涉及互斥的加锁行为,而无锁结构则更多。

2.无锁数据结构范例

无锁数据结构依赖于原子操作,为了易于分析,我们先全部采用memory_order_seq_cst次序,本节范例不会出现互斥锁,但有一点:只有原子标志std::atomic_flag的实现可以保证无锁。在部分平台上,有些操作看似无锁,却用到了C++库的内部锁。

2.1线程安全的无锁栈

必须保证,一但线程将一项数据加入栈容器中,就能立即安全地被另一线程取出,只有唯一一个线程能获取该项数据。可以用链表实现,head指针指向第一个节点,各节点内的next依次指向后续节点。

单线程添加节点的基本流程:
1.创建新节点

2.新节点的成员指针next指向当前的头节点

3.head指针指向新节点

若两个线程同时添加节点,步骤2和步骤3之间会产生数据竞争:假设某线程完成了步骤2,但尚未完成3,头节点未来得及更新,此时,另一线程有可能抢先改动head指针。

需要留意head的更新:必须先准备好新节点,才可以设置指针head指向它,之后再也无法修改该节点。

可以采用原子化的比较-交换操作,保证next指针与head指针相同:

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. struct node
  6. {
  7. T data;
  8. node* next;
  9. node(const T& data) :data(_data) {}
  10. };
  11. std::atomic<node*> head;
  12. public:
  13. void push(const T& data)
  14. {
  15. const node* new_node = new node(data);
  16. new_data->next = head;
  17. while (!head.compare_exchange_weak(new_node->next, new_node));
  18. }
  19. };

异常安全:可能抛出异常的是新节点构建处,节点本身会清理,且栈容器尚未改动。

单线程弹出操作基本流程:
1.读取head当前值
2.读取head->next
3.将head的值改为head->next的值
4. 弹出栈顶节点,获取其所含数据data并返回
5.删除已弹出的节点

多线程中,若多个线程弹出,可能读出同一个head指针的值,返回值取同一个节点,违背了栈的设计意愿。或者一个线程顺利完成5个步骤,另一线程才开始 执行2,则将对悬空指针解引用。

暂且忽略内存泄漏,可以按照push操作的设计思路,采用比较-交换操作更新head指针:

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. public:
  5. void pop(T& result)
  6. {
  7. node* old_head = head.load();
  8. while (!head.compare_exchange_weak(old_head, head->next));
  9. result = old_head->data;
  10. }
  11. };

上述简短的代码没有处理空栈,需要在while循环中查验head指针是否为nullptr,并针对空栈抛出异常。
向函数传入引用的方法不适用,仅当确认只有一个线程弹出节点时,复制数据才是安全行为,我们可以返回一个智能指针,指向所获取的值。
若pop()返回智能指针,就要在堆数据段上为节点分配内存,在pop()调用过程中分配堆内存,可能会抛出异常,我们可以在push处分配内存,因为push函数内,新节点本身就要分配内存。

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. struct node
  6. {
  7. std::shared_ptr<T> data;//通过指针持有数据
  8. node* next;
  9. node(const T& data) :data(std::make_shared<T>(data)) {}//依据型别T分配内存,须在堆数据段为数据分配内存
  10. };
  11. std::atomic<node*> head;
  12. public:
  13. void push(const T& data)
  14. {
  15. const node* new_node = new node(data);
  16. new_node->next = head.load();
  17. while (!head.compare_exchange_weak(new_node->next, new_node));
  18. }
  19. std::shared_ptr<T> pop()
  20. {
  21. node* old_head = head.load();
  22. while (old_head && !head.compare_exchange_weak(old_head, old_head->next));//判定old_head非空
  23. return old_head ? old_head->data : std::shared_ptr<T>();
  24. }
  25. };

上述代码虽是无锁实现,但并非免等,compare_exchaneg_weak()若总是false,理论上会导致push和pop中的while循环持续进行。

2.2在无锁数据结构中管理节点

 若有多个线程同时调用pop(),需要采取某种跟踪措施,判定何时才可以安全地删除节点,避免内存泄漏。我们可以维护一个“等待删除链表”,在pop()中设置原子化计数器,每次执行pop()都自增,在离开函数时自减,当计数器为0时,就能安全地删除节点。

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. std::atomic<unsigned> threads_in_pop;
  6. void try_reclaim(node* old_head);
  7. public:
  8. std::shared_ptr<T> pop()
  9. {
  10. ++threads_in_pop;
  11. node* old_head = head.load();
  12. while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
  13. std::shared_ptr<T> res;
  14. if (old_head)
  15. {
  16. res.swap(old_head->data);//swap交换共享指针data来删除数据
  17. }
  18. try_reclaim(old_head);//删除old_head节点
  19. return res;
  20. }
  21. //引用计数的内存释放机制
  22. std::atomic<node*> to_be_deleted;
  23. static void delete_nodes(node* nodes)//删除链表所有节点
  24. {
  25. while (nodes)
  26. {
  27. node* next = nodes->next;
  28. delete nodes;
  29. nodes = next;
  30. }
  31. }
  32. template<typename T>
  33. void lock_free_stack<T>::try_reclaim(node* old_head)
  34. {
  35. if (threads_in_pop == 1)
  36. {
  37. //位置[1]
  38. //to_be_deleted被交换为nullptr,别的线程无法通过to_be_deleted指针操作候删链表,本线程通过nodes_to_delete独占候删链表
  39. node* nodes_to_delete = to_be_deleted.exchange(nullptr);//交换操作:赋值,返回原值
  40. if (!--threads_in_pop)//确保只有一个线程调用pop()
  41. {
  42. delete_nodes(nodes_to_delete);//若此时仍仅有一个线程调用,则可直接删除候删链表
  43. }
  44. else if (nodes_to_delete)//有新的线程调用pop(),且此时候删链表中已有节点,位置[1]处可能有新的节点已加入候删链表(调用else)
  45. {
  46. chain_pending_nodes(nodes_to_delete);//把新的候删链表节点加入到旧的候删链表末尾
  47. }
  48. delete old_head;//直接删除old_head,无需再加入候删链表
  49. }
  50. else
  51. {
  52. chain_pending_node(old_head);//将old_head加入候删链表
  53. --thread_in_pop;
  54. }
  55. }
  56. void chain_pending_nodes(node* nodes)
  57. {
  58. node* last = nodes;
  59. while (node* const next = last->next)//遍历到尾部
  60. {
  61. last = next;
  62. }
  63. chain_pending_nodes(nodes, last);
  64. }
  65. void chain_pending_nodes(node* first, node* last)
  66. {
  67. last->next = to_be_deleted;//在尾部接上新加入候删链表的节点
  68. while (!to_be_deleted.compare_exchange_weak(last->next, first));//更新to_be_deleted
  69. }
  70. void chain_pending_node(node* n)//把n插入到候删链表头部
  71. {
  72. chain_pending_nodes(n, n);
  73. }
  74. };

2.3运用风险指针检测无法回收的节点

假设当前线程要访问某对象,而该对象却将被别的线程删除,那就让前者设置一个指向目标对象的风险指针,以通知其它线程删除的风险。

  1. //风险指针实现pop()函数
  2. std::shared_ptr<T> pop()
  3. {
  4. std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();
  5. node* old_head = head.load();
  6. do
  7. {
  8. node* temp;
  9. do
  10. {
  11. temp = old_head;//记录当前的old_head值
  12. hp.store(old_head);
  13. old_head = head.load();//若处理期间,head发生了改变,则old_head的值不等于原本值(精髓)
  14. }
  15. while (old_head != temp);//只有循环过程head值没有发生改变,才会结束循环
  16. }
  17. while (old_head && !head.compare_exchange_strong(old_head, old_head->next));//只有head没有发生改变,才会更改指针并结束循环,使用strong避免佯败
  18. //构思的简化版本
  19. // node* old_head
  20. //do
  21. //{
  22. // old_head = head.load();
  23. // hp.store(old_head);
  24. //}
  25. //while (old_head && !head.compare_exchange_strong(old_head, old_head->next));
  26. hp.store(nullptr);//指针更新完成后,将风险指针清零
  27. std::shared_ptr<T> res;
  28. if (old_head)
  29. {
  30. res.swap(old_head->data);
  31. if (outstanding_hazard_pointers_for(old_head))//核查是否被风险指针所
  32. {
  33. reclaim_later(old_head);//若有风险指针指涉,则留待稍后回收
  34. }
  35. else
  36. {
  37. delete old_head;
  38. }
  39. delete_nodes_with_no_hazards();//核查由reclaim_later()回收的节点,删除掉不再被风险指针指涉的节点
  40. }
  41. return res;
  42. }

其中,get_hazard_pointer_for_current_thread()用于为各线程的风险指针实例分配内存,忽略效率问题,采用简单的结构体,将线程id与风险指针配对,存储在定长数组中:

  1. //get_hazard_pointer_for_current_thread()的实现
  2. const unsigned max_hazard_pointers = 100;
  3. struct hazard_pointer
  4. {
  5. std::atomic<std::thread::id> id;
  6. std::atomic<void*> pointer;
  7. };
  8. hazard_pointer hazard_pointers[max_hazard_pointers];//风险指针数组
  9. class hp_owner
  10. {
  11. hazard_pointer* hp;
  12. public:
  13. hp_owner(const hp_owner&) = delete;
  14. hp_owner operator=(const hp_owner&) = delete;
  15. hp_owner():hp(nullptr)
  16. {
  17. for (unsigned int i = 0; i < max_hazard_pointers; ++i)
  18. {
  19. std::thread::id old_id;
  20. if (hazard_pointers[i].id.compare_exchange_strong(old_id, std::this_thread::get_id()))//查找未被占用的位置,并放入本线程id
  21. {
  22. hp = &hazard_pointers[i];
  23. break;
  24. }
  25. }
  26. if (!hp)
  27. {
  28. throw std::runtime_error("No hazard pointers avaliable!");//无法找到空闲位置
  29. }
  30. }
  31. std::atomic<void*>& get_pointer()
  32. {
  33. return hp->pointer;
  34. }
  35. ~hp_owner()
  36. {
  37. hp->pointer.store(nullptr);
  38. hp->id.store(std::thread::id());//让别的线程可以重用风险指针数组元素
  39. }
  40. };
  41. std::atomic<void*>& get_hazard_pointer_for_current_thread()
  42. {
  43. static thread_local hp_owner hazard;
  44. return hazard.get_pointer();
  45. }

对任一pop()线程,只要完成了hp_owner的创建,指针会被缓存,以后访问会较快。

outstanding_hazard_pointers_for()实现仅需遍历数组查验风险指针是否存在即可:

  1. bool outstanding_hazard_pointers_for(void* p)
  2. {
  3. for (unsigned i = 0; i < max_hazard_pointers; ++i)
  4. {
  5. if (hazard_pointers[i].pointer == p)
  6. return true;
  7. }
  8. return false;
  9. }

 reclaim_later()和delete_nodes_with_no_hazards()可以利用简单的链表实现,前者将节点加入链表,后者扫描整个链表,删除未被风险指针指涉的结点:

  1. template<typename T>
  2. void do_delete(void* p)
  3. {
  4. delete static_cast<T*> (p);
  5. }
  6. //链表节点
  7. struct data_to_reclaim
  8. {
  9. void* data;
  10. std::function<void(void*)> deleter;//包装实际删除函数do_delete,在析构函数中被调用
  11. data_to_reclaim* next;
  12. template<typename T>
  13. data_to_reclaim(T* p):
  14. data(p),
  15. deleter(&do_delete<T>),
  16. next(0)
  17. { }
  18. ~data_to_reclaim()
  19. {
  20. deleter(data);
  21. }
  22. };
  23. //链表
  24. std::atomic<data_to_reclaim*> nodes_to_reclaim;
  25. //将节点加入链表头部
  26. void add_to_reclaim_list(data_to_reclaim* node)
  27. {
  28. node->next = nodes_to_reclaim.load();
  29. while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));//更新链表
  30. }
  31. //通用接口,将数据装入链表或将节点装入链表
  32. template<typename T>
  33. void reclaim_later(T* data)
  34. {
  35. add_to_reclaim_list(new data_to_reclaim(data));
  36. }
  37. void delete_nodes_with_no_hazards()
  38. {
  39. data_to_reclaim* current = nodes_to_reclaim.exchange(nullptr);//先收归当前线程所有,在当前节点删除过程中,其它线程可以往链表上增加节点
  40. while (current)
  41. {
  42. data_to_reclaim* next = current->next;
  43. if (!outstanding_hazard_pointers_for(current->data))
  44. {
  45. delete current;
  46. }
  47. else
  48. {
  49. add_to_reclaim_list(current);//放回到候删链表
  50. }
  51. current = next;
  52. }
  53. }

上述方法尽管可以安全地回收节点,但会造成大量开销,因为每次调用pop()都得检查max_hazard_pointers数量节点的原子变量。

改进1:用存储空间换效率,我们不必每次pop()都删除底层节点,仅当链表中节点数量超过max_hazard_pointers时进行删除,在pop()调用max_hazard_pointers后,仍然需要每次查验节点,所以程序性能不会有太大改善。
缺点:候删链表变长,等待回收的节点数增多;需另外设置原子计数器对链表中的节点数计数;多线程访问候删链表会形成条件竞争。

改进2:让各线程独立维护自己的候删链表,这样做不必设置计数器,也消除了条件竞争,但需要分配更多节点(mXm),若线程没回收自己的节点却需要退出运行,则节点转到全局链表中,再转到下一个局部链表。

IBM公司对风险指针申请了专利,需要正当授权。或遵从GBL协议开发自由软件,符合IBM公司禁止主张专利权益条款。

接下来介绍还没注册专利的内存回收技术:引用计数。

2.4引用计数检测正在使用的节点

安全地删除节点涉及一个问题:检测哪些节点仍被执行读操作的线程访问。引用计数对各节点维护一个计数器,记录访问该节点的线程数目。

我们可以使用std::shared_ptr<>作为计数器,毕竟本来就是基于引用计数的指针。但是在某些平台上,其操作不一定通过无锁方式实现,可以使用is_lock_free(&some_shared_ptr)进行判别。
(在最后一个指针销毁之际,需要清零该节点的next指针。)

首先看一下基于std::shared_ptr<>的无锁栈容器:

  1. //基于std::shared_ptr<>的无锁栈容器
  2. template<typename T>
  3. class lock_free_stack
  4. {
  5. private:
  6. struct node
  7. {
  8. std::shared_ptr<T> data;
  9. std::shared_ptr<node> next;
  10. node(const T& data) :data(std::make_shared<T>())
  11. {}
  12. };
  13. std::shared_ptr<node> head;
  14. public:
  15. void push(const T& data)
  16. {
  17. std::shared_ptr<node> new_node = std::make_shared<node>(data);
  18. new_node->next = std::atomic_load(&head);
  19. while (!std::atomic_compare_exchange_weak(&head, new_node->next, new_node));
  20. }
  21. std::shared_ptr<T> pop()
  22. {
  23. std::shared_ptr<node> old_head = std::atomic_load(&head);
  24. while (old_head && !std::atomic_compare_exchange_weak(&head, &old_head, std::atomic_load(&old_head->next)));
  25. if (old_head)
  26. {
  27. std::atomic_store(&old_head->next, std::shared_ptr<node>());
  28. return old_head->data;
  29. }
  30. return std::make_shared<T>();
  31. }
  32. ~lock_free_stack() {
  33. while (pop());
  34. }
  35. };

我们可以把head指针和next指针改为std::experimental::atomic_shared_ptr<>,这样就无需调用原子类的非成员函数。

每个节点配置内、外部计数器各一个,两个计数器之和即为节点的引用数目。其中,外部计数器与节点指针一起被包装为结构体,节点被访问则自增;内部计数器在节点内部,节点读取操作完成则自减:

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. struct node;
  6. struct counted_node_ptr
  7. {
  8. int external_count;//外部计数器
  9. node* ptr;
  10. };
  11. struct node
  12. {
  13. std::shared_ptr<T> data;//底层数据
  14. std::atomic<int> internal_count;//内部计数器
  15. counted_node_ptr next;
  16. node(const T& data) :
  17. data(std::make_shared<T>()),
  18. internal_count(0)
  19. {}
  20. };
  21. std::atomic<counted_node_ptr> head;
  22. public:
  23. void push(const T& data)
  24. {
  25. counted_node_ptr new_node;
  26. new_node.ptr = new node(data);
  27. new_node.external_count = 1;//初始为1,因为仅有head指针访问新节点
  28. new_node.ptr->next = head.load();
  29. while (!head.compare_exchange_weak(new_node.ptr->next, new_node));
  30. }
  31. ~lock_free_stack() {
  32. while (pop());
  33. }
  34. };

如果硬件平台支持双字比较-交换操作(指单一指令即可完成64位数据的比较-交换操作),std::atomic<counted_node_ptr>就属于无锁数据结构,若平台不支持,则std::atomic<>中涉及的结构体尺寸过大,无法直接通过原子指令操作,最好使用std::shared_ptr<>实现栈容器。

pop()部分相对复杂:
 

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. //外部计数器自增
  6. void increase_head_count(count_node_ptr& old_counter)
  7. {
  8. counted_node_ptr new_counter;
  9. do
  10. {
  11. new_counter = old_counter;
  12. ++new_counter.external_count;
  13. } while (!head.compare_exchange_weak(old_counter, new_counter));//该函数接收old_head节点,此处确保old_head没有被改变过
  14. old_counter.external_count = new_counter.external_count;
  15. }
  16. public:
  17. std::shared_ptr<T> pop()
  18. {
  19. counted_node_ptr old_head = head.load();
  20. for (;;)
  21. {
  22. increase_head_count(old_head);//外部计数器自增表明正被指涉,确保读取目标节点是安全行为
  23. const node* ptr = old_head.ptr;//读取指针
  24. if (!ptr)
  25. {
  26. return std::shared_ptr<T>();
  27. }
  28. if (head.compare_exchange_strong(old_head, ptr->next))//尝试弹出
  29. {
  30. std::shared_ptr<T> res;
  31. res.swap(ptr->data);//数据置换,作为返回值备用
  32. const int count_increase = old_head.external_count - 2;//头节点弹出栈-1,当前线程不再访问-1
  33. //若外部计数器的值=内部计数器的值(fetch_add的返回值)的相反数,则删除节点
  34. if (ptr->internal_count.fetch_add(count_increase) == -count_increase)//将外部引用的值加入到内部引用上
  35. {
  36. delete ptr;//内部计数器=-外部计数器,故相加=0,没有线程指涉,可安全删除节点
  37. }
  38. return res;//无论节点是否删除,弹出已完成,返回值
  39. }
  40. else if (ptr->internal_count.fetch_sub(1) == 1)//比较-交换操作失败,表明有另一线程先于本线程压入或弹出节点,先令引用-1
  41. {
  42. delete ptr;//内部计数器-1后=0,表明其它线程已完成弹出,不再被其它线程指涉,因此先删除再继续循环以返回弹出数据
  43. }
  44. }
  45. }
  46. };

在push()中创建节点,线程push()访问,使外部计数器+1,故初始化为1,在pop()函数中,头节点被old_head指涉,故外部计数器+1。

弹出时,外部计数器-2,因为初始节点计数值为1,对应头节点或next节点的指涉,删除节点需要-1,另外是不再由本线程的访问-1。

节点被弹出后,外部计数器会加到内部计数器,其它线程可以通过内部计数器判断是否可以安全删除。(外部计数器用于本线程的控制,内部计数器用于与其它线程共享信息)

2.5无锁栈容器施加内存模型

前文的无锁栈采用默认的std::memory_order_seq_cst次序,可以试图放宽内存次序的约束提高效率。

过程分析:

push线程先构造压入节点,然后设定head指针的新目标。

pop线程首先载入head指针,接着调用自增函数,函数内配合比较-交换操作执行循环。然后从节点读出ptr->next指针,更改head指针指向next。

2.5.1head指针的同步关系

上述过程有一种内存关系,存储操作要在载入操作之前(节点压入在弹出之前),构建先行关系,就要在push中的compare_exchange_weak()采用release或更严格的次序,若执行失败,则head和new_node指针无变化,仅next指针指回到head,只需relaxed次序:

  1. void push(const T& data)
  2. {
  3. counted_node_ptr new_node;
  4. new_node.ptr = new node(data);
  5. new_node.external_count = 1;//初始为1,因为仅有head指针访问新节点
  6. new_node.ptr->next = head.load(std::memory_order_relaxed);
  7. while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
  8. std::memory_order_release, std::memory_order_relaxed));
  9. }

push()中对head指针的比较-交换是释放操作,pop()中对head指针的更新是获取操作,构成先行关系,而push()中加载头节点的操作不影响上述关系,故可采用relaxed次序。 

 pop()函数调用increase_head_count()加载head指针,该操作受到std::memory_order_acquire或更严格的内存约束次序,失败则采用relaxed次序:

  1. void increase_head_count(count_node_ptr& old_counter)
  2. {
  3. counted_node_ptr new_counter;
  4. do
  5. {
  6. new_counter = old_counter;
  7. ++new_counter.external_count;
  8. } while (!head.compare_exchange_weak(old_counter, new_counter,
  9. std::memory_order_acquire, std::memory_order_relaxed));//该函数接收old_head节点,此处确保old_head没有被改变过
  10. old_counter.external_count = new_counter.external_count;
  11. }

pop()中的compare_exchange_strong()操作用于把head指针指向ptr->next,然后会访问ptr->data,我们需保证push()中对ptr->data的存储操作在该载入操作之前发生,而上述内存次序已然确保这种次序关系,故此处compare_exchange_strong()操作可采用relax次序。若失败则old_head更新为head,采用relaxed次序。

这份代码涉及几处比较-交换操作,但head指针是唯一修改的数据项,push()与increase_head_count()的调用同步,后者能安全地读取前者存入的值。

2.5.2ptr指针的同步关系

最后,我们需要确保swap()数据提取操作,先于“重新循环”分支(else if分支)中的delete删除指针操作,因此,以内部计数器为对象,fetch_add()操作使用release次序,fetch_sub()操作使用acquire次序保证先后次序。

优化:但是同一时刻只能有一个线程执行减少引用(else if分支),我们还可以放宽限制,在分支内增加一项load(std::memory_order_acquire)操作,若计数器为0,则执行删除操作的线程重新载入内部计数器,形成同步关系,而fetch_sub()仅需采用relaxed次序。

以下下是最终实现:

  1. template<typename T>
  2. class lock_free_stack
  3. {
  4. private:
  5. struct node;
  6. struct counted_node_ptr
  7. {
  8. int external_count;//外部计数器
  9. node* ptr;
  10. };
  11. struct node
  12. {
  13. std::shared_ptr<T> data;//底层数据
  14. std::atomic<int> internal_count;//内部计数器
  15. counted_node_ptr next;
  16. node(const T& data) :
  17. data(std::make_shared<T>()),
  18. internal_count(0)
  19. {}
  20. };
  21. std::atomic<counted_node_ptr> head;
  22. //外部计数器自增
  23. void increase_head_count(counted_node_ptr& old_counter)
  24. {
  25. counted_node_ptr new_counter;
  26. do
  27. {
  28. new_counter = old_counter;
  29. ++new_counter.external_count;
  30. } while (!head.compare_exchange_weak(old_counter, new_counter,
  31. std::memory_order_acquire, std::memory_order_relaxed));//该函数接收old_head节点,此处确保old_head没有被改变过
  32. old_counter.external_count = new_counter.external_count;
  33. }
  34. public:
  35. void push(const T& data)
  36. {
  37. counted_node_ptr new_node;
  38. new_node.ptr = new node(data);
  39. new_node.external_count = 1;//初始为1,因为仅有head指针访问新节点
  40. new_node.ptr->next = head.load(std::memory_order_relaxed);
  41. while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
  42. std::memory_order_release, std::memory_order_relaxed));
  43. }
  44. std::shared_ptr<T> pop()
  45. {
  46. counted_node_ptr old_head = head.load();
  47. for (;;)
  48. {
  49. increase_head_count(old_head);//外部计数器自增表明正被指涉,确保读取目标节点是安全行为
  50. const node* ptr = old_head.ptr;//读取指向的目标
  51. if (!ptr)
  52. {
  53. return std::shared_ptr<T>();
  54. }
  55. if (head.compare_exchange_strong(old_head, ptr->next))//尝试弹出
  56. {
  57. std::shared_ptr<T> res;
  58. res.swap(ptr->data);//数据置换,作为返回值备用
  59. const int count_increase = old_head.external_count - 2;//头节点弹出栈-1,当前线程不再访问-1
  60. //若外部计数器的值=内部计数器的值(fetch_add的返回值)的相反数,则删除节点
  61. if (ptr->internal_count.fetch_add(count_increase, std::memory_order_release) == -count_increase)//将外部引用的值加入到内部引用上
  62. {
  63. delete ptr;//内部计数器=-外部计数器,故相加=0,没有线程指涉,可安全删除节点
  64. }
  65. return res;//无论节点是否删除,弹出已完成,返回值
  66. }
  67. else if (ptr->internal_count.fetch_sub(1, std::memory_order_relaxed) == 1)//比较-交换操作失败,表明有另一线程先于本线程压入或弹出节点,先令引用-1
  68. {
  69. ptr->internal_count.load(std::memory_order_acquire);
  70. delete ptr;//内部计数器-1后=0,表明其它线程已完成弹出,不再被其它线程指涉,因此先删除再继续循环以返回弹出数据
  71. }
  72. }
  73. }
  74. ~lock_free_stack()
  75. {
  76. while (pop());
  77. }
  78. };

3.线程安全的无锁队列

队列和栈容器稍有不同,其push()和pop()分别访问不同部分,以上一章的线程安全队列为基础,设置两个指针分别指向头、尾节点,采用原子变量代替对应的互斥:

  1. //线程安全的无锁队列
  2. template<typename T>
  3. class lock_free_queue
  4. {
  5. private:
  6. struct node
  7. {
  8. std::shared_ptr<T> data;
  9. node* next;
  10. node():next(nullptr)
  11. {}
  12. };
  13. std::atomic<node*> head;
  14. std::atomic<node*> tail;
  15. node* pop_head()
  16. {
  17. const node* old_head = head.load();
  18. if (old_head == tail.load())
  19. {
  20. return nullptr;
  21. }
  22. head.store(old_head->next);
  23. return old_head;
  24. }
  25. public:
  26. lock_free_queue():head(new node),tail(head.load())
  27. {}
  28. lock_free_queue(const lock_free_queue& other) = delete;
  29. lock_free_queue& operator=(const lock_free_queue& other) = delete;
  30. ~lock_free_queue()
  31. {
  32. while (node* const old_head = head.load())
  33. {
  34. head.store(old_head->next);
  35. delete old_head;
  36. }
  37. }
  38. std::shared_ptr<T> pop()
  39. {
  40. node* old_head = pop_head();
  41. if (!old_head)
  42. {
  43. return std::shared_ptr<T>();
  44. }
  45. const std::shared_ptr<T> res(old_head->data);
  46. delete old_head;
  47. return res;
  48. }
  49. void push(T new_value)
  50. {
  51. std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
  52. node* p = new node;
  53. node* old_tail = tail.load();
  54. old_tail->data.swap(new_data);
  55. old_tail->next = p;
  56. tail.store(p);
  57. }
  58. };

首先,tail的存储操作与其载入操作同步 ,push()中,load()与中的store()构成释放序列;pop_head先完成load()操作,pop()中再完成old_head->data的载入操作。

内外计数器法
在多个线程并发调用push和pop时,便会发生问题,如push()中同时读取尾节点,却将其next指针和数据p设置为不同的值。此时我们参考上文的内外指针方法:

  1. void push(T new_value)
  2. {
  3. std::unique_ptr<T> new_data(new T(new_value));
  4. counted_node_ptr new_next;
  5. new_next.ptr = new node;
  6. new_next.external_count = 1;
  7. for (;;)
  8. {
  9. node* const old_tail = tail.load();
  10. T* old_data = nullptr;//虚拟尾节点的数据为空
  11. if (old_tail->data.compare_exchange_strong(old_data, new_data.get()))
  12. {
  13. old_tail->next = new_next;
  14. tail.store(new_next.ptr);
  15. new_data.release();
  16. break;
  17. }
  18. }
  19. }

上述解决方案有瑕疵,若有pop线程同时删除尾节点,会产生未定义行为。

双外部计数器法

我们采用新的方式,在next指针中新增外部引用计数器,节点内部包含内部计数器和以及记录外部计数器数量的数据成员,若外部计数器被销毁,则该数据成员-1。对任意节点,若外部计数器不存在,且内部计数器归0,表明可以安全删除。

  1. template<typename T>
  2. class lock_free_queue
  3. {
  4. private:
  5. struct node;
  6. struct counted_node_ptr
  7. {
  8. int external_count;
  9. node* ptr;
  10. };
  11. struct node_counter //节点内部计数工具
  12. {
  13. unsigned internal_count : 30;//30位的整型值,维持32位整体尺寸
  14. unsigned external_count : 2;//同一个节点最多有两个外部计数器,因此分配两位的位域
  15. };
  16. struct node
  17. {
  18. std::atomic<T*> data;
  19. std::atomic<node_counter> count;
  20. counted_node_ptr next;
  21. node()
  22. {
  23. //初始化count结构体
  24. node_counter new_count;
  25. new_count.internal_count = 0;
  26. new_count.external_count = 2;//被tail指涉,也被前一节点的next指涉
  27. count.store(new_count);
  28. next.ptr = nullptr;
  29. next.external_count = 0;
  30. }
  31. };
  32. std::atomic<counted_node_ptr> head;
  33. std::atomic<counted_node_ptr> tail;
  34. public:
  35. void push(T new_value)
  36. {
  37. std::unique_ptr<T> new_data(new T(new value));
  38. counted_node_ptr new_next;
  39. new_next.ptr = new node;
  40. new_next.external_count = 1;
  41. counted_node_ptr old_tail = tail.load();
  42. for (;;)
  43. {
  44. increase_external_count(tail, old_tail);
  45. T* old_data = nullptr;
  46. if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
  47. {
  48. old_tail.ptr->next = new_next;
  49. old_tail = tail.exchange(new_next);
  50. free_external_counter(old_tail);
  51. new_data.release();
  52. break;
  53. }
  54. old_tail.ptr->release_ref();
  55. }
  56. }
  57. };

计数器结构体32位尺寸,保证在32位或64位计算机上,一个机器字便能容纳整个结构体,其原子操作更有机会以无锁方式实现。

pop()函数从无锁队列弹出头节点,头节点采用了引用计数:

  1. std::unique_ptr<T> pop()
  2. {
  3. counted_node_ptr old_head = head.load(std::memory_order_relaxed);
  4. for (;;)
  5. {
  6. increase_external_count(head, old_head);
  7. node* const ptr = old_head.ptr;
  8. if (ptr == tail.load().ptr)//头节点=尾节点,列表没有数据
  9. {
  10. ptr->release_ref();//释放引用
  11. return std::unique_ptr<T>();
  12. }
  13. if (head.compare_exchange_strong(old_head, ptr->next))
  14. {
  15. T* const res = ptr->data.exchange(nullptr);//res=ptr->data; ptr->data = nullptr;
  16. free_external_counter(old_head);
  17. return std::unique_ptr<T>(res);
  18. }
  19. ptr->release_ref();
  20. }
  21. }

内部引用释放操作,与安全栈的引用自增操作类似:

  1. void release_ref()
  2. {
  3. node_counter old_counter = count.load(std::memory_order_relaxed);
  4. node_counter new_counter;
  5. do
  6. {
  7. new_counter = old_counter;
  8. --new_counter.internal_count;
  9. } while (!count.compare_exchange_strong(old_counter, new_counter,
  10. std::memory_order_acquire, std::memory_order_relaxed));
  11. if (!new_counter.internal_count && !new_counter.external_counters)//计数器为0则删除
  12. {
  13. delete this;
  14. }
  15. }

外部引用增加函数使用静态数据成员,改变head或tail 的外部计数器:

  1. static void increase_external_count(std::atomic<counted_node_ptr>& counter, counted_node_ptr& old_counter)
  2. {
  3. counted_node_ptr new_counter;
  4. do
  5. {
  6. new_counter = old_counter;
  7. ++new_counter.external_count;
  8. } while (!counter.compare_exchange_strong(old_counter, new_counter,
  9. std::memory_order_acquire, std::memory_order_relaxed));
  10. old_counter.external_count = new_counter.external_count;
  11. }

 外部引用释放操作,同样使用静态成员函数,需要传入删除的指针:

  1. static void free_external_counter(counted_node_ptr& old_node_ptr)
  2. {
  3. node* const ptr = old_node_ptr.ptr;
  4. const int count_increase = old_node_ptr.external_count - 2;//指针外的外部计数器-2
  5. node_counter node_counter = ptr->count.load(std::memory_order_relaxed);
  6. node_counter new_counter;
  7. do
  8. {
  9. new_counter = old_counter;
  10. --new_counter.external_counters;
  11. new_counter.internal_count += count_increase;//剩余的外部计数器加到内部
  12. } while (!ptr->count.compare_exchange_strong(
  13. old_counter, new_counter,
  14. std::memory_order_acquire, std::memory_order_relaxed));
  15. if (!new_counter.internal_count && !new_counter.external_counters)
  16. {
  17. delete ptr;
  18. }
  19. }

 push()操作中,需要保证old_tail->ptr为nullptr才能使compare_exchange_strong更新old_tail->ptr指针,故仅有一个线程可以push(),有可能导致多次循环,形成忙等。第一个push令其它线程发生阻塞,执行完毕才解除,所以并非无锁实现。

协助修改next指针

我们需要一种方法,即便有线程因执行push()而被阻塞,也要让它继续执行,可以借助另一线程,帮受阻塞的线程分担工作。

首先修改next指针为原子变量,pop()函数中采用原子操作载入next指针:

  1. std::unique_ptr<T> pop()
  2. {
  3. counted_node_ptr old_head = head.load(std::memory_order_relaxed);
  4. for (;;)
  5. {
  6. increase_external_count(head, old_head);
  7. node* const ptr = old_head.ptr;
  8. if (ptr == tail.load().ptr)//头节点=尾节点,列表没有数据
  9. {
  10. ptr->release_ref();//释放引用
  11. return std::unique_ptr<T>();
  12. }
  13. counted_node_ptr next = ptr->next.load();
  14. if (head.compare_exchange_strong(old_head, next))
  15. {
  16. T* const res = ptr->data.exchange(nullptr);//res=ptr->data; ptr->data = nullptr;
  17. free_external_counter(old_head);
  18. return std::unique_ptr<T>(res);
  19. }
  20. ptr->release_ref();
  21. }
  22. }

push的更改相对复杂:

  1. void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr& new_tail)
  2. {
  3. node* const current_tail_ptr = old_tail.ptr;
  4. while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr);
  5. if (old_tail.ptr == current_tail_ptr)
  6. free_external_counter(old_tail);//外部-1
  7. else
  8. current_tail_ptr->release_ref();//内部-1
  9. }
  10. void push(T new_value)
  11. {
  12. std::unique_ptr<T> new_data(new T(new value));
  13. counted_node_ptr new_next;
  14. new_next.ptr = new node;
  15. new_next.external_count = 1;
  16. counted_node_ptr old_tail = tail.load();
  17. for (;;)
  18. {
  19. increase_external_count(tail, old_tail);
  20. T* old_data = nullptr;
  21. if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
  22. {
  23. counted_node_ptr old_next = { 0 };
  24. if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
  25. {
  26. delete new_next.ptr;
  27. new_next = old_next;
  28. }
  29. set_new_tail(old_tail, new_next);
  30. //old_tail.ptr->next = new_next;
  31. //old_tail = tail.exchange(new_next);
  32. //free_external_counter(old_tail);
  33. new_data.release();
  34. break;
  35. }
  36. else
  37. {
  38. counted_node_ptr old_next = { 0 };
  39. if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
  40. {
  41. old_next = new_next;
  42. new_next.ptr = new node;
  43. }
  44. set_new_tail(old_tail, old_next);
  45. }
  46. }
  47. }

 push只有一个线程可以完成data的设置,失败的则协助更新next指针。

上述代码存在大量new和delete操作,内存分配器的效率会对代码性能产生很大影响,要评判内存分配器的优劣,唯一的方法是进行测试,优化技术包括为每个线程配备独立的内存分配器,引入空闲内存列表循环使用节点,而不交回内存分配器处理。

4.无锁数据结构的设计原则

1.在原型设计中使用默认内存次序,正常工作后逐渐放宽次序约束。

2.无锁内存回收方案。基本要求:目标对象只有在不被任何线程指涉的情况下,才能删除。本章介绍了三种方法:1、暂缓删除动作,没有线程访问数据结构才删除;2、采用风险指针,辨识正被访问的数据;3、引用计数(配合候删链表)。还有一种方法是重复使用节点,数据结构销毁时才完全释放,缺点是可能导致“ABA”问题。

3.防范ABA问题。线程甲读取原子变量x,其值为A,然后根据键值A的值查找值V;此时线程乙对原子变量执行操作,改其值为B,线程甲发生阻塞;线程丙改变了A对应的目标值;线程丁再次改变x的值为A,但此时A关联的目标值发生改变,线程甲无从分辨从而破坏数据结构。
防范方法是在x中引入ABA计数器,每当x的值被改变,计数器自增,如果别的线程改动了x,比较交换操作仍会失败。

4.找出忙等循环,协助其它线程。某线程在执行过程中,因另一线程操作而暂停等待,可以让前者在修改失败时协助修改next指针(要求next指针为原子变量),完成后再回到自身数据的修改。

小结

设计无锁数据结构的要点:

1.改变链表指针时,通常采用while(!compare_compare_weak(..., ...))或if(...),以保证链表安全更新。

2.分析线程中需要更改的指针,把特定指针(头、尾节点)指针设置为原子指针,分析必要的先后次序(如先创建再更改,先返回再删除),以release-acquire的内存次序构成同步关系。

3.暂不能安全删除的节点加入候删链表,或使用协助循环方法,避免忙等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/877168
推荐阅读
相关标签
  

闽ICP备14008679号