JAVA线程Disruptor核心链路应用(八)
阅读原文时间:2023年07月11日阅读:1

import java.util.concurrent.atomic.AtomicInteger;

/**
* Disruptor中的 Event
* @author Alienware
*
*/
public class Trade {

private String id;  
private String name;  
private double price;  
//在并发时实现线程安全  
private AtomicInteger count = new AtomicInteger(0);

public Trade() {  
}  
public String getId() {  
    return id;  
}  
public void setId(String id) {  
    this.id = id;  
}  
public String getName() {  
    return name;  
}  
public void setName(String name) {  
    this.name = name;  
}  
public double getPrice() {  
    return price;  
}  
public void setPrice(double price) {  
    this.price = price;  
}  
public AtomicInteger getCount() {  
    return count;  
}  
public void setCount(AtomicInteger count) {  
    this.count = count;  
}  

}

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePushlisher implements Runnable {

private Disruptor<Trade> disruptor;  
private CountDownLatch latch;

private static int PUBLISH\_COUNT = 1;

public TradePushlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {  
    this.disruptor = disruptor;  
    this.latch = latch;  
}

public void run() {  
    TradeEventTranslator eventTranslator = new TradeEventTranslator();  
    for(int i =0; i < PUBLISH\_COUNT; i ++){  
        //新的提交任务的方式  
        disruptor.publishEvent(eventTranslator);  
    }  
    latch.countDown();  
}  

}

class TradeEventTranslator implements EventTranslator {

private Random random = new Random();

public void translateTo(Trade event, long sequence) {  
    this.generateTrade(event);  
}

private void generateTrade(Trade event) {  
    event.setPrice(random.nextDouble() \* 9999);  
}  

}

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler, WorkHandler{

//EventHandler  
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
    this.onEvent(event);  
}

//WorkHandler  
public void onEvent(Trade event) throws Exception {  
    System.err.println("handler 1 : SET NAME");  
    Thread.sleep(1000);  
    event.setName("H1");  
}  

}

import java.util.UUID;
import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler {

public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
    System.err.println("handler 2 : SET ID");  
    Thread.sleep(2000);  
    event.setId(UUID.randomUUID().toString());  
}  

}

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler {

public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
    System.err.println("handler 3 : NAME: "  
                            + event.getName()  
                            + ", ID: "  
                            + event.getId()  
                            + ", PRICE: "  
                            + event.getPrice()  
                            + " INSTANCE : " + event.toString());  
}  

}

import com.lmax.disruptor.EventHandler;

public class Handler4 implements EventHandler {

public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
    System.err.println("handler 4 : SET PRICE");  
    Thread.sleep(1000);  
    event.setPrice(17.0);  
}  

}

import com.lmax.disruptor.EventHandler;

public class Handler5 implements EventHandler {
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 5 : GET PRICE: " + event.getPrice());
Thread.sleep(1000);
event.setPrice(event.getPrice() + 3.0);
}
}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {

    //构建一个线程池用于提交任务  
    ExecutorService es1 = Executors.newFixedThreadPool(1);

    ExecutorService es2 = Executors.newFixedThreadPool(5);  
    //1 构建Disruptor  
    Disruptor<Trade> disruptor = new Disruptor<Trade>(  
            new EventFactory<Trade>() {  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            },  
            1024\*1024,  
            es2,  
            ProducerType.SINGLE,  
            new BusySpinWaitStrategy());

    //2 把消费者设置到Disruptor中 handleEventsWith

    //2.1 串行操作:

    disruptor  
    .handleEventsWith(new Handler1())  
    .handleEventsWith(new Handler2())  
    .handleEventsWith(new Handler3());

    //2.2 并行操作: 可以有两种方式去进行  
    //1 handleEventsWith方法 添加多个handler实现即可  
    //2 handleEventsWith方法 分别进行调用

    disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());  
    disruptor.handleEventsWith(new Handler2());  
    disruptor.handleEventsWith(new Handler3());

    //2.3 菱形操作 (一)

    disruptor.handleEventsWith(new Handler1(), new Handler2())  
    .handleEventsWith(new Handler3());

    //2.3 菱形操作 (二)

    EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());  
    ehGroup.then(new Handler3());

    //2.4 六边形操作  
    Handler1 h1 = new Handler1();  
    Handler2 h2 = new Handler2();  
    Handler3 h3 = new Handler3();  
    Handler4 h4 = new Handler4();  
    Handler5 h5 = new Handler5();  
    disruptor.handleEventsWith(h1, h4);  
    disruptor.after(h1).handleEventsWith(h2);  
    disruptor.after(h4).handleEventsWith(h5);  
    disruptor.after(h2, h5).handleEventsWith(h3);

    //3 启动disruptor  
    RingBuffer<Trade> ringBuffer = disruptor.start();

    //设置唤醒的次数  
    CountDownLatch latch = new CountDownLatch(1);

    long begin = System.currentTimeMillis();

    es1.submit(new TradePushlisher(latch, disruptor));

    latch.await();  //进行向下

    disruptor.shutdown();  
    es1.shutdown();  
    es2.shutdown();  
    System.err.println("总耗时: " + (System.currentTimeMillis() - begin));

}  

}

  多生产者模型:

多消费者:

import java.util.concurrent.atomic.AtomicInteger;

public class Order {

private String id;  
private String name;  
private double price;

public Order() {  
}  
public String getId() {  
    return id;  
}  
public void setId(String id) {  
    this.id = id;  
}  
public String getName() {  
    return name;  
}  
public void setName(String name) {  
    this.name = name;  
}  
public double getPrice() {  
    return price;  
}  
public void setPrice(double price) {  
    this.price = price;  
}  

}

import com.lmax.disruptor.RingBuffer;

public class Producer {
private RingBuffer ringBuffer;

public Producer(RingBuffer<Order> ringBuffer) {  
    this.ringBuffer = ringBuffer;  
}

public void sendData(String uuid) {  
    long sequence = ringBuffer.next();  
    try {  
        Order order = ringBuffer.get(sequence);  
        order.setId(uuid);  
    } finally {  
        ringBuffer.publish(sequence);  
    }  
}  

}

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import com.lmax.disruptor.WorkHandler;

//消费者实现多消费的接口
public class Consumer implements WorkHandler {

private String comsumerId;

private static AtomicInteger count = new AtomicInteger(0);

private Random random = new Random();

public Consumer(String comsumerId) {  
    this.comsumerId = comsumerId;  
}

public void onEvent(Order event) throws Exception {  
    Thread.sleep(1 \* random.nextInt(5));  
    System.err.println("当前消费者: " + this.comsumerId + ", 消费信息ID: " + event.getId());  
    count.incrementAndGet();  
}

public int getCount(){  
    return count.get();  
}  

}

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

    //1 创建RingBuffer  
    RingBuffer<Order> ringBuffer =  
            RingBuffer.create(ProducerType.MULTI,  
                    new EventFactory<Order>() {  
                        public Order newInstance() {  
                            return new Order();  
                        }  
                    },  
                    1024\*1024,  
                    new YieldingWaitStrategy());

    //2 通过ringBuffer 创建一个屏障  
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    //3 创建多个消费者数组:  
    Consumer\[\] consumers = new Consumer\[10\];  
    for(int i = 0; i < consumers.length; i++) {  
        consumers\[i\] = new Consumer("C" + i);  
    }

    //4 构建多消费者工作池  
    WorkerPool<Order> workerPool = new WorkerPool<Order>(  
            ringBuffer,  
            sequenceBarrier,  
            new EventExceptionHandler(), //在失败的时候怎么处理  
            consumers);

    //5 设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中  
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

    //6 启动workerPool  
    workerPool  
    .start(Executors.newFixedThreadPool(5));

    final CountDownLatch latch = new CountDownLatch(1);

    for(int i = 0; i < 100; i++) {  
        final Producer producer = new Producer(ringBuffer);  
        new Thread(new Runnable() {  
            public void run() {  
                try {  
                    latch.await();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
                for(int j = 0; j<100; j++) {  
                    producer.sendData(UUID.randomUUID().toString());  
                }  
            }  
        }).start();  
    }

    Thread.sleep(2000);  
    System.err.println("----------线程创建完毕,开始生产数据----------");  
    latch.countDown();

    Thread.sleep(10000);

    System.err.println("任务总数:" + consumers\[2\].getCount());  
}

static class EventExceptionHandler implements ExceptionHandler<Order> {  
    @Override  
    public void handleEventException(Throwable ex, long sequence, Order event) {  
    }  
    @Override  
    public void handleOnStartException(Throwable ex) {  
    }  
    @Override  
    public void handleOnShutdownException(Throwable ex) {  
    }  
}  

}

  

读读共享、读写互斥、写写互斥

LockSupport :

Thread A = new Thread(new Runnable() {

        @Override  
        public void run() {  
            int sum = 0;  
            for(int i =0; i < 10; i ++){  
                sum += i;  
            }  
            try {  
                Thread.sleep(4000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            LockSupport.park(); //后执行  
            System.err.println("sum: " + sum);  
        }  
    });

    A.start();

    Thread.sleep(1000);

    LockSupport.unpark(A);  //先执行

    Executors.newCachedThreadPool();  
    Executors.newFixedThreadPool(10);

    ThreadPoolExecutor pool = new ThreadPoolExecutor(5,  
            Runtime.getRuntime().availableProcessors() \* 2,  
            60,  
            TimeUnit.SECONDS,  
            new ArrayBlockingQueue<>(200),  
            new ThreadFactory() {  
                @Override  
                public Thread newThread(Runnable r) {  
                    Thread t = new Thread(r);  
                    t.setName("order-thread");  
                    if(t.isDaemon()) {  
                        t.setDaemon(false);  
                    }  
                    if(Thread.NORM\_PRIORITY != t.getPriority()) {  
                        t.setPriority(Thread.NORM\_PRIORITY);  
                    }  
                    return t;  
                }  
            },  
            new RejectedExecutionHandler() {  
                @Override  
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
                    System.err.println("拒绝策略:" + r);  
                }  
            });

    ReentrantLock reentrantLock = new ReentrantLock(true);

  线程池:

ThreadPoolExecutor pool=new ThreadPoolExecutor(
5,
Runtime.getRuntime().availableProcessors()*2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new ThreadFactory() {

            @Override  
            public Thread newThread(Runnable r) {  
                Thread thread=new Thread(r);  
                thread.setName("order-thread");  
                if(thread.isDaemon()){  
                    thread.setDaemon(false);  
                }  
                if(Thread.NORM\_PRIORITY!=thread.getPriority()){  
                    thread.setPriority(Thread.NORM\_PRIORITY);  
                }  
                return thread;  
            }  
        },  
        new RejectedExecutionHandler() {

            @Override  
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
                System.err.println("拒绝策略:"+r);  
            }  
        });

  AQS架构:

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章