0%

【Linux内核网络】数据包收发流程

网络作为 PCIe 设备,工作在物理层和数据链路层,主要由PHY/MAC芯片、Tx/Rx FIFO、DMA等组成。PHY 芯片对外通过变压器与网线连接,实现 CSMA/CD、模数转换、编解码、串并转换等功能。MAC 芯片对内连接 PCI 总线,主要负责比特流于帧的转换、CRC校验和 Packet Filtering 等功能。本文将详细介绍 Linux 内核网络子系统中,在数据链路层收发包的原理。

数据包的接收过程

设备驱动层

对于每一个网卡,无论是真实网卡还是虚拟网卡,在内核中都会对应着 net_device,都会有对应的网络设备驱动来在内核中注册和初始化,正如 Linux 网络设备驱动中所述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int __init ixgbe_init_module(void)
{
/* ... */
ret = pci_register_driver(&ixgbe_driver); // 注册ixgbe_driver
/* ... */
}
module_init(ixgbe_init_module);

static void __exit ixgbe_exit_module(void)
{
/* ... */
pci_unregister_driver(&ixgbe_driver); // 注销ixgbe_driver
/* ... */
}
module_exit(ixgbe_exit_module);

可以看到这里主要调用了 pci_register_driver 来初始化网络设备。

PCI 初始化

Intel I350 网卡是 PCIe 设备,PCI 设备通过 PCI 配置空间来区分自己。当一个 PCI 设备驱动编译时,会通过宏 MODULE_DEVICE_TABLE (from include/module.h) 来导出 PCI 设备 ID,这样设备驱动就可以控制和操作设备了。内核通过这个表来将不同的 PCI 设备与不同的设备驱动对应起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static const struct pci_device_id ixgbe_pci_tbl[] = {
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598AF_DUAL_PORT), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598AF_SINGLE_PORT), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598AT), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598AT2), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598EB_CX4), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598_CX4_DUAL_PORT), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598_DA_DUAL_PORT), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598_SR_DUAL_PORT_EM), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598EB_XF_LR), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598EB_SFP_LOM), board_82598 },
{PCI_VDEVICE(INTEL, IXGBE_DEV_ID_82598_BX), board_82598 },

/* required last entry */
{0, }
};
MODULE_DEVICE_TABLE(pci, ixgbe_pci_tbl);

前面我们看到 pci_register_driver 用来初始化设备驱动,这里传入的是 igbbe_driver 参数。内核使用struct pci_driver结构来描述一个PCI设备,因为在系统引导的时候,PCI设备已经被识别,当内核发现一个已经检测到的设备同驱动注册的id_table中的信息相匹配时,它就会触发驱动的probe函数。

drivers/net/ethernet/intel/igb/igb_main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
static struct pci_driver ixgbe_driver = {
.name = ixgbe_driver_name,
.id_table = ixgbe_pci_tbl,
.probe = ixgbe_probe, // 系统探测到ixgbe网卡后调用ixgbe_probe()
.remove = ixgbe_remove,
#ifdef CONFIG_PM
.suspend = ixgbe_suspend,
.resume = ixgbe_resume,
#endif
.shutdown = ixgbe_shutdown,
.sriov_configure = ixgbe_pci_sriov_configure,
.err_handler = &ixgbe_err_handler
};

PCI Probe

一旦一个设备被内核通过 PCI IDs 识别,内核将会选择合适的驱动来控制该设备。每个 PCI 驱动注册了一个 probe 函数表示该设备已经被识别和控制。一般 probe 的常见操作包括:

  1. Enable PCI 设备
  2. 申请 memory ranges 和 IO ports
  3. 设置 DMA Mask
  4. 注册 ethtool 函数
  5. 任何需要的 watchdog tasks
  6. 该设备专门的一些事情
  7. struct net_device_ops 对应的创建、初始化和注册
  8. struct net_device 对应的创建、初始化和注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* ixgbe_probe - Device Initialization Routine
* @pdev: PCI device information struct
* @ent: entry in ixgbe_pci_tbl
**/
static int ixgbe_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
struct net_device *netdev;
struct pci_dev *pdev;
pci_enable_device_mem(pdev); // 初始化 PCI 设备,enable memory resources
dma_set_mask_and_coherent(&pdev->dev, DMA_BIT_MASK(64)); // 设置 DMA Mask
pci_request_mem_regions(pdev, ixgbe_driver_name); // 申请 Memory regions
pci_set_master(pdev); // Enable DMA
pci_save_state(pdev); // 设置 PCI 配置空间

netdev = alloc_etherdev_mq(sizeof(struct ixgbe_adapter), indices); // 这里分配struct net_device
SET_NETDEV_DEV(netdev, &pdev->dev);
adapter = netdev_priv(netdev);

netdev->netdev_ops = &ixgbe_netdev_ops; // 设置 net_device_ops
ixgbe_set_ethtool_ops(netdev); // 注册 ethtool 函数

ixgbe_sw_init(); // 设置 private data
/* ... */
}

网络设备初始化

struct net_device_ops

struct net_device_ops 包含了用于操作网络设备的一系列函数指针,它在 ixgbe_probe 函数中被赋值。如下所示,我们可以看到常见的 ndo_openndo_stopndo_start_xmit 等函数,还可以看到跟 SRIOV 相关的 nod_set_vf_mac,跟 tc 相关 的 nod_setup_tc,跟 bridge 相关的 ndo_fdb_add,跟 XDP 和 BPF 相关的 ndo_bpf 等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static const struct net_device_ops ixgbe_netdev_ops = {
.ndo_open = ixgbe_open,
.ndo_stop = ixgbe_close,
.ndo_start_xmit = ixgbe_xmit_frame,
.ndo_set_rx_mode = ixgbe_set_rx_mode,
.ndo_validate_addr = eth_validate_addr,
.ndo_set_mac_address = ixgbe_set_mac,
.ndo_change_mtu = ixgbe_change_mtu,
.ndo_tx_timeout = ixgbe_tx_timeout,
.ndo_set_tx_maxrate = ixgbe_tx_maxrate,
.ndo_vlan_rx_add_vid = ixgbe_vlan_rx_add_vid,
.ndo_vlan_rx_kill_vid = ixgbe_vlan_rx_kill_vid,

.ndo_do_ioctl = ixgbe_ioctl,
.ndo_set_vf_mac = ixgbe_ndo_set_vf_mac,
.ndo_set_vf_vlan = ixgbe_ndo_set_vf_vlan,
.ndo_set_vf_rate = ixgbe_ndo_set_vf_bw,
.ndo_set_vf_spoofchk = ixgbe_ndo_set_vf_spoofchk,
.ndo_set_vf_rss_query_en = ixgbe_ndo_set_vf_rss_query_en,
.ndo_set_vf_trust = ixgbe_ndo_set_vf_trust,
.ndo_get_vf_config = ixgbe_ndo_get_vf_config,
.ndo_get_stats64 = ixgbe_get_stats64,

.ndo_fdb_add = ixgbe_ndo_fdb_add,
.ndo_bridge_setlink = ixgbe_ndo_bridge_setlink,
.ndo_bridge_getlink = ixgbe_ndo_bridge_getlink,

.ndo_setup_tc = __ixgbe_setup_tc,
.ndo_bpf = ixgbe_xdp,
.ndo_xdp_xmit = ixgbe_xdp_xmit,

/* ... */
};

ethtool registration

ethtool 是在网络中广泛使用的调试工具,可以用来获取关于网络设备的众多信息,ethtool 通过 ioctl 与驱动通信。

1
2
3
4
void ixgbe_set_ethtool_ops(struct net_device *netdev)
{
netdev->ethtool_ops = &ixgbe_ethtool_ops;
}

这里可以看到一些常见的 ethtool 函数:

1
2
3
4
5
6
static const struct ethtool_ops ixgbe_ethtool_ops = {
.get_drvinfo = ixgbe_get_drvinfo,
.get_regs_len = ixgbe_get_regs_len,
.get_regs = ixgbe_get_regs,
/* ... */
};

IRQs

当数据包通过网络到达网卡后,data frame 通过 DMA 被写到 RAM,那么接下来 NIC 怎么告诉内核去处理数据呢?

这个时候 NIC 会产生一个 interrupt request (IRQ)) 来表示数据到来了,有三种常见的中断:

  • MSI-X
  • MSI
  • 传统的 IRQs

但是当有大量数据包到来时会产生大量的 IRQs,大量的 CPU 时间被用来处理中断。为了解决这个问题,内核设计了 NAPI 机制来减少中断的产生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void
e100rx_interrupt(int irq, void *dev_id, struct pt_regs * regs)
{
struct net_device *dev = (struct net_device *)dev_id;
unsigned long irqbits = *R_IRQ_MASK2_RD;

if (irqbits & IO_STATE(R_IRQ_MASK2_RD, dma1_eop, active)) {
/* acknowledge the eop interrupt */

*R_DMA_CH1_CLR_INTR = IO_STATE(R_DMA_CH1_CLR_INTR, clr_eop, do);

/* check if one or more complete packets were indeed received */

while (*R_DMA_CH1_FIRST != virt_to_phys(myNextRxDesc)) {
/* Take out the buffer and give it to the OS, then
* allocate a new buffer to put a packet in.
*/
e100_rx(dev);
((struct net_local *)dev->priv)->stats.rx_packets++;
/* restart/continue on the channel, for safety */
*R_DMA_CH1_CMD = IO_STATE(R_DMA_CH1_CMD, cmd, restart);
/* clear dma channel 1 eop/descr irq bits */
*R_DMA_CH1_CLR_INTR =
IO_STATE(R_DMA_CH1_CLR_INTR, clr_eop, do) |
IO_STATE(R_DMA_CH1_CLR_INTR, clr_descr, do);

/* now, we might have gotten another packet
so we have to loop back and check if so */
}
}
}
arch/cris/drivers/ethernet.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void
e100_rx(struct net_device *dev)
{
struct sk_buff *skb;
int length = 0;

length = myNextRxDesc->descr.hw_len - 4;

if (length < RX_COPYBREAK) {
/* Small packet, copy data */
skb = dev_alloc_skb(length - ETHER_HEAD_LEN);

skb_put(skb, length - ETHER_HEAD_LEN); /* allocate room for the packet body */
skb_data_ptr = skb_push(skb, ETHER_HEAD_LEN); /* allocate room for the header */

memcpy(skb_data_ptr, phys_to_virt(myNextRxDesc->descr.buf), length);
}
else {
/* Large packet, send directly to upper layers and allocate new memory */
skb = myNextRxDesc->skb;
skb_put(skb, length);
myNextRxDesc->skb = dev_alloc_skb(MAX_MEDIA_DATA_SIZE);
myNextRxDesc->descr.buf = virt_to_phys(myNextRxDesc->skb->data);
}

skb->protocol = eth_type_trans(skb, dev);
/* Send the packet to the upper layers */
netif_rx(skb);
/* Prepare for next packet */
myNextRxDesc->descr.status = 0;
myPrevRxDesc = myNextRxDesc;
myNextRxDesc = phys_to_virt(myNextRxDesc->descr.next);

rx_queue_len++;

/* Check if descriptors should be returned */
if (rx_queue_len == RX_QUEUE_THRESHOLD) {
flush_etrax_cache();
myPrevRxDesc->descr.ctrl |= d_eol;
myLastRxDesc->descr.ctrl &= ~d_eol;
myLastRxDesc = myPrevRxDesc;
rx_queue_len = 0;
}
}

NAPI

NAPI 是 Linux 上采用的一种提高网络处理效率的技术,它的核心概念就是不采用中断的方式读取数据,而代之以首先采用中断唤醒数据接收的服务程序,然后 Pol l 的方法来轮询数据。NAPI 的使用流程如下:

  1. NAPI 被驱动 Enable,但是默认是关闭状态
  2. 数据包到达 NIC 并且被 DMA 到 RAM 中
  3. NIC 产生 IRQ,触发了驱动中的 IRQ Handler
  4. 驱动通过 softirq 唤醒 NAPI 子系统,通过使用驱动注册的 Poll 函数来获取数据包
  5. 驱动关闭 NIC 的中断,这样可以通过驱动使用 NAPI 获取数据包而不用处理中断
  6. 当所有数据包都已被处理,NAPI 被 disable,IRQs 被 re-enable
  7. 当再次有数据包到达,重复步骤2

对于使用 NAPI 的驱动,都会有一个 poll 函数,它会调用 netif_napi_add 方法向 NAPI 子系统注册,后面会详细介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
{
INIT_LIST_HEAD(&napi->poll_list);
hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
napi->timer.function = napi_watchdog;
napi->gro_count = 0;
napi->gro_list = NULL;
napi->skb = NULL;
napi->poll = poll;
if (weight > NAPI_POLL_WEIGHT)
pr_err_once("netif_napi_add() called with weight %d on device %s\n",
weight, dev->name);
napi->weight = weight;
list_add(&napi->dev_list, &dev->napi_list);
napi->dev = dev;
#ifdef CONFIG_NETPOLL
napi->poll_owner = -1;
#endif
set_bit(NAPI_STATE_SCHED, &napi->state);
napi_hash_add(napi);
}

ixgbe_adapter包含ixgbe_q_vector数组(一个ixgbe_q_vector对应一个中断),ixgbe_q_vector包含napi_struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* Structure for NAPI scheduling similar to tasklet but with weighting
*/
struct napi_struct {
/* The poll_list must only be managed by the entity which
* changes the state of the NAPI_STATE_SCHED bit. This means
* whoever atomically sets that bit can add this napi_struct
* to the per-CPU poll_list, and whoever clears that bit
* can remove from the list right before clearing the bit.
*/
struct list_head poll_list;

unsigned long state;
int weight;
unsigned long gro_bitmask;
int (*poll)(struct napi_struct *, int);

struct net_device *dev;
struct gro_list gro_hash[GRO_HASH_BUCKETS];
struct sk_buff *skb;
struct hrtimer timer;
struct list_head dev_list;
struct hlist_node napi_hash_node;
unsigned int napi_id;
};
1
2
3
4
ixgbe_probe();
-> ixgbe_init_interrupt_scheme();
-> ixgbe_alloc_q_vectors(); // We allocate one q_vector per queue interrupt
-> ixgbe_alloc_q_vector(); // Allocate memory for a single interrupt vector

我们知道网卡可能有多队列,每个发送队列和接收队列都会对应着一个 q_vector 。仔细看下 ixgbe_alloc_q_vetor 函数的实现,可以看到 ixgbe_poll 是如何被注册的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
int v_count, int v_idx,
int txr_count, int txr_idx,
int xdp_count, int xdp_idx,
int rxr_count, int rxr_idx)
{
/* ... */

/* allocate q_vector and rings */
q_vector = kzalloc(size, GFP_KERNEL);
if (!q_vector)
return -ENOMEM;

/* initialize NAPI */
netif_napi_add(adapter->netdev, &q_vector->napi, ixgbe_poll, 64);

/* ... */
}

硬中断函数把napi_struct加入CPU的poll_list,软中断函数net_rx_action()遍历poll_list,执行poll函数

打开网络设备

当一个网络设备被启动时(比如 ifconfig eth0 up),会调用 ndo_open,一般会执行以下操作:

  1. Allocate RX and TX queue memory
  2. Enable NAPI
  3. Register an interrupt handler
  4. Enable hardware interrupts
  5. And more

ixgbe_open 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int ixgbe_open(struct net_device *netdev)
{

/* allocate transmit descriptors */
ixgbe_setup_all_tx_resources(adapter);

/* allocate receive descriptors */
ixgbe_setup_all_rx_resources(adapter);

ixgbe_configure(adapter);

ixgbe_request_irq(adapter);

/* Notify the stack of the actual queue counts. */
queues = adapter->num_tx_queues;
netif_set_real_num_tx_queues(netdev, queues);

queues = adapter->num_rx_queues;
netif_set_real_num_rx_queues(netdev, queues);

ixgbe_up_complete(adapter);

/* ... */
return 0;
}

初始化 Ring Buffer

当前绝大多数的 NIC 会直接将收到的数据包 DMA 到 RAM,然后操作系统网络子系统可以从 RAM 中取出数据,这段空间即是 Ring Buffer。因此,驱动必须向 OS 申请一段 Memory Region,并将这段地址告知 Hardware,之后网卡就会自动将数据包 DMA 到这段 Ring Buffer 中。

当网络数据包速率较高时,一个 CPU 不能够处理所有的数据包,具体的,这段 Ring Buffer 是固定大小的空间,数据包将会被丢掉。为了解决这个问题,设计了 Receive Side Scaling (RSS) 或者说多队列网卡技术。一些网卡设备可以将接收到的数据包分发到多个 Ring Buffer 中,每个 Ring Buffer 就是一个独立的 Queue。这样允许 OS 在硬件层面就使用多个 CPU 来并行处理收到的数据包。

ixgbe 网卡支持多队列,我们可以看到其驱动在初始化时会调用 ixgbe_setup_all_rx_resourcesixgbe_setup_all_tx_resources 来初始化 Rx 和 Tx 的 Descriptors。

1
2
3
4
5
6
7
8
9
10
static int ixgbe_setup_all_rx_resources(struct ixgbe_adapter *adapter)
{
int i, err = 0;

for (i = 0; i < adapter->num_rx_queues; i++) {
err = ixgbe_setup_rx_resources(adapter, adapter->rx_ring[i]);
}

return 0;
}

我们看到, ixgbe_adapter 的数据结构中有 rx_ring Descriptor 数组用于描述接收 Ring Buffer,类似的还有 tx_ringxdp_ring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define MAX_Q_VECTORS             64
#define IXGBE_MAX_FCOE_INDICES 8
#define MAX_RX_QUEUES (IXGBE_MAX_FDIR_INDICES + 1)
#define MAX_TX_QUEUES (IXGBE_MAX_FDIR_INDICES + 1)
#define MAX_XDP_QUEUES (IXGBE_MAX_FDIR_INDICES + 1)

/* board specific private data structure */
struct ixgbe_adapter {
/* ... */
int num_tx_queues;
int num_rx_queues;
int num_xdp_queues;

struct ixgbe_ring *xdp_ring[MAX_XDP_QUEUES];
struct ixgbe_ring *tx_ring[MAX_TX_QUEUES];
struct ixgbe_ring *rx_ring[MAX_RX_QUEUES];

struct ixgbe_q_vector *q_vector[MAX_Q_VECTORS];
};

具体看 ixgbe_ring 这个数据结构,关键字段

  • next:指向在 q_vector 中的下一个 ixgbe_ring
  • desc:指向这个 descriptor ring 拥有的 DMA Buffer 的开始地址,这是数据在接收和发送所在的实际地址
  • rx_buffer_info: 接受队列的描述信息,包括关联的 skb 和 dma 地址
  • dma:表明这个 descriptor ring 指向的 DMA Buffer 的物理地址,对于DMA而言,其数据传输不会经过MMU,因此需要一个真实的物理地址
  • count: 这个 descriptor ring 总共有多少个 descriptor
  • size: descriptor count * sizeof(ixgbe_ring)
  • queue_index:用于 multiqueue 的队列管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct ixgbe_rx_buffer {
struct sk_buff *skb;
dma_addr_t dma;
struct page *page;
__u32 page_offset;
__u16 pagecnt_bias;
};

struct ixgbe_ring {
struct ixgbe_ring *next; /* pointer to next ring in q_vector */
struct ixgbe_q_vector *q_vector; /* backpointer to host q_vector */
struct net_device *netdev; /* netdev ring belongs to */
struct device *dev; /* device for DMA mapping */
void *desc; /* descriptor ring memory */
union {
struct ixgbe_tx_buffer *tx_buffer_info;
struct ixgbe_rx_buffer *rx_buffer_info;
};
unsigned long state;
u8 __iomem *tail;
dma_addr_t dma; /* phys. address of descriptor ring */
unsigned int size; /* length in bytes */

u16 count; /* amount of descriptors */

u8 queue_index; /* needed for multiqueue queue management */
u8 reg_idx; /* holds the special value that gets
* the hardware register offset
* associated with this ring, which is
* different for DCB and RSS modes
*/
u16 next_to_use;
u16 next_to_clean;

/* ... */
};

可以看到在 ixgbe_setup_rx_resources 会初始化该数据结构的变量,并且调用 dam_alloc_coherent 来申请 DMA Memory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int ixgbe_setup_rx_resources(struct ixgbe_adapter *adapter, struct ixgbe_ring *rx_ring)
{
struct device *dev = rx_ring->dev;
size = sizeof(struct ixgbe_rx_buffer) * rx_ring->count;
rx_ring->rx_buffer_info = vmalloc(size);

/* Round up to nearest 4K */
rx_ring->size = rx_ring->count * sizeof(union ixgbe_adv_rx_desc);
rx_ring->size = ALIGN(rx_ring->size, 4096);

rx_ring->desc = dma_alloc_coherent(dev, rx_ring->size, &rx_ring->dma, GFP_KERNEL);

rx_ring->next_to_clean = 0;
rx_ring->next_to_use = 0;

/* ... */

return 0;
}
Rx Ring Buffer

这里写图片描述

  1. 网卡驱动创建 rx descriptor ring(一致性DMA内存),将 rx descriptor ring 的总线地址写入网卡寄存器RDBA
  2. 网卡驱动为每个descriptor分配 sk_buff 和数据缓存区,流式DMA映射数据缓存区,将数据缓存区的总线地址保存到 descriptor
  3. 网卡接收数据包,将数据包写入Rx FIFO
  4. DMA找到rx descriptor ring中下一个将要使用的descriptor
  5. 整个数据包写入Rx FIFO后,DMA通过PCI总线将Rx FIFO中的数据包复制到descriptor的数据缓存区
  6. 复制完后,网卡启动硬中断通知CPU数据缓存区中已经有新的数据包了,CPU执行硬中断函数:
    1. NAPI(以e1000网卡为例):e1000_intr() -> napi_schedule() -> raise_softirq_irqoff(NET_RX_SOFTIRQ)
    2. 非NAPI(以dm9000网卡为例):dm9000_interrupt() -> dm9000_rx() -> netif_rx() -> napi_schedule() -> napi_schedule() -> raise_softirq_irqoff(NET_RX_SOFTIRQ)
  7. ksoftirqd执行软中断函数net_rx_action():
    1. NAPI(以e1000网卡为例):net_rx_action() -> e1000_clean() -> e1000_clean_rx_irq() -> e1000_receive_skb() -> netif_receive_skb()
    2. 非NAPI(以dm9000网卡为例):net_rx_action() -> process_backlog() -> netif_receive_skb()
  8. 网卡驱动通过 netif_receive_skb() 将sk_buff上送协议栈

对于这段 Ring Buffer,Hardware 和 Software 会共同读写,Hardware 也就是 NIC,Software 就是驱动。

  • SW向从next_to_use开始的N个 descriptor 补充 sk_buff,next_to_use += N,tail = next_to_use(写网卡寄存器RDT)
  • HW写Frame到从head开始的M个descriptor的sk_buff,写完后回写EOP(End of Packet),head += M
  • SW将从next_to_clean开始的L个sk_buff移出Rx Ring Buffer并上送协议栈,next_to_clean += L,向从next_to_use开始的L个descriptor补充sk_buff,next_to_use += L,tail = next_to_use

注意:每次补充完sk_buff后,tail和next_to_use指向同一个sk_buff

Tx Ring Buffer

这里写图片描述

  1. 网卡驱动创建tx descriptor ring(一致性DMA内存),将tx descriptor ring的总线地址写入网卡寄存器TDBA
  2. 协议栈通过dev_queue_xmit()将sk_buff下送网卡驱动
  3. 网卡驱动将sk_buff放入tx descriptor ring,更新TDT
  4. DMA感知到TDT的改变后,找到tx descriptor ring中下一个将要使用的descriptor
  5. DMA通过PCI总线将descriptor的数据缓存区复制到Tx FIFO
  6. 复制完后,通过MAC芯片将数据包发送出去
  7. 发送完后,网卡更新TDH,启动硬中断通知CPU释放数据缓存区中的数据包

SW将sk_buff挂载到从next_to_use开始的N个descriptor,next_to_use += N,tail = next_to_use(写网卡寄存器TDT)

HW使用DMA读从head开始的M个descriptor的sk_buff,发送成功后回写DD(Descriptor Done),head += M

SW将从next_to_clean的开始的L个sk_buff移出Tx Ring Buffer并清理,next_to_clean += L

注意:每次挂载完sk_buff后,tail和next_to_use指向同一个descriptor

Register interrupt handler

设备驱动需要探查驱动支持那种中断:MSI-X、MSI 或者是 L egacy Interrupt,并且根据中断类型设置 interrupt handler。关于这三种中断类型的区别可以查看 this useful wiki page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int ixgbe_request_irq(struct ixgbe_adapter *adapter)
{
struct net_device *netdev = adapter->netdev;
int err;

if (adapter->flags & IXGBE_FLAG_MSIX_ENABLED)
err = ixgbe_request_msix_irqs(adapter);
else if (adapter->flags & IXGBE_FLAG_MSI_ENABLED)
err = request_irq(adapter->pdev->irq, ixgbe_intr, 0,
netdev->name, adapter);
else
err = request_irq(adapter->pdev->irq, ixgbe_intr, IRQF_SHARED,
netdev->name, adapter);

if (err)
e_err(probe, "request_irq failed, Error %d\n", err);

return err;
}

Enable NAPI

通过 napi_enable 来 Enable 每个 q_vector 的 NAPI:

1
2
3
4
5
6
7
static void ixgbe_napi_enable_all(struct ixgbe_adapter *adapter)
{
int q_idx;

for (q_idx = 0; q_idx < adapter->num_q_vectors; q_idx++)
napi_enable(&adapter->q_vector[q_idx]->napi);
}

SoftIRQs

我们知道,Linux 中断分为顶半部和底半部:

  • 顶半部也即是 Interrupt Handler,处理时全程关闭中断,用于处理有实时性和硬件相关的操作,可以快速的执行完毕。
  • 底半部代表中断处理中被延后执行的部分

目前 Linux 支持 softirq、tasklet 和 work queue 三种底半部机制,对于网络协议栈而言,我们只关注 softirq。对每个 CPU,都会有一个 kernel thread 用于处理软中断,也就是在 top 中看到的 ksoftirqd/0 这种进程。

kernel/softirq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static struct smp_hotplug_thread softirq_threads = {
.store = &ksoftirqd,
.thread_should_run = ksoftirqd_should_run,
.thread_fn = run_ksoftirqd,
.thread_comm = "ksoftirqd/%u",
};

static __init int spawn_ksoftirqd(void)
{
register_cpu_notifier(&cpu_nfb);

BUG_ON(smpboot_register_percpu_thread(&softirq_threads));

return 0;
}
early_initcall(spawn_ksoftirqd);

可以看到 ksoftirqd 进程执行的 routine 是 __do_softirq()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void run_ksoftirqd(unsigned int cpu)
{
local_irq_disable();
if (local_softirq_pending()) {
/*
* We can safely run softirq on inline stack, as we are not deep
* in the task stack here.
*/
__do_softirq();
local_irq_enable();
cond_resched();
return;
}
local_irq_enable();
}

__do_softirq() 的逻辑如下:

  • 看当前那个 softirq 处在 pending 状态
  • softirq 的时间被统计
  • softirq 的执行统计数据递增
  • 执行 pending 状态的 softirq 的 handler

SoftIRQ 有以下几种类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
enum
{
HI_SOFTIRQ=0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */

NR_SOFTIRQS
};

内核初始化期间,softirq_init会注册TASKLET_SOFTIRQ以及HI_SOFTIRQ相关联的处理函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void __init void __init softirq_init(void)
{
int cpu;

for_each_possible_cpu(cpu) {
per_cpu(tasklet_vec, cpu).tail =
&per_cpu(tasklet_vec, cpu).head;
per_cpu(tasklet_hi_vec, cpu).tail =
&per_cpu(tasklet_hi_vec, cpu).head;
}

open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

网络子系统分两种 soft IRQ。NET_TX_SOFTIRQNET_RX_SOFTIRQ,分别处理发送数据包和接收数据包。这两个soft IRQ在net_dev_init函数(net/core/dev.c)中注册,收发数据包的软中断处理函数被注册为net_rx_actionnet_tx_action

1
2
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);

其中open_softirq实现为:

1
2
3
4
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
softirq_vec[nr].action = action;
}

softnet_data

每个 CPU 都有一个队列,用于存储 incoming frame,这个数据结构即是 softnet_data。因为这是 per-cpu 的数据结构,每个 CPU 都有自己的对立,所以不同的 CPU 之间不需要锁来控制并发处理这个数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Incoming packets are placed on per-cpu queues
*/
struct softnet_data {
struct sk_buff_head input_pkt_queue; // 接收,non-NAPI NIC,将skb放到这个队列中,等待软中断处理
struct napi_struct backlog; // 接收,用来兼容 non-NAPI 的驱动
struct Qdisc *output_queue; // 发送,输出帧的控制
struct Qdisc **output_queue_tailp;//
struct list_head poll_list; // 链表挂载了所有在软中断中需要进一步处理的napi设备的napi_struct结构,同时非napi设备共用的backlog成员在需要处理数据包时也将挂载于此
struct sk_buff *completion_queue; // 发送,表示已经成功发送出的帧链表
struct sk_buff_head process_queue; // 非napi设备在软中断中处理input_pkt_queue中数据包时,先先将skb链表转移挂载到这里。目的是尽快释放input_pkt_queue上的锁,这样同cpu上的ISR及其他cpu上的rps逻辑可以使用input_pkt_queue

/* stats */
unsigned int dropped;
unsigned int processed;
unsigned int time_squeeze;
unsigned int cpu_collision;
unsigned int received_rps;

/* ... */
};

softnet_data 是在start_kernel 中创建的,每个cpu一个 softnet_data 变量:

  • 当收到数据包时,网络设备驱动会把自己的napi_struct挂到 CPU 私有变量softnet_data->poll_list
  • 在软中断时,net_rx_action会遍历cpu私有变量的softnet_data->poll_list,执行上面所挂的napi_struct结构的poll钩子函数,将数据包从驱动传到网络协议栈

内核初始化流程如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
start_kernel()
--> rest_init()
--> do_basic_setup()
--> do_initcall
-->net_dev_init

static int __init net_dev_init(void) {
dev_proc_init();
netdev_kobject_init();
register_pernet_subsys(&netdev_net_ops);

for_each_possible_cpu(i) {
struct softnet_data *sd = &per_cpu(softnet_data, i);
skb_queue_head_init(&sd->input_pkt_queue);
skb_queue_head_init(&sd->process_queue);
INIT_LIST_HEAD(&sd->poll_list);
sd->output_queue_tailp = &sd->output_queue;
init_gro_hash(&sd->backlog);
sd->backlog.poll = process_backlog;
sd->backlog.weight = weight_p;
}
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);

register_pernet_device(&loopback_net_ops);
register_pernet_device(&default_device_ops);
}

总结

  1. 数据包进入物理网卡。如果目的地址不是该网络设备,且该来网络设备没有开启 混杂模式,该包会被网络设备丢弃。
  2. 物理网卡将数据包通过DMA的方式写入到指定的内存地址,该地址由网卡驱动分配并初始化。
  3. 物理网卡通过硬件中断(IRQ)通知CPU,有新的数据包到达物理网卡需要处理。
  4. CPU根据中断表,调用已经注册的中断函数,这个中断函数会调到驱动程序(NIC Driver)中相应的函数
  5. 驱动先禁用网卡的中断,表示驱动程序已经知道内存中有数据了,告诉物理网卡下次再收到数据包直接写内存就可以了,不要再通知CPU了,这样可以提高效率,避免CPU不停的被中断。
  6. 启动软中断继续处理数据包。这样的原因是硬中断处理程序执行的过程中不能被中断,所以如果它执行时间过长,会导致CPU没法响应其它硬件的中断,于是内核引入软中断,这样可以将硬中断处理函数中耗时的部分移到软中断处理函数里面来慢慢处理。

链路层

按照驱动是 NAPI 还是非 NAPI,有不同的调用路径,ksoftirqd 执行软中断函数net_rx_action()

  • NAPI(以e1000网卡为例):net_rx_action() -> e1000_clean() -> e1000_clean_rx_irq() -> e1000_receive_skb() -> netif_receive_skb()
  • 非NAPI(以dm9000网卡为例):net_rx_action() -> process_backlog() -> netif_receive_skb()

在初始化时,默认是非napi的模式,poll函数默认是: process_backlog,如下:

1
2
3
4
net_dev_init
for_each_possible_cpu(i) {
sd->backlog.poll = process_backlog;
}

主流网卡都已经支持 NAPI ,所以我们主要看 NAPI 的流程:

Interrupt Handler

当数据包到达时,NIC 产生中断,驱动 interrupt handler 用于处理中断。可以看到,interrupt handler 十分简单:

  • 执行 igb_write_itr 用于 更新硬件寄存器
  • 执行 napi_schedule 用于唤醒 NAPI processing loop

注意,这里的 NAPI processing loop 不是在 Interrupt handler 中执行的,而是在 softirq 中执行的,interrupt handler 只是触发了他的执行。

1
2
3
4
5
6
7
8
9
10
11
static irqreturn_t igb_msix_ring(int irq, void *data)
{
struct igb_q_vector *q_vector = data;

/* Write the ITR value calculated from the previous interrupt. */
igb_write_itr(q_vector);

napi_schedule(&q_vector->napi);

return IRQ_HANDLED;
}

我们具体看看 napi_schedule 做了什么,他实际是 __napi_schedule 的简单封装,这段代码首先获取了当前 CPU 上注册的 softnet_data,然后执行了 ____napi_schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* __napi_schedule - schedule for receive
* @n: entry to schedule
*
* The entry's receive function will be scheduled to run
*/
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;

local_irq_save(flags);
____napi_schedule(&__get_cpu_var(softnet_data), n);
local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);

____napi_schedule 执行逻辑包括:

  1. 将当前这个设备驱动的 napi->poll_list 加到当前 CPU 关联的 softnet_data 的 poll_list 中
  2. 调用 __raise_softirq_irqoff 来触发 NET_RX_SOFTIRQ 软中断,如果当前 net_rx_action没有执行,将会触发其执行
1
2
3
4
5
6
7
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

net_rx_action

net_rx_action 是网络接收底半部的入口,它的执行逻辑为:

  • 遍历当前 CPU 的 NAPI list,执行 NAPI poll 函数

  • 当 budget 不够或者超时,则本次 net_rx_action 退出,这样可以保证包处理不会占用整个CPU

  • budget 被定义为一个 NAPI polling cycle 中允许处理的最大 packet number,参考 netdev_budget 解释

  • netdev_budget_usecs 则为一个 NAPI polling cycle 中允许处理的最长时间 microseconds,设定为 2 jiffies

    jiffies 记录了经过多少 tick,tick 代表的时间由 CONFIG_HZ 定义,CONFIG_HZ=200,则一个jiffies对应5ms时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int netdev_budget __read_mostly = 300;
unsigned int __read_mostly netdev_budget_usecs = 2000;

static __latent_entropy void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + usecs_to_jiffies(netdev_budget_usecs);
int budget = netdev_budget;
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list);
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
goto out;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll);

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list);
list_splice_tail(&repoll, &list);
list_splice(&list, &sd->poll_list);
if (!list_empty(&sd->poll_list))
__raise_softirq_irqoff(NET_RX_SOFTIRQ);

net_rps_action_and_irq_enable(sd);
out:
__kfree_skb_flush();
}

这里 napi_poll 返回的是处理的 packet number,这里我们可以看到 NAPI weight 的作用了。

1
2
/* initialize NAPI */
netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);

当我们把 weight 设置为 64,budget 设置为 300,此次 napi polling cycle 将会停止

  1. ixgbe_poll 调用了最多5次
  2. 时间超过 2 jiffies
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
int work, weight;
list_del_init(&n->poll_list);

weight = n->weight;

/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll() when NAPI is not scheduled.
*/
work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight);
trace_napi_poll(n, work, weight);
}


/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(napi_disable_pending(n))) {
napi_complete(n);
goto out_unlock;
}

if (n->gro_bitmask) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
napi_gro_flush(n, HZ >= 1000);
}
list_add_tail(&n->poll_list, repoll);

out_unlock:
netpoll_poll_unlock(have);

return work;
}

这里 NAPI 和 驱动定义的规则是:

  1. 如果 driver 的 poll function,比如这里的 ixgbe_poll 消耗了整个 weight,那么不能改变 NAPI 的状态,将当前 napi 结构体移到 poll list 的末尾,接下来将会执行下一个 net_rx_action loop
  2. 如果 driver 的 poll function,比如这里的 ixgbe_poll 没有消耗完整个 weight,那么driver 必须 disable NAPI (调用 napi_complete)。当下一次数据包到来时,driver 的 IRQ 调用 napi_schedule 时重新 enable NAPI。

NAPI Poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

/**
* ixgbe_poll - NAPI Rx polling callback
* @napi: structure for representing this polling device
* @budget: how many packets driver is allowed to clean
*
* This function is used for legacy and MSI, NAPI mode
**/
int ixgbe_poll(struct napi_struct *napi, int budget)
{
struct ixgbe_q_vector *q_vector =
container_of(napi, struct ixgbe_q_vector, napi);
struct ixgbe_adapter *adapter = q_vector->adapter;
struct ixgbe_ring *ring;
int per_ring_budget, work_done = 0;
bool clean_complete = true;

ixgbe_for_each_ring(ring, q_vector->tx) {
if (!ixgbe_clean_tx_irq(q_vector, ring, budget))
clean_complete = false;
}

/* Exit if we are called by netpoll */
if (budget <= 0)
return budget;

/* attempt to distribute budget to each queue fairly, but don't allow
* the budget to go below 1 because we'll exit polling */
if (q_vector->rx.count > 1)
per_ring_budget = max(budget/q_vector->rx.count, 1);
else
per_ring_budget = budget;

ixgbe_for_each_ring(ring, q_vector->rx) {
int cleaned = ixgbe_clean_rx_irq(q_vector, ring,
per_ring_budget);

work_done += cleaned;
if (cleaned >= per_ring_budget)
clean_complete = false;
}

/* If all work not completed, return budget and keep polling */
if (!clean_complete)
return budget;

/* all work done, exit the polling mode */
if (likely(napi_complete_done(napi, work_done))) {
if (adapter->rx_itr_setting & 1)
ixgbe_set_itr(q_vector);
if (!test_bit(__IXGBE_DOWN, &adapter->state))
ixgbe_irq_enable_queues(adapter,
BIT_ULL(q_vector->v_idx));
}

return min(work_done, budget - 1);
}

可以看到,这里面最关键的函数是 ixgbe_clean_rx_irq,其他则按照驱动和NAPI定义的规则执行。

Rx Ring Buffer

ixgbe_clean_rx_irq

1
2
3
4
5
static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
struct sk_buff *skb)
{
napi_gro_receive(&q_vector->napi, skb);
}

napi_gro_receive

如果开启了 GRO,napi_gro_receive 将负责处理网络数据,并将数据送到协议栈,大部分相关的逻辑在函数 dev_gro_receive 里实现。

1
2
3
4
5
6
7
8
9
10
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
skb_mark_napi_id(skb, napi);
trace_napi_gro_receive_entry(skb);

skb_gro_reset_offset(skb);

return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}
EXPORT_SYMBOL(napi_gro_receive);

dev_gro_receive这个函数首先检查 GRO 是否开启了,如果是,就准备做 GRO。GRO 首先遍历一个 offload filter 列表,如果高层协议认为其中一些数据属于 GRO 处理的范围,就会允许其对数据进行操作。

协议层以此方式让网络设备层知道,这个 packet 是不是当前正在处理的一个需要做 GRO 的 network flow 的一部分,而且也可以通过这种方式传递一些协议相关的信息。例如,TCP 协议需要判断是否应该将一个 ACK 包合并到其他包里。

net/core/dev.c
1
2
3
4
5
6
7
8
9
10
11
12
13
list_for_each_entry_rcu(ptype, head, list) {
if (ptype->type != type || !ptype->callbacks.gro_receive)
continue;

skb_set_network_header(skb, skb_gro_offset(skb));
skb_reset_mac_len(skb);
NAPI_GRO_CB(skb)->same_flow = 0;
NAPI_GRO_CB(skb)->flush = 0;
NAPI_GRO_CB(skb)->free = 0;

pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);
break;
}

如果协议层提示是时候 flush GRO packet 了,那就到下一步处理了。这发生在 napi_gro_complete,会进一步调用相应协议的 gro_complete 回调方法,然后调用 netif_receive_skb 将包送到协议栈。

1
2
3
4
5
6
7
8
if (pp) {
struct sk_buff *nskb = *pp;

*pp = nskb->next;
nskb->next = NULL;
napi_gro_complete(nskb);
napi->gro_count--;
}

接下来,如果协议层将这个包合并到一个已经存在的 flow,napi_gro_receive 就没什么事情需要做,因此就返回了。如果 packet 没有被合并,而且 GRO 的数量小于 MAX_GRO_SKBS( 默认是 8),就会创建一个新的 entry 加到本 CPU 的 NAPI 变量的 gro_list

1
2
3
4
5
6
7
8
9
10
if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS)
goto normal;

napi->gro_count++;
NAPI_GRO_CB(skb)->count = 1;
NAPI_GRO_CB(skb)->age = jiffies;
skb_shinfo(skb)->gso_size = skb_gro_len(skb);
skb->next = napi->gro_list;
napi->gro_list = skb;
ret = GRO_HELD;

napi_skb_finish

一旦 dev_gro_receive 完成,napi_skb_finish 就会被调用,其如果一个 packet 被合并了 ,就释放不用的变量;或者调用 netif_receive_skb 将数据发送到网络协议栈。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
switch (ret) {
case GRO_NORMAL:
if (netif_receive_skb_internal(skb))
ret = GRO_DROP;
break;

case GRO_DROP:
kfree_skb(skb);
break;

case GRO_MERGED_FREE:
if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
napi_skb_free_stolen_head(skb);
else
__kfree_skb(skb);
break;

case GRO_HELD:
case GRO_MERGED:
case GRO_CONSUMED:
break;
}

return ret;
}

netif_receive_skb

netif_receive_skb 及其衍生函数仍然处于 softirq 的循环中,所以这段时间仍然会算在 sitime 中

每个 NAPI 变量都会运行在相应 CPU 的软中断的上下文中。而且,触发硬中断的这个 CPU 接下来会负责执行相应的软中断处理函数来收包。换言之,同一个 CPU 既处理硬中断,又处理相应的软中断。

一些网卡(例如 Intel I350)在硬件层支持多队列。这意味着收进来的包会被通过 DMA 放到位于不同内存的队列上,而不同的队列有相应的 NAPI 变量管理软中断 poll()过程。因此, 多个 CPU 同时处理从网卡来的中断,处理收包过程。这个特性被称作 RSS(Receive Side Scaling,接收端扩展)。

RPS (Receive Packet Steering,接收包控制,接收包引导)是 RSS 的一种软件实现。因为是软件实现的,意味着任何网卡都可以使用这个功能,即便是那些只有一个接收队列的网卡。但是,因为它是软件实现的,这意味着 RPS 只能在 packet 通过 DMA 进入内存后,RPS 才能开始工作。

这意味着,RPS 并不会减少 CPU 处理硬件中断和 NAPI poll(软中断最重要的一部分)的时间,但是可以在 packet 到达内存后,将 packet 分到其他 CPU,从其他 CPU 进入协议栈。

Without RPS

如果 RPS 没启用,会调用__netif_receive_skb,它做一些 bookkeeping 工作,进而调用 __netif_receive_skb_core,将数据移动到离协议栈更近一步。

With RPS enabled

如果 RPS 启用了,它会做一些计算,判断使用哪个 CPU 的 backlog queue,这个过程由 get_rps_cpu 函数完成。

1
2
3
4
5
6
7
cpu = get_rps_cpu(skb->dev, skb, &rflow);

if (cpu >= 0) {
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}

get_rps_cpu 会考虑 RFS 和 aRFS 设置,以此选出一个合适的 CPU,通过调用 enqueue_to_backlog 将数据放到它的 backlog queue

假如你的网卡支持 aRFS,你可以开启它并做如下配置:

  • 打开并配置 RPS
  • 打开并配置 RFS
  • 内核中编译期间指定了 CONFIG_RFS_ACCEL 选项。Ubuntu kernel 3.13.0 是有的
  • 打开网卡的 ntuple 支持。可以用 ethtool 查看当前的 ntuple 设置
  • 配置 IRQ(硬中断)中每个 RX 和 CPU 的对应关系

以上配置完成后,aRFS 就会自动将 RX queue 数据移动到指定 CPU 的内存,每个 flow 的包都会到达同一个 CPU,不需要你再通过 ntuple 手动指定每个 flow 的配置了。

enqueue_to_backlog

首先从远端 CPU 的 struct softnet_data 变量获取 backlog queue 长度。如果 backlog 大于 netdev_max_backlog,或者超过了 flow limit,直接 drop,并更新 softnet_data 的 drop 统计。注意这是远端 CPU 的统计。

1
2
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

enqueue_to_backlog 被调用的地方很少。在基于 RPS 处理包的地方,以及 netif_rx,会调用到它。大部分驱动都不应该使用 netif_rx,而应该是用 netif_receive_skb。如果你没用到 RPS,你的驱动也没有使用 netif_rx,那增大 backlog 并不会带来益处,因为它根本没被用到。

注意:检查驱动,如果它调用了 netif_receive_skb,而且没用 RPS,那增大 netdev_max_backlog 并不会带来任何性能提升,因为没有数据包会被送到 input_pkt_queue

如果 input_pkt_queue 足够小,而 flow limit 也还没达到(或者被禁掉了 ),那数据包将会被放到队列。这里的逻辑有点 funny,但大致可以归为为:

  • 如果 backlog 是空的:如果远端 CPU NAPI 变量没有运行,并且 IPI 没有被加到队列,那就 触发一个 IPI 加到队列,然后调用____napi_schedule 进一步处理。
  • 如果 backlog 非空,或者远端 CPU NAPI 变量正在运行,那就 enqueue 包 这里使用了 goto,所以代码看起来有点 tricky。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}

/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
Flow limits

RPS 在不同 CPU 之间分发 packet,但是,如果一个 flow 特别大,会出现单个 CPU 被打爆,而其他 CPU 无事可做(饥饿)的状态。因此引入了 flow limit 特性,放到一个 backlog 队列的属 于同一个 flow 的包的数量不能超过一个阈值。这可以保证即使有一个很大的 flow 在大量收包 ,小 flow 也能得到及时的处理。

1
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

默认,flow limit 功能是关掉的。要打开 flow limit,需要指定一个 bitmap(类似于 RPS 的 bitmap)。

监控:由于 input_pkt_queue 打满或 flow limit 导致的丢包,在/proc/net/softnet_stat 里面的 dropped 列计数。

调优 Tuning: Adjusting netdev_max_backlog to prevent drops 在调整这个值之前,请先阅读前面的“注意”。

如果使用了 RPS,或者驱动调用了 netif_rx,那增加 netdev_max_backlog 可以改善在 enqueue_to_backlog 里的丢包:

例如:

increase backlog to 3000 with sysctl.

1
$ sudo sysctl -w net.core.netdev_max_backlog=3000

默认值是 1000。

backlog 处理逻辑和设备驱动的 poll 函数类似,都是在软中断(softirq)的上下文中执行,因此受整体 budget 和处理时间的限制。

Tuning: Enabling flow limits and tuning flow limit hash table size

1
$ sudo sysctl -w net.core.flow_limit_table_len=8192

默认值是 4096

这只会影响新分配的 flow hash table。所以,如果你想增加 table size 的话,应该在打开 flow limit 功能之前设置这个值。

打开 flow limit 功能的方式是,在/proc/sys/net/core/flow_limit_cpu_bitmap 中指定一 个 bitmask,和通过 bitmask 打开 RPS 的操作类似。

处理 backlog 队列:NAPI poller

每个 CPU 都有一个 backlog queue,其加入到 NAPI 变量的方式和驱动差不多,都是注册一个 poll 方法,在软中断的上下文中处理包。此外,还提供了一个 weight,这也和驱动类似 。注册发生在网络系统初始化的时候, net/core/dev.c的 net_dev_init 函数:

1
2
3
4
sd->backlog.poll = process_backlog;
sd->backlog.weight = weight_p;
sd->backlog.gro_list = NULL;
sd->backlog.gro_count = 0;

backlog NAPI 变量和设备驱动 NAPI 变量的不同之处在于,它的 weight 是可以调节的,而设备驱动是 hardcode 64。

process_backlog

process_backlog 是一个循环,它会一直运行直至 weight用完,或者 backlog 里没有数据了。

backlog queue 里的数据取出来,传递给__netif_receive_skb。这个函数做的事情和 RPS 关闭的情况下做的事情一样。即,__netif_receive_skb 做一些 bookkeeping 工作,然后调用__netif_receive_skb_core 将数据发送给更上面的协议层。

process_backlog 和 NAPI 之间遵循的合约,和驱动和 NAPI 之间的合约相同:

NAPI is disabled if the total weight will not be used. The poller is restarted with the call to ____napi_schedule from enqueue_to_backlog as described above.

函数返回接收完成的数据帧数量(在代码中是变量 work),net_rx_action将会从 budget(通过 net.core.netdev_budget 可以调整)里减去这个值。

__netif_receive_skb_core

__netif_receive_skb_core 完成将数据送到协议栈这一繁重工作(the heavy lifting of delivering the data)。在此之前,它会先检查是否插入了 packet tap(探测点),这些 tap 是抓包用的。例如,AF_PACKET 地址族就可以插入这些抓包指令, 一般通过 libpcap 库。

处理 tap

如果存在抓包点(tap),数据就会先到抓包点,然后才到协议层。

如果有 packet tap(通常通过 libpcap),packet 会送到那里。

1
2
3
4
5
6
7
list_for_each_entry_rcu(ptype, &ptype_all, list) {
if (!ptype->dev || ptype->dev == skb->dev) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}

packet 如何经过 pcap 可以阅读 net/packet/af_packet.c。

送到网络层

处理完 taps 之后,__netif_receive_skb_core 将数据发送到协议层。它会从数据包中取出协议信息,然后遍历注册在这个协议上的回调函数列表。可以看__netif_receive_skb_core 函数,net/core/dev.c:

1
2
3
4
5
6
7
8
9
10
11
type = skb->protocol;
list_for_each_entry_rcu(ptype,
&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
if (ptype->type == type &&
(ptype->dev == null_or_dev || ptype->dev == skb->dev ||
ptype->dev == orig_dev)) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}

总结

network-receive-data-2.jpg

  1. 对于第6步中驱动发出的软中断,内核中的ksoftirqd进程会调用网络模块的相应软中断所对应的处理函数,这里其实就是调用net_rx_action函数。
  2. 接下来net_rx_action调用网卡驱动里的poll函数来一个个地处理数据包。
  3. poll函数会让驱动会读取网卡写到内存中的数据包。事实上,内存中数据包的格式只有驱动知道。
  4. 驱动程序将内存中的数据包转换成内核网络模块能识别的skb(socket buffer)格式,然后调用napi_gro_receive函数
  5. napi_gro_receive会处理 GRO 相关的内容,也就是将可以合并的数据包进行合并,这样就只需要调用一次协议栈。然后判断是否开启了RPS,如果开启了,将会调用enqueue_to_backlog
  6. enqueue_to_backlog函数会将数据包放入input_pkt_queue结构体中,然后返回。 > Note: 如果input_pkt_queue满了的话,该数据包将会被丢弃,这个queue的大小可以通过net.core.netdev_max_backlog来配置
  7. 接下来CPU会在软中断上下文中处理自己input_pkt_queue里的网络数据(调用__netif_receive_skb_core函数)
  8. 如果没开启 RPSnapi_gro_receive会直接调用__netif_receive_skb_core函数。
  9. 紧接着CPU会根据是不是有AF_PACKET类型的socket(原始套接字),如果有的话,拷贝一份数据给它(tcpdump抓包就是抓的这里的包)。
  10. 将数据包交给内核协议栈处理。
  11. 当内存中的所有数据包被处理完成后(poll函数执行完成),重新启用网卡的硬中断,这样下次网卡再收到数据的时候就会通知CPU。

网络层

链路层与网络层接口

上面的 ptype_base 是一个 hash table,定义在net/core/dev.c中:

1
struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;

每种协议在上面的 hash table 的一个 slot 里,添加一个过滤器到列表里。这个列表的头用如下函数获取:

1
2
3
4
5
6
7
static inline struct list_head *ptype_head(const struct packet_type *pt)
{
if (pt->type == htons(ETH_P_ALL))
return &ptype_all;
else
return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

添加的时候用 dev_add_pack 这个函数。这就是协议层如何注册自身,用于处理相应协议的网络数据的。

packet_type 定义如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct packet_type {
__be16 type; /* This is really htons(ether_type). */
struct net_device *dev; /* NULL is wildcarded here */
int (*func) (struct sk_buff *,
struct net_device *,
struct packet_type *,
struct net_device *);
void (*list_func) (struct list_head *,
struct packet_type *,
struct net_device *);
bool (*id_match)(struct packet_type *ptype,
struct sock *sk);
void *af_packet_priv;
struct list_head list;
};

deliver_skb 直接调用 packet_type 的 func 函数,对于 IP 层就是 ip_rcv

1
2
3
4
5
6
7
8
9
static inline int deliver_skb(struct sk_buff *skb,
struct packet_type *pt_prev,
struct net_device *orig_dev)
{
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
return -ENOMEM;
refcount_inc(&skb->users);
return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

协议层注册

net/ipv4/af_inet.c
1
2
3
4
5
6
7
static struct packet_type ip_packet_type __read_mostly = {
.type = cpu_to_be16(ETH_P_IP),
.func = ip_rcv,
.list_func = ip_list_rcv,
};

dev_add_pack(&ip_packet_type);

如前所述,链路层通过 ip_rcv 将数据包 skb 从数据链路层传输到网络层,接下来我们将以 ip_rcv 为入口看网络层如何实现。

ip_rcv

可以看到,ip_rcv 函数非常简洁,它在 ip_rcv_core 做了一些校验工作之后,通过 netfilter 框架之后,将数据包传给 ip_rcv_finish

1
2
3
4
5
6
7
8
9
10
11
12
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt,
struct net_device *orig_dev)
{
struct net *net = dev_net(dev);

skb = ip_rcv_core(skb, net);
if (skb == NULL)
return NET_RX_DROP;
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
net, NULL, skb, dev, NULL,
ip_rcv_finish);
}

我们看看 ip_rcv_core 的实现,主要对 IP 报文做校验:

  1. Internet Header Length (IHL) 必须要 > 5,IHL 表示有 IP 包 Header 多少个32bit words
  2. IP version 必须是 4
  3. checksum 校验正确
  4. 包长度合理 skb-> len 必须要大于或等于 iph->total length
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
static struct sk_buff *ip_rcv_core(struct sk_buff *skb, struct net *net)
{
const struct iphdr *iph;
u32 len;

skb = skb_share_check(skb, GFP_ATOMIC);

if (!pskb_may_pull(skb, sizeof(struct iphdr)))
goto inhdr_error;

iph = ip_hdr(skb);

if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;

if (!pskb_may_pull(skb, iph->ihl*4))
goto inhdr_error;

iph = ip_hdr(skb);

if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
goto csum_error;

len = ntohs(iph->tot_len);
if (skb->len < len) {
goto drop;
} else if (len < (iph->ihl*4))
goto inhdr_error;

/* Our transport medium may have padded the buffer out. Now we know it
* is IP we can trim to the true length of the frame.
* Note this now means skb->len holds ntohs(iph->tot_len).
*/
if (pskb_trim_rcsum(skb, len)) {
goto drop;
}

/* 更新 transport_header 地址 */
skb->transport_header = skb->network_header + iph->ihl*4;

/* Remove any debris in the socket control block */
memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));
IPCB(skb)->iif = skb->skb_iif;

/* Must drop socket now because of tproxy. */
skb_orphan(skb);

return skb;

csum_error:
inhdr_error:
drop:
kfree_skb(skb);
out:
return NULL;
}

执行完 ip_rcv_core 之后,可以看到执行了 NF_HOOK,经过 Netfilter 之后执行 ip_rcv_finish

1
2
3
4
5
6
7
8
9
10
static inline int
NF_HOOK(uint8_t pf, unsigned int hook, struct net *net, struct sock *sk, struct sk_buff *skb,
struct net_device *in, struct net_device *out,
int (*okfn)(struct net *, struct sock *, struct sk_buff *))
{
int ret = nf_hook(pf, hook, net, sk, skb, in, out, okfn);
if (ret == 1)
ret = okfn(net, sk, skb);
return ret;
}

ip_rcv_finish

ip_route_input

dst_input

1
2
3
4
5
/* Input packet from network to transport.  */
static inline int dst_input(struct sk_buff *skb)
{
return skb_dst(skb)->input(skb);
}

ip_forward

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
static int ip_forward_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
struct ip_options *opt = &(IPCB(skb)->opt);

__IP_INC_STATS(net, IPSTATS_MIB_OUTFORWDATAGRAMS);
__IP_ADD_STATS(net, IPSTATS_MIB_OUTOCTETS, skb->len);

if (unlikely(opt->optlen))
ip_forward_options(skb);

return dst_output(net, sk, skb);
}

int ip_forward(struct sk_buff *skb)
{
u32 mtu;
struct iphdr *iph; /* Our header */
struct rtable *rt; /* Route we use */
struct ip_options *opt = &(IPCB(skb)->opt);
struct net *net;

/* that should never happen */
if (skb->pkt_type != PACKET_HOST)
goto drop;

if (unlikely(skb->sk))
goto drop;

if (skb_warn_if_lro(skb))
goto drop;

if (!xfrm4_policy_check(NULL, XFRM_POLICY_FWD, skb))
goto drop;

if (IPCB(skb)->opt.router_alert && ip_call_ra_chain(skb))
return NET_RX_SUCCESS;

skb_forward_csum(skb);
net = dev_net(skb->dev);

/*
* According to the RFC, we must first decrease the TTL field. If
* that reaches zero, we must reply an ICMP control message telling
* that the packet's lifetime expired.
*/
if (ip_hdr(skb)->ttl <= 1)
goto too_many_hops;

if (!xfrm4_route_forward(skb))
goto drop;

rt = skb_rtable(skb);

if (opt->is_strictroute && rt->rt_uses_gateway)
goto sr_failed;

IPCB(skb)->flags |= IPSKB_FORWARDED;
mtu = ip_dst_mtu_maybe_forward(&rt->dst, true);
if (ip_exceeds_mtu(skb, mtu)) {
IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
htonl(mtu));
goto drop;
}

/* We are about to mangle packet. Copy it! */
if (skb_cow(skb, LL_RESERVED_SPACE(rt->dst.dev)+rt->dst.header_len))
goto drop;
iph = ip_hdr(skb);

/* Decrease ttl after skb cow done */
ip_decrease_ttl(iph);

/*
* We now generate an ICMP HOST REDIRECT giving the route
* we calculated.
*/
if (IPCB(skb)->flags & IPSKB_DOREDIRECT && !opt->srr &&
!skb_sec_path(skb))
ip_rt_send_redirect(skb);

if (net->ipv4.sysctl_ip_fwd_update_priority)
skb->priority = rt_tos2priority(iph->tos);

return NF_HOOK(NFPROTO_IPV4, NF_INET_FORWARD,
net, NULL, skb, skb->dev, rt->dst.dev,
ip_forward_finish);

sr_failed:
/*
* Strict routing permits no gatewaying
*/
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_SR_FAILED, 0);
goto drop;

too_many_hops:
/* Tell the sender its packet died... */
__IP_INC_STATS(net, IPSTATS_MIB_INHDRERRORS);
icmp_send(skb, ICMP_TIME_EXCEEDED, ICMP_EXC_TTL, 0);
drop:
kfree_skb(skb);
return NET_RX_DROP;
}

ip_local_deliver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Deliver IP Packets to the higher protocol layers.
*/
int ip_local_deliver(struct sk_buff *skb)
{
/*
* Reassemble IP fragments.
*/
struct net *net = dev_net(skb->dev);

if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN,
net, NULL, skb, skb->dev, NULL,
ip_local_deliver_finish);
}

ip_local_deliver_finish

这里通过 import->handler 来转交到网络层,如果是UDP,那么就是 udp_rcv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static int ip_local_deliver_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
__skb_pull(skb, skb_network_header_len(skb));

rcu_read_lock();
{
int protocol = ip_hdr(skb)->protocol;
const struct net_protocol *ipprot;
int raw;

resubmit:
raw = raw_local_deliver(skb, protocol);

ipprot = rcu_dereference(inet_protos[protocol]);
if (ipprot) {
int ret;

if (!ipprot->no_policy) {
if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
kfree_skb(skb);
goto out;
}
nf_reset(skb);
}
ret = ipprot->handler(skb);
if (ret < 0) {
protocol = -ret;
goto resubmit;
}
__IP_INC_STATS(net, IPSTATS_MIB_INDELIVERS);
} else {
if (!raw) {
if (xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
__IP_INC_STATS(net, IPSTATS_MIB_INUNKNOWNPROTOS);
icmp_send(skb, ICMP_DEST_UNREACH,
ICMP_PROT_UNREACH, 0);
}
kfree_skb(skb);
} else {
__IP_INC_STATS(net, IPSTATS_MIB_INDELIVERS);
consume_skb(skb);
}
}
}
out:
rcu_read_unlock();

return 0;
}

总结

network-receive-data-3.jpg

  • ip_rcv: ip_rcv函数是IP网络层处理模块的入口函数,该函数首先判断属否需要丢弃该数据包(目的mac地址不是当前网卡,并且网卡设置了混杂模式),如果需要进一步处理就然后调用注册在netfilter中的NF_INET_PRE_ROUTING这条链上的处理函数。
  • NF_INET_PRE_ROUTING: netfilter放在协议栈中的钩子函数,可以通过iptables来注入一些数据包处理函数,用来修改或者丢弃数据包,如果数据包没被丢弃,将继续往下走。 > NF_INET_PRE_ROUTING等netfilter链上的处理逻辑可以通iptables来设置,详情请移步: https://morven.life/notes/the_knowledge_of_iptables/
  • routing: 进行路由处理,如果是目的IP不是本地IP,且没有开启ip forward功能,那么数据包将被丢弃,如果开启了ip forward功能,那将进入ip_forward函数。
  • ip_forward: 该函数会先调用netfilter注册的NF_INET_FORWARD链上的相关函数,如果数据包没有被丢弃,那么将继续往后调用dst_output_sk函数。
  • dst_output_sk: 该函数会调用IP网络层的相应函数将该数据包发送出去,这一步将会在下一章节发送数据包中详细介绍。
  • ip_local_deliver: 如果上面路由处理发现发现目的IP是本地IP,那么将会调用ip_local_deliver函数,该函数先调用NF_INET_LOCAL_IN链上的相关函数,如果通过,数据包将会向下发送到UDP层。

传输层

网络层与传输层接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/* This is used to register protocols. */
struct net_protocol {
int (*early_demux)(struct sk_buff *skb);
int (*early_demux_handler)(struct sk_buff *skb);
int (*handler)(struct sk_buff *skb);
void (*err_handler)(struct sk_buff *skb, u32 info);
unsigned int no_policy:1,
netns_ok:1,
/* does the protocol do more stringent
* icmp tag validation than simple
* socket lookup?
*/
icmp_strict_tag_validation:1;
};

static const struct net_protocol tcp_protocol = {
.early_demux = tcp_v4_early_demux,
.handler = tcp_v4_rcv,
.err_handler = tcp_v4_err,
.no_policy = 1,
.netns_ok = 1,
};

static const struct net_protocol udp_protocol = {
.early_demux = udp_v4_early_demux,
.handler = udp_rcv,
.err_handler = udp_err,
.no_policy = 1,
.netns_ok = 1,
};

static const struct net_protocol icmp_protocol = {
.handler = icmp_rcv,
.err_handler = icmp_err,
.no_policy = 1,
.netns_ok = 1,
};
1
2
3
4
5
6
7
8
9
10
/*
* Add all the base protocols.
*/

if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
pr_crit("%s: Cannot add ICMP protocol\n", __func__);
if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
pr_crit("%s: Cannot add UDP protocol\n", __func__);
if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
pr_crit("%s: Cannot add TCP protocol\n", __func__);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define MAX_INET_PROTOS		256
struct net_protocol __rcu *inet_protos[MAX_INET_PROTOS] __read_mostly;

int inet_add_protocol(const struct net_protocol *prot, unsigned char protocol)
{
if (!prot->netns_ok) {
pr_err("Protocol %u is not namespace aware, cannot register.\n",
protocol);
return -EINVAL;
}

return !cmpxchg((const struct net_protocol **)&inet_protos[protocol],
NULL, prot) ? 0 : -1;
}
EXPORT_SYMBOL(inet_add_protocol);

udp_rcv

__udp4_lib_rcv

udp_queue_rcv_skb

Sk_rcvqueues_full

总结

network-receive-data-4.jpg

  • udp_rcv: 该函数是UDP处理层模块的入口函数,它首先调用__udp4_lib_lookup_skb函数,根据目的IP和端口找对应的socket,如果没有找到相应的socket,那么该数据包将会被丢弃,否则继续。
  • sock_queue_rcv_skb: 该函数一是负责检查这个socket的receive buffer是不是满了,如果满了的话就丢弃该数据包;二是调用sk_filter看这个包是否是满足条件的包,如果当前socket上设置了filter,且该包不满足条件的话,这个数据包也将被丢弃。
  • __skb_queue_tail: 该函数将数据包放入socket接收队列的末尾。
  • sk_data_ready: 通知socket数据包已经准备好。
  • 调用完sk_data_ready之后,一个数据包处理完成,等待应用层程序来读取。

Note: 上面所述的所有执行过程都在软中断的上下文中执行。

数据包的发送过程

传输层

总结

network-send-data-2.jpg

  • udp_sendmsg: 该函数是UDP传输层模块发送数据包的入口。该函数中先调用ip_route_output_flow获取路由信息(主要包括源IP和网卡),然后调用ip_make_skb构造skb结构体,最后将网卡的信息和该skb关联。
  • ip_route_output_flow: 该函数主要处理路由信息,它会根据路由表和目的IP,找到这个数据包应该从哪个设备发送出去,如果该socket没有绑定源IP,该函数还会根据路由表找到一个最合适的源IP给它。 如果该socket已经绑定了源IP,但根据路由表,从这个源IP对应的网卡没法到达目的地址,则该包会被丢弃,于是数据发送失败将返回错误。该函数最后会将找到的设备和源IP塞进flowi4结构体并返回给udp_sendmsg
  • ip_make_skb: 该函数的功能是构造skb包,构造好的skb包里面已经分配了IP包头(包括源IP信息),同时该函数会调用__ip_append_dat,如果需要分片的话,会在__ip_append_data函数中进行分片,同时还会在该函数中检查socket的send buffer是否已经用光,如果被用光的话,返回ENOBUFS。
  • udp_send_skb(skb, fl4): 该函数主要是往skb里面填充UDP的包头,同时处理checksum,然后交给IP网络层层的相应函数。

网络层

总结

network-send-data-3.jpg

  • ip_send_skb: IP网络层模块发送数据包的入口,该函数主要是调用后面的一些列函数。
  • __ip_local_out_sk: 用来设置IP报文头的长度和checksum,然后调用下面netfilter的钩子链NF_INET_LOCAL_OUT
  • NF_INET_LOCAL_OUT: netfilter的钩子函数,可以通过iptables来配置处理函数链;如果该数据包没被丢弃,则继续往下走。
  • dst_output_sk: 该函数根据skb里面的信息,调用相应的output函数ip_output
  • ip_output: 将上一层udp_sendmsg得到的网卡信息写入skb,然后调用 NF_INET_POST_ROUTING的钩子链。
  • NF_INET_POST_ROUTING: 在这一步主要在配置了SNAT,从而导致该skb的路由信息发生变化。
  • ip_finish_output: 这里会判断经过了上一步后,路由信息是否发生变化,如果发生变化的话,需要重新调用dst_output_sk(重新调用这个函数时,可能就不会再走到ip_output,而是走到被netfilter指定的output函数里,这里有可能是xfrm4_transport_output),否则接着往下走。
  • ip_finish_output2: 根据目的IP到路由表里面找到下一跳(nexthop)的地址,然后调用__ipv4_neigh_lookup_noref去arp表里面找下一跳的neigh信息,没找到的话会调用__neigh_create构造一个空的neigh结构体。
  • dst_neigh_output: 该函数调用neigh_resolve_output获取neigh信息,并将neigh信息里面的mac地址填到skb中,然后调用dev_queue_xmit发送数据包。
  • neigh_resolve_output: 该函数里面会发送arp请求,得到下一跳的mac地址,然后将mac地址填到skb中并调用dev_queue_xmit

链路层

总结

network-send-data-4.jpg

  • dev_queue_xmit: 内核模块开始处理发送数据包的入口函数,该函数会先获取设备对应的qdisc,如果没有的话(如loopback或者IP tunnels),就直接调用dev_hard_start_xmit,否则数据包将经过traffic control模块进行处理。
  • traffic control:该模块主要对数据包进行过滤和排序,如果队列满了的话,数据包会被丢掉,详情请参考: http://tldp.org/HOWTO/Traffic-Control-HOWTO/intro.html
  • dev_hard_start_xmit: 该函数先拷贝一份skb给“packet taps”(tcpdump的数据就从来自于此),然后调用ndo_start_xmit函数。如果dev_hard_start_xmit返回错误的话,调用它的函数会把skb放到一个地方,然后抛出软中断NET_TX_SOFTIRQ,然后交给软中断处理程序net_tx_action稍后重试。
  • ndo_start_xmit:该函数绑定到具体驱动发送数据的处理函数。

设备驱动层

Note: ndo_start_xmit会指向具体网卡驱动的发送数据包的函数,这一步之后,数据包发送任务就交给网络设备驱动了,不同的网络设备驱动有不同的处理方式,但是大致流程基本一致:

  1. 将skb放入网卡自己的发送队列
  2. 通知网卡发送数据包
  3. 网卡发送完成后发送中断给CPU
  4. 收到中断后进行skb的清理工作

参考资料