赞
踩
从网卡收包到上送协议栈有两个模式:
一种是传统的中断模式,即收到一个数据包,执行一次中断处理函数(比如e100_rx),在此函数中分配skb,替换有数据的skb(DMA已经将数据拷贝到初始化的skb),调用netif_rx将有数据的skb放在percpu的队列上(如果开启了RPS,这个队列有可能是本地cpu的,也有可能是其他cpu的),最后激活软中断。之后的软中断处理函数net_rx_action中调用poll函数process_backlog(如果将skb放在其他cpu队列上了,还需要通过ipi激活其他cpu的软中断),处理percpu队列上的数据包,上送协议栈__netif_receive_skb。
中断模式会触发很多中断,影响性能,所以有了napi模式,这种模式下,一次中断可以poll收多个数据包(配额64)。具体的为收到一个中断,执行中断处理函数(比如ixgbe_msix_clean_rings),在此函数中只是激活软中断,并不处理skb,在之后的软中断处理函数net_rx_action中调用驱动注册的poll函数,比如ixgbe_poll,来收包,上送协议栈netif_receive_skb_internal(如果开启了RPS,就会按照non-napi的处理方式,将skb放在percpu的队列上,这个队列有可能是本地cpu的,也有可能是其他cpu的),再经过软中断处理才会将skb上送协议栈__netif_receive_skb。
下面的图片展示了这两种模式的流程,其中蓝色部分为公共流程,红色的为non-NAPI流程,绿色的为NAPI流程。
image.png
软中断流程分为两步,首先激活软中断,然后在某个时刻执行软中断处理函数
- raise_softirq
- raise_softirq_irqoff(nr);
- __raise_softirq_irqoff(unsigned int nr)
- or_softirq_pending(1UL << nr);
b. NAPI模式下激活软中断方式,一般在驱动的中断处理函数中调用
- napi_schedule
- __napi_schedule(n);
- ____napi_schedule(this_cpu_ptr(&softnet_data), n);
- list_add_tail(&napi->poll_list, &sd->poll_list);
- __raise_softirq_irqoff(NET_RX_SOFTIRQ);
- or_softirq_pending(1UL << nr);
c. non-NAPI模式下激活软中断方式,在netif_rx->enqueue_to_backlog时调用
- enqueue_to_backlog
- sd = &per_cpu(softnet_data, cpu);
- ____napi_schedule(sd, &sd->backlog);
- list_add_tail(&napi->poll_list, &sd->poll_list);
- __raise_softirq_irqoff(NET_RX_SOFTIRQ);
- or_softirq_pending(1UL << nr);
- irq_exit
- if (!in_interrupt() && local_softirq_pending())
- invoke_softirq
- __do_softirq
b. ksoftirqd内核服务线程运行的时候
- __do_softirq
- invoke_softirq
- raise_softirq_irqoff
- wakeup_softirqd
- run_ksoftirqd
- if (local_softirq_pending()) {
- __do_softirq
c. netif_rx_ni
netif_rx_ni 会先将做和netif_rx一样的操作后,如果有软中断激活,则执行软中断
- netif_rx_ni
- if (local_softirq_pending())
- do_softirq();
- do_softirq_own_stack();
- if (local_softirq_pending())
- __do_softirq
- //kernel启动时,软中断相关初始化
- static int __init net_dev_init(void)
- {
- ...
- /*
- * Initialise the packet receive queues.
- */
- //初始化percpu的结构softnet_data
- for_each_possible_cpu(i) {
- struct softnet_data *sd = &per_cpu(softnet_data, i);
-
- skb_queue_head_init(&sd->input_pkt_queue);
- skb_queue_head_init(&sd->process_queue);
- INIT_LIST_HEAD(&sd->poll_list);
- sd->output_queue_tailp = &sd->output_queue;
- #ifdef CONFIG_RPS
- sd->csd.func = rps_trigger_softirq; //激活其他cpu软中断
- sd->csd.info = sd;
- sd->cpu = i;
- #endif
- //backlog借用napi的结构,实现non-NAPI的处理。
- //process_backlog就是NAPI下的poll函数
- sd->backlog.poll = process_backlog;
- sd->backlog.weight = weight_p;
- }
- ...
- //注册和网络相关的两个软中断处理函数
- open_softirq(NET_TX_SOFTIRQ, net_tx_action);
- open_softirq(NET_RX_SOFTIRQ, net_rx_action);
- ...
- }
支持以下软中断类型
- enum
- {
- HI_SOFTIRQ=0,
- TIMER_SOFTIRQ,
- NET_TX_SOFTIRQ,
- NET_RX_SOFTIRQ,
- BLOCK_SOFTIRQ,
- BLOCK_IOPOLL_SOFTIRQ,
- TASKLET_SOFTIRQ,
- SCHED_SOFTIRQ,
- HRTIMER_SOFTIRQ,
- RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
-
- NR_SOFTIRQS
- };
注册软中断处理函数
- void open_softirq(int nr, void (*action)(struct softirq_action *))
- {
- softirq_vec[nr].action = action;
- }
- static irqreturn_t dm9000_interrupt(int irq, void *dev_id)
- /* Received the coming packet */
- if (int_status & ISR_PRS)
- dm9000_rx(dev);
- //分配 skb
- skb = netdev_alloc_skb(dev, RxLen + 4)
- //将数据存入 skb
- rdptr = (u8 *) skb_put(skb, RxLen - 4);
- (db->inblk)(db->io_data, rdptr, RxLen);
- //调用netif_rx处理skb
- netif_rx(skb);
-
- int netif_rx(struct sk_buff *skb)
- {
- //static tracepoint
- trace_netif_rx_entry(skb);
-
- return netif_rx_internal(skb);
- }
获取合适的cpu,调用 enqueue_to_backlog 将skb放入percpu的队列中
- static int netif_rx_internal(struct sk_buff *skb)
- {
- int ret;
-
- net_timestamp_check(netdev_tstamp_prequeue, skb);
-
- trace_netif_rx(skb);
- #ifdef CONFIG_RPS
- //如果内核配置选项配置了 RPS,并且使能了rps(echo f >
- ///sys/class/net/eth0/queues/rx-0/rps_cpus),则通过
- //get_rps_cpu获取合适的cpu(有可能是本地cpu也有可能是
- //remote cpu),否则使用本地cpu
- if (static_key_false(&rps_needed)) {
- struct rps_dev_flow voidflow, *rflow = &voidflow;
- int cpu;
-
- preempt_disable();
- rcu_read_lock();
-
- cpu = get_rps_cpu(skb->dev, skb, &rflow);
- if (cpu < 0)
- cpu = smp_processor_id();
-
- ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
-
- rcu_read_unlock();
- preempt_enable();
- } else
- #endif
- {
- unsigned int qtail;
- //没有配置rps,则获取当地cpu
- ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
- put_cpu();
- }
- return ret;
- }
将skb放在指定cpu的softnet_data->input_pkt_queue队列中,
如果是队列上第一个包还需要激活软中断
- /*
- * enqueue_to_backlog is called to queue an skb to a per CPU backlog
- * queue (may be a remote CPU queue).
- */
- static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
- unsigned int *qtail)
- {
- struct softnet_data *sd;
- unsigned long flags;
- unsigned int qlen;
- //获取percpu的sd
- sd = &per_cpu(softnet_data, cpu);
-
- local_irq_save(flags);
-
- rps_lock(sd);
- if (!netif_running(skb->dev))
- goto drop;
- //如果队列中skb个数小于netdev_max_backlog(默认值
- //1000,可以通过sysctl修改netdev_max_backlog值),
- //并且 skb_flow_limit (为了防止large flow占用太多cpu,small
- //flow得不到处理。代码实现没看明白)返回false,则skb可以
- //继续入队,否则drop skb
- qlen = skb_queue_len(&sd->input_pkt_queue);
- if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
- //如果队列不为空,则直接入队,否则先激活软中断,再入队
- if (skb_queue_len(&sd->input_pkt_queue)) {
- enqueue:
- __skb_queue_tail(&sd->input_pkt_queue, skb);
- input_queue_tail_incr_save(sd, qtail);
- rps_unlock(sd);
- local_irq_restore(flags);
- return NET_RX_SUCCESS;
- }
-
- /* Schedule NAPI for backlog device
- * We can use non atomic operation since we own the queue lock
- */
- //队列为空时,即skb是第一个入队元素,则将state设置
- //为 NAPI_STATE_SCHED(软中断处理函数
- //rx_net_action会检查此标志),表示软中断可以处理此
- //backlog
- if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
- //if返回0的情况下,需要将sd->backlog挂到sd-
- //>poll_list上,并激活软中断。rps_ipi_queued看下
- //面的分析
- if (!rps_ipi_queued(sd))
- ____napi_schedule(sd, &sd->backlog);
- }
- goto enqueue;
- }
-
- drop:
- sd->dropped++;
- rps_unlock(sd);
-
- local_irq_restore(flags);
-
- atomic_long_inc(&skb->dev->rx_dropped);
- kfree_skb(skb);
- return NET_RX_DROP;
- }
-
- /*
- * Check if this softnet_data structure is another cpu one
- * If yes, queue it to our IPI list and return 1
- * If no, return 0
- */
- //上面注释说的很清楚,在配置RPS情况下,检查sd是当前cpu的
- //还是其他cpu的,如果是其他cpu的,将sd放在当前cpu的mysd-
- //>rps_ipi_list上,并激活当前cpu的软中断,返回1. 在软中断处理
- //函数net_rx_action中,通过ipi中断通知其他cpu来处理放在其他
- //cpu队列上的skb如果是当前cpu,或者没有配置RPS,则返回0,
- //在外层函数激活软中断,并将当前cpu的backlog放入sd->poll_list
- //上
- static int rps_ipi_queued(struct softnet_data *sd)
- {
- #ifdef CONFIG_RPS
- struct softnet_data *mysd = this_cpu_ptr(&softnet_data);
-
- if (sd != mysd) {
- sd->rps_ipi_next = mysd->rps_ipi_list;
- mysd->rps_ipi_list = sd;
-
- __raise_softirq_irqoff(NET_RX_SOFTIRQ);
- return 1;
- }
- #endif /* CONFIG_RPS */
- return 0;
- }
- asmlinkage __visible void __do_softirq(void)
- {
- MAX_SOFTIRQ_TIME为2ms,如果一直有软中断可以执行2ms
- unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
- unsigned long old_flags = current->flags;
- MAX_SOFTIRQ_RESTART为10,表示可以循环执行10此软中断
- int max_restart = MAX_SOFTIRQ_RESTART;
- struct softirq_action *h;
- bool in_hardirq;
- __u32 pending;
- int softirq_bit;
-
- /*
- * Mask out PF_MEMALLOC s current task context is borrowed for the
- * softirq. A softirq handled such as network RX might set PF_MEMALLOC
- * again if the socket is related to swap
- */
- current->flags &= ~PF_MEMALLOC;
- //取出当前cpu上所有的软中断
- pending = local_softirq_pending();
- account_irq_enter_time(current);
-
- __local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);
- in_hardirq = lockdep_softirq_start();
-
- restart:
- /* Reset the pending bitmask before enabling irqs */
- //清空当前cpu上所有的软中断
- set_softirq_pending(0);
- //执行软中断时打开硬件中断
- local_irq_enable();
-
- h = softirq_vec;
- //遍历执行软中断
- while ((softirq_bit = ffs(pending))) {
- unsigned int vec_nr;
- int prev_count;
-
- h += softirq_bit - 1;
-
- vec_nr = h - softirq_vec;
- prev_count = preempt_count();
-
- kstat_incr_softirqs_this_cpu(vec_nr);
-
- trace_softirq_entry(vec_nr);
- //软中断处理函数,比如 net_rx_action
- h->action(h);
- trace_softirq_exit(vec_nr);
- if (unlikely(prev_count != preempt_count())) {
- pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
- vec_nr, softirq_to_name[vec_nr], h->action,
- prev_count, preempt_count());
- preempt_count_set(prev_count);
- }
- h++;
- pending >>= softirq_bit;
- }
-
- rcu_bh_qs();
- //执行完软中断,关闭硬中断
- local_irq_disable();
- //检查执行软中断过程中(开启硬中断)是否有新的软中断被激活
- pending = local_softirq_pending();
- if (pending) {
- //如果有新的软中断被激活,并且执行软中断时间不足
- //2ms,并且重新执行次数不足10次,则可以再次执行软
- //中断。
- if (time_before(jiffies, end) && !need_resched() &&
- --max_restart)
- goto restart;
- //否则只能唤醒软中断处理线程继续处理软中断
- wakeup_softirqd();
- }
-
- lockdep_softirq_end(in_hardirq);
- account_irq_exit_time(current);
- __local_bh_enable(SOFTIRQ_OFFSET);
- WARN_ON_ONCE(in_interrupt());
- tsk_restore_flags(current, old_flags, PF_MEMALLOC);
- }
网络收包软中断处理函数
- static void net_rx_action(struct softirq_action *h)
- {
- //获取percpu的sd
- struct softnet_data *sd = this_cpu_ptr(&softnet_data);
- unsigned long time_limit = jiffies + 2;
-
- //netdev_budget默认值300,可通过sysctl修改
- int budget = netdev_budget;
- void *have;
-
- local_irq_disable();
- //如果sd->poll_list不为空,说明有数据需要处理
- while (!list_empty(&sd->poll_list)) {
- struct napi_struct *n;
- int work, weight;
-
- /* If softirq window is exhuasted then punt.
- * Allow this to run for 2 jiffies since which will allow
- * an average latency of 1.5/HZ.
- */
- //如果budget用完了,或者经过了两个时间片,说明数据
- //包压力过大,还没处理完就需要跳出循环,在
- //softnet_break会再次激活软中断(因为执行软中断时已
- //经把所有的pending清空了)
- if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
- goto softnet_break;
-
- local_irq_enable();
-
- /* Even though interrupts have been re-enabled, this
- * access is safe because interrupts can only add new
- * entries to the tail of this list, and only ->poll()
- * calls can remove this head entry from the list.
- */
- n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
-
- have = netpoll_poll_lock(n);
-
- weight = n->weight;
-
- /* This NAPI_STATE_SCHED test is for avoiding a race
- * with netpoll's poll_napi(). Only the entity which
- * obtains the lock and sees NAPI_STATE_SCHED set will
- * actually make the ->poll() call. Therefore we avoid
- * accidentally calling ->poll() when NAPI is not scheduled.
- */
- work = 0;
- //只有state为NAPI_STATE_SCHED才会执行poll函数。
- //对于non-napi来说,poll函数为process_backlog,处理
- //percpu的input queue上的数据包。
- //对于napi来说,poll函数为网卡驱动提供的poll函数,比
- //如ixgbe_poll,分配skb,将skb上送协议栈
- //如果poll处理后的结果work小于weight说明没有更多数
- //据需要处理,poll函数中会把napi从链表sd->poll_list删
- //除。如果work等于weight说明还有更多数据需要处理,
- //不会删除napi,只是将napi移动到链表尾部
- if (test_bit(NAPI_STATE_SCHED, &n->state)) {
- work = n->poll(n, weight);
- trace_napi_poll(n);
- }
- WARN_ON_ONCE(work > weight);
- //work为poll实际处理的数据个数,budget需要减去work
- budget -= work;
- local_irq_disable();
- /* Drivers must not modify the NAPI state if they
- * consume the entire weight. In such cases this code
- * still "owns" the NAPI instance and therefore can
- * move the instance around on the list at-will.
- */
- //如果work等于weight说明还有更多数据需要处理
- if (unlikely(work == weight)) {
- if (unlikely(napi_disable_pending(n))) {
- local_irq_enable();
- napi_complete(n);
- local_irq_disable();
- } else {
- if (n->gro_list) {
- /* flush too old packets
- * If HZ < 1000, flush all packets.
- */
- local_irq_enable();
- napi_gro_flush(n, HZ >= 1000);
- local_irq_disable();
- }
- //将napi移动到链表尾部
- list_move_tail(&n->poll_list, &sd->poll_list);
- }
- }
- netpoll_poll_unlock(have);
- }
- out:
- net_rps_action_and_irq_enable(sd);
- return;
- softnet_break:
- sd->time_squeeze++;
- __raise_softirq_irqoff(NET_RX_SOFTIRQ);
- goto out;
- }
- /*
- * net_rps_action_and_irq_enable sends any pending IPI's for rps.
- * Note: called with local irq disabled, but exits with local irq enabled.
- */
- //如果链表 sd->rps_ipi_list不为空,说明在rps下,将skb放在其他
- //cpu上的percpu队列上了,所以需要通过ipi中断通知其他cpu,通
- //过smp_call_function_single_async远程激活其他cpu的软中断,
- //使其他cpu处理数据包
- static void net_rps_action_and_irq_enable(struct softnet_data *sd)
- {
- #ifdef CONFIG_RPS
- struct softnet_data *remsd = sd->rps_ipi_list;
-
- if (remsd) {
- sd->rps_ipi_list = NULL;
-
- local_irq_enable();
-
- /* Send pending IPI's to kick RPS processing on remote cpus. */
- while (remsd) {
- struct softnet_data *next = remsd->rps_ipi_next;
- if (cpu_online(remsd->cpu))
- smp_call_function_single_async(remsd->cpu,
- &remsd->csd);
- remsd = next;
- }
- } else
- #endif
- local_irq_enable();
- }
non-napi下的poll函数为 process_backlog
- static int process_backlog(struct napi_struct *napi, int quota)
- {
- int work = 0;
- struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
-
- #ifdef CONFIG_RPS
- /* Check if we have pending ipi, its better to send them now,
- * not waiting net_rx_action() end.
- */
- //激活其他cpu上的软中断
- if (sd->rps_ipi_list) {
- local_irq_disable();
- net_rps_action_and_irq_enable(sd);
- }
- #endif
- napi->weight = weight_p;
- local_irq_disable();
- while (1) {
- struct sk_buff *skb;
-
- while ((skb = __skb_dequeue(&sd->process_queue))) {
- rcu_read_lock();
- local_irq_enable();
- //将skb上送协议栈
- __netif_receive_skb(skb);
- rcu_read_unlock();
- local_irq_disable();
- input_queue_head_incr(sd);
- //处理skb的个数达到quota了,说明还有更多数据
- //包需要处理
- if (++work >= quota) {
- local_irq_enable();
- return work;
- }
- }
-
- rps_lock(sd);
- if (skb_queue_empty(&sd->input_pkt_queue)) {
- /*
- * Inline a custom version of __napi_complete().
- * only current cpu owns and manipulates this napi,
- * and NAPI_STATE_SCHED is the only possible flag set
- * on backlog.
- * We can use a plain write instead of clear_bit(),
- * and we dont need an smp_mb() memory barrier.
- */
- //如果input_pkt_queue队列为空,将napi从链表
- //poll_list删除
- list_del(&napi->poll_list);
- napi->state = 0;
- rps_unlock(sd);
-
- break;
- }
- //将input_pkt_queue队列中的skb挂到process_queue
- //上,并清空input_pkt_queue
- skb_queue_splice_tail_init(&sd->input_pkt_queue,
- &sd->process_queue);
- rps_unlock(sd);
- }
- local_irq_enable();
-
- return work;
- }
1.激活软中断
- //硬件中断到来时调用中断处理函数 ixgbe_msix_clean_rings
- ixgbe_msix_clean_rings
- napi_schedule(&q_vector->napi);
- ____napi_schedule(this_cpu_ptr(&softnet_data), n);
- //将napi添加到per cpu的softnet_data->poll_list中
- list_add_tail(&napi->poll_list, &sd->poll_list);
- //将接收软中断置位
- __raise_softirq_irqoff(NET_RX_SOFTIRQ);
-
2.执行软中断
- __do_softirq
- net_rx_action
- n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
- work = n->poll(n, weight); //即调用 ixgbe_poll
- ixgbe_clean_rx_irq(q_vector, ring)
- skb = ixgbe_fetch_rx_buffer(rx_ring, rx_desc);
- ixgbe_rx_skb(q_vector, skb);
- napi_gro_receive(&q_vector->napi, skb);
- //上送协议栈,但如果开启了RPS就走non-NAPI的路径了
- netif_receive_skb_internal
- /* all work done, exit the polling mode */
- //如果处理的skb小于配额,说明工作已经完成,将napi从poll_list删除
- //清除标志位 NAPI_STATE_SCHED
- napi_complete(napi);
- list_del(&n->poll_list);
- clear_bit(NAPI_STATE_SCHED, &n->state);
如果没有开启RPS,则直接调用__netif_receive_skb上送协议栈了。如果开启了RPS,则调用get_rps_cpu获取合适的cpu(有可能是本地cpu,也有可能是其他cpu),再调用enqueue_to_backlog将skb放在percpu的队列中,激活相应cpu的软中断。
- static int netif_receive_skb_internal(struct sk_buff *skb)
- {
- int ret;
-
- net_timestamp_check(netdev_tstamp_prequeue, skb);
-
- if (skb_defer_rx_timestamp(skb))
- return NET_RX_SUCCESS;
-
- rcu_read_lock();
-
- #ifdef CONFIG_RPS
- //注意使用的是static_key_false进行判断,意思是分支预测为false概率很大
- if (static_key_false(&rps_needed)) {
- struct rps_dev_flow voidflow, *rflow = &voidflow;
- int cpu = get_rps_cpu(skb->dev, skb, &rflow);
-
- if (cpu >= 0) {
- ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
- rcu_read_unlock();
- return ret;
- }
- }
- #endif
- ret = __netif_receive_skb(skb);
- rcu_read_unlock();
- return ret;
- }
https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。