Linux NAPI Mechanism Analysis: napi_complete_done

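1. Overview

Before NAPI, the NIC raised one interrupt for every received packet to tell the CPU to read it. Under heavy traffic the sheer number of interrupts hurts CPU performance, so Linux introduced NAPI: after an interrupt arrives, the NIC's receive path is switched to poll mode, and once the pending packets have been received it returns to interrupt mode. This article analyzes how Linux implements NAPI.

The main NAPI flow is as follows. When the physical NIC receives a packet it raises an IRQ to notify the CPU (once raised, that interrupt is masked by default). The top half of the interrupt handler adds the device's napi->poll_list to softnet_data->poll_list and then raises the RX softirq; the softirq handler then calls the device's own poll function (ixgbe_poll) through napi_poll.

In NAPI mode the kernel assigns one budget to the softirq pass and one to each NAPI instance. The softirq budget is netdev_budget (default 300) and is shared by all NAPI instances; each NAPI instance has its own budget, its weight (n->weight, 64 by default, the same default as weight_p). Within one poll pass, ixgbe_poll spends one unit of budget for every packet it receives. If ixgbe_poll uses up the whole NAPI budget, the NIC still has many packets pending, so another poll is needed. The budget used by each napi_poll call is accumulated, and once the total reaches the softirq budget the current softirq pass exits. If ixgbe_poll uses less than the NAPI budget, traffic is light, so the queue interrupt is re-enabled and the device returns to interrupt mode.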
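
The interaction between the two budgets can be sketched in user space. The following is a minimal simulation, not kernel code: struct sim_napi, sim_poll and sim_rx_action are hypothetical names invented for this illustration, and the time limit that the real softirq handler also applies is left out.

```c
#include <stddef.h>

/* Hypothetical stand-in for a NAPI instance; not the kernel's napi_struct. */
struct sim_napi {
    int pending;    /* packets waiting in the simulated RX ring */
    int weight;     /* per-poll budget, assumed positive (64 in the article) */
    int scheduled;  /* 1 while in poll mode, 0 once back in interrupt mode */
};

/* Simulated device poll: receive at most `budget` packets, return the count. */
static int sim_poll(struct sim_napi *n, int budget)
{
    int work = n->pending < budget ? n->pending : budget;

    n->pending -= work;
    return work;
}

/*
 * Simulated softirq pass over `count` instances sharing `softirq_budget`.
 * Returns the number of packets processed and sets *squeezed to 1 when
 * the pass stopped because the shared budget ran out.
 */
static int sim_rx_action(struct sim_napi *napis, size_t count,
                         int softirq_budget, int *squeezed)
{
    int total = 0;

    *squeezed = 0;
    for (;;) {
        int polled = 0;

        for (size_t i = 0; i < count; i++) {
            struct sim_napi *n = &napis[i];
            int work;

            if (!n->scheduled)
                continue;
            polled = 1;
            work = sim_poll(n, n->weight);
            total += work;
            softirq_budget -= work;
            if (work < n->weight)
                n->scheduled = 0;   /* light traffic: leave poll mode */
            if (softirq_budget <= 0) {
                *squeezed = 1;      /* shared budget exhausted */
                return total;
            }
        }
        if (!polled)
            return total;           /* every instance left poll mode */
    }
}
```

With one instance holding 1000 pending packets, a weight of 64 and a shared budget of 300, the simulated pass stops after five polls (320 packets) and the instance stays scheduled; with only 100 pending packets it finishes in two polls and leaves poll mode.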

2. Detailed flow analysis

ixgbe_msix_clean_rings

The driver registers ixgbe_msix_clean_rings as its MSI-X interrupt handler; when the NIC raises an IRQ, execution enters ixgbe_msix_clean_rings:

    static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
    {
        struct ixgbe_q_vector *q_vector = data;
        /* EIAM disabled interrupts (on this vector) for us */
        if (q_vector->rx.ring || q_vector->tx.ring)
            napi_schedule_irqoff(&q_vector->napi);
        return IRQ_HANDLED;
    }

The interrupt handler ultimately calls ____napi_schedule, which adds napi->poll_list to sd->poll_list and then raises the RX softirq:

    static inline void ____napi_schedule(struct softnet_data *sd,
                                         struct napi_struct *napi)
    {
        list_add_tail(&napi->poll_list, &sd->poll_list);
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    }
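
Before a NAPI instance reaches this list insertion, the kernel's scheduling path checks and sets the NAPI_STATE_SCHED bit so that an instance that is already scheduled is not queued twice. The user-space sketch below illustrates that guard with C11 atomics; struct sketch_napi and the sketch_* functions are hypothetical names for this illustration, not the kernel API.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for a NAPI instance. */
struct sketch_napi {
    atomic_bool sched;  /* plays the role of the NAPI_STATE_SCHED bit */
    int queued;         /* how many times the instance was put on a poll list */
};

static void sketch_init(struct sketch_napi *n)
{
    atomic_init(&n->sched, false);
    n->queued = 0;
}

/* Returns true only for the caller that moves the instance into poll mode. */
static bool sketch_schedule(struct sketch_napi *n)
{
    /* atomic_exchange returns the previous value: true means already scheduled. */
    if (atomic_exchange(&n->sched, true))
        return false;
    n->queued++;        /* stands in for list_add_tail() plus raising the softirq */
    return true;
}

/* Leaving poll mode clears the flag so the next interrupt can schedule again. */
static void sketch_complete(struct sketch_napi *n)
{
    atomic_store(&n->sched, false);
}
```

A second sketch_schedule call made before sketch_complete returns false and leaves the queue count unchanged, which mirrors why an interrupt that arrives while the instance is still in poll mode does not add it to the list again.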

net_rx_action

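Once the interrupt path has raised the softirq, the top half ends and processing moves to the bottom half. The RX softirq handler is net_rx_action. It first sets the budget for this softirq pass (netdev_budget, default 300) and then calls napi_poll, deducting the budget each call uses. If the budget is exhausted, or polling has run for 2 ticks, the softirq pass exits; before exiting, the napi->poll_list entries that still need polling are put back on sd->poll_list to wait for the next run.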

    static void net_rx_action(struct softirq_action *h)
    {
        struct softnet_data *sd = this_cpu_ptr(&softnet_data);
        unsigned long time_limit = jiffies + 2;
        // budget for one softirq pass
        int budget = netdev_budget;
        LIST_HEAD(list);
        LIST_HEAD(repoll);
        local_irq_disable();
        list_splice_init(&sd->poll_list, &list);
        local_irq_enable();
        for (;;) {
            struct napi_struct *n;
            if (list_empty(&list)) {
                if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                    return;
                break;
            }
            n = list_first_entry(&list, struct napi_struct, poll_list);
            budget -= napi_poll(n, &repoll);
            /* If softirq window is exhausted then punt.
             * Allow this to run for 2 jiffies since which will allow
             * an average latency of 1.5/HZ.
             */
            // stop if the softirq budget is used up or polling has run for 2 ticks
            if (unlikely(budget <= 0 ||
                         time_after_eq(jiffies, time_limit))) {
                sd->time_squeeze++;
                break;
            }
        }
        __kfree_skb_flush();
        local_irq_disable();
        // merge the unpolled, newly scheduled and repoll entries back into
        // sd->poll_list so that the next softirq polls them again
        list_splice_tail_init(&sd->poll_list, &list);
        list_splice_tail(&repoll, &list);
        list_splice(&list, &sd->poll_list);
        if (!list_empty(&sd->poll_list))
            // poll_list is not empty: raise the softirq again
            __raise_softirq_irqoff(NET_RX_SOFTIRQ);
        net_rps_action_and_irq_enable(sd);
    }
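
The exit test combines the remaining budget with a jiffies comparison that must stay correct when the tick counter wraps around. The sketch below shows one way such a wraparound-safe comparison can be written in user space. sketch_time_after_eq and sketch_should_stop are hypothetical names; the cast from unsigned long to long assumes the usual two's complement behavior of mainstream compilers.

```c
#include <limits.h>
#include <stdbool.h>

/*
 * True when tick `a` is at or after tick `b`, even if the counter wrapped.
 * The unsigned subtraction wraps modulo 2^N; reinterpreting the result as
 * signed yields the signed distance between the two ticks.
 */
static bool sketch_time_after_eq(unsigned long a, unsigned long b)
{
    return (long)(a - b) >= 0;
}

/* Mirrors the exit test above: budget used up, or the time limit reached. */
static bool sketch_should_stop(int budget, unsigned long now,
                               unsigned long time_limit)
{
    return budget <= 0 || sketch_time_after_eq(now, time_limit);
}
```

If the pass starts at tick ULONG_MAX, the limit start + 2 wraps to 1. A plain `now >= time_limit` comparison would report the limit as already reached at tick ULONG_MAX, while the signed-difference form correctly waits until tick 1.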

napi_poll

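napi_poll mainly calls the device's own poll function, such as ixgbe_poll. Each napi_poll call has its own budget (the NAPI weight, 64), and ixgbe_poll returns how much of it the device used in this call. On entry, napi_poll first removes napi->poll_list from the list; it then decides, based on the amount ixgbe_poll reports as used, whether to add the NAPI instance to the repoll list.

If ixgbe_poll did not use up its budget (in which case ixgbe_poll has already passed everything it polled up to the protocol stack and switched back to interrupt mode), the NAPI instance is not added to repoll. If the budget was used up, the NIC still has packets to process. When GRO is enabled, napi_poll first delivers to the protocol stack the GRO skbs whose age exceeds one tick (when HZ < 1000, all held GRO packets are flushed), and then adds the NAPI instance to repoll. After napi_poll returns, net_rx_action merges the repoll list back into sd->poll_list, and on exit it checks sd->poll_list again; if the list is not empty, the RX softirq is raised again.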

    static int napi_poll(struct napi_struct *n, struct list_head *repoll)
    {
        void *have;
        int work, weight;
        // first remove napi->poll_list from the list
        list_del_init(&n->poll_list);
        have = netpoll_poll_lock(n);
        // budget for one NAPI poll
        weight = n->weight;
        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi(). Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call. Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            work = n->poll(n, weight);
            trace_napi_poll(n);
        }
        WARN_ON_ONCE(work > weight);
        // the budget was not used up: the driver has finished polling,
        // so return without re-queuing this NAPI instance
        if (likely(work < weight))
            goto out_unlock;
        /* Drivers must not modify the NAPI state if they
         * consume the entire weight. In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
        if (unlikely(napi_disable_pending(n))) {
            napi_complete(n);
            goto out_unlock;
        }
        // the budget was fully used: deliver GRO skbs held for more than
        // one tick to the protocol stack
        if (n->gro_list) {
            /* flush too old packets
             * If HZ < 1000, flush all packets.
             */
            napi_gro_flush(n, HZ >= 1000);
        }
        /* Some drivers may have called napi_schedule
         * prior to exhausting their budget.
         */
        if (unlikely(!list_empty(&n->poll_list))) {
            pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                         n->dev ? n->dev->name : "backlog");
            goto out_unlock;
        }
        // the budget was used up and more polling is needed:
        // add napi->poll_list back to repoll
        list_add_tail(&n->poll_list, repoll);
    out_unlock:
        netpoll_poll_unlock(have);
        return work;
    }
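
The flush_old argument of napi_gro_flush decides whether packets first held during the current tick stay on the GRO list. The sketch below models that choice in user space under a simplifying assumption: held packets are described only by the tick at which they were first held, ordered oldest first. sketch_gro_flush_count is a hypothetical helper, not a kernel function.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Count how many held packets a flush would deliver.
 * ages[i] is the tick at which packet i was first held, oldest first
 * (an assumed layout for this sketch).
 */
static size_t sketch_gro_flush_count(const unsigned long *ages, size_t held,
                                     unsigned long now, bool flush_old)
{
    size_t flushed = 0;

    for (size_t i = 0; i < held; i++) {
        /* With flush_old set, stop at the first packet from the current tick. */
        if (flush_old && ages[i] == now)
            break;
        flushed++;
    }
    return flushed;
}
```

With packets held at ticks 10, 10 and 11 and the current tick at 11, flush_old delivers only the two older packets, whereas a full flush delivers all three.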

ixgbe_poll

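ixgbe_poll splits the budget it receives from NAPI evenly across its RX queues and polls each RX queue in turn; if any RX queue uses up its share, the poll is marked as not yet complete: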

    int ixgbe_poll(struct napi_struct *napi, int budget)
    {
        struct ixgbe_q_vector *q_vector =
            container_of(napi, struct ixgbe_q_vector, napi);
        struct ixgbe_adapter *adapter = q_vector->adapter;
        struct ixgbe_ring *ring;
        int per_ring_budget, work_done = 0;
        bool clean_complete = true;
    #ifdef CONFIG_IXGBE_DCA
        if (adapter->flags & IXGBE_FLAG_DCA_ENABLED)
            ixgbe_update_dca(q_vector);
    #endif
        ixgbe_for_each_ring(ring, q_vector->tx) {
            if (!ixgbe_clean_tx_irq(q_vector, ring, budget))
                clean_complete = false;
        }
        /* Exit if we are called by netpoll or busy polling is active */
        if ((budget <= 0) || !ixgbe_qv_lock_napi(q_vector))
            return budget;
        /* attempt to distribute budget to each queue fairly, but don't allow
         * the budget to go below 1 because we'll exit polling */
        // split the budget evenly across the RX queues
        if (q_vector->rx.count > 1)
            per_ring_budget = max(budget/q_vector->rx.count, 1);
        else
            per_ring_budget = budget;
        ixgbe_for_each_ring(ring, q_vector->rx) {
            int cleaned = ixgbe_clean_rx_irq(q_vector, ring,
                                             per_ring_budget);
            work_done += cleaned;
            // if any ring used up its share, mark clean_complete as false
            if (cleaned >= per_ring_budget)
                clean_complete = false;
        }
        ixgbe_qv_unlock_napi(q_vector);
        /* If all work not completed, return budget and keep polling */
        // An RX queue used up its share, so more received packets are waiting and
        // cleaning is not finished; return to napi_poll. napi_poll first delivers the
        // gro_list skbs older than one tick to the protocol stack to limit their delay,
        // then adds napi->poll_list back to repoll. Before the softirq handler exits it
        // puts repoll back on sd->poll_list and raises the softirq again.
        if (!clean_complete)
            return budget;
        // no RX queue used up its share, so nothing more is waiting;
        // napi_complete_done delivers every skb on gro_list to the protocol stack
        /* all work done, exit the polling mode */
        napi_complete_done(napi, work_done);
        if (adapter->rx_itr_setting & 1)
            ixgbe_set_itr(q_vector);
        if (!test_bit(__IXGBE_DOWN, &adapter->state))
            // re-enable the RX queue interrupt
            ixgbe_irq_enable_queues(adapter, BIT_ULL(q_vector->v_idx));
        return min(work_done, budget - 1);
    }
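
The budget arithmetic in ixgbe_poll can be isolated into a small user-space sketch. It assumes each ring is described only by its number of pending packets and ignores TX cleaning and locking; sketch_per_ring_budget and sketch_poll_result are hypothetical names for this illustration.

```c
#include <stdbool.h>
#include <stddef.h>

/* Split the NAPI budget across RX rings, never letting a share drop below 1. */
static int sketch_per_ring_budget(int budget, int rx_count)
{
    if (rx_count > 1) {
        int share = budget / rx_count;

        return share > 1 ? share : 1;
    }
    return budget;
}

/*
 * Value a poll would return for rings holding ring_pending[i] packets.
 * Returning `budget` asks the caller to poll again; anything smaller
 * means the poll completed and the interrupt was re-enabled.
 */
static int sketch_poll_result(const int *ring_pending, size_t rx_count,
                              int budget)
{
    int per_ring = sketch_per_ring_budget(budget, (int)rx_count);
    int work_done = 0;
    bool clean_complete = true;

    for (size_t i = 0; i < rx_count; i++) {
        int cleaned = ring_pending[i] < per_ring ? ring_pending[i] : per_ring;

        work_done += cleaned;
        if (cleaned >= per_ring)
            clean_complete = false; /* this ring used up its share */
    }
    if (!clean_complete)
        return budget;
    /* Stay strictly below budget so the caller sees the poll as complete. */
    return work_done < budget - 1 ? work_done : budget - 1;
}
```

With a budget of 64 and four rings, each ring gets 16. One ring holding 16 or more packets is enough to make the whole poll return 64 and be repeated, even if the other rings are idle. The final min(work_done, budget - 1) matters because napi_poll treats a return value equal to the weight as "not finished".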

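If no queue used up its share, ixgbe_poll calls napi_complete_done. When GRO is enabled, all held GRO skbs are delivered to the protocol stack; once that is done, ixgbe_irq_enable_queues re-enables the RX queue interrupt and the device returns to interrupt-driven receive.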

    void napi_complete_done(struct napi_struct *n, int work_done)
    {
        unsigned long flags;
        /*
         * don't let napi dequeue from the cpu poll list
         * just in case its running on a different cpu
         */
        if (unlikely(test_bit(NAPI_STATE_NPSVC, &n->state)))
            return;
        if (n->gro_list) {
            unsigned long timeout = 0;
            if (work_done)
                timeout = n->dev->gro_flush_timeout;
            // timeout defaults to 0, so all GRO skbs are delivered to the protocol stack here
            if (timeout && NAPI_STRUCT_HAS(n, timer))
                hrtimer_start(&n->timer, ns_to_ktime(timeout),
                              HRTIMER_MODE_REL_PINNED);
            else
                napi_gro_flush(n, false);
        }
        if (likely(list_empty(&n->poll_list))) {
            WARN_ON_ONCE(!test_and_clear_bit(NAPI_STATE_SCHED, &n->state));
        } else {
            /* If n->poll_list is not empty, we need to mask irqs */
            local_irq_save(flags);
            // remove napi->poll_list from sd->poll_list and clear the NAPI scheduled state
            __napi_complete(n);
            local_irq_restore(flags);
        }
    }
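
The GRO branch of napi_complete_done reduces to a three-way decision. The sketch below restates it as a pure function for illustration; the enum and sketch_complete_action are hypothetical names, and the NAPI_STRUCT_HAS(n, timer) check from the listing is assumed to be true.

```c
#include <stdbool.h>

/* What the completion path does with packets still held by GRO. */
enum sketch_gro_action {
    SKETCH_GRO_NONE,       /* nothing is held, nothing to do */
    SKETCH_GRO_ARM_TIMER,  /* defer the flush by gro_flush_timeout */
    SKETCH_GRO_FLUSH_ALL,  /* deliver every held packet now */
};

static enum sketch_gro_action
sketch_complete_action(bool has_gro_packets, int work_done,
                       unsigned long gro_flush_timeout)
{
    unsigned long timeout = 0;

    if (!has_gro_packets)
        return SKETCH_GRO_NONE;
    /* The timeout only applies when this poll actually did some work. */
    if (work_done)
        timeout = gro_flush_timeout;
    return timeout ? SKETCH_GRO_ARM_TIMER : SKETCH_GRO_FLUSH_ALL;
}
```

With the default gro_flush_timeout of 0 the result is always a full flush, which matches the comment in the listing; a timer is armed only when the timeout is non-zero and the poll processed at least one packet.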

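3. Open questions

When the receive mode switches from polling back to interrupt mode:

1. In poll mode, once all packets have been processed, the driver switches back to interrupt mode on its own;

2. ????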
