高性能环形队列框架 Disruptor 核心概念
阅读原文时间:2021年09月27日阅读:2

高性能环形队列框架 Disruptor

Disruptor 是英国外汇交易公司LMAX开发的一款高吞吐低延迟内存队列框架,其充分考虑了底层CPU等运行模式来进行数据结构设计 (mechanical sympathy) ,能比传统队列方法延迟低三个数量级,吞吐量提高八倍。其中很多设计思想值得借鉴学习,本篇将核心介绍该队列的高性能设计方法,并对核心源码进行解读。

以下是一组官方提供的与ArrayBlockQueue对比的性能基准测试

Nehalem 2.8Ghz – Windows 7 SP1 64-bit

Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit

ABQ

Disruptor

ABQ

Disruptor

Unicast: 1P – 1C

5,339,256

25,998,336

4,057,453

22,381,378

Pipeline: 1P – 3C

2,128,918

16,806,157

2,006,903

15,857,913

Sequencer: 3P – 1C

5,539,531

13,403,268

2,056,118

14,540,519

Multicast: 1P – 3C

1,077,384

9,377,871

260,733

10,860,121

Diamond: 1P – 3C

2,113,941

16,143,613

2,082,725

15,295,197

Sequence

可以视为一个存储Long类型的序号包装类,内部提供了对序号的无锁读写能力,主要用于存储当前生产游标及消费的消费游标。RingBuffer及每一个EventHandler都会利用 Sequence来维护其生产或消费的相关位置信息。

Sequencer

封装了对ringBuffer生产消费游标的访问算法,例如获取当前所有消费者消费的最小游标位置,生产者发布事件时基于这个位置来决定当前事件是否会错误覆盖未消费的事件。目前有两个实现 SingleProducerSequencerMultiProducerSequencer,分别用于单线程生产及多线程生产场景。

EventHandler

事件处理逻辑的接口类,一个函数接口类,用于封装用户对事件的消费逻辑。

EventProcessor

实际上Disruptor分发事件消费并不是直接调度 EventHandler 的,通过 EventProcessorEventHandler进行了一层封装。EventProcessor 实现了Runnable接口,在run方法内会循环监听事件,并将事件传递给EventHandler执行消费逻辑。

SequenceBarrier

EventProcessor 不直接与RingBuffer交互,通过SequenceBarrier封装了对RingBuffer事件的监听,一个``SequenceBarrier管理多个EventProcessor,负责将所管理的EventProcessor`进行等待或唤醒。

环形的无锁设计

传统队列内一般采用数组或者链表,生产者、消费者关注队列的头尾节点,在这种模式下生产者对事件的生产、消费者对事件的消费、队列的容量等多个关注点相互耦合,从而导致了多线程之间的资源竞争。

Disruptor将多个关注点进行分离:

  • 队列元素的存储 (RingBuffer)

    • 定义一个环形的数据结构,空间大小必须是2的指数,可以直接用位运算取模得到对应的位置
    • 实际也是一个数组来实现,通过坐标的首尾移动达到环形的效果
  • 队列协调生产者声明下一个需要交换的队列元素的序号 (Sequencer)

    • Sequencer缓存了消费者消费的最小事件位置,只要生产者申请的空间不会覆盖到此位置之后即可允许发布
  • 队列协调生产者声明下一个需要交换的队列元素的序号 (SequencesBarrier)

    • 当无事件可以消费时,调用不同的等待策略进行事件的监听

避免伪共享

伪共享定义:不同线程对同一缓存行的不同字段进行频繁操作,会导致相互干扰而使得缓存失效,因为要维护缓存失效等CPU一致性协议通知,且因为失效每次都需要重新到内存拉取数据不能利用cpu的写缓存,性能反而比无缓存更低。

以上图为例,Core1因为修改了同一个缓存行的X、Core2因为修改同一缓冲行Y,导致因其他线程对同一缓存行的其他字段修改而缓存失效。若解决伪共享可以通过填充方式使得字段必然不会同时出现在同一个缓存行内。

public class ShareTest {

    private volatile long x;
    // 下面一行填充缓存行字段是否注释将造成很大的性能差别
    // 注释情况下出现伪共享耗时: 3832 ms
    // 未注释情况下 x y分别属于不同缓存行耗时: 637ms
    private long p1,p2,p3,p4,p5,p6,p7;
    private volatile long y;

    public static void main(String[] args) throws InterruptedException {
        long number = 100000000;
        ShareTest shareTest = new ShareTest();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        //  线程1修改x
        new Thread(() -> {
            for (long i = 0; i < number; i++) {
                shareTest.x++;
            }
            countDownLatch.countDown();
        }).start();

        // 线程2修改y
        new Thread(() -> {
            for (long i = 0; i < number; i++) {
                shareTest.y++;
            }
            countDownLatch.countDown();
        }).start();

        long begin = System.currentTimeMillis();
        countDownLatch.await();

        // 得到修改时间
        System.out.println(System.currentTimeMillis() - begin);
    }
}

在传统队列中存在头结点、尾节点的引用若未进行填充隔离,很容易存在于同一个缓存行中,当生产者消费者分别修改时就产生了伪共享。在Disruptor内很多伪共享可能发生的场景都做了字段填充,这样有效的避免了伪共享带来的性能消耗:

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {...}
// ringBuffer 继承了RingBufferPad,由于填充字段的存在,其不会被缓存到同一个缓存行内
public final class RingBuffer<E> extends RingBufferFields<E> implements ... {...}

事件分配避免GC

在传统队列中,事件的生产通常是由生产者在事件发布时刻创建并追加到列表中。当一个事件完成了消费就需要从链表或数组中去除引用,需要进行GC的回收。GC的回收必然带来额外的性能开销,最致命的是当事件处理速度低需要在队列中引用较长时间时,事件对象会进入老年代,进入老年代后会带来更高的GC消耗。

Disruptor采用预创建事件的方式,在队列启动前便将所有位置的事件完成初始化,后续每一次事件的发布都是复用原位置的事件,只是将对象的字段属性进行修改。这样就避免了GC带来的额外开销,所有事件可以长久的进行循环复用。

// Disruptor 的事件发布是从对应位置取出已经提前初始化好的对象并进行设值
long next = ringBuffer.next();
LongEvent longEvent = ringBuffer.get(next);
longEvent.setVal(value);
ringBuffer.publish(next);

依赖图解决

在传统队列中,由于各个关注点耦合,当需要进行依赖消费时,必须提供多组队列数组结构进行相互依赖。当存在分叉时,就会产生多组队列,会产生大量的额外成本。

Disruptor因为将关注点进行分离,抽象出负责协调监听消息接收的 SequenceBarrier,使其只需要一个RingBuffer即可实现依赖消费。

SequenceBarrier负责对前置序号进行监控,前置序号可能是Sequencer的生产坐标,也可能是上游消费者的消费坐标,这样可以灵活的组成链式依赖而无需多组队列结构。

Sequencer负责协调生产者获取下一个可以写入的事件位置,所以其需要感知当前消费链路上最小的消费位置避免生产覆盖。而链式依赖的结构其会自动监听链路最后消费者的序号,并缓存其当前最小消费序号。

当有可取或可发布的事件,生产者和消费者会直接到对应坐标进行读取写入操作,相互隔离无需加锁。

内存屏障的使用

Disruptor通过Sequence来维护各处存放的序号,可以针对不同互斥及可见性场景选择不同内存屏障读写方法及CAS写方法。

class LhsPadding {
    // 这是伪共享的填充字段
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    // 这是真实的序号值,是一个volatile类型
    protected volatile long value;
}

class RhsPadding extends Value {
    // 这是伪共享的填充字段
    protected long p9, p10, p11, p12, p13, p14, p15;
}

// Sequence 本质上是维护一个数字值,保障其高效可见的读写
// zyn 因为继承了 RhsPadding Sequence的字段实际为
// p1, p2, p3, p4, p5, p6, p7, value, p9, p10, p11, p12, p13, p14, p15
// 避免了伪共享
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    // 反射value字段在类的 offset
    // 后方法会基于反射进行赋值
    private static final long VALUE_OFFSET;

    // 省略非核心代码
    ...

    // 这是一个普通读
    public long get() {
        return value;
    }

    public void set(final long value) {
        // 这里不能直接用普通的方式吗?可以看到 Sequence.set 并未直接基于 volatile 的赋值
        // putOrderedLong 是Store/Store barrier 比 volatile 的 Store/Load barrier 性能消耗更低
        // 此处仅仅防止写的顺序重排序,并不会保障立刻可见,用于不需要其他线程立刻可见的场景
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    public void setVolatile(final long value) {
        // 这是采用了 Store/Load barrier
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    public boolean compareAndSet(final long expectedValue, final long newValue) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    // 省略非核心代码
    ...
}

例如在执行消费者事件消费记录当前消费者最新消费位置时,并未采用高消耗的 setVolatile 而是 Store/Store barrier 的set方法

因为消费者更新的位置没必要让生产线程立刻可见,等待生产线程失效队列自动更新时可见即可。

// 循环执行
while (nextSequence <= availableSequence) {
    event = dataProvider.get(nextSequence);
    // 这里是真实调用 EventHandler 的 onEvent 方法
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

// 执行完成设置新的消费序号
// 这用的是写屏障,未使用全能屏障,因为此值无必要让所有其他线程立刻可见
sequence.set(availableSequence);


public class DisruptorDemo {

    // 定义一个事件
    public static class LongEvent {
        private long val;
        public long getVal() {return val;}
        public void setVal(final long val) {this.val = val;}
    }

    // 一个简单的事件处理器
    public static class LongEventHandler implements EventHandler<LongEvent> {
        private String handlerName;
        private Integer module;

        public LongEventHandler(final String handlerName, final Integer module) {
            this.handlerName = handlerName;
            this.module = module;
        }

        @Override
        public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch)
                                                                                throws Exception {
            if (event.getVal() % module == 0) {
                System.out.println(handlerName + ": " + event.getVal());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 定义 ringBuffer 大小
        final int bufferSize = 1 << 5;
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

        // dsl模式的声明事件处理器及其关系
        // 这里代表有两个前置处理器 Handler5 & Handler2
        // 当一个消息被 Handler5 & Handler2 都消费完成才会到 Handler3 消费
        LongEventHandler handler5 = new LongEventHandler("Handler 5", 5);
        LongEventHandler handler2 = new LongEventHandler("Handler 2", 2);
        LongEventHandler handler3 = new LongEventHandler("Handler 3", 3);
        disruptor.handleEventsWith(handler5, handler2).then(handler3);

        // 启动 disruptor 会将ringBuffer返回
        // 生产者基于 ringBuffer 进行事件发布
        RingBuffer<LongEvent> ringBuffer = disruptor.start();

        // 发布 1000 个事件
        for (int i = 0; i < 1000; i++) {
            long next = ringBuffer.next();
            LongEvent longEvent = ringBuffer.get(next);
            longEvent.setVal(i);
            ringBuffer.publish(next);
        }

        Thread.sleep(10000L);
    }
}

对Disruptor构造参数的说明

    /**
     * @param eventFactory   用于预填充事件对象到ringBuffer
     * @param ringBufferSize ringBuffer大小必须是2的幂,可以快速位运算取模
     * @param threadFactory  提供给消费者线程的线程工厂
     * @param producerType   生产者类型
     * @param waitStrategy   等待策略
     */
    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy) {
        this(
                RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
                new BasicExecutor(threadFactory));
    }

producerType

接下来的源码解析部分会分别针对 SingleProducerSequencer , MultiProducerSequencer 两种Sequencer模式做说明

// 代表单生产者模式会使用 SingleProducerSequencer 作为 Sequencer
ProducerType.SINGLE
// 代表多生产者模式会用 MultiProducerSequencer 作为 Sequencer
ProducerType.MULTI

waitStrategy

消费者线程对事件的等待策略,主要用以下两种

// 阻塞等待,CPU消耗小,适合对延迟要求不高
BlockingWaitStrategy
// 自旋,CPU消耗大,低延迟场景
BusySpinWaitStrategy

可以参考我加注释说明的 Disruptor源码

分析源码时梳理的逻辑结构图

LMAX Disruptor 论文

聊聊缓存一致性协议

聊一聊内存屏障

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器