在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,基于AbstractQueuedSynchronizer实现,state初始化为count,每countDown一次减1直到等于0,unpark唤醒await线程
重要方法:
- await():调用此方法线程会被阻塞,直到count为0
- await(long timeout, TimeUnit unit):同await(),可以设置最大等待时间,如超过最大等待时间则不再等待
- countDown():count减1,直至为0
public static void main(String[] args) throws InterruptedException {
int studentCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(studentCount);
for (int i = 0; i < studentCount; i++) {
String studentName = String.valueOf(i);
new Thread(() -> {
try {
System.out.println("学生" + studentName + "正在考试……");
Thread.sleep(2000);
System.out.println("学生" + studentName + "已交卷");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//finally中进行计数器减1,防止发生异常计数失败
countDownLatch.countDown();
}
}).start();
}
System.out.println("等待所有学生交卷");
//如果30秒内还不能收齐试卷,默认为学生弃权,防止发生异常一直等待
countDownLatch.await(30, TimeUnit.SECONDS);
System.out.println("全部学生已经交卷,正在计算平均分");
}
为什么不使用ReentrantLock?
countDown不需要堵塞,只需要在最后一次count=0时去唤醒堵塞的主线程(await),AQS+LockSupport完全够用
计数信号量,用于控制特定资源在同一个时间被访问的个数,基于AbstractQueuedSynchronizer实现,支持公平和非公平信号量,默认非公平信号量,state初始化为permits
重要方法:
- acquire():从信号量获取1个许可,信号量内部计数器减1,如果没有许可,线程将一直阻塞
- acquire(int permits):从信号量获取permits个许可,在提供这些许可前,线程一直阻塞
- release():释放1个许可,将其返回给信号量,信号量内部计数器加1
- release(int permits):释放permits个许可
- availablePermits():当前可用的许可数
- tryAcquire():尝试地获得1个许可,如果获取不到则返回false
- tryAcquire(long timeout, TimeUnit unit):在指定的时间内尝试地获得1个许可,如果获取不到则返回false
- tryAcquire(int permits):尝试地获得permits个许可,如果获取不到则返回false
- tryAcquire(int permits, long timeout, TimeUnit unit):在指定的时间内尝试地获得permits个许可,如果获取不到则返回false
public static void main(String[] args) {
int carCount = 10;
int lot = 5;
Semaphore semaphore = new Semaphore(lot);
for (int i = 0; i < carCount; i++) {
final int x = i;
new Thread(() -> {
try {
System.out.println("来了一辆车" + x);
semaphore.acquire();
System.out.println("有车位," + x + "停车入位");
Thread.sleep(2000);
System.out.println(x + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
一个可循环使用(Cyclic)的屏障(Barrier),让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会放行,所有被屏障拦截的线程继续执行。基于ReentrantLock+Condition实现,await后先lock,然后--count,不等于0就执行Condition.await。反之,重置count并执行Condition.signalAll唤醒所有堵塞线程
重要方法:
- await():在CyclicBarrier上进行阻塞等待,并使count减1
- await(long timeout, TimeUnit unit):在CyclicBarrier上进行限时的阻塞等待,并使count减1,当时间到达限定时间后,线程继续执行
- getParties():获取CyclicBarrier通过屏障的线程数量,也称为方数
- getNumberWaiting():获取正在CyclicBarrier上等待的线程数量
public static void main(String[] args) throws InterruptedException {
int passenger = 5;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(passenger,
() -> System.out.println("乘客已经满5人,准备上车"));
for (int i = 0; i < passenger; i++) {
new Thread(() -> {
try {
System.out.println("乘客+1,等待满员");
cyclicBarrier.await();
System.out.println("乘客已上车");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
阶段器/多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance),是一个可控制任务阶段执行且可重复使用的同步器,包含了CountDownLatch和CyclicBarrier的功能,比他们更加灵活、强大。通过自旋+synchronized实现注册时同步问题,LockSupport实现堵塞与唤醒
基本概念
上面图中的几点关键点:
state
通过state字段来实现同步逻辑,state是volatile修饰的64位long变量,它有包含了四个维度的语义:
使用样例
模拟CountDownLatch
public static void main(String[] args) {
int num = 6;
Phaser phaser = new Phaser(num);
for (int i = 0; i < num; i++) {
new Thread(() -> {
try {
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + " 已到达");
//表示当前线程已到达
phaser.arrive();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
phaser.awaitAdvance(phaser.getPhase());
System.out.println("大家都到达了");
}
模拟CyclicBarrier
public static void main(String[] args) {
int num = 6;
Phaser phaser = new Phaser(num){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个");
// 返回true表示需要终止Phaser,否则继续下一轮的phase
return registeredParties == 0;
}
};
for (int i = 0; i < num; i++) {
new Thread(() -> {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
}).start();
}
}
多阶段执行,并控制执行轮数Phase
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
int num = 6;
Phaser phaser = new Phaser(num) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
for (int i = 0; i < num; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
}
}).start();
}
}
支持分层功能,减小同一Phaser上同步开销
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
Phaser parent = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
// registeredParties等于2
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
int num1 = 5;
final Phaser phaser1 = new Phaser(parent);
phaser1.bulkRegister(num1);
for (int i = 0; i < num1; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser1.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser1.arriveAndAwaitAdvance();
}
}).start();
}
int num2 = 4;
final Phaser phaser2 = new Phaser(parent);
phaser2.bulkRegister(num2);
for (int i = 0; i < num2; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser2.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser2.arriveAndAwaitAdvance();
}
}).start();
}
}
灵活调整parties
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
int num = 6;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
for (int i = 0; i < 2; i++) {
new Thread(() -> {
// 注册线:程parties+1
phaser.register();
// phaser关闭前循环执行
while (!phaser.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
}
}).start();
}
// 增加等待的线程数
phaser.bulkRegister(num - 2);
for (int i = 0; i < num - 2; i++) {
new Thread(() -> {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
// 达到后注销该线程
phaser.arriveAndDeregister();
}).start();
}
}
参考:
交换器,用于两个线程之间的数据交换,如下图:
每个线程通过ThreadLocal保存自己的Node信息,通过成功抢占slot的Node节点中item、match进行数据交换
为什么slot占用失败的线程需要CAS修改slot=null?
- 可能存在多线程竞争,所以需要使用CAS
- slot=null代表新一轮的开始
堵塞/唤醒使用LockSupport
多槽位交换:同时出现了多个配对线程竞争修改slot槽位,导致某个线程CAS修改slot失败时,就会初始化arena多槽数组,后续所有的交换都会走arenaExchange,竞争分散到不同的槽位
代码示例
public static void main(String[] args) {
final Exchanger
Random random = new Random();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
try {
Thread.sleep(random.nextInt(1000));
Integer v = random.nextInt(100);
System.out.println(Thread.currentThread().getName() + " 生产数据:" + v);
Integer message = exchanger.exchange(v);
System.out.println(Thread.currentThread().getName() + " 交换得到数据:" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章