阅读本文前,需要储备的知识点如下,点击链接直接跳转。
AQS即AbstractQueuedSynchronizer
的简称,翻译过来就是抽象队列同步器的意思,由Doug Lea大神开发的。说他抽象是因为它提供的是一个基于队列的同步器框架,定义了一些基础功能方法(控制状态变量,获取和释放同步状态方法以及入队出队操作等),具体场景使用只需要根据需要实现对应的方法即可。我们在锁(比如ReentrantLock)、并发工具类(比如CountDownLatch)都可以看到内部类继承了AbstractQueuedSynchronizer
,也就是说AQS才是这些类的基石。说了这么多,感觉把抽象说的越抽象了,下面我们从几个栗子入手吧。
注意:本文使用的JDK版本为JDK8,AQS的代码非常巧妙和经典,很多细节和模块都可以单独拉出来写一篇文章,很多细节问题建议自行阅读和思考。
本篇文章主要讲独占模式的应用和原理分析,关于共享模式不再这里展开细讲。
3个线程获取同一个锁,获得后休眠1秒结束,所以3个线程间隔1秒打印输出。
public class ReentrantLockTest {
public static void main(String[] args) {
lockTest();
}
public static void lockTest() {
ReentrantLock lock = new ReentrantLock();
PrintThread t1 = new PrintThread(lock, "t1");
PrintThread t2 = new PrintThread(lock, "t2");
PrintThread t3 = new PrintThread(lock, "t3");
t1.start();
t2.start();
t3.start();
}
}
class PrintThread extends Thread {
private Lock lock;
public PrintThread(Lock lock, String threadName) {
this.lock = lock;
this.setName(threadName);
}
@Override
public void run() {
lock.lock();
try {
System.out.println(String.format("time:%s,thread:%s,result:%s",
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()),
Thread.currentThread().getName(), "get lock success"));
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
打印结果如下
time:2021-04-13 13:53:55,thread:t1,result:get lock success
time:2021-04-13 13:53:56,thread:t2,result:get lock success
time:2021-04-13 13:53:57,thread:t3,result:get lock success
是因为这3个线程执行时都要先获取锁执行完逻辑后再释放锁,而ReentrantLock
是独占锁,相当于这3个线程间是串行执行的,相互间隔1秒(注意,线程的先后执行顺序不一定是固定的,但线程内有休眠1秒的操作,所以至少相隔1秒)
main线程创建一个CountDownLatch latch = new CountDownLatch(1),3个线程持有该CountDownLatch
并调用CountDownLatch
的await()
方法,直到main线程休眠2秒后执行CountDownLatch
的countDown()
方法,释放一个同步状态使得数量值为0,唤醒等待在await()
的线程继续执行。
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ConcurrentThread concurrentThread1 = new ConcurrentThread(latch, "t1");
ConcurrentThread concurrentThread2 = new ConcurrentThread(latch, "t2");
ConcurrentThread concurrentThread3 = new ConcurrentThread(latch, "t3");
concurrentThread1.start();
concurrentThread2.start();
concurrentThread3.start();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " countDown...");
latch.countDown();
}
}
class ConcurrentThread extends Thread {
private CountDownLatch latch;
public ConcurrentThread(CountDownLatch latch, String threadName) {
this.latch = latch;
this.setName(threadName);
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is ready...");
try {
latch.await();
System.out.println(Thread.currentThread().getName() + " is executing...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印结果如下(注意,线程的先后执行顺序不一定是固定的)
t1 is ready...
t3 is ready...
t2 is ready...
main countDown...
t1 is executing...
t3 is executing...
t2 is executing...
这三个线程在执行时先打印“…ready”后,然后等待在await()方法上,由于CountDownLatch
是共享锁,而初始的state是1,main线程休眠2秒后调用了countDown()方法会将state置成0,会唤起等待队列里的所有后继线程,所以会相继打印“executing…”。
这里就两个简单的使用栗子,不过可以看出,均是在多线程场景中使用,而且代码里并没有AQS相关的影子,那是因为在这些类的内部有内部类去继承了AbstractQueuedSynchronizer
,由这些内部类处理业务逻辑,底层核心逻辑是由AQS框架提供的(线程排队、线程等待、线程唤醒、超时处理、中断处理等),子类调用API实现核心逻辑,AQS在多线程中使用发挥真正的作用。下面我们一步步来分析AQS。
图中红色连接的线表示内部类,蓝色线表示继承
我们首先来看看AQS相关的URL类图吧,从JDK的源码中我们发现,AQS真正出现的在两个地方,第一个就是lock锁(比如ReentrantLock等),第二个就是并发工具类(比如CountDownLatch、Semaphore等),由这些内部类继承了AQS去实现相关的方法辅助主类实现相关控制,但是我们在JDK的源码中可以看先这些lock锁和并发工具类应在了很多的地方,比如队列、线程池及并发类相关的一些地方。
上图把各类的方法展示出来了,我们可以看到继承了AQS类的那些Sync内部类都只用覆盖实现一小部分方法即可完成特定的功能。因为在AQS类中已经实现了大部分底层通用的逻辑,对于其子类来说只用实现部分对外暴露的方法即可,同样我们也可以继承AQS实现自定义的锁或者工具类。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer
类里包含一个Thread
的属性并提供了get、set方法,这个Thread对象就是当前持有锁的线程。线程能否支持重入功能就是判断当前线程和持有锁的线程是不是同一个对象,只是同步状态state值增加而已,等线程主动释放锁后该同步状态state值数量值减少。
该类使用了abstract
修饰,但是类中并没有抽象方法,目的就是这个类不对外直接使用,而get、set方法使用了protected final修饰,说明方法可被子类使用但不能被子类重写。
另外,exclusiveOwnerThread是用了transient
修饰,说明这个属性不参与序列化,因为Thread没有实现Serializable
接口,不能进行序列化处理,另外进程是系统资源分配的最小单位,线程是进程执行的最小单位,线程是由操作系统分配和调度的,所以不能将线程进行序列化。
AbstractQueuedSynchronizer
类也是一个抽象类,继承自AbstractOwnableSynchronizer
,也就拥有了设置持有锁线程的能力,同样该类使用了abstract
修饰,目的就是这个类不对外直接使用,需要具体子类去继承后使用。虽然他实现了序列化接口,但是其内部类Node
并未实现序列化接口,所以在AbstractQueuedSynchronizer
类的属性head、tail都是Node类型并且加了transient
关键字不参与序列化,从以上我们大概就能猜到如果将AQS序列化它只保存一些基本属性的值,并不包含线程以及队列,基本在使用过程中也不会对其进行序列化,具体的属性和队列后续会详细介绍,下面列举一些AQS类里重要的方法和属性。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 独占模式,尝试获取同步状态,立即返回获取成功或失败,需要子类实现
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占模式,尝试释放同步状态,立即返回获取成功或失败,需要子类实现
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式,尝试获取共享锁,需要子类实现,
* 立即返回获取的数量值
* 0:获取锁成功,没有剩余资源
* > 0:获取锁成功,并且有剩余资源
* < 0:获取失败
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式,尝试释放共享锁,需要子类实现,释放成功返回true
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 当前线程是否独占资源,需要子类实现,true:是,false:否
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 入队
*/
private Node enq(final Node node) {...}
/**
* 将当前线程封装成Node逻辑里也有调入队enq方法的逻辑
*/
private Node addWaiter(Node mode){...}
/**
* 【重要】对外提供的获取锁的方法,子类调用此方法执行获取锁的动作,
* 内部调用包含了获取锁、排队、阻塞、中断等操作
*/
public final void acquire(int arg) {...}
/**
* 【重要】对外提供的释放锁方法,子类调用此方法执行释放锁的动作,
* 内部包含更新state、唤醒等待队列的第一个等待节点
*/
public final boolean release(int arg) {...}
/**
* 【重要】双向队列头结点
*/
private transient volatile Node head;
/**
* 【重要】双向队列尾结点
*/
private transient volatile Node tail;
/**
* 【重要】同步状态,控制线程是否可获取资源,是用一个整型的变量表示,
* 加了volatile,保证了该变量在多线程间的可见性
*/
private volatile int state;
/**
* 静态内部类,将等待锁的线程封装成Node进行排队
*/
static final class Node {
...
}
// 其他方法、属性、内部类未列出
...
}
该类中没有抽象方法,但是上面提到的几个方法都是抛了UnsupportedOperationException
异常,说明需要具体子类实现时去复写,这也正是独占模式和共享模式要对应实现的方法。
head、tail两个Node
类型的属性分别表示了双向链表的队头和队尾,如果线程不能获取到锁则进入队列排队并且等待唤醒或者超时中断,后续细讲。
整型的state属性比较核心,表示同步状态,就是用它来控制线程是否需要阻塞。上面的代码没有列出其他方法,部分方法源码后文会详细分析。
AQS类中有一个非常重要的内部类Node
,我们称作它为节点,这个内部类是AQS框架线程排队的基石,非常核心,按照注释上所说Node类是CLH
队列的一种变种(CLH队列是一种单向队列,这里不做介绍,感兴趣可自行搜索),Node类是一种双向队列,内部有Node prev,Node next属性,分别表示前驱节点和后继节点,还有一个Thread
属性,表示封装的当前线程,所以AQS的队列其实就是以Node节点形成的一个双向链表,结构如下:
我们看下Node类的属性和方法类图。
节点模式:
Node SHARED = new Node()来表示共享模式,Node EXCLUSIVE = null表示独占模式。
节点等待状态waitStatus:
这个属性字段比较重要,因为它是AQS控制线程执行的关键字段,这个值的改变是采用CAS操作的。他的取值只有以下几种。
(1)1:CANCELLED,取消状态,可能情况有节点等待超时被取消或者被中断,那么代表这个Node节点中包含的线程未获取到锁,由具体业务判断是否需要执行后续逻辑。
(2)0:初始化值,创建节点的时候默认会初始化,0也就是他的默认值。
(3)-1:SIGNAL,表明该节点以后的线程需要等待唤醒,后续节点的线程可以阻塞。
(4)-2:CONDITION,表明该节点的线程需要等待,由ConditionObject
实现条件队列会用到。
(5)-3:PROPAGATE,一般在共享模式下会有该状态,表明头节点获取到了共享资源,可向后传播,等待队列里的其他节点也都可以获取共享资源。
Thread thread属性对象
AQS框架将当前正在获取同步状态的线程包装成Node节点的一个属性,根据Node节点的waitStatus状态来控制当前线程是被唤醒继续尝试获取锁还是线程取消。
AQS内部的两个变量head代表队列的头结点,tail代表队列的尾节点,是一个双向队列,如Node类所介绍,head和tail指向如下图所示。
注意:head节点比较特殊,队列里需要唤醒的线程是从head节点的next节点开始, 在队列初始化时放的是一个new Node()对象,属性thread并没有赋值,后续排队的线程被唤醒时会把他自己设置成head并且将thread属性设置成null。所以head节点可以这么理解,head节点初始化时是一个虚拟节点,没有用处,只是充当一个队头标识,当队列中有线程排队时,说明head节点已经是获取到锁的线程的节点了,等这个线程执行完需要唤醒head.next之后的线程继续执行,这就是排队和唤醒的逻辑。
在AQS类中,有一个state属性,描述如下
/**
* The synchronization state.
*/
private volatile int state;
state是整型变量,叫同步状态,也可叫加锁的次数,使用了volatile修饰,保证了线程间的可见性,所有的线程是否可获取到锁资源都是基于对这个字段值的操作来确定。对于独占锁来说,初始情况下state=0,表示当前资源空闲,可被线程获取到锁,如果state>0,表示已经有线程占用资源,后续的线程(非持有锁的线程)需要进入队列,不会存在<0的情况,因为如果释放锁的过程中到state=0时就已将exclusiveOwnerThread置成null了,所以多次调用释放锁的方法时,如果exclusiveOwnerThread不是当前线程的话,则会抛出IllegalMonitorStateException
异常。
多个线程获取锁时按照请求的先后顺序排队,不存在插队的情况。
常用的实现方式如下:
final void lock() {
acquire(1);
}
acquire方法是AQS的获取锁方法,多线程竞争获取锁时会排队。
多个线程获取锁时,首先不是按照请求的先后顺序排队,而且先尝试去获取锁,也就是抢占式获取,如果获取到了那么该线程就是持有锁的线程可以执行他的逻辑,如果没有获取到锁,那么就会走入队排队流程,所以有可能会出现后到的线程可能比等待队列里的线程先获取到锁。
常用的实现方式如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
通过代码可以看到非公平的情况下,线程会先尝试使用cas方式设置state,如果设置成功则获取到锁,设置失败则走入队排队等待获取锁流程。
所以,这两个的区别在于是否会抢占获取锁。 设置成公平锁时,每个线程获取锁的概率是一样的,每个线程会先看等待队列是否为空,若为空,直接获取锁,若不为空,自动排队等候获取锁;设置成非公平锁时,所有的线程都会优先去尝试争抢锁,不会按顺序等待,若抢不到锁,再用类似公平锁的方式获取锁。
那为什么会这样设计呢,这两种分别使用在什么场景下呢。
恢复挂起的线程到真正锁的获取还是有时间差的,从开发人员来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用CPU的时间片,尽量减少CPU空闲状态时间
使用多线程很重要的考量点是线程切换的开销,当采用非公平锁时,当1个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的概率就变得非常大,所以就减少了线程的开销
貌似上面说的两点都是非公平锁比较好,但是非公平锁也有他的问题,有可能导致排队的线程长时间排队也没有机会获取到锁,这就是传说中的“锁饥饿”,如果使用的是带有超时时间的方式获取锁,则可能导致排队中的线程大面积超时获取锁失败。
那什么时候用公平锁,什么时候用非公平锁?
如果为了更高的吞吐量,非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了; 否则那就用公平锁,大家按请求先后顺序排队使用。
以ReentrantLock公平锁方式不带超时不可中断获取锁为例。
整体流程如下,先了解整体流程有助于我们理解,会涉及到子流程,流程图单独给出。
主要获取锁代码如下,这也是调用获取锁的入口,逻辑看代码注释:
public final void acquire(int arg) {
/*
(1)tryAcquire方法由子类实现尝试获取锁的逻辑,
返回true就不走后面的判断,表示获取到了锁,返回false表示未获取到锁,走后续入队等待流程
(2)addWaiter方法是将当前线程封装成Node对象返回,里面也有关于入队的操作
(3)acquireQueued方法主要是先再尝试获取一次锁,
获取到了就返回是否被中断标识,获取不到则需要确认线程是否需要阻塞以及阻塞操作,
最终返回释放被中断标识
(4)selfInterrupt是将当前线程中断,因为LockSupport.park阻塞线程时是不会响应中断的,
但是通过Thread.interrupted()这个方法可以获取到当前线程是否被中断标识
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里tryAcquire(arg)尝试获取锁的方法由AQS子类实现,其余三个方法(acquireQueued、addWaiter、selfInterrupt)都是AQS来实现的,这也是个模板方法设计模式。
tryAcquire(arg)流程,尝试获取锁的具体实现逻辑。
代码如下:
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的同步状态值state
int c = getState();
// state是0则表示没有线程持有锁,可以尝试去获取锁
if (c == 0) {
/*
(1)hasQueuedPredecessors方法判断队列里当前线程的Node之前是否还有其他Node,
返回true说明有其他线程也在等待,尝试获取锁失败,返回false说明前面没有线程等待,
可以继续执行逻辑,这里先判断了state=0没有直接cas操作而是再判断队列里是否有等待的线程,
充分体现了公平性
(2)如果compareAndSetState(0, acquires)也设置成功,则说明加锁成功,
将exclusiveOwnerThread设置成当前线程,返回true表示获取锁成功
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
这个else if逻辑主要就是可重入的判断和处理,
如果持有锁的线程是当前线程则state= state + acquires
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE)流程,将线程包装成Node节点的逻辑,有入队排队的逻辑,返回包装的Node节点。
代码如下:
private Node addWaiter(Node mode) {
// 将当前节点封装成Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
/*
(1)队列不为空的情况下,先尝试将node插入到队尾,
compareAndSetTail返回成功则说明node变成队列成功,直接返回,否则需要走入队流程
(2)主要是将当前node的prev指向原tail,原tail节点的next指向当前node上,
这样就完成了node的入队
*/
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尝试直接插入队尾失败了就走入队逻辑
enq(node);
// 返回当前线程封装成的Node对象
return node;
}
private Node enq(final Node node) {
// 入队使用的for无限循环,是一个自旋的过程,直到成功
for (;;) {
Node t = tail;
/*
如果队尾tail为空,则说明队列还未初始化,先初始化head节点,然后tail也指向head,
完成初始化队列,虽然只有一个节点,但head和tail都有了指向
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
如果队尾tail不为空,则采用cas方式将当前node插入队尾,
成功则返回,否则一直自旋尝试直到成功
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
线程阻塞逻辑,acquireQueued(final Node node, int arg)具体实现流程
代码如下:
final boolean acquireQueued(final Node node, int arg) {
/*
failed变量表示获取锁是否失败,初始化为true表示失败,只有在获取到锁时failed为false,
为true时表示获取锁过程中异常,finally块里的判断是否需要取消当前这个线程获取锁的相关逻辑,
包括队列的调整以及后继Node里线程的唤醒
*/
boolean failed = true;
try {
/*
interrupted变量表示当前线程是否被中断的标识,true:线程被中断,false:线程未被中断,
这个方法整体返回的就是这个值,用来确定后续是否要调用selfInterrupt()方法中断当前线程
*/
boolean interrupted = false;
// for无限循环,自旋处理
for (;;) {
// 取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是head并且tryAcquire尝试获取到锁了,则将当前线程设置成head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
这里就是线程阻塞等待的核心了,尝试获取锁失败时,判断是否需要阻塞,
需要阻塞的话就调用LockSupport.park方法阻塞当前线程
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
在不可中断模式下,failed的值始终会是false,因为虽然被中断了,
但是当前线程还是获取到锁了,走正常的后续处理逻辑,finally这里的逻辑就不会走了
*/
if (failed)
cancelAcquire(node);
}
}
尝试获取锁失败时是否需要阻塞当前线程判断流程,shouldParkAfterFailedAcquire(Node pred, Node node)逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
当前线程的前一个节点的waitStatus状态是Node.SIGNAL,
则说明前一个线程如果获取到锁并且执行完成后释放了锁需要唤醒后续节点,
从另一个角度来说当前线程自然要阻塞等待了
*/
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
当前线程的前一个节点的waitStatus状态是Node.CANCELLED时,说明前驱节点已经取消获取锁了
需要从当前节点一直向前查找知道节点没有被取消,
然后把找到的第一个没有被取消的节点的next指向当前节点,这样就把当前节点前取消状态的都删掉
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
前一个节点的waitStatus状态还是0,或者是共享锁的传播状态PROPAGATE时,
则会把前一个节点的waitStatus状态改成Node.SIGNAL
所以是后一个节点排队时把前一个节点waitStatus改成Node.SIGNAL,
表示前一个节点执行完释放锁了要走唤醒后续节点的逻辑,
依次类推,队列里只有最后一个Node节点的waitStatus是0,因为它没有后续节点,
也不需要执行唤醒操作,其余在没有被中断状态下应该都是Node.SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/*
阻塞当前线程调的就是LockSupport.park,原理之前文章有讲过,这就是线程阻塞等待的核心实现了
线程被LockSupport.park了不会响应中断,
如果线程被中断了需要用Thread.interrupted()获取当前线程的中断标识
*/
LockSupport.park(this);
return Thread.interrupted();
}
以ReentrantLock释放锁为例,释放锁不区分公平锁还是非公平锁,释放的逻辑是一样的,整体流程如下。
release(int arg)这是AQS里定义的模板方法,主要释放锁代码如下,这也是调用释放锁的入口,逻辑看代码注释:
public final boolean release(int arg) {
// 尝试释放锁,由子类实现具体逻辑
if (tryRelease(arg)) {
Node h = head;
// 头节点不为null,并且waitStatus!=0,说明要唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 返回锁是否空闲标识,其实就是tryRelease(arg)的返回结果
return false;
}
tryRelease(int releases)是尝试释放锁的逻辑,AQS定义的方法,默认是抛异常,子类根据具体场景实现逻辑。以下是ReentrantLock的内部类Sync的具体实现,返回true表示现在锁空闲了,返回false表示锁现在还被占用。
protected final boolean tryRelease(int releases) {
// 计算释放releases后,新的state值
int c = getState() - releases;
// 如果当前释放锁的线程不是持有锁的线程直接抛异常,只有持有锁的线程才能释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
/*
如果释放releases后,新的state是0,那么说明锁就空闲了,将free标识赋值为true,
然后将exclusiveOwnerThread赋值为null
*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置state新值,只有持有锁的线程才可操作,无需cas
setState(c);
return free;
}
unparkSuccessor(Node node) 这个方法就是关键的唤醒后续等待队列里的线程关键方法。通过调用LockSupport.unpark方法将阻塞的线程唤醒继续执行。
private void unparkSuccessor(Node node) {
// node是当前释放锁的线程,它的waitStatus如果<0就把他置成0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
如果node的next节点是null或者取消了,则从队尾往前查找,一直找到node节点,
获得第一个未被取消的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到第一个未被取消的节点,并唤醒线程,使其继续执行
if (s != null)
LockSupport.unpark(s.thread);
}
这里有一个比较关键的地方,如果node的next节点是null或者取消状态,则从队尾往前查找,一直找到node节点,为什么会从后往前遍历?
这里考虑了并发的场景,从后往前不会导致node丢失,具体我们可以从addWaiter方法看。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这里的第6、7、8行就是关键了,先设置prev节点,这样就保证了所有的节点都有前驱节点,第7、8这两行没有保证原子操作,如果cas成功了,但是刚好cpu时间片切换,第8行未执行,那么pred的next就是空了,所以从前往后可能会漏节点,从后往前是完整的队列,举个栗子:
(1)假如释放锁的线程是tail尾节点,刚好unparkSuccessor时,执行到node.next为空的判断之前,cpu时间片切换了。
(2)有个线程调用了addWaiter方法,把新node的prev指向了tail,cas设置尾节点也成功了,就在这儿cpu又切换了,那么原tail节点的next还没有设置。
(3)cpu再切回到unparkSuccessor的node.next为空判断时,这时候他的next是null(因为next指针还没有指向新node节点),实际上后面还有一个node节点,这样就会漏掉节点数据了。
如果从后往前的话,每一个node的前驱肯定是有值的,但是高并发情况下不能保证每一个node的后继节点也能及时连接上。所以从后往前就确保了能遍历到每一个节点。
也就是从等待队列里阻塞的方法恢复执行,返回线程是否中断标识,然后再继续尝试获取锁。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
到这里,基本上已经把独占锁的获取锁和释放锁的流程和逻辑都讲完了,AQS基本已经把大部分的核心功能帮我们写好了,我们只用去写或利用他已有的方法,实现我们自己的逻辑即可,就比如以上讲到的独占锁的获取和释放,其实我们自己仅仅具体实现了tryAcquire(int acquires)、tryRelease(int releases)这两个方法,花了大篇幅讲的都是AQS的流程和逻辑,由此,真正的感受到了AQS的巧妙设计。
理解了上面的独占锁的加锁流程,对于超时和中断处理的理解就很容易了,这两种其实都有线程中断抛出异常逻辑,另外将带超时时间获取锁和可响应中断获取锁这两种方式关于获取结果交给开发人员自行处理,既体现了设计的灵活性也可让开发人员根据具体业务场景具体处理,还是以ReentrantLock来讲解。
关于超时,就是在指定的时间内未获取到锁就返回获取失败,在指定的时间内获取到了锁返回成功,有两种,一个是尝试获取,例如:tryLock(),不管有没有获取到立即返回,相当于超时是0,另一种是指定超时时间,如果指定时间未获取到锁就返回false,例如:tryLock(long timeout, TimeUnit unit),下面详细讲解下。
tryLock()
public boolean tryLock() {
// 入口方法,是以非公平方式尝试获取锁,返回true:获取成功,false:获取失败
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state值是0时,表示暂时锁空闲,尝试cas赋值,也可以理解成尝试加锁
if (c == 0) {
// cas成功,则说明加锁成功,设置当前线程为持有锁的线程,返回true:获取成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程如果是持有锁的线程,可重入,判断并设置state=state+acquires,返回true:获取成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 尝试没有获取到锁,当前线程也不是持有锁的线程,直接返回false:获取失败
return false;
}
tryLock()的实现逻辑还是挺简单了,不带超时相关设置,相当于超时时间是0,要么立即成功,要么立即失败,不涉及复杂的入队、阻塞、唤醒、取消相关逻辑。单纯的看state=0说明空闲cas成功则立即获取锁,或者持有锁的线程是当前线程,这样就可重入,获取锁成功,其他情况均尝试获取锁失败,直接返回。
tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
/*
主入口方法,带超时时间尝试获取锁,获取到返回true,未获取到返回false,
注意还有可能抛出被中断异常InterruptedException
*/
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断如果线程被中断,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//还是先尝试获取锁,获取成功则返回true,获取失败执行后面的doAcquireNanos方法,带超时等待
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
上文已经说过了,如果线程进入等待队列并且阻塞了,那么它是不会响应中断的,虽然阻塞队列不响应中断,但是被唤醒后,线程的中断标识是可以获取到的,所以可以通过该标识来处理是否需要主动抛异常中断处理。
需要注意中断并不是实时感知的,虽然被中断了如果没有被唤醒,还是需要继续等待,直到被唤醒后,获取中断标识来做处理。
我们还是以ReentrantLock为例,lockInterruptibly()这个就是可以响应中断的方法。
public void lockInterruptibly() throws InterruptedException {
// sync这个对象继承了AbstractQueuedSynchronizer,这里直接调用的是AQS的方法了。
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先判断下如果线程已经被中断了,直接抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 尝试获取锁没有成功时,才进入可响应中断获取锁的方法里
doAcquireInterruptibly(arg);
}
/**
* 这个方法就是获取锁时可响应中断核心实现,
* 大体流程上跟tryLock(long timeout, TimeUnit unit)这个方法差不多
* 逻辑里调用了相同的方法的就不再详细阐述了,只说不同的核心关键逻辑
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
主要的处理就在这里了,判断需要阻塞并且阻塞被唤醒后,
如果中断标识为true则抛出InterruptedException异常
*/
throw new InterruptedException();
}
} finally {
/*
这里可能会走,如果线程被中断了,抛出InterruptedException异常后,failed变量还是true
需要走取消的逻辑,将当前线程的Node从队列去掉相关逻辑处理
*/
if (failed)
cancelAcquire(node);
}
}
AQS是一个抽象队列同步框架,支持独占模式和共享模式,由于AQS是一个抽象类,仅仅需要子类去实现具体的获取锁释放锁方法,锁的获取和释放入口统一由AQS提供,如下所示。
(1)不响应中断
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(2)响应中断
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
独占模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquire(arg)方法,尝试获取资源,成功则返回true,失败则返回false,其他都由AQS提供。
释放锁入口
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
独占模式下,释放锁时子类仅需要实现tryRelease(arg)方法,尝试释放资源,成功则返回true,失败则返回false,其他都由AQS提供。
(1) 不响应中断
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
(2) 响应中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
共享模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquireShared(arg)方法,尝试获取资源,返回值<0表示失败;=0表示成功,但没有剩余可用资源;>0表示成功,且有剩余资源,其他都由AQS提供。
释放锁入口
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
共享模式下,释放锁时子类仅需要实现tryReleaseShared(arg)方法,尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false,其他都由AQS提供。
使用AQS自定义锁时,子类可以实现Lock接口(因为Lock定义了获取锁和释放锁的方法,也可以不实现这个接口,自己定义方法),然后实现尝试获取锁和释放锁的方法即可。
实现一个独占不响应中断不可重入的公平锁。
独占锁需要实现tryAcquire(arg)、tryRelease(arg)这两个方法。不可重入,则要判断只要有线程占用锁,不管是不是当前线程都返回获取失败,公平锁说明尝试获取锁时要先看队列里是否有等待获取锁的Node。
其实也就是ReentrantLock的另一个版本
这样锁的定义和实现都完成了,代码如下。
public class MyLock {
private Sync sync;
public MyLock() {
sync = new Sync();
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (getState() == 1) {
free = true;
setExclusiveOwnerThread(null);
setState(0);
}
return free;
}
}
public final void lock() {
sync.acquire(1);
}
public void unLock() {
sync.release(1);
}
}
测试
多个线程获取锁
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
List
for (int i = 0; i < 5; i++) {
list.add(new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻辑");
Thread.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.unLock();
}
}, "t" + i));
}
list.forEach(Thread::start);
}
}
结果输出:
2023-06-08T11:35:27.822:t0将要加锁
2023-06-08T11:35:27.822:t4将要加锁
2023-06-08T11:35:27.822:t3将要加锁
2023-06-08T11:35:27.822:t1将要加锁
2023-06-08T11:35:27.822:t2将要加锁
2023-06-08T11:35:27.823:t0加锁成功
2023-06-08T11:35:27.823:t0执行业务逻辑
2023-06-08T11:35:27.828:t0解锁成功
2023-06-08T11:35:27.828:t4加锁成功
2023-06-08T11:35:27.828:t4执行业务逻辑
2023-06-08T11:35:27.831:t4解锁成功
2023-06-08T11:35:27.831:t3加锁成功
2023-06-08T11:35:27.831:t3执行业务逻辑
2023-06-08T11:35:27.836:t3解锁成功
2023-06-08T11:35:27.836:t1加锁成功
2023-06-08T11:35:27.836:t1执行业务逻辑
2023-06-08T11:35:27.837:t1解锁成功
2023-06-08T11:35:27.837:t2加锁成功
2023-06-08T11:35:27.837:t2执行业务逻辑
2023-06-08T11:35:27.845:t2解锁成功
线程是否可重入
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加锁成功");
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻试");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.unLock();
}
},"t1").start();
new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻试");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加锁成功");
myLock.unLock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次解锁成功");
}
catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
有两种可能的输出:
这种情况输出如下,t1先加锁成功,t2等待,实现了多线程间的加锁互斥,另外t1加锁成功后有再次加锁,发现还是等待,这说明锁不可重入,功能实现,这两个线程都将一直等下去。
2023-06-08T11:47:57.016:t1将要加锁
2023-06-08T11:47:57.017:t1加锁成功
2023-06-08T11:47:57.016:t2将要加锁
这种情况输出如下,t2先加锁成功,正常执行业务逻辑后释放锁,t2释放锁后线程可正常结束。t2释放了锁,则t1加锁成功,当t1想第二次再加锁时,发现需要等待,锁不可重入。
2023-06-08T11:49:28.492:t2将要加锁
2023-06-08T11:49:28.492:t1将要加锁
2023-06-08T11:49:28.493:t2加锁成功
2023-06-08T11:49:28.493:t2执行业务逻试
2023-06-08T11:49:28.501:t2解锁成功
2023-06-08T11:49:28.501:t1加锁成功
通过这两个例子,我们可以看出,这种独占锁、不可重入的情况下,lock()和unlock()方法必须配对使用,不能连续加锁和释放锁。
java.util.concurrent包下有几个基于AQS实现的锁,如下所示,有了以上知识基础,再理解这些锁是很容易的,了解详细可参考具体源码实现。
锁
类型
描述
ReentrantLock
独享锁
可重入锁
ReentrantReadWriteLock
独享锁、共享锁兼备
ReadLock是共享锁,WriteLock是独享锁
CountDownLatch
共享锁
不可重复使用
Semaphore
共享锁
可重复使用
CyclicBarrier
共享锁
使用ReentrantLock实现的共享锁,可重复使用
主要讲解了AQS的独占模式,提到了一些共享模式相关的知识,有了独享模式的基础,理解共享模式并不难,还有关于Condition相关的知识没有讲,所以关于共享模式和Condition相关的大家可以自行去阅读源码,后续有机会也会出相关的文章。
还有另外一个类AbstractQueuedLongSynchronizer
,这个类是AbstractQueuedSynchronizer
的一个变种,只是把state的类型从int变成long了,所有涉及跟这个state相关的操作参数和返回都改成long类型了,理论上使用这个类实现的锁可以超过Integer.MAX_VALUE的限制,最大的可获取锁的次数就变成Long.MAX_VALUE,这个在如多级锁和需要64位状态时会非常有用,目前在JDK里并没有发现使用的地方,而在HikariCP连接池com.zaxxer.hikari.util.QueuedSequenceSynchronizer
这个类内部使用到了这个类,感兴趣的可自行阅读。
AQS的设计确实相当巧妙、逻辑非常严谨,在多线程下使用,已尽可能最大限度支持高并发操作,通过对源码的学习,我们了解了锁的设计,大部分的工作都由AQS完成(包括线程的包装排队、阻塞、唤醒、超时处理、中断处理等),剩下的小部分代码由开发者根据业务场景具体实现(尝试获取锁,释放锁),不得不佩服如此精美巧妙的设计和实现,Doug Lea,我永远的神!
手机扫一扫
移动阅读更方便
你可能感兴趣的文章