并发编程之:JUC并发控制工具
阅读原文时间:2021年09月08日阅读:3

大家好,我是小黑,一个在互联网苟且偷生的农民工。

在上一期我们讲了Thread.join()方法和CountDownLatch,这两者都可以做到等待一个线程执行完毕之后当前线程继续执行,并且CountDownLatch要更优秀,能满足同时等待多个线程执行,我们通过查看源码知道CountDownLatch是通过AQS实现的。

那么在java.util.concurrent包中除了像CountDownLatch这样的并发控制工具外,还有哪些呢?今天带大家一起来看一看。

等待一个或多个线程直到线程中执行的一组操作完成的同步辅助工具。

CountDownLatch从字面理解为“计数器“,回顾昨天的内容,CountDownLatch可以实现等待其他线程执行,并且可以指定等待时间。

举个例子,比如有一个考试,在开考之前老师要等学生到考场,如果所有学生都提前到达,老师可以提前发试卷,但是如果到考试时间有学生还没有到,老师则可以不等,直接开始,我们通过CountDownLatch来模拟一下。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch count = new CountDownLatch(5);
        for (int i = 1; i <= 5; i++) {
            new Student("学生" + i, count).start();
        }
        // 只等待5秒,5秒之后开始发试卷
        count.await(5, TimeUnit.SECONDS);
        System.out.println("所有学生已到达,老师开始发卷子~");
    }
}

class Student extends Thread {
    private final CountDownLatch count;

    public Student(String name, CountDownLatch count) {
        super(name);
        this.count = count;
    }
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + "到达考场~");
            count.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

首先我们看一下代码,主线程中老师等待的时间是5秒,所以5秒中之后开始考试;每个Student在run方法中会sleep() 2秒模拟每个学生到达花费的时间最少是2秒;

我们从结果来看,5个学生都在老师开发发卷子之前到达了考场,说明5个Student到达考场的时间并没有超过5秒,所以肯定的是这5个线程不是串行执行的;

老师在等到之后确实开始考试了;如果把老师等待的时间往小调整,或者增大某个线程到达考场的时间,会发现会在到达考场之前开始发卷子,篇幅原因这里就不放代码了。通过这个例子想必你已经掌握了CountDownLatch的用法。

一个计数信号量。 在概念上,信号量维持一组许可证。

Semaphore字面意思翻译是信号量。信号量通常用于限制线程数量,而不是限制访问某些共享资源。

我们还是通过生活中的例子来模拟,比如说,我们去座摩天轮,一个摩天轮上能容纳的游客人数是固定的,所以在有人要上去之前需要先看是否还有剩余的空位,如果有则放行,如果没有则让游客等待,直到有人从摩天轮上离开。我们使用Semaphore来模拟这个场景。

public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        for (int i = 1; i <= 5; i++) {
            new Visitor("游客" + i, semaphore).start();
        }
    }

}

class Visitor extends Thread {
    private Semaphore semaphore;

    public Visitor(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "坐上了摩天轮,真开心~");
            TimeUnit.SECONDS.sleep(2);
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "从摩天轮下来了-------");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从运行结果我们可以看出,游客1,2,3同时坐上去,但是游客4,5这时候是没有坐上去的,只能等有人下来之后,游客4,5才能再上去。因为我们在创建Semaphore时只给了3个许可,当1,2,3占用之后,4,5获取不到,只能等待许可再次可用时才能获取。

我们来看看Semaphore都有哪些方法。

构造方法

// 指定许可数量
new Semaphore(3);
// 指定许可数量,同时设置等待线程用公平的方式获取许可
new Semaphore(3, true);
// 指定许可数量,同时设置等待线程用非公平的方式获取许可,默认为false
new Semaphore(3, false);

成员方法

// 获取许可,默认只获取1个,阻塞直到获取成功,或线程中断interrupted
semaphore.acquire();
// 获取给定数量的许可,阻塞直到获取成功,或线程中断interrupted
semaphore.acquire(2);
// 和acquire()一样,但是不可以被中断
semaphore.acquireUninterruptibly();
// 和acquire(2)一样,但是不可以被中断
semaphore.acquireUninterruptibly(2);
// 尝试获取1个许可,如果成功则返回true,失败则立马返回false
semaphore.tryAcquire();
// 尝试获取给定数量的许可,如果成功则返回true,失败则立马返回false
semaphore.tryAcquire(2);
// 尝试获取1个许可知道超时,如果获取成功返回true,反之返回false
semaphore.tryAcquire(5, TimeUnit.SECONDS);
// 尝试获取给定数量的许可知道超时,如果获取成功返回true,反之返回false
semaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
// 释放1个许可
semaphore.release();
// 释放给定数量的许可
semaphore.release(2);

如果看过我之前AQS源码解析的朋友应该能猜到,Semaphore的底层也是通过AQS来实现的,是使用AQS的共享锁相关的实现。

感兴趣的同学可以回顾这篇文章。

允许一组线程全部等待彼此达到共同屏障点的同步辅助工具。

CyclicBarrier从字面理解为“循环栅栏”,可以理解为一个可以循环使用的屏障。它的作用就是等待一组线程都完成执行后再进行下一步。

老样子,我们再来举个例子(我咋有这么多例子可把我牛逼坏了)。

我们进场会和朋友聚餐,那江湖规矩,要等大家都到了,才能开始吃,等大家都吃的差不多了,大家一起散场。有没有发现可上面考试的例子有点像。

我们来使用CyclicBarrier来模拟一下这个场景。

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 1; i <= 3; i++) {
            new BBQer("吃货" + i, cyclicBarrier).start();
        }
    }
}

class BBQer extends Thread {
    CyclicBarrier cyclicBarrier;

    public BBQer(String name, CyclicBarrier cyclicBarrier) {
        super(name);
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {

        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到达战场~");
            // 等待其他人到场
            cyclicBarrier.await();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已饥饿难耐了,开始战斗~");
            TimeUnit.SECONDS.sleep(1);
            // 等待其他人吃完
            cyclicBarrier.await();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃饱喝足,回家睡觉");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过这里例子大家应该明白CyclicBarrier的使用场景了吧,主要体现在可以循环上,这一点和CountDownLatch有很大的区别,CountDownLatch是一个计数器,只能有一次计数完成,之后不能再继续归零计算了。而CyclicBarrier可以循环设置这个屏障。

我们再来看一下CyclicBarrier都有哪些常用的方法。

构造方法

// 创建有指定个数线程的循环屏障
new CyclicBarrier(3);
// 创建有指定个数线程的循环屏障,在所有线程到达屏障后,运行Runnable的run()方法
new CyclicBarrier(3, Runnable);

成员方法

// 等其他所有线程到达
cyclicBarrier.await();
// 等所有线程到达,超时只有放弃等待
cyclicBarrier.await(1, TimeUnit.SECONDS);
// 获取当前正在等待的线程数
cyclicBarrier.getNumberWaiting();
// 重置等待状态到初始状态
cyclicBarrier.reset();

一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch但支持更灵活的使用。

Phaser从字面意思可以理解为”阶段器“。通过上面这段话可以感觉到要比CyclicBarrier和CountDownLatch更牛逼一些,更加的灵活。

我们这次不重新举新的例子,还用上面的吃饭的例子,加入说我们上面吃饭的例子,如果说在大家开始吃的时候,另一个朋友打电话说他也要来,这时候总不能不让来吧,应该让他来和我们一起吃,并且吃完一起走。

而这种场景通过上面说到的CountDownLatch,CyclicBarrier还是Semaphore都是不能做到的,我们来看看使用Phaser如何解决。

public class PhaserDemo {

    public static void main(String[] args) throws InterruptedException {
        // 刚开始饭局是3个人
        Phaser phaser = new Phaser(3);
        for (int i = 1; i <= 3; i++) {
            new Foodie("吃货" + i, phaser).start();
        }
        TimeUnit.SECONDS.sleep(2);
        phaser.register();
        new Foodie("新来的", phaser).start();
        System.out.println("饭局人数:" + phaser.getRegisteredParties());
    }
}

class Foodie extends Thread {
    private Phaser phaser;

    public Foodie(String name, Phaser phaser) {
        super(name);
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            phaseOne();
            phaseTwo();
            phaseThree();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void phaseOne() throws InterruptedException {
        // 新来的不用等其他人
        if (Thread.currentThread().getName().equals("新来的")) {
            return;
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到达战场~");
        // 到达饭局并加入等待
        phaser.arriveAndAwaitAdvance();
    }

    public void phaseTwo() throws InterruptedException {
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已饥饿难耐了,开始战斗~");
        TimeUnit.SECONDS.sleep(2);
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃完了~");
        phaser.arriveAndAwaitAdvance();
    }

    public void phaseThree() {
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "回家睡觉");
        phaser.arriveAndDeregister();
    }
}

从运行结果我们可以看到,刚开始1,2,3已经开始战斗了(完成了第一个阶段,进入第二个阶段),这是后来了个新朋友加入,那饭局的人数变成了4人,然后再逐步完成后面的阶段。

同样我们来看一下Phaser的方法。

构造方法

// 创建一个阶段器,没有注册方,没有父级和初始阶段
Phaser()
// 创建一个阶段器,指定注册方数量
Phaser(int parties)
// 相当于 Phaser(Phaser parent, 0)
Phaser(Phaser parent)
// 创建一个阶段器,通过给定的父级阶段器和给定的注册方数
Phaser(Phaser parent, int parties)

成员方法

// 抵达这个阶段,并不等待别人到达
arrive()
// 到达并阻塞等待其他到达
arriveAndAwaitAdvance()
// 到达并注销
arriveAndDeregister()
// 等待从指定阶段进入,如果不在该阶段或阶段器已终止则立即返回
awaitAdvance(int phase)
//同awaitAdvance(int phase),可被中断
awaitAdvanceInterruptibly(int phase)
//同awaitAdvance(int phase),可被中断,可超时
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
// 添加一个新的未到达节点
register()
// 添加指定个数新的未到达节点
bulkRegister(int parties)
// 强制终止阶段器
forceTermination()
// 获取到达数量
getArrivedParties()
// 获取父阶段器
getParent()
// 获取根阶段器
getRoot()
// 返回当前阶段数
getPhase()
// 返回已注册了的节点
getRegisteredParties()
// 返回未到达的节点
getUnarrivedParties()
// 判断是否终止
isTerminated()
//在即将进行的节点提前执行动作的可覆盖方法,并控制终止。
onAdvance(int phase, int registeredParties)

我们来简单总结一下,今天主要介绍JUC包中的线程同步工具。

CountDownLatch:主要用于计数,可完成等待多个线程执行,计数器每次减1,减到0之后释放等待线程;归零后无法重置,不可重复利用;

Semaphore:通常用于限制资源访问,如限流,通过acquire()获取后加1,release()之后减1,没有许可时获取会阻塞;

CyclicBarrier:循环屏障,相比CountDownLatch,await()方法每次加1,加到指定值释放等待线程;加到指定值之后会重置,可循环利用;

Phaser:支持CountDownLatch和CyclicBarrier的功能,可以做到替换,并且可以动态的添加或减少设定值。


好的,本期内容就到这里,我们下期见,关注我的公众号【小黑说Java】,更多干货内容。