I/O 多路复用机制的概念
有过 Linux C 开发经验的同学都知道,在最基础的网络 I/O 模型中,每个用户进程进入内核态后,如果发现监听的 socket 对象的接收队列没有数据,就会阻塞等待。当内核发现数据到达时,再唤醒等待的进程,继续处理 I/O 数据。每次等待、唤醒都需要切换两次进程上下文。如果在客户端这么做还能够接受,但在高并发服务器上,频繁的上下文切换将严重降低网络吞吐量。为了避免由这种上下文切换带来的资源浪费,I/O 多路复用机制应运而生。
I/O 多路复用是一种用户进程与内核进程之间的特殊协作方式,它允许单个进程处理多个网络连接,而不再局限于一对一的处理方式。以 Linux 平台为例,它支持以下几种路复用方式:
- POSIX 标准定义的 select 和 poll;
- Linux 平台相关的 epoll。
其中,epoll 在性能方面的表现最为出色。
本文内容基于 Linux-3.12,不同版本的源码不尽相同,但核心逻辑不变,不影响理解。
epoll 核心原理
如果您没有 epoll 开发经验,可以先阅读下一小节的 epoll 开发示例,以便对 epoll 有一个基本的了解。
epoll_create 创建内核对象
在用户进程调用 epoll_create
时,内核会创建一个 struct eventpoll
类型的内核对象,并把它关联到当前进程已打开的文件列表 fdtable
中:
其详细结构如下:
wq
:等待队列链表。当数据就绪触发软中断时,会通过 wq
来寻找阻塞在 epoll 对象上的用户进程;rbr
:一颗红黑树,用于管理用户进程添加的所有 TCP 连接,可以实现高效的增、删、查询操作;rdllist
:就绪的文件描述符列表。当有连接就绪时,内核会将这些就绪的连接添加到这个链表中,这样用户进程就可以直接获取已就绪的所有事件,而无需像使用 POSIX API 那样遍历所有描述符。
熟悉了主要的数据结构之后,我们再来查看 fs/eventpoll.c 中 epoll_create
的源码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
...
// [1] 创建并初始化 eventpoll 对象
error = ep_alloc(&ep);
...
// [2] 获取一个未使用的文件描述符,作为 epfd
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
...
// [3] 创建 epoll 的 file 实例
// file 的 f_op 被设置为 eventpoll_fops
// file 的 private_data 设置为上边的 eventpoll 对象,即 ep
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
...
// [4] 建立 fd 和 file 的关联关系
fd_install(fd, file);
return fd;
}
|
[1]
:首先调用 ep_alloc
创建一个 eventpoll
结构体,完成等待队列、就绪队列等参数的初始化: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| static int ep_alloc(struct eventpoll **pep)
{
struct eventpoll *ep;
// 申请内存空间
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
...
// 初始化等待队列头
init_waitqueue_head(&ep->wq);
...
// 初始化就绪队列
INIT_LIST_HEAD(&ep->rdllist);
...
// 初始化红黑树
ep->rbr = RB_ROOT;
...
}
|
[2]
:然后,获取一个未使用的文件描述符,作为 epfd
;[3]
:再然后,创建 epoll file
实例,以及匿名 inode
节点和 dentry
等数据结构:- epoll 文件的
file->f_op
会被设置为 eventpoll_fops
,否则将不支持 epoll: 1
2
3
4
5
6
7
8
9
10
| static const struct file_operations eventpoll_fops = {
#ifdef CONFIG_PROC_FS
.show_fdinfo = ep_show_fdinfo,
#endif
.release = ep_eventpoll_release,
// poll 指针不为空,表示支持 epoll 操作
// 因此普通文件不支持 epoll
.poll = ep_eventpoll_poll,
.llseek = noop_llseek,
};
|
- epoll 文件的
file->private_data
会被设置为参数传入的 eventpoll
变量 (ep
);
[4]
:最后建立 fd
和 file
之间的关联关系。
epoll_ctl 添加 socket
当用户进程调用 epoll_ctl
向 epoll 添加 socket 时,内核会做以下三件事情:
- 初始化一个红黑树节点对象
epitem
; - 将监听的事件添加到 socket 的等待队列中,其回调函数是
ep_poll_callback
; - 将
epitem
节点插入 epoll 对象的红黑树。
源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
...
// [1] 根据 epfd 找到其封装了 file 的 fd 内核对象
f = fdget(epfd);
// 根据需要被监听 socket 文件描述符,找到其封装了 file 的 fd 内核对象
tf = fdget(fd);
/* 确认被监听的 tf 支持 epoll,即文件的 fops 为 struct file_operations */
if (!tf.file->f_op || !tf.file->f_op->poll)
...
// [2] 找到 epfd 关联的 eventpoll 内核对象
ep = f.file->private_data;
...
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) { // 新增操作,需要确保 epitem 不存在
epds.events |= POLLERR | POLLHUP;
// [3] 完成 socket 注册操作
error = ep_insert(ep, &epds, tf.file, fd);
} else ...
clear_tfile_check_list();
break;
...
}
return error;
}
|
epoll_ctl
首先根据文件描述符,找到被监听 fd
关联的内核文件对象 tf.file
,以及 epfd
关联的 epollevent
,然后调用 ep_insert
完成注册操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
// [1] 初始化一个 epitem
...
struct epitem *epi;
struct ep_pqueue epq;
...
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
...
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// epitem->fdd 中存储了被监听 socket 的 fd 描述符和 struct file 对象
ep_set_ffd(&epi->ffd, tfile, fd);
...
epq.epi = epi;
// [2] 初始化 socket 等待队列,将回调函数 ep_poll_callback 设置为数据就绪时的回调
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = ep_item_poll(epi, &epq.pt);
...
// [3] 插入红黑树
ep_rbtree_insert(ep, epi);
...
error_xxx:
// 各种错误处理
return error;
}
|
[1]
:初始化一个 epitem
,epitem
中的 fdd
存储了被监听 socket 的 fd
描述符和 struct file
对象地址;[2]
:调用 init_poll_funcptr()
函数,将 ep_ptable_queue_proc()
函数注册为 poll 模式的回调,在 ep_ptable_queue_proc()
里再把 ep_poll_callback()
设置为监听事件触发时真正的回调;[3]
:将这个 epitem
插入红黑树。
接下来,我们将对这三个步骤进行详细分析。
分配并初始化 epitem
对于每一个 socket,调用 epoll_ctl
注册事件时,都会为其创建一个 epitem
对象,该结构体的数据结构如下:
1
2
3
4
5
6
7
8
9
10
11
| struct epitem {
// 红黑树节点
struct rb_node rbn;
// 被监听 socket 的文件描述符信息
struct epoll_filefd ffd;
// 所属的 eventpoll 对象
struct eventpoll *ep;
// 等待队列头节点
struct list_head pwqlist;
...
};
|
初始化完成后的 epitem
关联关系如下:
到这一步,epitem 已经实现了 eventpoll 文件和被监听 socket 文件的关联。
设置 socket 等待队列
创建并初始化 epitem
后,紧接着就是初始化 socket 对象上的等待队列,并把 fs/eventpoll.c 中的 ep_epoll_callback()
设置为数据就绪时的回调函数。这里的实现比较复杂,是通过 poll 模式完成的,首先来看 ep_item_poll
:
1
2
3
4
5
| static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
pt->_key = epi->event.events;
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}
|
这里调用到了 socket (epi->ffd
) 下的 file->f_op->poll()
,该函数指针指向的是 net/socket.c 中的 sock_poll()
:
1
2
3
4
5
| static unsigned int sock_poll(struct file *file, poll_table *wait)
{
...
return busy_flag | sock->ops->poll(file, sock, wait);
}
|
其中的 sock->ops->poll()
又指向 tcp_poll()
:
1
2
3
4
5
6
7
8
| unsigned int tcp_poll(struct file *file, poll_table *wait)
{
...
struct sock *sk = sock->sk;
...
sock_poll_wait(file, sk_sleep(sk), wait);
...
}
|
在调用 sock_poll_wait()
之前,先在第二个参数这里调用了 sk_sleep()
函数。sk_sleep()
函数会返回 sock
对象里等待队列的头节点 wait_queue_head_t
,稍后等待队列项将插入这里,实现如下:
1
2
3
4
5
| static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk->sk_wq)->wait;
}
|
注意
这里的等待队列是 socket 中的,而非 epoll 的等待队列。
接着真正进入 include/net/sock.h 中的 sock_poll_wait()
函数:
1
2
3
4
5
6
7
| static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
...
poll_wait(filp, wait_address, p);
...
}
|
其核心实现是 include/net/poll.h 中的 poll_wait()
函数:
1
2
3
4
5
| static inline void poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p) {
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p)
}
|
_qproc
函数指针在上文 ep_insert 函数执行时会被 init_poll_funcptr 设置为 ep_ptable_queue_proc()
,到这里总算进入了重点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
// 根据地址拿到参数 pt 指针下边的 epitem,也就是上边创建的那个 epitem
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
// [1] 分配一个 eppoll_entry 结构体
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
// [2] 将 eppoll_entry->wait->func 指向 ep_poll_callback
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
// [3] 完成最终关联
pwq->whead = whead; // whead 为 socket 等待队列头
pwq->base = epi; // base 指向 epitem
add_wait_queue(whead, &pwq->wait);
...
}...
}
|
[1]
:首先分配一个 eppoll_entry
结构体,该结构体负责关联 epitem
、事件就绪回调函数与 socket 等待队列:1
2
3
4
5
6
7
8
9
| struct eppoll_entry {
...
// 指向 epitem
struct epitem *base;
// 指向回调函数
wait_queue_t wait;
// 指向被监听的 socket 等待队列
wait_queue_head_t *whead;
};
|
[2]
:调用 init_waitqueue_func_entry()
将 eppoll_entry->wait->func
指向 ep_poll_callback()
:1
2
3
4
5
6
7
| static inline void init_waitqueue_func_entry(wait_queue_t *q,
wait_queue_func_t func)
{
q->flags = 0;
q->private = NULL;
q->func = func;
}
|
注意
正常情况下,q->private
指向当前用户进程,当 socket 有数据到达时唤醒用户进程,但这里的 socket 是交给 epoll 管理的,因此 q->private
会被设置为 NULL。
[3]
:完成最终关联。这里首先用 eppoll_entry->whead
缓存当前 socket 等待队列的头节点,然后将 eppoll_entry->base
指向传入的 epitem
,最后调用 add_wait_queue()
将 eppoll_entry->wait
移动到 socket 等待队列头节点,完成最终关联。
到这一步,相关的结构体之间的关系如下:
插入红黑树
完成上述两个步骤后,将这个 epitem
插入红黑树,核心流程就完成了。红黑树使得 epoll 在查找效率、插入效率、内存开销等多个方面都更加均衡。
最终,这些内核数据结构的最终关系如下图所示:
epoll_wait 等待事件就绪
epoll_wait
的逻辑非常简单:当调用它时,它会检查 eventpoll->rdllist
链表,查看其中是否有准备好的 socket,如果没有,它会创建一个等待项并将其加入到 eventpoll->wq
等待队列中,然后将自己阻塞。如下图所示:
其源码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
...
wait_queue_t wait;
...
fetch_events:
// [1] 判断就绪队列上是否有事件就绪
if (!ep_events_available(ep)) {
// [2] 未就绪,则定义等待事件项并关联当前进程
init_waitqueue_entry(&wait, current);
// [3] 将等待事件项添加到 epoll 的等待队列上
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
...
// [4] 让出 CPU,阻塞直到超时或触发软中断
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
...
// [5] 判断是否存在就绪事件
eavail = ep_events_available(ep);
...
if (!res && eavail &&
// [6] 向用户态发送就绪事件,就绪的事件会被转储到 events 数组
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
|
[1]
:调用 ep_events_available()
判断就绪队列上是否存在已经就绪的 socket,这个队列上的数据是由上文注册的回调 ep_poll_callback()
插入的;[2]
:如果没有就绪事件,就调用 init_waitqueue_entry()
创建一个 epoll 等待队列项 wait_queue_t
,同时把当前进程挂在 wait_queue_t->private
上;[3]
:将创建的 wait_queue_t
添加到 eventpoll->wq
上;[4]
:让出 CPU,阻塞直到超时或数据就绪发送中断;[5]
:判断是否存在就绪事件;[6]
:当前进程被唤醒,且存在就绪事件时,调用 ep_send_events
将绪的事件转储到用户态 events
数组。ep_send_events
方法内部会调用 __copy_to_user
将数据复制到用户空间。
事件就绪时的唤醒过程
在前边执行 epoll_ctl
监听事件时,内核为每个 socket 都添加了一个注册了 ep_poll_callback()
回调的等待队列项。在调用 epoll_wait
陷入阻塞前,又在 eventpoll
对象的等待队列中添加了指向当前多路复用线程的等待项。
- 在 socket 的等待队列中,其回调函数是
ep_poll_callback()
,另外其 private
不需要唤醒进程,所以指向了 NULL; - 在
eventpoll
的等待队列中,其回调函数是 default_wake_function()
,其 private
指针指向等待事件就绪的用户进程,即控制 epoll 的多路复用进程。
当 socket 有数据到达时,内核会查找 socket 等待队列元素上的回调函数,并交给系统的软中断去调用,ep_poll_callback()
的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
// [1] 获取 wait 节点关联的 epitem
struct epitem *epi = ep_item_from_wait(wait);
// [2] 获取 epitem 所属的 eventpoll
struct eventpoll *ep = epi->ep;
...
// [3] 将当前 epitem 添加到 eventpoll 的就绪队列
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}
// [4] 查看 eventpoll 的等待队列上是否有等待的进程
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); // 如果有,则唤醒
...
return 1;
}
|
[1]
:获取就绪的 wait
节点所关联的 epitem
;[2]
:获取 epitem
所属的 eventpoll
;[3]
:将 epitem
添加到 eventpoll
的就绪队列;[4]
:调用 default_wake_function()
唤醒调用 epoll_wait
的进程。
一旦该线程被唤醒,便会使用上文介绍的 ep_send_events() 函数,将就绪队列的数据转储到用户态的 events
数组中,以供用户态进程 直接 使用。
epoll 开发示例
epoll 常用接口
epoll 有以下三个常用接口:
int epoll_create(int size)
:创建一个 epoll 句柄,其中的 size
参数用于通知内核这个监听的 socket 文件描述符数目总共有多大,需要传递的是待监听的最大 fd 值 +1。注意:一旦 epoll 句柄被成功创建,它就会占用一个文件描述符,我们可以通过查看路径 /proc/#{process_id}/fd/
找到这个 fd。因此,在使用完 epoll 后,务必要调用 close()
函数来关闭它,以免占用文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
:epoll 的事件注册函数,负责注册要监听的事件类型。int epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout)
:等待事件的产生。- 参数
events
用来从内核得到事件的集合; - 参数
maxevents
告之内核这个 events
有多大,这个 maxevents
的值不能大于创建 epoll_create()
时的 size
; - 参数
timeout
是超时时间,单位毫秒:- 0 会立即返回;
- 如果值小于 0 将不确定,也有说法说是永久阻塞。
- 该函数的返回值为需要处理的事件数目,如返回 0 表示已超时。
epoll 工作模式
epoll 对文件描述符的操作有两种模式:边缘触发 (Edge-Triggered) 和水平触发 (Level-Triggered)。
边缘触发(Edge-Triggered)模式:
在边缘触发模式下,epoll 仅在文件描述符状态发生变化时通知应用程序,而不会反复通知相同的事件。也就是说,如果一个文件描述符从未就绪变为就绪状态,或者从就绪状态变为未就绪状态,epoll 将会通知应用程序。因此,应用程序需要一次性将所有可读或可写的数据处理完,以免错过状态变化的通知。该模式适用于需要及时响应文件描述符状态变化的场景。
水平触发(Level-Triggered)模式:
在水平触发模式下,epoll 会持续通知应用程序文件描述符仍然处于就绪状态,直到应用程序读取或写入操作导致文件描述符不再就绪。该模式适用于需要持续监测文件描述符状态并逐步处理数据的场景。
基于 epoll 的 echo demo 程序
下面我们基于 epoll 编写一个服务端 echo 程序,首先我们声明以下宏以及函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| // 头文件缺一不可,即便 clion 提示未用到
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
/**
* 宏定义
*/
#define IPADDRESS"127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
/**
* 函数声明
*/
// 创建套接字并进行绑定
static int socket_bind(const char *ip, int port);
// IO 多路复用 epoll
static void do_epoll(int listenfd);
// 事件处理函数
static void handle_events(int epollfd,
struct epoll_event *events,
int num, int listenfd, char *buf);
// 连接处理器
static void handle_accpet(int epollfd, int listenfd);
// 读处理
static void do_read(int epollfd, int fd, char *buf);
// 写处理
static void do_write(int epollfd, int fd, char *buf);
// 添加事件
static void add_event(int epollfd, int fd, int state);
// 修改事件
static void modify_event(int epollfd, int fd, int state);
// 删除事件
static void delete_event(int epollfd, int fd, int state);
|
事件注册函数实现
然后着手实现上述函数,首先是增删改事件的相关函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // 添加要监听的事件
void add_event(int epollfd, int fd, int state) {
struct epoll_event event;
event.events = state;
event.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}
// 修改 fd 监听的事件类型
void modify_event(int epollfd, int fd, int state) {
struct epoll_event event;
event.events = state;
event.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
// 删除 fd 正在监听的事件
void delete_event(int epollfd, int fd, int state) {
struct epoll_event event;
event.events = state;
event.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event);
}
|
这些函数相当重要,负责根据需求操作被监听的 fd 上的事件。
socket_bind
接下来我们实现 TCP 服务端资源初始化函数 socket_bind
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| int socket_bind(const char *ip, int port) {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
perror("socket error");
exit(1);
}
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr))) {
perror("bind error");
exit(1);
return listenfd;
}
}
|
socket_bind
函数就是普通的 C 语言 TCP 服务端注册,这里不做过多介绍了。
事件循环器实现
接下来我们实现 do_epoll
函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| _Noreturn void do_epoll(int listenfd) {
int epollfd, ret;
struct epoll_event events[EPOLLEVENTS];
char buf[MAXSIZE];
memset(buf, 0, MAXSIZE);
//创建一个 epoll 描述符
epollfd = epoll_create(FDSIZE);
//监听连接事件
add_event(epollfd, listenfd, EPOLLIN);
//循环获取就绪事件
for (;;) {
ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
handle_events(epollfd, events, ret, listenfd, buf);
}
close(epollfd);
}
|
do_epoll
是整个程序的核心,内部封装了 epoll 事件处理循环,统筹调度整个程序。该函数首先调用 add_event()
监听连接事件,当有请求到达时通过 handle_events
事件处理器分发相应的事件。
事件处理函数实现
handle_events()
函数实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
| void handle_events(int epollfd, struct epoll_event *events,
int num, int listenfd, char *buf) {
for (int i = 0; i < num; i++) {
int fd = events[i].data.fd;
//分发事件
if (fd == listenfd && (events[i].events & EPOLLIN))
handle_accpet(epollfd, listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}
|
handle_events
会遍历就绪列表,然后判断各个就绪事件对应的监听类型,并将其分发给不同的事件处理器处理。各类事件处理器的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| //连接事件处理器
void handle_accpet(int epollfd, int listenfd) {
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddrlen);
if (clifd < 0)
perror("accept error");
else {
printf("acceot new client:%s:%d\n",
inet_ntoa(cliaddr.sin_addr),
cliaddr.sin_port);
// 监听客户端的发送事件
add_event(epollfd, clifd, EPOLLIN);
}
}
// 可读事件处理器
void do_read(int epollfd, int fd, char *buf) {
// 同步读取数据
ssize_t nread = read(fd, buf, MAXSIZE);
if (nread <= 0) {
if (nread == 0)
fprintf(stderr, "client close.\n");
else
perror("read error");
close(fd);
delete_event(epollfd, fd, EPOLLIN);
} else {
printf("read msg is: %s", buf);
// 修改描述符对应的事件,改为监听可写事件
modify_event(epollfd, fd, EPOLLOUT);
}
}
//可写事件处理器
void do_write(int epollfd, int fd, char *buf) {
printf("write msg is: %s", buf);
ssize_t nwrite = write(fd, buf, strlen(buf));
if (nwrite < 0) {
perror("write error");
close(fd);
delete_event(epollfd, fd, EPOLLOUT);
} else {
// 转为监听可读事件
modify_event(epollfd, fd, EPOLLIN);
memset(buf, 0, MAXSIZE);
}
}
|
可见,处理器在同步处理完 I/O 事件后,紧接着会根据业务修改要监听的事件。最终,我们在 main
函数中使用上述函数:
1
2
3
4
5
6
| int main() {
int listenfd = socket_bind(IPADDRESS, PORT);
listen(listenfd, LISTENQ);
do_epoll(listenfd);
return 0;
}
|
main
函数首先调用 socket_bind()
函数初始化 TCP 服务端资源,然后调用 listen()
初始化接收队列并监听端口,最终调用 do_epoll
循环处理事件。