Disruptor 是英国外汇交易公司LMAX开发的一款高吞吐低延迟内存队列框架,其充分考虑了底层CPU等运行模式来进行数据结构设计 (mechanical sympathy) ,能比传统队列方法延迟低三个数量级,吞吐量提高八倍。其中很多设计思想值得借鉴学习,本篇将核心介绍该队列的高性能设计方法,并对核心源码进行解读。
以下是一组官方提供的与ArrayBlockQueue对比的性能基准测试
Nehalem 2.8Ghz – Windows 7 SP1 64-bit
Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
ABQ
Disruptor
ABQ
Disruptor
Unicast: 1P – 1C
5,339,256
25,998,336
4,057,453
22,381,378
Pipeline: 1P – 3C
2,128,918
16,806,157
2,006,903
15,857,913
Sequencer: 3P – 1C
5,539,531
13,403,268
2,056,118
14,540,519
Multicast: 1P – 3C
1,077,384
9,377,871
260,733
10,860,121
Diamond: 1P – 3C
2,113,941
16,143,613
2,082,725
15,295,197
Sequence
可以视为一个存储Long类型的序号包装类,内部提供了对序号的无锁读写能力,主要用于存储当前生产游标及消费的消费游标。RingBuffer
及每一个EventHandler
都会利用 Sequence
来维护其生产或消费的相关位置信息。
Sequencer
封装了对ringBuffer生产消费游标的访问算法,例如获取当前所有消费者消费的最小游标位置,生产者发布事件时基于这个位置来决定当前事件是否会错误覆盖未消费的事件。目前有两个实现 SingleProducerSequencer
与MultiProducerSequencer
,分别用于单线程生产及多线程生产场景。
EventHandler
事件处理逻辑的接口类,一个函数接口类,用于封装用户对事件的消费逻辑。
EventProcessor
实际上Disruptor分发事件消费并不是直接调度 EventHandler
的,通过 EventProcessor
对EventHandler
进行了一层封装。EventProcessor
实现了Runnable
接口,在run方法内会循环监听事件,并将事件传递给EventHandler
执行消费逻辑。
SequenceBarrier
EventProcessor
不直接与RingBuffer交互,通过SequenceBarrier
封装了对RingBuffer事件的监听,一个``SequenceBarrier管理多个
EventProcessor,负责将所管理的
EventProcessor`进行等待或唤醒。
传统队列内一般采用数组或者链表,生产者、消费者关注队列的头尾节点,在这种模式下生产者对事件的生产、消费者对事件的消费、队列的容量等多个关注点相互耦合,从而导致了多线程之间的资源竞争。
Disruptor将多个关注点进行分离:
队列元素的存储 (RingBuffer)
队列协调生产者声明下一个需要交换的队列元素的序号 (Sequencer)
队列协调生产者声明下一个需要交换的队列元素的序号 (SequencesBarrier)
伪共享定义:不同线程对同一缓存行的不同字段进行频繁操作,会导致相互干扰而使得缓存失效,因为要维护缓存失效等CPU一致性协议通知,且因为失效每次都需要重新到内存拉取数据不能利用cpu的写缓存,性能反而比无缓存更低。
以上图为例,Core1因为修改了同一个缓存行的X、Core2因为修改同一缓冲行Y,导致因其他线程对同一缓存行的其他字段修改而缓存失效。若解决伪共享可以通过填充方式使得字段必然不会同时出现在同一个缓存行内。
public class ShareTest {
private volatile long x;
// 下面一行填充缓存行字段是否注释将造成很大的性能差别
// 注释情况下出现伪共享耗时: 3832 ms
// 未注释情况下 x y分别属于不同缓存行耗时: 637ms
private long p1,p2,p3,p4,p5,p6,p7;
private volatile long y;
public static void main(String[] args) throws InterruptedException {
long number = 100000000;
ShareTest shareTest = new ShareTest();
CountDownLatch countDownLatch = new CountDownLatch(2);
// 线程1修改x
new Thread(() -> {
for (long i = 0; i < number; i++) {
shareTest.x++;
}
countDownLatch.countDown();
}).start();
// 线程2修改y
new Thread(() -> {
for (long i = 0; i < number; i++) {
shareTest.y++;
}
countDownLatch.countDown();
}).start();
long begin = System.currentTimeMillis();
countDownLatch.await();
// 得到修改时间
System.out.println(System.currentTimeMillis() - begin);
}
}
在传统队列中存在头结点、尾节点的引用若未进行填充隔离,很容易存在于同一个缓存行中,当生产者消费者分别修改时就产生了伪共享。在Disruptor内很多伪共享可能发生的场景都做了字段填充,这样有效的避免了伪共享带来的性能消耗:
abstract class RingBufferPad {
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad {...}
// ringBuffer 继承了RingBufferPad,由于填充字段的存在,其不会被缓存到同一个缓存行内
public final class RingBuffer<E> extends RingBufferFields<E> implements ... {...}
在传统队列中,事件的生产通常是由生产者在事件发布时刻创建并追加到列表中。当一个事件完成了消费就需要从链表或数组中去除引用,需要进行GC的回收。GC的回收必然带来额外的性能开销,最致命的是当事件处理速度低需要在队列中引用较长时间时,事件对象会进入老年代,进入老年代后会带来更高的GC消耗。
Disruptor采用预创建事件的方式,在队列启动前便将所有位置的事件完成初始化,后续每一次事件的发布都是复用原位置的事件,只是将对象的字段属性进行修改。这样就避免了GC带来的额外开销,所有事件可以长久的进行循环复用。
// Disruptor 的事件发布是从对应位置取出已经提前初始化好的对象并进行设值
long next = ringBuffer.next();
LongEvent longEvent = ringBuffer.get(next);
longEvent.setVal(value);
ringBuffer.publish(next);
在传统队列中,由于各个关注点耦合,当需要进行依赖消费时,必须提供多组队列数组结构进行相互依赖。当存在分叉时,就会产生多组队列,会产生大量的额外成本。
Disruptor因为将关注点进行分离,抽象出负责协调监听消息接收的 SequenceBarrier,使其只需要一个RingBuffer即可实现依赖消费。
SequenceBarrier负责对前置序号进行监控,前置序号可能是Sequencer的生产坐标,也可能是上游消费者的消费坐标,这样可以灵活的组成链式依赖而无需多组队列结构。
Sequencer负责协调生产者获取下一个可以写入的事件位置,所以其需要感知当前消费链路上最小的消费位置避免生产覆盖。而链式依赖的结构其会自动监听链路最后消费者的序号,并缓存其当前最小消费序号。
当有可取或可发布的事件,生产者和消费者会直接到对应坐标进行读取写入操作,相互隔离无需加锁。
Disruptor通过Sequence来维护各处存放的序号,可以针对不同互斥及可见性场景选择不同内存屏障读写方法及CAS写方法。
class LhsPadding {
// 这是伪共享的填充字段
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
// 这是真实的序号值,是一个volatile类型
protected volatile long value;
}
class RhsPadding extends Value {
// 这是伪共享的填充字段
protected long p9, p10, p11, p12, p13, p14, p15;
}
// Sequence 本质上是维护一个数字值,保障其高效可见的读写
// zyn 因为继承了 RhsPadding Sequence的字段实际为
// p1, p2, p3, p4, p5, p6, p7, value, p9, p10, p11, p12, p13, p14, p15
// 避免了伪共享
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
// 反射value字段在类的 offset
// 后方法会基于反射进行赋值
private static final long VALUE_OFFSET;
// 省略非核心代码
...
// 这是一个普通读
public long get() {
return value;
}
public void set(final long value) {
// 这里不能直接用普通的方式吗?可以看到 Sequence.set 并未直接基于 volatile 的赋值
// putOrderedLong 是Store/Store barrier 比 volatile 的 Store/Load barrier 性能消耗更低
// 此处仅仅防止写的顺序重排序,并不会保障立刻可见,用于不需要其他线程立刻可见的场景
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
public void setVolatile(final long value) {
// 这是采用了 Store/Load barrier
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
public boolean compareAndSet(final long expectedValue, final long newValue) {
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
// 省略非核心代码
...
}
例如在执行消费者事件消费记录当前消费者最新消费位置时,并未采用高消耗的 setVolatile 而是 Store/Store barrier 的set方法
因为消费者更新的位置没必要让生产线程立刻可见,等待生产线程失效队列自动更新时可见即可。
// 循环执行
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
// 这里是真实调用 EventHandler 的 onEvent 方法
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 执行完成设置新的消费序号
// 这用的是写屏障,未使用全能屏障,因为此值无必要让所有其他线程立刻可见
sequence.set(availableSequence);
public class DisruptorDemo {
// 定义一个事件
public static class LongEvent {
private long val;
public long getVal() {return val;}
public void setVal(final long val) {this.val = val;}
}
// 一个简单的事件处理器
public static class LongEventHandler implements EventHandler<LongEvent> {
private String handlerName;
private Integer module;
public LongEventHandler(final String handlerName, final Integer module) {
this.handlerName = handlerName;
this.module = module;
}
@Override
public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
if (event.getVal() % module == 0) {
System.out.println(handlerName + ": " + event.getVal());
}
}
}
public static void main(String[] args) throws InterruptedException {
// 定义 ringBuffer 大小
final int bufferSize = 1 << 5;
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize,
DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// dsl模式的声明事件处理器及其关系
// 这里代表有两个前置处理器 Handler5 & Handler2
// 当一个消息被 Handler5 & Handler2 都消费完成才会到 Handler3 消费
LongEventHandler handler5 = new LongEventHandler("Handler 5", 5);
LongEventHandler handler2 = new LongEventHandler("Handler 2", 2);
LongEventHandler handler3 = new LongEventHandler("Handler 3", 3);
disruptor.handleEventsWith(handler5, handler2).then(handler3);
// 启动 disruptor 会将ringBuffer返回
// 生产者基于 ringBuffer 进行事件发布
RingBuffer<LongEvent> ringBuffer = disruptor.start();
// 发布 1000 个事件
for (int i = 0; i < 1000; i++) {
long next = ringBuffer.next();
LongEvent longEvent = ringBuffer.get(next);
longEvent.setVal(i);
ringBuffer.publish(next);
}
Thread.sleep(10000L);
}
}
/**
* @param eventFactory 用于预填充事件对象到ringBuffer
* @param ringBufferSize ringBuffer大小必须是2的幂,可以快速位运算取模
* @param threadFactory 提供给消费者线程的线程工厂
* @param producerType 生产者类型
* @param waitStrategy 等待策略
*/
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy) {
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
producerType
接下来的源码解析部分会分别针对 SingleProducerSequencer , MultiProducerSequencer 两种Sequencer模式做说明
// 代表单生产者模式会使用 SingleProducerSequencer 作为 Sequencer
ProducerType.SINGLE
// 代表多生产者模式会用 MultiProducerSequencer 作为 Sequencer
ProducerType.MULTI
waitStrategy
消费者线程对事件的等待策略,主要用以下两种
// 阻塞等待,CPU消耗小,适合对延迟要求不高
BlockingWaitStrategy
// 自旋,CPU消耗大,低延迟场景
BusySpinWaitStrategy
可以参考我加注释说明的 Disruptor源码
手机扫一扫
移动阅读更方便
你可能感兴趣的文章