Java 网络 I/O 模型详解 (Reactor & Proactor)

网络 I/O 模型用于描述网络数据传输时,Socket 与处理线程之间的交互方式,不同的网络 I/O 模型适用于不同的应用场景和需求。

基础 I/O 模型

基础 I/O 模型业界一般称其为 “BIO”。该模型中有一个 Acceptor 线程阻塞监听连接事件,一旦收到客户端的连接请求,就会为该客户端创建一个独立的服务线程以进行通信,每个服务线程需要独自管理自己的 Socket。通常情况下,该线程会按顺序执行读取、计算和回复等一系列步骤。其架构图如下:

基础 I/O 模型

然而,这种模型消耗了大量的系统资源:每当有新的连接出现时,都必须新建一个线程来处理请求,而每个线程在完成任务后都必须被销毁。这种频繁的线程创建与销毁将给系统带来沉重的负担,严重降低系统的吞吐量。为了缓解这个问题,我们可以在该架构的基础上引入线程池:

基础 I/O 模型 + 线程池

服务端收到客户端连接请求后,将任务交给线程池,由线程池分配工作线程执行并在完成后归还,有效避免了频繁创建与销毁线程,提升了系统吞吐量。但由于基础 I/O 模型需要每个线程独自处理自己的 Socket,因而在请求量过大、线程数过高的情况下,线程池频繁切换线程很容易导致 load 升高,当面对十万级甚至百万级连接请求时,基础 I/O 模型就无能为力了。

为了解决这个问题,设计者们利用了批量处理的思想:将多个 Socket 统一交给一个“管理线程”处理,这便是接下来要介绍的多路复用 I/O 模型。

多路复用 I/O 模型

多路复用 I/O 模型业界一般称其为 “NIO”,从 JDK 1.4 开始引入,并在后续的版本中不断改进,直至 JDK 1.8 趋于完善,其核心架构如下:

多路复用 I/O 模型

它引入了诸如 Channel、Selector 和 Buffer 等概念,这些概念有效地封装了操作系统层面的 I/O 接口。以 Linux 平台为例,JDK 1.4 使用了 POSIX 标准的 select API(此时 Linux 还没引入 epoll),通过 Selector 对象来集中监控多个 I/O 事件,从而避免了每个线程独立处理 Socket 所带来的问题。此外,自 JDK 1.5 起,还添加了对 epoll 的支持,进一步提升了在 Linux 系统上的 I/O 性能。

Reactor 多路复用模型

Reactor 即“反应器”模型,反应器核心首先阻塞等待网络事件的到达,一旦事件发生,就通过事件驱动的方式继续进行后续的处理工作。以下是 Reactor 模型中的五个重要角色:

  • Handle (句柄或描述符):它是资源在操作系统层面的一种抽象,表示与事件绑定了的资源,即各种 SocketChannel
  • Synchronous Event Demultiplexer (同步事件分发器):Handle 代表的事件会被注册到同步事件分发器上,当事件就绪时,Demultiplexer 会将就绪的事件提交给 Reactor。
    • Demultiplexer 的本质是一个系统调用,用于等待事件的发生。调用方在调用它后会被阻塞,一直阻塞到 Demultiplexer 上有事件就绪为止。
    • 在 Linux 中,同步事件分发器指的就是 I/O 多路复用器,比如 selectpollepoll 等,Java NIO 中的 Selector 就是对多路复用器的封装。
  • Reactor (反应器):事件管理的接口,内部使用 Synchronous Event Demultiplexer 注册、注销 Event Handler,当有事件进入"就绪"状态时,调用注册事件的回调函数处理事件。
  • Event Handler (事件处理器接口):事件处理程序提供了一组接口,在 Reactor 监听到相应的事件发生时调用,执行相应的事件处理。
    • 比如当 Channel 被注册到 Selector 时的回调方法、连接事件发生时的回调方法、写事件发生时的回调方法等都是事件处理器,我们可以实现这些回调来达到对某一事件进行特定反馈的目的。
    • 原生的 Java 并不支持 Event Handler,实际业务中需要自己实现,或使用 Netty 等网络框架
  • Concrete Event Handler (事件处理器实现):它是 Event Handler 的实现类,用于实现回调方法指定的业务逻辑。

Reactor 模型有三种实现方式,分别是:单 Reactor 单线程模型、单 Reactor 多线程模型和主从 Reactor 多线程模型。

单 Reactor 单线程模型

单 Reactor 单线程模型指设计中只有一个 Reactor,无论是与 I/O 读写相关,还是与 I/O 无关的编解码和计算,都在一个线程上完成。其架构图如下所示:

单 Reactor 单线程模型

在上图中:

  • Acceptor 专门处理连接事件,而 Selector 则充当同步事件分发器。
  • 客户端的请求可以分为连接请求和其他事件请求两种。
    • Selector 上注册了一系列的 Channel,它不断监听这些 Channel。
    • 一旦某个 Channel 上的事件处理器就绪,Selector 就会将该事件分发给事件处理器。

该模型仅依靠单线程处理请求,主循环承担了太多的任务,容易在高并发情境下造成请求积压甚至超时。此外,单线程无法有效利用多核资源。因此,更合适的做法是为解码、计算和编码操作引入额外的线程,并使用线程池进行管理。

单 Reactor 多线程模型

单 Reactor 多线程模型是指仅有一个线程负责执行 I/O 操作和处理连接请求,其他逻辑均由 Worker 线程执行。其架构图如下:

单 Reactor 多线程模型

与第一种模型相比,单 Reactor 多线程模型将业务逻辑委托给线程池来处理,从而可以更有效地利用多核 CPU 资源。然而,单个线程的 Reactor 仍负责监听和响应所有事件,这意味着当 Reactor 处理读写事件时,其他客户端的连接操作可能无法得到及时处理。因此,主从 Reactor 多线程模型应运而生。

主从 Reactor 多线程模型

该模型将处理连接事件的 Reactor 与处理读写事件的 Reactor 分离,避免了读写事件较为频繁的情况下影响新客户端连接。

主从 Reactor 多线程模型

主从 Reactor 多线程模型中存在多个 Reactor,Main-Reactor 一般只有一个,它负责监听和处理连接请求;而 Sub-Reactor 可以有多个,用线程池进行管理,主要负责监听和处理读写事件等。当然 Main Reactor 也可以多个,也通过线程池管理,但是这样会增加系统复杂度,需要合理规划调度,否则反而会拖累性能。

Java NIO API

Buffer (缓冲区)

Buffer 本质上是可读可写的内存块,它提供了简化内存操作的方法,并通过属性记录缓冲区的状态变化。

java.nio.Buffer 类及其子类
Buffer 类及其子类

java.nio.Buffer 有很多实现类,例如:ByteBufferCharBufferLongBuffer 等,分别用于处理不同的数据类型,以提高性能。

缓冲区对象创建

以上所有类型的 Buffer 都支持以下两种方法创建:

方法名说明
allocate()创建一个新 buffer
wrap(double[] array, ...)根据现有内容创建一个缓冲区

其中 ByteBuffer 比较特殊,它有两种不同的缓冲区:

  • 直接缓冲区:在系统内核缓冲中分配的缓冲区,通过 allocateDirect() 方法分配,可以直接操作 JVM 堆外内存;
  • 非直接缓冲区:普通的 JVM 堆内缓冲区,通过 allocate() 方法分配。
向缓冲区添加数据
方法名说明
XxxBuffer put(..)向各类 Buffer 中添加数据
int position()/Buffer position(int newPosition)Buffer 基类规定的方法,用于获得当前要操作的索引/修改当前要操作的索引位置
int limit()/Buffer limit(int newLimit)Buffer 基类规定的方法,用于查询最多能操作到哪个索引/修改最多能操作的索引位置
int capacity()Buffer 基类规定的方法,返回缓冲区的总长度
int remaining()/boolean hasRemaining()Buffer 基类规定的方法,查询还有多少能操作的索引/查询是否还能操作

写操作图解:

nio buffer 写操作图解
读取缓冲区数据
方法名说明
get()读取一个单位类型数据
flip()反转缓冲区,将 limit 设置为 position,再将 position 设为 0

常用于写入数据后将 Buffer 切换为读模式
get(int index)读指定索引处的单位数据
rewind()position 置为 0,用于重复读取
clear()初始化缓冲区,将 position 设为 0,limit 设置为最大容量capacity,同时保留 Buffer 内的数据

常用于读取数据后将 Buffer 切换为写模式
array()将缓冲区转换成数组 char[] 返回

flip() 方法图解:

nio buffer flip() 方法图解

clear() 方法图解:

nio buffer clear() 方法图解

Channel (通道)

Channel 是一个全双工读写通道,同时支持阻塞和非阻塞模式。它类似于 I/O 流,但也有一些不同之处:

  • Channel 可读可写全双工,而流一般来说是单向的,需要区分输入流和输出流;
  • Channel 支持异步读写;
  • Channel 总是基于 Buffer 读写。

java.nio.channels 中提供了四类 Channel,分别是:

  • XxxSocketChannel:用于客户端 TCP 操作;
  • XxxServerSocketChannel:用于服务端 TCP 操作;
  • DatagramChannel:用于 UDP 操作。
  • XxxFileChannel:用于文件操作;

Selector (选择器)

Selector 用于持续轮询注册在其上的 Channel,以选择并分发已处理的就绪事件。多路复用 I/O 模型里的事件有以下四种:

  • 连接事件;
  • 接收事件;
  • 可读事件;
  • 可写事件。

Selector 可以同时轮训和监控多个 Channel,当 Selector 发现某个 Channel 的数据状态发生变化时,会通过 SelectorKey 触发相关事件,并由监听此事件的事件处理器来执行相关逻辑。其常用 API 如下:

  • java.nio.Selector 抽象类:

    方法名说明
    Selector open()获取一个 Selector 对象
    int select()阻塞监控所有注册的 Channel,当有对应事件发生,会将 SelectorKey 放入集合内部并返回事件数量
    int select(long timeout)带超时的阻塞监听
    selectedKeys()返回存有 SelectorKey 的集合
  • java.nio.channels.SelectionKey 抽象类:

    方法名说明对应事件属性
    isAcceptable()是否是连接继续事件SelectionKey.OP_ACCEPT
    isConnectable()是否是连接就绪事件SelectionKey.OP_CONNECT
    isReadable()是否是可读事件SelectionKey.OP_READ
    isWritable()是否是可写事件SelectionKey.OP_WRITE

单 Reactor 单线程模型代码示例

简单起见,本文只介绍如何实现单 Reactor 单线程模型。

服务端实现

我们首先构建服务端的核心部分,即构造方法和 main 方法。这两个关键方法将用于启动服务器并激活 Reactor 反应器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReactorServer {

    private final Selector selector;

    public ReactorServer() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(1234));

        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    // 其他内容暂时省略 ...

    public static void main(String[] args) throws IOException {
        ReactorServer server = new ReactorServer();
        System.out.println("Server start ...");
        Reactor reactor = server.new Reactor();
        reactor.run();
    }
}

接下来,我们将专注于 Reactor 类的实现。考虑到该模型的依赖性,这里将 Reactor 设计为内部类。同时,为了简化代码,我们将事件处理器的实现直接嵌入到了 Reactor 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Reactor {
    public void run() {
        try {
            // Reactor 循环
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isAcceptable()) {
                        // 处理连接事件
                        handleAcceptEvent(selectionKey);
                    } else if (selectionKey.isReadable()) {
                        // 处理可读事件
                        handleReadEvent(selectionKey);
                    } else if (selectionKey.isWritable()) {
                        // 处理可写事件
                        handleWriteEvent(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // 连接事件处理器
    private void handleAcceptEvent(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel clientChannel = serverSocketChannel.accept();
        if (clientChannel != null) {
            clientChannel.configureBlocking(false);// 设置非阻塞
            // 监听可读事件
            clientChannel.register(selector, SelectionKey.OP_READ);
        }
    }
    // 可读事件处理器
    private void handleReadEvent(SelectionKey selectionKey) throws IOException {
        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
        int count = clientChannel.read(receiveBuffer);
        if (count > 0) {
            String context = new String(receiveBuffer.array(), 0, count);
            System.out.println("Received from client: " + context);
            // 读取成功后监听可写事件
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    }
    // 可写事件处理器
    private void handleWriteEvent(SelectionKey selectionKey) throws IOException {
        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer sendBuffer = ByteBuffer.wrap(("Hello client!").getBytes());
        clientChannel.write(sendBuffer);
        // 写回后,继续监听可读事件
        selectionKey.interestOps(SelectionKey.OP_READ);
    }
}

客户端实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class TestReactorClient {

    private final String serverHost;
    private final int serverPort;
    private Selector selector;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private SocketChannel clientChannel;

    public TestReactorClient(String serverHost, int serverPort) {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
    }

    public void run() throws IOException, InterruptedException {
        connect();
        Thread.sleep(1000);
        sendMsg();
        executorService.shutdown();
    }

    private void connect() throws IOException {
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        selector = Selector.open();
        clientChannel.register(selector, SelectionKey.OP_CONNECT);
        clientChannel.connect(new InetSocketAddress(serverHost, serverPort));
        selector.select();
        clientChannel.finishConnect();
        selector.selectedKeys().clear();
        clientChannel.register(selector, SelectionKey.OP_READ);
        executorService.execute(this::handleEvent);
    }

    private void handleEvent() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()) {
                        ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
                        int count = clientChannel.read(receiveBuffer);
                        if (count > 0) {
                            String context = new String(receiveBuffer.array(), 0, count);
                            System.out.println("Received from server: " + context);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void sendMsg() throws IOException {
        ByteBuffer sendBuffer = ByteBuffer.wrap("Hello server!".getBytes());
        clientChannel.write(sendBuffer);
        System.out.println("Sent to server: Hello server!");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        TestReactorClient client = new TestReactorClient("127.0.0.1", 1234);
        client.run();
    }
}

由于篇幅原因,测试结果省略。

异步 I/O 模型

异步 I/O 模型业界一般称其为 “AIO”,从 JDK 1.7 开始支持。在该模型中,当后台数据处理完成时,内核会通知相应的线程直接获取已经处理好的数据(这是异步与同步之间最本质的区别,即内核返回的是可读事件通知,还是已经处理好的数据),并继续执行后续的操作。

注意

Windows 提供了一套完整的支持 Socket 异步编程的接口 —— IOCP(JDK 中由 sun.nio.ch.Iocp 封装),它是真正意义上的由操作系统实现的异步 I/O。

然而,Linux 内核并不支持异步网络 I/O,因此 Linux 平台上的 JDK 中的异步 API 是由 JVM 在用户态模拟出来的。详情可以参考源码 sun.nio.ch.EPollPort(还是封装的 epoll),本文就不展开了。

不过,下文在介绍异步概念时,我们仍以抽象内核为主体,而暂时忽略 Linux 平台的特性。

Proactor 异步模型

Proactor 模型是与 Reactor 对标的异步模型。该模型有如下六个角色:

  • Handle:同 Reactor 模型的 Handle,表示与事件绑定了的资源,即 AsynchronousServerSocketChannelAsynchronousSocketChannel 等异步管道。
  • Asynchronous Operation Processor (异步操作处理器):由操作系统内核实现,负责执行相关事件的 I/O 操作。
  • Proactor (前摄器):由操作系统内核实现,负责管理事件循环。它通过 Async Operation Processor 来执行 I/O 操作,当事件完成时,Proactor 会调用相应的完成事件处理器(Completion Event Handler)来处理完成的事件。
  • Completion Event Queue (完成事件队列):由操作系统内核实现,Async Operation Processor 执行完的 I/O 操作结果会放入该队列,Proactor 会从该队列中获得相应的结果。
  • Completion Event Handler (完成事件接口):完成事件处理器抽象层,一般是由回调函数组成的接口,例如 Java 中的 CompletionHandler
  • Concrete Completion Event Handler:完成事件处理器的具体实现。

Proactor 模型与 Reactor 模型很相似,也会进行事件分发,与 Reactor 不同的是,它注册的并不是就绪事件,而是完成事件。Reactor 模型需要应用程序自己处理 I/O 操作,而 Proactor 模型则是由内核线程处理,当执行事件处理器时,Reactor 模型下的 I/O 操作还没有完成,只是就绪,而在 Proactor 模型下 I/O 操作已经完成。

Java AIO API

AsynchronousChannel

Java AsynchronousChannel

以上是 Java 在 java.nio.channels 包中新增的与异步相关的类,我们重点关注其中的 AsynchronousServerSocketChannelAsynchronousChannelGroupCompletionHandler,其 API 如下:

  • java.nio.channels.AsynchronousServerSocketChannel 接口:

    方法说明
    bind将 channel 的 socket 绑定到目标地址,并将 socket 配置为侦听连接
    <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)接收来自客户端的连接,同时注册连接完毕事件处理器 CompletionHandler
    Future<AsynchronousSocketChannel> accept()接收来自客户端的连接,并返回一个 Future 句柄
  • java.nio.channels.AsynchronousChannelGroup 抽象类:

    为异步 Channel 提供线程池支持。

    方法说明
    AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory)创建具有固定线程池的异步 Channel Group
    AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)使用给定的线程池创建异步 Channel Group,该线程池根据需要创建新线程。通过 initialSize 控制初始线程数
    AsynchronousChannelGroup withThreadPool(ExecutorService executor)使用给定的线程池创建异步 Channel Group,内部调用的 withCachedThreadPool(executor, 0)
  • java.nio.channels.CompletionHandler 接口:

    方法说明
    void completed(V result, A attachment)操作完成时的回调
    void failed(Throwable exc, A attachment)操作失败时的回调

AsynchronousServerSocketChannel 是 Java 异步 I/O 的核心,它将监听来自客户端的请求,进而选择将完成事件以 CompletionHandlerFuture 的形式提供给上层应用。

其中,“完成式模型” CompletionHandler 比较好理解,他们都是 java.nio 体系结构下的,而“将来式模型” Future 则属于 JUC 体系结构,其 API 如下:

方法说明
boolean cancel(boolean mayInterruptIfRunning)尝试取消此任务的执行。
  • 如果任务已完成、已被取消或由于其他原因无法取消,则此尝试将失败。
  • 如果成功,并且在调用 cancel 时此任务尚未启动,则此任务不应运行。
  • 如果任务已启动,则根据 mayInterruptIfRunning 参数确定是否应中断执行此任务的线程以尝试停止任务。
    此方法返回后,对 isDone 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。
  • boolean isCancelled()正常终止、异常或取消状态都返回 true
    V get()阻塞等待任务的执行结果,可以被中断
    V get(long timeout, TimeUnit unit)阻塞一定时间,等待任务执行结果
    boolean isDone()判断任务是否终止(成功、异常、取消状态都算)

    Future 是 JUC 工具包中的一个强大接口,主要用于并发任务管理、结果获取和任务取消等场景。

    Proactor 模型代码示例

    接下来,让我们通过 CompletionHandler 实现一个简易的 Proactor 服务器,并用 Future 实现客户端进行测试。

    服务端实现

    首先,我们着手实现 main 方法。这个方法承担着与内核中的 Proactor、完成事件处理器等关键组件进行交互的职责,同时负责注册连接完成事件处理器:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    public class ProactorServer {
    
        public static void main(String[] args) throws IOException {
            AsynchronousChannelGroup channelGroup =
                    AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(10));
            // 创建异步管道,并使用线程池
            AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(channelGroup);
            serverChannel.bind(new InetSocketAddress("127.0.0.1", 1234));
            System.out.println("Server start ...");
            // 注册连接处理器,处理客户端连接请求
            CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler = new AcceptCompletionHandler(serverChannel);
            serverChannel.accept(null, acceptHandler);
    
            // 阻塞防止主线程退出
            try {
                Thread.currentThread().join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    接下来,我们将专注于不同类型的完成事件处理器的实现,这些处理器都会遵循 CompletionHandler 接口的规范:

    • 连接成功事件处理器
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      
      static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
      
          private final AsynchronousServerSocketChannel serverChannel;
      
          public AcceptCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
              this.serverChannel = serverChannel;
          }
      
          @Override
          public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
              // 连接事件处理成功后,监听可读事件
              CompletionHandler<Integer, ByteBuffer> readHandler = new ReadCompletionHandler(clientChannel);
              ByteBuffer buffer = ByteBuffer.allocate(1024);
              clientChannel.read(buffer, buffer, readHandler);
              // 通知客户端接受连接
              serverChannel.accept(null, this);
          }
      
          @Override
          public void failed(Throwable exc, Void attachment) {
              System.err.println("connect failed: " + exc.getMessage());
          }
      }
    • 读取完毕事件处理器
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      
      static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
      
          private final AsynchronousSocketChannel clientChannel;
      
          public ReadCompletionHandler(AsynchronousSocketChannel clientChannel) {
              this.clientChannel = clientChannel;
          }
      
          @Override
          public void completed(Integer bytesRead, ByteBuffer buffer) {
              if (bytesRead > 0) {
                  buffer.flip();
                  byte[] data = new byte[bytesRead];
                  buffer.get(data);
                  String receivedData = new String(data);
                  System.out.println("receive msg from client: " + receivedData);
      
                  // 接收请求并返回响应
                  String response = "hello, client!";
                  ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
                  CompletionHandler<Integer, ByteBuffer> writeHandler = new WriteCompletionHandler(clientChannel);
                  clientChannel.write(responseBuffer, responseBuffer, writeHandler);
              }
          }
      
          @Override
          public void failed(Throwable exc, ByteBuffer buffer) {
              System.err.println("receive msg from client error: " + exc.getMessage());
          }
      }
    • 写回完毕事件处理器
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      
      static class WriteCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
      
          private final AsynchronousSocketChannel clientChannel;
      
          public WriteCompletionHandler(AsynchronousSocketChannel clientChannel) {
              this.clientChannel = clientChannel;
          }
      
          @Override
          public void completed(Integer bytesWritten, ByteBuffer buffer) {
              System.out.println("send msg to client success!");
      
              try {
                  clientChannel.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      
          @Override
          public void failed(Throwable exc, ByteBuffer buffer) {
              System.err.println("send msg to client error: " + exc.getMessage());
          }
      }

    客户端核心实现

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    
    static class ClientThread implements Runnable {
        private final int clientId;
        public ClientThread(int clientId) {
            this.clientId = clientId;
        }
    
        @Override
        public void run() {
            try {
                AsynchronousSocketChannel clientChannel = AsynchronousSocketChannel.open();
                Future<Void> future = clientChannel.connect(new InetSocketAddress("localhost", 1234));
                future.get(); // 阻塞等待连接成功
    
                String message = "Hello from client " + clientId;
                ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
                Future<Integer> writeFuture = clientChannel.write(buffer);
                writeFuture.get(); // 阻塞等待发送成功
    
                ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
                Future<Integer> readFuture = clientChannel.read(responseBuffer);
                readFuture.get(); // 阻塞等待服务端返回数据
    
                responseBuffer.flip();
                byte[] responseData = new byte[responseBuffer.remaining()];
                responseBuffer.get(responseData);
                System.out.println("Client " + clientId + " received response: " + new String(responseData));
    
                clientChannel.close();
            } catch (IOException | InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    由于篇幅原因,测试结果省略。至此,与 Java 程序 I/O 模型相关的内容就介绍完了。

    总结

    网络 I/O 模型描述了网卡与 Socket 监听线程之间的交互方式。无论是基础 I/O 模型,还是 I/O 多路复用模型,它们都有其各自的应用场景:

    • 基础 I/O 模型的 API 相对简单,特别适合用来实现一些简易的客户端。
    • 多路复用 I/O 模型通过批量处理,将多个 Socket 交由个别线程进行统一处理,从而极大地提高了吞吐量。然而,由于该模型不支持异步 I/O,用户线程在处理 I/O 任务时会阻塞新到达的任务,因此通常需要设计更复杂的架构,例如主从 Reactor 多线程模型等,这增加了系统的复杂性。
    • 异步 I/O 模型解决了上述同步模型中的问题,而且不需要在应用层设计复杂的架构(这部分工作下放到内核来完成了)。但这种模型仅在 Windows 平台上能得到良好的支持,我们最常用的 Linux 内核并不支持原生的网络异步 I/O,当 Java 程序部署在 Linux 平台上时,JVM 需要在用户态线程中模拟异步操作,依旧无法避免线程上下文切换以及内核数据拷贝,因此其性能与多路复用 I/O 相比并没有优势。

      Linux 5.1 版本引入了 io_uring,该补丁通过两个环形队列以及内存映射等技术,在 Linux 内核上实现了接近满血的网络异步 I/O。不过目前 io_uring 仍处于频繁的优化迭代中,不适用与生产环境。等以后有空再写文章介绍 io_uring。

    0%