Semaphore 是 JUC 中基于 AQS 共享模式实现的用于并发控制的信号量。

使用示例

下面是一个 Semaphore 的简单使用示例:

  private static int n = 0;

public static void main(String[] args) {
    // 初始化十个信号量
    Semaphore semaphore = new Semaphore(10);
    for (int i = 0; i < 15; i++) {
        new Thread(() -> {
            try {
                semaphore.acquire();// 获取信号量的许可,如果获取不到则阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(++n);
        }).start();
    }
}
  

在以上示例中,我们创建了一个包含 10 个资源的信号量,并使用 15 个线程对其进行申请。当线程成功获取信号量后,不释放信号量直接退出,并打印出相关结果。

  2
6
8
5
3
1
4
10
9
7
  

从结果可以看出,成功获取信号量的线程数最大为 10,这意味着只有 10 个线程成功获取了信号量,剩下的五个线程被阻塞了。下面我们详细介绍 Semaphore 的 API。

API 简介

申请信号量

方法说明
acquire()

获取信号量中的一个许可证,如果有可用许可证并立即返回,可用许可证的数量减一。

如果没有可用的许可,则当前线程休眠,直到发生以下两种情况之一:

  • 其他线程为此信号量调用 release 方法,并且当前线程接下来要分配一个许可;
  • 或一些其他线程 Threadinterrupt 当前线程。

      如果当前线程在进入此方法时设置了中断状态;或者在等待许可时被 Threadinterrupt,就会抛出 InterruptedException 并清除当前线程的中断状态。

acquire(int permits)获取信号量中指定数量的许可,其他同 acquire()
acquireUninterruptibly()acquireUninterruptibly(int permits)阻塞状态下无视其他线程的中断信号
tryAcquire()尝试从信号量中获取一个许可,如果无可用许可,直接返回 false,不会阻塞
tryAcquire(int permits)尝试从信号量中获取指定数量的许可,如果无可用许可,直接返回 false,不会阻塞
tryAcquire(int permits, long timeout, TimeUnit unit)在指定的时间内尝试从信号量中获取指定数量的许可证,如果在指定的时间内获取成功,返回 true,否则返回 false

释放信号量

方法说明
release()release(int permits)释放一个许可、释放指定数量的许可

Semaphore Sync 同步器

state 变量

SemaphoreSync 同步器使用 state 变量表示可用信号的数量:

  abstract static class Sync extends AbstractQueuedSynchronizer {
    // 构造方法中初始化信号量初始值
    Sync(int permits) {
        setState(permits);
    }
    // 获取剩余信号量
    final int getPermits() {
        return getState();
    }

    //...
}
  

Sync 方法

nonfairTryAcquireShared()

Sync 同步器提供了一个默认的非公平信号量获取方法 nonfairTryAcquireShared(int)。源码如下:

  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  

该方法首先获取当前信号量数量 state,然后用 state 减去请求的信号量数量 acquires,如果结果小于 0,则表示剩余信号量不足,直接返回这个差值;否则,使用 CAS 尝试获取信号量,如果尝试成功则返回剩余的信号量,如果失败则自旋重试。

tryReleaseShared()

tryReleaseShared(int)Sync 同步器中提供的信号量释放的实现:

  protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
  

该方法本质上也是通过调用 AQS 的 state 修改方法完成对信号量数量的更新。

reducePermits(int)、drainPermits()

reducePermits(int)drainPermits()Sync 同步器中提供的信号量删除方法,它们的实现本质上与信号量申请操作类似,只是代表的含义不同。

  // 删除指定数量的信号量
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}
// 删除全部信号量
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
  

实现类 NonfairSync

NonfairSyncSemaphore 中的非公平同步器(默认),它继承自 Sync。源码如下:

  static final class NonfairSync extends Sync {
    // 构造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 仅调用 Sync 提供的 nonfairTryAcquireShared 实现
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
  

NonfairSync 的实现非常简单,它重写了 AQS 共享模式下尝试获取资源的钩子函数 tryAcquireShared(int),其中调用了父类 Sync 提供的非公平实现的 nonfairTryAcquireShared 方法。

实现类 FairSync

FairSyncSemaphore 中的公平同步器,它也是 Sync 的子类。源码如下:

  static final class FairSync extends Sync {
    // 构造方法
    FairSync(int permits) {
        super(permits);
    }
    // 自己实现的公平锁
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果当前节点有前驱节点,直接返回 -1
            if (hasQueuedPredecessors())
                return -1;
            // 如果当前节点已经是头节点,或者等待队列空了,开始获取信号量
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
  

FairSync 也重写了 AQS 中的钩子函数 tryAcquireShared(int),并实现了自旋锁的公平性原则:在尝试获取信号量之前,该方法会先判断当前节点的前驱节点是否是头节点,或者等待队列是否为空。只有满足了这两个条件中的一个,该方法才会尝试获取信号量;否则直接返回 -1。

Semaphore 方法

构造方法

Semaphore 提供了两个构造方法:

  // 创建一个非公平的信号量,参数为许可证数量
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 通过 fair 参数控制是否公平
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
  
  • 第一个构造方法只接受一个 int 类型的参数 permits,它表示初始的信号量数量,如果 permits 的值为 0,则所有线程都无法获取信号量。
  • 第二个构造方法除了初始信号量数量外,还可以指定同步控制器的公平性。如果指定为 true,则使用公平同步控制器,否则使用非公平同步控制器。

其他方法

Semaphore 中的其余方法基本都是通过调用 AQS 及实现类中的方法实现的。以下是其中一些方法的源码:

  // 申请 1 个信号量
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 申请 permits 个信号量
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// 释放 1 个信号量
public void release() {
    sync.releaseShared(1);
}

// 释放 permits 个信号量
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

...