ReentrantReadWriteLock 简介

ReentrantReadWriteLock 是 JUC 中基于 AQS 实现的用于并发控制的可重入读写锁,它实现了读-读操作之间的不互斥,理论上并发度相较 ReentrantLock 更高。

尽管在理论上 ReentrantReadWriteLock 具有更高的并发度,但由于其采用了悲观读的策略,性能并不理想。这也是后来 JDK 8 推出新型读写锁 StampedLock 的原因。

与读写锁相关的类图如下:

ReentrantReadWriteLock 类继承层次

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,ReadWriteLock 是读锁和写锁的抽象接口,定义了以下两个方法:

  • Lock readLock():返回用于读取的锁;
  • Lock writeLock():返回用于写入的锁。

ReentrantReadWriteLock 内部有三个核心字段:ReadLockWriteLockSync

  // 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 基于 AQS 的同步器
final Sync sync;
  

这三个字段在 ReentrantReadWriteLock 的构造方法中被初始化:

  // 默认的非公平构造方法
public ReentrantReadWriteLock() {
    this(false);
}

// 通过参数 fair 指定公平性
public ReentrantReadWriteLock(boolean fair) {
    // 根据参数生成对应的 Sync 实现类对象
    sync = fair ? new FairSync() : new NonfairSync();
    // 将当前锁对象传入,构造读锁和写锁
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
  

WriteLock & ReadLock 视图

WriteLockReadLockReentrantReadWriteLock 中实现了 Lock 接口的内部类,分别表示读写锁的写锁视图读锁视图,它们对外提供了以下方法:

方法说明
lock()获取锁并阻塞当前线程,直到获取到锁为止
lockInterruptibly()获取锁并阻塞当前线程,直到获取到锁或被中断为止
unlock()释放锁
tryLock()尝试获取锁,如果获取成功则返回 true,否则返回 false
tryLock(long time, TimeUnit unit)尝试获取锁,并设置超时时间
newCondition()返回一个与该锁绑定的条件变量 (Condition),用于线程间的通信和控制

这些方法的实现都是通过调用 Sync 同步器中的方法来完成的。

WriteLock

WriteLock 内部维护了一个 Sync 类型的字段,在构造 ReentrantReadWriteLock 时会初始化写锁视图,传入一个 ReentrantReadWriteLock 实例,并将其中的 sync 赋值给这个字段,以便后续调用 Sync 中的方法来实现读写锁的相关操作。

  private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}
  

WriteLock 中与锁相关的方法的实现都是通过 Sync 实现的:

  public void lock() {
    // 调用 AQS 方法,申请独占类型的资源
    // 方法内部会调用 tryAcquire,这里可以控制公平性
    sync.acquire(1);
}

public boolean tryLock( ) {
    // 直接通过 CAS 抢一次写锁,失败直接返回 false
    return sync.tryWriteLock();
}

public void unlock() {
    // 释放一个独占资源
    sync.release(1);
}

public Condition newCondition() {
    // 返回一个条件
    return sync.newCondition();
}

//...
  

ReadLock

ReadLockWriteLock 类似,只是调用的是 Sync 中的另一套方法,这里我们重点关注 newCondition 方法:

  public Condition newCondition() {
    throw new UnsupportedOperationException();
}
  

读锁不支持 Condition,所以方法中直接抛出了异常。

Sync 抽象类

state 变量

ReentrantReadWriteLockSync 同步器使用 state 变量的高 16 位记录读锁的资源总数,低 16 位记录写锁的重入次数。源代码中的几个静态常量可以具体说明这一点:

  abstract static class Sync extends AbstractQueuedSynchronizer {
    static final int SHARED_SHIFT   = 16;
    // 高 16 位的第一个值
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 低 16 位全为 1 表示的无符号数,即 16 位无符号数的最大值 65535
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 低 16 位(读锁)掩码
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
}
  

为此,Sync 内部提供了两个方法,用于快速获取读锁和写锁的资源数量。这些方法的实现很简单,就是一些位运算:

  // 返回读锁资源总数
static int sharedCount(int c) {
    // 将高 16 位移到低 16 位
    return c >>> SHARED_SHIFT;
}
// 返回写锁重入次数
static int exclusiveCount(int c) {
    // 用低 16 位掩码进行与运算
    return c & EXCLUSIVE_MASK; 
}
  

HoldCounter 内部类

HoldCounterSync 中的一个内部类,用于记录线程自身持有读锁的重入次数,每个线程都有一个对应的 HoldCounter 实例。

  // HoldCounter 内部类
static final class HoldCounter {
    // 记录重入次数
    int count = 0;
    // 通过 native 方法获取线程 ID,这里不使用引用是为了不影响 GC
    final long tid = getThreadId(Thread.currentThread());
}
// 缓存到 ThreadLocal
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
  

Sync 内部维护了以下四个与 HoldCounter 相关的字段:

  // 保存当前线程持有的可重入读锁的重入次数
private transient ThreadLocalHoldCounter readHolds;
// 表示最后一个成功获取 readLock 的线程的 HoldCounter
private transient HoldCounter cachedHoldCounter;
// 缓存第一个抢占到 readLock 的线程
private transient Thread firstReader = null;
// firstReader 的 HoldCounter 值
private transient int firstReaderHoldCount;
  
  • readHolds

    readHolds 是一个 ThreadLocal<HoldCounter> 类型的字段,用于保存当前线程持有的可重入读锁的重入次数,该变量仅在 Sync 构造方法和 Sync#readObject 方法中被初始化:

      // Sync 构造方法
    Sync() {
        // 初始化 HoldCounter 到线程本地缓存
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    // 用于反序列化
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        readHolds = new ThreadLocalHoldCounter();
        setState(0); // reset to unlocked state
    }
      
  • cachedHoldCounter

    cachedHoldCounter 字段指向最后一个成功获取读锁的线程的 HoldCounter 实例。当下一个要修改读锁的线程正好是当前最后一个获取到读锁的线程时,可以直接使用 cachedHoldCounter 获取当前线程的 HoldCounter,避免了对 ThreadLocal 进行查找。

    cachedHoldCounter 字段没有被 volatile 关键字修饰,因此不能保证线程可见性。然而,恰恰是这个机制,使得即便其他线程修改了这个字段,当前线程仍然有可能能够读取到上次自己缓存的 cachedHoldCounter,从而提高性能。

  • firstReader

    firstReader 表示最后将共享计数(state 的高 16 位)从 0 更改为 1 的唯一线程,且此后一直没有释放读锁。如果没有这样的线程,则 firstReader 指向 null。

    firstReader 也没有被 volatile 关键字修饰,但无需担心线程安全问题,因为该字段只会被 CAS 操作成功的线程进行赋值。具体实现请见下文。

  • firstReaderHoldCount

    int 型变量,用于记录 firstReader 的重入次数。

Sync 公平性控制

  • readerShouldBlock:判断获取读锁的线程是否应该被阻塞;
  • writerShouldBlock:判断获取写锁的线程是否应该被阻塞。

ReentrantLock 不同,ReentrantReadWriteLock.Sync 并没有将 tryAcquire 等钩子函数下放到实现类中去实现,而是自己提供了相应的实现,并通过 readerShouldBlockwriterShouldBlock 两个抽象方法区分公平性。这两个方法通过返回的布尔值来控制是否公平等待,具体实现由 FairSyncNonfairSync 两个实现类负责,详细内容在了解完 Sync 后再 总结

Sync 钩子函数

tryAcquire()

tryAcquire()独占模式下尝试获取资源(写锁)的 钩子函数,源码如下:

  protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 获取当前 state 变量值
    int c = getState();
    // 获取写锁重入次数
    int w = exclusiveCount(c);
    if (c != 0) {
        // 如果 state 值不为 0,但是写锁重入次数为 0,说明读锁占用,直接返回 false
        // 否则说明有写锁未释放,因此判断当前线程是否持有写锁
        //   如果不持有写锁,直接返回 false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 如果当前线程持有写锁,确保重入次数不超限
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 直接重入资源,并返回 true
        setState(c + acquires);
        return true;
    }
    // 如果 c == 0,表明锁未被占用,则通过 CAS 抢锁
    //  需要注意,如果公平策略认为当前线程应该被阻塞,则不会尝试抢锁
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 如果允许抢锁,且抢锁成功,则绑定当前线程与锁,并返回 true
    setExclusiveOwnerThread(current);
    return true;
}
  

该方法首先获取 state 变量值和写锁重入次数:

  • 如果 state不等于 0,表示锁已被使用,再做进一步判断:
    • 如果写锁重入次数为 0,表明当前存在读锁占用,由于读写互斥,所以直接返回 false
    • 否则说明写锁被占用,进而判断是否是当前线程持有写锁,如果未持有直接返回 false;如果持有再进行下一步判断:确保重入后的重入次数不超过限制,如果超过则抛出异常;如果未超过,则直接赋值,并返回 true。
  • 如果 state 值等于 0,说明读写锁均未被占用,进而调用 writerShouldBlock(),根据公平性决定是否可以抢锁:
    • 如果可以抢锁,则通过 CAS 抢锁,抢锁成功后绑定当前线程与锁的关系,然后返回 true;抢锁失败则直接返回 false。
    • 如果不可以抢锁,也直接返回 false。

tryAcquireShared()

tryAcquireShared() 是以共享模式获取资源(读锁)的 钩子函数该方法内部也仅通过 writerShouldBlock() 来控制公平性,主要涉及非公平模式下 firstReadercachedHoldCounter 的优化处理:

  protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 获取当前 state 值
    int c = getState();
    // 如果写锁重入次数不为 0,并且当前线程并未持有写锁,则返回 -1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 获取读锁资源总数
    int r = sharedCount(c);
    // 通过 readerShouldBlock 控制公平性
    // 如果策略允许获取读锁,并且重入次数没有超限,就通过 CAS 将读锁资源数+1
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果 CAS 成功。且操作前读锁资源总数是 0
        // 也就是当前线程是最后一个让读锁资源数由 0 变为 1 的
        if (r == 0) {
            // 记录 firstReader
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 如果当前读锁已被使用,并且当前线程就是 firstReader
            // 则更新重入次数
            firstReaderHoldCount++;
        } else {
            // 如果前两个条件不符,则从 cache 中获取 HoldCounter
            HoldCounter rh = cachedHoldCounter;
            // 如果缓存不存在,或者缓存中的线程 id 不是当前线程 id
            //  则将当前线程本地的 HoldCounter,更新到 cache
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            // 如果存在缓存,并且缓存中的线程 id 就是当前线程 id
            //  则把缓存指向对象更新到 ThreadLocal
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;// 线程自己的重入次数 +1
        }
        return 1;
    }

    return fullTryAcquireShared(current);
}
  
  • 该方法首先获取 state 变量值和写锁重入次数,如果写锁重入次数不为 0,并且当前线程并未持有写锁,直接返回 -1。否则进行如下判断:
  • 获取读锁的资源总数,然后判断公平策略是否允许立刻获取锁:如果允许并且重入次数没有超限,则通过 CAS 操作将读锁的资源总数 +1。如果 CAS 操作成功,执行如下步骤:
    • 首先判断 CAS 更新操作前,读锁资源总数是否是 0:
      • 如果是 0,表示当前线程是最后一个将读锁资源总数由 0 设置为 1 的,因此将 firstReader 指向当前线程,并将 firstReaderHoldCount 设置为 1(这里通过 CAS 保证了线程安全)。
      • 如果读锁的资源总数不为 0,并且当前线程就是 firstReader,直接将 firstReaderHoldCount 加 1。
      • 如果读锁的资源总数不为 0,且当前线程并非 firstReader,则获取 cachedHoldCounter 缓存的 HoldCounter
        • 如果缓存为 null 或者 HoldCounter 中的线程 ID 并非当前线程的 ID,则将 cachedHoldCounter 更新为当前线程 ThreadLocal 中的 HoldCounter
        • 如果缓存不为 null,并且缓存中的线程 ID 就是当前线程 ID,并且缓存中的重入次数等于 0,则将缓存存入当前线程的 ThreadLocal 中。
      • 判断完成后,将 cachedHoldCounter 中的重入次数 +1。
    • 最终,方法返回 1。
  • 如果公平策略不允许立刻获取读锁,或者重入次数超过了限制,再或者 CAS 更新读锁重入次数失败,则调用 fullTryAcquireShared() 进行完整的资源获取,并将其返回值作为方法的返回值。

fullTryAcquireShared() 方法是共享模式下获取资源(即读锁)的完整实现,它主要用于公平模式下的读锁获取,以及在非公平模式下抢锁失败后的重试。源码如下:

  final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        // 获取读锁资源总数
        int c = getState();
        // 如果写锁重入次数不为 0
        if (exclusiveCount(c) != 0) {
            // 且持有写锁的线程并非当前线程,返回 -1
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {// 如果公平策略认为需要被阻塞
            // 如果当前线程是 firstReader,放行
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 如果不是 firstReader,并且当前线程先前并未占用读锁资源,则返回 -1
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 如果不需要被阻塞,继续判断:
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 通过 CAS 将读锁总资源数 +1,失败则重试
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // CAS 操作成功,根据情况更新 firstReader 和 cachedHoldCounter
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                // 先获取 cache
                if (rh == null)
                    rh = cachedHoldCounter;
                // 如果 cache 为空,或者 cache 中的线程非当前线程,则用 ThreadLocal 中的覆盖
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                // 如果 cache 不为空,并且线程就是当前线程,而且重入次数为 0,将其存入 ThreadLocal
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // ThreadLocal 存入 cache
            }
            return 1;
        }
    }
}
  

该方法内部是一个死循环,核心实现与上文的 tryAcquireShared 类似,通过 CAS 将读锁资源总数 +1。如果操作成功,就根据当前的情况更新 firstReadercachedHoldCounter,否则不断重试。

这里我们重点关注公平策略认为需要阻塞线程的场景:如果需要被阻塞,先判断当前线程是否是 firstReader。如果是,则直接进行后续的 CAS 重入操作。否则,判断当前线程的读锁重入次数。如果为 0,则清空 ThreadLocal 并返回 -1。

tryRelease()

tryAcquire()独占模式下尝试释放资源的 钩子函数,源码如下:

  protected final boolean tryRelease(int releases) {
    // 当前线程不持有锁,则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 计算释放后剩余的资源数
    int nextc = getState() - releases;
    // 通过位运算更新释放后的资源值
    boolean free = exclusiveCount(nextc) == 0;
    // 如果写锁已经被彻底释放,解绑线程
    if (free)
        setExclusiveOwnerThread(null);
    // 更新资源数量
    setState(nextc);
    return free;
}
  

tryReleaseShared()

tryReleaseShared()共享模式下尝试释放资源的钩子函数,源码如下:

  protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 如果当前线程是 firstReader
    if (firstReader == current) {
        // 将 firstReader 的重入次数 -1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else { //否则,判断 cachedHoldCounter 或 ThreadLocal
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) { // 如果重入次数已经小于等于 1 了,就从 ThreadLocal 移除
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; //重入次数 -1
    }
    // 操作完 cache 和 firstReader,则不断通过 CAS 操作 state
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
  

Sync 其他方法

  • tryReadLock() & tryWriteLock()

    与上文介绍的几个钩子函数不同,虽然 tryReadLock()tryWriteLock() 的功能也是通过 CAS 尝试获取资源,但它们并不会像钩子函数一样与 AQS 中的模板方法配合,只是单纯地用于尝试获取资源。

    • tryReadLock() 源码如下:

        final boolean tryReadLock() {
          Thread current = Thread.currentThread();
          for (;;) {
              int c = getState();
              // 如果写锁重入次数不为 0,但当前线程不持有写锁,返回 false
              if (exclusiveCount(c) != 0 &&
                  getExclusiveOwnerThread() != current)
                  return false;
              // 如果读锁资源总数超限,抛出异常
              int r = sharedCount(c);
              if (r == MAX_COUNT)
                  throw new Error("Maximum lock count exceeded");
              // 否则通过 CAS 将读锁资源总数 +1
              if (compareAndSetState(c, c + SHARED_UNIT)) {
                  if (r == 0) {
                      firstReader = current;
                      firstReaderHoldCount = 1;
                  } else if (firstReader == current) {
                      firstReaderHoldCount++;
                  } else {
                      HoldCounter rh = cachedHoldCounter;
                      if (rh == null || rh.tid != getThreadId(current))
                          cachedHoldCounter = rh = readHolds.get();
                      else if (rh.count == 0)
                          readHolds.set(rh);
                      rh.count++;
                  }
                  return true;
              }
              // CAS 操作失败则重试
          }
      }
        
    • tryWriteLock() 源码如下:

        final boolean tryWriteLock() {
          Thread current = Thread.currentThread();
          int c = getState();
          if (c != 0) {
              int w = exclusiveCount(c);
              if (w == 0 || current != getExclusiveOwnerThread())
                  return false;
              if (w == MAX_COUNT)
                  throw new Error("Maximum lock count exceeded");
          }
          // 通过 CAS 尝试获取写锁,失败直接返回 false
          if (!compareAndSetState(c, c + 1))
              return false;
          // CAS 操作成功,则绑定线程,并返回 true
          setExclusiveOwnerThread(current);
          return true;
      }
        

    可见,读锁加锁失败后会不断自旋重试,而写锁一旦 CAS 操作失败,直接返回 false。

  • isHeldExclusively():判断当前线程是否是持有写锁的线程。

  • getOwner():如果写锁重入次数为 0,直接返回 null,否则返回绑定线程。

  • getReadLockCount():计算读锁资源总数。

  • isWriteLocked():判断当前锁是否处于写锁模式。

  • getWriteHoldCount():判断当前线程的写锁重入次数,只有持有写锁的线程调用才生效,否则返回 0。

  • getReadHoldCount():获取当前线程自身的读锁重入次数。源码如下:

      final int getReadHoldCount() {
        // 获取写锁总资源数
        if (getReadLockCount() == 0)
            return 0;
        // 如果当前线程是 firstReader,返回 firstReaderHoldCount 加速
        Thread current = Thread.currentThread();
        if (firstReader == current)
            return firstReaderHoldCount;
        // 条件允许,从 cache 中加速获取
        HoldCounter rh = cachedHoldCounter;
        if (rh != null && rh.tid == getThreadId(current))
            return rh.count;
        // 加速失败,从 ThreadLocal 中获取
        int count = readHolds.get().count;
        // 发现重入次数为 0 了,顺便移除 ThreadLocal
        if (count == 0) readHolds.remove();
        return count;
    }
      

Sync 实现类 FairSync

FairSyncSync 的公平实现类,内部重写了 Sync 中定义的两个公平性策略,源码如下:

  static final class FairSync extends Sync {

    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
  

writerShouldBlock()readerShouldBlock() 直接调用 AQS 中的 hasQueuedPredecessors 方法来判断同步队列是否为空,或者当前节点的前驱节点是否是同步队列的头节点:

  public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  

只有同步队列为空,或者当前节点的前驱节点是同步队列的头节点时,才会允许当前线程尝试抢占锁。

Sync 实现类 NonfairSync

NonfairSyncSync 的非公平实现类,源码如下:

  static final class NonfairSync extends Sync {

    final boolean writerShouldBlock() {
        return false;
    }
    final boolean readerShouldBlock() {
        // 如果同步队列头节点是独占模式,则返回 true,进而阻塞线程
        return apparentlyFirstQueuedIsExclusive();
    }
}
  

非公平写锁可以直接争抢资源获取锁,所以 writerShouldBlock 直接返回 false。但非公平读锁不能采用这种方式,因为写锁比读锁有更高的优先级,所以当前队列中等待的第一个线程是写锁请求时,非公平读锁就得让步。

ReadWriteLock 的优缺点

ReentrantReadWriteLock 通过拆分 state 变量,实现了读写分离,进而实现了读与读操作之间的不互斥,在读多写少的场景中可以显著提升并发度。但它的读锁是一种“悲观锁”,会阻塞写操作,因此很容易因为读锁未释放而导致写锁饥饿