ReentrantReadWriteLock
本文详细介绍 ReentrantReadWriteLock 的源码实现,包括 WriteLock & ReadLock 视图、Sync 抽象类、FairSync、NoFairSync 实现类的原理
ReentrantReadWriteLock 简介
ReentrantReadWriteLock 是 JUC 中基于 AQS 实现的用于并发控制的可重入读写锁,它实现了读-读操作之间的不互斥,理论上并发度相较 ReentrantLock 更高。
尽管在理论上
ReentrantReadWriteLock
具有更高的并发度,但由于其采用了悲观读的策略,性能并不理想。这也是后来 JDK 8 推出新型读写锁 StampedLock 的原因。
与读写锁相关的类图如下:
![ReentrantReadWriteLock 类继承层次 ReentrantReadWriteLock 类继承层次](images/read-write-lock.webp)
ReentrantReadWriteLock
实现了 ReadWriteLock 接口,ReadWriteLock
是读锁和写锁的抽象接口,定义了以下两个方法:
Lock readLock()
:返回用于读取的锁;Lock writeLock()
:返回用于写入的锁。
ReentrantReadWriteLock
内部有三个核心字段:ReadLock、WriteLock 和 Sync:
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 基于 AQS 的同步器
final Sync sync;
这三个字段在 ReentrantReadWriteLock
的构造方法中被初始化:
// 默认的非公平构造方法
public ReentrantReadWriteLock() {
this(false);
}
// 通过参数 fair 指定公平性
public ReentrantReadWriteLock(boolean fair) {
// 根据参数生成对应的 Sync 实现类对象
sync = fair ? new FairSync() : new NonfairSync();
// 将当前锁对象传入,构造读锁和写锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
WriteLock & ReadLock 视图
WriteLock
和 ReadLock
是 ReentrantReadWriteLock
中实现了 Lock
接口的内部类,分别表示读写锁的写锁视图和读锁视图,它们对外提供了以下方法:
方法 | 说明 |
---|---|
lock() | 获取锁并阻塞当前线程,直到获取到锁为止 |
lockInterruptibly() | 获取锁并阻塞当前线程,直到获取到锁或被中断为止 |
unlock() | 释放锁 |
tryLock() | 尝试获取锁,如果获取成功则返回 true,否则返回 false |
tryLock(long time, TimeUnit unit) | 尝试获取锁,并设置超时时间 |
newCondition() | 返回一个与该锁绑定的条件变量 (Condition),用于线程间的通信和控制 |
这些方法的实现都是通过调用 Sync
同步器中的方法来完成的。
WriteLock
WriteLock
内部维护了一个 Sync
类型的字段,在构造 ReentrantReadWriteLock
时会初始化写锁视图,传入一个 ReentrantReadWriteLock
实例,并将其中的 sync
赋值给这个字段,以便后续调用 Sync
中的方法来实现读写锁的相关操作。
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
WriteLock
中与锁相关的方法的实现都是通过 Sync
实现的:
public void lock() {
// 调用 AQS 方法,申请独占类型的资源
// 方法内部会调用 tryAcquire,这里可以控制公平性
sync.acquire(1);
}
public boolean tryLock( ) {
// 直接通过 CAS 抢一次写锁,失败直接返回 false
return sync.tryWriteLock();
}
public void unlock() {
// 释放一个独占资源
sync.release(1);
}
public Condition newCondition() {
// 返回一个条件
return sync.newCondition();
}
//...
ReadLock
ReadLock
与 WriteLock
类似,只是调用的是 Sync
中的另一套方法,这里我们重点关注 newCondition
方法:
public Condition newCondition() {
throw new UnsupportedOperationException();
}
读锁不支持 Condition,所以方法中直接抛出了异常。
Sync 抽象类
state 变量
ReentrantReadWriteLock
的 Sync
同步器使用 state 变量的高 16 位记录读锁的资源总数,低 16 位记录写锁的重入次数。源代码中的几个静态常量可以具体说明这一点:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 高 16 位的第一个值
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 低 16 位全为 1 表示的无符号数,即 16 位无符号数的最大值 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 低 16 位(读锁)掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
}
为此,Sync
内部提供了两个方法,用于快速获取读锁和写锁的资源数量。这些方法的实现很简单,就是一些位运算:
// 返回读锁资源总数
static int sharedCount(int c) {
// 将高 16 位移到低 16 位
return c >>> SHARED_SHIFT;
}
// 返回写锁重入次数
static int exclusiveCount(int c) {
// 用低 16 位掩码进行与运算
return c & EXCLUSIVE_MASK;
}
HoldCounter 内部类
HoldCounter
是 Sync
中的一个内部类,用于记录线程自身持有读锁的重入次数,每个线程都有一个对应的 HoldCounter
实例。
// HoldCounter 内部类
static final class HoldCounter {
// 记录重入次数
int count = 0;
// 通过 native 方法获取线程 ID,这里不使用引用是为了不影响 GC
final long tid = getThreadId(Thread.currentThread());
}
// 缓存到 ThreadLocal
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
state
变量高 16 位保存了所有线程的读锁重入总次数。因此,线程需要将自己的读锁重入次数保存在本地缓存中,否则无法得到自己的重入次数。Sync
内部维护了以下四个与 HoldCounter
相关的字段:
// 保存当前线程持有的可重入读锁的重入次数
private transient ThreadLocalHoldCounter readHolds;
// 表示最后一个成功获取 readLock 的线程的 HoldCounter
private transient HoldCounter cachedHoldCounter;
// 缓存第一个抢占到 readLock 的线程
private transient Thread firstReader = null;
// firstReader 的 HoldCounter 值
private transient int firstReaderHoldCount;
readHolds
readHolds
是一个ThreadLocal<HoldCounter>
类型的字段,用于保存当前线程持有的可重入读锁的重入次数,该变量仅在Sync
构造方法和Sync#readObject
方法中被初始化:// Sync 构造方法 Sync() { // 初始化 HoldCounter 到线程本地缓存 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } // 用于反序列化 private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); readHolds = new ThreadLocalHoldCounter(); setState(0); // reset to unlocked state }
cachedHoldCounter
cachedHoldCounter
字段指向最后一个成功获取读锁的线程的HoldCounter
实例。当下一个要修改读锁的线程正好是当前最后一个获取到读锁的线程时,可以直接使用cachedHoldCounter
获取当前线程的HoldCounter
,避免了对ThreadLocal
进行查找。cachedHoldCounter
字段没有被volatile
关键字修饰,因此不能保证线程可见性。然而,恰恰是这个机制,使得即便其他线程修改了这个字段,当前线程仍然有可能能够读取到上次自己缓存的cachedHoldCounter
,从而提高性能。firstReader
firstReader
表示最后将共享计数(state
的高 16 位)从 0 更改为 1 的唯一线程,且此后一直没有释放读锁。如果没有这样的线程,则firstReader
指向 null。firstReader
也没有被volatile
关键字修饰,但无需担心线程安全问题,因为该字段只会被 CAS 操作成功的线程进行赋值。具体实现请见下文。firstReaderHoldCount
int
型变量,用于记录firstReader
的重入次数。
Sync 公平性控制
- readerShouldBlock:判断获取读锁的线程是否应该被阻塞;
- writerShouldBlock:判断获取写锁的线程是否应该被阻塞。
与 ReentrantLock 不同,ReentrantReadWriteLock.Sync
并没有将 tryAcquire
等钩子函数下放到实现类中去实现,而是自己提供了相应的实现,并通过 readerShouldBlock
和 writerShouldBlock
两个抽象方法区分公平性。这两个方法通过返回的布尔值来控制是否公平等待,具体实现由 FairSync
和 NonfairSync
两个实现类负责,详细内容在了解完 Sync
后再 总结。
Sync 钩子函数
tryAcquire()
tryAcquire()
是独占模式下尝试获取资源(写锁)的 钩子函数,源码如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 获取当前 state 变量值
int c = getState();
// 获取写锁重入次数
int w = exclusiveCount(c);
if (c != 0) {
// 如果 state 值不为 0,但是写锁重入次数为 0,说明读锁占用,直接返回 false
// 否则说明有写锁未释放,因此判断当前线程是否持有写锁
// 如果不持有写锁,直接返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果当前线程持有写锁,确保重入次数不超限
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 直接重入资源,并返回 true
setState(c + acquires);
return true;
}
// 如果 c == 0,表明锁未被占用,则通过 CAS 抢锁
// 需要注意,如果公平策略认为当前线程应该被阻塞,则不会尝试抢锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 如果允许抢锁,且抢锁成功,则绑定当前线程与锁,并返回 true
setExclusiveOwnerThread(current);
return true;
}
该方法首先获取 state
变量值和写锁重入次数:
- 如果
state
值不等于 0,表示锁已被使用,再做进一步判断:- 如果写锁重入次数为 0,表明当前存在读锁占用,由于读写互斥,所以直接返回 false;
- 否则说明写锁被占用,进而判断是否是当前线程持有写锁,如果未持有直接返回 false;如果持有再进行下一步判断:确保重入后的重入次数不超过限制,如果超过则抛出异常;如果未超过,则直接赋值,并返回 true。
- 如果
state
值等于 0,说明读写锁均未被占用,进而调用writerShouldBlock()
,根据公平性决定是否可以抢锁:- 如果可以抢锁,则通过 CAS 抢锁,抢锁成功后绑定当前线程与锁的关系,然后返回 true;抢锁失败则直接返回 false。
- 如果不可以抢锁,也直接返回 false。
tryAcquireShared()
tryAcquireShared()
是以共享模式获取资源(读锁)的 钩子函数,该方法内部也仅通过 writerShouldBlock()
来控制公平性,主要涉及非公平模式下 firstReader
和 cachedHoldCounter
的优化处理:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取当前 state 值
int c = getState();
// 如果写锁重入次数不为 0,并且当前线程并未持有写锁,则返回 -1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁资源总数
int r = sharedCount(c);
// 通过 readerShouldBlock 控制公平性
// 如果策略允许获取读锁,并且重入次数没有超限,就通过 CAS 将读锁资源数+1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果 CAS 成功。且操作前读锁资源总数是 0
// 也就是当前线程是最后一个让读锁资源数由 0 变为 1 的
if (r == 0) {
// 记录 firstReader
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前读锁已被使用,并且当前线程就是 firstReader
// 则更新重入次数
firstReaderHoldCount++;
} else {
// 如果前两个条件不符,则从 cache 中获取 HoldCounter
HoldCounter rh = cachedHoldCounter;
// 如果缓存不存在,或者缓存中的线程 id 不是当前线程 id
// 则将当前线程本地的 HoldCounter,更新到 cache
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
// 如果存在缓存,并且缓存中的线程 id 就是当前线程 id
// 则把缓存指向对象更新到 ThreadLocal
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;// 线程自己的重入次数 +1
}
return 1;
}
return fullTryAcquireShared(current);
}
- 该方法首先获取
state
变量值和写锁重入次数,如果写锁重入次数不为 0,并且当前线程并未持有写锁,直接返回 -1。否则进行如下判断: - 获取读锁的资源总数,然后判断公平策略是否允许立刻获取锁:如果允许并且重入次数没有超限,则通过 CAS 操作将读锁的资源总数 +1。如果 CAS 操作成功,执行如下步骤:
- 首先判断 CAS 更新操作前,读锁资源总数是否是 0:
- 如果是 0,表示当前线程是最后一个将读锁资源总数由 0 设置为 1 的,因此将
firstReader
指向当前线程,并将firstReaderHoldCount
设置为 1(这里通过 CAS 保证了线程安全)。 - 如果读锁的资源总数不为 0,并且当前线程就是
firstReader
,直接将firstReaderHoldCount
加 1。 - 如果读锁的资源总数不为 0,且当前线程并非
firstReader
,则获取cachedHoldCounter
缓存的HoldCounter
。- 如果缓存为 null 或者
HoldCounter
中的线程 ID 并非当前线程的 ID,则将cachedHoldCounter
更新为当前线程ThreadLocal
中的HoldCounter
。 - 如果缓存不为 null,并且缓存中的线程 ID 就是当前线程 ID,并且缓存中的重入次数等于 0,则将缓存存入当前线程的
ThreadLocal
中。
- 如果缓存为 null 或者
- 判断完成后,将
cachedHoldCounter
中的重入次数 +1。
- 如果是 0,表示当前线程是最后一个将读锁资源总数由 0 设置为 1 的,因此将
- 最终,方法返回 1。
- 首先判断 CAS 更新操作前,读锁资源总数是否是 0:
- 如果公平策略不允许立刻获取读锁,或者重入次数超过了限制,再或者 CAS 更新读锁重入次数失败,则调用
fullTryAcquireShared()
进行完整的资源获取,并将其返回值作为方法的返回值。
fullTryAcquireShared()
方法是共享模式下获取资源(即读锁)的完整实现,它主要用于公平模式下的读锁获取,以及在非公平模式下抢锁失败后的重试。源码如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取读锁资源总数
int c = getState();
// 如果写锁重入次数不为 0
if (exclusiveCount(c) != 0) {
// 且持有写锁的线程并非当前线程,返回 -1
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {// 如果公平策略认为需要被阻塞
// 如果当前线程是 firstReader,放行
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 如果不是 firstReader,并且当前线程先前并未占用读锁资源,则返回 -1
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 如果不需要被阻塞,继续判断:
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 通过 CAS 将读锁总资源数 +1,失败则重试
if (compareAndSetState(c, c + SHARED_UNIT)) {
// CAS 操作成功,根据情况更新 firstReader 和 cachedHoldCounter
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 先获取 cache
if (rh == null)
rh = cachedHoldCounter;
// 如果 cache 为空,或者 cache 中的线程非当前线程,则用 ThreadLocal 中的覆盖
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 如果 cache 不为空,并且线程就是当前线程,而且重入次数为 0,将其存入 ThreadLocal
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // ThreadLocal 存入 cache
}
return 1;
}
}
}
该方法内部是一个死循环,核心实现与上文的 tryAcquireShared
类似,通过 CAS 将读锁资源总数 +1。如果操作成功,就根据当前的情况更新 firstReader
和 cachedHoldCounter
,否则不断重试。
这里我们重点关注公平策略认为需要阻塞线程的场景:如果需要被阻塞,先判断当前线程是否是 firstReader
。如果是,则直接进行后续的 CAS 重入操作。否则,判断当前线程的读锁重入次数。如果为 0,则清空 ThreadLocal
并返回 -1。
tryRelease()
tryAcquire()
是独占模式下尝试释放资源的 钩子函数,源码如下:
protected final boolean tryRelease(int releases) {
// 当前线程不持有锁,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算释放后剩余的资源数
int nextc = getState() - releases;
// 通过位运算更新释放后的资源值
boolean free = exclusiveCount(nextc) == 0;
// 如果写锁已经被彻底释放,解绑线程
if (free)
setExclusiveOwnerThread(null);
// 更新资源数量
setState(nextc);
return free;
}
tryReleaseShared()
tryReleaseShared()
是共享模式下尝试释放资源的钩子函数,源码如下:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果当前线程是 firstReader
if (firstReader == current) {
// 将 firstReader 的重入次数 -1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else { //否则,判断 cachedHoldCounter 或 ThreadLocal
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) { // 如果重入次数已经小于等于 1 了,就从 ThreadLocal 移除
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; //重入次数 -1
}
// 操作完 cache 和 firstReader,则不断通过 CAS 操作 state
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
Sync 其他方法
tryReadLock() & tryWriteLock()
与上文介绍的几个钩子函数不同,虽然
tryReadLock()
和tryWriteLock()
的功能也是通过 CAS 尝试获取资源,但它们并不会像钩子函数一样与 AQS 中的模板方法配合,只是单纯地用于尝试获取资源。tryReadLock()
源码如下:final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); // 如果写锁重入次数不为 0,但当前线程不持有写锁,返回 false if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; // 如果读锁资源总数超限,抛出异常 int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 否则通过 CAS 将读锁资源总数 +1 if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } // CAS 操作失败则重试 } }
tryWriteLock()
源码如下:final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 通过 CAS 尝试获取写锁,失败直接返回 false if (!compareAndSetState(c, c + 1)) return false; // CAS 操作成功,则绑定线程,并返回 true setExclusiveOwnerThread(current); return true; }
可见,读锁加锁失败后会不断自旋重试,而写锁一旦 CAS 操作失败,直接返回 false。
isHeldExclusively():判断当前线程是否是持有写锁的线程。
getOwner():如果写锁重入次数为 0,直接返回 null,否则返回绑定线程。
getReadLockCount():计算读锁资源总数。
isWriteLocked():判断当前锁是否处于写锁模式。
getWriteHoldCount():判断当前线程的写锁重入次数,只有持有写锁的线程调用才生效,否则返回 0。
getReadHoldCount():获取当前线程自身的读锁重入次数。源码如下:
final int getReadHoldCount() { // 获取写锁总资源数 if (getReadLockCount() == 0) return 0; // 如果当前线程是 firstReader,返回 firstReaderHoldCount 加速 Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; // 条件允许,从 cache 中加速获取 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; // 加速失败,从 ThreadLocal 中获取 int count = readHolds.get().count; // 发现重入次数为 0 了,顺便移除 ThreadLocal if (count == 0) readHolds.remove(); return count; }
Sync 实现类 FairSync
FairSync
是 Sync
的公平实现类,内部重写了 Sync
中定义的两个公平性策略,源码如下:
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
writerShouldBlock()
和 readerShouldBlock()
直接调用 AQS 中的 hasQueuedPredecessors
方法来判断同步队列是否为空,或者当前节点的前驱节点是否是同步队列的头节点:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
只有同步队列为空,或者当前节点的前驱节点是同步队列的头节点时,才会允许当前线程尝试抢占锁。
Sync 实现类 NonfairSync
NonfairSync
是 Sync
的非公平实现类,源码如下:
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
// 如果同步队列头节点是独占模式,则返回 true,进而阻塞线程
return apparentlyFirstQueuedIsExclusive();
}
}
非公平写锁可以直接争抢资源获取锁,所以 writerShouldBlock
直接返回 false。但非公平读锁不能采用这种方式,因为写锁比读锁有更高的优先级,所以当前队列中等待的第一个线程是写锁请求时,非公平读锁就得让步。
ReadWriteLock 的优缺点
ReentrantReadWriteLock
通过拆分 state
变量,实现了读写分离,进而实现了读与读操作之间的不互斥,在读多写少的场景中可以显著提升并发度。但它的读锁是一种“悲观锁”,会阻塞写操作,因此很容易因为读锁未释放而导致写锁饥饿。