Linux Epoll 核心原理与开发示例

I/O 多路复用机制的概念

有过 Linux C 开发经验的同学都知道,在最基础的网络 I/O 模型中,每个用户进程进入内核态后,如果发现监听的 socket 对象的接收队列没有数据,就会阻塞等待。当内核发现数据到达时,再唤醒等待的进程,继续处理 I/O 数据。每次等待、唤醒都需要切换两次进程上下文。如果在客户端这么做还能够接受,但在高并发服务器上,频繁的上下文切换将严重降低网络吞吐量。为了避免由这种上下文切换带来的资源浪费,I/O 多路复用机制应运而生。

I/O 多路复用是一种用户进程与内核进程之间的特殊协作方式,它允许单个进程处理多个网络连接,而不再局限于一对一的处理方式。以 Linux 平台为例,它支持以下几种路复用方式:

  • POSIX 标准定义的 select 和 poll;
  • Linux 平台相关的 epoll。

其中,epoll 在性能方面的表现最为出色。

本文内容基于 Linux-3.12,不同版本的源码不尽相同,但核心逻辑不变,不影响理解。

epoll 核心原理

如果您没有 epoll 开发经验,可以先阅读下一小节的 epoll 开发示例,以便对 epoll 有一个基本的了解。

epoll_create 创建内核对象

在用户进程调用 epoll_create 时,内核会创建一个 struct eventpoll 类型的内核对象,并把它关联到当前进程已打开的文件列表 fdtable 中:

epoll 内核对象

其详细结构如下:

eventpoll 结构体
  • wq:等待队列链表。当数据就绪触发软中断时,会通过 wq 来寻找阻塞在 epoll 对象上的用户进程;
  • rbr:一颗红黑树,用于管理用户进程添加的所有 TCP 连接,可以实现高效的增、删、查询操作;
  • rdllist:就绪的文件描述符列表。当有连接就绪时,内核会将这些就绪的连接添加到这个链表中,这样用户进程就可以直接获取已就绪的所有事件,而无需像使用 POSIX API 那样遍历所有描述符。

熟悉了主要的数据结构之后,我们再来查看 fs/eventpoll.c 中 epoll_create 的源码实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
	struct eventpoll *ep = NULL;
	struct file *file;
    ...
    // [1] 创建并初始化 eventpoll 对象
    error = ep_alloc(&ep);
    ...
    // [2] 获取一个未使用的文件描述符,作为 epfd
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    ...
    // [3] 创建 epoll 的 file 实例
    //  file 的 f_op 被设置为 eventpoll_fops
    //  file 的 private_data 设置为上边的 eventpoll 对象,即 ep 
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
    ...
    // [4] 建立 fd 和 file 的关联关系
    fd_install(fd, file);
    return fd;
}
  • [1]:首先调用 ep_alloc 创建一个 eventpoll 结构体,完成等待队列、就绪队列等参数的初始化:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    static int ep_alloc(struct eventpoll **pep)
    {
        struct eventpoll *ep;
        // 申请内存空间
        ep = kzalloc(sizeof(*ep), GFP_KERNEL);
        ...
        // 初始化等待队列头
        init_waitqueue_head(&ep->wq);
        ...
        // 初始化就绪队列
        INIT_LIST_HEAD(&ep->rdllist);
        ...
        // 初始化红黑树
        ep->rbr = RB_ROOT;
        ...
    }
  • [2]:然后,获取一个未使用的文件描述符,作为 epfd
  • [3]:再然后,创建 epoll file 实例,以及匿名 inode 节点和 dentry 等数据结构:
    • epoll 文件的 file->f_op 会被设置为 eventpoll_fops否则将不支持 epoll
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      
      static const struct file_operations eventpoll_fops = {
      #ifdef CONFIG_PROC_FS
          .show_fdinfo	= ep_show_fdinfo,
      #endif
          .release	= ep_eventpoll_release,
           // poll 指针不为空,表示支持 epoll 操作
           //  因此普通文件不支持 epoll
          .poll		= ep_eventpoll_poll,
          .llseek		= noop_llseek,
      };
    • epoll 文件的 file->private_data 会被设置为参数传入的 eventpoll 变量 (ep);
  • [4]:最后建立 fdfile 之间的关联关系。

epoll_ctl 添加 socket

当用户进程调用 epoll_ctl 向 epoll 添加 socket 时,内核会做以下三件事情:

  1. 初始化一个红黑树节点对象 epitem
  2. 将监听的事件添加到 socket 的等待队列中,其回调函数是 ep_poll_callback
  3. epitem 节点插入 epoll 对象的红黑树。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	struct fd f, tf;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;
	...
    // [1] 根据 epfd 找到其封装了 file 的 fd 内核对象
	f = fdget(epfd);
	// 根据需要被监听 socket 文件描述符,找到其封装了 file 的 fd 内核对象
	tf = fdget(fd);
	/* 确认被监听的 tf 支持 epoll,即文件的 fops 为 struct file_operations */
	if (!tf.file->f_op || !tf.file->f_op->poll) 
    ...

	// [2] 找到 epfd 关联的 eventpoll 内核对象
	ep = f.file->private_data;
    ...
	switch (op) {
	case EPOLL_CTL_ADD:
		if (!epi) { // 新增操作,需要确保 epitem 不存在
			epds.events |= POLLERR | POLLHUP;
    // [3] 完成 socket 注册操作
			error = ep_insert(ep, &epds, tf.file, fd);
		} else ...
		clear_tfile_check_list();
		break;
	...
	}
	return error;
}

epoll_ctl 首先根据文件描述符,找到被监听 fd 关联的内核文件对象 tf.file,以及 epfd 关联的 epollevent,然后调用 ep_insert 完成注册操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
    // [1] 初始化一个 epitem
	...
	struct epitem *epi;
    struct ep_pqueue epq;
	...
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
		return -ENOMEM;
	/* Item initialization follow here ... */
	...
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
    // epitem->fdd 中存储了被监听 socket 的 fd 描述符和 struct file 对象
	ep_set_ffd(&epi->ffd, tfile, fd);
	...
	epq.epi = epi;
    // [2] 初始化 socket 等待队列,将回调函数 ep_poll_callback 设置为数据就绪时的回调
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	revents = ep_item_poll(epi, &epq.pt);
    ...
    // [3] 插入红黑树
	ep_rbtree_insert(ep, epi);
	...
error_xxx:
    // 各种错误处理
	return error;
}
  • [1]:初始化一个 epitemepitem 中的 fdd 存储了被监听 socket 的 fd 描述符和 struct file 对象地址;
  • [2]:调用 init_poll_funcptr() 函数,将 ep_ptable_queue_proc() 函数注册为 poll 模式的回调,在 ep_ptable_queue_proc() 里再把 ep_poll_callback() 设置为监听事件触发时真正的回调;
  • [3]:将这个 epitem 插入红黑树。

接下来,我们将对这三个步骤进行详细分析。

分配并初始化 epitem

对于每一个 socket,调用 epoll_ctl 注册事件时,都会为其创建一个 epitem 对象,该结构体的数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct epitem {
	// 红黑树节点
	struct rb_node rbn;
	// 被监听 socket 的文件描述符信息
	struct epoll_filefd ffd;
    // 所属的 eventpoll 对象
	struct eventpoll *ep;
    // 等待队列头节点
    struct list_head pwqlist;
    ...
};

初始化完成后的 epitem 关联关系如下:

epitem 内核数据结构关系

到这一步,epitem 已经实现了 eventpoll 文件和被监听 socket 文件的关联

设置 socket 等待队列

创建并初始化 epitem 后,紧接着就是初始化 socket 对象上的等待队列,并把 fs/eventpoll.c 中的 ep_epoll_callback() 设置为数据就绪时的回调函数。这里的实现比较复杂,是通过 poll 模式完成的,首先来看 ep_item_poll

1
2
3
4
5
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
	pt->_key = epi->event.events;
	return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}

这里调用到了 socket (epi->ffd) 下的 file->f_op->poll(),该函数指针指向的是 net/socket.c 中的 sock_poll()

1
2
3
4
5
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    ...
    return busy_flag | sock->ops->poll(file, sock, wait);
}

其中的 sock->ops->poll() 又指向 tcp_poll()

1
2
3
4
5
6
7
8
unsigned int tcp_poll(struct file *file, poll_table *wait)
{
    ...
    struct sock *sk = sock->sk;
    ...
    sock_poll_wait(file, sk_sleep(sk), wait);
    ...
}

在调用 sock_poll_wait() 之前,先在第二个参数这里调用了 sk_sleep() 函数。sk_sleep() 函数会返回 sock 对象里等待队列的头节点 wait_queue_head_t,稍后等待队列项将插入这里,实现如下:

1
2
3
4
5
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
	return &rcu_dereference_raw(sk->sk_wq)->wait;
}
注意
这里的等待队列是 socket 中的,而非 epoll 的等待队列。

接着真正进入 include/net/sock.h 中的 sock_poll_wait() 函数:

1
2
3
4
5
6
7
static inline void sock_poll_wait(struct file *filp,
		wait_queue_head_t *wait_address, poll_table *p)
{
    ...
    poll_wait(filp, wait_address, p);
    ...
}

其核心实现是 include/net/poll.h 中的 poll_wait() 函数:

1
2
3
4
5
static inline void poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p) {
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p)
}

_qproc 函数指针在上文 ep_insert 函数执行时会被 init_poll_funcptr 设置为 ep_ptable_queue_proc()到这里总算进入了重点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
    // 根据地址拿到参数 pt 指针下边的 epitem,也就是上边创建的那个 epitem
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;
    // [1] 分配一个 eppoll_entry 结构体
	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        // [2] 将 eppoll_entry->wait->func 指向 ep_poll_callback
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        // [3] 完成最终关联
		pwq->whead = whead; // whead 为 socket 等待队列头
		pwq->base = epi; // base 指向 epitem
		add_wait_queue(whead, &pwq->wait);
		...
	}...
}
  • [1]:首先分配一个 eppoll_entry 结构体,该结构体负责关联 epitem、事件就绪回调函数与 socket 等待队列:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    struct eppoll_entry {
        ...
        // 指向 epitem
        struct epitem *base;
        // 指向回调函数
        wait_queue_t wait;
        // 指向被监听的 socket 等待队列
        wait_queue_head_t *whead;
    };
  • [2]:调用 init_waitqueue_func_entry()eppoll_entry->wait->func 指向 ep_poll_callback()
    1
    2
    3
    4
    5
    6
    7
    
    static inline void init_waitqueue_func_entry(wait_queue_t *q,
                        wait_queue_func_t func)
    {
        q->flags = 0;
        q->private = NULL;
        q->func = func;
    }
    注意
    正常情况下,q->private 指向当前用户进程,当 socket 有数据到达时唤醒用户进程,但这里的 socket 是交给 epoll 管理的,因此 q->private 会被设置为 NULL。
  • [3]:完成最终关联。这里首先用 eppoll_entry->whead 缓存当前 socket 等待队列的头节点,然后将 eppoll_entry->base 指向传入的 epitem,最后调用 add_wait_queue()eppoll_entry->wait 移动到 socket 等待队列头节点,完成最终关联。

到这一步,相关的结构体之间的关系如下:

epitem 内核数据结构关系

插入红黑树

完成上述两个步骤后,将这个 epitem 插入红黑树,核心流程就完成了。红黑树使得 epoll 在查找效率、插入效率、内存开销等多个方面都更加均衡。

最终,这些内核数据结构的最终关系如下图所示:

epoll 内核数据结构关系

epoll_wait 等待事件就绪

epoll_wait 的逻辑非常简单:当调用它时,它会检查 eventpoll->rdllist 链表,查看其中是否有准备好的 socket,如果没有,它会创建一个等待项并将其加入到 eventpoll->wq 等待队列中,然后将自己阻塞。如下图所示:

epoll_wait 流程

其源码实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
    ...
	wait_queue_t wait;
    ...
fetch_events:
    // [1] 判断就绪队列上是否有事件就绪
	if (!ep_events_available(ep)) {
		// [2] 未就绪,则定义等待事件项并关联当前进程
		init_waitqueue_entry(&wait, current);
        // [3] 将等待事件项添加到 epoll 的等待队列上
        __add_wait_queue_exclusive(&ep->wq, &wait);
		for (;;) {
			...
            // [4] 让出 CPU,阻塞直到超时或触发软中断
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
				timed_out = 1;
    ...
    // [5] 判断是否存在就绪事件
    eavail = ep_events_available(ep);
    ...
    if (!res && eavail &&
        // [6] 向用户态发送就绪事件,就绪的事件会被转储到 events 数组
	    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
		goto fetch_events;

	return res;
}
  • [1]:调用 ep_events_available() 判断就绪队列上是否存在已经就绪的 socket,这个队列上的数据是由上文注册的回调 ep_poll_callback() 插入的;
  • [2]:如果没有就绪事件,就调用 init_waitqueue_entry() 创建一个 epoll 等待队列项 wait_queue_t,同时把当前进程挂在 wait_queue_t->private 上;
  • [3]:将创建的 wait_queue_t 添加到 eventpoll->wq 上;
  • [4]:让出 CPU,阻塞直到超时或数据就绪发送中断;
  • [5]:判断是否存在就绪事件;
  • [6]:当前进程被唤醒,且存在就绪事件时,调用 ep_send_events 将绪的事件转储到用户态 events 数组。

    ep_send_events 方法内部会调用 __copy_to_user 将数据复制到用户空间。

事件就绪时的唤醒过程

在前边执行 epoll_ctl 监听事件时,内核为每个 socket 都添加了一个注册了 ep_poll_callback() 回调的等待队列项。在调用 epoll_wait 陷入阻塞前,又在 eventpoll 对象的等待队列中添加了指向当前多路复用线程的等待项。

事件就绪时的唤醒过程
  • 在 socket 的等待队列中,其回调函数是 ep_poll_callback(),另外其 private 不需要唤醒进程,所以指向了 NULL;
  • eventpoll 的等待队列中,其回调函数是 default_wake_function(),其 private 指针指向等待事件就绪的用户进程,即控制 epoll 的多路复用进程。

当 socket 有数据到达时,内核会查找 socket 等待队列元素上的回调函数,并交给系统的软中断去调用,ep_poll_callback() 的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	...
    // [1] 获取 wait 节点关联的 epitem
	struct epitem *epi = ep_item_from_wait(wait);
    // [2] 获取 epitem 所属的 eventpoll
	struct eventpoll *ep = epi->ep;
    ...
    // [3] 将当前 epitem 添加到 eventpoll 的就绪队列
	if (!ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake_rcu(epi);
	}
    // [4] 查看 eventpoll 的等待队列上是否有等待的进程
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq); // 如果有,则唤醒
	...
	return 1;
}
  • [1]:获取就绪的 wait 节点所关联的 epitem
  • [2]:获取 epitem 所属的 eventpoll
  • [3]:将 epitem 添加到 eventpoll 的就绪队列;
  • [4]:调用 default_wake_function() 唤醒调用 epoll_wait 的进程。

一旦该线程被唤醒,便会使用上文介绍的 ep_send_events() 函数,将就绪队列的数据转储到用户态的 events 数组中,以供用户态进程 直接 使用。

epoll 开发示例

epoll 常用接口

epoll 有以下三个常用接口:

  • int epoll_create(int size):创建一个 epoll 句柄,其中的 size 参数用于通知内核这个监听的 socket 文件描述符数目总共有多大,需要传递的是待监听的最大 fd 值 +1。

    注意:一旦 epoll 句柄被成功创建,它就会占用一个文件描述符,我们可以通过查看路径 /proc/#{process_id}/fd/ 找到这个 fd。因此,在使用完 epoll 后,务必要调用 close() 函数来关闭它,以免占用文件描述符。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):epoll 的事件注册函数,负责注册要监听的事件类型。
    • 第一个参数使用 epoll_create() 的返回值;

    • 第二个参数表示动作,用三个宏来表示:

      • EPOLL_CTL_ADD:注册新的 socket fd 到 epfd 中;
      • EPOLL_CTL_MOD:修改已经注册的 socket fd 的监听事件;
      • EPOLL_CTL_DEL:从 epfd 中删除一个 socket fd;
    • 第三个参数是需要监听的 socket fd;

    • 第四个参数是告诉内核需要监听什么事件,struct epoll_event 结构如下:

      1
      2
      3
      4
      
      struct epoll_event {
          __poll_t events; //表示事件类型
          __u64 data;  // 用户数据
      } EPOLL_PACKED;

      events 可以是多个宏的组合,常用的宏如下:

      • EPOLLIN:表示文件描述符上有数据可读,即输入事件;
      • EPOLLPRI:表示有紧急数据可读,通常是带外数据 (out-of-band data);
      • EPOLLOUT:表示文件描述符可写,即输出事件;
      • EPOLLERR:表示对应的文件描述符发生错误,例如连接错误;
      • EPOLLHUP:表示对应的文件描述符被挂起;
      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket,需要重新把它加入 epoll 队列;
      • EPOLLET:将 epoll 设为边缘触发 (Edge Triggered) 模式,这是相对于水平触发 (Level Triggered) 来说的。
  • int epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout):等待事件的产生。
    • 参数 events 用来从内核得到事件的集合;
    • 参数 maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的 size
    • 参数 timeout 是超时时间,单位毫秒:
      • 0 会立即返回;
      • 如果值小于 0 将不确定,也有说法说是永久阻塞。
    • 该函数的返回值为需要处理的事件数目,如返回 0 表示已超时。

epoll 工作模式

epoll 对文件描述符的操作有两种模式:边缘触发 (Edge-Triggered) 和水平触发 (Level-Triggered)。

  • 边缘触发(Edge-Triggered)模式:

    在边缘触发模式下,epoll 仅在文件描述符状态发生变化时通知应用程序,而不会反复通知相同的事件。也就是说,如果一个文件描述符从未就绪变为就绪状态,或者从就绪状态变为未就绪状态,epoll 将会通知应用程序。因此,应用程序需要一次性将所有可读或可写的数据处理完,以免错过状态变化的通知。该模式适用于需要及时响应文件描述符状态变化的场景。

  • 水平触发(Level-Triggered)模式:

    在水平触发模式下,epoll 会持续通知应用程序文件描述符仍然处于就绪状态,直到应用程序读取或写入操作导致文件描述符不再就绪。该模式适用于需要持续监测文件描述符状态并逐步处理数据的场景。

基于 epoll 的 echo demo 程序

下面我们基于 epoll 编写一个服务端 echo 程序,首先我们声明以下宏以及函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 头文件缺一不可,即便 clion 提示未用到
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

/**
* 宏定义
*/
#define IPADDRESS"127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100

/**
* 函数声明
*/
// 创建套接字并进行绑定
static int socket_bind(const char *ip, int port);

// IO 多路复用 epoll
static void do_epoll(int listenfd);

// 事件处理函数
static void handle_events(int epollfd,
                          struct epoll_event *events,
                          int num, int listenfd, char *buf);

// 连接处理器
static void handle_accpet(int epollfd, int listenfd);

// 读处理
static void do_read(int epollfd, int fd, char *buf);

// 写处理
static void do_write(int epollfd, int fd, char *buf);

// 添加事件
static void add_event(int epollfd, int fd, int state);

// 修改事件
static void modify_event(int epollfd, int fd, int state);

// 删除事件
static void delete_event(int epollfd, int fd, int state);

事件注册函数实现

然后着手实现上述函数,首先是增删改事件的相关函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 添加要监听的事件
void add_event(int epollfd, int fd, int state) {
    struct epoll_event event;
    event.events = state;
    event.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

// 修改 fd 监听的事件类型
void modify_event(int epollfd, int fd, int state) {
    struct epoll_event event;
    event.events = state;
    event.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

// 删除 fd 正在监听的事件
void delete_event(int epollfd, int fd, int state) {
    struct epoll_event event;
    event.events = state;
    event.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event);
}

这些函数相当重要,负责根据需求操作被监听的 fd 上的事件。

socket_bind

接下来我们实现 TCP 服务端资源初始化函数 socket_bind

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
int socket_bind(const char *ip, int port) {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket error");
        exit(1);
    }
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr))) {
        perror("bind error");
        exit(1);
        return listenfd;
    }
}

socket_bind 函数就是普通的 C 语言 TCP 服务端注册,这里不做过多介绍了。

事件循环器实现

接下来我们实现 do_epoll 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
_Noreturn void do_epoll(int listenfd) {
    int epollfd, ret;
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    memset(buf, 0, MAXSIZE);
    //创建一个 epoll 描述符
    epollfd = epoll_create(FDSIZE);
    //监听连接事件
    add_event(epollfd, listenfd, EPOLLIN);
    //循环获取就绪事件
    for (;;) {
        ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
        handle_events(epollfd, events, ret, listenfd, buf);
    }
    close(epollfd);
}

do_epoll 是整个程序的核心,内部封装了 epoll 事件处理循环,统筹调度整个程序。该函数首先调用 add_event() 监听连接事件,当有请求到达时通过 handle_events 事件处理器分发相应的事件。

事件处理函数实现

handle_events() 函数实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void handle_events(int epollfd, struct epoll_event *events,
                   int num, int listenfd, char *buf) {
    for (int i = 0; i < num; i++) {
        int fd = events[i].data.fd;
        //分发事件
        if (fd == listenfd && (events[i].events & EPOLLIN))
            handle_accpet(epollfd, listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd, fd, buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd, fd, buf);
    }
}

handle_events 会遍历就绪列表,然后判断各个就绪事件对应的监听类型,并将其分发给不同的事件处理器处理。各类事件处理器的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//连接事件处理器
void handle_accpet(int epollfd, int listenfd) {
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t cliaddrlen;
    clifd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddrlen);
    if (clifd < 0)
        perror("accept error");
    else {
        printf("acceot new client:%s:%d\n",
               inet_ntoa(cliaddr.sin_addr),
               cliaddr.sin_port);
        // 监听客户端的发送事件
        add_event(epollfd, clifd, EPOLLIN);
    }
}

// 可读事件处理器
void do_read(int epollfd, int fd, char *buf) {
    // 同步读取数据
    ssize_t nread = read(fd, buf, MAXSIZE);
    if (nread <= 0) {
        if (nread == 0)
            fprintf(stderr, "client close.\n");
        else
            perror("read error");
        close(fd);
        delete_event(epollfd, fd, EPOLLIN);
    } else {
        printf("read msg is: %s", buf);
        // 修改描述符对应的事件,改为监听可写事件
        modify_event(epollfd, fd, EPOLLOUT);
    }
}

//可写事件处理器
void do_write(int epollfd, int fd, char *buf) {
    printf("write msg is: %s", buf);
    ssize_t nwrite = write(fd, buf, strlen(buf));
    if (nwrite < 0) {
        perror("write error");
        close(fd);
        delete_event(epollfd, fd, EPOLLOUT);
    } else {
        // 转为监听可读事件
        modify_event(epollfd, fd, EPOLLIN);
        memset(buf, 0, MAXSIZE);
    }
}

可见,处理器在同步处理完 I/O 事件后,紧接着会根据业务修改要监听的事件。最终,我们在 main 函数中使用上述函数:

1
2
3
4
5
6
int main() {
    int listenfd = socket_bind(IPADDRESS, PORT);
    listen(listenfd, LISTENQ);
    do_epoll(listenfd);
    return 0;
}

main 函数首先调用 socket_bind() 函数初始化 TCP 服务端资源,然后调用 listen() 初始化接收队列并监听端口,最终调用 do_epoll 循环处理事件。

0%