/* check if one or more complete packets were indeed received */
while (*R_DMA_CH1_FIRST != virt_to_phys(myNextRxDesc)) {
    /* Take out the buffer and give it to the OS, then
     * allocate a new buffer to put a packet in.
     */
    e100_rx(dev);
    ((struct net_local *)dev->priv)->stats.rx_packets++;

    /* restart/continue on the channel, for safety */
    *R_DMA_CH1_CMD = IO_STATE(R_DMA_CH1_CMD, cmd, restart);

    /* clear dma channel 1 eop/descr irq bits */
    *R_DMA_CH1_CLR_INTR =
        IO_STATE(R_DMA_CH1_CLR_INTR, clr_eop, do) |
        IO_STATE(R_DMA_CH1_CLR_INTR, clr_descr, do);

    /* now, we might have gotten another packet
     * so we have to loop back and check if so
     */
        }
    }
}
if (length < RX_COPYBREAK) {
    /* Small packet, copy data */
    skb = dev_alloc_skb(length - ETHER_HEAD_LEN);
    skb_put(skb, length - ETHER_HEAD_LEN);        /* allocate room for the packet body */
    skb_data_ptr = skb_push(skb, ETHER_HEAD_LEN); /* allocate room for the header */
    memcpy(skb_data_ptr, phys_to_virt(myNextRxDesc->descr.buf), length);
} else {
    /* Large packet, send directly to upper layers and allocate new memory */
    skb = myNextRxDesc->skb;
    skb_put(skb, length);
    myNextRxDesc->skb = dev_alloc_skb(MAX_MEDIA_DATA_SIZE);
    myNextRxDesc->descr.buf = virt_to_phys(myNextRxDesc->skb->data);
}

skb->protocol = eth_type_trans(skb, dev);

/* Send the packet to the upper layers */
netif_rx(skb);

/* Prepare for next packet */
myNextRxDesc->descr.status = 0;
myPrevRxDesc = myNextRxDesc;
myNextRxDesc = phys_to_virt(myNextRxDesc->descr.next);

rx_queue_len++;

/* Check if descriptors should be returned */
if (rx_queue_len == RX_QUEUE_THRESHOLD) {
    flush_etrax_cache();
    myPrevRxDesc->descr.ctrl |= d_eol;
    myLastRxDesc->descr.ctrl &= ~d_eol;
    myLastRxDesc = myPrevRxDesc;
    rx_queue_len = 0;
    }
}
NAPI
NAPI (New API) is a technique adopted in Linux to make network processing more efficient. Its core idea is to stop reading packet data purely from interrupt context: an interrupt is used only to wake up the receive service routine, which then polls for packets. The NAPI workflow is as follows (a minimal driver-side sketch follows the list):
1. NAPI is enabled by the driver, but starts in the off (disabled) state.
2. A packet arrives at the NIC and is DMAed into RAM.
3. The NIC raises an IRQ, triggering the driver's IRQ handler.
4. The driver wakes up the NAPI subsystem via a softirq; packets are then harvested by calling the poll function the driver registered.
5. The driver disables the NIC's interrupts, so packets can be harvested through NAPI polling without having to service further interrupts.
6. Once all packets have been processed, NAPI is disabled and the device's IRQs are re-enabled.
7. When packets arrive again, the process repeats from step 2.
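As a rough, hypothetical illustration of that flow (mydev_irq, mydev_poll, struct my_priv, mydev_rx_clean and the mydev_hw_* helpers are made-up names, not taken from any real driver), the IRQ handler and the poll callback of a NAPI driver typically have this shape:

/* Hypothetical NAPI driver skeleton; only napi_schedule()/napi_complete_done()
 * and the struct napi_struct embedding are the real kernel API. */
static irqreturn_t mydev_irq(int irq, void *data)
{
    struct my_priv *priv = data;

    mydev_hw_mask_irq(priv);       /* step 5: silence further NIC interrupts   */
    napi_schedule(&priv->napi);    /* step 4: hand the work to NAPI (softirq)  */
    return IRQ_HANDLED;
}

static int mydev_poll(struct napi_struct *napi, int budget)
{
    struct my_priv *priv = container_of(napi, struct my_priv, napi);
    int work_done = mydev_rx_clean(priv, budget);  /* harvest up to budget packets */

    /* step 6: everything drained, leave polling mode and re-enable IRQs */
    if (work_done < budget && napi_complete_done(napi, work_done))
        mydev_hw_unmask_irq(priv);

    return work_done;
}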
Every driver that uses NAPI has a poll function, which the driver registers with the NAPI subsystem by calling netif_napi_add(); this is described in detail below.
/*
 * Structure for NAPI scheduling similar to tasklet but with weighting
 */
struct napi_struct {
    /* The poll_list must only be managed by the entity which
     * changes the state of the NAPI_STATE_SCHED bit. This means
     * whoever atomically sets that bit can add this napi_struct
     * to the per-CPU poll_list, and whoever clears that bit
     * can remove from the list right before clearing the bit.
     */
    struct list_head    poll_list;

    unsigned long       state;
    int                 weight;
    unsigned long       gro_bitmask;
    int                 (*poll)(struct napi_struct *, int);
ixgbe_probe()
  -> ixgbe_init_interrupt_scheme()
    -> ixgbe_alloc_q_vectors()   // We allocate one q_vector per queue interrupt
      -> ixgbe_alloc_q_vector()  // Allocate memory for a single interrupt vector
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
                                int v_count, int v_idx,
                                int txr_count, int txr_idx,
                                int xdp_count, int xdp_idx,
                                int rxr_count, int rxr_idx)
{
    /* ... */

    /* allocate q_vector and rings */
    q_vector = kzalloc(size, GFP_KERNEL);
    if (!q_vector)
        return -ENOMEM;

    /* initialize NAPI */
    netif_napi_add(adapter->netdev, &q_vector->napi, ixgbe_poll, 64);
    ixgbe_request_irq(adapter);

    /* Notify the stack of the actual queue counts. */
    queues = adapter->num_tx_queues;
    netif_set_real_num_tx_queues(netdev, queues);
Nowadays most NICs DMA received packets directly into RAM, from which the OS networking subsystem can then pick up the data; this region of memory is the ring buffer. The driver therefore has to request a memory region from the OS and tell the hardware its address; from then on the NIC automatically DMAs incoming packets into that ring buffer.
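As a hedged sketch of that hand-off (MY_RING_SIZE, struct my_rx_desc and the MYDEV_REG_* register offsets are invented for illustration; dma_alloc_coherent() and writel() are the real kernel APIs), a driver might set up its receive ring like this:

/* Inside a hypothetical ring-setup function: 'dev' is the struct device,
 * 'hw_addr' the ioremapped BAR of the NIC, 'ring' the driver's ring state. */
ring->size = MY_RING_SIZE * sizeof(struct my_rx_desc);
ring->desc = dma_alloc_coherent(dev, ring->size, &ring->dma, GFP_KERNEL);
if (!ring->desc)
    return -ENOMEM;

/* program the ring's bus address and length into the NIC's registers */
writel(lower_32_bits(ring->dma), hw_addr + MYDEV_REG_RX_BASE_LO);
writel(upper_32_bits(ring->dma), hw_addr + MYDEV_REG_RX_BASE_HI);
writel(MY_RING_SIZE, hw_addr + MYDEV_REG_RX_LEN);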
When the packet rate is high, a single CPU cannot keep up with all the packets; since the ring buffer is a fixed-size region, packets will be dropped. Receive Side Scaling (RSS), also known as multiqueue NIC support, was designed to solve this: such NICs can distribute received packets across multiple ring buffers, each of which is an independent queue. This allows the OS to use multiple CPUs to process incoming packets in parallel, starting at the hardware level.
struct ixgbe_ring {
    struct ixgbe_ring      *next;      /* pointer to next ring in q_vector */
    struct ixgbe_q_vector  *q_vector;  /* backpointer to host q_vector */
    struct net_device      *netdev;    /* netdev ring belongs to */
    struct device          *dev;       /* device for DMA mapping */
    void                   *desc;      /* descriptor ring memory */
    union {
        struct ixgbe_tx_buffer *tx_buffer_info;
        struct ixgbe_rx_buffer *rx_buffer_info;
    };
    unsigned long           state;
    u8 __iomem             *tail;
    dma_addr_t              dma;       /* phys. address of descriptor ring */
    unsigned int            size;      /* length in bytes */

    u16                     count;     /* amount of descriptors */

    u8                      queue_index; /* needed for multiqueue queue management */
    u8                      reg_idx;     /* holds the special value that gets
                                          * the hardware register offset
                                          * associated with this ring, which is
                                          * different for DCB and RSS modes */
    u16                     next_to_use;
    u16                     next_to_clean;
static void run_ksoftirqd(unsigned int cpu)
{
    local_irq_disable();
    if (local_softirq_pending()) {
        /*
         * We can safely run softirq on inline stack, as we are not deep
         * in the task stack here.
         */
        __do_softirq();
        local_irq_enable();
        cond_resched();
        return;
    }
    local_irq_enable();
}
The logic of __do_softirq() is as follows (a simplified sketch of the loop follows this list):
determine which softirqs are currently pending
account the time spent in softirq processing
increment the softirq execution statistics
run the handler of each pending softirq
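A heavily simplified sketch of that loop, with the restart handling, preemption checks and the ksoftirqd wakeup omitted (based on kernel/softirq.c), looks roughly like this:

/* Simplified illustration of __do_softirq()'s core loop; the
 * MAX_SOFTIRQ_RESTART logic and wakeup_softirqd() are left out. */
pending = local_softirq_pending();
account_irq_enter_time(current);

set_softirq_pending(0);      /* clear the pending mask */
local_irq_enable();

h = softirq_vec;
while ((softirq_bit = ffs(pending))) {
    h += softirq_bit - 1;
    kstat_incr_softirqs_this_cpu(h - softirq_vec);  /* per-CPU statistics */
    h->action(h);                                   /* run the handler    */
    h++;
    pending >>= softirq_bit;
}

local_irq_disable();
account_irq_exit_time(current);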
There are the following softirq types:
enum {
    HI_SOFTIRQ = 0,
    TIMER_SOFTIRQ,
    NET_TX_SOFTIRQ,
    NET_RX_SOFTIRQ,
    BLOCK_SOFTIRQ,
    BLOCK_IOPOLL_SOFTIRQ,
    TASKLET_SOFTIRQ,
    SCHED_SOFTIRQ,
    HRTIMER_SOFTIRQ,
    RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

    NR_SOFTIRQS
};
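The networking softirq handlers are associated with NET_TX_SOFTIRQ and NET_RX_SOFTIRQ via open_softirq() during net_dev_init() in net/core/dev.c:

/* from net_dev_init(): hook the networking softirqs up to their handlers */
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);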
    /* Write the ITR value calculated from the previous interrupt. */
    igb_write_itr(q_vector);

    napi_schedule(&q_vector->napi);

    return IRQ_HANDLED;
}
Let's look at what napi_schedule() actually does. It is just a thin wrapper around __napi_schedule(), which first fetches the softnet_data registered for the current CPU and then calls ____napi_schedule():
/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run
 */
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    ____napi_schedule(this_cpu_ptr(&softnet_data), n);
    local_irq_restore(flags);
}
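For reference, the inner helper ____napi_schedule() simply puts the napi_struct on the current CPU's poll list and raises the RX softirq; in older kernels it is essentially:

/* Add the napi_struct to the per-CPU poll list and raise NET_RX_SOFTIRQ;
 * called with local interrupts disabled. */
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}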
        if (list_empty(&list)) {
            if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                goto out;
            break;
        }

        n = list_first_entry(&list, struct napi_struct, poll_list);
        budget -= napi_poll(n, &repoll);

        /* If softirq window is exhausted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        if (unlikely(budget <= 0 ||
                     time_after_eq(jiffies, time_limit))) {
            sd->time_squeeze++;
            break;
        }
    }
local_irq_disable();
    list_splice_tail_init(&sd->poll_list, &list);
    list_splice_tail(&repoll, &list);
    list_splice(&list, &sd->poll_list);
    if (!list_empty(&sd->poll_list))
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    /* This NAPI_STATE_SCHED test is for avoiding a race
     * with netpoll's poll_napi(). Only the entity which
     * obtains the lock and sees NAPI_STATE_SCHED set will
     * actually make the ->poll() call. Therefore we avoid
     * accidentally calling ->poll() when NAPI is not scheduled.
     */
    work = 0;
    if (test_bit(NAPI_STATE_SCHED, &n->state)) {
        work = n->poll(n, weight);
        trace_napi_poll(n, work, weight);
    }

    /* Drivers must not modify the NAPI state if they
     * consume the entire weight. In such cases this code
     * still "owns" the NAPI instance and therefore can
     * move the instance around on the list at-will.
     */
    if (unlikely(napi_disable_pending(n))) {
        napi_complete(n);
        goto out_unlock;
    }

    if (n->gro_bitmask) {
        /* flush too old packets
         * If HZ < 1000, flush all packets.
         */
        napi_gro_flush(n, HZ >= 1000);
    }

    list_add_tail(&n->poll_list, repoll);

out_unlock:
    netpoll_poll_unlock(have);

    return work;
}
The contract between NAPI and the driver here is:
If the driver's poll function (ixgbe_poll in this case) consumes the whole weight, it must not change the NAPI state; net_rx_action then moves the current napi struct to the end of the poll list, and it will be processed again on the next pass of the net_rx_action loop.
/**
 * ixgbe_poll - NAPI Rx polling callback
 * @napi: structure for representing this polling device
 * @budget: how many packets driver is allowed to clean
 *
 * This function is used for legacy and MSI, NAPI mode
 **/
int ixgbe_poll(struct napi_struct *napi, int budget)
{
    struct ixgbe_q_vector *q_vector =
                container_of(napi, struct ixgbe_q_vector, napi);
    struct ixgbe_adapter *adapter = q_vector->adapter;
    struct ixgbe_ring *ring;
    int per_ring_budget, work_done = 0;
    bool clean_complete = true;

    /* Exit if we are called by netpoll */
    if (budget <= 0)
        return budget;

    /* attempt to distribute budget to each queue fairly, but don't allow
     * the budget to go below 1 because we'll exit polling */
    if (q_vector->rx.count > 1)
        per_ring_budget = max(budget / q_vector->rx.count, 1);
    else
        per_ring_budget = budget;

    ixgbe_for_each_ring(ring, q_vector->rx) {
        int cleaned = ixgbe_clean_rx_irq(q_vector, ring,
                                         per_ring_budget);

        work_done += cleaned;
        if (cleaned >= per_ring_budget)
            clean_complete = false;
    }

    /* If all work not completed, return budget and keep polling */
    if (!clean_complete)
        return budget;

    /* all work done, exit the polling mode */
    if (likely(napi_complete_done(napi, work_done))) {
        if (adapter->rx_itr_setting & 1)
            ixgbe_set_itr(q_vector);
        if (!test_bit(__IXGBE_DOWN, &adapter->state))
            ixgbe_irq_enable_queues(adapter, BIT_ULL(q_vector->v_idx));
    }
Each NAPI instance runs in softirq context on its CPU. Moreover, the CPU that took the hardware interrupt is also the one that subsequently runs the corresponding softirq handler to harvest packets. In other words, the same CPU handles both the hard IRQ and its softirq.
Some NICs (for example the Intel I350) support multiple queues in hardware. Received packets are DMAed into ring buffers located in different memory regions, and each queue is managed by its own NAPI instance driving the softirq poll() processing. Multiple CPUs can therefore service the NIC's interrupts and the receive path in parallel. This feature is called RSS (Receive Side Scaling).
        /* Schedule NAPI for backlog device
         * We can use non atomic operation since we own the queue lock
         */
        if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
            if (!rps_ipi_queued(sd))
                ____napi_schedule(sd, &sd->backlog);
        }
        goto enqueue;
Flow limits
RPS distributes packets across CPUs, but if a single flow is very large, one CPU can get overwhelmed while the other CPUs sit idle (starved). The flow limit feature was introduced for this case: the number of packets belonging to the same flow that may sit in a backlog queue is capped at a threshold. This ensures that even while one huge flow is pouring in packets, small flows still get processed promptly.
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
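skb_flow_limit() (net/core/dev.c, only compiled in with CONFIG_NET_FLOW_LIMIT) implements that cap by hashing the skb into a bucket and keeping a short history of recently enqueued flows; once one bucket accounts for more than half of the history window, further packets of that flow are dropped. A simplified sketch, close to but not a verbatim copy of the kernel code:

/* Returns true if this skb's flow already dominates the recent backlog
 * history on this CPU and the packet should be dropped. */
static bool skb_flow_limit(struct sk_buff *skb, unsigned int qlen)
{
    struct sd_flow_limit *fl;
    struct softnet_data *sd;
    unsigned int old_flow, new_flow;

    /* only kicks in once the backlog is at least half full */
    if (qlen < (netdev_max_backlog >> 1))
        return false;

    sd = this_cpu_ptr(&softnet_data);

    rcu_read_lock();
    fl = rcu_dereference(sd->flow_limit);
    if (fl) {
        new_flow = skb_get_hash(skb) & (fl->num_buckets - 1);
        old_flow = fl->history[fl->history_head];
        fl->history[fl->history_head] = new_flow;

        fl->history_head++;
        fl->history_head &= FLOW_LIMIT_HISTORY - 1;

        if (likely(fl->buckets[old_flow]))
            fl->buckets[old_flow]--;

        /* drop if this flow fills more than half of the history window */
        if (fl->buckets[new_flow]++ > (FLOW_LIMIT_HISTORY >> 1)) {
            fl->count++;
            rcu_read_unlock();
            return true;
        }
    }
    rcu_read_unlock();

    return false;
}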
The backlog NAPI is disabled if the total weight will not be used; the poller is restarted by the call to ____napi_schedule() from enqueue_to_backlog(), as described above.
__netif_receive_skb_core does the heavy lifting of delivering the data to the protocol stack. Before doing so, it first checks whether any packet taps have been installed; taps are used for packet capture. For example, the AF_PACKET address family can install such capture hooks, usually through the libpcap library.
Handling taps
If a packet tap exists (usually installed via libpcap), the packet is delivered to it first, before being handed to the protocol layers.
    list_for_each_entry_rcu(ptype, &ptype_all, list) {
        if (!ptype->dev || ptype->dev == skb->dev) {
            if (pt_prev)
                ret = deliver_skb(skb, pt_prev, orig_dev);
            pt_prev = ptype;
        }
    }
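For illustration, a module can register itself on ptype_all as such a tap. dev_add_pack()/dev_remove_pack() and the packet_type layout are the real kernel API; my_tap_rcv and the module boilerplate are hypothetical:

#include <linux/module.h>
#include <linux/netdevice.h>
#include <linux/if_ether.h>
#include <linux/skbuff.h>

/* hypothetical tap handler: receives a reference to every received skb */
static int my_tap_rcv(struct sk_buff *skb, struct net_device *dev,
                      struct packet_type *pt, struct net_device *orig_dev)
{
    pr_info("tap saw %u bytes on %s\n", skb->len, dev->name);
    kfree_skb(skb);        /* drop our reference */
    return 0;
}

static struct packet_type my_tap = {
    .type = htons(ETH_P_ALL),    /* ETH_P_ALL places the handler on ptype_all */
    .func = my_tap_rcv,
};

static int __init my_tap_init(void)
{
    dev_add_pack(&my_tap);
    return 0;
}

static void __exit my_tap_exit(void)
{
    dev_remove_pack(&my_tap);
}

module_init(my_tap_init);
module_exit(my_tap_exit);
MODULE_LICENSE("GPL");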
if (!pskb_may_pull(skb, sizeof(struct iphdr))) goto inhdr_error;
iph = ip_hdr(skb);
if (iph->ihl < 5 || iph->version != 4) goto inhdr_error;
if (!pskb_may_pull(skb, iph->ihl*4)) goto inhdr_error;
iph = ip_hdr(skb);
if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl))) goto csum_error;
len = ntohs(iph->tot_len);
if (skb->len < len) {
    goto drop;
} else if (len < (iph->ihl * 4))
    goto inhdr_error;
/* Our transport medium may have padded the buffer out. Now we know it
 * is IP we can trim to the true length of the frame.
 * Note this now means skb->len holds ntohs(iph->tot_len).
 */
if (pskb_trim_rcsum(skb, len)) {
    goto drop;
}
/* that should never happen */
if (skb->pkt_type != PACKET_HOST) goto drop;
if (unlikely(skb->sk)) goto drop;
if (skb_warn_if_lro(skb)) goto drop;
if (!xfrm4_policy_check(NULL, XFRM_POLICY_FWD, skb)) goto drop;
if (IPCB(skb)->opt.router_alert && ip_call_ra_chain(skb)) return NET_RX_SUCCESS;
skb_forward_csum(skb);
net = dev_net(skb->dev);

/*
 * According to the RFC, we must first decrease the TTL field. If
 * that reaches zero, we must reply an ICMP control message telling
 * that the packet's lifetime expired.
 */
if (ip_hdr(skb)->ttl <= 1)
    goto too_many_hops;
if (!xfrm4_route_forward(skb)) goto drop;
rt = skb_rtable(skb);
if (opt->is_strictroute && rt->rt_uses_gateway) goto sr_failed;
IPCB(skb)->flags |= IPSKB_FORWARDED;
mtu = ip_dst_mtu_maybe_forward(&rt->dst, true);
if (ip_exceeds_mtu(skb, mtu)) {
    IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
    icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
              htonl(mtu));
    goto drop;
}

/* We are about to mangle packet. Copy it! */
if (skb_cow(skb, LL_RESERVED_SPACE(rt->dst.dev) + rt->dst.header_len))
    goto drop;
iph = ip_hdr(skb);

/* Decrease ttl after skb cow done */
ip_decrease_ttl(iph);

/*
 * We now generate an ICMP HOST REDIRECT giving the route
 * we calculated.
 */
if (IPCB(skb)->flags & IPSKB_DOREDIRECT && !opt->srr &&
    !skb_sec_path(skb))
    ip_rt_send_redirect(skb);

if (net->ipv4.sysctl_ip_fwd_update_priority)
    skb->priority = rt_tos2priority(iph->tos);
/* This is used to register protocols. */
struct net_protocol {
    int         (*early_demux)(struct sk_buff *skb);
    int         (*early_demux_handler)(struct sk_buff *skb);
    int         (*handler)(struct sk_buff *skb);
    void        (*err_handler)(struct sk_buff *skb, u32 info);
    unsigned int no_policy:1,
                 netns_ok:1,
                 /* does the protocol do more stringent
                  * icmp tag validation than simple
                  * socket lookup?
                  */
                 icmp_strict_tag_validation:1;
};
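For example, TCP registers its IPv4 receive handlers through a net_protocol instance and inet_add_protocol(); the sketch below roughly mirrors the registration done at inet_init() time in net/ipv4/af_inet.c:

/* TCP's IPv4 handlers: tcp_v4_rcv() is the per-packet handler, tcp_v4_err()
 * the ICMP error handler, tcp_v4_early_demux() the early-demux hook. */
static struct net_protocol tcp_protocol = {
    .early_demux         = tcp_v4_early_demux,
    .early_demux_handler = tcp_v4_early_demux,
    .handler             = tcp_v4_rcv,
    .err_handler         = tcp_v4_err,
    .no_policy           = 1,
    .netns_ok            = 1,
    .icmp_strict_tag_validation = 1,
};

/* hook the handlers into the IPv4 receive path for protocol number IPPROTO_TCP */
if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
    pr_crit("%s: Cannot add TCP protocol\n", __func__);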