Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。
disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。
public static RingBuffer<LongEvent> ringBuffer=null;
static {
//创建工厂
LongEventFactory factory = new LongEventFactory();
//设置buff数量,要求一定为2的n次方
int bufferSize=1024;
//构造Disruptor容器 第三个参数是线程构造方式 第四个参数是线程环境(多线程还是单线程) 第五个是等待策略
//等待策略(常用,其一共8种):
//BlockingWaitStrategy:通过线程堵塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费;
//BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu;
Disruptor disruptor = new Disruptor(factory, bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
//装载消费者
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);
//获取buff
ringBuffer = disruptor.getRingBuffer();
//disruptor的异常处理,异常一定要处理,否则一个线程出现异常后,其后续将都不会执行
disruptor.handleExceptionsFor(longEventHandler).with(new ExceptionHandler<LongEvent>(){
//当消费产生异常的时候处理方法
@Override
public void handleEventException(Throwable throwable,long l,LongEvent longEvent){
System.out.println("-----消费产生的异常----");
throwable.printStackTrace();
}
//启动的时候出现异常的处理方法
@Override
public void handleOnStartException(Throwable throwable){
System.out.println("Exception Start to Handle!");
}
//关闭的时候出现异常的处理方法
@Override
public void handleOnShutdownException(Throwable throwable){
System.out.println("Exception End to Handle!");
}
});
//一定要最后 开启Disruptor
disruptor.start();
}
public void producer(){
while (true){
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取下一个序列号
long next = ringBuffer.next();
try {
//获取事件
LongEvent longEvent = ringBuffer.get(next);
//设置值
longEvent.set(new Random().nextInt(500));
} finally {
//最后,把序列号刷回buff中
ringBuffer.publish(next);
}
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章