15.AQS的今生,构建出JUC的基础
阅读原文时间:2023年07月10日阅读:1

大家好,我是王有志,欢迎和我聊技术,聊漂泊在外的生活。快来加入我们的Java提桶跑路群:共同富裕的Java人

AQS的前世,从1990年的论文说起》中我们已经对AQS做了简单的介绍,并学习了先于AQS出现的3种基于排队思想的自旋锁。今天我们深入到AQS的设计中,探究Doug Lea是如何构建JUC框架基础组件的。不过在正式开始前,我们先来回顾上一篇中提到的面试题:

  • 原理相关:AQS是什么?它是怎样实现的?

  • 设计相关:如何使用AQS实现Mutex?

希望今天可以帮你解答上面的问题。

Tips

《The java.util.concurrent Synchronizer Framework》中清晰的阐述了Doug Lea设计AQS的目的:

This framework provides common mechanics for atomically managing synchronization state, blocking and unblocking threads, and queuing.

(AQS)框架为同步状态的原子性管理,线程的阻塞和唤醒以及排队提供了一种通用的机制。也就是说,可以通过AQS去构建不同的同步器,如:基于AQS而诞生的ReentrantLock

基于构建通用同步机制的目的,Doug Lea分析了各种同步器,总结出它们共同的特性:

  • acquire操作:阻塞调用线程,直到同步状态允许其继续执行;

  • release操作:改变同步状态,唤醒被阻塞的线程。

除此之外,论文中也提到了对AQS的性能要求,Doug Lea认为大家在分析synchronized时提到的2个问题:

  • 如何最小化空间开销(因为任意Java对象都可以作为锁)

  • 如何最小化单核处理器的单线程环境下的时间开销

都不是AQS要考虑的,他认为AQS需要考虑的是scalability(可伸缩性),即大部分场景中,即便存在竞争,也能提供稳定的效率。原文中是这样描述的:

Among the main goals is to minimize the total amount of time during which some thread is permitted to pass a synchronization point but has not done so.

(AQS)主要目标之一是使某一线程被允许通过同步点但还没有通过的情况下耗费的总时间最少,即从一个线程释放锁开始,到另一个线程获取锁,这个过程锁消耗的时间。

Doug Lea先是完成了acquire操作和release操作的伪代码设计:

// acquire操作
while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;

// release操作
update synchronization state;
if (state may permit a blocked thread to acquire)
    unblock one or more queued threads;

为了实现上述的操作,需要以下组件的协同工作:原子管理的同步状态,线程的阻塞与唤醒,以及队列

同步状态

AQS使用volatile修饰的int类型变量state保存同步状态,并提供getStatesetStatecompareAndSetState方法。

AQS中,state不仅用作表示同步状态,也是某些同步器实现的计数器,如:Semaphore中允许通过的线程数量,ReentrantLock中可重入特性的实现,都依赖于state作为计数器的特性。

早期,Java对long类型变量的原子操作需要借助内置锁来完成,性能较差,并且除了CyclicBarrier外,其余同步器使用32位的int类型已经能够满足需求,因此在AQS诞生初期,state可以使用int类型。

Tips

  • CyclicBarrier通过锁来实现;

  • Java 1.6中提供了使用long类型的AbstractQueuedLongSynchronizer

  • 注意要区别同步状态与线程在队列中的状态。

阻塞与唤醒

早期,线程的阻塞与唤醒只能通过Thread.suspendThread.resume实现,但存在竞态问题,即一个线程先调用了Thread.resume后调用Thread.suspend,那么Thread.resume不会产生任何作用。

AQS使用LockSupport.parkLockSupport.unpark实现阻塞与唤醒,特点是如果LockSupport.unpark发生在LockSupport.park前,则此次的LockSupport.park无效。

Tips:无论提前调用多少次LockSupport.unpark,都只会使后一次LockSupport.park无效。

CLH队列

队列的设计是构建AQS的关键,Doug Lea在论文中使用“The heart of”来形容:

The heart of the framework is maintenance of queues of blocked threads, which are restricted here to FIFO queues.

Doug Lea参考了CLH的设计, 保留了基本的设计,由前驱节点做阻塞与唤醒的控制,但是在队列的选择上做出了改变,AQS选择双向链表来实现队列,节点中添加了prevnext指针。

添加prev指针主要是为了实现取消功能,而next指针的加入可以方便的实现唤醒后继节点。

再次强调,本文基于Java 11完成,与Java 8的源码存在差异,如,操作同步状态state时,Java 8借助了UnSafe,而Java 11中使用了VarHandle。另外,本文只讨论AQS的独占(EXCLUSIVE)模式因此会跳过共享(SHARED)模式

队列的结构

有了《AQS的前世,从1990年的论文说起》的铺垫,再结合Doug Lea论文中的描述,我们可以很容易想象到AQS中队列节点的结构:线程状态,前驱节点指针,后继节点指针以及用于保存线程的变量。事实也和我们的猜想十分接近:

static final class Node {

  volatile int waitStatus;

  volatile Node prev;

  volatile Node next;

  volatile Thread thread;

  Node nextWaiter;
}

注意,NodewaitStatus表示线程在队列中的状态,AQS的state表示同步器的状态。Node中定义了waitStatus的5种状态:

  • CANCELLED:1,线程获取锁的请求已经取消;

  • SIGNAL :-1,节点释放后,需要唤醒后继节点

  • CONDITION:-2,节点处于条件队列中;

  • PROPAGATE:-3,共享(SHARED)模式下使用;

  • 0,初始化Node时的默认值。

AQS的实现中,并不是后继节点“监听”前驱节点的状态,来决定自身是否持有锁,而是通过前驱节点释放锁,并主动唤醒后继节点来实现排队的

AQS的结构

AQS的结构就更加简单了:

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

总共4个成员变量,除了我们意料之中的,队列的头尾节点和AQS的同步状态外,还有SPIN_FOR_TIMEOUT_THRESHOLD。看名字会有些误解,以为是自旋的阈值,实际上并不是,AQS提供了带有超时时间的方法,例如doAcquireNanos方法:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  for (;;) {
    final Node p = node.predecessor();
    nanosTimeout = deadline - System.nanoTime();
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) {
      LockSupport.parkNanos(this, nanosTimeout);
    }
}

可以看到只有在剩余的nanosTimeout大于SPIN_FOR_TIMEOUT_THRESHOLD时,才会调用LockSupport.parkNanos(this, nanosTimeout)

Tips

  • Java 11中,无论是AQS还是Node中都使用了VarHandle,定义了大量的成员变量,我们跳过这部分;

  • 删除了doAcquireNanos方法中大部分内容,重点展示nanosTimeoutSPIN_FOR_TIMEOUT_THRESHOLD的关系。

获取锁

如果是你,你会如何设计AQS的加锁过程?

我可能会“按部就班”的构建队列,并将等待线程逐个的加入的队列中。那Doug Lea是如何设计AQS加锁过程的呢?

public final void acquire(int arg) {
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

acquire方法中,Doug Lea设计了4步操作,如果仅从名字来看,首先tryAcquire尝试获取锁,如果获取失败,则通过addWaiter加入等待,然后调用acquireQueued方法进入排队状态,最后是通过调用selfInterrupt方法使当前线程中断。先来看AQS中的tryAcquire方法。

protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

AQS中并未给出任何实现,它要求子类必须重写tryAcquire方法,否则抛出异常。

addWaiter方法

接着是addWaiter方法:

private Node addWaiter(Node mode) {
  // 注释1:创建节点,通过acquire进入时mode = Node.EXCLUSIVE
  Node node = new Node(mode);
  for (;;) {
    // 注释2:获取尾节点
    Node oldTail = tail;
    if (oldTail != null) {
      // 注释5:添加新的尾节点
      node.setPrevRelaxed(oldTail);
      if (compareAndSetTail(oldTail, node)) {
        oldTail.next = node;
        return node;
      }
    } else {
      // 注释3:尾节点为空则初始化队列
      initializeSyncQueue();
    }
  }
}

static final class Node {
  Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    // 可以看做是:this.thread = Thread.currentThread()
    THREAD.set(this, Thread.currentThread());
  }
}

private final void initializeSyncQueue() {
  Node h;
  // 注释4:创建空节点,作为尾节点
  if (HEAD.compareAndSet(this, null, (h = new Node())))
    tail = h;
}

addWaiter的逻辑并不复杂:

  • 注释1:创建节点node

  • 注释2:获取AQS的尾节点oldTail,并判断是否存在尾节点;

  • 注释3:初始化队列;

  • 注释4:创建空节点h,作为AQS的头尾节点;

  • 注释5:更新AQS的尾节点为node,并与oldTail关联。

我们知道,只有tryAcquire失败后,才会调用addWaiter方法,也就是说,如果实现了tryAcquire获取锁的逻辑,那么在没有竞争的场景下,AQS就不会构建等待队列

另外我注意到在初始化队列时,Doug Lea为等待队列添加了一个空的头节点,我的理解是,这里使用了处理链表的常用技巧:虚拟头节点

回过头来看addWaiter做了什么?它的核心功能是初始化的等待队列,并返回当前队列的尾节点

acquireQueued方法

addWaiter创建了等待队列并返回尾节点后,就进入了acquireQueued方法中:

final boolean acquireQueued(final Node node, int arg) {
  // 是否中断的标记
  boolean interrupted = false;
  try {
    for (;;) {
      // 注释1:获取node的前驱节点,node更名为currentNode更方便我理解
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        return interrupted;
      }
      // 注释2:判断是否需要park当前线程
      if (shouldParkAfterFailedAcquire(p, node))
        interrupted |= parkAndCheckInterrupt();
    }
  } catch (Throwable t) {
    cancelAcquire(node);
    if (interrupted)
      selfInterrupt();
    throw t;
  }
}

注释1的部分,获取到node的前驱节点p,如果p为头节点,则当前线程直接通过tryAcquire尝试获取锁。如果p不是头节点的话可以直接调用tryAcquire吗?

答案是不可以,如果p不是头节点,也就证明当前线程不在获取锁的第二顺位上,前面可能还有若干节点在等待锁,如果任意节点都直接调用tryAcquire,那就失去了acquireQueued方法的意义。

注释2的部分,p不是头节点的情况,也就是当前线程非第二顺位获取锁。那么node就要根据前驱节点的状态来判断是否中断执行了:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  // 获取前驱节点的状态,waitStatus初始化的状态为0
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    // 注释2:前驱节点处于Node.SIGNAL状态
    return true;
  if (ws > 0) {
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // 注释1:更新前驱节点的状态为Node.SIGNAL
    pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
  }
  return false;
}

private final boolean parkAndCheckInterrupt() {
  // 暂停线程
  LockSupport.park(this);
  return Thread.interrupted();
}

addWaiter的流程中可以看到,处理node的过程中并没有处理node.waitStatus,此时waitStatus == 0,那么对于node的前驱节点pred也是一样的,因次第一次执行shouldParkAfterFailedAcquire方法时,会进入注释1的部分,并返回false

再次进入acquireQueued的循环后,shouldParkAfterFailedAcquire返回true,执行parkAndCheckInterrupt方法,需要注意LockSupport.park(this)会让线程暂停在此处,也就是说如果没有线程唤醒,线程会一直停留在此处

至此,AQS的加锁过程已经结束了,我们画张图来回顾下这个过程:

释放锁

接着来看解锁的过程:

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

按照AQS的风格tryRelease必然是要交给子类实现的:

protected boolean tryRelease(int arg) {
  throw new UnsupportedOperationException();
}

果不其然。

假设tryRelease执行成功,接下来会发生什么?

  • 获取头节点h;

  • 判断头节点的状态h.waitStatus != 0

  • 执行unparkSuccessor

来看unparkSuccessor的代码:

private void unparkSuccessor(Node node) {
  // node是当前线程的前驱节点,也是head节点
  int ws = node.waitStatus;
  if (ws < 0)
    // 处理node的waitStatus
    node.compareAndSetWaitStatus(ws, 0);
  Node s = node.next;
  // 注释2:从后向前遍历待唤醒的节点
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node p = tail; p != node && p != null; p = p.prev)
      if (p.waitStatus <= 0)
        s = p;
  }
  // 注释1:唤醒后继节点
  if (s != null)
    LockSupport.unpark(s.thread);
}

如果一切顺利,那么unparkSuccessor时会跳过注释2的部分,直接执行注释1的LockSupport.unpark

不过别忘了,待唤醒的线程此时还在acquireQueued方法中阻塞着,唤醒的线程会继续执行acquireQueued中的内容,调用tryAcquire获取锁,并更新AQS的头节点

我们设想一个场景:

addWaiter执行到compareAndSetTail(oldTail, node)时调用了unparkSuccessor,可能会出现一种情况:

即T1已经与HEAD建立了联系,但HEAD却没有与T1建立联系。因此注释2中,判断HEAD节点没有后继节点时,会通过TAIL节点,从后向前遍历等待队列,查找待唤醒的线程。

好了,AQS的核心源码分析到这里就结束了,至于条件队列,共享模式等就留给大家自行探索了。

学习完AQS的核心原理后,我们来实践一下,借助AQS来构建构建互斥锁:

public class MutexLock {
  public void lock() {
    sync.acquire(1);
  }

  public void unlock() {
    sync.release(0);
  }

  private final Sync sync = new Sync();

  static class Sync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
      Thread currentThread = Thread.currentThread();
      if(compareAndSetState(0, arg)) {
        setExclusiveOwnerThread(currentThread);
        return true;
      }else {
        return false;
      }
    }

    @Override
    protected boolean tryRelease(int arg) {
      if(getState() != 1) {
        return false;
      }

      setState(arg);
      return true;
    }
  }
}

通过AQS实现只有基础功能的互斥锁还是非常简单的,甚至在重写tryAcquire方法时可以不设置独占线程(虽然在现在也没起到作用),只是简单的使用CAS替换掉AQS的state即可:

@Override
protected boolean tryAcquire(int arg) {
  return compareAndSetState(0, arg);
}

当然了,这只是一把“玩具锁”,还存在许多问题,比如,非上锁线程依旧可以解锁。其次除了阻塞还排队外,也不支持诸如可重入等高级特性。

好了,关于AQS的部分就到这里了。如果你有看过《AQS的前世,从1990年的论文说起》中基于排队思想自旋锁的演进过程,并理解了MCS锁和CLH锁的实现,那么理解AQS对你来说是非常容易的,虽然它们看起来是不同的东西,但核心原理是相同的,只是在技术实现上有些许差别。

最后,希望通过AQS的前世和今生,能够帮助你重新认识AQS,理解Doug Lea设计这样一个同步器基础组件的意义。


好了,今天就到这里了,Bye~~

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章