Java并发编程之同步辅助类
阅读原文时间:2023年07月10日阅读:3

在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,基于AbstractQueuedSynchronizer实现,state初始化为count,每countDown一次减1直到等于0,unpark唤醒await线程

重要方法:

  • await():调用此方法线程会被阻塞,直到count为0
  • await(long timeout, TimeUnit unit):同await(),可以设置最大等待时间,如超过最大等待时间则不再等待
  • countDown():count减1,直至为0
public static void main(String[] args) throws InterruptedException {
  int studentCount = 10;
  final CountDownLatch countDownLatch = new CountDownLatch(studentCount);
  for (int i = 0; i < studentCount; i++) {
    String studentName = String.valueOf(i);
    new Thread(() -> {
      try {
        System.out.println("学生" + studentName + "正在考试……");
        Thread.sleep(2000);
        System.out.println("学生" + studentName + "已交卷");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        //finally中进行计数器减1,防止发生异常计数失败
        countDownLatch.countDown();
      }
    }).start();
  }
  System.out.println("等待所有学生交卷");
  //如果30秒内还不能收齐试卷,默认为学生弃权,防止发生异常一直等待
  countDownLatch.await(30, TimeUnit.SECONDS);
  System.out.println("全部学生已经交卷,正在计算平均分");
}

为什么不使用ReentrantLock?

countDown不需要堵塞,只需要在最后一次count=0时去唤醒堵塞的主线程(await),AQS+LockSupport完全够用

计数信号量,用于控制特定资源在同一个时间被访问的个数,基于AbstractQueuedSynchronizer实现,支持公平和非公平信号量,默认非公平信号量,state初始化为permits

重要方法:

  • acquire():从信号量获取1个许可,信号量内部计数器减1,如果没有许可,线程将一直阻塞
  • acquire(int permits):从信号量获取permits个许可,在提供这些许可前,线程一直阻塞
  • release():释放1个许可,将其返回给信号量,信号量内部计数器加1
  • release(int permits):释放permits个许可
  • availablePermits():当前可用的许可数
  • tryAcquire():尝试地获得1个许可,如果获取不到则返回false
  • tryAcquire(long timeout, TimeUnit unit):在指定的时间内尝试地获得1个许可,如果获取不到则返回false
  • tryAcquire(int permits):尝试地获得permits个许可,如果获取不到则返回false
  • tryAcquire(int permits, long timeout, TimeUnit unit):在指定的时间内尝试地获得permits个许可,如果获取不到则返回false
public static void main(String[] args) {
  int carCount = 10;
  int lot = 5;
  Semaphore semaphore = new Semaphore(lot);
  for (int i = 0; i < carCount; i++) {
    final int x = i;
    new Thread(() -> {
      try {
        System.out.println("来了一辆车" + x);
        semaphore.acquire();
        System.out.println("有车位," + x + "停车入位");
        Thread.sleep(2000);
        System.out.println(x + "离开车位");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        semaphore.release();
      }
    }).start();
  }
}

一个可循环使用(Cyclic)的屏障(Barrier),让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会放行,所有被屏障拦截的线程继续执行。基于ReentrantLock+Condition实现,await后先lock,然后--count,不等于0就执行Condition.await。反之,重置count并执行Condition.signalAll唤醒所有堵塞线程

重要方法:

  • await():在CyclicBarrier上进行阻塞等待,并使count减1
  • await(long timeout, TimeUnit unit):在CyclicBarrier上进行限时的阻塞等待,并使count减1,当时间到达限定时间后,线程继续执行
  • getParties():获取CyclicBarrier通过屏障的线程数量,也称为方数
  • getNumberWaiting():获取正在CyclicBarrier上等待的线程数量
public static void main(String[] args) throws InterruptedException {
  int passenger = 5;
  final CyclicBarrier cyclicBarrier = new CyclicBarrier(passenger,
    () -> System.out.println("乘客已经满5人,准备上车"));
  for (int i = 0; i < passenger; i++) {
    new Thread(() -> {
      try {
        System.out.println("乘客+1,等待满员");
        cyclicBarrier.await();
        System.out.println("乘客已上车");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }).start();
  }
}

阶段器/多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance),是一个可控制任务阶段执行且可重复使用的同步器,包含了CountDownLatch和CyclicBarrier的功能,比他们更加灵活、强大。通过自旋+synchronized实现注册时同步问题,LockSupport实现堵塞与唤醒

基本概念

  • parties(参与者):参与线程的个数,跟CountDownLatch或者CyclicBarrier的构造方法的参数的含义是一样的,不同Phaser提供了调整的方法
  • register / deregister : register通知Phaser参与等待的线程数增加了,deregister通知Phaser参与等待的线程数减少了,然后相应调整parties
  • arrive / advance:arrive跟CyclicBarrier中到达栅栏是一个意思,当所有parties个线程都arrive了,则触发advance
  • phase:表示执行任务的阶段,初始值是0,每一次advance都会将该值加1,最大值是Integer.MAX_VALUE;如果Phaser被终止了,则该值为负数,此时所有的register、arrive或者await操作都会立即返回
  • 父子Phaser:父子Phaser一方面可以避免parties线程过多时导致cas修改state容易失败,另一方面可以基于父子Phaser实现复杂的执行任务的阶段控制。
    • 子Phaser的parties线程可以有多个,但是对于父Phaser只有一个
    • 只有子Phaser所有的parties线程都到达的时候才通知父Phaser当前子Phaser已到达
    • 只有子Phaser所有的parties线程都被注销(deregister)了才会向父Phaser注销当前子Phaser
    • Phaser有root、parent两个属性,在多级父子Phaser下,所有的Phaser的root属性都指向同一个祖先Phaser,调用internalAwaitAdvance方法时也是在root Phaser上调用。即所有的子Phaser都共享祖先Phaser的等待线程链表,从而实现最后一个到达的子Phaser可以唤醒其他子Phaser关联的等待线程

上面图中的几点关键点:

  • 树的根结点root链接着两个“无锁栈”,用于保存等待线程(比如当线程等待Phaser进入下一阶段时,会根据当前阶段的奇偶性,把自己挂到某个栈中),所有Phaser对象都共享这两个栈。
  • 当首次将某个Phaser结点链接到树中时,会同时向该结点的父结点注册一个参与者(子phaser实际只承担了维护state的任务)

state

通过state字段来实现同步逻辑,state是volatile修饰的64位long变量,它有包含了四个维度的语义:

  • 低16位,当前未到达的parties,调用arriveXXX时,该值-1,调用register时+1
  • 中16位,当前总parties,调用register时+1,deRegister时-1
  • 高32位,phase,即Phaser的年龄,当未到达的parties减到0(即所有parties已到达)时,phase自动加1,并且把16-31位的parties数复制到0-15位,从而该Phaser可以继续复用

使用样例

  • 模拟CountDownLatch

    public static void main(String[] args) {
    int num = 6;
    Phaser phaser = new Phaser(num);
    for (int i = 0; i < num; i++) { new Thread(() -> {
    try {
    Thread.sleep(200);
    System.out.println(Thread.currentThread().getName() + " 已到达");
    //表示当前线程已到达
    phaser.arrive();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }).start();
    }
    phaser.awaitAdvance(phaser.getPhase());
    System.out.println("大家都到达了");
    }

  • 模拟CyclicBarrier

    public static void main(String[] args) {
    int num = 6;
    Phaser phaser = new Phaser(num){
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个");
    // 返回true表示需要终止Phaser,否则继续下一轮的phase
    return registeredParties == 0;
    }
    };
    for (int i = 0; i < num; i++) { new Thread(() -> {
    // 到达并等待其他线程到达
    System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
    phaser.arriveAndAwaitAdvance();
    System.out.println(Thread.currentThread().getName() + " 开始执行任务");
    }).start();
    }
    }

  • 多阶段执行,并控制执行轮数Phase

    public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    int num = 6;
    Phaser phaser = new Phaser(num) {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
    boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
    System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
    return flag;
    }
    };
    for (int i = 0; i < num; i++) { new Thread(() -> {
    // phaser关闭前循环执行
    while (!phaser.isTerminated()) {
    // 到达并等待其他线程到达
    System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
    phaser.arriveAndAwaitAdvance();
    System.out.println(Thread.currentThread().getName() + " 开始执行任务");
    }
    }).start();
    }
    }

  • 支持分层功能,减小同一Phaser上同步开销

    public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    Phaser parent = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
    boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
    // registeredParties等于2
    System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
    return flag;
    }
    };
    int num1 = 5;
    final Phaser phaser1 = new Phaser(parent);
    phaser1.bulkRegister(num1);
    for (int i = 0; i < num1; i++) { new Thread(() -> {
    // phaser关闭前循环执行
    while (!phaser1.isTerminated()) {
    // 到达并等待其他线程到达
    System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
    phaser1.arriveAndAwaitAdvance();
    }
    }).start();
    }

    int num2 = 4;
    final Phaser phaser2 = new Phaser(parent);
    phaser2.bulkRegister(num2);
    for (int i = 0; i < num2; i++) {
        new Thread(() -> {
            // phaser关闭前循环执行
            while (!phaser2.isTerminated()) {
                // 到达并等待其他线程到达
                System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
                phaser2.arriveAndAwaitAdvance();
            }
        }).start();
    }

    }

  • 灵活调整parties

    public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    int num = 6;
    Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
    boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
    System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
    return flag;
    }
    };
    for (int i = 0; i < 2; i++) { new Thread(() -> {
    // 注册线:程parties+1
    phaser.register();
    // phaser关闭前循环执行
    while (!phaser.isTerminated()) {
    // 到达并等待其他线程到达
    System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
    phaser.arriveAndAwaitAdvance();
    }
    }).start();
    }
    // 增加等待的线程数
    phaser.bulkRegister(num - 2);
    for (int i = 0; i < num - 2; i++) { new Thread(() -> {
    // 到达并等待其他线程到达
    System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
    // 达到后注销该线程
    phaser.arriveAndDeregister();
    }).start();
    }
    }

参考:

  • 交换示意图

每个线程通过ThreadLocal保存自己的Node信息,通过成功抢占slot的Node节点中item、match进行数据交换

  • 完整流程

  • 为什么slot占用失败的线程需要CAS修改slot=null?

    • 可能存在多线程竞争,所以需要使用CAS
    • slot=null代表新一轮的开始
  • 堵塞/唤醒使用LockSupport

  • 多槽位交换:同时出现了多个配对线程竞争修改slot槽位,导致某个线程CAS修改slot失败时,就会初始化arena多槽数组,后续所有的交换都会走arenaExchange,竞争分散到不同的槽位

  • 代码示例

    public static void main(String[] args) {
    final Exchanger exchanger = new Exchanger<>();
    Random random = new Random();
    for (int i = 0; i < 2; i++) { new Thread(() -> {
    while (true) {
    try {
    Thread.sleep(random.nextInt(1000));
    Integer v = random.nextInt(100);
    System.out.println(Thread.currentThread().getName() + " 生产数据:" + v);
    Integer message = exchanger.exchange(v);
    System.out.println(Thread.currentThread().getName() + " 交换得到数据:" + message);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }).start();
    }
    }