AQS 简介

AQS (AbstractQueuedSynchronizer) 是 JUC 中实现并发同步功能的基石,它抽象了同步器的状态管理线程的排队和唤醒、同步器的共享/独占模式等关键部分。同时,AQS 还提供了一套通用的 API,使得开发者能够更方便地使用同步器。

state 变量

在 AQS 中,维护了一个 volatile int state 变量来表示共享资源。通过调用 getStatesetStatecompareAndSetState 方法,可以方便地查看或修改共享资源的值:

private volatile int state;

// get 方法
protected final int getState() {
    return state;
}
// set 方法
protected final void setState(int newState) {
    state = newState;
}
// 基于 CAS 的 set 方法
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 作为一个抽象同步类,并没有限定 state 的使用规则,具体如何利用它由实现类自行决定。例如:

  • ReentrantLockstate 设定成一种可重入锁,当 state == 0 时表示资源已被释放,否则表示重入次数。
  • ReentrantReadWriteLockstate 的高 16 位用作读锁重入次数,低 16 位用作写锁重入次数。
  • Semaphorestate 用于记录当前可用信号数量。
  • CountDownLatchstate 用于记录计数器的当前值。

同步队列

同步队列是整个 AQS 的核心,它是一个双向队列,通过 headtail 两个指针分别指向队列的头和尾,而且队列头节点是一个空节点,不与任何线程关联:

同步队列

初始状态下,headtail 指针均指向 null,当往队列中添加阻塞线程时,会先创建一个空节点,并让 headtail 都指向这个空节点,然后再在空节点后插入被阻塞线程的节点。所以,当 head == tail 的时候,就说明队列为空了。

Node 内部类

Node 内部类提供了以下几个字段:

// 标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的
static final Node SHARED = new Node();
// 标记线程是获取独占资源时被挂起后放入 AQS 队列的
static final Node EXCLUSIVE = null;

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

//表示节点的状态,不同的值代表不同的含义
volatile int waitStatus;

// 记录前驱节点
volatile Node prev;
// 记录后继节点
volatile Node next;

// 存放进入 AQS 队列里面的线程
volatile Thread thread;

// 表示条件队列中下一个节点
Node nextWaiter;

这里我们重点关注表示节点状态的 volatile int waitStatus,它共有 5 种取值:

  • 0:新节点入队时的默认状态,或者处理过程中的中间状态;

  • CANCELLED (1):表示当前节点已取消调度。

    节点一旦进入该状态,将不再发生状态变化,通常发生在节点超时或被中断时;

  • SIGNAL (-1):表示当前节点的后继节点正在等待唤醒。

    当后继节点入队时,会将前驱节点的状态更新为 SIGNAL,以通知后继节点当前节点已经释放资源。

  • CONDITION (-2):表示当前节点由于不满足某个条件而被阻塞,常用于实现 Condition 接口中的同步队列;

  • PROPAGATE (-3):用于共享同步模式,表示释放共享资源时,需要唤醒的节点不仅包括当前节点的后继节点,还可能包括其他节点。(本质是为了解决 JDK1.6 中共享模式的 BUG 而引入的一种新状态)

waitStatus 的负值表示节点处于有效等待状态,正值表示节点已取消。

AQS 的模板方法

前文提到,AQS 作为一个抽象同步类,并没有限定 state 的使用规则,但这只是其抽象设计的一部分。实际上,AQS 在抽象独占共享两种同步模式时,通过模板方法限定了资源获取 (acquire)释放 (release) 的过程。这样一来子类只需要实现 tryAcquire(int arg)tryRelease(int arg) 等对 state 的具体操作即可实现各种同步策略。

AQS 提供了 acquire()release()acquireShared()releaseShared() 四个基础模板方法。前两个用于独占模式,后两个用于共享模式。下面我们将详细分析这些方法的实现原理。

独占模式 (Exclusive)

获取资源 acquire()

acquire() 方法是独占模式下线程获取共享资源的顶层入口。如果线程获取到资源,则直接返回。否则,线程将被添加到同步队列中,一直等待直到获取到资源为止。需要注意的是,在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。

以下是 acquire() 方法的实现:

@ReservedStackAccess
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
该方法的执行流程如下:
  1. 首先调用 tryAcquire() 钩子函数获取资源,具体交由子类实现;
  2. 如果获取资源失败,则调用 addWaiter() 将该线程加入同步队列尾部,并标记为独占模式;
  3. 然后线程进入 acquireQueued() 方法挂起,直到成功获取资源后才会返回。
  4. acquireQueued() 方法会根据阻塞过程中是否发生过中断来返回结果。如果线程被中断过,则该方法会返回 true,然后调用 selfInterrupt() 方法来补偿中断信号。

以下是本流程中所涉及到的方法的详细介绍。

tryAcquire()

tryAcquire 是 AQS 模板方法中的一个钩子函数,具体需要子类去重写,直接调用会抛出异常,实现如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

addWaiter()

addWaiter() 方法用于在同步队列的队尾添加一个新的节点,实现如下:

private Node addWaiter(Node mode) {
    // 把当前线程封装为 node,并指定资源访问方式
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果 tail 不为空,直接把新 node 插到末尾
    if (pred != null) {
        node.prev = pred;
        // 通过 CAS 将 tail 指向新 node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果 tail 是空,说明同步队列未初始化,执行初始化
    // 如果 compareAndSetTail 操作失败,也进入该方法,自旋重试
    enq(node);
    return node;
}
addWaiter 方法内部首先会创建一个新节点,该节点的线程为当前线程,模式为传入的 mode,可以是独占模式 (Node.EXCLUSIVE) 或共享模式 (Node.SHARED)。然后获取同步队列尾节点的 pred。如果 pred 不为空,则将新节点插入队列的尾部;否则,调用 enq 方法对队列进行初始化,并将新节点插入到队尾。最后返回新创建的节点。其中,enq 的实现如下:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // CAS:如果头为空,就用虚拟 node 节点初始化头
            if (compareAndSetHead(new Node()))
                tail = head;// 然后将 tail 也指向头
        } else {
            // 已经初始化好了,则 CAS + 自旋,将新节点设为 tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 用于初始化同步队列,并通过自旋将 node 节点插入队列末尾。该方法首先获取同步队列的尾节点 tail

  • 如果 tail 为空,表示同步队列为空,需要进行初始化。在初始化过程中,使用 compareAndSetHead 方法将一个空的 Node 设置为 head 节点,head 节点本身不保存任何线程信息。
    • 如果成功设置头节点,则将 tail 指向该头节点。
    • 否则,说明其他线程已完成同步队列的初始化,因此直接开始下一轮循环。
  • 如果 tail 不为空,则将当前节点插入到队尾。具体做法是将当前节点的 prev 指向 tail,然后调用 compareAndSetTail 方法将同步队列的尾节点设置为当前节点。
    • 如果设置成功,则将原 tail 节点的 next 指向当前节点,然后返回原 tail 节点。
    • 否则,自旋重试该过程。

acquireQueued()

线程节点成功插入同步队列后,便会进入 acquireQueued() 中阻塞等待。源码如下:

@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;// 标记是否成功拿到资源
    try {
        boolean interrupted = false;// 是否发生过中断
        for (;;) {
            // 获取当前线程节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点已经是队列头了,说明当前节点是队列中第一个有效节点,所以尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取到了资源,则将该线程设置为新队头
                setHead(node);
                // 摘下原队头
                p.next = null; // help GC
                failed = false;
                // 返回等待过程中的中断标志
                return interrupted;
            }
            // 如果前驱节点不是 head,或者抢占资源失败了,则检查是否可以安全挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //如果可以安全挂起,则执行挂起操作,否则继续循环 
                // 如果等待过程中被中断过,哪怕只有那么一次,也会将 interrupted 标记为 true
                interrupted = true;
        }
    } finally {
        // 如果等待过程中没有成功获取资源(如 timeout,或者可中断的情况下被中断了),
        // 那么取消节点在队列中的等待
        if (failed)
            cancelAcquire(node);
    }
}

该方法首先判断当前线程节点的前驱节点是否是头节点:

  • 如果是则说明当前节点是同步队列中最靠前的等待节点,所以调用 tryAcquire 尝试获取资源;

  • 如果不是头节点,或者上一步获取资源失败,则调用 shouldParkAfterFailedAcquire() 判断能否被安全挂起,当满足挂起条件时调用 parkAndCheckInterrupt() 将线程挂起。

  • 如果成功获取到资源,则调用 setHead() 将当前节点设置为新的空队头(即移除该线程),实现如下:

        private void setHead(Node node) {
            head = node;
            // 队头都是空节点,不关联线程信息
            node.thread = null;
            node.prev = null;
        }
        

shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire 方法用于确定当前线程是否可以安全地休眠,以及在某些情况下是否应该继续尝试获取同步资源。如果线程能够被安全挂起,该方法将会返回 true,否则返回 false。实现如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点的状态已经被设置为 SIGNAL,则当前线程可以安全的休眠
        return true;
    if (ws > 0) {
        // 前驱节点状态为 CANCELLED,则向前遍历,直到遇到正常状态的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 将所有 CANCELLED 节点短路(异常节点将被 GC)
        pred.next = node;
    } else {
        // 如果前驱节点状态正常,但不为 SIGNAL,则通过 CAS 操作设置前驱节点状态,并返回 false
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt()

parkAndCheckInterrupt() 方法用于将当前线程挂起(休眠),直到有其他线程中断当前线程或者等待时间结束。实现如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    // 返回中断状态,并清除中断标志
    return Thread.interrupted();
}

// 其中,LockSupport.park() 实现如下:
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

parkAndCheckInterrupt() 方法通过调用 Unsafe.park 方法让当前线程进入休眠状态,并在其他线程调用了 UNSAFE.unpark(Thread thread) 方法或 thread.interrupt() 方法时被唤醒,同时设置中断标志位。

需要注意的是,park() 方法在阻塞线程时会检查线程的中断状态,如果线程已经被中断,它会立即返回而不进入阻塞状态。

selfInterrupt

当线程被中断唤醒后,会调用 selfInterrupt() 方法补偿被清空的中断标志,供上层业务使用。该方法实现如下:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

释放资源 release()

release() 方法是独占模式下线程释放共享资源的顶层入口。如果线程成功释放了资源,则唤醒后继节点并返回 true,否则直接返回 false。

@ReservedStackAccess
public final boolean release(int arg) {
    // 尝试释放资源
    if (tryRelease(arg)) {
        // 获取同步队列头节点
        Node h = head;
        // 如果节点状态不等于 0,则唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease()

tryRelease() 也是模板方法 release() 中的钩子函数,这里不再赘述了。

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

unparkSuccessor()

unparkSuccessor() 用于唤醒 node 的后继节点。实现如下:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果参数节点的状态为负(正常状态),则尝试将它的状态设为 0
    // 这样避免过多的线程进入这个方法
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 拿到参数节点的后继节点
    Node s = node.next;
    // 如果后继节点是空,或者状态为 CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
        //则从 tail 向前遍历,找到最靠前的正常状态的(且非参数节点本身)节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 后继节点不为空,或者从后往前找到了正常节点,将其内部线程唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

需要注意的是,在正常情况下,头节点的后继节点需要被唤醒。但是,如果发生意外情况,例如头节点的下一个节点此时恰好为 null,或者后继节点状态异常,就需要从 tail 开始从后往前查找最靠前的正常状态的节点(且非头节点本身),然后唤醒它。

共享模式 (Share)

获取资源:acquireShared()

acquireShared() 方法是共享模式下线程获取资源的顶层入口。在整个获取资源的过程中,线程将不会触发上层应用的中断处理逻辑。实现如下:

public final void acquireShared(int arg) {
    // 调用钩子函数获取资源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg); //如果获取失败,加入同步队列重试
}

tryAcquireShared()

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquireShared() 也是一个钩子函数,AQS 中未提供默认实现,但规定了该方法返回值的含义,需要我们重点关注:

  • < 0:表示当前线程未获取到资源,需要进一步等待。
  • == 0:表示当前线程已经获取到资源,但是剩余资源不足,后续线程无法继续获取。
  • > 0:表示当前线程已经获取到资源,并且后续线程也可以获取资源。

doAcquireShared()

doAcquireShared()acquireShared 中的核心方法,实现与 acquirequeued() 类似:

private void doAcquireShared(int arg) {
    // 将线程加入同步队列末尾,设置为共享模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前驱节点是头节点,尝试获取一次共享资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 成功获取到资源
                    // 将当前节点设置为新的头节点,并根据条件决定是否唤醒后续节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) // 如果被中断过,补偿中断
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 抢占资源失败,判断能否安全休眠,如果可以则休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在共享模式下,同步队列的管理与独占模式相似。但需要在获取资源后,判断剩余资源是否足够满足后续线程的需求。这一关键步骤决定了后续线程是等待还是继续尝试获取资源。

setHeadAndPropagate()

setHeadAndPropagate() 方法的主要作用是设置队列头节点并决定是否唤醒后继节点。实现如下:

private void setHeadAndPropagate(Node node, int propagate) {
    // 缓存旧的头节点
    Node h = head;
    // 将当前节点设置为新的头节点,并将原头节点摘下 GC
    setHead(node);
    // 如果 propagate > 0,表示剩余资源足够
    // 如果原头节点为空,或状态为负
    // 如果当前节点为空,或状态为负
    //  上述条件满足一个,就唤醒后继共享模式节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

后继节点的唤醒由 doReleaseShared() 方法实现,我们将在 下文 对其进行介绍。这里我们重点关注唤醒后继节点的条件:#todo

释放资源:releaseShared()

releaseShared() 是共享模式下实现资源释放的顶层方法。源码如下:

@ReservedStackAccess
public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
        // 释放成功,唤醒后继节点
        doReleaseShared();
        return true;
    }
    // 释放失败,返回 false
    return false;
}

doReleaseShared()

doReleaseShared() 方法是共享模式下资源释放的核心方法。源码如下:

private void doReleaseShared() {
    for (;;) {
        // 缓存当前头节点
        Node h = head;
        // 如果头节点不为空,且存在线程节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果头节点状态为 SIGNAL,则将其更新为 0,更新失败则自旋重试
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // CAS 成功的线程,唤醒后继节点
            }
            else if (ws == 0 && // 如果头节点状态已经为 0,说明前边有线程处理过,将其更新为 PROPAGATE
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果上边代码执行过程中有其他线程节点获取到了资源,就继续循环,否则退出方法
        if (h == head)
            break;
    }
}
  • 该方法只有在头节点的状态为 SIGNAL 时才会唤醒后继节点,并将头节点的状态设置为中间状态 0。
  • 如果在此时有其他线程执行了 releaseShared() 操作,也进入了该方法,由于头节点未发生变化,因此会获取到中间状态的 waitStatus == 0,于是将头节点状态设置为 PROPAGATE
  • 如果在 doReleaseShared() 方法执行期间,头节点未发生变更,则可以直接安全地返回。但是,如果在方法执行期间,有其他节点线程被唤醒,并成功获取到了资源,将自己设置成了新的头节点,那么就需要重新进行循环加速处理过程。

Condition 同步条件

条件变量使用示例

在开始介绍 AQS Condition 实现原理之前,我们先来了解下条件变量的使用方法。与 wait/notify 必须配合 synchronized 关键字使用一样,Condition 也必须与 ReentrantLock 配合使用:

// 创建一个互斥锁
Lock lock = new ReentrantLock(true);
// 从 lock 中获取一个条件变量
Condition cdt = lock.newCondition();

拿到条件变量 cdt 后,需要等待条件成立的线程便可通过 await() 调用阻塞等待:

// 首先需要获取到锁
lock.lock();
try {
    if (The condition is not met) {
        cdt.await(); // 阻塞等待条件成立
    }
    // 条件成立,执行任务
    run();
} finally {
    lock.unlock();
}

其它线程使条件成立后,调用 cdt.signal() 便可唤醒上述等待的线程:

// 首先需要获取到锁
lock.lock();
try {
    // 运行致使条件成立的代码
    cdt.signal(); // 然后唤醒等待的线程
} finally {
    lock.unlock();
}

Condition 接口

java.util.concurrent.locks.Condition 接口提供了与 wait/notify 功能类似的一组同步方法:

public interface Condition {
    // 等待条件成立
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    ...
    // 唤醒等待中的线程
    void signal();
    void signalAll();
}

ConditionObject 内部类

AQS 提供了一个继承自 Condition 的内部类 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject,用于在同步器中进行线程等待和唤醒的对象。

ConditionObject 中维护了一个类似于 AQS 同步队列的条件队列,其节点类型仍为 Node。这些节点通过 Node nextWaiter 字段形成了一个单向链表,用于保存正在等待条件的线程。ConditionObject 还通过两个字段分别指向条件队列的头和尾节点:

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    ...
}

这些线程在其他线程调用 signal()signalAll() 方法时被唤醒。

await() 实现原理

await() 方法是 ConditionObject 内部类中最核心的方法之一。该方法用于使当前线程等待在与该 ConditionObject 关联的条件上,并在条件满足时重新去获取共享资源。实现如下:

public final void await() throws InterruptedException {
    // 如果线程已被中断,直接抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装为一个 Condition 节点,并加到条件队列末尾
    //  内部不需要 CAS,因为此时已经获取到了独占资源
    Node node = addConditionWaiter();
    // 阻塞前,先释放当前节点所持有的资源,否则会死锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果 node 在同步队列上,跳出循环
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 阻塞自己
        // 如果收到中断新号,则将节点添加到同步队列,跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当线程被唤醒,并且被添加到同步队列后,交给 aqs 同步队列重新获取资源
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 获取到资源后,删除条件队列中废弃的节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 根据中断模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • await() 方法内部涉及到了中断模式 interruptMode,它由 ConditionObject 中的两个常量字段指定:

        /** 退出 await 状态时,只需补偿中断信号 */
        private static final int REINTERRUPT =  1;
        /** 推出 await 状态时,需要直接抛出中断异常 */
        private static final int THROW_IE    = -1;
        

如果线程在等待过程中线程被中断,则根据中断模式进行处理:如果是 THROW_IE 模式,会重新抛出中断异常;如果是 REINTERRUPT 模式,则仅会调用 selfInterrupt() 方法补偿中断标志位。

以下是本流程中所涉及到的方法的详细介绍。

addConditionWaiter()

addConditionWaiter() 方法用于创建一个新的 Condition 类型节点,并将其添加到条件队列中。如果条件队列为空,则初始化该队列。实现如下:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列尾节点状态不是 CONDITION,执行一次清理操作
    //  清理完成后,更新 t 引用
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程封装为一个 CONDITION 节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾节点为空,则初始化队列,否则直接将 node 插到末尾
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

可以看到,该方法并没有使用 CAS 操作来插入新节点(nextWaiter 字段甚至没有用 volatile 关键字修饰),这是因为 Condition 只用于可重入写锁,同一时刻只有一个线程会持有锁,因此是线程安全的

fullyRelease()

AQS 中的 AbstractQueuedSynchronizer#fullyRelease 方法通过调用 AbstractQueuedSynchronizer#release 释放当前节点持有的所有资源,并唤醒后继节点。实现如下:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取节点的同步资源
        int savedState = getState();
        // 释放资源,并唤醒后继节点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 如果释放资源失败,废弃该节点
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

isOnSyncQueue

AQS 中的 AbstractQueuedSynchronizer#isOnSyncQueue() 方法用于确认节点是否在同步队列上。实现如下:

final boolean isOnSyncQueue(Node node) {
    // 如果状态为 Condition,或者前驱节点为 null(单向节点)
    //  说明是条件队列节点,不在同步队列上
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 `next` 指向后继节点,说明是同步队列节点
    //  因为条件队列用的是 nextWaiter 字段
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 上述条件都不满足,从同步队列尾节点向前遍历
    return findNodeFromTail(node);
}

可见,该方法利用两种队列的特性,在性能上做了一定的优化,当优化手段都失效后,再执行以下方法兜底:

private boolean findNodeFromTail(Node node) {
        Node t = tail;
    for (;;) {
        // 同步队列上找到了当前节点,说明这个节点在同步队列上
        if (t == node)
            return true;
        // 找到头依旧没找到,说明不在同步队列上
        if (t == null)
            return false;
        t = t.prev;
    }
}

checkInterruptWhileWaiting()

checkInterruptWhileWaiting() 方法用于检查中断信号。如果发生了中断,则返回中断模式,否则返回 0。实现如下:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

当线程在条件队列上等待时被中断,会调用 transferAfterCancelledWait 方法将节点添加到同步队列。该方法会通过 CAS 操作将节点的状态由 CONDITION 更新为中间态 0。

  • 如果 CAS 操作成功,则将该节点添加至同步队列,并返回 true。
  • 如果 CAS 操作失败,说明节点的状态已经被其他线程更改,那么方法会自旋,直到该节点被成功添加至同步队列,再返回 false。

transferAfterCancelledWait 源码如下:

final boolean transferAfterCancelledWait(Node node) {
    // 更新状态为 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 将节点添加到同步队列
        enq(node);
        return true;
    }
    // 如果当前节点被插入尾节点过程中有其他线程调用该方法,则到这里自旋等待
    //  直到节点被成功添加,再返回 false
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

最终 checkInterruptWhileWaiting() 方法根据 transferAfterCancelledWait 的返回值,设置中断模式:

  • 如果返回 true,中断模式会被设置为 THROW_IE,这意味着该线程会抛出 InterruptedException
  • 否则,中断模式会被设置为 REINTERRUPT,这意味着该线程会被补偿中断信号。

unlinkCancelledWaiters()

unlinkCancelledWaiters() 方法负责将条件队列上的非 Condition 节点摘除。实现如下:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // 找到非 Condition 节点,将其短路
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null) // 找到了末尾,更新队列尾节点
                lastWaiter = trail;
        }
        else
            trail = t; //缓存上一个 Condition 节点
        t = next;
    }
}
  • 方法仅通过 volatile 语义保证一致性。因为方法内只涉及节点的短路,不涉及状态的变化,因此多个线程的重复操作不会影响一致性。

reportInterruptAfterWait()

reportInterruptAfterWait 方法用于根据中断模式处理中断信号。实现如下:

private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

如果是 THROW_IE 模式,这里会重新抛出中断异常;如果是 REINTERRUPT 模式,则仅调用 selfInterrupt() 方法来补偿中断标志位。

awaitUninterruptibly() 实现原理

awaitUninterruptiblyawait 方法作用类似,只是不会响应中断信号:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //如果发生中断,只做个标记,继续循环
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

signal() 实现原理

signal() 方法用于唤醒条件队列中的第一个节点,并将其从条件队列中移除。实现如下:

public final void signal() {
    // 只有持有排他资源,才能执行该方法,否则抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 唤醒条件队列头节点
    if (first != null)
        doSignal(first);
}

其中,doSignal() 是发送信号的核心实现:

private void doSignal(Node first) {
    do {
        // 头指针后移
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;//摘下原头节点
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

该方法首先将条件队列的头节点指向第二个节点。如果第二个节点为 null,说明之前条件队列上只有一个节点,因此将尾节点 lastWaiter 也更新为 null。然后,它会调用 transferForSignal() 方法将被摘下的头节点添加到同步队列的末尾:

final boolean transferForSignal(Node node) {
    // 通过 CAS 将当前 Condition 节点状态更新为 0
    // 相当于废弃条件队列上的该节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;// 更新失败,说明节点已被其他线程处理,返回 false

    // 如果更新成功,将当前节点添加到同步队列末尾
    //  并返回原同步队列尾节点,即当前 node 的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果同步队列原先的尾节点状态正常,
    //   则通过 CAS 将其设置状态设置为 SIGNAL,
    //   如果设置失败立刻唤醒当前节点
    // 如果同步队列原先的尾节点已被取消,也立刻唤醒当前节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

通常情况下,signal() 方法不会直接唤醒条件队列的头节点,而是将其委托给同步队列去唤醒;只有在添加到同步队列后,前驱节点被取消或者状态无法被设置为 SIGNAL 时,才会亲自唤醒头节点。

signalAll() 实现原理

signalAll() 方法用于唤醒等待在条件队列上的所有线程。实现如下:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

其中,doSignalAll() 是发送信号的核心实现:

private void doSignalAll(Node first) {
    // 头尾指针都指向空,相当于保存当前条件队列的镜像
    //  后续线程可以向新队列插入数据,互不影响
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        // 摘下快照队列头节点
        first.nextWaiter = null;
        // 唤醒摘下的头节点
        transferForSignal(first);
        // 快照队列头指针后移
        first = next;
    } while (first != null);// 处理完链上所有节点
}

该方法的核心实现仍然是 transferForSignal(),因此不再进行赘述。与 doSignal() 方法不同的是,该方法会首先保存当前条件队列的快照,然后从快照队列的头节点开始逐个唤醒,直到唤醒所有节点。