现代计算机处理器与存储设备运算速度完全不在同一量级上,至少相差几个数量级,如果让处理器等待计算机存储设备那么这样处理器的优势就不会体现出来。为了提高处理性能实现高并发,在处理器和存储设备之间加入了高速缓存(cache)来作为缓冲。将CPU运算需使用到的数据先复制到缓存中,让CPU运算能够快速进行;当CPU运算完成之后,再将缓存中的结果写回主内存,这样CPU运算就不用依赖等待主内存的读写操作了。
高速缓存设置为多级缓存,其目的为了解决CPU运算速度与内存读写速度不匹配的矛盾;在CPU和内存之间,引入了L1高速缓存、L2高速缓存、L3高速缓存
每一级缓存中所存储的数据全部都是下一级缓存中的一部分,当CPU需要数据时,先从缓存中取,加快读写速度,提高CPU利用率。存储层次金字塔的结构:
寄存器 → L1缓存 → L2缓存 → L3缓存 → 主内存 → 本地磁盘 → 远程数据库。
越往上访问速度越快、成本越高,空间更小。越往下访问速度越慢、成本越低,空间越大。
每个处理器都有自己的高速缓存,同时又共同操作同一块主内存,当多个处理器同时操作主内存时,可能将导致各自的的缓存数据不一致,为了解决这个问题
主要提供了两种解决办法:
JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存或者叫工作内存,本地内存中存储了该线程以读/写共享变量的副本。
但本地内存是JMM的一个抽象概念,并不真实存在;它涵盖了缓存,写缓冲区,寄存器以及其他的硬件和编译器优化。
主内存和本地内存的交互
线程的本地内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在本地内存中进行,而不能直接读写主内存中的变量。不同的线程之间也无法直接访问对方本地内存中的变量,线程间变量值的传递均需要通过主内存来完成。下面AB线程为例:
在此交互过程中,Java内存模型定义了8种操作来完成,虚拟机实现必须保证每一种操作都是原子的、不可再拆分的(double和long类型例外)
一个线程在执行的过程中不仅会用到CPU资源,还会用到IO,IO的速度远远比不上CPU的运算速度;当一个线程要请求IO的时候可以放弃CPU资源,这个时候其他线程就可以使用CPU,这就提高了CPU的利用率,当然线程之间的切换也会有额外的资源消耗,但多线程带来回报更大。而有了多线程就存在线程安全的问题,在Java并发编程中的一种思路就是通过原子性、可见性和有序性这三大特性切入点去考虑;在并发编程中,必须同时保证程序的原子性、有序性和可见性才能够保证程序的正确性。
原子性
有序性
定义:程序的执行是存在一定顺序的。在Java内存模型中,为了提高性能和程序的执行效率,编译器和处理器会对程序指令做重排序。在单线程中,重排序不会影响程序的正确性;as-if-serial原则是指不管编译器和CPU如何重排序,必须保证单线程情况下程序的结果是正确的;但在并发编程中,却有可能得出错误的结果。
在java当中使用volatile关键字、synchronized关键字或Lock接口来保证有序性。
可见性
在Java中任何一个对象都有一个monitor与之关联,当且一个monitor被持有后,这个对象处于锁定状态;尝试获得锁就是尝试获取对象所对应的monitor的所有权。
synchronized主要原理和思路通过monitor里面设计一个计数器,synchronized关键字在底层编译后的jvm指令中会有monitorenter(加锁)和monitorexit(释放锁)两个指令来实现锁的使用,每个对象都有一个关联的monitor,比如一个对象实例就有一个monitor,一个类的Class对象也有一个monitor,如果要对这个对象加锁,那么必须获取这个对象关联的monitor的lock锁;计数器从0开始;如果一个线程要获取monitor的锁,就看看他的计数器是不是0,如果是0的话,那么说明没人获取锁,他就可以获取锁了,然后对计数器加1加锁成功。
而对象头是synchronized实现锁的基础,因为synchronized申请锁、上锁、释放锁都与对象头有关。对象头其中一个重要部分Mark Word存储对象的hashCode、锁信息或分代年龄或GC标志等信息,锁总共有四个状态,分别为无锁状态、偏向锁、轻量级锁、重量级锁,锁的类型和状态在对象头Mark Word中都有记录,在申请锁、锁升级等过程中JVM都需要读取对象的Mark Word数据。java对象主要组成如下:
而对象头Mark Word组成如下:
volatile原理有volatile修饰的共享变量进行写操作的时候会多出Lock前缀的指令,该指令在多核处理器下会引发两件事情。
这样就保证了多个处理器的缓存是一致的,对应的处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器缓存行设置无效状态,当处理器对这个数据进行修改操作的时候会重新从主内存中把数据读取到缓存里。例如在Jdk7的并发包里新增了一个队列集合类LinkedTransferQueue,它在使用volatile变量的时候,会采用一种将字节追加到64字节的方法来提高性能。那为什么追加到64字节能够优化性能呢?这是因为在很多处理器中它们的L1、L2、L3缓存的高速缓存行都是64字节宽,不支持填充缓存行,例如,现在有两个不足64字节的变量AB,那么在AB变量写入缓存行时会将AB变量的部分数据一起写入一个缓存行中,那么在CPU1和CPU2想同时访问AB变量时是无法实现的,也就是想同时访问一个缓存行的时候会引起冲突,如果可以填充到64字节,AB两个变量会分别写入到两个缓存行中,这样就可以并发,同时进行变量访问,从而提高效率。
Disruptor是LMAX公司LMAX Development Team开源的高性能内存队列,是一个高性能线程间消息传递库,提供并发环缓冲数据结构的库;它的设计目的是在异步事件处理体系结构中提供低延迟、高吞吐量的工作队列。它能够让开发人员只需写单线程代码,就能够获得非常强悍的性能表现,同时避免了写并发编程的难度和坑; 其本质思想在于多线程未必比单线程跑的快。
CPU 为了更快的执行代码,当从内存中读取数据时并不是只读自己想要的部分, 而是读取足够的字节来填入高速缓存行。根据不同的 CPU ,高速缓存行大小不同,有32个字节和64个字节处。这样,当CPU访问相邻的数据时,就不必每次都从内存中读取,提高了速度,这是因为访问内存要比访问高速缓存用的时间多得多。这个缓存是CPU内部自己的缓存,内部的缓存单位是行,叫做缓存行。
当CPU尝试访问某个变量时,会先在L1 Cache中查找,如果命中缓存则直接使用;如果没有找到,就去下一级,一直到内存,随后将该变量所在的一个Cache行大小的区域复制到Cache中。查找的路线越长,速度也越慢,因此频繁访问的数据应当保持在L1Cache中。另外,一个变量的大小往往小于一个Cache行的大小,这时就有可能把多个变量放到一个Cache行中。下面代码举例数组命中缓存行和随机读写执行耗时差异:
package cn.itxs.disruptor;
public class CacheMain {
private static final int ARR_SIZE = 20000;
public static void main(String[] args) {
int[][] arrInt = new int[ARR_SIZE][ARR_SIZE];
long startTime = System.currentTimeMillis();
// 第一种情况为顺序访问,一次访问后,后面的多次访问都可以命中缓存
for (int i = 0; i < ARR_SIZE; i++) {
for (int j = 0; j < ARR_SIZE; j++) {
arrInt[i][j] = i * j;
}
}
long endTime = System.currentTimeMillis();
System.out.println("顺序访问耗时" + (endTime - startTime) + "毫秒");
startTime = System.currentTimeMillis();
// 第二情况为随机访问,每次都无法命中缓存行
for (int i = 0; i < ARR_SIZE; i++) {
for (int j = 0; j < ARR_SIZE; j++) {
arrInt[j][i] = i * j;
}
}
endTime = System.currentTimeMillis();
System.out.println("随机访问耗时" + (endTime - startTime) + "毫秒");
}
}
当CPU执行完后还需要将数据回写到主内存上以便于其它线程可以从主内存中获取最新的数据。假设两个线程都加载了相同的CacheLine即缓存行数据
pom文件引入disruptor的依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
事件类LongEvent.java
package cn.itxs.disruptor;
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
事件工厂类EventFactory.java
package cn.itxs.disruptor;
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
事件处理实现类,也即是消费者,这里实现EventHandler接口,也即是每个消费者都消费相同数量的生产者数据,LongEventHandler.java
package cn.itxs.disruptor;
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
public static long count = 0;
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
count ++;
System.out.println("[" + Thread.currentThread().getName() + "]" + event + "消费序号:" + sequence + ",event=" + event.toString());
}
}
测试类
package cn.itxs.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
public class DisruptorMain {
public static void main(String[] args) throws InterruptedException {
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024*1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI, new SleepingWaitStrategy());
// Connect the handlers
LongEventHandler h1 = new LongEventHandler();
LongEventHandler h2 = new LongEventHandler();
disruptor.handleEventsWith(h1, h2);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!\n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 2; j++) {
final int seq = j;
ringBuffer.publishEvent((event, sequence) -> {
event.set(seq);
System.out.println(threadNum + "线程生产了序号为" + sequence + ",消息为" + seq);
});
}
});
}
service.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VpEC2SPp-1642435544540)(image-20220117183016502.png)]
事件处理实现类实现WorkHandler接口,也即是多个消费者合起来消费一份生产者数据,LongEventHandler.java
package cn.itxs.disruptor;
import com.lmax.disruptor.WorkHandler;
public class LongEventHandlerWorker implements WorkHandler<LongEvent> {
public static long count = 0;
@Override
public void onEvent(LongEvent longEvent) throws Exception {
count ++;
System.out.println("[" + Thread.currentThread().getName() + "]" + "event=" + longEvent.toString());
}
}
测试类
package cn.itxs.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
public class DisruptorWorkerMain {
public static void main(String[] args) throws InterruptedException {
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024*1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI, new SleepingWaitStrategy());
// Connect the handlers
// 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
LongEventHandlerWorker[] longEventHandlerWorkers = new LongEventHandlerWorker[4];
for (int i = 0; i < longEventHandlerWorkers.length; i++) {
longEventHandlerWorkers[i] = new LongEventHandlerWorker();
}
disruptor.handleEventsWithWorkerPool(longEventHandlerWorkers);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!\n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 2; j++) {
final int seq = j;
ringBuffer.publishEvent((event, sequence) -> {
event.set(seq);
System.out.println(threadNum + "线程生产了序号为" + sequence + ",消息为" + seq);
});
}
});
}
service.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandlerWorker.count);
}
}
**本人博客网站 **IT小神 www.itxiaoshen.com
手机扫一扫
移动阅读更方便
你可能感兴趣的文章