AbstractQueuedSynchronizer
本文详细介绍了 AQS 的同步阻塞队列、Node 内部类、模板方法、独占模式和共享模式的源码实现,以及 ConditionObject 的 await、signal、signalAll 等方法的实现原理
AQS 简介
AQS (AbstractQueuedSynchronizer) 是 JUC 中实现并发同步功能的基石,它抽象了同步器的状态管理、线程的排队和唤醒、同步器的共享/独占模式等关键部分。同时,AQS 还提供了一套通用的 API,使得开发者能够更方便地使用同步器。
state 变量
在 AQS 中,维护了一个 volatile int state
变量来表示共享资源。通过调用 getState
、setState
、compareAndSetState
方法,可以方便地查看或修改共享资源的值:
private volatile int state;
// get 方法
protected final int getState() {
return state;
}
// set 方法
protected final void setState(int newState) {
state = newState;
}
// 基于 CAS 的 set 方法
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 作为一个抽象同步类,并没有限定 state
的使用规则,具体如何利用它由实现类自行决定。例如:
ReentrantLock
将state
设定成一种可重入锁,当state == 0
时表示资源已被释放,否则表示重入次数。ReentrantReadWriteLock
将state
的高 16 位用作读锁重入次数,低 16 位用作写锁重入次数。Semaphore
将state
用于记录当前可用信号数量。CountDownLatch
将state
用于记录计数器的当前值。
同步队列
同步队列是整个 AQS 的核心,它是一个双向队列,通过 head
和 tail
两个指针分别指向队列的头和尾,而且队列头节点是一个空节点,不与任何线程关联:
![同步队列](images/sync-queue.webp)
初始状态下,head
和 tail
指针均指向 null,当往队列中添加阻塞线程时,会先创建一个空节点,并让 head
和 tail
都指向这个空节点,然后再在空节点后插入被阻塞线程的节点。所以,当 head == tail
的时候,就说明队列为空了。
Node 内部类
Node
内部类提供了以下几个字段:
// 标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的
static final Node SHARED = new Node();
// 标记线程是获取独占资源时被挂起后放入 AQS 队列的
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//表示节点的状态,不同的值代表不同的含义
volatile int waitStatus;
// 记录前驱节点
volatile Node prev;
// 记录后继节点
volatile Node next;
// 存放进入 AQS 队列里面的线程
volatile Thread thread;
// 表示条件队列中下一个节点
Node nextWaiter;
这里我们重点关注表示节点状态的 volatile int waitStatus
,它共有 5 种取值:
0:新节点入队时的默认状态,或者处理过程中的中间状态;
CANCELLED (1):表示当前节点已取消调度。
节点一旦进入该状态,将不再发生状态变化,通常发生在节点超时或被中断时;
SIGNAL (-1):表示当前节点的后继节点正在等待唤醒。
当后继节点入队时,会将前驱节点的状态更新为
SIGNAL
,以通知后继节点当前节点已经释放资源。CONDITION (-2):表示当前节点由于不满足某个条件而被阻塞,常用于实现
Condition
接口中的同步队列;PROPAGATE (-3):用于共享同步模式,表示释放共享资源时,需要唤醒的节点不仅包括当前节点的后继节点,还可能包括其他节点。(本质是为了解决 JDK1.6 中共享模式的 BUG 而引入的一种新状态)
waitStatus
的负值表示节点处于有效等待状态,正值表示节点已取消。
AQS 的模板方法
前文提到,AQS 作为一个抽象同步类,并没有限定 state
的使用规则,但这只是其抽象设计的一部分。实际上,AQS 在抽象独占和共享两种同步模式时,通过模板方法限定了资源获取 (acquire) 和释放 (release) 的过程。这样一来子类只需要实现 tryAcquire(int arg)
、tryRelease(int arg)
等对 state
的具体操作即可实现各种同步策略。
AQS 提供了 acquire()
、release()
、acquireShared()
和 releaseShared()
四个基础模板方法。前两个用于独占模式,后两个用于共享模式。下面我们将详细分析这些方法的实现原理。
独占模式 (Exclusive)
获取资源 acquire()
acquire()
方法是独占模式下线程获取共享资源的顶层入口。如果线程获取到资源,则直接返回。否则,线程将被添加到同步队列中,一直等待直到获取到资源为止。需要注意的是,在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。
以下是 acquire()
方法的实现:
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 首先调用
tryAcquire()
钩子函数获取资源,具体交由子类实现; - 如果获取资源失败,则调用
addWaiter()
将该线程加入同步队列尾部,并标记为独占模式; - 然后线程进入
acquireQueued()
方法挂起,直到成功获取资源后才会返回。 acquireQueued()
方法会根据阻塞过程中是否发生过中断来返回结果。如果线程被中断过,则该方法会返回 true,然后调用selfInterrupt()
方法来补偿中断信号。
以下是本流程中所涉及到的方法的详细介绍。
tryAcquire()
tryAcquire
是 AQS 模板方法中的一个钩子函数,具体需要子类去重写,直接调用会抛出异常,实现如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire
和 tryAcquireShared
。如果 AQS 将这两个方法定义为抽象方法,那么面向独占模式的实现类就必须实现用不到的 tryAcquireShared
方法,面向共享模式的实现类也必须实现 tryAcquire
方法。即使实现类可以像 AQS 一样在不需要的方法中抛出异常来满足要求,但这既麻烦,又不优雅。addWaiter()
addWaiter()
方法用于在同步队列的队尾添加一个新的节点,实现如下:
private Node addWaiter(Node mode) {
// 把当前线程封装为 node,并指定资源访问方式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果 tail 不为空,直接把新 node 插到末尾
if (pred != null) {
node.prev = pred;
// 通过 CAS 将 tail 指向新 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果 tail 是空,说明同步队列未初始化,执行初始化
// 如果 compareAndSetTail 操作失败,也进入该方法,自旋重试
enq(node);
return node;
}
addWaiter
方法内部首先会创建一个新节点,该节点的线程为当前线程,模式为传入的 mode
,可以是独占模式 (Node.EXCLUSIVE
) 或共享模式 (Node.SHARED
)。然后获取同步队列尾节点的 pred
。如果 pred
不为空,则将新节点插入队列的尾部;否则,调用 enq
方法对队列进行初始化,并将新节点插入到队尾。最后返回新创建的节点。其中,enq
的实现如下:private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// CAS:如果头为空,就用虚拟 node 节点初始化头
if (compareAndSetHead(new Node()))
tail = head;// 然后将 tail 也指向头
} else {
// 已经初始化好了,则 CAS + 自旋,将新节点设为 tail
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq
用于初始化同步队列,并通过自旋将 node
节点插入队列末尾。该方法首先获取同步队列的尾节点 tail
:
- 如果
tail
为空,表示同步队列为空,需要进行初始化。在初始化过程中,使用compareAndSetHead
方法将一个空的Node
设置为head
节点,head
节点本身不保存任何线程信息。- 如果成功设置头节点,则将
tail
指向该头节点。 - 否则,说明其他线程已完成同步队列的初始化,因此直接开始下一轮循环。
- 如果成功设置头节点,则将
- 如果
tail
不为空,则将当前节点插入到队尾。具体做法是将当前节点的prev
指向tail
,然后调用compareAndSetTail
方法将同步队列的尾节点设置为当前节点。- 如果设置成功,则将原
tail
节点的next
指向当前节点,然后返回原tail
节点。 - 否则,自旋重试该过程。
- 如果设置成功,则将原
acquireQueued()
线程节点成功插入同步队列后,便会进入 acquireQueued()
中阻塞等待。源码如下:
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;// 标记是否成功拿到资源
try {
boolean interrupted = false;// 是否发生过中断
for (;;) {
// 获取当前线程节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点已经是队列头了,说明当前节点是队列中第一个有效节点,所以尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取到了资源,则将该线程设置为新队头
setHead(node);
// 摘下原队头
p.next = null; // help GC
failed = false;
// 返回等待过程中的中断标志
return interrupted;
}
// 如果前驱节点不是 head,或者抢占资源失败了,则检查是否可以安全挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //如果可以安全挂起,则执行挂起操作,否则继续循环
// 如果等待过程中被中断过,哪怕只有那么一次,也会将 interrupted 标记为 true
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如 timeout,或者可中断的情况下被中断了),
// 那么取消节点在队列中的等待
if (failed)
cancelAcquire(node);
}
}
该方法首先判断当前线程节点的前驱节点是否是头节点:
如果是则说明当前节点是同步队列中最靠前的等待节点,所以调用
tryAcquire
尝试获取资源;如果不是头节点,或者上一步获取资源失败,则调用
shouldParkAfterFailedAcquire()
判断能否被安全挂起,当满足挂起条件时调用parkAndCheckInterrupt()
将线程挂起。如果成功获取到资源,则调用
setHead()
将当前节点设置为新的空队头(即移除该线程),实现如下:private void setHead(Node node) { head = node; // 队头都是空节点,不关联线程信息 node.thread = null; node.prev = null; }
shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire
方法用于确定当前线程是否可以安全地休眠,以及在某些情况下是否应该继续尝试获取同步资源。如果线程能够被安全挂起,该方法将会返回 true
,否则返回 false
。实现如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱节点的状态已经被设置为 SIGNAL,则当前线程可以安全的休眠
return true;
if (ws > 0) {
// 前驱节点状态为 CANCELLED,则向前遍历,直到遇到正常状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将所有 CANCELLED 节点短路(异常节点将被 GC)
pred.next = node;
} else {
// 如果前驱节点状态正常,但不为 SIGNAL,则通过 CAS 操作设置前驱节点状态,并返回 false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
parkAndCheckInterrupt()
方法用于将当前线程挂起(休眠),直到有其他线程中断当前线程或者等待时间结束。实现如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 返回中断状态,并清除中断标志
return Thread.interrupted();
}
// 其中,LockSupport.park() 实现如下:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
parkAndCheckInterrupt()
方法通过调用 Unsafe.park
方法让当前线程进入休眠状态,并在其他线程调用了 UNSAFE.unpark(Thread thread)
方法或 thread.interrupt()
方法时被唤醒,同时设置中断标志位。
需要注意的是,park()
方法在阻塞线程时会检查线程的中断状态,如果线程已经被中断,它会立即返回而不进入阻塞状态。
selfInterrupt
当线程被中断唤醒后,会调用 selfInterrupt()
方法补偿被清空的中断标志,供上层业务使用。该方法实现如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
parkAndCheckInterrupt()
方法会使线程进入休眠状态,被中断唤醒后将清空中断标志并记录中断情况。若发生过中断,获取到资源时会再次补偿中断信号。清空中断标志的原因:释放资源 release()
release()
方法是独占模式下线程释放共享资源的顶层入口。如果线程成功释放了资源,则唤醒后继节点并返回 true,否则直接返回 false。
@ReservedStackAccess
public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
// 获取同步队列头节点
Node h = head;
// 如果节点状态不等于 0,则唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
tryRelease()
也是模板方法 release()
中的钩子函数,这里不再赘述了。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor()
unparkSuccessor()
用于唤醒 node
的后继节点。实现如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果参数节点的状态为负(正常状态),则尝试将它的状态设为 0
// 这样避免过多的线程进入这个方法
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到参数节点的后继节点
Node s = node.next;
// 如果后继节点是空,或者状态为 CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
//则从 tail 向前遍历,找到最靠前的正常状态的(且非参数节点本身)节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 后继节点不为空,或者从后往前找到了正常节点,将其内部线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
需要注意的是,在正常情况下,头节点的后继节点需要被唤醒。但是,如果发生意外情况,例如头节点的下一个节点此时恰好为 null,或者后继节点状态异常,就需要从 tail
开始从后往前查找最靠前的正常状态的节点(且非头节点本身),然后唤醒它。
共享模式 (Share)
获取资源:acquireShared()
acquireShared()
方法是共享模式下线程获取资源的顶层入口。在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。实现如下:
public final void acquireShared(int arg) {
// 调用钩子函数获取资源
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); //如果获取失败,加入同步队列重试
}
tryAcquireShared()
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared()
也是一个钩子函数,AQS 中未提供默认实现,但规定了该方法返回值的含义,需要我们重点关注:
< 0
:表示当前线程未获取到资源,需要进一步等待。== 0
:表示当前线程已经获取到资源,但是剩余资源不足,后续线程无法继续获取。> 0
:表示当前线程已经获取到资源,并且后续线程也可以获取资源。
doAcquireShared()
doAcquireShared()
是 acquireShared
中的核心方法,实现与 acquirequeued() 类似:
private void doAcquireShared(int arg) {
// 将线程加入同步队列末尾,设置为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果前驱节点是头节点,尝试获取一次共享资源
int r = tryAcquireShared(arg);
if (r >= 0) {// 成功获取到资源
// 将当前节点设置为新的头节点,并根据条件决定是否唤醒后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted) // 如果被中断过,补偿中断
selfInterrupt();
failed = false;
return;
}
}
// 抢占资源失败,判断能否安全休眠,如果可以则休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在共享模式下,同步队列的管理与独占模式相似。但需要在获取资源后,判断剩余资源是否足够满足后续线程的需求。这一关键步骤决定了后续线程是等待还是继续尝试获取资源。
setHeadAndPropagate()
setHeadAndPropagate()
方法的主要作用是设置队列头节点并决定是否唤醒后继节点。实现如下:
private void setHeadAndPropagate(Node node, int propagate) {
// 缓存旧的头节点
Node h = head;
// 将当前节点设置为新的头节点,并将原头节点摘下 GC
setHead(node);
// 如果 propagate > 0,表示剩余资源足够
// 如果原头节点为空,或状态为负
// 如果当前节点为空,或状态为负
// 上述条件满足一个,就唤醒后继共享模式节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
后继节点的唤醒由 doReleaseShared()
方法实现,我们将在 下文 对其进行介绍。这里我们重点关注唤醒后继节点的条件:#todo
释放资源:releaseShared()
releaseShared()
是共享模式下实现资源释放的顶层方法。源码如下:
@ReservedStackAccess
public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 释放成功,唤醒后继节点
doReleaseShared();
return true;
}
// 释放失败,返回 false
return false;
}
doReleaseShared()
doReleaseShared()
方法是共享模式下资源释放的核心方法。源码如下:
private void doReleaseShared() {
for (;;) {
// 缓存当前头节点
Node h = head;
// 如果头节点不为空,且存在线程节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果头节点状态为 SIGNAL,则将其更新为 0,更新失败则自旋重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // CAS 成功的线程,唤醒后继节点
}
else if (ws == 0 && // 如果头节点状态已经为 0,说明前边有线程处理过,将其更新为 PROPAGATE
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果上边代码执行过程中有其他线程节点获取到了资源,就继续循环,否则退出方法
if (h == head)
break;
}
}
- 该方法只有在头节点的状态为
SIGNAL
时才会唤醒后继节点,并将头节点的状态设置为中间状态 0。 - 如果在此时有其他线程执行了
releaseShared()
操作,也进入了该方法,由于头节点未发生变化,因此会获取到中间状态的waitStatus == 0
,于是将头节点状态设置为PROPAGATE
。 - 如果在
doReleaseShared()
方法执行期间,头节点未发生变更,则可以直接安全地返回。但是,如果在方法执行期间,有其他节点线程被唤醒,并成功获取到了资源,将自己设置成了新的头节点,那么就需要重新进行循环加速处理过程。
Condition 同步条件
条件变量使用示例
在开始介绍 AQS Condition
实现原理之前,我们先来了解下条件变量的使用方法。与 wait
/notify
必须配合 synchronized
关键字使用一样,Condition
也必须与 ReentrantLock 配合使用:
// 创建一个互斥锁
Lock lock = new ReentrantLock(true);
// 从 lock 中获取一个条件变量
Condition cdt = lock.newCondition();
拿到条件变量 cdt
后,需要等待条件成立的线程便可通过 await()
调用阻塞等待:
// 首先需要获取到锁
lock.lock();
try {
if (The condition is not met) {
cdt.await(); // 阻塞等待条件成立
}
// 条件成立,执行任务
run();
} finally {
lock.unlock();
}
其它线程使条件成立后,调用 cdt.signal()
便可唤醒上述等待的线程:
// 首先需要获取到锁
lock.lock();
try {
// 运行致使条件成立的代码
cdt.signal(); // 然后唤醒等待的线程
} finally {
lock.unlock();
}
Condition 接口
java.util.concurrent.locks.Condition
接口提供了与 wait
/notify
功能类似的一组同步方法:
public interface Condition {
// 等待条件成立
void await() throws InterruptedException;
void awaitUninterruptibly();
...
// 唤醒等待中的线程
void signal();
void signalAll();
}
ConditionObject 内部类
AQS 提供了一个继承自 Condition
的内部类 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
,用于在同步器中进行线程等待和唤醒的对象。
ConditionObject
中维护了一个类似于 AQS 同步队列的条件队列,其节点类型仍为 Node。这些节点通过 Node nextWaiter
字段形成了一个单向链表,用于保存正在等待条件的线程。ConditionObject
还通过两个字段分别指向条件队列的头和尾节点:
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
...
}
这些线程在其他线程调用 signal()
或 signalAll()
方法时被唤醒。
await() 实现原理
await()
方法是 ConditionObject
内部类中最核心的方法之一。该方法用于使当前线程等待在与该 ConditionObject
关联的条件上,并在条件满足时重新去获取共享资源。实现如下:
public final void await() throws InterruptedException {
// 如果线程已被中断,直接抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装为一个 Condition 节点,并加到条件队列末尾
// 内部不需要 CAS,因为此时已经获取到了独占资源
Node node = addConditionWaiter();
// 阻塞前,先释放当前节点所持有的资源,否则会死锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果 node 在同步队列上,跳出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 阻塞自己
// 如果收到中断新号,则将节点添加到同步队列,跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当线程被唤醒,并且被添加到同步队列后,交给 aqs 同步队列重新获取资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 获取到资源后,删除条件队列中废弃的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据中断模式处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()
方法内部涉及到了中断模式interruptMode
,它由ConditionObject
中的两个常量字段指定:/** 退出 await 状态时,只需补偿中断信号 */ private static final int REINTERRUPT = 1; /** 推出 await 状态时,需要直接抛出中断异常 */ private static final int THROW_IE = -1;
如果线程在等待过程中线程被中断,则根据中断模式进行处理:如果是 THROW_IE
模式,会重新抛出中断异常;如果是 REINTERRUPT
模式,则仅会调用 selfInterrupt()
方法补偿中断标志位。
以下是本流程中所涉及到的方法的详细介绍。
addConditionWaiter()
addConditionWaiter()
方法用于创建一个新的 Condition 类型节点,并将其添加到条件队列中。如果条件队列为空,则初始化该队列。实现如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果条件队列尾节点状态不是 CONDITION,执行一次清理操作
// 清理完成后,更新 t 引用
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程封装为一个 CONDITION 节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果尾节点为空,则初始化队列,否则直接将 node 插到末尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
可以看到,该方法并没有使用 CAS 操作来插入新节点(nextWaiter
字段甚至没有用 volatile
关键字修饰),这是因为 Condition 只用于可重入写锁,同一时刻只有一个线程会持有锁,因此是线程安全的。
fullyRelease()
AQS 中的 AbstractQueuedSynchronizer#fullyRelease
方法通过调用 AbstractQueuedSynchronizer#release 释放当前节点持有的所有资源,并唤醒后继节点。实现如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取节点的同步资源
int savedState = getState();
// 释放资源,并唤醒后继节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果释放资源失败,废弃该节点
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue
AQS 中的 AbstractQueuedSynchronizer#isOnSyncQueue()
方法用于确认节点是否在同步队列上。实现如下:
final boolean isOnSyncQueue(Node node) {
// 如果状态为 Condition,或者前驱节点为 null(单向节点)
// 说明是条件队列节点,不在同步队列上
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 `next` 指向后继节点,说明是同步队列节点
// 因为条件队列用的是 nextWaiter 字段
if (node.next != null) // If has successor, it must be on queue
return true;
// 上述条件都不满足,从同步队列尾节点向前遍历
return findNodeFromTail(node);
}
可见,该方法利用两种队列的特性,在性能上做了一定的优化,当优化手段都失效后,再执行以下方法兜底:
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// 同步队列上找到了当前节点,说明这个节点在同步队列上
if (t == node)
return true;
// 找到头依旧没找到,说明不在同步队列上
if (t == null)
return false;
t = t.prev;
}
}
checkInterruptWhileWaiting()
checkInterruptWhileWaiting()
方法用于检查中断信号。如果发生了中断,则返回中断模式,否则返回 0。实现如下:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
当线程在条件队列上等待时被中断,会调用 transferAfterCancelledWait
方法将节点添加到同步队列。该方法会通过 CAS 操作将节点的状态由 CONDITION
更新为中间态 0。
- 如果 CAS 操作成功,则将该节点添加至同步队列,并返回 true。
- 如果 CAS 操作失败,说明节点的状态已经被其他线程更改,那么方法会自旋,直到该节点被成功添加至同步队列,再返回 false。
transferAfterCancelledWait
源码如下:
final boolean transferAfterCancelledWait(Node node) {
// 更新状态为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将节点添加到同步队列
enq(node);
return true;
}
// 如果当前节点被插入尾节点过程中有其他线程调用该方法,则到这里自旋等待
// 直到节点被成功添加,再返回 false
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
最终 checkInterruptWhileWaiting()
方法根据 transferAfterCancelledWait
的返回值,设置中断模式:
- 如果返回 true,中断模式会被设置为
THROW_IE
,这意味着该线程会抛出InterruptedException
。 - 否则,中断模式会被设置为
REINTERRUPT
,这意味着该线程会被补偿中断信号。
unlinkCancelledWaiters()
unlinkCancelledWaiters()
方法负责将条件队列上的非 Condition
节点摘除。实现如下:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
// 找到非 Condition 节点,将其短路
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null) // 找到了末尾,更新队列尾节点
lastWaiter = trail;
}
else
trail = t; //缓存上一个 Condition 节点
t = next;
}
}
- 方法仅通过
volatile
语义保证一致性。因为方法内只涉及节点的短路,不涉及状态的变化,因此多个线程的重复操作不会影响一致性。
reportInterruptAfterWait()
reportInterruptAfterWait
方法用于根据中断模式处理中断信号。实现如下:
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
如果是 THROW_IE
模式,这里会重新抛出中断异常;如果是 REINTERRUPT
模式,则仅调用 selfInterrupt() 方法来补偿中断标志位。
awaitUninterruptibly() 实现原理
awaitUninterruptibly
与 await
方法作用类似,只是不会响应中断信号:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//如果发生中断,只做个标记,继续循环
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
signal() 实现原理
signal()
方法用于唤醒条件队列中的第一个节点,并将其从条件队列中移除。实现如下:
public final void signal() {
// 只有持有排他资源,才能执行该方法,否则抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 唤醒条件队列头节点
if (first != null)
doSignal(first);
}
其中,doSignal()
是发送信号的核心实现:
private void doSignal(Node first) {
do {
// 头指针后移
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;//摘下原头节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
该方法首先将条件队列的头节点指向第二个节点。如果第二个节点为 null,说明之前条件队列上只有一个节点,因此将尾节点 lastWaiter
也更新为 null。然后,它会调用 transferForSignal()
方法将被摘下的头节点添加到同步队列的末尾:
final boolean transferForSignal(Node node) {
// 通过 CAS 将当前 Condition 节点状态更新为 0
// 相当于废弃条件队列上的该节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;// 更新失败,说明节点已被其他线程处理,返回 false
// 如果更新成功,将当前节点添加到同步队列末尾
// 并返回原同步队列尾节点,即当前 node 的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果同步队列原先的尾节点状态正常,
// 则通过 CAS 将其设置状态设置为 SIGNAL,
// 如果设置失败立刻唤醒当前节点
// 如果同步队列原先的尾节点已被取消,也立刻唤醒当前节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
通常情况下,signal()
方法不会直接唤醒条件队列的头节点,而是将其委托给同步队列去唤醒;只有在添加到同步队列后,前驱节点被取消或者状态无法被设置为 SIGNAL
时,才会亲自唤醒头节点。
signalAll() 实现原理
signalAll()
方法用于唤醒等待在条件队列上的所有线程。实现如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
其中,doSignalAll()
是发送信号的核心实现:
private void doSignalAll(Node first) {
// 头尾指针都指向空,相当于保存当前条件队列的镜像
// 后续线程可以向新队列插入数据,互不影响
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
// 摘下快照队列头节点
first.nextWaiter = null;
// 唤醒摘下的头节点
transferForSignal(first);
// 快照队列头指针后移
first = next;
} while (first != null);// 处理完链上所有节点
}
该方法的核心实现仍然是 transferForSignal(),因此不再进行赘述。与 doSignal()
方法不同的是,该方法会首先保存当前条件队列的快照,然后从快照队列的头节点开始逐个唤醒,直到唤醒所有节点。