Semaphore
Semaphore 是 JUC 中基于 AQS 实现的用于并发控制的信号量。本文我们将详细介绍 Semaphore 的 API 以及 acquire、release 方法的实现原理
Semaphore 是 JUC 中基于 AQS 共享模式实现的用于并发控制的信号量。
使用示例
下面是一个 Semaphore
的简单使用示例:
private static int n = 0;
public static void main(String[] args) {
// 初始化十个信号量
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 15; i++) {
new Thread(() -> {
try {
semaphore.acquire();// 获取信号量的许可,如果获取不到则阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(++n);
}).start();
}
}
在以上示例中,我们创建了一个包含 10 个资源的信号量,并使用 15 个线程对其进行申请。当线程成功获取信号量后,不释放信号量直接退出,并打印出相关结果。
2
6
8
5
3
1
4
10
9
7
从结果可以看出,成功获取信号量的线程数最大为 10,这意味着只有 10 个线程成功获取了信号量,剩下的五个线程被阻塞了。下面我们详细介绍 Semaphore
的 API。
API 简介
申请信号量
方法 | 说明 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
acquire() | 获取信号量中的一个许可证,如果有可用许可证并立即返回,可用许可证的数量减一。 如果没有可用的许可,则当前线程休眠,直到发生以下两种情况之一:
|
释放信号量
方法 | 说明 |
---|---|
release() 、release(int permits) | 释放一个许可、释放指定数量的许可 |
Semaphore Sync 同步器
state 变量
Semaphore
的 Sync
同步器使用 state
变量表示可用信号的数量:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 构造方法中初始化信号量初始值
Sync(int permits) {
setState(permits);
}
// 获取剩余信号量
final int getPermits() {
return getState();
}
//...
}
Sync 方法
nonfairTryAcquireShared()
Sync
同步器提供了一个默认的非公平信号量获取方法 nonfairTryAcquireShared(int)
。源码如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
该方法首先获取当前信号量数量 state
,然后用 state
减去请求的信号量数量 acquires
,如果结果小于 0,则表示剩余信号量不足,直接返回这个差值;否则,使用 CAS 尝试获取信号量,如果尝试成功则返回剩余的信号量,如果失败则自旋重试。
tryReleaseShared()
tryReleaseShared(int)
是 Sync
同步器中提供的信号量释放的实现:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
该方法本质上也是通过调用 AQS 的 state
修改方法完成对信号量数量的更新。
reducePermits(int)、drainPermits()
reducePermits(int)
和 drainPermits()
是 Sync
同步器中提供的信号量删除方法,它们的实现本质上与信号量申请操作类似,只是代表的含义不同。
// 删除指定数量的信号量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 删除全部信号量
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
实现类 NonfairSync
NonfairSync
是 Semaphore
中的非公平同步器(默认),它继承自 Sync
。源码如下:
static final class NonfairSync extends Sync {
// 构造方法
NonfairSync(int permits) {
super(permits);
}
// 仅调用 Sync 提供的 nonfairTryAcquireShared 实现
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
NonfairSync
的实现非常简单,它重写了 AQS 共享模式下尝试获取资源的钩子函数 tryAcquireShared(int)
,其中调用了父类 Sync
提供的非公平实现的 nonfairTryAcquireShared
方法。
实现类 FairSync
FairSync
是 Semaphore
中的公平同步器,它也是 Sync
的子类。源码如下:
static final class FairSync extends Sync {
// 构造方法
FairSync(int permits) {
super(permits);
}
// 自己实现的公平锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果当前节点有前驱节点,直接返回 -1
if (hasQueuedPredecessors())
return -1;
// 如果当前节点已经是头节点,或者等待队列空了,开始获取信号量
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
FairSync
也重写了 AQS 中的钩子函数 tryAcquireShared(int)
,并实现了自旋锁的公平性原则:在尝试获取信号量之前,该方法会先判断当前节点的前驱节点是否是头节点,或者等待队列是否为空。只有满足了这两个条件中的一个,该方法才会尝试获取信号量;否则直接返回 -1。
Semaphore 方法
构造方法
Semaphore
提供了两个构造方法:
// 创建一个非公平的信号量,参数为许可证数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 通过 fair 参数控制是否公平
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- 第一个构造方法只接受一个
int
类型的参数permits
,它表示初始的信号量数量,如果permits
的值为 0,则所有线程都无法获取信号量。 - 第二个构造方法除了初始信号量数量外,还可以指定同步控制器的公平性。如果指定为 true,则使用公平同步控制器,否则使用非公平同步控制器。
- 如果
permits
参数传入 1,那么Semaphore
将退化为排它锁; - 如果
permits
参数传入 0,那么Semaphore
相当于一个同步阻塞器。
其他方法
Semaphore
中的其余方法基本都是通过调用 AQS 及实现类中的方法实现的。以下是其中一些方法的源码:
// 申请 1 个信号量
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 申请 permits 个信号量
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 释放 1 个信号量
public void release() {
sync.releaseShared(1);
}
// 释放 permits 个信号量
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
...