JUC基础学习笔记
阅读原文时间:2023年07月08日阅读:4

JUC即java.util .concurrent工具包的简称。从JDK 1.5 开始出现,主要用于处理多线程、高并发问题。

原子性、可见性、有序性

1、原子性:

指一个操作或者多个操作,要么同时成功并且执行的过程不会被任何因素打断,要么同时失败。

2、可见性:

指多个线程访问一个资源时,该资源的状态、值信息等对于其他线程都是可见的

3、有序性:

指执行顺序按照代码先后来执行。

1、用户线程:平时用到的普通线程,自定义线程

2、守护线程:运行在后台,是一种特殊的线程,比如垃圾回收

当主线程结束后,用户线程还在运行,JVM 存活

如果没有用户线程,都是守护线程,JVM 结束

1、串行模式:表示所有任务都按先后顺序进行,当一个任务完成后才能接着执行下一个任务;

2、并行模式:同一时刻(物理上)多个线程同时执行,体现在多核CPU处理器上;

3、并发模式:同一时刻(逻辑上)模拟出多个线程快速交替访问同一个资源,体现在单核CPU处理器上;

NEW //尚未启动
RUNNABLE //正在执行中
BLOCKED //阻塞的(被同步锁或者IO锁阻塞)
WAITING //永久等待状态
TIMED_WAITING //等待指定的时间重新被唤醒的状态
TERMINATED //执行完成

1、同步机制synchronized关键字:

  (1)、等待模式:wait()

  (2)、通知模式:notify()/notifyAll()

2、同步机制Lock接口:

Lock接口的父子类关系:

Lock 接口方法 :

{
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

  (1)、声明锁:Lock lock = new Lock();

  (2)、声明钥匙:Condition condition = lock.newCondition();

  (3)、Condition接口类可以实现等待/通知模式:

    1)、等待模式:await()会使当前线程等待,同时会释放锁,当其他线程调用signal()时,线程会重新获得锁并继续执行。

    2)、通知模式:signal()用于唤醒一个等待的线程。

注:线程wait()的虚假唤醒问题:

  *1*、虚假唤醒:线程被唤醒,但不会被通知、中断或超时

  *2*、解决方案:将线程的等待方法放到循环中

  JVM 是通过进入、退出对象监视器( Monitor )来实现对方法、同步块的同步的。具体实现是在编译之后在同步方法调用前加入一个 monitor.enter 指令,在退出方法和异常处插入 monitor.exit 的指令。其本质就是对一个对象监视器( Monitor )进行获取,而这个获取过程具有排他性从而达到了同一时刻只能一个线程访问的目的。而对于没有获取到锁的线程将会阻塞到方法入口处,直到获取锁的线程 monitor.exit 之后才能尝试继续获取锁。

1、Lock是一个接口,而synchronized 是Java中的关键字

2、synchronized可以给类、方法、代码块加锁;而lock 只能给代码块加锁。

3、synchronized不需要手动获取锁和释放锁,发生异常会自动释放锁,不会造成死锁;而lock需要手动加锁Lock()和释放锁unLock(),如果没有unLock()去释放锁就会造成死锁,故Lock 使用时需要在finally块中释放锁。

4、通过Lock可以知道有没有成功获取锁,而synchronized 却无法办到。

5、Lock可以让等待锁的线程响应中断,而synchronized 却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;

参考

1、synchronized是关键字;ReentrantLock是实现Lock 接口的类,即“可重入锁”。

2、ReentrantLock可以被继承、可以有方法、可以有各种各样的类变量,

ReentrantLock比synchronized的扩展性体现在几点上:

  (1)、ReentrantLock可以对获取锁的等待时间进行设置,这样就避免了死锁

  (2)、ReentrantLock可以获取各种锁的信息

  (3)、ReentrantLock可以灵活地实现多路通知

3、锁机制的不同:

  (1)、ReentrantLock底层调用的是Unsafe的park方法加锁;

  (2)、synchronized操作的是对象头中Mark Word(标记字)。

  (对象头的Mark Word:主要用来表示对象的线程锁状态,另外还可以用来配合GC、存放该对象的hashCode)

1、死锁:

指两个或两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

2、活锁:

指任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试—失败—尝试—失败的过程。处于活锁的实体是在不断的改变状态,活锁有可能自行解开。(即两个线程都可使用资源,但都相互礼让,最后两个线程都无法使用资源)

3、饿死:

指一个线程长时间得不到需要的资源而不能执行的现象(即单进程状态下,一个线程让优先级高的线程先执行,该线程一直处在等待状态)

4、阻塞:

指的是暂停一个线程的执行以等待某个条件触发

1、互斥条件:一个资源每次只能被一个进程使用。

2、请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放

3、不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。

4、循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系

Java 提供了大量方法来支持阻塞:

1、sleep()

  sleep()允许指定以毫秒为单位的一段时间作为参数,它使得线程在指定的时间内进入阻塞状态,不能得到CPU 时间,指定的时间一过,线程重新进入可执行状态。

2、suspend() 和 resume()

  两个方法配套使用,suspend()使得线程进入阻塞状态,并且不会自动恢复,必须其对应的resume() 被调用,才能使得线程重新进入可执行状态。

3、yield()

  yield() 使当前线程放弃当前已经分得的CPU时间,但不使当前线程阻塞,即线程仍处于可执行状态,随时可能再次分得 CPU 时间。调用 yield() 的效果等价于调度程序认为该线程已执行了足够的时间从而转到另一个线程。

4、wait() 和 notify()

  两个方法配套使用,wait() 使得线程进入阻塞状态,它有两种形式,一种允许指定以毫秒为单位的一段时间作为参数,另一种没有参数,前者当对应的 notify() 被调用或者超出指定时间时线程重新进入可执行状态,后者则必须对应的notify() 被调用.

集合:

1、Collection父接口

|----Collection接口

*            |----List接口

*                |----ArrayList类:线程不安全;

*                |----LinkedList类: 线程不安全

*                |---- Vector类:线程安全的

*

*            |----Set接口

*                |----HashSet类:线程不安全的;

*                     |----LinkedHashSet类: 线程不安全

*                |----TreeSet类:线程不安全

2、Map父接口

|----Map接口

*            |----HashMap类:线程不安全;

*                |----LinkedHashMap类: 线程不安全

*            |----TreeMap类: 线程不安全

*            |----Hashtable:线程安全的

1、Vector方式:

(例:List list = new Vector();)

源码:

public class Vector
extends AbstractList
implements List, RandomAccess, Cloneable, java.io.Serializable {
  … …
}

Vector继承于AbstractList,实现List, RandomAccess, Cloneable,Serializable接口。

  (1)、继承AbstractList,实现List接口——即Vector为队列,支持添加、删除、修改、遍历等功能。

  (2)、实现RandmoAccess 接口——即Vector提供随机访问功能。

  (3)、实现Cloneable接口——即Vector实现clone()函数,能被克隆。

  注:Vector类中方法被synchronized同步修辞,故线程安全,运行时无并发异常

2、Collections.synchronizedList()方式:

(例:List list = Collections.synchronizedList(new ArrayList<>());)

Collections是个一个操作Collection和Map的工具类,提供了一系列的静态方法来辅助容器操作,这些方法包括对容器的搜索、排序、线程安全化等等。

源码:

public static List synchronizedList(List list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}

3、CopyOnWriteArrayList()方式:

(例:List list = new CopyOnWriteArrayList();)

源码:

public class CopyOnWriteArrayList
implements List, RandomAccess, Cloneable, java.io.Serializable {
… …
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
… …
}

1、“动态数组”机制:

  内部采用“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”。

(由于在“添加/修改/删除”数据时,都需要新建数组,所以CopyOnWriteArrayList 增删效率低,遍历查询效率高。)

2、“线程安全”机制:

  通过 volatile 和互斥锁(锁定/非锁定)来实现的。

  通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的保证。通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”,就达到了保护数据的目的。

1、volatile 是变量修饰符;synchronized是修饰类、方法、代码段。

2、volatile 仅能实现变量的修改可见性,不能保证原子性;synchronized 可以保证变量的修改可见性和原子性。

3、volatile不会造成线程的阻塞;synchronized可能会造成线程的阻塞。

1、使用安全类,如 java.util.concurrent 下的类;

2、使用自动锁synchronized;

3、使用手动锁Lock;

不是线程安全的操作。它涉及到多个指令,如读取变量值,增加,然后存储回内存,这个过程可能会出现多个线程交差

  使用Callable接口创建线程,需要实现call()方法,call()在完成时返回结果必须存储在主线程已知的对象中,可以使用Future接口或FutureTask类获取该线程的返回结果。

源码:用于获取任务的结果

public Object get()throws InterruptedException,ExecutionException;

1、Future对象可以在后台完成主线程中比较耗时的操作,但不会导致主线程阻塞,当主线程将来需要其执行结果时,可通过Future对象获得后台作业的计算结果或者执行状态。

2、FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果

3、Future对象或FutureTask对象调用get()方法获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,一旦计算完成,就不能再重新开始或取消计算,只会返回结果或者抛出异常。

JUC中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。

1、CountDownLatch: 减少计数(减1操作)

使用:

//设置一个计数器
CountDownLatch countDownLatch = new CountDownLatch(计数器的数值);
//调用countDown()方法来进行减1的操作(调用countDown方法的线程不会阻塞)
countDownLatch.countDown();
//当一个或多个线程执行await()方法时,这些线程会阻塞;当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行接下来语句
countDownLatch.await();

2、CyclicBarrier: 循环栅栏(加1操作)

使用:

/定义循环栅栏,每次执行CyclicBarrier一次内部计数器会加一
CyclicBarrier cyclicBarrier = new CyclicBarrier(目标障碍数);
//当达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句
cyclicBarrier.await();

3、Semaphore: 信号灯

使用:

//每个信号量初始化为一个最多只能分发一个许可证
Semaphore semaphore = new Semaphore(最大信号量);
//调用acquire()方法获得许可证
semaphore.acquire();
//调用release()方法释放许可
semaphore.release();

1、ReentrantReadWriteLock提供的两种锁:

  (1)、共享锁:读操作相关的锁

  当线程持有读锁的情况下,该线程不能取得写锁;

  (由于线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁)

  (2)、排他锁:写操作相关的锁

  线程占用写锁的情况下,该线程可以继续获取读锁;

2、线程进入读写锁的前提条件:

  (1)、进入读锁的前提条件:没有其他线程的写锁

  (2)、进入写锁的前提条件:没有其他线程的读/写锁

3、源码:

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
  ……
//使用默认(非公平)的排序属性创建一个新的ReentrantReadWriteLock
public ReentrantReadWriteLock() {
……
}
//使用给定的公平策略创建一个新的 ReentrantReadWriteLock
public ReentrantReadWriteLock(boolean fair) {
……
}
//返回用于写入操作的锁
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}
//返回用于读取操作的锁
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}
// WriteLock、ReadLock实现了Lock接口
public static class WriteLock implements Lock, java.io.Serializable {……}
public static class ReadLock implements Lock, java.io.Serializable {……}
  ……
}

4、使用:

//创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//添加写锁
rwLock.writeLock().lock();
//释放写锁
rwLock.writeLock().unlock();
//添加读锁
rwLock.readLock().lock();
//释放读锁
rwLock.readLock().unlock();

1、线程池不仅能够保证内核的充分利用,还能防止过分调度

2、Java中的线程池是通过Executors工具类与Executor框架实现:

1、RUNNING:最正常的状态,接受新的任务,处理等待队列中的任务。

2、SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。

3、STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。

4、TIDYING:所有的任务都销毁了,workCount为0,线程池的状态在转换为TIDYING状态时,会执行terminated()方法。

5、TERMINATED:terminated()方法结束后,线程池的状态转变成这个。

线程池的创建方法总共有 7 种,但总体来说可分为 2 类:

  (1)、通过ThreadPoolExecutor创建线程池;

  (2)、通过Executors工具类创建线程池

1、Executors.newFixedThreadPool:固定长度线程池

  控制并发的线程数,超出的线程会在队列中等待;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目

使用:

ExecutorService threadPool = Executors.newFixedThreadPool(int nThreads);

2、Executors.newCachedThreadPool:可缓存线程池

  若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。其内部使用SynchronousQueue 作为工作队列;

使用:

ExecutorService threadPool = Executors.newCachedThreadPool()

3、Executors.newSingleThreadExecutor:单一线程池

  保证先进先出的执行顺序,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目;

使用:

ExecutorService threadPool = Executors.newSingleThreadExecutor()

4、Executors.newScheduledThreadPool:

  创建一个可以执行延迟任务的线程池;

使用:

ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(int corePoolSize)

5、Executors.newSingleThreadScheduledExecutor:

  创建一个单线程的可以执行延迟任务的线程池;

使用:

ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();

6、Executors.newWorkStealingPool:

  创建一个抢占式执行的线程池,利用Work-Stealing算法,并行地处理任务,不保证处理顺序——JDK 1.8 添加。

使用:

ExecutorService threadPool = Executors.newWorkStealingPool();

7、(重)ThreadPoolExecutor:(可用于自定义创建)

  最原始的创建线程池的方式

使用:

/**
*可常用参数
* corePoolSize 线程池的核心线程数(最小的线程数)
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
(1)、TimeUnit.DAYS:天
(2)、TimeUnit.HOURS:小时
(3)、TimeUnit.MINUTES:分
(4)、TimeUnit.SECONDS:秒
(5)、TimeUnit.MILLISECONDS:毫秒
(6)、TimeUnit.MICROSECONDS:微妙
(7)、TimeUnit.NANOSECONDS:纳秒
* workQueue 存放提交但未执行任务的队列(阻塞队列)
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
拒绝策略:
(1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。
    但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
(2)、AbortPolicy: 丢弃任务,并抛出拒绝执行
(3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
(4)、DiscardPolicy: 直接丢弃,其他啥都没有
(5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
*/
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5,
10,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
);

阿里巴巴《Java开发手册》答案:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

  1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

  2)CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以综上情况所述,推荐使用ThreadPoolExecutor的方式进行线程池的创建,因为这种创建方式更可控,并且更加明确了线程池的运行规则,可以规避一些未知的风险。

1、在创建了线程池后,等待提交过来的任务请求。

2、当调用execute()方法添加一个请求任务时,线程池会做如下判断:

  (1)、如果正在运行的线程数量小于corePoolSize(线程池的核心线程数),那么马上创建线程运行这个任务;

  (2)、如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入workQueue阻塞队列中;

  (3)、如果这时候workQueue阻塞队列饱和且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

  (4)、如果workQueue 阻塞队列满了且正在运行的线程数量大于或等于 maximumPoolSize(即:workQueue.size() + maximumPoolSize),那么线程池会启动饱和拒绝策略来执行。

3、当一个线程完成任务时,它会从workQueue 阻塞队列中取下一个任务来执行。

4、当一个线程无事可做,超过一定的时间(keepAliveTime) 时,线程池会判断:

  如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

1、ThreadPoolTaskExecutor:

2、ThreadPoolTaskScheduler:

3、关系说明

(1)、关系结构图:

  1)、蓝色实现的箭头是指继承关系
  2)、绿色虚线指的是接口实现关系
  3)、绿色实线指的是接口继承关系

(2)、用途选择:

  1)、ThreadPoolTaskExecutor是一个专门用于执行任务的类

  2)、ThreadPoolTaskScheduler是一个专门用于调度任务的类

  3)、两者之间进行选择归结为以下问题:是否需要执行或安排任务的执行

4、配置实现:

@Configuration
public class TaskPoolConfig {
/**
* ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。
* ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。
*
* 拒绝策略:
* (1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。
*     但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
* (2)、AbortPolicy: 丢弃任务,并抛出拒绝执行
* (3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
* (4)、DiscardPolicy: 直接丢弃,其他啥都没有
* (5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
*/
@Value("${async.thread.concurrency.coreSize:10}")
private int coreSize;

@Value("${async.thread.concurrency.maxSize:20}")  
private int maxSize;

@Value("${async.thread.concurrency.queueCapacity:1000}")  
private int queueCapacity;

@Value("${async.thread.concurrency.keepAliveSeconds:10000}")  
private int keepAliveSeconds;

@Bean  
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {  
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    executor.setCorePoolSize(coreSize); //核心线程数  
    executor.setMaxPoolSize(maxSize);   //最大线程数  
    executor.setQueueCapacity(queueCapacity);   //最大等待队列数  
    executor.setKeepAliveSeconds(keepAliveSeconds); //除核心线程,其他线程的保留时间  
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //等待队列满后的拒绝策略  
    executor.initialize();  //执行初始化  
    executor.setThreadNamePrefix("async-executor-");    //线程前缀名称  
    return executor;  
}

@Bean  
public ThreadPoolTaskScheduler taskSchedulerExecutor() {  
    // 创建一个线程池对象  
    final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();  
    // 定义一个线程池大小  
    scheduler.setPoolSize(100);  
    // 线程池名的前缀  
    scheduler.setThreadNamePrefix("taskExecutor-");  
    // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean  
    scheduler.setWaitForTasksToCompleteOnShutdown(true);  
    // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住  
    scheduler.setAwaitTerminationSeconds(60);  
    // 线程池对拒绝任务的处理策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务  
    scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
    return scheduler;  
}  

}

1、execute():只能执行Runnable 类型的任务。

2、submit():可以执行Runnable和Callable类型的任务。

Callable 类型的任务可以获取执行的返回值,而Runnable执行无返回值。

1、“分治法”思想:

(1)、任务分割(调用ForkJoinTask.fork()方法):首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割;

(2)、执行任务并合并结果(调用ForkJoinTask.join()方法):分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

2、使用与底层原理:

(1)、伪代码:

一、定义TaskExample类
public class TaskExample extends RecursiveTask {
//RecursiveTask: 继承后可以实现递归调用的任务
… …
if(任务很小){
直接计算得到结果
}else{
拆分成N个子线程任务
调用子线程任务的fork()进行计算
调用子线程任务的join()合并计算结果
}
… …
}
二、定义ForkJoinPoolDemo类
public class ForkJoinPoolDemo {
public static void main(String[] args) {
… …
//定义执行对象,ForkJoinTask需要通过ForkJoinPool来执行
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任务执行
ForkJoinTask result = forkJoinPool.submit(需执行任务的对象);
try {
//输出结果
}catch (Exception e){
e.printStackTrace();
}finally {
forkJoinPool.shutdown();
}
}
}

(2)、底层原理:

  1)、fork()

  调用 ForkJoinTask的fork方法时,程序会把任务放在ForkJoinWorkerThread的pushTask的workQueue阻塞队列中,异步地执行这个任务,然后立即返回结果;

(pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。)

源码:

public final ForkJoinTask fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

final void push(ForkJoinTask task) {
ForkJoinTask[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m)
growArray();
}
}

  2)、join()

  Join 方法的主要作用是阻塞当前线程并等待获取结果。

  doJoin()方法得到当前任务的状态来判断返回的结果,

  任务状态有4 种: 已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)

源码:

public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
* 如果任务状态是已完成,则直接返回任务结果。
* 如果任务状态是被取消,则直接抛出 CancellationException
* 如果任务状态是抛出异常,则直接抛出对应的异常
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
/**
* doJoin()方法流程如下:
* 1. 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;
* 2. 如果没有执行完,则从任务数组里取出任务并执行。
* 3. 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常, 则记录异常,并将任务状态设置为 EXCEPTIONAL。
*/

1、锁升级(无锁/偏向锁/轻量级锁/重量级锁)

无锁、偏向锁、轻量级锁和重量级锁,会随着多线程的竞争情况逐渐升级,但不能降级;通过偏向锁/轻量级锁/重量级锁的机制可实现高效的synchronized:(针对Synchronized)

(1)、偏向锁: 偏向锁会偏向第一个获得他的线程,如果接下来的执行过程中,该锁没有被其他线程获取,则持有偏向锁的线程将永远不需要再进行同步。

(偏向锁可以提高带有同步但无竞争的程序性能,也就是说他并不一定总是对程序运行有利,如果程序中大多数的锁都是被多个不同的线程访问,那偏向模式就是多余的。)

(2)、轻量级锁:指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

(3)、重量级锁:指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

2、可重入锁/非可重入锁

(1)、可重入锁:指同一个线程在外层方法获取锁的时候,在进入内层方法申请同一锁时会自动获取该锁

(2)、非可重入锁:同情况下会形成死锁

3、共享锁/独占锁

(1)、共享锁:指某个锁可被多个线程所持有。

(2)、独享锁:指某个锁一次只能被一个线程所持有。

4、公平锁/非公平锁

(1)、公平锁:多个线程相互竞争时要排队,多个线程按照申请锁的顺序来获取锁。

(2)、非公平锁:多个线程相互竞争时,先尝试插队,插队失败再排队,比如:synchronized、ReentrantLock

5、乐观锁/悲观锁

(1)、乐观锁:乐观锁认为竞争不总是会发生,因此它不需要持有锁,将比较-替换这两个动作作为一个原子操作尝试去修改内存中的变量,如果失败则表示发生冲突,那么就应该有相应的重试逻辑。

(2)、悲观锁:悲观锁认为竞争总是会发生,因此每次对某资源进行操作时,都会持有一个独占的锁。

6、自旋锁/非自旋锁

(1)、自旋锁:当线程现在拿不到锁时,并不直接陷入阻塞或者释放 CPU 资源,而是开始利用循环,不停地尝试获取锁,这个循环过程被形象地比喻为“自旋”,就像是线程在“自我旋转”。

(2)、非自旋锁:即没有自旋的过程,如果拿不到锁就直接放弃,或者进行其他的处理逻辑,例如去排队、陷入阻塞等

1、CAS算法的理解:

(1)、CAS(Compare And Swap),即比较再替换。JDK5之前Java语言是靠synchronized关键字保证同步的,这是一种独占锁,也是悲观锁。JDK5增加了并发包java.util.concurrent.*,该类下采用CAS算法实现,CAS算法是一种区别于synchronouse同步锁的乐观锁。

(2)、CAS操作包含三个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,才将内存值V修改为B并返回true,否则什么都不做并返回false(需要volatile变量配合)

2、CAS算法存在的问题:

(1)、CPU开销较大

(2)、不能保证代码块的原子性

(3)、ABA问题(最大问题)

ABA问题,即并发环境下,并发1在修改数据时,虽然还是A,但已经不是初始条件的A了,中间发生了A变B,B又变A的变化,此A已经非彼A,数据却成功修改,可能导致错误。

ABA问题的解决思路是给数据加一个版本号,每次更新后对其版本加1,这样在值变回A之后,其版本已不是原来的版本了。具体可参见jdk中的AtomicStampedReference

1、Atomic原子类:

Atomic (事务的四个特性ACID,其中A就是原子性) 指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。所以,所谓原子类说简单点就是具有原子/原子操作特征的类。

原子类存放在java.util.concurrent.atomic下:

2、实现原理:

Atomic类主要利用CAS (Compare And Swap)算法、volatile变量与native方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提升。

搜索

复制