Disruptor-高性能队列
阅读原文时间:2023年07月10日阅读:1

简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。

disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。

 官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。

代码

public static RingBuffer<LongEvent> ringBuffer=null;
    static {
        //创建工厂
        LongEventFactory factory = new LongEventFactory();

        //设置buff数量,要求一定为2的n次方
        int bufferSize=1024;
        //构造Disruptor容器 第三个参数是线程构造方式  第四个参数是线程环境(多线程还是单线程) 第五个是等待策略
        //等待策略(常用,其一共8种):
        //BlockingWaitStrategy:通过线程堵塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费;
        //BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu;
        Disruptor disruptor = new Disruptor(factory, bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
        //装载消费者
        LongEventHandler longEventHandler = new LongEventHandler();
        disruptor.handleEventsWith(longEventHandler);

        //获取buff
        ringBuffer = disruptor.getRingBuffer();

        //disruptor的异常处理,异常一定要处理,否则一个线程出现异常后,其后续将都不会执行
        disruptor.handleExceptionsFor(longEventHandler).with(new ExceptionHandler<LongEvent>(){
            //当消费产生异常的时候处理方法
            @Override
            public void handleEventException(Throwable throwable,long l,LongEvent longEvent){
                System.out.println("-----消费产生的异常----");
                throwable.printStackTrace();
            }

            //启动的时候出现异常的处理方法
            @Override
            public void handleOnStartException(Throwable throwable){
                System.out.println("Exception Start to Handle!");
            }

            //关闭的时候出现异常的处理方法
            @Override
            public void handleOnShutdownException(Throwable throwable){
                System.out.println("Exception End to Handle!");
            }
        });

        //一定要最后  开启Disruptor
        disruptor.start();
    }

    public void producer(){
       while (true){
           try {
               TimeUnit.MILLISECONDS.sleep(5000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           //获取下一个序列号
           long next = ringBuffer.next();
           try {
               //获取事件
               LongEvent longEvent = ringBuffer.get(next);
               //设置值
               longEvent.set(new Random().nextInt(500));
           } finally {
               //最后,把序列号刷回buff中
               ringBuffer.publish(next);
           }
       }
    }

Gitee地址

https://gitee.com/zhuayng/foundation-study/blob/develop/JavaBasis/Disruptor/src/main/java/com/yxkj/disruptor/service/DisruptorDemo.java

参考

https://www.jianshu.com/p/bad7b4b44e48

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器