无锁多生产者多消费者环形缓冲区队列

如今,高性能服务器软件(例如,HTTP 加速器)在大多数情况下运行在多核机器上。现代硬件可以提供 32、64 甚至更多 CPU 核心。在如此高度并发的环境中,锁竞争有时比数据复制、上下文切换等等更损害整体系统性能。因此,将最热门的数据结构从加锁设计转向无锁设计可以显著提高多核环境中的软件性能。

在传统服务器软件中,最热门的数据结构之一是工作队列,它可能每秒执行数十万次推送和弹出操作,来自数十个生产者和/或消费者。

工作队列是一种 FIFO 数据结构,只有两个操作:push() 和 pop()。它通常限制其大小,以便在队列中没有元素时 pop() 等待,而在队列包含允许的最大元素数时 push() 等待。重要的是,许多线程可以同时在不同的 CPU 核心上执行 pop() 和 push() 操作。

一种可能的工作队列实现是使用环形缓冲区来存储指向队列元素的指针。与常见的非侵入式链表(它存储用户传递的值的副本,例如 std::list)相比,它具有良好的性能。关于环形缓冲区实现的重要之处在于它原生限制了其大小——您只能以循环方式移动当前位置。另一方面,链表需要维护一个额外的字段来表示总队列长度。使用链表,push 和 pop 操作除了更新元素链接外,还必须修改队列长度,因此您需要更加注意队列中无锁实现的一致性。

基本上,不同的 CPU 体系结构为内存操作的排序提供了不同的保证,这对于无锁算法至关重要。在本文中,我专注于 x86,因为它是最广泛使用的体系结构,而不是编写通用(但速度较慢)的代码。

朴素的同步队列

首先,让我们定义队列的接口(本文中使用 C++11)


template<class T, long Q_SIZE>
class NaiveQueue {
public:
    NaiveQueue();
    void push(T *x);
    T *pop();
};

队列将存储 T* 指针,最大大小为 Q_SIZE。

让我们看看在朴素的加锁实现中队列会是什么样子。要开发队列,我们需要一个数组,我们在其中放置环形缓冲区。我们可以将其定义为


T *ptr_array_[Q_SIZE];

类的两个成员 head_ 和 tail_ 将指向队列的头部(下一个推送元素的位置)和尾部(下一个弹出元素的位置),并且应在类构造函数中初始化为零。我们可以通过将计数器定义为 unsigned long 来简化环形缓冲区上的操作。一个 unsigned long(长度为 64 位)足够处理数百万次/秒的操作,持续数千年。因此,tail_ 和 head_ 将定义为


unsigned long head_;
unsigned long tail_;

这样,我们就可以通过以下方式访问元素(head_ 和 tail_ 相同)


ptr_array_[tail_++ & Q_MASK]

其中 Q_MASK 定义为


static const unsigned long Q_MASK = Q_SIZE - 1;

为了获得数组中的当前位置,我们可以计算 tail_ 除以 Q_SIZE 的整数除法的余数,但我们最好将 Q_SIZE 定义为 2 的幂(在我们的例子中为 32768),这样我们就可以使用 Q_MASK 和 tail_ 之间的按位与运算,这样会更快。

因为队列中的操作必须在没有元素或队列已满时等待,所以我们需要两个条件变量


std::condition_variable cond_empty_;
std::condition_variable cond_overflow_;

分别等待队列中的新元素或可用空间。当然,我们需要一个互斥锁来序列化我们的队列


std::mutex mtx_;

这样,我们可以用以下方式编写 push() 和 pop()


void push(T *x)
{
    std::unique_lock<std::mutex> lock(mtx_);

    cond_overflow_.wait(lock, [&head_, &tail_]() {
                    return tail_ + Q_SIZE > head_;
            });

    ptr_array_[head_++ & Q_MASK] = x;

    cond_empty_.notify_one();
}

T *pop()
{
    std::unique_lock<std::mutex> lock(mtx_);

    cond_empty_.wait(lock, [&head_, &tail_]() {
                    return tail_ < head_;
            });

    T *x = ptr_array_[tail_++ & Q_MASK];

    cond_overflow_.notify_one();

    return x;
}
]]>

我们在使用 mtx_ 获取独占锁的情况下执行这两个操作。当获取锁时,我们可以检查当前队列状态:它是空的(我们无法弹出任何新元素)还是满的(我们无法推送新元素)。std::condition_variable::wait() 将当前线程移动到睡眠状态,直到指定的谓词为真。接下来,我们推送或弹出一个元素,并通知另一个线程(通过 notify_one() 调用)我们已更改队列状态。因为我们一次只添加或删除一个元素,所以只有一个等待可用元素或队列中空闲槽位的线程可以取得进展,因此我们只通知并唤醒一个线程。

此实现的问题在于,在单个时间点只能有一个线程修改队列。此外,互斥锁和条件变量非常昂贵——在 Linux 中,它们通过 futex(2) 系统调用实现。因此,每次线程需要等待互斥锁或条件变量时,都会导致调用 futex(2),这将重新调度线程并将其移动到等待队列。

现在,让我们运行一个基本测试,该测试仅在 16 个生产者和 16 个消费者中推送和弹出队列的地址(文章末尾有一个指向完整源代码的链接)。在具有 16 个 Xeon 核心的机器上,测试大约花费了七分钟


# time ./a.out

real    6m59.219s
user    6m21.515s
sys     72m34.177s

并且,带有 -c-f 选项的 strace 表明程序 99.98% 的时间都花费在 futex 系统调用中。

无锁多生产者多消费者队列

希望您不必向内核寻求用户空间线程同步方面的帮助。CPU(至少在最常见的体系结构中)提供原子内存操作和屏障。通过这些操作,您可以原子地

  • 读取内存操作数,修改它并写回。

  • 读取内存操作数,将其与一个值进行比较,并将其与另一个值交换。

内存屏障是特殊的汇编指令,也称为栅栏。栅栏保证指令在本地 CPU 上的执行顺序以及在其他 CPU 上的可见性顺序。让我们考虑两个独立的 data 指令 A 和 B,它们之间用栅栏分隔(让我们使用 mfence,它为读写操作的排序提供保证)


A
mfence
B

栅栏保证

  1. 编译器优化不会将 A 移动到栅栏之后,也不会将 B 移动到栅栏之前。

  2. CPU 将按顺序执行 A 和 B 指令(它通常乱序执行指令)。

  3. 在同一总线上工作的其他 CPU 核心和处理器封装将先看到指令 A 的结果,然后看到指令 B 的结果。

对于我们的队列,我们需要同步多个线程对 head_ 和 tail_ 字段的访问。实际上,当您在两个核心上运行 head_++ 时(这是 RMW,读取-修改-写入操作的一个示例,因为处理器必须读取当前的 head_ 值,在本地递增它,然后将其写回内存),两个核心都可以同时读取当前的 head_ 值,递增它并同时写回新值,因此一个增量丢失了。对于原子操作,C++11 提供了 std::atomic 模板,它应该在未来取代当前的 GCC sync_ 内联函数。不幸的是,对于我的编译器(x86-64 的 GCC 4.6.3),std::atomic<> 方法仍然独立于指定的内存顺序生成额外的栅栏。因此,我将使用旧的 GCC 内联函数进行原子操作。

我们可以通过以下方式原子地读取和递增变量(例如,我们的 head_)


__sync_fetch_and_add(&head_, 1);

这使得 CPU 锁定它将要执行操作(在我们的例子中是递增)的共享内存位置。在多处理器环境中,处理器相互通信以确保它们都看到相关数据。这被称为缓存一致性协议。通过此协议,处理器可以获得对内存位置的独占所有权。但是,这些通信不是免费的,我们应该谨慎地使用此类原子操作,并且仅在真正需要它们时才使用。否则,我们可能会严重损害性能。

同时,对内存位置的普通读写操作原子地执行,不需要任何额外的操作(例如,指定 lock 前缀以使指令在 x86 体系结构上原子地运行)。

在我们的无锁实现中,我们将放弃互斥锁 mtx_,因此也将放弃两个条件变量。但是,如果 push 操作时队列已满,pop 操作时队列为空,我们仍然需要等待。对于 push,我们将使用一个简单的循环来执行此操作,就像我们对加锁队列所做的那样


while (tail_ + Q_SIZE < head_)
    sched_yield();

sched_yield() 只是让其他线程在当前处理器上运行。这是重新调度当前线程的本机方式和最快方式。但是,如果调度程序运行队列中没有其他线程等待可用的 CPU,则当前线程将立即重新调度回来。因此,即使我们没有数据要处理,我们也总是会看到 100% 的 CPU 使用率。为了解决这个问题,我们可以使用 usleep(3) 并使用一些小值。

让我们更仔细地看看循环中发生了什么。首先,我们读取 tail_ 值;接下来我们读取 head_ 的值,之后,我们决定是等待还是推送元素并将 head_ 向前移动。当前线程可以在检查期间甚至在检查之后在任何位置调度。让我们考虑双线程场景(表 1)。

表 1. 双线程场景
线程 1 线程 2
读取 tail_ 读取 tail_
读取 head_ 读取 head_
(已调度) 推送一个元素
推送一个元素

如果环形缓冲区中只有一个空闲位置,我们将覆盖指向最旧队列元素的指针。我们可以通过在循环之前递增共享的 head_ 并使用临时局部变量来解决问题(也就是说,我们保留一个位置,我们将在其中插入元素,并等待它空闲)


unsigned long tmp_head =
    __sync_fetch_and_add(&head_, 1);
while (tail_ + Q_SIZE < tmp_head)
    sched_yield();
ptr_array_[tmp_head & Q_MASK] = x;

我们可以为 pop() 编写类似的代码——只需交换 head 和 tail 即可。但是,问题仍然存在。两个生产者可以递增 head_,检查它们是否有足够的空间,并在刚好在插入 x 之前同时重新调度。消费者可以立即唤醒(它看到 head_ 向前移动了两个位置),并从尚未插入的队列中读取值。

在解决这个问题之前,让我们考虑以下示例,其中我们有两个生产者(P1 和 P2)和两个消费者(C1 和 C2)


             LT                          LH
| _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ |
              ^   ^                       ^   ^
              |   |                       |   |
              C1  C2                      P1  P2

在此示例中,“_”表示空闲槽位,“x”表示插入的元素。C1 和 C2 将读取值,而 P1 和 P2 将向当前空闲槽位写入元素。令 LT 为所有消费者中最新的(最低的)尾部值,它存储在最新消费者 C1 的 tmp_tail 中。消费者 C1 当前可以在 LT 位置处理队列(也就是说,它正在获取元素的中间)。并且,令 LH 相应地为所有生产者中 tmp_head 的最低值。在每个给定时间,我们不能将元素推送到等于或大于 LT 的位置,并且我们不应尝试在等于或大于 LH 的位置弹出元素。这意味着所有生产者都应该关心当前的 LT 值,而所有消费者都应该关心当前的 LH 值。因此,让我们为 LH 和 LT 引入两个辅助类成员


volatile unsigned long last_head_;
volatile unsigned long last_tail_;

因此,我们应该在上面的循环中检查 last_tail_ 值而不是 tail_。我们需要从多个线程更新这些值,但我们将通过普通写操作(没有 RMW)来完成此操作,因此成员不必是原子类型。我只是将变量指定为 volatile,以防止它们的值缓存在本地处理器寄存器中。

现在的问题是谁应该更新 last_head_ 和 last_tail_ 值,以及何时更新。我们预计在大多数情况下,我们能够对队列执行 push 和/或 pop 操作而无需等待。因此,我们只能在真正需要它们时才更新这两个辅助变量——也就是说,在等待循环内。因此,当生产者意识到由于 last_tail_ 值太小而无法插入新元素时,它会进入等待循环并尝试更新 last_tail_ 值。为了更新该值,线程必须检查每个消费者的当前 tmp_tail。因此,我们需要使临时值对其他线程可见。一种可能的解决方案是维护一个 tmp_tail 和 tmp_head 值数组,其大小等于正在运行的线程数。我们可以使用以下代码来完成此操作


struct ThrPos {
    volatile unsigned long head, tail;
};

ThrPos thr_p_[std::max(n_consumers_, n_producers_)];

其中 n_consumers_ 是消费者的数量,n_producers_ 是生产者的数量。我们可以动态分配数组,但为了简单起见,现在将其保持静态大小。许多线程读取数组的元素,但只有一个线程可以使用普通移动指令(没有 RMW 操作)更新它们,因此我们也可以对变量使用常规读取。

因为 thr_p_ 值仅用于限制当前队列指针的移动,所以我们将它们初始化为允许的最大值——也就是说,在有人推送到队列或从队列中弹出之前,我们不限制 head_ 和 tail_ 的移动。

我们可以使用以下循环找到所有消费者的最低尾部值


auto min = tail_;
for (size_t i = 0; i < n_consumers_; ++i) {
    auto tmp_t = thr_p_[i].tail;

    asm volatile("" ::: "memory"); // compiler barrier

    if (tmp_t < min)
        min = tmp_t;
1}

这里需要临时变量 tmp_t,因为我们无法原子地比较 thr_p_[i].tail 是否小于 min,并在小于时更新 min。当我们记住当前消费者的尾部并将其与 min 进行比较时,消费者可以移动尾部。它只能向前移动,因此 while 条件中的检查仍然正确,我们不会覆盖一些活动队列元素。但是,如果我们不使用 tmp_t,我们将代码编写成这样


if (thr_p_[i].tail < min)
    min = thr_p_[i].tail;

然后,当我们将消费者与 min 进行比较时,消费者可能具有较低的尾部值,但在比较完成后且刚好在赋值之前将其向前移动很远。因此,我们可能会找到不正确的最小值。

我添加了编译器屏障 asm volatile("" ::: "memory)——这是一个 GCC 特定的编译器屏障——以确保编译器不会移动 thr_p_[i].tail 访问,并且只访问内存位置一次——将其值加载到 tmp_t

关于数组的一个重要事项是,它必须由当前线程标识符索引。因为 POSIX 线程(以及因此使用它们的 C++ 线程)不使用小的单调递增值来标识线程,所以我们需要使用我们自己的线程包装。我将使用队列的内联 thr_pos() 方法来访问数组元素


ThrPos& thr_pos() const
{
    return thr_p_[ThrId()];
}

您可以在文章末尾引用的源代码中找到 ThrId() 实现的示例。

在编写 push() 和 pop() 的最终实现之前,让我们回到我们队列的初始应用,即工作队列。通常,生产者和消费者在队列操作之间执行大量工作。例如,它可能是一个非常慢的 IO 操作。那么,如果一个消费者从队列中获取一个元素并进入长时间的 IO 操作睡眠状态会发生什么?它的尾部值将长时间保持不变,并且所有生产者都将等待它,而所有其他消费者都完全清空了队列。这不是我们期望的行为。

让我们分两步解决这个问题。首先,让我们在获取元素后立即为每个线程的尾部指针分配允许的最大值。因此,我们应该在 pop() 方法的末尾编写以下内容


T *ret = ptr_array_[thr_pos().tail & Q_MASK];
thr_pos().tail = ULONG_MAX;
return ret;

因为 push() 中的生产者开始从全局 tail_ 的当前值中查找 last_tail_ 的最小允许值,所以只有在没有活动消费者时,它才能将当前的 tail_ 值分配给 last_tail_(这就是我们想要的)。

一般来说,其他处理器可以在本地处理器从 ptr_array_ 读取之前看到 thr_pos().tail 更新,因此它们可以在本地处理器读取之前移动并覆盖数组中的位置。这在具有宽松内存操作排序的处理器上是可能的。但是,x86 提供了相对严格的内存排序规则——特别是,它保证 1) 存储不会与较早的加载重新排序,并且 2) 其他处理器以一致的顺序看到存储。因此,在 x86 上,从 ptr_array_ 加载和存储到 thr_pos().tail 的代码将按此顺序完成并被所有处理器看到。

第二步是在 pop() 的开头正确设置 thr_pos().tail。我们使用以下内容分配当前的 thr_pos().tail


thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

问题是该操作仅对 tail_ 移位是原子的,但对 thr_pos().tail 分配不是原子的。因此,存在一个时间窗口,其中 thr_pos().tail = ULONG_MAX,并且 tail_ 可能会被其他消费者显着移动,因此 push() 会将 last_tail_ 设置为当前的、刚刚递增的 tail_。因此,当我们要弹出一个元素时,我们必须保留一个小于或等于 tail_ 值的尾部位置,我们将从中弹出一个元素


thr_pos().tail = tail_;
thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

在此代码中,我们实际上执行以下三个操作

  • 将 tail_ 写入 thr_pos().tail。

  • 递增 tail_。

  • 将 tail_ 的先前值写入 thr_pos().tail。

同样,在这种一般情况下,我们无法保证其他处理器将以相同的顺序“看到”写操作的结果。潜在地,一些其他处理器可以首先读取递增的 tail_ 值,尝试找到新的 last_tail_,然后才读取新的当前线程尾部值。但是,__sync_fetch_and_add() 执行锁定指令,这意味着大多数体系结构(包括 x86)上的隐式完全内存屏障,因此第一个和第三个操作都不能在第二个操作之上移动。因此,我们也可以在此处跳过显式内存屏障。

因此,如果队列几乎已满,则所有生产者都将在我们弹出的元素的位置或之前停止。

现在让我们编写 push() 和 pop() 方法的最终实现


void push(T *ptr)
{
    thr_pos().head = head_;
    thr_pos().head = __sync_fetch_and_add(&head_, 1);

    while (__builtin_expect(thr_pos().head >=
                            last_tail_ + Q_SIZE, 0))
    {
        ::sched_yield();

        auto min = tail_;
        for (size_t i = 0; i < n_consumers_; ++i) {
            auto tmp_t = thr_p_[i].tail;

            asm volatile("" ::: "memory"); // compiler barrier

            if (tmp_t < min)
                min = tmp_t;
        }
        last_tail_ = min;
    }

    ptr_array_[thr_pos().head & Q_MASK] = ptr;
    thr_pos().head = ULONG_MAX;
}

T *pop()
{
    thr_pos().tail = tail_;
    thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

    while (__builtin_expect(thr_pos().tail >=
                            last_head_, 0))
    {
        ::sched_yield();

        auto min = head_;
        for (size_t i = 0; i < n_producers_; ++i) {
            auto tmp_h = thr_p_[i].head;
       
            asm volatile("" ::: "memory"); // compiler barrier

            if (tmp_h < min)
                min = tmp_h;
        }
        last_head_ = min;
    }

    T *ret = ptr_array_[thr_pos().tail & Q_MASK];
    thr_pos().tail = ULONG_MAX;
    return ret;
}

细心的读者会注意到,多个线程可以扫描所有生产者或消费者线程的当前 head 或 tail 值。因此,许多线程可以找到不同的最小值并尝试同时将它们写入 last_head_ 或 last_tail_,因此我们可能会在此处使用 CAS 操作。但是,原子 CAS 非常昂贵,最坏的情况是我们将过小的值分配给 last_head_ 或 last_tail_。或者,如果我们曾经用较小的旧值覆盖新的较高值,我们将再次陷入 sched_yield()。与使用同步 CAS 操作相比,我们可能会更频繁地陷入 sched_yield(),但在实践中,额外原子操作的成本会降低性能。

此外,我使用了 __builtin_expect,其零 expect 参数表示我们不希望 while 语句中的条件过于频繁地变为真,并且编译器应在条件为假时移动在代码之后执行的内部循环代码。这样,我们可以改善指令缓存的使用。

最后,让我们运行与朴素队列相同的测试


# time ./a.out 

real    1m53.566s
user    27m55.784s
sys     2m4.461s

这比我们的朴素队列实现快 3.7 倍!

结论

如今,高性能计算通常通过两种方式实现:水平扩展(scale-out),通过添加新的计算节点,以及垂直扩展(scale-up),通过向单个节点添加额外的计算资源(如 CPU 或内存)。不幸的是,线性扩展仅在理论上是可能的。在实践中,如果您将计算资源增加一倍,则很可能只会获得 30-60% 的性能提升。锁竞争是阻止通过添加额外的 CPU 来有效扩展的关键问题之一。无锁算法使扩展更有效率,并允许您从多核环境中获得更多性能。

带有正确性测试的朴素队列和无锁队列实现的代码可在 https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc 获取。

致谢

特别感谢 Johann George 对本文的最终审阅。

加载 Disqus 评论