import java.util.concurrent.atomic.AtomicInteger;
/**
 * Event type used with the Disruptor in this demo.
 *
 * <p>Plain mutable bean exposing an id, a name, a price and a counter.
 *
 * @author Alienware
 */
public class Trade {

    private String id;
    private String name;
    private double price;

    /** Counter kept in an {@link AtomicInteger} so that it can be updated safely under concurrency. */
    private AtomicInteger count = new AtomicInteger(0);

    /** Creates a trade with no id or name, a price of zero and a counter starting at zero. */
    public Trade() {
    }

    /** @return the current id, or {@code null} if none has been set */
    public String getId() {
        return this.id;
    }

    /** @param id the new id */
    public void setId(final String id) {
        this.id = id;
    }

    /** @return the current name, or {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(final String name) {
        this.name = name;
    }

    /** @return the current price */
    public double getPrice() {
        return this.price;
    }

    /** @param price the new price */
    public void setPrice(final double price) {
        this.price = price;
    }

    /** @return the counter instance itself (not a copy) */
    public AtomicInteger getCount() {
        return this.count;
    }

    /** @param count the counter instance that replaces the current one */
    public void setCount(final AtomicInteger count) {
        this.count = count;
    }
}
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
public class TradePushlisher implements Runnable {
private Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int PUBLISH\_COUNT = 1;
public TradePushlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
this.disruptor = disruptor;
this.latch = latch;
}
public void run() {
TradeEventTranslator eventTranslator = new TradeEventTranslator();
for(int i =0; i < PUBLISH\_COUNT; i ++){
//新的提交任务的方式
disruptor.publishEvent(eventTranslator);
}
latch.countDown();
}
}
class TradeEventTranslator implements EventTranslator
private Random random = new Random();
public void translateTo(Trade event, long sequence) {
this.generateTrade(event);
}
private void generateTrade(Trade event) {
event.setPrice(random.nextDouble() \* 9999);
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class Handler1 implements EventHandler
//EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
//WorkHandler
public void onEvent(Trade event) throws Exception {
System.err.println("handler 1 : SET NAME");
Thread.sleep(1000);
event.setName("H1");
}
}
import java.util.UUID;
import com.lmax.disruptor.EventHandler;
public class Handler2 implements EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 2 : SET ID");
Thread.sleep(2000);
event.setId(UUID.randomUUID().toString());
}
}
import com.lmax.disruptor.EventHandler;
public class Handler3 implements EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 3 : NAME: "
+ event.getName()
+ ", ID: "
+ event.getId()
+ ", PRICE: "
+ event.getPrice()
+ " INSTANCE : " + event.toString());
}
}
import com.lmax.disruptor.EventHandler;
public class Handler4 implements EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 4 : SET PRICE");
Thread.sleep(1000);
event.setPrice(17.0);
}
}
import com.lmax.disruptor.EventHandler;
public class Handler5 implements EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 5 : GET PRICE: " + event.getPrice());
Thread.sleep(1000);
event.setPrice(event.getPrice() + 3.0);
}
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
//构建一个线程池用于提交任务
ExecutorService es1 = Executors.newFixedThreadPool(1);
ExecutorService es2 = Executors.newFixedThreadPool(5);
//1 构建Disruptor
Disruptor<Trade> disruptor = new Disruptor<Trade>(
new EventFactory<Trade>() {
public Trade newInstance() {
return new Trade();
}
},
1024\*1024,
es2,
ProducerType.SINGLE,
new BusySpinWaitStrategy());
//2 把消费者设置到Disruptor中 handleEventsWith
//2.1 串行操作:
disruptor
.handleEventsWith(new Handler1())
.handleEventsWith(new Handler2())
.handleEventsWith(new Handler3());
//2.2 并行操作: 可以有两种方式去进行
//1 handleEventsWith方法 添加多个handler实现即可
//2 handleEventsWith方法 分别进行调用
disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
disruptor.handleEventsWith(new Handler2());
disruptor.handleEventsWith(new Handler3());
//2.3 菱形操作 (一)
disruptor.handleEventsWith(new Handler1(), new Handler2())
.handleEventsWith(new Handler3());
//2.3 菱形操作 (二)
EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
ehGroup.then(new Handler3());
//2.4 六边形操作
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h4);
disruptor.after(h1).handleEventsWith(h2);
disruptor.after(h4).handleEventsWith(h5);
disruptor.after(h2, h5).handleEventsWith(h3);
//3 启动disruptor
RingBuffer<Trade> ringBuffer = disruptor.start();
//设置唤醒的次数
CountDownLatch latch = new CountDownLatch(1);
long begin = System.currentTimeMillis();
es1.submit(new TradePushlisher(latch, disruptor));
latch.await(); //进行向下
disruptor.shutdown();
es1.shutdown();
es2.shutdown();
System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
}
}
多生产者模型:
多消费者:
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Mutable bean carrying an id, a name and a price.
 */
public class Order {

    private String id;
    private String name;
    private double price;

    /** Creates an order with no id or name and a price of zero. */
    public Order() {
    }

    /** @return the current id, or {@code null} if none has been set */
    public String getId() {
        return this.id;
    }

    /** @param id the new id */
    public void setId(final String id) {
        this.id = id;
    }

    /** @return the current name, or {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(final String name) {
        this.name = name;
    }

    /** @return the current price */
    public double getPrice() {
        return this.price;
    }

    /** @param price the new price */
    public void setPrice(final double price) {
        this.price = price;
    }
}
import com.lmax.disruptor.RingBuffer;
public class Producer {
private RingBuffer
public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String uuid) {
long sequence = ringBuffer.next();
try {
Order order = ringBuffer.get(sequence);
order.setId(uuid);
} finally {
ringBuffer.publish(sequence);
}
}
}
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.WorkHandler;
//消费者实现多消费的接口
public class Consumer implements WorkHandler
private String comsumerId;
private static AtomicInteger count = new AtomicInteger(0);
private Random random = new Random();
public Consumer(String comsumerId) {
this.comsumerId = comsumerId;
}
public void onEvent(Order event) throws Exception {
Thread.sleep(1 \* random.nextInt(5));
System.err.println("当前消费者: " + this.comsumerId + ", 消费信息ID: " + event.getId());
count.incrementAndGet();
}
public int getCount(){
return count.get();
}
}
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
//1 创建RingBuffer
RingBuffer<Order> ringBuffer =
RingBuffer.create(ProducerType.MULTI,
new EventFactory<Order>() {
public Order newInstance() {
return new Order();
}
},
1024\*1024,
new YieldingWaitStrategy());
//2 通过ringBuffer 创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3 创建多个消费者数组:
Consumer\[\] consumers = new Consumer\[10\];
for(int i = 0; i < consumers.length; i++) {
consumers\[i\] = new Consumer("C" + i);
}
//4 构建多消费者工作池
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(), //在失败的时候怎么处理
consumers);
//5 设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//6 启动workerPool
workerPool
.start(Executors.newFixedThreadPool(5));
final CountDownLatch latch = new CountDownLatch(1);
for(int i = 0; i < 100; i++) {
final Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
public void run() {
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
for(int j = 0; j<100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
}
Thread.sleep(2000);
System.err.println("----------线程创建完毕,开始生产数据----------");
latch.countDown();
Thread.sleep(10000);
System.err.println("任务总数:" + consumers\[2\].getCount());
}
static class EventExceptionHandler implements ExceptionHandler<Order> {
@Override
public void handleEventException(Throwable ex, long sequence, Order event) {
}
@Override
public void handleOnStartException(Throwable ex) {
}
@Override
public void handleOnShutdownException(Throwable ex) {
}
}
}
读读共享、读写互斥、写写互斥
LockSupport :
// Demonstrates unpark() being issued before the matching park().
// Thread A sums 0..9, sleeps for four seconds, parks, and finally prints the sum.
Thread A = new Thread(new Runnable() {
    @Override
    public void run() {
        int total = 0;
        int k = 0;
        while (k < 10) {
            total += k;
            k++;
        }

        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Runs second: reached about four seconds in, after the unpark() below was issued.
        LockSupport.park();
        System.err.println("sum: " + total);
    }
});
A.start();

Thread.sleep(1000);

// Runs first: issued about one second in, while A is still sleeping.
LockSupport.unpark(A);
// Factory-created pools, shown for comparison with the hand-built pool below.
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);

// Hand-built pool: 5 core threads, up to 2x the CPU count in total, a 60-second idle
// timeout for the extra threads and a bounded queue of 200 tasks.
ThreadPoolExecutor pool = new ThreadPoolExecutor(5,
        // The maximum must not be smaller than the core size, otherwise the constructor
        // throws IllegalArgumentException (this would happen on hosts with 1-2 CPUs).
        Math.max(5, Runtime.getRuntime().availableProcessors() * 2),
        60,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(200),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("order-thread");
                // Normalise to a non-daemon thread with normal priority.
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (Thread.NORM_PRIORITY != t.getPriority()) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Rejected tasks are only reported on stderr; they are not retried.
                System.err.println("拒绝策略:" + r);
            }
        });
// Passing true to the constructor requests ReentrantLock's fair ordering policy.
ReentrantLock reentrantLock = new ReentrantLock(true);
线程池:
ThreadPoolExecutor pool=new ThreadPoolExecutor(
5,
Runtime.getRuntime().availableProcessors()*2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
thread.setName("order-thread");
if(thread.isDaemon()){
thread.setDaemon(false);
}
if(Thread.NORM\_PRIORITY!=thread.getPriority()){
thread.setPriority(Thread.NORM\_PRIORITY);
}
return thread;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("拒绝策略:"+r);
}
});
AQS架构:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章