StampedLock
本文详细介绍 StampedLock 的源码实现,包括 StampedLock 使用示例,乐观读锁、悲观读锁、写锁的原理与实现
在 读写锁 一文中我们了解到,ReentrantReadWriteLock
是一种悲观读锁,读写之间仍然是互斥的,实际使用时性能并不高,而且容易导致写锁饥饿。因此,JDK 8 引入了一款支持 “乐观读” 策略的新式读写锁 ——StampedLock
。
StampedLock
读取的时候可以不对资源加锁,只有读取后发现数据被修改了,才升级成“悲观读”,一定程度上降低了读锁的地位,让资源更向写锁一边倾斜,降低写锁饥饿的概率。
使用示例
在分析源码前,我们先来看看源码中给出的官方使用示例:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// 多个线程调用该方法,修改 x,y
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();// 写锁
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
// 多个线程调用该方法,求距离
double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead(); // 使用“乐观读”
double currentX = x, currentY = y; //将数据拷贝到线程本地栈
if (!sl.validate(stamp)) { //读取后,判断数据是否被修改过
stamp = sl.readLock(); //如果被修改过,升级为悲观读锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 多个线程调用该方法,移动坐标
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
示例的 Point
类中有三个支持多线程调用的方法:
move()
方法:该方法用于移动坐标,支持多线程调用,由于是纯粹的“写操作”,所以方法直接使用了互斥的写锁writeLock
&unlockWrite
。distanceFromOrigin()
方法:该方法用于计算点到原点的距离,支持多线程调用,由于是纯粹的“读操作”,因此方法一开始选择了乐观读锁tryOptimisticRead()
,如果读取后又有其他线程修改了数据导致版本号不匹配,就重新申请悲观读锁readLock()
。moveIfAtOrigin()
方法:该方法用于将原点的坐标移到目标位置,支持多线程调用,该方法选择先通过悲观读锁确认当前点位于坐标轴原点,当确认后再通过tryConvertToWriteLock()
方法直接将悲观读锁升级为写锁。
state 变量
作为一个读写锁,StampedLock
也提供了一个分成两份的 state
变量,只不过 StampedLock
重新实现了一版 CLH 队列,并没有像 ReentrantReadWriteLock
一样基于 AQS 实现。因此,它将 state
直接设为了自己的私有变量。声明如下:
public class StampedLock implements java.io.Serializable {
// state 字段
private transient volatile long state;
// state 的初始值 1 0000 0000,即 256
private static final long ORIGIN = WBIT << 1;
private static final int LG_READERS = 7;
private static final long RUNIT = 1L;
// 写锁掩码 1000 0000 == 128,第八位表示写锁
private static final long WBIT = 1L << LG_READERS;
// 读锁掩码 0111 1111 == 127,低七位表示读锁
private static final long RBITS = WBIT - 1L;
// 读锁的总数量:0111 1110 == 126
private static final long RFULL = RBITS - 1L;
// 综合掩码:1111 1111 == 255
private static final long ABITS = RBITS | WBIT;
// 1...1 1000 0000 = -128
private static final long SBITS = ~RBITS;
}
如上所示,StampedLock
用 state
变量的低 8 位表示读锁和写锁的状态,其中:
- 第 8 位表示写锁的状态(只拥有一个比特位,所以写锁不支持重入);
- 低 7 位表示读锁的状态。
乐观读实现原理
当 state
为初始状态 ORIGIN
时,其二进制值为 WBIT
左移一位,即 1 0000 0000
。为什么 state
变量的默认值不是 0 呢?这就涉及到了乐观锁的实现原理,源码如下:
public long tryOptimisticRead() {
long s;
// 第八位为 1,表示写锁被占用,此时直接返回 0
// 否则返回 s & SBITS
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
当写锁被占用时,((s = state) & WBIT) != 0L
,因此方法直接返回 0;当悲观读锁被占用时,state
的第 9 位以上肯定存在不为 0 的位(初始化与写锁原理使然),因此 s & SBITS
始终大于 0;当既没有读锁占用也没有写锁占用,也就是初始状态时,state == ORIGIN
,即第 9 位为 1 其他位都为 0,因此 s & SBITS
等于 256 也大于 0。所以 tryOptimisticRead()
在存在写锁占用时返回 0,否则该方法均返回正值。
获取到乐观锁后,需要接着调用 validate()
判断锁是否有效:
public boolean validate(long stamp) {
U.loadFence(); // sun.misc.Unsafe
// 当 stamp 等于 0 时,该方法永远返回 false
return (stamp & SBITS) == (state & SBITS);
}
该方法会判断 tryOptimisticRead()
的返回值与当前 state
变量的第 9 位以上部分是否相等,如果 tryOptimisticRead()
返回 0,则 validate()
必然返回 false,表明存在写锁占用,乐观锁失效;如果 tryOptimisticRead()
返回值大于 0,但等式不成立,那么说明期间存在其他线程完成过写锁的加锁与释放,乐观锁也会失效;其他情况下乐观锁有效,返回 true。
我们注意到,在取得 stamp
并比较之前,还插入了一个内存屏障 U.loadFence()
,该屏障可以禁止其两边代码之间的重排序,因此上文代码示例中才“敢”在调用 tryOptimisticRead()
后,先执行变量复制操作 double currentX = x, currentY = y
,而不是紧接着判断乐观读是否有效。
WNode 节点类
StampedLock
内部重新实现的 CLH 队列的节点 WNode
源码如下:
/** 队列节点 */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING(-1), or CANCELLED(1)
final int mode; // RMODE(0) or WMODE(1)
/* 节点初始化的时候,会设定当前节点的类型,以及其前驱节点 */
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** CLH 队列头节点 */
private transient volatile WNode whead;
/** CLH 队列尾节点 */
private transient volatile WNode wtail;
由 WNode
组成的 CLH 队列与 AQS 中的队列有相似之处,默认情况下,头尾节点都指向 null。在初始化后,会将一个空的 WNode
设置为整个队列的头节点,并将与线程绑定的节点插入到头节点之后。因此,整个队列中的头节点作为空的虚节点存在。
特殊的是,WNode
中有一个 cowait
指针,会串联所有等待读锁资源的节点,方便批量唤醒线程。
写锁实现原理
写锁加锁操作
写锁的加锁操作包含 writeLock()
、tryWriteLock()
等多个方法,这里我们只分析其中的典型方法 writeLock()
:
public long writeLock() {
long s, next;
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
写锁不支持重入且与读锁互斥,所以只有当 state
的低八位都为 0 时,才允许抢锁。当抢锁失败后,再将后续的重试过程移交给 acquireWrite()
方法。acquireWrite()
的实现如下:
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
/* 第一个大循环,包含入队前的 CAS 抢锁操作、队列初始化以及尝试入队操作 */
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 如果 state 低 8 位为 0 了,尝试 CAS 加锁,否则继续自旋
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0) // 更新 spins
// 当前锁被其他线程写锁占用,且队列为空,则 spins = 0,退出自旋
// 否则初始化 spins
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
// SPINS 为进入队列前的最大重试次数,
// 值为 (NCPU > 1) ? 1 << 6 : 0
else if (spins > 0) { // 根据随机函数决定是否减少 spins
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else if ((p = wtail) == null) {
// 如果队列为空,则初始化队列
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null) // 初始化节点
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break; // 最终将节点插入队列,跳出循环
}
}
/* 第二个大循环,包含节点阻塞前的自旋、阻塞与唤醒操作 */
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) { // 如果队列此前没有其他节点在等待
if (spins < 0) // 则先初始化 spins
// HEAD_SPINS 为阻塞线程前的最大自旋次数,
// 值为 (NCPU > 1) ? 1 << 10 : 0
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS) {
// MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0
// 第二次进入循环时,说明是 re-locking 操作,
// 则扩大自旋次数上限
spins <<= 1;
}
// 阻塞前自旋 spins 次,尝试 CAS 加锁
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
whead = node;
node.prev = null;
// 成功则返回 384(ORIGIN + WBIT)
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break; // 自旋次数超了,则跳出循环
}
}
// 如果队列中存在其他等待读锁的节点
else if (h != null) {
// 则沿着 cowait 全部唤醒
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果本次自旋执行到这里,头节点仍然没变化,则阻塞
if (whead == h) {
if ((np = node.prev) != p) {
// 上次自旋如果 p 是 CANCELLED,会被短路,
// 所以这里更新 p 引用,使它继续指向当前的前驱节点
if (np != null)
(p = np).next = node;
}
else if ((ps = p.status) == 0) // 更新前驱节点状态
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// 如果 p 被取消了,则短路 p
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else { // 阻塞线程
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // 阻塞,等待被唤醒
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
整个 acquireWrite
方法是两个大的 for
循环,内部实现了非常复杂的自旋策略:
- 第一个大循环的目的是创建当前线程的
WNode
,并将其插入 CLH 队列的尾部。自旋过程中如果发现锁未被任何线程占用时,还会通过 CAS 尝试加锁。 - 第二个大循环的目的是阻塞当前线程:
- 当程序进入第二个大循环,说明当前
WNode
已经位于 CLH 队列的尾部,可以阻塞了。- 不过在阻塞前,如果当前 CLH 队列中没有其他等待的节点,还会继续自旋加锁直到达到
MAX_HEAD_SPINS
指定的次数。 - 如果 CLH 队列中存在其他阻塞的节点,因由于锁之间不互斥,所以这里会沿着
cowait
链表唤醒全部读锁节点,对应上述代码中高亮的 68 ~ 76 行。
- 不过在阻塞前,如果当前 CLH 队列中没有其他等待的节点,还会继续自旋加锁直到达到
- 当其他线程在解锁操作中调用
release()
方法时,便会唤醒等待队列中阻塞等待的第一个节点,被唤醒的节点再接着执行第二个大循环中剩下的代码。
- 当程序进入第二个大循环,说明当前
- 在循环的过程中,如果 CAS 成功,则返回
state
的值为 384,否则一直循环下去直到被中断信号取消。
写锁解锁操作
写锁的解锁操作为 unlockWrite(long stamp)
,实现如下:
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
当释放写锁时,首先通过 state
判断写锁是否被占用,如果没有被占用则抛出异常;否则更新 state
的值。这里我们重点关注 state
值的更新表达式:(stamp += WBIT) == 0L ? ORIGIN : stamp
,这里并没有直接将 state
值的第 8 位设为 0,而是通过进位操作让 state
的低八位为 0。不断加读锁,则不断进位,直到溢出后再将 state
初始化为 ORIGIN
。
state
第 9 位以上的部分保留写锁的操作痕迹,相当的精妙!最后当 CLH 头部的虚节点不为空,且状态正常时,调用 release()
唤醒等待队列中第一个等待的线程。实现如下:
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 更新虚节点状态为 0
if ((q = h.next) == null || q.status == CANCELLED) {
// 如果等待队列中的第一个实节点为空,或状态被取消,
// 则从后往前寻找第一个正常的节点,进行唤醒
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w); // 唤醒目标节点
}
}
读锁实现原理
读锁加锁操作
StampedLock
的读锁加锁方法包括 readLock()
、tryReadLock()
和 readLockInterruptibly()
等,本文只介绍其中的典型 readLock()
方法,实现如下:
public long readLock() {
long s = state, next;
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
由于读锁与写锁互斥,且 StampedLock
的设计目标是降低写锁饥饿的概率,所以只有当 CLH 队列为空,且读锁还有剩余资源时,才会通过 CAS 抢锁,如果抢锁成功则返回 CAS 后的 state
值 next
。以上情况都不满足则调用 acquireRead()
托管后续的重试过程。读锁的 acquireRead()
方法与写锁的 acquireWrite() 整体流程类似,只不过当读锁剩余资源数量不足时,会考虑对读锁容量进行扩容:
if ((m = (s = state) & ABITS) < RFULL ? // 判断当前 state 是否小于 RFULL (126)
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : // CAS 抢锁
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) // 否则扩容
{
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w); // 唤醒其他等待中的读锁节点
}
return ns; //加锁成功,返回
}
tryIncReaderOverflow()
实现如下:
private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) { // 如果 state 恰好等于 RFULL (126)
// 尝试通过 CAS 将 state 由 126 设置为 127
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
// 设置成功的线程将 readerOverflow+1,再将 state 降回 126
// 由于只有一个线程会 CAS 成功,所以这里是线程安全的
++readerOverflow;
state = s;
return s;
}
}
// state 升到 127 后,其他线程会进入这个分支,让出 CPU
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
// 除了上述 CAS 成功的线程,其他线程都返回 0
return 0L;
}
因此,扩容过程中 CAS 成功的线程会加锁成功并返回,其他线程都会继续下一轮自旋。
StampedLock
将 state
变量的低 7 位用作读锁资源,那么理论上读锁资源总数应该有 127 个才对,为什么要设置为 126 呢?这就与上方介绍的 tryIncReaderOverflow()
方法实现有关:tryIncReaderOverflow()
方法的扩容是通过 CAS state
变量实现的,判定成功的条件就是线程是否将 state
由 126 设置为 127,因此这里预留了一个值用于 CAS。解答了第一个问题,第二个问题就简单了:state
变量的 bit 位被卡死了 7 位,因此需要新的变量来存储扩容后的读锁加锁数量。读锁解锁操作
读锁的解锁操作为 unlockRead(long stamp)
,实现如下:
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// 如果参数值不与当前 state 相等
// 或 stamp 低八位为 0
// 或当前 state 低八位为 0
// 或写锁被占用
// 则抛出异常
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // 释放读锁
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h); // 释放 head 的后继节点
break;
}
}
// 如果之前提高过读锁上限,这里降低上限
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
当读锁的释放操作较写锁复杂一些:它首先会判断其是否符合释放条件,如果符合条件,再接着判断读锁是否被扩容过。如果没有扩容过,就直接 CAS state
变量释放资源,成功后唤醒队列中第一个等待的线程;如果读锁被扩容过,则调用 tryDecReaderOverflow()
完成解锁。tryDecReaderOverflow()
方法实现如下:
private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next;
if ((r = readerOverflow) > 0) {
readerOverflow = r - 1;
next = s;
}
else
next = s - RUNIT;
state = next;
return next;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}
可见,减少容量操作与提高容量类似,都是通过对 state
变量进行 CAS 操作实现的,CAS 成功的线程继续操作 state
变量并返回其值,表示解锁成功;其他失败的线程均返回 0,表示解锁失败。