Java 网络 I/O 模型详解 (Reactor & Proactor)
网络 I/O 模型用于描述网络数据传输时,Socket 与处理线程之间的交互方式,不同的网络 I/O 模型适用于不同的应用场景和需求。
基础 I/O 模型
基础 I/O 模型业界一般称其为 “BIO”。该模型中有一个 Acceptor 线程阻塞监听连接事件,一旦收到客户端的连接请求,就会为该客户端创建一个独立的服务线程以进行通信,每个服务线程需要独自管理自己的 Socket。通常情况下,该线程会按顺序执行读取、计算和回复等一系列步骤。其架构图如下:
![基础 I/O 模型 基础 I/O 模型](/blog/posts/java-network-io-model/images/blocking-io.webp)
然而,这种模型消耗了大量的系统资源:每当有新的连接出现时,都必须新建一个线程来处理请求,而每个线程在完成任务后都必须被销毁。这种频繁的线程创建与销毁将给系统带来沉重的负担,严重降低系统的吞吐量。为了缓解这个问题,我们可以在该架构的基础上引入线程池:
![基础 I/O 模型 + 线程池](/blog/posts/java-network-io-model/images/blocking-io-thread-pool.webp)
服务端收到客户端连接请求后,将任务交给线程池,由线程池分配工作线程执行并在完成后归还,有效避免了频繁创建与销毁线程,提升了系统吞吐量。但由于基础 I/O 模型需要每个线程独自处理自己的 Socket,因而在请求量过大、线程数过高的情况下,线程池频繁切换线程很容易导致 load 升高,当面对十万级甚至百万级连接请求时,基础 I/O 模型就无能为力了。
为了解决这个问题,设计者们利用了批量处理的思想:将多个 Socket 统一交给一个“管理线程”处理,这便是接下来要介绍的多路复用 I/O 模型。
多路复用 I/O 模型
多路复用 I/O 模型业界一般称其为 “NIO”,从 JDK 1.4 开始引入,并在后续的版本中不断改进,直至 JDK 1.8 趋于完善,其核心架构如下:
![多路复用 I/O 模型](/blog/posts/java-network-io-model/images/nio.webp)
它引入了诸如 Channel、Selector 和 Buffer 等概念,这些概念有效地封装了操作系统层面的 I/O 接口。以 Linux 平台为例,JDK 1.4 使用了 POSIX 标准的 select
API(此时 Linux 还没引入 epoll),通过 Selector
对象来集中监控多个 I/O 事件,从而避免了每个线程独立处理 Socket 所带来的问题。此外,自 JDK 1.5 起,还添加了对 epoll 的支持,进一步提升了在 Linux 系统上的 I/O 性能。
Reactor 多路复用模型
Reactor 即“反应器”模型,反应器核心首先阻塞等待网络事件的到达,一旦事件发生,就通过事件驱动的方式继续进行后续的处理工作。以下是 Reactor 模型中的五个重要角色:
- Handle (句柄或描述符):它是资源在操作系统层面的一种抽象,表示与事件绑定了的资源,即各种
SocketChannel
。 - Synchronous Event Demultiplexer (同步事件分发器):Handle 代表的事件会被注册到同步事件分发器上,当事件就绪时,Demultiplexer 会将就绪的事件提交给 Reactor。
- Demultiplexer 的本质是一个系统调用,用于等待事件的发生。调用方在调用它后会被阻塞,一直阻塞到 Demultiplexer 上有事件就绪为止。
- 在 Linux 中,同步事件分发器指的就是 I/O 多路复用器,比如
select
、poll
、epoll
等,Java NIO 中的Selector
就是对多路复用器的封装。
- Reactor (反应器):事件管理的接口,内部使用 Synchronous Event Demultiplexer 注册、注销 Event Handler,当有事件进入"就绪"状态时,调用注册事件的回调函数处理事件。
- Event Handler (事件处理器接口):事件处理程序提供了一组接口,在 Reactor 监听到相应的事件发生时调用,执行相应的事件处理。
- 比如当 Channel 被注册到 Selector 时的回调方法、连接事件发生时的回调方法、写事件发生时的回调方法等都是事件处理器,我们可以实现这些回调来达到对某一事件进行特定反馈的目的。
- 原生的 Java 并不支持 Event Handler,实际业务中需要自己实现,或使用 Netty 等网络框架。
- Concrete Event Handler (事件处理器实现):它是 Event Handler 的实现类,用于实现回调方法指定的业务逻辑。
Reactor 模型有三种实现方式,分别是:单 Reactor 单线程模型、单 Reactor 多线程模型和主从 Reactor 多线程模型。
单 Reactor 单线程模型
单 Reactor 单线程模型指设计中只有一个 Reactor,无论是与 I/O 读写相关,还是与 I/O 无关的编解码和计算,都在一个线程上完成。其架构图如下所示:
![单 Reactor 单线程模型](/blog/posts/java-network-io-model/images/single-reactor-shingle-thread.webp)
在上图中:
- Acceptor 专门处理连接事件,而 Selector 则充当同步事件分发器。
- 客户端的请求可以分为连接请求和其他事件请求两种。
- Selector 上注册了一系列的 Channel,它不断监听这些 Channel。
- 一旦某个 Channel 上的事件处理器就绪,Selector 就会将该事件分发给事件处理器。
该模型仅依靠单线程处理请求,主循环承担了太多的任务,容易在高并发情境下造成请求积压甚至超时。此外,单线程无法有效利用多核资源。因此,更合适的做法是为解码、计算和编码操作引入额外的线程,并使用线程池进行管理。
单 Reactor 多线程模型
单 Reactor 多线程模型是指仅有一个线程负责执行 I/O 操作和处理连接请求,其他逻辑均由 Worker 线程执行。其架构图如下:
![单 Reactor 多线程模型](/blog/posts/java-network-io-model/images/single-reactor-multi-thread.webp)
与第一种模型相比,单 Reactor 多线程模型将业务逻辑委托给线程池来处理,从而可以更有效地利用多核 CPU 资源。然而,单个线程的 Reactor 仍负责监听和响应所有事件,这意味着当 Reactor 处理读写事件时,其他客户端的连接操作可能无法得到及时处理。因此,主从 Reactor 多线程模型应运而生。
主从 Reactor 多线程模型
该模型将处理连接事件的 Reactor 与处理读写事件的 Reactor 分离,避免了读写事件较为频繁的情况下影响新客户端连接。
![主从 Reactor 多线程模型](/blog/posts/java-network-io-model/images/multi-reactor-multi-thread.webp)
主从 Reactor 多线程模型中存在多个 Reactor,Main-Reactor 一般只有一个,它负责监听和处理连接请求;而 Sub-Reactor 可以有多个,用线程池进行管理,主要负责监听和处理读写事件等。当然 Main Reactor 也可以多个,也通过线程池管理,但是这样会增加系统复杂度,需要合理规划调度,否则反而会拖累性能。
Java NIO API
Buffer (缓冲区)
Buffer
本质上是可读可写的内存块,它提供了简化内存操作的方法,并通过属性记录缓冲区的状态变化。
java.nio.Buffer 类及其子类
![Buffer 类及其子类](/blog/posts/java-network-io-model/images/java-nio-buffer.webp)
java.nio.Buffer
有很多实现类,例如:ByteBuffer
、CharBuffer
、LongBuffer
等,分别用于处理不同的数据类型,以提高性能。
缓冲区对象创建
以上所有类型的 Buffer
都支持以下两种方法创建:
方法名 | 说明 |
---|---|
allocate() | 创建一个新 buffer |
wrap(double[] array, ...) | 根据现有内容创建一个缓冲区 |
其中 ByteBuffer
比较特殊,它有两种不同的缓冲区:
- 直接缓冲区:在系统内核缓冲中分配的缓冲区,通过
allocateDirect()
方法分配,可以直接操作 JVM 堆外内存; - 非直接缓冲区:普通的 JVM 堆内缓冲区,通过
allocate()
方法分配。
向缓冲区添加数据
方法名 | 说明 |
---|---|
XxxBuffer put(..) | 向各类 Buffer 中添加数据 |
int position() /Buffer position(int newPosition) | Buffer 基类规定的方法,用于获得当前要操作的索引/修改当前要操作的索引位置 |
int limit() /Buffer limit(int newLimit) | Buffer 基类规定的方法,用于查询最多能操作到哪个索引/修改最多能操作的索引位置 |
int capacity() | Buffer 基类规定的方法,返回缓冲区的总长度 |
int remaining() /boolean hasRemaining() | Buffer 基类规定的方法,查询还有多少能操作的索引/查询是否还能操作 |
写操作图解:
![nio buffer 写操作图解](/blog/posts/java-network-io-model/images/nio-buffer-write.webp)
读取缓冲区数据
方法名 | 说明 |
---|---|
get() | 读取一个单位类型数据 |
flip() | 反转缓冲区,将 limit 设置为 position ,再将 position 设为 0常用于写入数据后将 Buffer 切换为读模式 |
get(int index) | 读指定索引处的单位数据 |
rewind() | 将 position 置为 0,用于重复读取 |
clear() | 初始化缓冲区,将 position 设为 0,limit 设置为最大容量capacity ,同时保留 Buffer 内的数据常用于读取数据后将 Buffer 切换为写模式 |
array() | 将缓冲区转换成数组 char[] 返回 |
flip()
方法图解:
![nio buffer flip() 方法图解](/blog/posts/java-network-io-model/images/nio-buffer-flip.webp)
clear()
方法图解:
![nio buffer clear() 方法图解](/blog/posts/java-network-io-model/images/nio-buffer-clear.webp)
Channel (通道)
Channel
是一个全双工读写通道,同时支持阻塞和非阻塞模式。它类似于 I/O 流,但也有一些不同之处:
Channel
可读可写全双工,而流一般来说是单向的,需要区分输入流和输出流;Channel
支持异步读写;Channel
总是基于 Buffer 读写。
java.nio.channels
中提供了四类 Channel
,分别是:
XxxSocketChannel
:用于客户端 TCP 操作;XxxServerSocketChannel
:用于服务端 TCP 操作;DatagramChannel
:用于 UDP 操作。XxxFileChannel
:用于文件操作;
Selector (选择器)
Selector
用于持续轮询注册在其上的 Channel
,以选择并分发已处理的就绪事件。多路复用 I/O 模型里的事件有以下四种:
- 连接事件;
- 接收事件;
- 可读事件;
- 可写事件。
Selector
可以同时轮训和监控多个 Channel
,当 Selector
发现某个 Channel
的数据状态发生变化时,会通过 SelectorKey
触发相关事件,并由监听此事件的事件处理器来执行相关逻辑。其常用 API 如下:
java.nio.Selector
抽象类:方法名 说明 Selector open()
获取一个 Selector 对象 int select()
阻塞监控所有注册的 Channel,当有对应事件发生,会将 SelectorKey
放入集合内部并返回事件数量int select(long timeout)
带超时的阻塞监听 selectedKeys()
返回存有 SelectorKey
的集合java.nio.channels.SelectionKey
抽象类:方法名 说明 对应事件属性 isAcceptable()
是否是连接继续事件 SelectionKey.OP_ACCEPT
isConnectable()
是否是连接就绪事件 SelectionKey.OP_CONNECT
isReadable()
是否是可读事件 SelectionKey.OP_READ
isWritable()
是否是可写事件 SelectionKey.OP_WRITE
单 Reactor 单线程模型代码示例
简单起见,本文只介绍如何实现单 Reactor 单线程模型。
服务端实现
我们首先构建服务端的核心部分,即构造方法和 main
方法。这两个关键方法将用于启动服务器并激活 Reactor 反应器:
|
|
接下来,我们将专注于 Reactor 类的实现。考虑到该模型的依赖性,这里将 Reactor 设计为内部类。同时,为了简化代码,我们将事件处理器的实现直接嵌入到了 Reactor 中:
|
|
客户端实现
|
|
由于篇幅原因,测试结果省略。
异步 I/O 模型
异步 I/O 模型业界一般称其为 “AIO”,从 JDK 1.7 开始支持。在该模型中,当后台数据处理完成时,内核会通知相应的线程直接获取已经处理好的数据(这是异步与同步之间最本质的区别,即内核返回的是可读事件通知,还是已经处理好的数据),并继续执行后续的操作。
Windows 提供了一套完整的支持 Socket 异步编程的接口 —— IOCP(JDK 中由 sun.nio.ch.Iocp 封装),它是真正意义上的由操作系统实现的异步 I/O。
然而,Linux 内核并不支持异步网络 I/O,因此 Linux 平台上的 JDK 中的异步 API 是由 JVM 在用户态模拟出来的。详情可以参考源码 sun.nio.ch.EPollPort(还是封装的 epoll),本文就不展开了。
不过,下文在介绍异步概念时,我们仍以抽象内核为主体,而暂时忽略 Linux 平台的特性。
Proactor 异步模型
Proactor 模型是与 Reactor 对标的异步模型。该模型有如下六个角色:
- Handle:同 Reactor 模型的 Handle,表示与事件绑定了的资源,即
AsynchronousServerSocketChannel
、AsynchronousSocketChannel
等异步管道。 - Asynchronous Operation Processor (异步操作处理器):由操作系统内核实现,负责执行相关事件的 I/O 操作。
- Proactor (前摄器):由操作系统内核实现,负责管理事件循环。它通过 Async Operation Processor 来执行 I/O 操作,当事件完成时,Proactor 会调用相应的完成事件处理器(Completion Event Handler)来处理完成的事件。
- Completion Event Queue (完成事件队列):由操作系统内核实现,Async Operation Processor 执行完的 I/O 操作结果会放入该队列,Proactor 会从该队列中获得相应的结果。
- Completion Event Handler (完成事件接口):完成事件处理器抽象层,一般是由回调函数组成的接口,例如 Java 中的
CompletionHandler
。 - Concrete Completion Event Handler:完成事件处理器的具体实现。
Proactor 模型与 Reactor 模型很相似,也会进行事件分发,与 Reactor 不同的是,它注册的并不是就绪事件,而是完成事件。Reactor 模型需要应用程序自己处理 I/O 操作,而 Proactor 模型则是由内核线程处理,当执行事件处理器时,Reactor 模型下的 I/O 操作还没有完成,只是就绪,而在 Proactor 模型下 I/O 操作已经完成。
Java AIO API
AsynchronousChannel
![Java AsynchronousChannel](/blog/posts/java-network-io-model/images/java-async.webp)
以上是 Java 在 java.nio.channels
包中新增的与异步相关的类,我们重点关注其中的 AsynchronousServerSocketChannel
、AsynchronousChannelGroup
和 CompletionHandler
,其 API 如下:
java.nio.channels.AsynchronousServerSocketChannel
接口:方法 说明 bind
将 channel 的 socket 绑定到目标地址,并将 socket 配置为侦听连接 <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)
接收来自客户端的连接,同时注册连接完毕事件处理器 CompletionHandler
Future<AsynchronousSocketChannel> accept()
接收来自客户端的连接,并返回一个 Future
句柄java.nio.channels.AsynchronousChannelGroup
抽象类:为异步 Channel 提供线程池支持。
方法 说明 AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
创建具有固定线程池的异步 Channel Group AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)
使用给定的线程池创建异步 Channel Group,该线程池根据需要创建新线程。通过 initialSize
控制初始线程数AsynchronousChannelGroup withThreadPool(ExecutorService executor)
使用给定的线程池创建异步 Channel Group,内部调用的 withCachedThreadPool(executor, 0)
java.nio.channels.CompletionHandler
接口:方法 说明 void completed(V result, A attachment)
操作完成时的回调 void failed(Throwable exc, A attachment)
操作失败时的回调
AsynchronousServerSocketChannel
是 Java 异步 I/O 的核心,它将监听来自客户端的请求,进而选择将完成事件以 CompletionHandler
或 Future
的形式提供给上层应用。
其中,“完成式模型” CompletionHandler
比较好理解,他们都是 java.nio
体系结构下的,而“将来式模型” Future
则属于 JUC 体系结构,其 API 如下:
方法 | 说明 | ||||||||
---|---|---|---|---|---|---|---|---|---|
boolean cancel(boolean mayInterruptIfRunning) | 尝试取消此任务的执行。mayInterruptIfRunning 参数确定是否应中断执行此任务的线程以尝试停止任务。boolean isCancelled() 正常终止、异常或取消状态都返回 true | V get() 阻塞等待任务的执行结果,可以被中断 | V get(long timeout, TimeUnit unit) 阻塞一定时间,等待任务执行结果 | boolean isDone() 判断任务是否终止(成功、异常、取消状态都算) | |
Future
是 JUC 工具包中的一个强大接口,主要用于并发任务管理、结果获取和任务取消等场景。
Proactor 模型代码示例
接下来,让我们通过 CompletionHandler
实现一个简易的 Proactor 服务器,并用 Future
实现客户端进行测试。
服务端实现
首先,我们着手实现 main
方法。这个方法承担着与内核中的 Proactor、完成事件处理器等关键组件进行交互的职责,同时负责注册连接完成事件处理器:
|
|
接下来,我们将专注于不同类型的完成事件处理器的实现,这些处理器都会遵循 CompletionHandler
接口的规范:
- 连接成功事件处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> { private final AsynchronousServerSocketChannel serverChannel; public AcceptCompletionHandler(AsynchronousServerSocketChannel serverChannel) { this.serverChannel = serverChannel; } @Override public void completed(AsynchronousSocketChannel clientChannel, Void attachment) { // 连接事件处理成功后,监听可读事件 CompletionHandler<Integer, ByteBuffer> readHandler = new ReadCompletionHandler(clientChannel); ByteBuffer buffer = ByteBuffer.allocate(1024); clientChannel.read(buffer, buffer, readHandler); // 通知客户端接受连接 serverChannel.accept(null, this); } @Override public void failed(Throwable exc, Void attachment) { System.err.println("connect failed: " + exc.getMessage()); } }
- 读取完毕事件处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel clientChannel; public ReadCompletionHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer bytesRead, ByteBuffer buffer) { if (bytesRead > 0) { buffer.flip(); byte[] data = new byte[bytesRead]; buffer.get(data); String receivedData = new String(data); System.out.println("receive msg from client: " + receivedData); // 接收请求并返回响应 String response = "hello, client!"; ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes()); CompletionHandler<Integer, ByteBuffer> writeHandler = new WriteCompletionHandler(clientChannel); clientChannel.write(responseBuffer, responseBuffer, writeHandler); } } @Override public void failed(Throwable exc, ByteBuffer buffer) { System.err.println("receive msg from client error: " + exc.getMessage()); } }
- 写回完毕事件处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
static class WriteCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel clientChannel; public WriteCompletionHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer bytesWritten, ByteBuffer buffer) { System.out.println("send msg to client success!"); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer buffer) { System.err.println("send msg to client error: " + exc.getMessage()); } }
客户端核心实现
|
|
由于篇幅原因,测试结果省略。至此,与 Java 程序 I/O 模型相关的内容就介绍完了。
总结
网络 I/O 模型描述了网卡与 Socket 监听线程之间的交互方式。无论是基础 I/O 模型,还是 I/O 多路复用模型,它们都有其各自的应用场景:
- 基础 I/O 模型的 API 相对简单,特别适合用来实现一些简易的客户端。
- 多路复用 I/O 模型通过批量处理,将多个 Socket 交由个别线程进行统一处理,从而极大地提高了吞吐量。然而,由于该模型不支持异步 I/O,用户线程在处理 I/O 任务时会阻塞新到达的任务,因此通常需要设计更复杂的架构,例如主从 Reactor 多线程模型等,这增加了系统的复杂性。
- 异步 I/O 模型解决了上述同步模型中的问题,而且不需要在应用层设计复杂的架构(这部分工作下放到内核来完成了)。但这种模型仅在 Windows 平台上能得到良好的支持,我们最常用的 Linux 内核并不支持原生的网络异步 I/O,当 Java 程序部署在 Linux 平台上时,JVM 需要在用户态线程中模拟异步操作,依旧无法避免线程上下文切换以及内核数据拷贝,因此其性能与多路复用 I/O 相比并没有优势。
Linux 5.1 版本引入了 io_uring,该补丁通过两个环形队列以及内存映射等技术,在 Linux 内核上实现了接近满血的网络异步 I/O。不过目前 io_uring 仍处于频繁的优化迭代中,不适用与生产环境。等以后有空再写文章介绍 io_uring。