当前位置:   article > 正文

内核数据结构 —— 内核队列 (kfifo)

kfifo

内核队列介绍


FIFO 先入先出队列是最常用的数据结构之一,在生产者——消费者模型中扮演了很重要的角色。

Linux 内核实现了一种称之为 kfifo 的内核队列,用于满足对数据有先入先出的场景。

对于通用的 fifo 而言,需要提供最基本的几个接口:

  • 队列的分配
  • 队列的初始化
  • 队列的释放
  • 队列的数据入列
  • 队列的数据出列
  • 队列判满
  • 队列判空

有了上述几组接口,我们才能够算是真正能够操作并使用一个队列。

内核队列的设计精妙之处在于:

  • 保证缓冲区大小为2的次幂,不是的向上取整为2的次幂。
  • 使用无符号整数保存输入(in)和输出(out)的位置,在输入输出时不对in和out的值进行模运算,而让其自然溢出,并能够保证in-out的结果为缓冲区中已存放的数据长度。
  • 将需要取模的运算用 & 操作代替( a % size = (a & (size − 1)) ), 这需要size保证为2的次幂。
  • 使用内存屏障(Memory Barrier)技术,实现单消费者和单生产者对kfifo的无锁并发访问,多个消费者、生产者的并发访问还是需要加锁的(本文不涉及)。

下面对这些精妙之处进行分析。

 

内核队列实现

Linux 的 kfifo 设计实现可以说是让人拍手叫绝,它的实现非常简洁高效。拜读后,直接献上双膝。让我们一起来领略它的风采。

它的实现,使用了一个环形缓冲区。配合两个位置参数 in/out ,它们的差值来指定当前的 fifo 的状态。

 

队列结构体

kfifo 依托于一个称之为 __kfifo 的结构体进行操作:

  1. struct __kfifo {
  2. unsigned int in; // 入列的时候增加的位置
  3. unsigned int out; // 出列的时候增加的位置
  4. unsigned int mask; // 巧妙的 mask 设计,同时包含了数据的个数信息
  5. unsigned int esize; // 元素的大小
  6. void *data; // 数据
  7. };

请注意,这里的 in,out 均是无符号的整数类型

 

队列的分配

  1. int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
  2. size_t esize, gfp_t gfp_mask)
  3. {
  4. /*
  5. * round up to the next power of 2, since our 'let the indices
  6. * wrap' technique works only in this case.
  7. */
  8. size = roundup_pow_of_two(size);
  9. fifo->in = 0;
  10. fifo->out = 0;
  11. fifo->esize = esize;
  12. if (size < 2) {
  13. fifo->data = NULL;
  14. fifo->mask = 0;
  15. return -EINVAL;
  16. }
  17. fifo->data = kmalloc_array(esize, size, gfp_mask);
  18. if (!fifo->data) {
  19. fifo->mask = 0;
  20. return -ENOMEM;
  21. }
  22. fifo->mask = size - 1;
  23. return 0;
  24. }
  25. EXPORT_SYMBOL(__kfifo_alloc);

kfifo 队列的使用,要求传入的 size 是2的整数次幂,所以函数的入口便是对其进行检查。

接着将 in/out 指向的位置初始化为0,因为此刻队列还未准备好,里面并没有任何数据。

esize 赋值给 fifo->esize 这个是代表了队列中数据的类型的 size,比如队列数据类型如果为 int,则 esize 等于 4.

接着调用 kmalloc_array 接口,意在分配一个 esize * size 大小的空间

最后将 fifo->mask 赋值为 size - 1

分配好队列后,实际情况如下所示:

 

队列的初始化

  1. int __kfifo_init(struct __kfifo *fifo, void *buffer,
  2. unsigned int size, size_t esize)
  3. {
  4. size /= esize;
  5. size = roundup_pow_of_two(size);
  6. fifo->in = 0;
  7. fifo->out = 0;
  8. fifo->esize = esize;
  9. fifo->data = buffer;
  10. if (size < 2) {
  11. fifo->mask = 0;
  12. return -EINVAL;
  13. }
  14. fifo->mask = size - 1;
  15. return 0;
  16. }
  17. EXPORT_SYMBOL(__kfifo_init);

 

队列的释放

  1. void __kfifo_free(struct __kfifo *fifo)
  2. {
  3. kfree(fifo->data);
  4. fifo->in = 0;
  5. fifo->out = 0;
  6. fifo->esize = 0;
  7. fifo->data = NULL;
  8. fifo->mask = 0;
  9. }
  10. EXPORT_SYMBOL(__kfifo_free);

 

队列的入列

  1. /*
  2. * internal helper to calculate the unused elements in a fifo
  3. */
  4. static inline unsigned int kfifo_unused(struct __kfifo *fifo)
  5. {
  6. return (fifo->mask + 1) - (fifo->in - fifo->out);
  7. }
  8. static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
  9. unsigned int len, unsigned int off)
  10. {
  11. unsigned int size = fifo->mask + 1;
  12. unsigned int esize = fifo->esize;
  13. unsigned int l;
  14. off &= fifo->mask;
  15. if (esize != 1) {
  16. off *= esize;
  17. size *= esize;
  18. len *= esize;
  19. }
  20. l = min(len, size - off);
  21. memcpy(fifo->data + off, src, l);
  22. memcpy(fifo->data, src + l, len - l);
  23. /*
  24. * make sure that the data in the fifo is up to date before
  25. * incrementing the fifo->in index counter
  26. */
  27. smp_wmb();
  28. }
  29. unsigned int __kfifo_in(struct __kfifo *fifo,
  30. const void *buf, unsigned int len)
  31. {
  32. unsigned int l;
  33. l = kfifo_unused(fifo);
  34. if (len > l)
  35. len = l;
  36. kfifo_copy_in(fifo, buf, len, fifo->in);
  37. fifo->in += len;
  38. return len;
  39. }

入列使用 

__kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len) 

他首先调用了 kfifo_unused 来判断当前还有多少剩余的 buf len:

还有印象么,fifo->mask 在初始化的时候被赋值成为 size - 1, 所以这里 (fifo->mask + 1) 就等于申请的时候的 size 值。size 的值代表着总的存储对象的个数的指标,而每次在推数据进入 fifo 的时候,in 都会增加,取出数据的时候,out 都会增加。所以计算当前 fifo 中还剩余多少空间就使用了:

unused = (fifo->mask + 1) - (fifo->in - fifo->out)

注意:这里的 in/out 是不断增加的无符号整形

接着函数进入:

kfifo_copy_in(fifo, buf, len, fifo->in);

首先还是通过 fifo->mask 得到了整个 size 的大小。然后是用:

off &= fifo->mask;

展开就是

fifo->in = fifo->in & fifo->mask;

由于 fifo->mask 的值是 size - 1,比如我们传 size 为 1K,也就是 2 的 10 次幂,由于 fifo->in 是一直在增加的,让他和 mask 进行 & 操作后,其实就是取余操作了,因为我们往已经 kmalloc 好的内存中复制数据的时候,是需要知道他的实际的内存内的偏移量,所以这个 & 操作就完美实现了取余操作,使得其环形的反转。这样做的好处是: “位运算” 的效率高于“取模运算”的效率。

接着分析,判断 esize 的值,就是每个元素的占用内存的情况,如果不是 1 的话(一个字节),则需要对 off,size,len 分别乘以 esize。所以在这个计算后的所有数值,都以 1 byte 为单位的内存表示了。可以使用 memcpy 函数进行操作。

接着使用:

l = min(len, size - off);

取得复制数据和 size-off 之间的最小值,由于是 环形的缓冲区 ,所以在此处存在两种情况:

1. 即将入列的数据 小于 了当前 in 指向地方到最大的地方之间的差值:

2. 即将入列的数据 大于 了当前 in 指向地方到最大的地方之间的差值:

所以在这个地方,先去取一个 len 和 size-off 之间最小的那个值 l,即,先打算尝试把尾巴上能用的空间先用完。

紧接着调用:

  1. memcpy(fifo->data + off, src, l);
  2. memcpy(fifo->data, src + l, len - l);

如果是上述的第 1 种情况(len 为 l 的时候),上面两个 memcpy 的行为是:

第一条 memcpy :将 len 的数据 memcpy 到以 fifo->data (之前用过 kmalloc 分配的内存起始地址),加上 off 偏移(in 对应的偏移),的地方开始,copy 进 src 数据。

第二条 memcpy :len -l 为0,相当于什么都不做。

如果是上述的第 1 种情况(len 为 l 的时候),两个 memcpy 的行为是:

第一条 memcpy : 先将数据以剩余到顶端的长度进行数据拷贝。

第二条 memcpy : 将剩余的数据拷贝到 fifo->data 的起始位置。

实现环形缓冲区的数据拷贝

最后在退出 kfifo_copy_in 后,在 __kfifo_in 函数中对 fifo->in 做累加:

fifo->in += len;

做完上述的拷贝后,对于上述两种情况,最后体现出来的是:

1. 即将入列的数据 小于 了当前 in 指向地方到最大的地方之间的差值,入列后:

 

2. 即将入列的数据 大于 了当前 in 指向地方到最大的地方之间的差值,入列后:

 

队列的出列

队列的出列操作基本和入列的差不多,这里就贴代码,不深入分析了:

  1. static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
  2. unsigned int len, unsigned int off)
  3. {
  4. unsigned int size = fifo->mask + 1;
  5. unsigned int esize = fifo->esize;
  6. unsigned int l;
  7. off &= fifo->mask;
  8. if (esize != 1) {
  9. off *= esize;
  10. size *= esize;
  11. len *= esize;
  12. }
  13. l = min(len, size - off);
  14. memcpy(dst, fifo->data + off, l);
  15. memcpy(dst + l, fifo->data, len - l);
  16. /*
  17. * make sure that the data is copied before
  18. * incrementing the fifo->out index counter
  19. */
  20. smp_wmb();
  21. }
  22. unsigned int __kfifo_out_peek(struct __kfifo *fifo,
  23. void *buf, unsigned int len)
  24. {
  25. unsigned int l;
  26. l = fifo->in - fifo->out;
  27. if (len > l)
  28. len = l;
  29. kfifo_copy_out(fifo, buf, len, fifo->out);
  30. return len;
  31. }
  32. EXPORT_SYMBOL(__kfifo_out_peek);
  33. unsigned int __kfifo_out(struct __kfifo *fifo,
  34. void *buf, unsigned int len)
  35. {
  36. len = __kfifo_out_peek(fifo, buf, len);
  37. fifo->out += len;
  38. return len;
  39. }
  40. EXPORT_SYMBOL(__kfifo_out);

这里多了一个 叫做 __kfifo_out_peek 的函数,该函数只是查看队列出列的那个结构的值,不会真正的取出队列。

 

队列的判空

队列的判空,主要是看 in 和 out 是否一致,如果一致的话,则说明队列是空的,也就是 empty。

 

队列的判满

队列的判满,主要手段是看 in - out 的值是否大于了 mask (size - 1)

 

in/out 溢出

细心的群众可能要问,当频繁的入列/出列后,in/out 不断的增加,万一 in/out 溢出,出现反转后,这个机制能够正常运转么?答案是肯定的。当 in 溢出到反转后,in - out 的值为负数,表示成为无符号的数,依然能够代表已经使用的 buffer 的长度。这正是这个机制的精妙之处。

 

内存屏障

使用内存屏障(Memory Barrier)技术,实现单消费者和单生产者对kfifo的无锁并发访问,多个消费者、生产者的并发访问还是需要加锁。

为什么kfifo实现的单生产/单消费模式的共享队列是不需要加锁同步的呢?天底下没有免费的午餐的道理人人都懂,下面我们就来看看kfifo实现并发无锁的奥秘。

我们知道 编译器编译源代码时,会将源代码进行优化,将源代码的指令进行重排序,以适合于CPU的并行执行。然而,内核同步必须避免指令重新排序,优化屏障(Optimization barrier)避免编译器的重排序优化操作,保证编译程序时在优化屏障之前的指令不会在优化屏障之后执行。

软件可通过读写屏障强制内存访问次序。读写屏障像一堵墙,所有在设置读写屏障之前发起的内存访问,必须先于在设置屏障之后发起的内存访问之前完成,确保内存访问按程序的顺序完成。Linux内核提供的内存屏障API函数说明如下表。内存屏障可用于多处理器和单处理器系统,如果仅用于多处理器系统,就使用smp_xxx函数,在单处理器系统上,它们什么都不要。

smp_rmb
适用于多处理器的读内存屏障。
smp_wmb
适用于多处理器的写内存屏障。
smp_mb
适用于多处理器的内存屏障。

所以在 kfifo_copy_in 和 kfifo_copy_out 的尾部都插入了 smp_wmb() 的写内存屏障的代码

它的作用是确保 fifo->in 和 fifo->out 的增加 len 的这个操作在内存屏障之后,也就是保证了在 SMP 多处理器下,一定是先完成了 fifo 的内存操作,然后再进行变量的增加。以免被优化后的混乱访问,导致策略失败。

不过,多个消费者、生产者的并发访问还是需要加锁限制。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/601569
推荐阅读
相关标签
  

闽ICP备14008679号