0%

网络性能调优

RSS

当前多数据网卡支持多个接收和发送队列(multi-queue),在接收方,NIC 可以将不同的 packet 分发给不同的 CPU。NIC 通过一个 filter 将每个 packet 分到不同的 flows 中,每个 flow 的 packet 都被分到同一个接收队列中,而每个接收队列可以由一个独立的 CPU 来处理。这个技术即是 Receive-Side Scaling, RSS

RSS 实现

这里的 filter 一般是一个 hash 函数,它以网络数据包的头文件为key,比如说以 IP 地址和 TCP 端口的 4 元组为 key 进行 hash。RSS 最常见的硬件实现是一个 128-entry 的 indirection table,每个 entry 存储了一个 queue number。一个 packet 所属的接收队列是由 hash (通常是 Toeplitz hash) 计算出来的低 7bit 作为 key,从 indirection table 中拿到 queue number。有一些更高级的网卡支持 programmable filter,比如对 80 端口的 webserver 映射到固定的接收队列。这种 n-tuple 可以通过 ethtool 的 --config-ntuple 配置。

参考 Intel 82599 的datasheet,看看它是如何实现RSS的,如下图:

  1. Parsed receive packet 解析数据包,获取五元组等信息
  2. RSS hash 根据五元组的某些信息计算hash值
  3. Packet Descriptor 将hash值保存到接收描述符中,最终会保存到skb->hash中,后续可以直接使用hash值,比如RPS查找cpu时使用这个hash值
  4. 7 LS bits 使用hash值低7位索引redirection table的一项,每项包含四位(所以最多支持16个队列)
  5. RSS output index table的指定项就是接收队列。

redirection table 中每一项中的队列id是如何设置的呢?在驱动初始化时,根据使能的队列个数,依次填充到每一项,达到队列最大值后,从0开始循环填充。比如使能了4个队列,则 table 的0-127项依次为:0,1,2,3,0,1,2,3 …

看下 ixgbe 中使用到的和 redirection table 相关的寄存器,使用 32个 IXGBE_RETA 寄存器,每个寄存器的 3:0, 11:8,19:1627:24 分别表示一个 table 的entry,而且是4位,所以使能 RSS 时最多支持16个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

static void ixgbe_setup_reta(struct ixgbe_adapter *adapter)
{
u32 i, j;
u32 reta_entries = ixgbe_rss_indir_tbl_entries(adapter);
u16 rss_i = adapter->ring_feature[RING_F_RSS].indices; // rss_i是使能的队列个数

/* Program table for at least 4 queues w/ SR-IOV so that VFs can
* make full use of any rings they may have. We will use the
* PSRTYPE register to control how many rings we use within the PF.
*/
if ((adapter->flags & IXGBE_FLAG_SRIOV_ENABLED) && (rss_i < 4))
rss_i = 4;

/* Fill out hash function seeds */
ixgbe_store_key(adapter);

/* Fill out redirection table */
memset(adapter->rss_indir_tbl, 0, sizeof(adapter->rss_indir_tbl));

for (i = 0, j = 0; i < reta_entries; i++, j++) {
if (j == rss_i) // j表示队列id,达到最大值 rss_i 后,从0开始
j = 0;

adapter->rss_indir_tbl[i] = j;
}

ixgbe_store_reta(adapter);
}

设置多队列 IRQ 绑核

每一个接收队列都有自己的 IRQ number,NIC 通过 IRQ 通知 CPU 数据包到来,对于 PCIe 类型设备使用 MSI-X 类型中断。我们可以通过配置 /proc/irq/IRQ_NUMBER/smp_affinity 来配置 IRQ 与 CPU 的 affinity,具体可以参考 SMP IRQ affinity

对称多处理器(symmetric multiprocessing)是通过多个处理器处理程序的方式。smp_affinity文件处理一个IRQ号的中断亲和性。在smp_affinity文件结合每个IRQ号存储在/proc/irq/IRQ_NUMBER/smp_affinity文件。这个文件中的值是一个16进制位掩码表示系统的所有CPU核。

1
2
$ cat /proc/irq/18/smp_affinity
ff

smp_affinity是16进制表示,f 就是二进制的1111 ,表示4个cpu都会参与处理中断,这里ff表示有8个cpu核心同时处理中断。

这个中断分布的cpu核也可以从 smp_affinity_list 看到(是数字表示)

1
2
$ cat /proc/irq/18/smp_affinity_list
0-7

如果我想将 IRQ 18 的 SMP Affinity 设置为 5 号 CPU,则可以操作如下,因为十六进制 20 的二进制对应着 00100000

1
$ echo "20" > /proc/irq/18/smp_affinity_list

每个IRQ的默认的smp affinity在这里:cat /proc/irq/default_smp_affinity

查看网卡对应的 IRQ 号

一般情况下,我们可以通过 /proc/interrupts 查看 IRQ 和与网卡的对应关系,比如这里的 eth0 对应着 IRQ Number 为 90,并且在 CPU0 上产生了 1070374 次中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# cat /proc/interrupts 
CPU0 CPU1
0: 918926335 0 IO-APIC-edge timer
1: 2 0 IO-APIC-edge i8042
8: 0 0 IO-APIC-edge rtc
9: 0 0 IO-APIC-level acpi
12: 4 0 IO-APIC-edge i8042
14: 8248017 0 IO-APIC-edge ide0
50: 194 0 IO-APIC-level ohci_hcd:usb2
58: 31673 0 IO-APIC-level sata_nv
90: 1070374 0 PCI-MSI eth0
233: 10 0 IO-APIC-level ehci_hcd:usb1
NMI: 5077 2032
LOC: 918809969 918809894
ERR: 0
MIS: 0

当 CPU 数目很多的时候,而且有的时候一张网卡有多个网卡队列,直接查看 /proc/interrupts 很不直观,可以通过其他方法查看。

对于 virtio interface,我们可以查看网卡对应的 PCI 接口下的 msi_irqs 得到本网卡对应的 IRQ Numbers。

1
2
3
4
5
$ readlink -e /sys/class/net/eth1
/sys/devices/pci0000:00/0000:00:03.0/virtio0/net/eth1

$ ls $(readlink -e /sys/class/net/eth1)/../../../msi_irqs
26 27 28

对于容器场景,比如 TKE 的独立网卡网络方案,如果 Pod 将网卡绑定到它的 netns 中,在主 netns 中看不到对应的网卡,则需要进入到 Pod netns

1
2
3
$ nsenter -t 944372 -n -m
$ ls $(readlink -e /sys/class/net/eth0)/../../../msi_irqs
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323

可以看到,这里主要是通过$(readlink -e /sys/class/net/eth0) 查找 PCI 设备 /sys/devices/pci<domain>:<bus>/ 下的msi_irqsorirq` 文件,更多可以参考 这里

为了使你配置的 irq smp affinity 生效,要注意关掉节点上的 irqbalance 进程。irqbalance 会自动将 IRQs 平衡到各个 CPU,可能会覆盖你的 smp_affinity 配置。

适用场景

一般来说,我们会对多队列网卡进行一对一 CPU 中断绑核,在 这里 可以看到相关脚本。

注意,我们一般不会前几号CPU用作网络收包,因为他们一般都会有一些定时扫描,任务平衡等任务

如果将接收队列中断绑定到这些核上面去,可能会导致 ping flood 抖动延时等问题。

默认情况下,使用 /proc/irq/default_smp_affinity 设置到全 F 可能就会选中到前几个核

建议设置 RSS 的时候,可以将不同网卡队列的 IRQ 均分到不同 CPU,实现每个 CPU 处理各自的硬中断, 这样每个 CPU 的负载不会过大。如果想要查看每个 CPU 的负载,可以通过 mpstat 工具查看,具体参考 Shell笔记

RPS

Receive Packet Steering, RPS 是 RSS 的软件实现。因为是软件实现,所以 RPS 在 data path 的较后面实现,而 RSS 是直接在中断前就通过硬件分发给不同的网卡队列。RPS 相对于 RSS 由以下特点:

  • RPS 可以被用在任何 NIC 上,不依赖于 NIC 的硬件能力
  • software filter 可以很容易的加入来对新的协议进行 hash,而 RSS 需要 NIC 硬件实现 filter
  • RPS 不会增大硬件的 interrupt rate,虽然它确实会引入 IPIs,Inter-Processor Interrupts

RPS 实现

RPS 在网络中断底半部被调用,在 netif_rx_internal (传统中断模式)或者 netif_receive_skb_internal (NAPI模式下),如果使能了RPS,则调用 get_rps_cpu 选择合适的cpu,将 skb 放入此 CPU 的 backlog 队列中,然后 waking up the CPU for processing。这样就可以让多个 CPU 来处理协议栈的工作,避免一个 CPU 负载过大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//设备接收队列,和 RPS 相关成员是 rps_map
/* This structure contains an instance of an RX queue. */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
struct rps_map __rcu *rps_map;
struct rps_dev_flow_table __rcu *rps_flow_table;
#endif
struct kobject kobj;
struct net_device *dev;
} ____cacheline_aligned_in_smp;

//存放配置RPS的值,假如/sys/class/net/(dev)/queues/rx-(n)/rps_cpus=f,则
//len=4, cpus指向额外分配的内存数组,每个元素保存一个cpu值
struct rps_map {
unsigned int len;
struct rcu_head rcu;
u16 cpus[0];
};

设置 /sys/class/net/<dev>/queues/rx-<n>/rps_cpus 时:

1
2
3
4
5
6
7
8
9
10
11
store_rps_map
for_each_cpu_and(cpu, mask, cpu_online_mask)
map->cpus[i++] = cpu;
if (i)
map->len = i;
//将map赋值到queue->rps_map中,在get_rps_cpu中会使用到
rcu_assign_pointer(queue->rps_map, map);

if (map)
//使能RPS
static_key_slow_inc(&rps_needed);

RPS 选择 CPU 的第一步是计算 flow 的 hash。这个hash 作为一致性hash,可以直接使用 hardware 算出来并保存在 skb 的 hash,一般也是 RSS 使用的 hash (即 computed Toeplitz hash)。如果没有硬件算出来的 hash 的话,可以使用软件计算 hash。

1
2
3
4
5
6
7
static inline __u32 skb_get_hash(struct sk_buff *skb)
{
if (!skb->l4_hash && !skb->sw_hash)
__skb_get_hash(skb);

return skb->hash;
}

At the end of the bottom half routine, IPIs are sent to any CPUs for which
packets have been queued to their backlog queue. The IPI wakes backlog
processing on the remote CPU, and any queued packets are then processed
up the networking stack.

get_rps_cpu 函数中也涉及到了 RFS 的流程,这里先忽略 RFS 流程,只关注RPS相关的。因为 RPS 设置的是某个队列对应的 CPU 列表,所以需要先获取队列id,再获取此队列对应的 CPU 列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
struct netdev_rx_queue *rxqueue;
struct rps_map *map;
struct rps_dev_flow_table *flow_table;
struct rps_sock_flow_table *sock_flow_table;
int cpu = -1;
u16 tcpu;
u32 hash;

//skb->queue_mapping+1 记录了skb从哪个队列接收上来
if (skb_rx_queue_recorded(skb)) {
//获取接收skb队列index
u16 index = skb_get_rx_queue(skb);
if (unlikely(index >= dev->real_num_rx_queues)) {
WARN_ONCE(dev->real_num_rx_queues > 1,
"%s received packet on queue %u, but number "
"of RX queues is %u\n",
dev->name, index, dev->real_num_rx_queues);
goto done;
}
//根据index,获取rxqueue
rxqueue = dev->_rx + index;
} else
//这里应该是queue 0吧
rxqueue = dev->_rx;

map = rcu_dereference(rxqueue->rps_map);
if (map) {
//如果rps_map只配置了一个CPU,并且没有配置rps_flow_table,
//并且rps_sock_flow_entries 配置的这个cpu在线,则直接使用这个cpu。
//如果这个cpu不在线,则返回-1.
if (map->len == 1 &&
!rcu_access_pointer(rxqueue->rps_flow_table)) {
tcpu = map->cpus[0];
if (cpu_online(tcpu))
cpu = tcpu;
goto done;
}
//没有配置 rps_map,也没有配置 rps_flow_table
} else if (!rcu_access_pointer(rxqueue->rps_flow_table)) {
goto done;
}

skb_reset_network_header(skb);
//根据skb获取hash值。如果在RSS模式下,可以直接使用网
//卡计算的hash值,否则需要根据数据包信息计算一个
hash = skb_get_hash(skb);
if (!hash)
goto done;
//flow_table 和 sock_flow_table 是RFS的流程,暂时忽略
flow_table = rcu_dereference(rxqueue->rps_flow_table);
sock_flow_table = rcu_dereference(rps_sock_flow_table);
if (flow_table && sock_flow_table) {
/* ... */
}
//如果没有在设备流表rps_flow_table和全局流表
//rps_sock_flow_table中找到目标cpu,则使用hash在
//rps_map中找一个cpu即可。
if (map) {
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {
cpu = tcpu;
goto done;
}
}

done:
return cpu;
}

RPS 配置

RPS 要求 kernel 开启了 CONFIG_RPS 的选项,这对于 SMP 系统是默认的。为了打开 RPS 的能力,需要通过 sysfs 配置

1
/sys/class/net/<dev>/queues/rx-<n>/rps_cpus

对于单队列网卡,RPS 将会设置 rps_cpus 到接收中断的 CPU 的同一个 memory domain 中,这里的 memory domain 说的是一个 CPU 集合

a memory domain is a set of CPUs that share a particular memory level (L1, L2, NUMA node, etc.)

如果 NUMA 局部性不是问题,那么就会 rps_cpus 配置的就是所有的 CPU。当收发包速率较高时,一般会把接收中断的 CPU 从这个 rps_cpus 的 map 中去掉,因为它已经在处理很多的任务。

对于多队列网卡,如果已经配置了 RSS,那么 RPS 的配置可能是冗余和没必要的。如果 接收队列的数目比 CPU 的数目少,那么当 CPU 的处理能力不够时,可以通过 RPS 将不同的接收队列分配到各自的 memory domain 上。

RPS Flow Limit

RPS 在不同 CPU 之间分发 packet,但是,如果一个 flow 特别大,会出现单个 CPU 被打爆,而其他 CPU 无事可做(饥饿)的状态。因此引入了 flow limit 特性,放到一个 backlog 队列的属于同一个 flow 的包的数量不能超过一个阈值。这可以保证即使有一个很大的 flow 在大量收包 ,小 flow 也能得到及时的处理。

1
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
1
2
3
4
5
6
7
8
9
10
Once a CPU's input packet
queue exceeds half the maximum queue length (as set by sysctl
net.core.netdev_max_backlog), the kernel starts a per-flow packet
count over the last 256 packets. If a flow exceeds a set ratio (by
default, half) of these packets when a new packet arrives, then the
new packet is dropped. Packets from other flows are still only
dropped once the input packet queue reaches netdev_max_backlog.
No packets are dropped when the input packet queue length is below
the threshold, so flow limit does not sever connections outright:
even large flows maintain connectivity.

默认,flow limit 功能是关掉的。要打开 flow limit,需要指定一个 bitmap(类似于 RPS 的 bitmap)。

1
/proc/sys/net/core/flow_limit_cpu_bitmap

监控:由于 input_pkt_queue 打满或 flow limit 导致的丢包,在/proc/net/softnet_stat 里面的 dropped 列计数。

如果使用了 RPS,或者驱动调用了 netif_rx,那增加 netdev_max_backlog 可以改善在 enqueue_to_backlog 里的丢包:

例如:

increase backlog to 3000 with sysctl.

1
$ sudo sysctl -w net.core.netdev_max_backlog=3000

默认值是 1000。

backlog 处理逻辑和设备驱动的 poll 函数类似,都是在软中断(softirq)的上下文中执行,因此受整体 budget 和处理时间的限制。

Tuning: Enabling flow limits and tuning flow limit hash table size

1
$ sudo sysctl -w net.core.flow_limit_table_len=8192

默认值是 4096

这只会影响新分配的 flow hash table。所以,如果你想增加 table size 的话,应该在打开 flow limit 功能之前设置这个值。打开 flow limit 功能的方式是,在/proc/sys/net/core/flow_limit_cpu_bitmap 中指定一 个 bitmask,和通过 bitmask 打开 RPS 的操作类似。

RFS

从 RPS 选择 CPU 方法可知,就是使用 skb 的 hash 随机选择一个 CPU,没有考虑到应用层运行在哪个 CPU 上,如果执行软中断的 CPU 和运行应用层的 CPU 不是同一个 CPU ,势必会降低 CPU Cache 命中率,降低性能。一般来说,高性能场景下都会为应用设置 CPU Affinity,将应用和 CPU绑核。

为了解决这个问题,RFS 通过指派应用程序所在的 CPU 来在内核态处理报文,以此来增加 CPU 的缓存命中率。RFS 主要是通过两个流表来实现的:

  • 设备流表,记录的是上次在内核态处理该流中报文的 CPU
  • 全局的socket流表,记录的是流中的报文渴望被处理的目标 CPU

原理是将运行应用的 CPU 保存到一个表中,在 get_rps_cpu 时,从这个表中获取 CPU,即可保证执行软中断的 CPU 和运行应用层的 CPU 是同一个 CPU。

全局 socket 流表

全局socket流表 rps_sock_flow_table 的定义如下:

1
2
3
4
5
6
7
8
/*
* The rps_sock_flow_table contains mappings of flows to the last CPU
* on which they were processed by the application (set in recvmsg).
*/
struct rps_sock_flow_table {
unsigned int mask;
u16 ents[0]; // 弹性数组,key 是 flow hash,value 是流渴望被处理的CPU,也就是应用所在的CPU
};

mask成员存放的就是ents这个柔性数组的大小,该值也是通过配置文件的方式指定的,相关的配置文件为 /proc/sys/net/core/rps_sock_flow_entries,可以通过 sysctl 修改 net.core.rps_sock_flow_entries 配置:

1
$ sudo sysctl -w net.core.rps_sock_flow_entries=32768
1
2
3
4
5
6
7
8
// 设置 /proc/sys/net/core/rps_sock_flow_entries 时,调用如下函数
// 设置这个表其实就是分配内存数组,并将所有ents初始化为 `RPS_NO_CPU`。
rps_sock_flow_sysctl
for (i = 0; i < size; i++)
sock_table->ents[i] = RPS_NO_CPU;

if (sock_table)
static_key_slow_inc(&rps_needed);

rps_sock_flow_table 是一个全局的数据流表,这个表中包含了数据流渴望被处理的CPU。这个 CPU 是当前处理流中报文的应用程序所在的CPU。全局socket流表会在调 recvmsgsendmsg (特别是 inet_accept(), inet_recvmsg(), inet_sendmsg(), inet_sendpage() and tcp_splice_read()),被设置或者更新。

全局socket流表会在调用 recvmsg()等函数时被更新,而在这些函数中是通过调用函数 sock_rps_record_flow() 来更新或者记录流表项信息的,而sock_rps_record_flow() 中最终又是调用函数 rps_record_sock_flow() 来更新 ents 柔性数组的,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
u32 hash)
{
if (table && hash) {
unsigned int cpu, index = hash & table->mask;

/* We only give a hint, preemption can change cpu under us */
/*当前CPU*/
cpu = raw_smp_processor_id();
/*ents存放当前cpu*/
if (table->ents[index] != cpu)
table->ents[index] = cpu;
}
}

设备流表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct netdev_rx_queue {
struct rps_map __rcu *rps_map;
/*设备流表*/
struct rps_dev_flow_table __rcu *rps_flow_table;
struct kobject kobj;
struct net_device *dev;
} ____cacheline_aligned_in_smp;

struct rps_dev_flow_table {
unsigned int mask;
struct rcu_head rcu;
struct rps_dev_flow flows[0]; //弹性数组
};

struct rps_dev_flow {
u16 cpu; /* 处理该流的cpu */
u16 filter;
unsigned int last_qtail; /* sd->input_pkt_queue队列的尾部索引,即该队列长度 */
};

struct rps_dev_flow 类型弹性数组大小由配置文件 /sys/class/net/(dev)/queues/rx-(n)/rps_flow_cnt 进行指定的。这个表可以记录之前 cpu backlog上数据包何时处理完,等数据包都处理完后就可以将流迁移到新的 CPU 上了,这样就可以避免调度到新的 CPU 时候出现乱序。

1
2
3
4
5
//设置  /sys/class/net/<dev>/queues/rx-<n>/rps_flow_cnt 时
store_rps_dev_flow_table_cnt
table->mask = mask;
for (count = 0; count <= mask; count++)
table->flows[count].cpu = RPS_NO_CPU;

RFS 实现

下面再次分析 get_rps_cpu,看看RFS是如何生效的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
struct netdev_rx_queue *rxqueue;
struct rps_map *map;
struct rps_dev_flow_table *flow_table;
struct rps_sock_flow_table *sock_flow_table;
int cpu = -1;
u16 tcpu;
u32 hash;

/* ... */

map = rcu_dereference(rxqueue->rps_map);

/* ... */

skb_reset_network_header(skb);
hash = skb_get_hash(skb);
if (!hash)
goto done;

flow_table = rcu_dereference(rxqueue->rps_flow_table);
sock_flow_table = rcu_dereference(rps_sock_flow_table);
if (flow_table && sock_flow_table) {
u16 next_cpu;
struct rps_dev_flow *rflow;
// tcpu记录的是处理数据包的cpu
rflow = &flow_table->flows[hash & flow_table->mask];
tcpu = rflow->cpu;
// next_cpu 记录的是运行 application 的 cpu
next_cpu = sock_flow_table->ents[hash & sock_flow_table->mask];

/*
* If the desired CPU (where last recvmsg was done) is
* different from current CPU (one in the rx-queue flow
* table entry), switch if one of the following holds:
* - Current CPU is unset (equal to RPS_NO_CPU).
* - Current CPU is offline.
* - The current CPU's queue tail has advanced beyond the
* last packet that was enqueued using this table entry.
* This guarantees that all previous packets for the flow
* have been dequeued, thus preserving in order delivery.
*/
if (unlikely(tcpu != next_cpu) &&
(tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
((int)(per_cpu(softnet_data, tcpu).input_queue_head -
rflow->last_qtail)) >= 0)) {
tcpu = next_cpu;
rflow = set_rps_cpu(dev, skb, rflow, next_cpu);
}

if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
*rflowp = rflow;
cpu = tcpu;
goto done;
}
}

// 如果没有在设备流表rps_flow_table和全局流表rps_sock_flow_table中找到目标cpu,
// 则使用hash在rps_map中找一个cpu即可。
if (map) {
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {
cpu = tcpu;
goto done;
}
}

done:
return cpu;
}

更新 rflow->cpunext_cpu,并且记录 next_cpu 队列的 input_queue_headrflow->last_qtail 中,后续数据包入队到 next_cpu 队列上时,rflow->last_qtail 都会加1,通过判断 input_queue_headrflow->last_qtail 来判断 next_cpu 队列是否为空。

1
2
3
4
5
6
7
8
9
10
11
12
static struct rps_dev_flow *
set_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow *rflow, u16 next_cpu)
{
if (next_cpu != RPS_NO_CPU) {
rflow->last_qtail =
per_cpu(softnet_data, next_cpu).input_queue_head;
}

rflow->cpu = next_cpu;
return rflow;
}

Accelebrated RFS

RFS 是将 skb 放在运行应用的 CPU 的 backlog 中处理的,而且我们知道默认情况下哪个 CPU 处理硬件中断,就由哪个 CPU 处理软件中断,即 who trigger, who run,那能不能通过网卡的 fdir 功能(流重定向) 将数据流重定向到运用应用的 CPU 所处理的队列上呢?这就是 Accelerated RFS 的作用。

aRFS 之于 RFS 就像 RSS 之于 RPS,是一种硬件加速的负载均衡机制,直接将 flows 发送给接收 packet 的应用所在的 CPU。具体的实现是,网络协议站会调用驱动中的 ndo_rx_flow_steer 来将 flow 分发到 desired hardware queue。每次在 rps_dev_flow_table 的 flow entry 更新后,网络协议栈会调用 ndo_rx_flow_steer

除了使能RFS的两个表,没其他需要使能的,前提是网卡驱动得支持函数 ndo_rx_flow_steer,不过貌似支持的网卡没几个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static struct rps_dev_flow *
set_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow *rflow, u16 next_cpu)
{
if (next_cpu != RPS_NO_CPU) {
#ifdef CONFIG_RFS_ACCEL
struct netdev_rx_queue *rxqueue;
struct rps_dev_flow_table *flow_table;
struct rps_dev_flow *old_rflow;
u32 flow_id;
u16 rxq_index;
int rc;

/* Should we steer this flow to a different hardware queue? */
if (!skb_rx_queue_recorded(skb) || !dev->rx_cpu_rmap ||
!(dev->features & NETIF_F_NTUPLE))
goto out;
rxq_index = cpu_rmap_lookup_index(dev->rx_cpu_rmap, next_cpu);
if (rxq_index == skb_get_rx_queue(skb))
goto out;

rxqueue = dev->_rx + rxq_index;
flow_table = rcu_dereference(rxqueue->rps_flow_table);
if (!flow_table)
goto out;
flow_id = skb_get_hash(skb) & flow_table->mask;
rc = dev->netdev_ops->ndo_rx_flow_steer(dev, skb,
rxq_index, flow_id);
if (rc < 0)
goto out;
old_rflow = rflow;
rflow = &flow_table->flows[flow_id];
rflow->filter = rc;
if (old_rflow->filter == rflow->filter)
old_rflow->filter = RPS_NO_FILTER;
out:
#endif
rflow->last_qtail =
per_cpu(softnet_data, next_cpu).input_queue_head;
}

rflow->cpu = next_cpu;
return rflow;
}

XPS

前面的几种技术都是接收方向的,XPS是针对发送方向的,即从网卡发送出去时,如果有多个发送队列,选择使用哪个队列。

可通过如下命令设置,此命令表示运行在f指定的cpu上的应用调用socket发送的数据会从网卡的tx-n队列发送出去。

1
echo f > /sys/class/net/<dev>/queues/tx-<n>/xps_cpus

虽然设置的是设备的tx queue对应的cpu列表,但是转换到代码中保存的是每个cpu可使用的queue列表。因为查找xps_cpus时,肯定是已知cpu id,寻找从哪个tx queue发送。

选择 tx queue 时,优先选择 xps_cpu 指定的 queue,如果没有指定就使用 skb hash 计算出来一个。当然也不是每个报文都得经过这个过程,只有 socket 的第一个报文需要,选择出 queue 后,将此queue设置到 sk->sk_tx_queue_mapping,后续报文直接获取 sk_tx_queue_mapping 即可。

和XPS相关的几个结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
* This structure holds an XPS map which can be of variable length. The
* map is an array of queues.
*/
//percpu的结构,queues用来保存此cpu可以使用哪几个queue
struct xps_map {
unsigned int len;
unsigned int alloc_len;
struct rcu_head rcu;
u16 queues[0];
};

/*
* This structure holds all XPS maps for device. Maps are indexed by CPU.
*/
//cpu_map大小为cpu个数
struct xps_dev_maps {
struct rcu_head rcu;
struct xps_map __rcu *cpu_map[0];
};

//perdevice结构,xps_maps 用来保存设备 tx queue 和 cpu 的对应关系
struct net_device {
#ifdef CONFIG_XPS
struct xps_dev_maps __rcu *xps_maps;
#endif

设置 xps_cpus 时调用如下函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
static ssize_t store_xps_map(struct netdev_queue *queue,
struct netdev_queue_attribute *attribute,
const char *buf, size_t len)
{
struct net_device *dev = queue->dev;
unsigned long index;
cpumask_var_t mask;
int err;

if (!capable(CAP_NET_ADMIN))
return -EPERM;

if (!alloc_cpumask_var(&mask, GFP_KERNEL))
return -ENOMEM;
//获取设置的队列索引
index = get_netdev_queue_index(queue);

//获取设置的cpu列表
err = bitmap_parse(buf, len, cpumask_bits(mask), nr_cpumask_bits);
if (err) {
free_cpumask_var(mask);
return err;
}

//更新queue和cpu的映射关系
err = netif_set_xps_queue(dev, mask, index);

free_cpumask_var(mask);

return err ? : len;
}

int netif_set_xps_queue(struct net_device *dev, const struct cpumask *mask,
u16 index)
{
struct xps_dev_maps *dev_maps, *new_dev_maps = NULL;
struct xps_map *map, *new_map;
int maps_sz = max_t(unsigned int, XPS_DEV_MAPS_SIZE, L1_CACHE_BYTES);
int cpu, numa_node_id = -2;
bool active = false;

mutex_lock(&xps_map_mutex);

dev_maps = xmap_dereference(dev->xps_maps);

/* allocate memory for queue storage */
for_each_online_cpu(cpu) {
if (!cpumask_test_cpu(cpu, mask))
continue;

if (!new_dev_maps)
new_dev_maps = kzalloc(maps_sz, GFP_KERNEL);
if (!new_dev_maps) {
mutex_unlock(&xps_map_mutex);
return -ENOMEM;
}

map = dev_maps ? xmap_dereference(dev_maps->cpu_map[cpu]) :
NULL;

map = expand_xps_map(map, cpu, index);
if (!map)
goto error;

RCU_INIT_POINTER(new_dev_maps->cpu_map[cpu], map);
}

if (!new_dev_maps)
goto out_no_new_maps;

for_each_possible_cpu(cpu) {
if (cpumask_test_cpu(cpu, mask) && cpu_online(cpu)) {
/* add queue to CPU maps */
int pos = 0;

map = xmap_dereference(new_dev_maps->cpu_map[cpu]);
while ((pos < map->len) && (map->queues[pos] != index))
pos++;

if (pos == map->len)
map->queues[map->len++] = index;
#ifdef CONFIG_NUMA
if (numa_node_id == -2)
numa_node_id = cpu_to_node(cpu);
else if (numa_node_id != cpu_to_node(cpu))
numa_node_id = -1;
#endif
} else if (dev_maps) {
/* fill in the new device map from the old device map */
map = xmap_dereference(dev_maps->cpu_map[cpu]);
RCU_INIT_POINTER(new_dev_maps->cpu_map[cpu], map);
}

}

rcu_assign_pointer(dev->xps_maps, new_dev_maps);

/* Cleanup old maps */
if (dev_maps) {
for_each_possible_cpu(cpu) {
new_map = xmap_dereference(new_dev_maps->cpu_map[cpu]);
map = xmap_dereference(dev_maps->cpu_map[cpu]);
if (map && map != new_map)
kfree_rcu(map, rcu);
}

kfree_rcu(dev_maps, rcu);
}

dev_maps = new_dev_maps;
active = true;

out_no_new_maps:
/* update Tx queue numa node */
netdev_queue_numa_node_write(netdev_get_tx_queue(dev, index),
(numa_node_id >= 0) ? numa_node_id :
NUMA_NO_NODE);

if (!dev_maps)
goto out_no_maps;

/* removes queue from unused CPUs */
for_each_possible_cpu(cpu) {
if (cpumask_test_cpu(cpu, mask) && cpu_online(cpu))
continue;

if (remove_xps_queue(dev_maps, cpu, index))
active = true;
}

/* free map if not active */
if (!active) {
RCU_INIT_POINTER(dev->xps_maps, NULL);
kfree_rcu(dev_maps, rcu);
}

out_no_maps:
mutex_unlock(&xps_map_mutex);

return 0;
error:
/* remove any maps that we added */
for_each_possible_cpu(cpu) {
new_map = xmap_dereference(new_dev_maps->cpu_map[cpu]);
map = dev_maps ? xmap_dereference(dev_maps->cpu_map[cpu]) :
NULL;
if (new_map && new_map != map)
kfree(new_map);
}

mutex_unlock(&xps_map_mutex);

kfree(new_dev_maps);
return -ENOMEM;
}

设置完xps_cpus后,就可以在发送数据时选择指定的queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//__dev_queue_xmit为通用发送函数,其中会调用netdev_pick_tx选择合适的tx queue
static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
txq = netdev_pick_tx(dev, skb, accel_priv);

struct netdev_queue *netdev_pick_tx(struct net_device *dev,
struct sk_buff *skb,
void *accel_priv)
{
int queue_index = 0;
//设备tx queue个数大于1才需要选择
if (dev->real_num_tx_queues != 1) {
const struct net_device_ops *ops = dev->netdev_ops;

//如果网卡驱动提供了ndo_select_queue,则使用
//ndo_select_queue选择queue。没有合适的还得调用
//__netdev_pick_tx。
if (ops->ndo_select_queue)
queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
__netdev_pick_tx);
else
queue_index = __netdev_pick_tx(dev, skb);

if (!accel_priv)
queue_index = netdev_cap_txqueue(dev, queue_index);
}
//将tx queue索引保存到skb->queue_mapping
skb_set_queue_mapping(skb, queue_index);
//根据索引获取指定的netdev_queue
return netdev_get_tx_queue(dev, queue_index);
}

static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
struct sock *sk = skb->sk;
//取出sk中保存的tx queue
int queue_index = sk_tx_queue_get(sk);

//在以下三种情况下才会更新tx queue
//a. queue_index小于0,这是初始情况
//b. skb->ooo_okay(ooo全称out of order),一个标志位,只
//有在socket发送完数据后才会设置,表示可以切换queue(假
//如初始设置的queue是1,突然改成3了,为了防
//止报文乱序,要等socket中的数据都发送完成后,设置
//ooo_okay标志后,才可以将 发送queue改成3),只对tcp
//socket有效
//c. queue_index大于设备queue总数
if (queue_index < 0 || skb->ooo_okay ||
queue_index >= dev->real_num_tx_queues) {
//从xps_cpu中选择queue
int new_index = get_xps_queue(dev, skb);
if (new_index < 0)
//如果没有设置,就根据hash和设备queue总数计算出一个
new_index = skb_tx_hash(dev, skb);

if (queue_index != new_index && sk &&
rcu_access_pointer(sk->sk_dst_cache))
//将queue索引更新到 sk->sk_tx_queue_mapping
sk_tx_queue_set(sk, new_index);

queue_index = new_index;
}

return queue_index;
}

根据当前运行的cpu raw_smp_processor_id,来查找tx queue。
如果此cpu只对应一个queue,就使用这个queue,如果设置了多个queue,还得使用skb hash选择一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
{
#ifdef CONFIG_XPS
struct xps_dev_maps *dev_maps;
struct xps_map *map;
int queue_index = -1;

rcu_read_lock();
dev_maps = rcu_dereference(dev->xps_maps);
if (dev_maps) {
map = rcu_dereference(
dev_maps->cpu_map[raw_smp_processor_id()]);
if (map) {
if (map->len == 1)
queue_index = map->queues[0];
else
queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
map->len)];
if (unlikely(queue_index >= dev->real_num_tx_queues))
queue_index = -1;
}
}
rcu_read_unlock();

return queue_index;
#else
return -1;
#endif
}

XPS,全称为Transmit Packet Steering,是软件支持的发包时的多队列,于 kernel 2.6.38 添加此特性。

通常 RPS 和 XPS 同id的队列选择的CPU相同,这也是防止不同CPU切换时性能消耗。

Linux通过配置文件的方式指定哪些cpu核参与到报文的分发处理,配置文件存放的路径是:/sys/class/net/(dev)/queues/tx-(n)/xps_cpus。例如:

1
2
# 1010101
# echo 85 > /sys/class/net/eth0/queues/tx-0/xps_cpus

内核中有关xps最主要的函数就是 get_xps_queue (关于配置如何映射到内核可参考RPS)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
struct sock *sk = skb->sk;
int queue_index = sk_tx_queue_get(sk);
/*发送队列的index不合法 或者
ooo_okay 不为0时重新获取发送队列
*/
/*ooo是 out of order,
ooo_okay 标志表示流中没有未完成的数据包,所以发送队列可以改变而没有产生乱序数据包的风险。
传输层负责适当地设置ooo_okay。 例如,TCP在连接的所有数据已被确认时设置标志。
*/
if (queue_index < 0 || skb->ooo_okay ||
queue_index >= dev->real_num_tx_queues) {
int new_index = get_xps_queue(dev, skb);
if (new_index < 0)
new_index = skb_tx_hash(dev, skb);

if (queue_index != new_index && sk &&
rcu_access_pointer(sk->sk_dst_cache))
sk_tx_queue_set(sk, new_index);

queue_index = new_index;
}
return queue_index;
}

GRO

Large Receive Offloading (LRO) 是一个硬件优化,Generic Receive Offloading (GRO) 是 LRO 的一种软件实现。

两种方案的主要思想都是:通过合并“足够类似”的包来减少传送给网络栈的包数,这有助于减少 CPU 的使用量。例如,考虑大文件传输的场景,包的数量非常多,大部分包都是一段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送到网络栈。GRO 使协议层只需处理一个 header,而将包含大量数据的整个大包送到用户程序。

这类优化方式的缺点是信息丢失:包的 option 或者 flag 信息在合并时会丢失。这也是为什么大部分人不使用或不推荐使用LRO 的原因。

LRO 的实现,一般来说对合并包的规则非常宽松。GRO 是 LRO 的软件实现,但是对于包合并的规则更严苛。如果用 tcpdump 抓包,有时会看到机器收到了看起来不现实的、非常大的包, 这很可能是系统开启了 GRO。接下来会看到,tcpdump 的抓包点(捕获包的 tap )在GRO 之后。

使用 ethtool 的 -k 选项查看 GRO 配置:

1
2
$ ethtool -k eth0 | grep generic-receive-offload
generic-receive-offload: on

-K 修改 GRO 配置:

1
$ sudo ethtool -K ens33 gro on

注意:对于大部分驱动,修改 GRO 配置会涉及先 down 再 up 这个网卡,因此这个网卡上的连接 都会中断。

GSO/TSO

计算机网络上传输的数据基本单位是离散的网包,既然是网包,就有大小限制,这个限制就是 MTU(Maximum Transmission Unit)的大小,(以太网)一般是1500字节(这里的MTU所指的是无需分段的情况下,可以传输的最大IP报文(包含IP头部,但不包含协议栈更下层的头部))。比如我们想发送很多数据出去,经过os协议栈的时候,会自动帮你拆分成几个不超过MTU的网包。然而,这个拆分是比较费计算资源的(比如很多时候还要计算分别的checksum),由 CPU 来做的话,往往会造成使用率过高。

那可不可以把这些简单重复的操作 offload 到网卡上呢?于是就有了 LSO(Large Segment Offload ),在发送数据超过 MTU 限制的时候(太容易发生了),OS 只需要提交一次传输请求给网卡,网卡会自动的把数据拿过来,然后进行切片,并封包发出,发出的网包不超过 MTU 限制。

现在基本上用不到 LSO,已经有更好的替代。

  • TSO (TCP Segmentation Offload): 是一种利用网卡来对大数据包进行自动分段,降低CPU负载的技术。 其主要是延迟分段。
  • GSO (Generic Segmentation Offload): GSO是协议栈是否推迟分段,在发送到网卡之前判断网卡是否支持TSO,如果网卡支持TSO则让网卡分段,否则协议栈分完段再交给驱动。 如果TSO开启,GSO会自动开启。

以下是TSO和GSO的组合关系:

  • GSO开启, TSO开启:协议栈推迟分段,并直接传递大数据包到网卡,让网卡自动分段。
  • GSO开启, TSO关闭:协议栈推迟分段,在最后发送到网卡前才执行分段。
  • GSO关闭, TSO开启:同GSO开启, TSO开启。
  • GSO关闭, TSO关闭:不推迟分段,在tcp_sendmsg中直接发送MSS大小的数据包。

开启GSO/TSO

驱动程序在注册网卡设备的时候默认开启GSO: NETIF_F_GSO

1
2
3
4
5
6
7
8
9
10
11
12
#define NETIF_F_SOFT_FEATURES	(NETIF_F_GSO | NETIF_F_GRO)

int register_netdevice(struct net_device *dev)
{
/* ... */

dev->hw_features |= NETIF_F_SOFT_FEATURES;
dev->features |= NETIF_F_SOFT_FEATURES;
dev->wanted_features = dev->features & dev->hw_features;

/* ... */
}

驱动程序会根据网卡硬件是否支持来设置TSO: NETIF_F_TSO

1
2
3
4
5
6
7
8
// intel e1000 网卡
static int e1000_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
/* ... */
if ((hw->mac_type >= e1000_82544) && (hw->mac_type != e1000_82547))
netdev->hw_features |= NETIF_F_TSO;
/* ... */
}

是否推迟分段

GSO/TSO是否开启是保存在 dev->features 中,而设备和路由关联,当我们查询到路由后就可以把配置保存在sock中。

比如在 tcp_v4_connecttcp_v4_syn_recv_sock 都会调用 sk_setup_caps 来设置 GSO/TSO 配置。

需要注意的是,只要开启了GSO,即使硬件不支持TSO,也会设置NETIF_F_TSO,使得sk_can_gso(sk)在GSO开启或者TSO开启的时候都返回true。

1
2
3
4
5
6
7
8
9
10
11
/* This will initiate an outgoing connection. */
int tcp_v4_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len)
{
/* ... */

/* OK, now commit destination to socket. */
sk->sk_gso_type = SKB_GSO_TCPV4;
sk_setup_caps(sk, &rt->dst);

/* ... */
}
sk_setup_caps
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void sk_setup_caps(struct sock *sk, struct dst_entry *dst)
{
__sk_dst_set(sk, dst);
sk->sk_route_caps = dst->dev->features;
if (sk->sk_route_caps & NETIF_F_GSO)
sk->sk_route_caps |= NETIF_F_GSO_SOFTWARE;
sk->sk_route_caps &= ~sk->sk_route_nocaps;
if (sk_can_gso(sk)) {
if (dst->header_len) {
sk->sk_route_caps &= ~NETIF_F_GSO_MASK;
} else {
sk->sk_route_caps |= NETIF_F_SG | NETIF_F_HW_CSUM;
sk->sk_gso_max_size = dst->dev->gso_max_size;
sk->sk_gso_max_segs = dst->dev->gso_max_segs;
}
}
}

从上面可以看出,如果设备开启了GSO,sock都会将TSO标志打开,但是注意这和硬件是否开启TSO无关,硬件的TSO取决于硬件自身特性的支持。

sk_can_gso
1
2
3
4
5
static inline bool sk_can_gso(const struct sock *sk)
{
/*对于tcp,在tcp_v4_connect中被设置:sk->sk_gso_type = SKB_GSO_TCPV4*/
return net_gso_ok(sk->sk_route_caps, sk->sk_gso_type);
}
net_gso_ok
1
2
3
4
5
6
7
static inline bool net_gso_ok(netdev_features_t features, int gso_type)
{
netdev_features_t feature = gso_type << NETIF_F_GSO_SHIFT;

/* ... */
return (features & feature) == feature;
}

由于tcp 在 sk_setup_capssk->sk_route_caps 也被设置有 SKB_GSO_TCPV4,所以整个sk_can_gso成立。

GSO的数据包长度

对紧急数据包或 GSO/TSO 都不开启的情况,才不会推迟发送, 默认使用当前MSS。开启GSO后,tcp_send_mss 返回 mss 和单个 skb 的 GSO 大小,为 mss 的整数倍。

tcp_send_mss
1
2
3
4
5
6
7
8
9
10
11
static int tcp_send_mss(struct sock *sk, int *size_goal, int flags)
{
int mss_now;
/*通过ip option,SACKs及pmtu确定当前的mss*/
mss_now = tcp_current_mss(sk);
/*tcp_xmit_size_goal获取发送数据报到达网络设备时数据段的最大长度,该长度用来分割数据,TCP发送报文时, *每个SKB的大小不能超过该值。
*在此传入是否标识MSG_OOB(out-of-band,比普通数据更高的优先级传送的带外数据)位,这是因为MSG_OOB是判断 *是否支持GSO的条件之一,而紧急数据不支持GSO。
*在不支持GSO的情况下,size_goal就等于mss_now,而如果支持GSO,则size_goal会是MSS的整数倍。数据报发送 *到网络设备后再由网络设备根据MSS进行分割。*/
*size_goal = tcp_xmit_size_goal(sk, mss_now, !(flags & MSG_OOB));
return mss_now;
}
tcp_xmit_size_goal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static unsigned int tcp_xmit_size_goal(struct sock *sk, u32 mss_now, int large_allowed)
{
struct tcp_sock *tp = tcp_sk(sk);
u32 xmit_size_goal, old_size_goal;

xmit_size_goal = mss_now;
/*这里large_allowed表示是否是紧急数据;
large_allowed为真表示无带外数据,可以大包发送*/
if (large_allowed && sk_can_gso(sk)) {
u32 gso_size, hlen;

/* Maybe we should/could use sk->sk_prot->max_header here ? */
hlen = inet_csk(sk)->icsk_af_ops->net_header_len +
inet_csk(sk)->icsk_ext_hdr_len +
tp->tcp_header_len;

/* 目标是每ms发送至少一个数据包,而不是每100 ms发送一个大的TSO数据包。
sk_pacing_rate为 每秒的bytes。
这保留了ACK时钟,并且与tcp_tso_should_defer()启发式一致。
sysctl_tcp_min_tso_segs 为 sysctl控制的系统变量。我的系统环境中值为2。
*/
gso_size = sk->sk_pacing_rate / (2 * MSEC_PER_SEC);
gso_size = max_t(u32, gso_size,
sysctl_tcp_min_tso_segs * mss_now);
/*xmit_size_goal为
gso最大分段大小减去tcp和ip头部长度 与
gso_size中比较小的值
*/
xmit_size_goal = min_t(u32, gso_size,
sk->sk_gso_max_size - 1 - hlen);
/*最多达到收到的最大rwnd窗口通告的一半*/
xmit_size_goal = tcp_bound_to_half_wnd(tp, xmit_size_goal);

/* We try hard to avoid divides here */
old_size_goal = tp->xmit_size_goal_segs * mss_now;

if (likely(old_size_goal <= xmit_size_goal &&
old_size_goal + mss_now > xmit_size_goal)) {
xmit_size_goal = old_size_goal;
} else {
tp->xmit_size_goal_segs =
min_t(u16, xmit_size_goal / mss_now,
sk->sk_gso_max_segs);
xmit_size_goal = tp->xmit_size_goal_segs * mss_now;
}
}
return max(xmit_size_goal, mss_now);
}
tcp_sendmsg

应用程序 send() 数据后,会在tcp_sendmsg中尝试在同一个skb,保存size_goal大小的数据,然后再通过tcp_push把这些包通过tcp_write_xmit发出去。

(代码涉及较多,以后进行分析,TBD)

最终会调用tcp_push发送skb,而tcp_push又会调用tcp_write_xmit。tcp_sendmsg已经把数据按照GSO最大的size,放到一个个的skb中, 最终调用tcp_write_xmit发送这些GSO包。tcp_write_xmit会检查当前的拥塞窗口,还有nagle测试,tsq检查来决定是否能发送整个或者部分的skb, 如果只能发送一部分,则需要调用tso_fragment做切分。最后通过tcp_transmit_skb发送, 如果发送窗口没有达到限制,skb中存放的数据将达到GSO最大值。

tcp_write_xmit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
int push_one, gfp_t gfp)
{
struct tcp_sock *tp = tcp_sk(sk);
struct sk_buff *skb;
unsigned int tso_segs, sent_pkts;
int cwnd_quota;
int result;

sent_pkts = 0;

if (!push_one) {
/* Do MTU probing. */
result = tcp_mtu_probe(sk);
if (!result) {
return false;
} else if (result > 0) {
sent_pkts = 1;
}
}
/*遍历发送队列*/
while ((skb = tcp_send_head(sk))) {
unsigned int limit;

tso_segs = tcp_init_tso_segs(sk, skb, mss_now);
BUG_ON(!tso_segs);

if (unlikely(tp->repair) && tp->repair_queue == TCP_SEND_QUEUE)
goto repair; /* Skip network transmission */

cwnd_quota = tcp_cwnd_test(tp, skb);
if (!cwnd_quota) {
if (push_one == 2)
/* Force out a loss probe pkt. */
cwnd_quota = 1;
else
break;
}

if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now)))
break;
/*tso_segs=1表示无需tso分段*/
if (tso_segs == 1 || !sk->sk_gso_max_segs) {
/* 根据nagle算法,计算是否需要推迟发送数据 */
if (unlikely(!tcp_nagle_test(tp, skb, mss_now,
(tcp_skb_is_last(sk, skb) ?
nonagle : TCP_NAGLE_PUSH))))
break;
} else {
/*有多个tso分段*/
/*push所有skb*/
/*如果发送窗口剩余不多,并且预计下一个ack将很快到来(意味着可用窗口会增加),则推迟发送*/
if (!push_one && tcp_tso_should_defer(sk, skb))
break;
}

limit = max_t(unsigned int, sysctl_tcp_limit_output_bytes,
sk->sk_pacing_rate >> 10);

if (atomic_read(&sk->sk_wmem_alloc) > limit) {
set_bit(TSQ_THROTTLED, &tp->tsq_flags);
smp_mb__after_clear_bit();
if (atomic_read(&sk->sk_wmem_alloc) > limit)
break;
}

/*下面的逻辑是:不用推迟发送,马上发送的情况*/
limit = mss_now;
/*由于tso_segs被设置为skb->len/mss_now,所以开启gso时一定大于1*/
/*tso分段大于1且非urg模式*/
if (tso_segs > 1 && sk->sk_gso_max_segs && !tcp_urg_mode(tp))
/*返回当前skb中可以发送的数据大小,通过mss和cwnd*/
limit = tcp_mss_split_point(sk, skb, mss_now,
min_t(unsigned int,
cwnd_quota,
sk->sk_gso_max_segs));
/* 当skb的长度大于限制时,需要调用tso_fragment分片,如果分段失败则暂不发送 */
/*按limit切割成多个skb*/
if (skb->len > limit &&
unlikely(tso_fragment(sk, skb, limit, mss_now, gfp)))
break;

TCP_SKB_CB(skb)->when = tcp_time_stamp;
/*发送,如果包被qdisc丢了,则退出循环,不继续发送了*/
if (unlikely(tcp_transmit_skb(sk, skb, 1, gfp)))
break;

repair:
/*更新sk_send_head和packets_out*/
tcp_event_new_data_sent(sk, skb);
tcp_minshall_update(tp, mss_now, skb);
sent_pkts += tcp_skb_pcount(skb);

if (push_one)
break;
}

if (likely(sent_pkts)) {
if (tcp_in_cwnd_reduction(sk))
tp->prr_out += sent_pkts;

/* Send one loss probe per tail loss episode. */
if (push_one != 2)
tcp_schedule_loss_probe(sk);
tcp_cwnd_validate(sk);
return false;
}
return (push_one == 2) || (!tp->packets_out && tcp_send_head(sk));
}

其中tcp_init_tso_segs会设置skb的gso信息后文分析。我们看到tcp_write_xmit 会调用tso_fragment进行“tcp分段”。而分段的条件是skb->len > limit。这里的关键就是limit的值,我们看到在tso_segs > 1时,也就是开启gso的时候,limit的值是由tcp_mss_split_point得到的,也就是min(skb->len, window),即发送窗口允许的最大值。在没有开启gso时limit就是当前的mss。

tcp_init_tso_segs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Initialize TSO state of a skb.
* This must be invoked the first time we consider transmitting
* SKB onto the wire.
*/
static int tcp_init_tso_segs(const struct sock *sk, struct sk_buff *skb,
unsigned int mss_now)
{
int tso_segs = tcp_skb_pcount(skb);

if (!tso_segs || (tso_segs > 1 && tcp_skb_mss(skb) != mss_now)) {
tcp_set_skb_tso_segs(sk, skb, mss_now);
tso_segs = tcp_skb_pcount(skb);
}
return tso_segs;
}

tcp_write_xmit最后会调用ip_queue_xmit发送skb,进入ip层。

流程图如下:

GSO-TSO流程图

UFO

UFO(UDP fragmentation offload),UPD的offload。

GRE 及 VXLAN接口初始化的时候,会置此位。

1
2
3
4
5
6
7
/* Initialize the device structure. */
static void vxlan_setup(struct net_device *dev)
{
/* ... */
dev->features |= NETIF_F_GSO_SOFTWARE;
/* ... */
}

还有其他driver也支持,例如 macvlan、tun、virtnet等。

总结

  • 接收侧
    • RSS是网卡驱动支持的多队列属性,队列通过中断绑定到不同的CPU,以实现流量负载。
    • RPS是以软件形式实现流量在不同CPU之间的分发。
    • RFS是报文需要在用户态处理时,保证处理的CPU与内核相同,防止缓存miss而导致的消耗。
    • LRO 和 GRO,多个报文组成一个大包上送协议栈。
  • 发送侧
    • XPS 软件多队列发送。
    • TSO 是利用网卡来对大数据包进行自动分段,降低CPU负载的技术。
    • GSO 是协议栈分段功能。分段之前判断是否支持TSO,支持则推迟到网卡分段。 如果TSO开启,GSO会自动开启。
    • UFO 类似TSO,不过只针对UDP报文。

参考资料