Linux Kernel 网络收包过程详解

不知道大家在畅游网络时有没有好奇过,这些数据是如何到达我们的设备,而我们的设备又是如何解析这些数据的呢?为此,我特意请教了隔壁网卡驱动部门的大佬,探讨了一些相关问题。比如,在没有程序监听端口的情况下,如果有人一直通过 TCP 协议向这个端口发送数据,我的 CPU 负载会不会飙升呢?再比如,面试常问的 DMA 究竟是什么呢?接下来,让我们以 TCP 协议为例,详细了解一下 Linux 内核 3.12 版本中的网络子系统的收包过程。

  • 本文只考虑 ipv4,不考虑 ipv6。
  • 本文涉及到驱动程序的部分,以 intel 的 igb 千兆网卡为例进行讲解。

收包过程总览

ISO 在 1985 年制定了开放式系统互联模型 Open System Interconnect,简称 OSI 模型。OSI 模型也被称作七层模型,它将网络通信分为七层,分别是应用层、表示层、会话层、传输层、网络层、数据链路层和物理层。其中:

  • 物理层是通信数据传输的基础。物理层的职责是为上层屏蔽物理设备和传输媒介,利用传输媒介为通信的两端建立、管理和释放物理连接,实现数据比特流的透明传输,保证比特流正确地传输到对端。物理层的传输媒介包括双绞线、电缆、光纤、无线电波等,传输的数据内容被称作信号。常见的物理层协议有 DSLIDSN10BASE-T 等,这些协议保证了在物理层传输的数据的编排规则。物理层协议分为两类,分别是点对点通信线路物理层协议广播通信线路物理层协议,其中广播通信线路又分为有线通信线路无线通信线路
  • 数据链路层的职责是确保数据按照一定的格式被封装,并且能够可靠、透明地将数据传输到对端,它管理整个数据链路的建立、维持和释放。数据链路层在物理层之上,在数据链路层中传输的数据单元叫作。帧包含地址段数据段等信息,地址段含有发送节点和接收节点的地址(如 MAC),数据段包含实际要传输的数据,数据链路层根据对应的协议重新编排这些信息,让这些信息按照一定的格式组装成帧。除了对物理层的原始数据进行封装,数据链路层还会对数据的传输进行错误检测和纠正,保证数据的传输是可靠的,因为在物理层的媒介上传输的数据难免会受到各种不确定的干扰而产生差错。数据链路层的协议有 FDDIHDLC以太网协议 (Ethernet)点对点协议 (PPP) 等。
  • 网络层的职责是提供路由寻址能力,并且它具备一定的拥塞控制、流量控制能力,使两端能够互联且通过通信子网选择最合适的路径。网络层在数据链路层之上,它将数据链路层的帧转换为数据包,然后通过一系列的寻址和路由算法计算后,确定数据包的传输路径,将数据从一个网络节点发送到下一个节点。网络层的协议包括 IPICMPRIPIGMP 等。
  • 传输层位于网络层之上,它是具备数据传输能力的最上层,它在 OSI 模型中非常重要。每个机器可能有多个进程,为了能够准确地将数据传输到对端的进程,传输层在网络层的基础上增加了端口的概念。除了保证端到端的数据传输,传输层也提供了流量控制、拥塞控制、多路复用、失败重传等能力。在传输层传输的数据单元叫做报文,在这一层的协议有 TCP、UDP、SPX 等。
  • 会话层也是一个连接,是在传输层连接基础上的更高一层抽象,它不关心连接的地址、端口号等信息,一个会话既可以是一对一,也可以是一对多、多对一、多对多的。举个例子:有 A、B、C 三个人,A 给 B 打电话,此时建立了 A 与 B 之间的传输层连接,当 A 与 B 交流完毕后,B 把电话直接交给了 C,C 便可直接与 A 进行会话,而不需要重新建立连接。会话层建立在传输层上,它不关心传输连接的建立细节,只负责会话管理,比如建立会话、维持会话、终止会话等。它保证了会话数据的可靠传输,同时提供一定的会话权限认证能力。
  • 表示层的职责是确保从一端的应用层发送的数据可以被另一端正确读取,它需要处理的是数据的表示问题,比如数据格式处理、数据编解码、协商和建立数据交换格式等。它解决了各应用程序之间在数据格式表示上的差异。除此之外,表示层还提供了数据压缩、解压、加密、解密等数据处理能力。该层的协议有 WEP、WPA 等。
  • 应用层是 OSI 模型的最高层,它与应用程序的关系最为密切,它的职责是为应用程序提供服务,规定应用程序中通信的相关细节,完成用于希望在网络上完成的各种工作,它是各种应用程序和网络之间建立联系的桥梁。此外,该层还负责协调各个应用程序之间的工作。在应用层上可以实现各种服务,比如文件传输服务 (FTP)、远程登陆服务 (Telnet) 等,这些形形色色的服务都有不同的协议,所以应用层的协议十分丰富,比如 HTTP、FTP、NFS、SMTP 等。

从整个 OSI 模型的设计来看,每一层职责明确,每一层管辖的范围也非常清晰,一次通信会逐层处理互不影响,这种分层设计思想可以让开发者更容易理解数据传输过程。但 OSI 模型只是个理论模型,并没有定义实现过程,在实际落地时如果采用七层模型,反而会被其复杂性拖慢定制周期,影响网络协议的发展。所以在后续的实践中,逐渐演化出了四层模型,即 TCP/IP 模型。

TCP/IP 模型将整个网络协议栈分为应用层、传输层、网络层和网络接口层这四个层次:

  • 应用层融合了 OSI 模型中的“应用层+表示层+会话层”;
  • 传输层网络层直接对应 OSI 模型中的“传输层”跟“网络层”;
  • 网络接口层则融合了 OSI 模型中的“数据链路层+物理层”。

在这些层次中,应用层通常涵盖我们日常编写的各种网络程序,而传输层、网络层和链路层则由 Linux 内核和网卡驱动实现。在 Linux 源码中:

  • 网卡驱动对应的逻辑位于 driver/net/ethernet 目录。以 intel 系列网卡为例,其具体驱动位于 driver/net/ethernet/intel 目录下;
  • 协议栈模块的代码分布在 kernel 和 net 目录中。

内核和网卡驱动通过中断来处理数据。当网络数据到达网卡时,会触发 CPU 相应的引脚电平变化,通知 CPU 处理数据。对于网络子系统而言,由于涉及多个调用栈的复杂处理,这一过程十分耗时。如果在硬件中断中完成所有处理,则会过度占用 CPU,因为硬中断具有最高的优先级。因此,Linux 引入了软中断机制,该机制将中断处理过程分成了上、下两半部分。上半部分即硬中断处理过程,在执行必要的处理后立刻释放 CPU;而大部分耗时操作则由下半部分的软中断处理函数来处理。Linux 内核通过 ksoftirqd 进程来处理软中断,该进程会根据特定内存空间的标志位来触发相应的操作。

接下来我们从全局了解一下 Linux 内核收包的过程:

linux-kernel-net-eth-read
  • 当网卡收到数据后,会以 DMA 的方式将收到的帧写到主内存里,再向 CPU 发起一个中断;

    DMA(Direct Memory Access,直接内存访问)技术将系统内存的读写权限移交给了协处理器(如网卡、硬盘控制器等),实现了硬件级别的异步操作。

  • CPU 收到中断信号后,调用网卡驱动注册的中断处理函数。处理函数记录必要的信息后发起软中断请求,然后让出 CPU。
  • ksoftirqd 内核进程检测到软中断信号后,调用 poll 函数轮训收包,最后将数据交由各级协议栈处理。

接下来,我们详细介绍这里边涉及的过程。

初始化过程

网卡驱动、内核协议栈等模块能够接收数据之前,需要做很多准备工作。

创建 ksoftirqd 内核进程

Linux 内核通过 ksoftirqd 进程来处理软中断,这些进程的数量可能会多于一个,具体数量取决于 CPU 个数。

注意
软中断不止有网络类型中断,还有其他类型。Linux 内核在 interrupt.h 中定义了所有的软中断类型。

每个 ksoftifqd 进程对应两个处理函数,ksoftirqd_should_run()run_ksoftirqd()

1
2
3
4
5
6
7
// file: kernel/softirq.c
static struct smp_hotplug_thread softirq_threads = {
	.store			= &ksoftirqd,
	.thread_should_run	= ksoftirqd_should_run,
	.thread_fn		= run_ksoftirqd,
	.thread_comm		= "ksoftirqd/%u",
};

当 ksoftirqd 进程被创建好后,就会进入自己的线程循环函数 ksoftirqd_should_run() 等待中断触发。

网络子系统初始化

Linux 内核通过调用各个子系统的 subsys_initcall 函数完成初始化,网络子系统的初始化过程在 net/core/dev.c#subsys_initcall(net_dev_init) 中,核心实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
//file: net/core/dev.c
static int __init net_dev_init(void)
{
  ...
	for_each_possible_cpu(i) {
    	// [1] 为每个 CPU 创建一个 softnet_data 结构体
		struct softnet_data *sd = &per_cpu(softnet_data, i);

		memset(sd, 0, sizeof(*sd));
		skb_queue_head_init(&sd->input_pkt_queue);
		skb_queue_head_init(&sd->process_queue);
		sd->completion_queue = NULL;
    	// [2] 初始化 poll_list,用于等待驱动程序将其 poll 函数添加进来
		INIT_LIST_HEAD(&sd->poll_list);
		...
	}
	...
  	// [3] 注册网络设备可读可写的软中断
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);
	...
}

该函数首先为每个 CPU 创建并初始化了一个 softnet_data 结构体,该结构体包含一个 poll_list,用于缓存所有设备驱动的 poll 函数。然后将中断类型 NET_TX_SOFTIRQ 的处理函数注册为 net_tx_action,将 NET_RX_SOFTIRQ 的处理函数注册为 net_rx_action

继续跟踪 open_softirq() 可知,该函数就是将映射关系存储到了 softirq_vec[] 数组中

1
2
3
4
5
//file: kernel/softirq.c
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
	softirq_vec[nr].action = action;
}

后续 ksoftirqd 线程收到软中断后,只需通过中断类型在数组中寻找对应的处理函数即可。

协议栈注册

内核实现了网络层的 IP 协议和传输层的 TCP、UDP 协议,这些协议对应的接收函数分别为 ip_rcv()tcp_v4_rcv()udp_rcv(),而且这些协议栈与我们日常接触的驱动程序一样,也是以模块形式注册的。fs_initcall() 调用 inet_init() 后开始协议栈注册,inet_init() 最终将这些函数注册到了 inet_protos[]ptype_base[] 映射表中:

linux-kernel-net-stack-init
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//file: net/ipv4/af_inet.c

// [1] ip 数据包类型
static struct packet_type ip_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),
	.func = ip_rcv,
};

// [2] TCP 协议类型
static const struct net_protocol tcp_protocol = {
	.early_demux	=	tcp_v4_early_demux,
	.handler	=	tcp_v4_rcv, // 重点关注处理函数
	.err_handler	=	tcp_v4_err,
	.no_policy	=	1,
	.netns_ok	=	1,
};

// [3] UDP 协议类型
static const struct net_protocol udp_protocol = {
	.handler =	udp_rcv,  // 重点关注处理函数
	.err_handler =	udp_err,
	.no_policy =	1,
	.netns_ok =	1,
};

// [4] 执行协议栈初始化
static int __init inet_init(void)
{
	...
  // TCP、UDP 初始化
	if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
		pr_crit("%s: Cannot add UDP protocol\n", __func__);
	if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
		pr_crit("%s: Cannot add TCP protocol\n", __func__);

  ...
  // IP 初始化
	dev_add_pack(&ip_packet_type);

}

fs_initcall(inet_init);

由上可知:

  • TCP 协议的处理函数是 tcp_v4_rcv()、UDP 协议的处理函数是 udp_rcv(),这两个函数最终随着 tcp_protocoludp_protocolinet_add_protocol() 中被注册到映射表:

    1
    2
    3
    4
    5
    6
    7
    
    // file: net/ipv4/protocol.c
    int inet_add_protocol(const struct net_protocol *prot, unsigned char protocol)
    {
        ...
        return !cmpxchg((const struct net_protocol **)&inet_protos[protocol],
            NULL, prot) ? 0 : -1;
    }

    TCP 与 UDP 对应的 net_protocol 都被注册到了 inet_protos[] 中。

  • IP 协议的处理函数是 ip_rcv(),它在 dev_add_pack() 函数中被注册到映射表:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    // file: net/core/dev.c
    void dev_add_pack(struct packet_type *pt)
    {
        // 返回 ptype_base[] 基地址
        struct list_head *head = ptype_head(pt);
    
        spin_lock(&ptype_lock);
        list_add_rcu(&pt->list, head);
        spin_unlock(&ptype_lock);
    }
    
    static inline struct list_head *ptype_head(const struct packet_type *pt)
    {
        if (pt->type == htons(ETH_P_ALL))
            return &ptype_all;
        else
            return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
    }

    ip_packet_type 结构体中的 type 是协议名,funcip_rcv() 函数,最终该结构体在 dev_add_pack() 中被注册到了映射表 ptype_base[] 中。

网卡驱动初始化

有过驱动开发的同学都知道,网卡驱动会通过 module_init 系统调用向内核注册一个初始化函数,当驱动程序被加载时,内核就会调用这个注册好的函数完成网卡的初始化。比如 igb 网卡驱动的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// file: drivers/net/ethernet/intel/igb/igb_main.c

// [1] 函数指针
static struct pci_driver igb_driver = {
	.name     = igb_driver_name,
	.id_table = igb_pci_tbl,
	.probe    = igb_probe,
	.remove   = igb_remove,
    ...
};
// [2] 初始化驱动模块
static int __init igb_init_module(void)
{
	...
	ret = pci_register_driver(&igb_driver);
	return ret;
}

驱动的 pci_register_driver() 执行完后,内核就知道了该驱动的相关信息,比如各种处理函数的地址等。当网卡被识别后,驱动会回调其 probe 函数指针,即 igb_probe(),让硬件设备处于就绪状态。详细操作如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// file: drivers/net/ethernet/intel/igb/igb_main.c
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
	...
    // [1] PCI 总线初始化
	err = pci_enable_device_mem(pdev);
	...
    // [2] DMA 初始化
	err = dma_set_mask(&pdev->dev, DMA_BIT_MASK(64));
	if (!err) {
		err = dma_set_coherent_mask(&pdev->dev, DMA_BIT_MASK(64));
		if (!err)
			pci_using_dac = 1;
	} ...
    ...
    // [3] 注册网络设备函数实现
	netdev->netdev_ops = &igb_netdev_ops;
    // [4] 注册 nettool 工具的实现函数
	igb_set_ethtool_ops(netdev);
    ...
	// [5] 初始化 NAPI,注册 poll 函数
	err = igb_sw_init(adapter);
	...
}
  • 上述操作的第三步中,完成了网卡回调函数的注册:

    1
    2
    3
    4
    5
    6
    
    // file: drivers/net/ethernet/intel/igb/igb_main.c
    static const struct net_device_ops igb_netdev_ops = {
        .ndo_open		= igb_open,
        .ndo_stop		= igb_close,
        ...
    }

    该结构体中设置了启动网卡、关闭网卡等函数的地址。

  • 该函数的第四步中,设置了 nettool 工具的命令回调函数:

    1
    2
    3
    4
    5
    6
    7
    8
    
    // file: drivers/net/ethernet/intel/igb/igb_ethtool.c
    static const struct ethtool_ops igb_ethtool_ops = {
        .get_settings		= igb_get_settings,
        .set_settings		= igb_set_settings,
        .get_drvinfo		= igb_get_drvinfo,
        .get_regs_len		= igb_get_regs_len,
        ...
    }

    因此,当我们调用 netstate 等命令查询信息时,调用的就是网卡驱动里相应的实现。

  • 该函数的第五步,完成了驱动程序 poll 函数的注册,调用链路如下:igb_sw_init -> igb_init_interrupt_scheme -> igb_alloc_q_vectors -> igb_alloc_q_vector,核心实现如下:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    // file: drivers/net/ethernet/intel/igb/igb_main.c
    static int igb_alloc_q_vector(struct igb_adapter *adapter,
                    int v_count, int v_idx,
                    int txr_count, int txr_idx,
                    int rxr_count, int rxr_idx)
    {
        ...
        /* initialize NAPI */
        netif_napi_add(adapter->netdev, &q_vector->napi,
                igb_poll, 64);
    
        return 0;
    }

    这里 igb 网卡将 igb_poll() 注册为了 poll 函数,且从函数名可以看出,igb 网卡实现了对 NAPI 机制的支持。

    NAPI 机制

    NAPI(New API)是 Linux 内核中的一种机制,它通过减少中断的数量和批量处理网络数据包来优化网络性能。其工作原理如下:

    • 中断触发:网卡接收到数据包,触发中断。
    • 禁用中断:NAPI 禁用进一步的中断,以防止频繁的中断处理。
    • 加入 NAPI 队列:网卡驱动程序将后续接收的数据包放入 NAPI 处理队列。
    • 轮询处理:内核线程开始轮询处理 NAPI 队列中的数据包,直到所有数据包都被处理完毕或达到轮询处理的最大次数。
    • 重新启用中断:处理完成后,重新启用中断,开始下一批数据的处理。

启动网卡

上述初始化过程完成后,就可以启动网卡了。这里会调用 igb_netdev_ops.ndo_open 指向的函数完成初始化,即 igb_open

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int igb_open(struct net_device *netdev)
{
	return __igb_open(netdev, false);
}
static int __igb_open(struct net_device *netdev, bool resuming)
{
	...

	// [1] 分配 tx、rx 内存队列
	err = igb_setup_all_tx_resources(adapter);
	err = igb_setup_all_rx_resources(adapter);
	...
	// [2] 注册硬中断处理函数
	igb_configure(adapter);
	err = igb_request_irq(adapter);
	...
	// [3] 启用 NAPI
	for (i = 0; i < adapter->num_q_vectors; i++)
		napi_enable(&(adapter->q_vector[i]->napi));
    // [4] 使能中断
	igb_irq_enable(adapter);
    ...
}
  • 首先我们关注 igb_setup_all_rx_resources,该函数内部完成了环形缓冲区 RingBuffer 的初始化:

    1
    2
    3
    4
    5
    6
    7
    8
    
    // file: drivers/net/ethernet/intel/igb/igb_main.c
    static int igb_setup_all_rx_resources(struct igb_adapter *adapter)
    {...
        for (i = 0; i < adapter->num_rx_queues; i++) {
            err = igb_setup_rx_resources(adapter->rx_ring[i]);
            ...
        }
    }

    这里在循环中创建了若干个 rx_ring

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    int igb_setup_rx_resources(struct igb_ring *rx_ring)
    {
        struct device *dev = rx_ring->dev;
        int size;
        // [1] 内存申请
        size = sizeof(struct igb_rx_buffer) * rx_ring->count;
        rx_ring->rx_buffer_info = vzalloc(size);
        ...
    
        // [2] 初始化千兆网卡 e1000_adv_rx_desc 的 DMA 数组内存
        rx_ring->size = rx_ring->count * sizeof(union e1000_adv_rx_desc);
        rx_ring->size = ALIGN(rx_ring->size, 4096);
    
        rx_ring->desc = dma_alloc_coherent(dev, rx_ring->size,
                        &rx_ring->dma, GFP_KERNEL);
        ...
        // [3] 初始化队列成员
        rx_ring->next_to_alloc = 0;
        rx_ring->next_to_clean = 0;
        rx_ring->next_to_use = 0;
        ...
    }

    因此,每个 rx_ring 接收队列内部包含两个环形队列数组linux-kernel-net-eth-ringbuffer

    • 一个是通过 vzalloc 申请的位于内核中的 rx_ring->rx_buffer_info
    • 一个是通过 dma_alloc_coherent 申请的位于网卡中的 rx_ring->desc
  • 接着我们再来看中断处理程序的注册:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    static int igb_request_irq(struct igb_adapter *adapter)
    {
        ...
    	// 使用 PCIE 的 MSI-X 中断模式
        if (adapter->msix_entries) {
            err = igb_request_msix(adapter);
        }
        ...
    }
    
    tatic int igb_request_msix(struct igb_adapter *adapter)
    {
        ...
        for (i = 0; i < adapter->num_q_vectors; i++) {
            ...
            err = request_irq(adapter->msix_entries[vector].vector,
                    igb_msix_ring, 0, q_vector->name,
                    q_vector);...
        } ...
    }

    可见,在 PCIE 的 MSI-X 模式下,igb_request_msix() 为多队列网卡中的每个队列都注册了一个中断处理函数 igb_msix_ring()

    MSI/MSI-X 机制

    MSI/MSI-X 机制的引入解决了传统基于中断线(Line-based Interrupt)机制的种种限制,包括:

    1. 无需经过 I/O APIC 转发中断,直接通过 PCI/PCIe 内存写事务向 CPU 发送中断,提高了效率。
    2. 每个 PCI Function 可以支持分配多个中断向量,满足同一设备有多个不同中断请求的需求。当分配多个中断向量给同一个 PCI Function 时,提供按中断向量进行屏蔽的功能,更加灵活。

    MSI(Message Signaled Interrupts)是一种通过在内存中写入信息来产生中断的方式,其中内存地址由设备驱动程序和硬件设备协商确定。与传统的中断线不同,MSI 不需要单独的中断线,而是通过 PCI/PCIe 总线进行通信。

    MSI-X(Extended Message Signaled Interrupts)是 MSI 的扩展版本,提供了更多的功能和灵活性。它的主要特点包括:

    • 更多的中断向量:MSI-X 可以支持更多的中断向量(最多 2048 个),相比于 MSI 的 32 个,提供了更大的扩展性。
    • 独立的中断配置表:每个中断向量都有自己独立的配置表,可以独立配置每个向量的消息和目标地址,提高了中断处理的灵活性。
    • 更好的负载均衡:由于有更多的中断向量,可以更好地在多个处理器之间分配中断负载,提高系统的并行处理能力。

网卡驱动启动完毕后,就可以接收网络数据包了。

开始接收数据

处理硬中断

首先,当数据帧到达网卡时,第一站就是网卡内部的接收队列 rx_ring->desc,然后网卡协处理器再将 DMA 内存中对应地址的数据拷贝到与之关联的内核队列 rx_ring->rx_buffer_info 中,整个过程都不需要 CPU 参与。

linux-kernel-net-eth-hirq

当 DMA 操作完成后,网卡会向与内存队列关联的 CPU 发起一个硬中断,通知 CPU 有数据到达。中断处理函数为 igb_msix_ring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// file: drivers/net/ethernet/intel/igb/igb_main.c
static irqreturn_t igb_msix_ring(int irq, void *data)
{
	struct igb_q_vector *q_vector = data;

	// 写一个寄存器
	igb_write_itr(q_vector);
    // 发送软中断
	napi_schedule(&q_vector->napi);

	return IRQ_HANDLED;
}

其中,napi_schedule 最终调用了 __napi_schedule

1
2
3
4
5
6
7
// file: net/core/dev.c
static inline void ____napi_schedule(struct softnet_data *sd,
				     struct napi_struct *napi)
{
	list_add_tail(&napi->poll_list, &sd->poll_list);
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

可见,这里的硬中断就是将 napi_struct->poll_list 中新到达的帧队列追加到了 CPU 的 softnet_data->poll_list,而 softnet_data->poll_list 是一个双向队列,里边缓存了网络设备中等待被处理的帧。紧接着调用 __raise_softirq_irqoff(NET_RX_SOFTIRQ) 发出了“网络设备可读”的软中断:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// file: kernel/softirq.c
void __raise_softirq_irqoff(unsigned int nr)
{
	...
	or_softirq_pending(1UL << nr);
}

// file: include/linux/interrupt.h
#define or_softirq_pending(x)  (local_softirq_pending() |= (x))

// file: include/linux/irq_cpustat.h
// 获取 smp_processor_id 对应的中断类型
#define local_softirq_pending() \
	__IRQ_STAT(smp_processor_id(), __softirq_pending)

所谓的软中断,其实就是对某个内存变量做了一次异或运算。

ksoftirqd 处理软中断

通过上文介绍我们知道,CPU 在网卡硬中断过程中仅记录了一个寄存器,并修改了 CPU 的 poll_list,然后就触发软中断退出了,剩余的网络收包处理过程都在软中断中完成。软中断处理数据的整体流程如下:

linux-kernel-net-eth-sirq

上文介绍初始化阶段时讲过,ksoftirqd 会启动一个循环线程函数 ksoftirqd_should_run() 来判断是否触发了软中断:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// file: kernel/softirq.c
static int ksoftirqd_should_run(unsigned int cpu)
{
	return local_softirq_pending();
}

// file: include/linux/irq_cpustat.h
// 获取 smp_processor_id 对应的中断类型
#define local_softirq_pending() \
	__IRQ_STAT(smp_processor_id(), __softirq_pending)

可见,“判断是否触发软中断”的过程与“触发软中断”过程都调用了 local_softirq_pending(),区别在于一个是为了写入标记,一个是为了读取标记。因此,如果硬中断处理函数中设置了 NET_RX_SOFTIRQ 标志位,这里就能读取到相应的事件。接下来就会进入真正的内核线程函数 run_ksoftirqd() 中去处理软中断:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
static void run_ksoftirqd(unsigned int cpu)
{
	local_irq_disable();
	if (local_softirq_pending()) {
		__do_softirq();
		...
		return;
	}
	local_irq_enable();
}

__do_softirq() 函数会根据当前 CPU 的软中断类型,调用对应的 action 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
asmlinkage void __do_softirq(void)
{
    ...
    do {
		if (pending & 1) {
			unsigned int vec_nr = h - softirq_vec;
			int prev_count = preempt_count();

			kstat_incr_softirqs_this_cpu(vec_nr);

			trace_softirq_entry(vec_nr);
			h->action(h); // 执行动作
			trace_softirq_exit(vec_nr);
			...
		}
		h++;
		pending >>= 1;
	} while (pending);
    ...
}

硬中断与软中断都会调用 local_softirq_pending,而 local_softirq_pending 获取的是 smp_processor_id 对应的中断,因此只要硬中断在哪个 CPU 上被响应,软中断也一定会在同一个 CPU 上被处理。

因此,想要让软中断分散在不同 CPU 上处理,需要调整的是硬中断的 CPU 亲和性。

这里我们接着看网络收包软中断的函数实现 net_rx_action()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// file: net/core/dev.c
static void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = &__get_cpu_var(softnet_data);
	unsigned long time_limit = jiffies + 2;
	int budget = netdev_budget;
	void *have;
    // [1] 关闭硬中断,避免重复触发(硬件上也会做相应的保证)
	local_irq_disable();
    // [2] 遍历 poll_list,处理就绪的设备
	while (!list_empty(&sd->poll_list)) {
        struct napi_struct *n;
        int work, weight;
		...
		n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
		...
		work = 0;
		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            // [3] 最终,执行网卡的 poll 函数,处理数据
			work = n->poll(n, weight);
			trace_napi_poll(n);
		}
		budget -= work;
		...
	}...
}

该函数的核心是对网卡 poll 函数的调用,我们依旧以 igb 网卡为例,igb_poll() 的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// file: drivers/net/ethernet/intel/igb/igb_main.c
static int igb_poll(struct napi_struct *napi, int budget)
{
	...
	if (q_vector->tx.ring)
		clean_complete = igb_clean_tx_irq(q_vector);

	if (q_vector->rx.ring)
		clean_complete &= igb_clean_rx_irq(q_vector, budget);

    ...
}

网卡读取过程中的重点工作是对 igb_clean_rx_irq() 的调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// file: drivers/net/ethernet/intel/igb/igb_main.c
static bool igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
{
	...
	do {
		...
		// [1] 把数据帧从内核内存中的 RingBuffer 取下
		skb = igb_fetch_rx_buffer(rx_ring, rx_desc, skb);
		...
		/* fetch next buffer in frame if non-eop */
		if (igb_is_non_eop(rx_ring, rx_desc))
			continue;
		/* verify the packet layout is correct */
		if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
			skb = NULL;
			continue;
		}
		...
		// [2] 数据帧校验,设置其 timestamp、VLAN id、protocol 等字段
		igb_process_skb_fields(rx_ring, rx_desc, skb);
        // [3] 移交给 IP 协议栈
		napi_gro_receive(&q_vector->napi, skb);
		...
	} while (likely(total_packets < budget));
    ... // 读取偏移量更新
}

该函数首先通过 igb_is_non_eop()igb_is_non_eop() 将数据帧从 RingBuffer 上取下,然后调用 napi_gro_receive 进一步处理数据:

1
2
3
4
5
6
7
// file: net/core/dev.c
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	skb_gro_reset_offset(skb);

	return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}

这里的 gro 指的是网卡的 GRO 特性:

GRO 机制

GRO(Generic Receive Offload)是一种 Linux 内核机制,用于提高网络数据包接收的效率。它通过将多个小数据包聚合成一个大数据包进行处理,减少了 CPU 的处理负载和网络栈的开销,从而提高了网络吞吐量。

GRO 的基本思想是将多个属于同一流的数据包聚合在一起,在传递到协议栈上层之前尽可能减少数据包的数量。以下是 GRO 的主要工作流程:

  • 数据包接收:网络接口卡(NIC)接收到数据包,并将其传递给网络栈的驱动层。
  • 数据包聚合:在驱动层,GRO 机制检查每个接收到的数据包,并判断它们是否可以与之前的包聚合在一起。这通常基于数据包的源地址、目的地址、端口号和协议类型等信息。
  • 形成大数据包:如果数据包可以聚合,GRO 将这些小包合并成一个大的数据包。这个大数据包包含了所有小包的有效负载,且保留了每个小包的头部信息。
  • 传递给上层协议栈:聚合后的大数据包被传递到网络协议栈的上层(如 TCP/IP 栈)进行进一步处理。

我们暂且忽略 GRO,直接看 napi_skb_finish() 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// file: net/core/dev.c
static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
	switch (ret) {
	case GRO_NORMAL:
		if (netif_receive_skb(skb))
			ret = GRO_DROP;
		break;
	    ...
	}

	return ret;
}

正常情况下,该函数会调用 netif_receive_skb() 将数据包送给网络协议栈处理。

网络协议栈处理

netif_receive_skb() 函数会根据数据包的协议(例如 IP 协议、ARP 协议等),将数据包交给特定的协议处理函数去处理:

linux-kernel-net-stack-rcv
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// file: include/linux/rculist.h
#define list_for_each_entry_rcu(pos, head, member) \
	for (pos = list_entry_rcu((head)->next, typeof(*pos), member); \
		&pos->member != (head); \
		pos = list_entry_rcu(pos->member.next, typeof(*pos), member))

// file: net/core/dev.c
int netif_receive_skb(struct sk_buff *skb)
{
	...
	return __netif_receive_skb(skb);
}
static int __netif_receive_skb(struct sk_buff *skb)
{
	...
		ret = __netif_receive_skb_core(skb, false);
}
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
	...
	// [1] pcap 逻辑
	list_for_each_entry_rcu(ptype, &ptype_all, list) {
		// 非设备虚拟协议,常用于 tcpdump
		if (!ptype->dev || ptype->dev == skb->dev) {
			if (pt_prev)
				ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = ptype;
		}
	}
	...
	// [2] 取出 protocol,根据协议类型调用处理器
	type = skb->protocol;
	list_for_each_entry_rcu(ptype,
			&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
		if (ptype->type == type &&
		    (ptype->dev == null_or_dev || ptype->dev == skb->dev ||
		     ptype->dev == orig_dev)) {
			if (pt_prev)
				ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = ptype;
		}
	}
}

所以,__netif_receive_skb_core() 函数首先遍历所有协议,然后交给 tcpdump 挂载到 ptype_all 虚拟协议上的处理函数处理。接着,再获取 skb 的协议类型,交由对应的协议栈处理:

1
2
3
4
5
6
7
8
// file: net/core/dev.c
static inline int deliver_skb(struct sk_buff *skb,
			      struct packet_type *pt_prev,
			      struct net_device *orig_dev)
{
	...
	return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

上文提到过,ptype_base[] 是一个映射表,里边包含了 ipv4 协议的函数指针,所以这里最终调用了 pt_prev->func 指向的 ip_rcv() 函数。

IP 层处理

IP 层的函数入口是 ip_rcv()

1
2
3
4
5
6
7
// file: net/ipv4/ip_input.c
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
	...
	return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL,
		       ip_rcv_finish);
}

NF_HOOK() 是一个钩子函数,它首先会根据 iptables 对包进行过滤,最终交给参数指向的 ip_rcv_finish() 做进一步处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// file: net/ipv4/ip_input.c
static int ip_rcv_finish(struct sk_buff *skb)
{
	...
	if (!skb_dst(skb)) {
		int err = ip_route_input_noref(skb, iph->daddr, iph->saddr,
					       iph->tos, skb->dev);
		...
	}
	...
	// [2] 调用
	return dst_input(skb);
}

int ip_route_input_noref(struct sk_buff *skb, __be32 daddr, __be32 saddr,
			 u8 tos, struct net_device *dev)
{
	...
	int res = ip_route_input_mc(skb, daddr, saddr,tos, dev, our);
	...
}

static int ip_route_input_mc(struct sk_buff *skb, __be32 daddr, __be32 saddr,
				u8 tos, struct net_device *dev, int our)
{
	struct rtable *rth;
	...
	if (our) {
		// [1] 将函数 ip_local_deliver 赋值给 dst.input
		rth->dst.input= ip_local_deliver;
		rth->rt_flags |= RTCF_LOCAL;
	}
	...
}

该过程首先将函数 ip_local_deliver() 赋值给了 dst.input,然后回到 ip_rcv_finish() 里调用了 dst_input()

1
2
3
4
5
// file: include/net/dst.h
static inline int dst_input(struct sk_buff *skb)
{
	return skb_dst(skb)->input(skb);
}

skb_dst(skb)->input(skb) 调用就是刚刚赋值的 ip_local_deliver() 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// file: net/ipv4/ip_input.c
int ip_local_deliver(struct sk_buff *skb)
{
	...
	return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
		       ip_local_deliver_finish);
}

static int ip_local_deliver_finish(struct sk_buff *skb)
{
	...
	int protocol = ip_hdr(skb)->protocol;
	const struct net_protocol *ipprot;
	...
	// 通过映射表找到处理函数
	ipprot = rcu_dereference(inet_protos[protocol]);
	if (ipprot != NULL) {
		...
		// 移送到传输层协议中
		ret = ipprot->handler(skb);
		...
	}...
}

这里见到了一个熟悉的映射表 inet_protos[],上文讲过,该映射表内部缓存了 TCP、UDP 等传输层协议的处理函数,因此这里最终会根据协议类型进行分发,将 sbk 移送到更上层的协议中。

传输层处理

到了传输层,内核对数据包的处理就进入了最后阶段。而有一个与传输层密切相关的概念就是 I/O 模型,不同的 I/O 模型适用于不同的应用场景,主要包括以下几种:

  • 阻塞 I/O:在这种模型中,当一个进程执行 I/O 操作时,如果数据没有准备好,进程会一直等待,直到数据可用。
  • 非阻塞 I/O:进程执行 I/O 操作时,如果数据不可用,不会等待,而是立即返回错误码,进程可以休眠一段时间或自旋检查结果。
  • 多路复用 I/O:通过使用 selectpollepoll 等系统调用,进程可以同时监控多个文件描述符,一旦某个文件描述符准备好进行 I/O 操作,系统会通知进程。这种方式减少了轮询的开销。
  • 信号驱动 I/O:当 I/O 事件发生时,内核通过信号通知进程,进程可以在信号处理程序中执行相应的 I/O 操作。适用于需要快速响应的应用。
  • 异步 I/O:进程发起 I/O 操作后,不会等待数据准备完成,内核在操作完成后通知进程。进程可以在等待期间继续执行其他任务。这种方式理论上效率最高,但实现较为复杂。

在网络 I/O 领域,Linux 只支持前三种 I/O 模型,即阻塞、非阻塞与多路复用 I/O。由于阻塞与非阻塞 I/O 在模型上几乎没有区别(都会浪费一个与 socket 交互的进程),所以本文我们先通过 TCP 协议讲解阻塞 I/O 模型下的收包流程,至于 多路复用 的内容将在后续文章中进行介绍。

socket 创建过程

在剖析传输层协议前,我们先来了解一下 socket 是如何在内核中表示的。我们一般通过以下代码创建 socket 并接收数据:

1
2
3
4
5
6
// 创建 socket 文件描述符
int sk = socket(AF_INET, SOCK_STREAM, 0);
// 连接服务器
connect(sk, ...);
// 等待数据到达
recv(sk, ...)

socket() 函数会返回一个 socket 文件描述符,与之关联的是内核中一大堆复杂的内核对象:

linux-kernel-socket

接下来,我们从创建 socket 的系统调用向下跟踪这个过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// file: net/socket.c
SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
	...
	// 阻塞与非阻塞就是标志位不同
	if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
		flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;
	// 创建 socket
	retval = sock_create(family, type, protocol, &sock);
	...
}
int sock_create(int family, int type, int protocol, struct socket **res)
{
	return __sock_create(current->nsproxy->net_ns, family, type, protocol, res, 0);
}
int __sock_create(struct net *net, int family, int type, int protocol,
			 struct socket **res, int kern)
{
	struct socket *sock;
	const struct net_proto_family *pf;
	...
	// [1] 分配空间
	sock = sock_alloc();
	...
	// [2] 查询协议族函数表
	pf = rcu_dereference(net_families[family]);
	...
	// [3] 调用指定协议族的创建函数,AF_INET 对应的是 inet_create
	err = pf->create(net, sock, protocol, kern);
	...
}

该函数首先为 socket 分配了内存空间,然后获取协议族函数表,最后根据参数传入的协议族 AF_INET 调用了表中对应的函数,即 inet_create()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// file: net/ipv4/af_inet.c
static struct list_head inetsw[SOCK_MAX];
static int inet_create(struct net *net, struct socket *sock, int protocol,
		       int kern)
{
	struct sock *sk;
	struct inet_protosw *answer;
	...
	/*
	这个数组中的所有元素,最终会插入 inetsw
	static struct inet_protosw inetsw_array[] =
		{
			.type =       SOCK_STREAM,
			.protocol =   IPPROTO_TCP,
			.prot =       &tcp_prot,
			.ops =        &inet_stream_ops,
			.no_check =   0,
			.flags =      INET_PROTOSW_PERMANENT |
					INET_PROTOSW_ICSK,
		}
		...
	};
	*/
	list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
		...
	}
	...
	// [1] 将 inet_stream_ops 赋值给 sock->ops
	sock->ops = answer->ops; // inet_stream_ops
	// [2] 获得 tcp_prot
	answer_prot = answer->prot;
	...
	// [3] 分配 sock 对象,并将 tcp_prot 赋值给 sock->sk_prot
	sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);
	...
	// [4] sock 初始化
	sock_init_data(sock, sk);
	...
}

inet_create() 函数又根据 sock->type == SOCK_STREAM 查找与 TCP 协议对应的操作方法集合 inet_stream_opstcp_prot,并把它们分别设置到 sock->opssock->sk_prot

  • inet_stream_ops
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    // file: net/ipv4/af_inet.c
    const struct proto_ops inet_stream_ops = {
    	.connect	   = inet_stream_connect,
    	.accept		   = inet_accept,
    	.poll		   = tcp_poll,
    	.listen		   = inet_listen,
    	.sendmsg	   = inet_sendmsg,
    	.recvmsg	   = inet_recvmsg,
    	...
    };
  • tcp_prot
    1
    2
    3
    4
    5
    6
    7
    8
    
    // file: net/ipv4/tcp_ipv4.c
    struct proto tcp_prot = {
    	.connect		= tcp_v4_connect,
    	.accept			= inet_csk_accept,
    	.recvmsg		= tcp_recvmsg,
    	.sendmsg		= tcp_sendmsg,
    	...
    }

接着我们再跟踪 sock_init_data(),该函数内部将 sock 内部的 sk_data_ready 指针指向了 sock_def_readable() 函数:

1
2
3
4
5
6
7
// file: net/core/sock.c
void sock_init_data(struct socket *sock, struct sock *sk)
{
	...
	sk->sk_data_ready	=	sock_def_readable;
	...
}

当收到数据包触发软中断时,就会调用这里设置的函数 sock_def_readable() 来唤醒在该 sock 上等待的进程。

阻塞等待数据到达

内核等待接收网络数据,并阻塞的系统调用是 recvfrom

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// file: net/socket.c
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
		unsigned int, flags, struct sockaddr __user *, addr,
		int __user *, addr_len)
{
	struct socket *sock;
	...
	// 根据用户传入的 fd,找到 socket 内核对象
	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	...
	// 阻塞并等待数据到达
	err = sock_recvmsg(sock, &msg, size, flags);
	...
}

sock_recvmsg() 内部又调用了 __sock_recvmsg -> __sock_recvmsg_nosec,最终实现如下:

1
2
3
4
5
6
7
// file: net/socket.c
static inline int __sock_recvmsg_nosec(struct kiocb *iocb, struct socket *sock,
				       struct msghdr *msg, size_t size, int flags)
{
	...
	return sock->ops->recvmsg(iocb, sock, msg, size, flags);
}

这里是一个重点,sock->ops->recvmsg 正是上文注册过的处理函数 inet_recvmsg()

1
2
3
4
5
6
7
8
// file: net/ipv4/af_inet.c
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		 size_t size, int flags)
{
	...
	err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
				   flags & ~MSG_DONTWAIT, &addr_len);
}

inet_recvmsg() 内部又调用了 sock 内部的 sk->sk_prot->recvmsg 函数,也就是上文中注册的 tcp_recvmsg() 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// file: net/ipv4/tcp.c
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
		size_t len, int nonblock, int flags, int *addr_len)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int copied = 0;
	...

	do {
		...
		// [1] 遍历接收队列中的数据
		skb_queue_walk(&sk->sk_receive_queue, skb) {
			...
		}
		...
		if (copied >= target) {
			/* Do not sleep, just process backlog. */
			release_sock(sk);
			lock_sock(sk);
		} else // [2] 没有足够的数据,就阻塞当前进程
			sk_wait_data(sk, &timeo);
	}...
}
  1. skb_queue_walk() 函数实现了对 sock 对象下接收队列 sk_receive_queue 的遍历。
  2. 如果没有收到数据或数据不够多,则调用 sk_wait_data() 将当前进程阻塞:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    // file: net/core/sock.c
    int sk_wait_data(struct sock *sk, long *timeo)
    {
    	int rc;
    	// 定义一个等待队列项 wait
    	DEFINE_WAIT(wait);
    	// sk_sleep 获取 sock 的等待队列头,然后 prepare_to_wait 将 wait 插入
    	prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
    	set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
    	// 让出 CPU,进行休眠,唤醒条件为 sk_receive_queue 不为空
    	rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
    	...
    }
    • 首先通过 DEFINE_WAIT 宏,定义了一个等待队列项 wait

      1
      2
      3
      4
      5
      6
      7
      8
      
      // file: include/linux/wait.h
      #define DEFINE_WAIT_FUNC(name, function)				\
      	wait_queue_t name = {						\
      		.private	= current,				\
      		.func		= function,				\
      		.task_list	= LIST_HEAD_INIT((name).task_list),	\
      	}
      #define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
      • 该队列项的 private 指针指向了当前进程 current,用于将来的阻塞。
      • 该队列项的 function 指针指向了 autoremove_wake_function(),该函数用于唤醒用户进程。
    • 紧接着调用 sk_sleep() 函数拿到了 sock 内部的等待队列头 wait_queue_head

      1
      2
      3
      4
      5
      6
      
      // file: include/net/sock.h
      static inline wait_queue_head_t *sk_sleep(struct sock *sk)
      {
      	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
      	return &rcu_dereference_raw(sk->sk_wq)->wait;
      }
    • 然后调用 prepare_to_wait() 将刚创建的 wait 队列项添加到了 sock 等待队列:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      
      // file: kernel/wait.c
      void
      prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
      {
      	unsigned long flags;
      
      	wait->flags &= ~WQ_FLAG_EXCLUSIVE;
      	spin_lock_irqsave(&q->lock, flags);
      	if (list_empty(&wait->task_list))
      		__add_wait_queue(q, wait);
      	set_current_state(state);
      	spin_unlock_irqrestore(&q->lock, flags);
      }

      这样当 socket 的接收队列不为空时,内核就可以通过等待队列项唤醒监听 sock 的进程了。

    • 最后调用 sk_wait_event() 将进程挂起,该调用会产生一次进程上下文开销,十分昂贵,大约会消耗几微秒的 CPU 时间。

上述流程总结如下:

linux-kernel-socket-block

软中断处理数据

铺垫工作完成,总算来到了数据处理过程。

linux-kernel-socket-tcp-rcv

上文在 IP 层处理时我们讲到,最后 IP 层通过 net/ipv4/ip_input.c#ip_local_deliver_finish() 函数将数据包交给了传输层协议,所以这里我们接着从 TCP 协议的数据接收函数 tcp_v4_rcv() 开始剖析:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
	const struct iphdr *iph;
	const struct tcphdr *th;
	struct sock *sk;
	...
	// [1] 获取 TCP 包头
	th = tcp_hdr(skb);
	// [2] 获取 IP 包头
	iph = ip_hdr(skb);
	...
	// [3] 根据 tcp 包头的 ip 端口信息查找 sock 对象
	sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
	...
	// socket 未被锁定
	if (!sock_owned_by_user(sk)) {
		if (!tcp_prequeue(sk, skb))
			// [4] 开始处理数据包
			ret = tcp_v4_do_rcv(sk, skb);
	} ...
}

该函数通过 TCP 头信息中的元组查到 sock 后,直接进入了接收消息的主体函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
	struct sock *rsk;
	...
	if (sk->sk_state == TCP_ESTABLISHED) { /* Fast path */
		...
		// ESTABLISHED 状态下解析数据
		tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len);
		return 0;
	}
	// 其他状态数据包
	...
}

如果处理的是 TCP_ESTABLISHED 状态的包,则进入 tcp_rcv_established() 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// file: net/ipv4/tcp_input.c
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
			 const struct tcphdr *th, unsigned int len)
{
	...
	// 先将数据添加到接收队列
	eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
								&fragstolen);
	...
	// 唤醒等待队列上的进程
	sk->sk_data_ready(sk, 0);
	...
}
  • tcp_rcv_established() 内部又调用了 tcp_queue_rcv()

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    // file: net/ipv4/tcp_input.c
    static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
    		bool *fragstolen)
    {
    	int eaten;
    	// 拿到 sock 接收队列尾部
    	struct sk_buff *tail = skb_peek_tail(&sk->sk_receive_queue);
    	...
    	// 将收到的 skb 追加到接收队列尾部
    	if (!eaten) {
    		__skb_queue_tail(&sk->sk_receive_queue, skb);
    		skb_set_owner_r(skb, sk);
    	}
    	return eaten;
    }

    tcp_queue_rcv() 最终将收到的 skb 追加到了 sock->sk_receive_queue 接收队列的尾部。

  • 然后,又调用 sk->sk_data_ready(sk, 0) 唤醒了 sock 等待队列上休眠的进程。sk_data_ready 是一个函数指针,上文讲过它指向的是 sock_def_readable()

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    // file: net/core/sock.c
    static void sock_def_readable(struct sock *sk, int len)
    {
    	struct socket_wq *wq;
    
    	rcu_read_lock();
    	wq = rcu_dereference(sk->sk_wq);
    	if (wq_has_sleeper(wq))
    		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
    						POLLRDNORM | POLLRDBAND);
    	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    	rcu_read_unlock();
    }

    该函数访问了 sock->sk_wq 队列中的 wait 队列项,其 private 指针正指向阻塞的进程,最终通过系统调用 wake_up_interruptible_sync_poll 唤醒了该进程:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    // file: include/linux/wait.h
    #define wake_up_interruptible_sync_poll(x, m)				\
    	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
    // file: kernel/sched/core.c
    void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, void *key)
    {
    	unsigned long flags;
    	int wake_flags = WF_SYNC;
    
    	if (unlikely(!q))
    		return;
    
    	if (unlikely(nr_exclusive != 1))
    		wake_flags = 0;
    
    	spin_lock_irqsave(&q->lock, flags);
    	__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    	spin_unlock_irqrestore(&q->lock, flags);
    }

    这里是通过 kernel/sched/core.c 中的 __wake_up_common() 实现的唤醒,而且传入的参数 nr_exclusive == 1,因此每次调用只会唤醒等待队列上的一个线程,以免发生惊群效应

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    // file: kernel/sched/core.c
    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
    		int nr_exclusive, int wake_flags, void *key)
    {
    	wait_queue_t *curr, *next;
    	// 找到一个等待队列项
    	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
    		unsigned flags = curr->flags;
    		// 将其唤醒
    		if (curr->func(curr, mode, wake_flags, key) &&
    				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) // --nr_exclusive 只唤醒一个
    			break;
    	}
    }

    __wake_up_common 从等待队列中取下一个队列项后,调用了 func 函数指针,而上文我们介绍过这里的函数指针被注册成了 autoremove_wake_function(),其实现如下;

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    // file: kernel/wait.c
    int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
    {
    	int ret = default_wake_function(wait, mode, sync, key);
    	...
    }
    // file: kernel/sched/core.c
    int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
    		  void *key)
    {
    	return try_to_wake_up(curr->private, mode, wake_flags);
    }

    这里到了重点,try_to_wake_up 通过等待队列项的 private 指针完成了进程的唤醒,而上文讲过 private 指向的正是休眠等待的线程。该函数执行完,阻塞的进程就被推入可运行队列中等待系统调度了,最终系统唤醒休眠的线程又将产生一次进程上下文切换开销。

0%