分类目录归档:lockfree

[LockFree之美] 使用Hazard Version实现的无锁Stack与Queue

  • 概述

这篇文章(http://oceanbase.org.cn/?p=82)的第6小节讲述了Hazard Version的实现原理,它的设计思想最早由OB团队的席华锋提出,本文不再赘述,本文主要分享Hazard Version的实现要点,以及使用它实现无锁StackQueue的方法,已经在多核系统上的性能测试,代码已在github共享。

  • ABA问题

如《共享变量的并发读写》一文所述,Hazard Version主要解决的是无锁数据结构处理内存释放的问题,为此我们先来回顾一下无锁编程领域最经典的“ABA问题”,使用链表实现的无锁Stack来举例:

使用一个top指针来指向栈顶,使用CAS原子操作对top进行修改,来实现pushpop,一个常见但错误的实现可能是这样的:

	void push(Node *node) {
		Node *curr = top;
		Node *old = curr;
		node→next = curr;
		while (old != (curr = CAS(&top, old, node))) {
			old = curr;
			node→next = curr;		
		}
	}
	Node *pop() {
		Node *curr = top;
		Node *old = curr;
		while (NULL != curr
			old != (curr = CAS(&top, old, curr→next))) {
			old = curr;
		}
		return curr
	}

push的逻辑没问题,问题在于pop,它在还未“锁定”top的情况下,就引用了topnext字段,而此时的top指向的内存空间可能已经被释放甚至重用,一方面可能直接引起非法地址访问使得程序直接崩溃;更严重的可能造成隐蔽的“ABA问题”,考虑如下时序:

  1. Stack状态为 A→B →Ctop指向A

  2. 线程Itop(即节点A的地址)赋值给curr,并取得curr→next指针(为B)放入寄存器

  3. 线程II执行完整pop流程,Stack状态变为B→Ctop指向B,节点A被弹出后释放

  4. 线程II执行完整pop流程,Stack状态变为Ctop指向C,节点B被弹出后释放

  5. 线程II执行完整push流程,新申请节点为被复用的AStack状态变为A→Ctop指向A

  6. 线程I继续执行,CAS判断top值仍为A,则原子替换topB,链表被改坏

我们无法通过简单的修改pop逻辑来避免“ABA问题”,因为这里的纠结之处在于要“锁定(取出)”top,就要使用CAS原子的将top改为它的next指针,但是要安全的引用top→next又要先“锁定(取出)”top。因此解决“ABA问题”的根本,在于保证pop逻辑引用top指针时,它不会被释放,而在释放节点内存空间时,严格保证不会再有其他人引用这个节点。Hazard PointerMichael M M. Hazard pointers: Safe memory reclamation for lock-free objects[J]. IEEE Transactions on Parallel and Distributed Systems, 2004, 15(6): 491-504.)技术是一个里程碑式的创新,真正在实际工程中解决“ABA问题”(memsql/oceanbase都在使用),Hazard Version是对Hazard Pointer在易用性上的改进,原理一致,详细介绍请参考《共享变量的并发读写》一文。

  • Hazard Version实现要点和局限

    • Hazard Version设计概要


      如上图所示,Hazard Version数据结构主要由三部分组成

      • 全局的Current Version变量

      • 每个线程局部一个Hazard Version变量

      • 每个线程局部一个待回收的内存块链表Retire List

      Hazard Version提供如下三个操作逻辑:

      • 每个线程要开始访问“可能被其他线程释放”的内存块前,将当前Current Version的值保存在线程局部的Hazard Version中,对共享内存块的操作完成后,再清除线程局部的Hazard Version值。

      • 要释放一个共享内存块时,原子的将Current Version1后,将旧值保存在内存块中,然后将它挂在线程局部的Retire List上。

      • 当待回收的内存块过多时,遍历所有线程的Hazard Version,以及全局的Current Version,获得最小值(Min Version),遍历待回收的内存块,将Version小于Min Version的内存块回收掉。

    • 获取Min Version的原子性问题

      如上所述,遍历所有线程,获取Min Version的操作并不保证原子性,可能出现一个线程获取到一个很小的Current Version后,被切换出去,而错过了同时进行的遍历操作。这个问题可以通过简单的double check来规避,线程拿到Current Version,保存到线程局部的Hazard Version后,再检查一下当前Current Version是否发生变化,如果已发生变化,则重试直到Current Version不再改变为止。因为如果Current Version未发生变化,即使遍历被错过,Current Version也参加了遍历,所以得到的Min Version就是真正的Hazard Version最小值。

    • 无锁遍历Retire List

      回收内存时,每个线程优先回收自己本地的Retire List,如果全局待回收的节点仍然比较多,则遍历其他线程的Retire List进行回收,这里会涉及多线程操作Retire List(即它的owner线程挂入新节点,而其他线程要进行遍历),一个简单的做法是要遍历Retire List时,使用CASRetire List替换为NULL,取得旧值后,在当前线程本地进行遍历,对遍历剩下的节点,就挂到线程本地的Retire List上去。

    • CAS小技巧

      CAS操作通常在循环中执行,一次CAS执行失败后,要重新准备新的compare参数和assign参数。比如原子修改栈顶:

      Node *old_v = top;
      
      node→next = old_v;
      
      while (old_v != CAS(&top, old_v, node)) {
      
      old_v = top;
      
      node→next = old_v;
      
      }

      因为CAS无论成功与否,都可以返回旧值,因此这里可以用一个临时变量保存CAS的返回值,避免重新取一次top,毕竟重新取top很可能造成缓存miss

      Node *curr_v = top;
      
      Node *old_v = curr_v;
      
      node→next = curr_v;
      
      while (old_v != (curr_v = CAS(&top, old_v, node))) {
      
      old_v = curr_v;
      
      node→next = curr_v;
      
      }
    • Hazard Version的局限

      相对Hazard Pointer来说,Hazard Version这个本质上是管理多个线程Hazard Version的“滑动窗口”,它通过增加获取Min Version的代价,来减小各个线程获取和重置Hazard Version的代价。因此它有与“滑动窗口”一样的局限性,太久不释放的Hazard Version,会阻塞内存回收。在使用时需要注意,对Hazard Versionrequirerelease操作之间,不能有耗时过长的逻辑。

    • Hazard Version代码仓库:

      https://github.com/kayaklee/libhalog/blob/master/src/clib/hal_hazard_version.h

  • 无锁Stack

    使用Hazard Version实现无锁Stack比较简单,因为push不涉及“ABA问题”,所以只需要在pop时使用Hazard Version保护即可。

    代码请参考:https://github.com/kayaklee/libhalog/blob/master/test/clib/hv_sample_lifo.cpp

    性能测试结果:如下图所示,在美团云12核虚拟机上,6个线程push6个线程pop,不带pop结果检查情况下,最快接近780/spush+pop次数。

     stack_perf

  • 无锁Queue

    Stack不同,实现无锁Queue的一个难点在要维护HeadTail两个指针,在Queue由空变为非空,和非空变为空的时候,需要保证原子的修改和判断这两个指针,这几乎是没有办法实现的。一个变通的方法,是受到Hazard Pointer论文的启发,在初始状态下,引入一个Dummy Header节点,无论push还是pop,都始终保证headtail不会变为NULL

    这样一来,push操作不需要管head指针,每次都只修改tail即可;而pop操作,是取head→next节点中的值作为队首数据,确认head→next不为空,然后原子的将head改为head→next。因为pushpop操作都存在先拿tailhead指针,然后引用它们的next成员的操作,因此都需要使用Hazard Version进行保护。

    代码请参考:https://github.com/kayaklee/libhalog/blob/master/test/clib/hv_sample_fifo.cpp

    性能测试结果:如下图所示,在美团云12核虚拟机上,6个线程push6个线程pop,不带pop结果检查情况下,最快超过1300/spush+pop次数。

    queue_perf

  • StackQueue性能测试数据和虚拟机cpu信息:

    https://github.com/kayaklee/libhalog/tree/master/test/clib/hv_performance_record

Loading

[LockFree之美] 共享变量的并发读写

共享对象的并发读写

在高性能并发服务器中,对于共享对象的读写是最常见的操作之一,比如全局配置类对象的并发读取和更新,以及更复杂的如copy on write btree、堆栈等的并发读写,最基本的操作都可以简化理解为通过全局共享的指针,并发读取和更新指针所指向对象的操作。
最简单的模型如下所示,一个包含了多个字段的结构体:

struct GConf
{
  int64_t a;
  int64_t b;
  int64_t c;
};

可以通过一个全局指针struct gConf *g_conf进行访问。有多个线程会同时读取或更新这个对象,要求任何时刻读取到的内容一定是“一致的”,即读取到的a/b/c的值一定是相同一次更新的值,而不能出现读到的a是某一次更新的值,而b或c是另外一次更新的值。
我们假设研发平台为x86_64,指令级的原子操作最多是64位(实际intel提供了128bit的原子操作,但是本文后续的讨论的方法只用到64bit的原子操作),因为gConf包含三个8字节整数,无法使用原子操作指令对a/b/c一次性赋值,因此必须通过程序算法来保证并发情况下读取到的内容满足“一致性”。
下面开始从易到难讨论5种处理“共享对象并发读写”的方法。

1. 读写锁
最简单朴素的想法是为这个结构配备一个读写锁,读取操作在读锁保护下执行,更新操作在写锁保护下执行。这个方法在对象读取和更新操作都比较简单,且并发程度不高的情况下是一个不错的选择。但是由于读锁与写锁互相阻塞,在读取或更新操作比较费时的情况下,会出现更新或读取线程被阻塞的风险,在高并发的场景下,这种阻塞不可接受。一个实例是数据库系统的全局schema管理器,每次事务都需要读取schema的内容,而DDL会引起schema更新,它的内容比较复杂,更新操作比较费时,会阻塞住处理事务的线程。

2. 引用计数的问题
为了解决读写操作相互阻塞的问题,使用类似Copy on write的方式,每次更新时都构造新的对象,在更新过程中,读取操作还是读取旧的对象,更新操作完成后,原子的替换掉指针使其指向新的对象。而被替换掉的对象不能立即释放,而是要确保这个对象不再被继续使用的情况下,才能将其释放掉。因此这里使用引用计数机制,在读取前将引用计数加1,读取完毕后将引用计数减1,当引用计数减到0的时候,表示没人在使用这个对象,可以将其释放掉。
引用计数在没有并发保护情况下面临的问题是,取得全局指针g_conf和通过这个指针操作对象的引用计数这两个操作并非原子的。可能存在的情况是,一个线程T1取得g_conf的值保存到寄存器后,就被切换出去,另外一个线程T2将g_conf替换后,判断旧的对象引用计数为0,则将其释放。当T1被切换回来继续执行时,它在寄存器中保存的g_conf指向的对象已经被释放,访问对象引用计数的这一次操作就已经存在非法内存访问的风险。
下面两节讨论两种解决并发引用计数的方案。

3. 带锁的引用计数
解决第2点提出的原子性问题,一个最直接的方式是使用锁,将读写g_conf和修改引用计数两个操作放在一个独立全局锁(g_lock)的临界区执行,代码示例如下:

读取并将引用计数加1:

fetch()
{
  g_lock.rdlock();
  g_conf->inc_ref();
  ret = g_conf;
  g_lock.unlock();
}

将引用计数减1:

revert(ptr)
{
  g_lock.rdlock();
  if(0 == ptr->dec_ref())
  {
    free(ptr);
  }
  g_lock.unlock();
}

使用新的对象替换旧对象:

set_new_conf(new_conf)
{
  g_lock.wrlock()
  if (0 == g_conf->dec_ref())
  {
    free(g_conf);
  }
  new_conf->inc_ref();
  g_conf = new_conf;
  g_lock.unlock();
}

在实际场景中,通常读取g_conf的操作更多且并发量大,而替换g_conf的操作不会经常发生,因此读取g_conf时加共享锁,替换g_conf时加互斥锁。
使用锁保护引用计数的方式在高并发的情况下存在一些局限:一方面可能存在读者或写者被饿死的情况;另一方面多个写者无法并发,应用在copy on write btree等数据结构中时,多个写者相互阻塞的问题会影响到整个数据结构的并发性能。

4. 无锁的引用计数
因此,一个更进一步的设计是使用无锁的引用计数,即无论读取g_conf还是替换g_conf都不在锁中进行,如下图所示:share_obj_seq_ref

RefNode的结构包含了指向GConf对象的指针data_ptr,以及引用计数ref_cnt,自增序列号seq_id,ref_cnt和seq_id长度都为4字节且地址连续,可以经由一次8字节的原子操作进行原子性的修改和读取。每次构造新的GConf对象的要同时分配新的RefNode来保存GConf对象的地址。
RefNode对象的分配和释放由专用的分配器进行分配和重用,一旦分配就不会释放回系统。引用计数的加1和减1,操作的是RefNode对象的ref_cnt字段,当引用计数减为0时,将data_ptr指向的对象释放,并将seq_id加1,然后将RefNode对象释放给内存分配器。已释放回分配器的RefNode对象仍然可以被安全的访问到。
替代全局GConf指针,我们增加了一个被称为GNode结构体的全局对象称为g_node,它包含了指向RefNode对象的指针称为node_idx,和这个RefNode中seq_id字段的值node_seq。node_idx和seq_id长度都为4字节且地址连续,可以经由一次8字节的原子操作进行原子性的读取和替换。
读取和替换操作的代码示例如下:
将引用计数加1并返回:

fetch()
{
  GNode tmp_g_node = atomic_load(g_node);
  while (true)
  {
    if (tmp_g_node.node_idx->atomic_check_seq_and_inc_ref(tmp_g_node.node_seq))
    {
      ret = tmp_g_node.node_idx;
      break;
    }
    else
    {
      tmp_g_node = atomic_load(g_node);
    }
  }
}

将引用计数减1:

revert(node_idx)
{
  if (node_idx->atomic_dec_ref_and_inc_seq())
  {
    free(node_idx->data_ptr);
    free_list.free(node_idx);
  }
}

使用新的对象替换旧对象:

set_new_conf(new_conf)
{
  RefNode *new_node = free_list.alloc();
  new_node->ref_cnt = 1;
  new_node->data_ptr = new_conf;

  GNode new_g_node;
  new_g_node.node_seq = new_node->seq_id;
  new_g_node.node_idx = new_node;

  GNode old_g_node = atomic_store_and_fetch_old(&g_node, new_g_node);
  if (old_g_node.node_idx->atomic_dec_ref_and_inc_seq())
  {
    free(old_g_node.node_idx->data_ptr);
    free_list.free(old_g_node.node_idx);
  }
}

其中几个原子操作的作用:
1. RefNode::atomic_check_seq_and_inc_ref原子性的判断如果seq_id等于传入参数就将ref_cnt加1,并返回true,否则返回false。
2. atomic_store_and_fetch_old原子性的将第二个参数赋值给第一个参数,并返回第一个参数的旧值。
3. RefNode::atomic_dec_ref_and_inc_seq原子性的将ref_cnt减1,并判断ref_cnt如果已减为0,则将seq_id加1并返回true,否则返回false。
工程实现上,为了达到node_idx长度只能有4字节的要求,RefNode分配器设计为定长数组,node_idx为数组下标,使用定长无锁队列作为分配器来维护空闲的node指针。一般来说一个线程只会增加一次引用计数,因此对于维护GConf这样全局只有一个的对象,定长RefNode数组的长度初始化为与线程个数相等就够用了。

5. HazardPointer
引用计数基本可以解决共享对象并发读写的问题,但它仍然存在一些不足:第一,每次读取都需要使用原子操作修改全局引用计数,高并发情况下的对同一个变量的原子操作可能成为性能瓶颈;第二,管理RefNode对象的无锁分配器有额外的管理和维护成本,增加的实现复杂度;第三,针对每个共享对象都需要维护N个(N为线程个数)RefNode,如果共享对象数量较多(比如Btree节点),为节省空间,还需要额外的机制来避免大量使用RefNode。
因此再介绍一种被称为HazardPointer的思路,它由Maged M. Michael在论文“Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects”中提出,基本思路是将可能要被访问到的共享对象指针(成为hazard pointer)先保存到线程局部,然后再访问,访问完成后从线程局部移除。而要释放一个共享对象时,则要先遍历查询所有线程的局部信息,如果尚有线程局部保存有这个共享对象的指针,说明这个线程有可能将要访问这个对象,因此不能释放,只有所有线程的局部信息中都没有保存这个共享对象的指针情况下,才能将其释放。
读取和替换操作的代码示例如下:

g_hp_array[N]; //保存每个线程局部信息的数组,N为线程个数

读取操作:

while(true)
{
  ret = g_conf;
  g_hp_array[thread_id] = ret;
  if (g_conf == ref)
  {
    break;
  }
  else
  {
    g_hp_array[thread_id] = NULL;
  }
}
// 读取逻辑代码…
g_hp_array[thread_id] = NULL;

使用新的对象替换旧对象:

set_new_conf(new_conf)
{
  retired_ptr = atomic_store_and_fetch_old(&g_conf, new_conf);
  found = false;
  for (i = 0; i < g_hp_array.length(); i++)
  {
    if (retired_ptr == g_hp_array[i])
    {
      found = true;
      break;
    }
  }
  if (!found)
  {
    free(retired_ptr);
  }
}

HazardPointer的设计解决了引用计数原子操作的性能瓶颈,避免了实现额外RefNode无锁分配器的复杂度。但是同时引入一个新的问题:由于回收内存的操作需要遍历查询全局数组g_hp_array,代价比较大,通常工程上的做法是将待回收的指针暂存在线程局部,等数量超过一个配置门限时,才进行批量回收。使得回收内存产生延迟,并且回收操作本身由于需要遍历数组,也会消耗较多的时间。
关于并发场景的问题,由于回收操作便利g_hp_array并不能保证原子的遍历,而HazardPointer的设计要求对象已经被放入g_hp_array后是能够安全访问的,因此可能存在如下时序:
1. 读取线程拿到g_conf指针存入局部变量ret
2. 更新线程将g_conf指针替换,扫描g_hp_array没有找到g_conf,释放g_conf内存
3. 读取线程将刚刚拿到的ret放入g_hp_array,然后开始使用ret,而此时ret指向的内存已经释放
为了处理这个问题,如上述伪代码所示,读取线程需要增加double check逻辑,判断如果从拿到g_conf到放入g_hp_array之间,可能有回收逻辑遍历过g_hp_array,则要重试。
因为替换g_conf意味着要将旧的g_conf释放,而释放意味着要先会遍历g_hp_array,因此读取线程只需要在将g_conf放入g_hp_array后,判断g_conf是否变化,如果变化了,则说明g_conf可能正好在从拿到g_conf到放入g_hp_array之间被释放了,因此需要重试。

6. HazardVersion

HazardPointer的实现简单,但是与引用计数法有着相似的不足:需要为每个共享变量维护O(N)个(N为线程个数)槽位,使用者需要仔细分析算法以尽量减少同时存在的hazard pointer 或者引入其他机制封装多个共享变量称为一个hazard pointer。这样就给使用者带来了比较大的难度,HazardPointer机制也与具体数据结构的实现比较紧的耦合在一起。

因此在HazardPointer基础上发展出了被称为HazardVersion技术,它提供类似lock一样的acquire/release接口,支持无限制个数共享对象的管理。

与HazardPointer的实现不同:首先全局要维护一个int64_t类型的GlobalVersion;要访问共享对象前,先将当时的GlobalVersion保存到线程局部,称为hazard version;而每次要释放共享对象的时候先将当前GlobalVersion保存在共享对象,然后将GlobalVersion原子的加1,然后遍历所有线程的局部信息,找到最小的version称为reclaim version,判断如果待释放的对象中保存的version小于reclaim version则可以释放。

读取和替换操作的代码示例如下:

g_version; //全局GlobalVersion
g_hv_array[N]; //保存每个线程局部信息的数组,N为线程个数

读取操作:

g_hv_array[thread_id] = g_version;
ret = g_conf;
// 读取逻辑代码…
g_hv_array[thread_id] = INT64_MAX;

使用新的对象替换旧对象:

set_new_conf(new_conf)
{
  retired_ptr = atomic_store_and_fetch_old(&g_conf, new_conf);
  retired_ptr->set_version(atomic_fetch_and_add(&g_version, 1));
  reclaim_version = INT64_MAX;
  for (i = 0; i < g_hv_array.length(); i++)
  {
    if (reclaim_version > g_hv_array[i])
    {
      reclaim_version = g_hv_array[i];
    }
  }
  if (reclaim_version > retired_ptr->get_version())
  {
    free(retired_ptr);
  }
}

可以看到,读取操作先保存了GlobalVersion到线程局部,然后去读取g_conf指针,虽然可能同时发生的回收操作遍历g_hv_array并不保证原子性,但是即使一个更小的hazard version被放入g_hv_array而错过了遍历,也不会有风险。

因为回收操作是先更新g_conf指针后遍历g_hv_array,而读取操作是先更新g_hv_array后读取g_conf指针,所以最坏情况下读取操作与回收操作同时发生,且读取操作放入g_hv_array的hazard version被回收操作遍历时错过,那么读取操作读到的g_conf一定是被回收操作更新之后的,本次不会被回收,可以安全的访问。

HazardVersion相对HazardPointer,在用法上更加简单,无需针对具体的数据结构进行分析,局限是需要在被管理的对象中保存version。其次,与HazardPointer一样,由于遍历g_hv_array比较费时,因此也需要引入类似的延迟回收机制。需要注意的是,它的缺点也比较明显,一旦出现一个线程长时间不释放HazardVersin时,会阻塞住后面更大Version对象的释放。

7. 总结
本文讨论了“读写锁”、“带锁的引用计数”、“无锁的引用计数”、“HazardPointer”、“HazardVersion”五种方法来处理共享对象的并发读写问题,在实际工程领域,“带锁的引用计数”比较常见,在特定场景下优化读写锁可以得到不错的性能,并且很易于理解和实现;“无锁的引用计数”实现复杂但并发读写性能卓越,在已开源的OceanBase版本中也有实践;“HazardPointer”实现简单,但是理解和使用都较复杂;“HazardVersion”是一种基于“HazardPointer”的创新思路,有一定的局限性,凭借简单易用的接口,可以在很多场景替代“HazardPointer”。

Loading