当前位置:   article > 正文

【项目实战】高并发内存池_基于tls的高并发内存池

基于tls的高并发内存池

目录

项目介绍

1.这个项目做的是什么?

2.这个项目的要求的知识储备和难度?

内存池的定义

1.池化技术

2.内存池

 3.内存池主要解决的问题

4.malloc

定长内存池

什么是定长内存池?

定长内存池变量介绍

定长内存池申请一个对象大小空间实现

定长内存池释放一个对象大小空间

Threadcache

Threadcache设计框架

内存碎片问题

FreeList的结构

根据申请大小计算对齐数及桶号

一、计算对齐数

二、计算对应的哈希桶桶号

Threadcache类的设计

TLS无锁获取机制

一、申请内存对象

二、向中心缓存获取对象

三、释放内存对象

Centralache

Centralache的整体设计

 span的结构设计

 SpanList结构设计

 Centralache类的设计

获取一个新的Span

从中心缓存获取一定数量的对象给thread cache

把threadache中的对象还给span

 Pageache

Pageache的整体设计框架

Pageache类的设计

(难点)两个关系的转化与映射

页号与地址的对应

页号与span的对应关系

获取一个K页的span

映射函数设计

 pageache回收span

项目优化与测试

优化一  利用定长内存池代替new

优化二 释放对象时不传对象大小

优化三 利用基数树代替unordered_map优化

测试与总结


项目介绍

1.这个项目做的是什么?

          当前项目是实现一个高并发的内存池,他的原型是 google 的一个开源项目 tcmalloc tcmalloc 全称hread-Caching Malloc,即线程缓存的 malloc ,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc free )。
           我们这个项目是把 tcmalloc 最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就 是学习tcamlloc 的精华,这种方式有点类似我们之前学习 STL 容器的方式。但是相比 STL 容器部分,tcmalloc的代码量和复杂度上升了很多,大家要有心理准备。当前另一方面,难度的上升,我们的收获 和成长也是在这个过程中同步上升。
          另一方面 tcmalloc 是全球大厂 google 开源的,可以认为当时顶尖的 C++ 高手写出来的,他的知名度也是非常高的,不少公司都在用它,Go 语言直接用它做了自己内存分配器。

2.这个项目的要求的知识储备和难度?

         这个项目会用到 C/C++ 、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁

内存池的定义

1.池化技术

是在计算机技术中经常使用的一种设计模式,其内涵在于:将程序中需要经常使用的核心资源
先申请出来,放到一个池内,由程序自己管理,这样可以提高资源的使用效率,也可以保证本程
序占有的资源数量。 经常使用的池技术包括内存池、线程池和连接池等,其中尤以内存池和线程
池使用最多。

2.内存池

        内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出( 或者特定时间 ) 时,内存池才将之前申请的内存真正释放。

 3.内存池主要解决的问题

    内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
再需要补充说明的是内存碎片分为外碎片和内碎片,上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,我们后面项目就会看到,那会再进行更准确的理解。

4.malloc

C/C++ 中我们要动态申请内存都是通过 malloc 去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc 就是一个内存池。 malloc() 相当于向操作系统 批发 了一块较大的内存空间,然后 零售 给程序用。当全部“ 售完 或程序有大量的内存需求时,再根据实际需求向操作系统 进货 malloc 的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows vs 系列用的微软自己写的一套,linux gcc用的 glibc 中的 ptmalloc 。下面有几篇关于这块的文章,大概可以去简单看看了解一下,关于ptmalloc,学完我们的项目以后,有兴趣大家可以去看看他的实现细节。

   

定长内存池

作为程序员 (C/C++) 我们知道申请内存使用的是 malloc malloc 其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能 ,下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。

什么是定长内存池?

我们设计这个内存池之前,我们先来设计一个定长的内存池,什么是定长内存池呢,它就像new 一个对象一样,每次new的时候对象的大小是固定的。也就是说申请内存的大小是已经创建出的类的对象固定大小,这个时候相当于我们为这个类创建一个内存池,每次申请的都是这个类的对象大小,为一个确定的值。

定长内存池变量介绍

  1. template<class T>
  2. class ObjectPool
  3. {
  4. private:
  5. char* _memory = nullptr; // 指向大块内存的指针
  6. size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数
  7. void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
  8. };

因此,定长内存池当中包含三个成员变量:

  • _memory:指向大块内存的指针。
  • _remainBytes:大块内存切分过程中剩余字节数。
  • _freeList:还回来过程中链接的自由链表的头指针。

定长内存池申请一个对象大小空间实现

  1. template<class T>
  2. class ObjectPool
  3. {
  4. public:
  5. T* New()
  6. {
  7. T* obj = nullptr;
  8. // 优先把还回来内存块对象,再次重复利用
  9. if (_freeList)
  10. {
  11. void* next = *((void**)_freeList);
  12. obj = (T*)_freeList;
  13. _freeList = next;
  14. }
  15. else
  16. {
  17. // 剩余内存不够一个对象大小时,则重新开大块空间
  18. if (_remainBytes < sizeof(T))
  19. {
  20. _remainBytes = 128 * 1024;
  21. _memory = (char*)malloc(_remainBytes);
  22. if (_memory == nullptr)
  23. {
  24. throw std::bad_alloc();
  25. }
  26. }
  27. obj = (T*)_memory;
  28. size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
  29. _memory += objSize;
  30. _remainBytes -= objSize;
  31. }
  32. // 定位new,显示调用T的构造函数初始化
  33. new(obj)T;
  34. return obj;
  35. }

这里有一个很巧妙的构造,为了让释放后的内存块在freelist中链接起来,由于32位下,一个指针的大小是4字节,所以我们可以取这个对象的头四个字节的内容填充下一个回收对象的首地址,这样就实现了把他们链接起来。

还有一处很好的设计,就是我们在32位下指针是4个字节,但是在64位下,指针就是8个字节,我们应该如何来做呢?

*(void**)在这里就是一个很好的设计,当在64位下的时候,void*是指针类型8字节,void**也是指针类型,8字节,我对void**解引用,得到的是void*8字节,这个时候,我们就可以在64位下拿到前8个字节,而32位下,就void**照样是4个字节。这样就使得了void**在32位和64位下都适用了。

总结申请步骤:先看回收链表中有没有剩余,如果有的话,将剩余链表中头删一个对象,然后给obj。如果没有了,就看下现在的剩余内存池够不够创建一个对象的,如果不够的话,找系统申请一个128K的空间,然后在新申请的内存空间中切一个obj给他,当然还有一种情况,就是确实有剩余,但是对象本身不够一个指针大小(32位下4字节),这个时候我们就可以直接给他一个指针大小就可以了。

定长内存池释放一个对象大小空间

  1. void Delete(T* obj)
  2. {
  3. // 显示调用析构函数清理对象
  4. obj->~T();
  5. // 头插
  6. *(void**)obj = _freeList;
  7. _freeList = obj;
  8. }

 就是一个简单的头插即可,但是这个地方要注意的是我们在申请和释放的时候都需要显示的调用我们的构造函数和析构函数,因为在这里创建对象和删除对象的操作都是我们自己写的接口,一旦我们自己写了,系统就不会进行自动调用,我们就必须显示的调用构造函数和析构函数来进行该类的初始化和清理工作,因为构造函数和析构函数的作用并不是仅仅开辟空间和销毁空间,他们也有可能在其中做了很多其他的操作,所以我们必须进行显示调用。​​​​​​

高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。 malloc 本身其实已经很优秀,那么我们项目的原型tcmalloc 就是在多线程高并发的场景下更胜一筹,所以这次我们
实现的 内存池需要考虑以下几方面的问题
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎片问题。
concurrent memory pool 主要由以下 3 个部分构成:
1. thread cache :线程缓存是每个线程独有的,用于小于 256KB 的内存的分配, 线程从这里申请内 存不需要加锁,每个线程独享一个 cache ,这也就是这个并发线程池高效的地方
  2. central cache :中心缓存是所有线程所共享, thread cache 按需从 central cache 中获取 的对象。central cache 合适的时机回收 thread cache 中的对象,避免一个线程占用了太多的内存,而
其他线程的内存吃紧, 达到内存分配在多个线程中更均衡的按需调度的目的 central cache 是存
在竞争的,所以从这里取内存对象是需要加锁, 首先这里用的是桶锁,其次只有 thread cache
没有内存对象时才会找 central cache ,所以这里竞争不会很激烈
   
  3. page cache :页缓存是在 central cache 缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache 没有内存对象时,从 page cache 分配出一定数量的 page ,并切割成定长大小的小块内存,分配给central cache 当一个 span 的几个跨度页的对象都回收以后, page cache 会回收 central cache 满足条件的 span 对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题

 

Threadcache

Threadcache设计框架

之前我们实现定长内存池的时候是一个永远申请一个固定字节的大小,现在我们想要申请随机空间的大小,那么这个时候我们可以采取一个类似于哈希桶的结构,当我们想要开辟某个大小空间的时候,我们就从对应的下标的桶中取出一个对象大小。

内存碎片问题

那么现在问题又来了,我们需要开辟一个多大的哈希桶呢来存储呢?我们这里可以定义最大一次性申请128K的空间,一次性申请超过128K的情况我们后边再说。现在我们是想把这128K划分成不同的情况,因为我们不可能开辟128*1024大小数组,开销太大了,在每次申请的时候,我们都给它申请差不多的空间。

这个时候我们就需要做一个平衡了,就是说我们如果给的空间与实际要的空间差距很小的话,那么证明我们需要开辟数组的大小更大(因为精确度提高,单位密度划线就更大),如果我们开辟的空间与实际需求空间太小,我们就会造成空间的浪费,所以这里我们引入一个机制,就是内存对齐的机制。

那么我们需要开辟多大的空间呢,如果都按照8字节来进行对齐的话,那么就要128*1024/8=32768,所以我们就引入了下面这个机制。

 我们计算一下空间的利用率

  根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42 %左右,而1025+128=1052,而最大浪费空间就是127,这个是最小对齐数是127÷1152≈11.02%,同理1023÷9216≈11.10%。也就是说,我们可以把空间碎片率控制在百分之11左右。

FreeList的结构

  1. static void*& NextObj(void* obj)
  2. {
  3. return *(void**)obj;
  4. }
  5. class FreeList
  6. {
  7. public:
  8. void Push(void* obj)
  9. {
  10. assert(obj);
  11. // 头插
  12. //*(void**)obj = _freeList;
  13. NextObj(obj) = _freeList;
  14. _freeList = obj;
  15. ++_size;
  16. }
  17. void PushRange(void* start, void* end, size_t n)
  18. {
  19. NextObj(end) = _freeList;
  20. _freeList = start;
  21. _size += n;
  22. }
  23. void PopRange(void*& start, void*& end, size_t n)
  24. {
  25. assert(n <= _size);
  26. start = _freeList;
  27. end = start;
  28. for (size_t i = 0; i < n - 1; ++i)
  29. {
  30. end = NextObj(end);
  31. }
  32. _freeList = NextObj(end);
  33. NextObj(end) = nullptr;
  34. _size -= n;
  35. }
  36. void* Pop()
  37. {
  38. assert(_freeList);
  39. // 头删
  40. void* obj = _freeList;
  41. _freeList = NextObj(obj);
  42. --_size;
  43. return obj;
  44. }
  45. bool Empty()
  46. {
  47. return _freeList == nullptr;
  48. }
  49. size_t& MaxSize()
  50. {
  51. return _maxSize;
  52. }
  53. size_t Size()
  54. {
  55. return _size;
  56. }
  57. private:
  58. void* _freeList = nullptr;
  59. size_t _maxSize = 1;
  60. size_t _size = 0;
  61. };

这里就与前面的定长内存池实现就差不多了,size记录freelist的个数,这里统一用Nextobj来直接去这个对象的前四个字节。

根据申请大小计算对齐数及桶号

计算对齐数

  1. static inline size_t RoundUp(size_t bytes)
  2. {
  3. if (bytes <= 128)
  4. {
  5. return _RoundUp(bytes, 8);
  6. }
  7. else if (bytes <= 1024)
  8. {
  9. return _RoundUp(bytes, 16);
  10. }
  11. else if (bytes <= 8 * 1024)
  12. {
  13. return _RoundUp(bytes, 128);
  14. }
  15. else if (bytes <= 64 * 1024)
  16. {
  17. return _RoundUp(bytes, 1024);
  18. }
  19. else if (bytes <= 256 * 1024)
  20. {
  21. return _RoundUp(bytes, 8 * 1024);
  22. }
  23. else
  24. {
  25. assert(false);
  26. return -1;
  27. }
  28. }
  29. static inline size_t _RoundUp(size_t bytes, size_t alignNum)
  30. {
  31. size_t alignSize = 0;
  32. if (bytes%alignNum != 0)
  33. {
  34. alignSize = (bytes / alignNum + 1)*alignNum;
  35. }
  36. else
  37. {
  38. alignSize = bytes;
  39. }
  40. return alignSize;
  41. }

 我们可以通过这种方式在计算其对齐数的大小,相当于对于一个非对齐数的值,往下加一个对应区间内的对齐数,就是该对齐数的值,比较通俗易懂,而其实谷歌的大佬也为我们提供了另一种写法,是这个样子的,两种达到的效果是一样的。

  1. static inline size_t _RoundUp(size_t bytes, size_t alignNum)
  2. {
  3. return ((bytes + alignNum - 1) & ~(alignNum - 1));
  4. }

二、计算对应的哈希桶桶号

  1. size_t _Index(size_t bytes, size_t alignNum)
  2. {
  3. if (bytes % alignNum == 0)
  4. {
  5. return bytes / alignNum - 1;
  6. }
  7. else
  8. {
  9. return bytes / alignNum;
  10. }
  11. }
  12. // 计算映射的哪一个自由链表桶
  13. static inline size_t Index(size_t bytes)
  14. {
  15. assert(bytes <= MAX_BYTES);
  16. // 每个区间有多少个链
  17. static int group_array[4] = { 16, 56, 56, 56 };
  18. if (bytes <= 128){
  19. return _Index(bytes, 8);
  20. }
  21. else if (bytes <= 1024){
  22. return _Index(bytes - 128, 16) + group_array[0];
  23. }
  24. else if (bytes <= 8 * 1024){
  25. return _Index(bytes - 1024, 128) + group_array[1] + group_array[0];
  26. }
  27. else if (bytes <= 64 * 1024){
  28. return _Index(bytes - 8 * 1024, 1024) + group_array[2] + group_array[1] + group_array[0];
  29. }
  30. else if (bytes <= 256 * 1024){
  31. return _Index(bytes - 64 * 1024, 8*1024) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
  32. }
  33. else{
  34. assert(false);
  35. }
  36. return -1;
  37. }

原理也是显而易见的,我们先计算他在这个区间排在第几个位置,再加上这个区间之前的全部的桶号就是这个申请大小锁要去找的桶号,就是这个桶号的下标。

  1. static inline size_t _Index(size_t bytes, size_t align_shift)
  2. {
  3. return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
  4. }

 当然,谷歌程序员也给我们提供了一种更高效的算法,其效果与我们之前实现的是一样的。

  1. static const size_t MAX_BYTES = 256 * 1024;
  2. static const size_t NFREELIST = 208;

按照刚才的设计,计算出来一共是208个桶,我们定义最大申请大小是256K,这样的话就把刚才的内容和现在的联系起来了。

Threadcache类的设计

  1. class ThreadCache
  2. {
  3. public:
  4. // 申请和释放内存对象
  5. void* Allocate(size_t size);
  6. void Deallocate(void* ptr, size_t size);
  7. // 从中心缓存获取对象
  8. void* FetchFromCentralCache(size_t index, size_t size);
  9. // 释放对象时,链表过长时,回收内存回到中心缓存
  10. void ListTooLong(FreeList& list, size_t size);
  11. private:
  12. FreeList _freeLists[NFREELIST];
  13. };
  14. // TLS thread local storage
  15. static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

TLS无锁获取机制

        我们设计的目的是想让每一个线程都可以拿到自己独有的 threadcache,而想要在一个线程中获取到属于自己的资源就必须加锁进行保护,而加锁必定会浪费时间资源,这个地方我们可以使用TLS机制来进行访问,具体可以看下面两个博主对TLS具体介绍。

Thread Local Storage---__thread 关键字的使用方法_鱼思故渊的博客-CSDN博客

Thread Local Storage_evilswords的博客-CSDN博客

一、申请内存对象

  1. void* ThreadCache::Allocate(size_t size)
  2. {
  3. assert(size <= MAX_BYTES);
  4. size_t alignSize = SizeClass::RoundUp(size);
  5. size_t index = SizeClass::Index(size);
  6. if (!_freeLists[index].Empty())
  7. {
  8. return _freeLists[index].Pop();
  9. }
  10. else
  11. {
  12. return FetchFromCentralCache(index, alignSize);
  13. }
  14. }

原理很简单,就是先判断是否小于256K,然后计算字节对齐数和桶号,在到对应的哈希桶下面去弹出一个桶大小的对象给它,但是如果对应表没有了,那么这个时候我们就需要到对应的下一层centralcache去拿数据了。

二、向中心缓存获取对象

这个时候问题就来了,我们一次向中心缓存获取多少个对象比较好呢,如果一次申请太少,而后面我们需要这个空间大小的对象申请很多的话,那么就会造成频繁向中心缓存申请的问题,造成效率下降,但是如果一次性申请太多的话,用不了就可能造成资源的浪费,所以我们这里可以用一个慢增长的机制。

什么是慢增长机制呢?

就是在申请对象的时候,刚刚开始给1,往后再申请的时候,我们让最大值+1,当然也不能一直加下去,我们可以设定一个最高值,就是用最大申请大小/这个申请大小,来表示这个大小一次性申请的最大上限,然后最终两者取小即可。

  1. void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
  2. {
  3. // 慢开始反馈调节算法
  4. // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
  5. // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
  6. // 3、size越大,一次向central cache要的batchNum就越小
  7. // 4、size越小,一次向central cache要的batchNum就越大
  8. size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
  9. if (_freeLists[index].MaxSize() == batchNum)
  10. {
  11. _freeLists[index].MaxSize() += 1;
  12. }
  13. void* start = nullptr;
  14. void* end = nullptr;
  15. size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
  16. assert(actualNum > 0);
  17. if (actualNum == 1)
  18. {
  19. assert(start == end);
  20. return start;
  21. }
  22. else
  23. {
  24. _freeLists[index].PushRange(NextObj(start), end, actualNum-1);
  25. return start;
  26. }
  27. }

三、释放内存对象

  1. void ThreadCache::Deallocate(void* ptr, size_t size)
  2. {
  3. assert(ptr);
  4. assert(size <= MAX_BYTES);
  5. // 找对映射的自由链表桶,对象插入进入
  6. size_t index = SizeClass::Index(size);
  7. _freeLists[index].Push(ptr);
  8. // 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
  9. if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
  10. {
  11. ListTooLong(_freeLists[index], size);
  12. }
  13. }
  14. void ThreadCache::ListTooLong(FreeList& list, size_t size)
  15. {
  16. void* start = nullptr;
  17. void* end = nullptr;
  18. list.PopRange(start, end, list.MaxSize());
  19. CentralCache::GetInstance()->ReleaseListToSpans(start, size);
  20. }

我们先断言一下是否归还的大小大于最大值,然后计算桶号,把归还回来的插入对应的桶,插入桶之后,桶里的对象就增加了,当增加到一定程度的时候我们就应该把一部分对象归还给该对象。

Centralache

Centralache的整体设计

我们在这里选择与threadache哈希桶一一对应的模式,只不过这次挂的并不是一个一个对象了,而是一个又一个span,span块的大小是以页为单位的(这里页我们设计成8K的大小),每一个span都按照对应桶号的字节对齐大小且成了一个又一个的小对象,然后span与span之间的关系我们不再用头四个字节存地址了(因为存的是下一个小对象的地址),我们用双向链表来维护他们之间的关系。

 span的结构设计

  1. struct Span
  2. {
  3. PAGE_ID _pageId = 0; // 大块内存起始页的页号
  4. size_t _n = 0; // 页的数量
  5. Span* _next = nullptr; // 双向链表的结构
  6. Span* _prev = nullptr;
  7. size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
  8. void* _freeList = nullptr; // 切好的小块内存的自由链表
  9. bool _isUse = false; // 是否在被使用
  10. };

由于是双向循环的链表,我们就按照之前的双向链表来定义前后指针,以及这个span是否正在被使用,以及_usecount来表示用了多少块了,如果usecount=0,就证明全部的内存块都还回来了。以及freelist,就是span下面切的小对象的链表。

在这里我们详细谈一下页号,当我们在32位系统下,我们有2的32次方个地址空间,而64位系统下,有2的64次方个地址空间。我们以8K作为1页的大小,那么32位系统下就有2的19次方页,而64位系统下,就有2的51次方页,这里我们如何来表示页号呢,就可以使用size_t来表示32位的大小,用longlong来表示64位下的大小。

  1. #ifdef _WIN64
  2. typedef unsigned long long PAGE_ID;
  3. #elif _WIN32
  4. typedef size_t PAGE_ID;
  5. #else
  6. // linux
  7. #endif

另外64位下也是可以识别32位的,所以要先把64位下的情况写在前面。

 SpanList结构设计

  1. class SpanList
  2. {
  3. public:
  4. SpanList()
  5. {
  6. _head = new Span;
  7. _head->_next = _head;
  8. _head->_prev = _head;
  9. }
  10. Span* Begin()
  11. {
  12. return _head->_next;
  13. }
  14. Span* End()
  15. {
  16. return _head;
  17. }
  18. bool Empty()
  19. {
  20. return _head->_next == _head;
  21. }
  22. void PushFront(Span* span)
  23. {
  24. Insert(Begin(), span);
  25. }
  26. Span* PopFront()
  27. {
  28. Span* front = _head->_next;
  29. Erase(front);
  30. return front;
  31. }
  32. void Insert(Span* pos, Span* newSpan)
  33. {
  34. assert(pos);
  35. assert(newSpan);
  36. Span* prev = pos->_prev;
  37. // prev newspan pos
  38. prev->_next = newSpan;
  39. newSpan->_prev = prev;
  40. newSpan->_next = pos;
  41. pos->_prev = newSpan;
  42. }
  43. void Erase(Span* pos)
  44. {
  45. assert(pos);
  46. assert(pos != _head);
  47. Span* prev = pos->_prev;
  48. Span* next = pos->_next;
  49. prev->_next = next;
  50. next->_prev = prev;
  51. }
  52. private:
  53. Span* _head;
  54. public:
  55. std::mutex _mtx; // 桶锁
  56. };

 这里就是一个简单的双向链表的各种操作,但是唯一与双向链表不同的是,在这里我们定义了一个桶锁,当两个线程需要访问同一个桶的时候,这个时候我们就需要进行加锁了。

 Centralache类的设计

  1. class CentralCache
  2. {
  3. public:
  4. static CentralCache* GetInstance()
  5. {
  6. return &_sInst;
  7. }
  8. // 获取一个非空的span
  9. Span* GetOneSpan(SpanList& list, size_t byte_size);
  10. // 从中心缓存获取一定数量的对象给thread cache
  11. size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
  12. // 将一定数量的对象释放到span跨度
  13. void ReleaseListToSpans(void* start, size_t byte_size);
  14. private:
  15. SpanList _spanLists[NFREELIST];
  16. private:
  17. CentralCache()
  18. {}
  19. CentralCache(const CentralCache&) = delete;
  20. static CentralCache _sInst;
  21. };

        由于这里不再是单个线程独有的部分了,而是被所有线程共用一个的装置,所以我们需要在这里设计成单例模式,其实不需要设计成饿汉模式,那样的话就比较复杂,在这里设计成懒汉模式就够了。

获取一个新的Span

  1. Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
  2. {
  3. // 查看当前的spanlist中是否有还有未分配对象的span
  4. Span* it = list.Begin();
  5. while (it != list.End())
  6. {
  7. if (it->_freeList != nullptr)
  8. {
  9. return it;
  10. }
  11. else
  12. {
  13. it = it->_next;
  14. }
  15. }
  16. // 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
  17. list._mtx.unlock();
  18. // 走到这里说没有空闲span了,只能找page cache要
  19. PageCache::GetInstance()->_pageMtx.lock();
  20. Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
  21. span->_isUse = true;
  22. PageCache::GetInstance()->_pageMtx.unlock();
  23. // 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
  24. // 计算span的大块内存的起始地址和大块内存的大小(字节数)
  25. char* start = (char*)(span->_pageId << PAGE_SHIFT);
  26. size_t bytes = span->_n << PAGE_SHIFT;
  27. char* end = start + bytes;
  28. // 把大块内存切成自由链表链接起来
  29. // 1、先切一块下来去做头,方便尾插
  30. span->_freeList = start;
  31. start += size;
  32. void* tail = span->_freeList;
  33. int i = 1;
  34. while (start < end)
  35. {
  36. ++i;
  37. NextObj(tail) = start;
  38. tail = NextObj(tail); // tail = start;
  39. start += size;
  40. }
  41. // 切好span以后,需要把span挂到桶里面去的时候,再加锁
  42. list._mtx.lock();
  43. list.PushFront(span);
  44. return span;
  45. }

获取一个非空的span,注意span并不是像obj一样分发完了就不存在了,它的结构是一个固定的结构,所以我们每次都要遍历一遍在这个桶的spanlist的链表,看是否有空的span。

如果没有的话我们就应该往下一层pageache中去获取span,获取完成以后,把span的使用设置成正在被使用的状态,这个时候问题又来了,我们获取一个几页的span比较合适呢?

  1. static size_t NumMovePage(size_t size)
  2. {
  3. size_t num = NumMoveSize(size);
  4. size_t npage = num*size;
  5. npage >>= PAGE_SHIFT;
  6. if (npage == 0)
  7. npage = 1;
  8. return npage;
  9. }

        我们可以先找到对应的申请大小一次往centralache一次最多拿多少个对象,然后用最大申请的对象*每个对象的大小,得到的是threadache一次向centralache最多获取的字节数,然后÷8K就是我们要申请多少页的span。

         然后把页号转化为地址,然后把页号*页字节数(8K),就是这个span一共有多少个字节,用本页的起始地址+字节数就是这个span结尾地址,这个时候我们就拿到了span的两端,即start和end。把一个span切成对应对象的大小(先切下一个头节点方便尾插)前4个字节存下一个的地址。

  关于的问题

         我们这里的加锁是以桶为单位的,只有两个线程访问到了同一个桶的时候,我们这个时候才需要加锁,当centralache不够的时候,需要往pageache中拿而这个时候就可以把桶锁释放掉,因为我们现在是要到pageache中进行操作,而有可能其他线程这个时候正想把一些对象还回来,所以我们这里的操作就是把桶锁放开。

       而我们向pageache拿到一个span后,需要把大锁放开,因为这个span是我们新申请的,其他的线程就算是再去pageache中申请,也不可能拿到当前已经申请完成的span,所以这里我们可以把大锁放开,然后再进行span的切分操作。

      还有一个问题就是,当切分好了对象的span插入到对应的桶的时候,这个时候的在对应的桶里进行插入操作,需要把桶锁加回来,有可能会造成一个线程在插入,另一个线程在获取,两个线程就可能对同一个桶同时进行操作,线程就不安全了。

从中心缓存获取一定数量的对象给thread cache

  1. size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
  2. {
  3. size_t index = SizeClass::Index(size);
  4. _spanLists[index]._mtx.lock();
  5. Span* span = GetOneSpan(_spanLists[index], size);
  6. assert(span);
  7. assert(span->_freeList);
  8. // 从span中获取batchNum个对象
  9. // 如果不够batchNum个,有多少拿多少
  10. start = span->_freeList;
  11. end = start;
  12. size_t i = 0;
  13. size_t actualNum = 1;
  14. while ( i < batchNum - 1 && NextObj(end) != nullptr)
  15. {
  16. end = NextObj(end);
  17. ++i;
  18. ++actualNum;
  19. }
  20. span->_freeList = NextObj(end);
  21. NextObj(end) = nullptr;
  22. span->_useCount += actualNum;
  23. _spanLists[index]._mtx.unlock();
  24. return actualNum;
  25. }

当上层threadache中的对象不够的时候,此时就向中心缓存centralache中的对应位置的哈希桶内获取一个span(刚刚已经写了获取span的函数),在这里定义start和end为输出型参数,把span的头节点给start,因为span已经执行完了切对象的操作(把每个对象个大小的内存块的),所以我们直接通过前4个字节遍历对应span下的_freelist的内存块,获取对应的对象个数,并把获取的结尾位置给end。

当然,有可能因为span中的对象个数不足我们想要获取的,这个时候有多少拿多少,然后每获取一个实际对象,使用块计数++(方便判断都还回来的时候,那时usecount==0,此时就可以归还这个span给pageache),完成操作之后把锁解开。

【误区:这个时候不要担心一次性获取不完足够的对象,然后再去找pageache中获取新的span再拿没获取完的对象,没有必要,因为我们每次申请空间实际上只需要弹出一个对象就够了(因为那1个对象的大小就是经过计算、对齐之后的本次申请大小,大小必然是大于等于我们的申请空间),所以说只要span不为空,只要还有一次对象,相当于就满足了这次的申请空间的需求,而下次再申请的时候,如果对应spanlist中一个对象都没有的话,Getonespan函数就会到对应的pageache获取,所以这里不够时,就多少拿多少就行。】

把threadache中的对象还给span

  1. void CentralCache::ReleaseListToSpans(void* start, size_t size)
  2. {
  3. size_t index = SizeClass::Index(size);
  4. _spanLists[index]._mtx.lock();
  5. while (start)
  6. {
  7. void* next = NextObj(start);
  8. Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
  9. NextObj(start) = span->_freeList;
  10. span->_freeList = start;
  11. span->_useCount--;
  12. // 说明span的切分出去的所有小块内存都回来了
  13. // 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
  14. if (span->_useCount == 0)
  15. {
  16. _spanLists[index].Erase(span);
  17. span->_freeList = nullptr;
  18. span->_next = nullptr;
  19. span->_prev = nullptr;
  20. // 释放span给page cache时,使用page cache的锁就可以了
  21. // 这时把桶锁解掉
  22. _spanLists[index]._mtx.unlock();
  23. PageCache::GetInstance()->_pageMtx.lock();
  24. PageCache::GetInstance()->ReleaseSpanToPageCache(span);
  25. PageCache::GetInstance()->_pageMtx.unlock();
  26. _spanLists[index]._mtx.lock();
  27. }
  28. start = next;
  29. }
  30. _spanLists[index]._mtx.unlock();
  31. }
  32. void ThreadCache::ListTooLong(FreeList& list, size_t size)
  33. {
  34. void* start = nullptr;
  35. void* end = nullptr;
  36. list.PopRange(start, end, list.MaxSize());
  37. CentralCache::GetInstance()->ReleaseListToSpans(start, size);
  38. }

当threadache中的freelist长度超过一次性申请的最大长度MaxSize的时候,我们就可以把threadahche中的MaxSize对象还给centralache,可以利用popRange先把MaxSize对象在threadache中删去,又因为popRange的start与end是输出型参数,我们可以拿到删除部分的头节点,popRange可以把结尾部分的对象的下一个节点是空,所以回收这些对象的时候可以顺着start一直访问到空的时候就可以了。

我们传入size,先计算这个对象在对应的哪一个桶,加上桶锁,然后找到一个对应span(具体怎么对应,后面讲),找出其下面的freelist往里面一个对象一个对象进行头插,每插入一个对象,相当于换回来一个切分块,所以usecount需要进行减一的操作。

当usecount减到0的时候,说明这一个span全部还回来了,这个时候我们就把span继续还给下一层的pageache,然后再把start=next插入下一个对象,直到全部插入完成。

接着来说到锁的问题,当span的usecount==0即都还回来的时候,我们先把span的删除,然后再把这个桶锁放开,因为现在span已经删除,不在链表中,所以其他线程访问不到span,这个时候加大锁去pageache中归还这个span即可。

归还完成之后放开大锁,再把桶锁加上,因为归还的每一个对象不一定来自同一个span,所以下一个对象可能是这个桶中别的span,此时还未归还完,所以要加上再桶锁,具体如何找到对应的span,后面讲。

 Pageache

Pageache的整体设计框架

 

 与threadache和centralache不同的是,Pageache的基本单位是span,而其哈希桶下标所对应的下标是表示这个哈希桶挂着的是一个几页的span,这也是方便了centralache向pageache中获取的时候直接以n页的span为目的获取。

Pageache类的设计

  1. class PageCache
  2. {
  3. public:
  4. static PageCache* GetInstance()
  5. {
  6. return &_sInst;
  7. }
  8. // 获取从对象到span的映射
  9. Span* MapObjectToSpan(void* obj);
  10. // 释放空闲span回到Pagecache,并合并相邻的span
  11. void ReleaseSpanToPageCache(Span* span);
  12. // 获取一个K页的span
  13. Span* NewSpan(size_t k);
  14. std::mutex _pageMtx;
  15. private:
  16. SpanList _spanLists[NPAGES];
  17. std::unordered_map<PAGE_ID, Span*> _idSpanMap;
  18. PageCache()
  19. {}
  20. PageCache(const PageCache&) = delete;
  21. static PageCache _sInst;
  22. };

(难点)两个关系的转化与映射

页号与地址的对应

由于我们刚开始的时候需要向系统申请一块256K的空间 ,这个空间的地址必然是连续的,既然是连续的,我们的地址其实可以看作16进制,我们把转成十进制后依然是连续的,我们除以每页的大小,就是这个页的页号,然后通过页号*1页的大小(这里是4K,4*1024),就可以找到本页的首地址。

页号与span的对应关系

这个时候重点的来了,我使用的时候拿到的是threadache给我提供的一个个对象,而这一个个对象必然是来自某些span,然后这些对象用完了会随机释放到freelist的中,freelist达到Maxsize个对象就把一些对象还回centralache中的span,现在问题是我怎么知道这些对象之前来自哪个span?

我们退一步说,我们为什么要知道这个对象来自哪个span?或者说我们为什么要建立span和页号的对应关系?原因有两个:

1.我们当时申请内存是申请一大块连续内存,为了高效,我们释放的时候也需要释放一大块连续内存,而申请上来的连续内存又被切分成了一个个span,而span是如何表示其内存的连续关系呢?是通过页号,一个span中有n页,我们通过找到最前面的页和最后面页的页号,将他们转化为地址,来找到当时申请时前后的连续空间的地址对应的span。以实现前后页的合并,最终将一大块连续内存还给操作系统。

2.当span被切分之后,就成了一个又一个的小对象,而刚开始还未使用的span中每一个对象空间地址必然是连续的,切分之后我们需要让他们找到曾经被切分的span,当usecount减到0的时候,说明所有的span中所有的切的小对象大小都回来了,这个时候可能freelist中的所有对象块不是连续的(因为回来的时间可能不是按序到达,只是来一个头插一个)这都无所谓,但是假设可以把他们排列组合,就会发现就是我们曾经申请的span的那一个地址区间。这样一个span找好了,就可以完成刚刚说的合并了。而一个对象的查找所属span,我们可以通过对象的地址,来找到其所属页,再通过页号与span的映射关系来完成。

 问题来啦,我们怎么实现页表与span的映射呢,我们可以用哈希map来实现,我们通过了解“获取一个K页的span来深度解析一下这个过程”。

获取一个K页的span

在之前我们说到,当centralache中span没有对象的时候,就会根据Maxsize()×一个对象空间的字节数除以一页的大小,计算出获取一个K页的span找pageache进行获取。

  1. Span* PageCache::NewSpan(size_t k)
  2. {
  3. assert(k > 0 && k < NPAGES);
  4. // 先检查第k个桶里面有没有span
  5. if (!_spanLists[k].Empty())
  6. {
  7. Span* kSpan = _spanLists->PopFront();
  8. for (PAGE_ID i = 0; i < kSpan->_n; ++i)
  9. {
  10. //_idSpanMap[kSpan->_pageId + i] = kSpan;
  11. _idSpanMap.set(kSpan->_pageId + i, kSpan);
  12. }
  13. return kspan;
  14. }
  15. // 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
  16. for (size_t i = k+1; i < NPAGES; ++i)
  17. {
  18. if (!_spanLists[i].Empty())
  19. {
  20. Span* nSpan = _spanLists[i].PopFront();
  21. Span* kSpan = new Span;
  22. // 在nSpan的头部切一个k页下来
  23. // k页span返回
  24. // nSpan再挂到对应映射的位置
  25. kSpan->_pageId = nSpan->_pageId;
  26. kSpan->_n = k;
  27. nSpan->_pageId += k;
  28. nSpan->_n -= k;
  29. _spanLists[nSpan->_n].PushFront(nSpan);
  30. // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
  31. // 进行的合并查找
  32. _idSpanMap[nSpan->_pageId] = nSpan;
  33. _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
  34. // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
  35. for (PAGE_ID i = 0; i < kSpan->_n; ++i)
  36. {
  37. _idSpanMap[kSpan->_pageId + i] = kSpan;
  38. }
  39. return kSpan;
  40. }
  41. // 走到这个位置就说明后面没有大页的span了
  42. // 这时就去找堆要一个128页的span
  43. //Span* bigSpan = new Span;
  44. Span* bigSpan = _spanPool.New();
  45. void* ptr = SystemAlloc(NPAGES - 1);
  46. bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
  47. bigSpan->_n = NPAGES - 1;
  48. _spanLists[bigSpan->_n].PushFront(bigSpan);
  49. return NewSpan(k);
  50. }

       拿到k页请求之后,先进行一下断言,看下是否符合在1-128页的span范围之内,,然后看对应K页span的桶是否有对象,如果有,就开始从这个span的开始页 建立每一页和span的映射关系,因为这个span需要去切成小对象,小对象通过自己的地址找到首页页号,这个页号因为必然建立过与span的映射,所以可以找到span。

      而如果一个span没有的话,那么我们不是直接向系统申请,而是顺着向后面的桶寻找,如果后面的某个桶中有的话,那么就从这个span中切下K页大小,然后把剩下的N页挂到第N号桶上去。并把span的页数,首页页号等进行一下修改。

切之后的span也是需要建立页号与span的映射的,但是这里不用把每一页都建立,因为如果后面使用到这个span(这里是指把这个span拿到centralache中)那个时候我们自然会建立每页与span的映射,而此时暂时用不到它,这时建立映射的目的只是为了后面实现span与span的合并,而合并的时候,我们是查找对应span相邻的页,我们只需要把这个span的首页和尾页与span建立起联系即可。

    如果后面的桶中一个对象都没用,这个时候直接找系统要一个128页的span,然后把这个大span插入到128号桶上,再重复调用一下这个函数,这个时候128页的span不为空,所以就可以切下K页,把128-K页挂到对应的桶上去。

映射函数设计

  1. Span* PageCache::MapObjectToSpan(void* obj)
  2. {
  3. PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
  4. std::unique_lock<std::mutex> lock(_pageMtx);
  5. auto ret = _idSpanMap.find(id);
  6. if (ret != _idSpanMap.end())
  7. {
  8. return ret->second;
  9. }
  10. else
  11. {
  12. assert(false);
  13. return nullptr;
  14. }
  15. }

这里是已知对象首地址,来查找span,我们先把地址转化成页号,然后再从map中查找对应页号映射的*span。并把对应的span返回,如果没有,就报错。

这里是要加锁的,因为我们在访问map的时候,其他线程可能在执行插入的操作,如果插入满了就可能会造成unordered_map的扩容,熟悉unordered_map都知道,扩容后所有数据要根据数组进行重新映射,这个时候有线程读,就会造成数据和错乱的问题,所以一定要加上锁。

模拟原来储存的形式:

 若有线程插入数据导致扩容后的储存形式:

发现此时储存形式发生了改变,为了防止一个线程正在读,另一个线程正在建立映射造成这种问题。所以说是需要加锁的。 

 pageache回收span

当一个span的usecount--到0的时候,说明span里面的小对象已经全部还回来了,这个时候我们就可以把这个span还给pageache了。

  1. void PageCache::ReleaseSpanToPageCache(Span* span)
  2. {
  3. // 大于128 page的直接还给堆
  4. if (span->_n > NPAGES-1)
  5. {
  6. void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
  7. SystemFree(ptr);
  8. //delete span;
  9. _spanPool.Delete(span);
  10. return;
  11. }
  12. // 对span前后的页,尝试进行合并,缓解内存碎片问题
  13. while (1)
  14. {
  15. PAGE_ID prevId = span->_pageId - 1;
  16. auto ret = _idSpanMap.find(prevId);
  17. // 前面的页号没有,不合并了
  18. if (ret == _idSpanMap.end())
  19. {
  20. break;
  21. }
  22. // 前面相邻页的span在使用,不合并了
  23. Span* prevSpan = ret->second;
  24. if (prevSpan->_isUse == true)
  25. {
  26. break;
  27. }
  28. // 合并出超过128页的span没办法管理,不合并了
  29. if (prevSpan->_n + span->_n > NPAGES-1)
  30. {
  31. break;
  32. }
  33. span->_pageId = prevSpan->_pageId;
  34. span->_n += prevSpan->_n;
  35. _spanLists[prevSpan->_n].Erase(prevSpan);
  36. //delete prevSpan;
  37. _spanPool.Delete(prevSpan);
  38. }
  39. // 向后合并
  40. while (1)
  41. {
  42. PAGE_ID nextId = span->_pageId + span->_n;
  43. auto ret = _idSpanMap.find(nextId);
  44. if (ret == _idSpanMap.end())
  45. {
  46. break;
  47. }
  48. Span* nextSpan = ret->second;
  49. if (nextSpan->_isUse == true)
  50. {
  51. break;
  52. }
  53. if (nextSpan->_n + span->_n > NPAGES-1)
  54. {
  55. break;
  56. }
  57. span->_n += nextSpan->_n;
  58. _spanLists[nextSpan->_n].Erase(nextSpan);
  59. //delete nextSpan;
  60. _spanPool.Delete(nextSpan);
  61. }
  62. _spanLists[span->_n].PushFront(span);
  63. span->_isUse = false;
  64. _idSpanMap[span->_pageId] = span;
  65. _idSpanMap[span->_pageId+span->_n-1] = span;
  66. }

先判断这个span是否大于128,如果大于128,则直接还给堆。

我们分别向前向后进行合并页面,只要碰到一下的情况,就不再合并了:

1.前后页还没有开辟出来(没有出现再映射关系中)。

2.前后页的span正在使用中。

3.合并页数超过128,就不在往后合并了。

符合合并条件之后,把span的页数加上新合并的页数,再把之前合并前的那个span删去,并把合并后的大span插入到对应页数的哈希桶上,并把前后页加入映射关系中,方便下一次的合并。

项目优化与测试

优化一  利用定长内存池代替new

我们在前面已经实现了一个定长的内存池,我们经过测试,发现效率是要比new和delete高的,所以我们在这里把new和delete代替。

  1. //单例模式
  2. class PageCache
  3. {
  4. public:
  5. //...
  6. private:
  7. ObjectPool<Span> _spanPool;
  8. };
  9. //申请span对象
  10. Span* span = _spanPool.New();
  11. //释放span对象
  12. _spanPool.Delete(span);
  13. //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
  14. if (pTLSThreadCache == nullptr)
  15. {
  16. static std::mutex tcMtx;
  17. static ObjectPool<ThreadCache> tcPool;
  18. tcMtx.lock();
  19. pTLSThreadCache = tcPool.New();
  20. tcMtx.unlock();
  21. }
  22. //带头双向循环链表
  23. class SpanList
  24. {
  25. public:
  26. SpanList()
  27. {
  28. _head = _spanPool.New();
  29. _head->_next = _head;
  30. _head->_prev = _head;
  31. }
  32. private:
  33. Span* _head;
  34. static ObjectPool<Span> _spanPool;
  35. };

优化二 释放对象时不传对象大小

我们销毁对象的时候,必须要传入size来计算其桶号,然后找到对应的桶进行释放,其实可以不传入size,只需要在span中加入一个objsize,下次获取span时候同时在span上记录下size的值,销毁的时候只需要用对象的地址找到对应的页号,在利用页号在map中映射到对应的span,获取objsize(PageCache::MapObjectToSpan函数)即可

  1. //管理以页为单位的大块内存
  2. struct Span
  3. {
  4. PAGE_ID _pageId = 0; //大块内存起始页的页号
  5. size_t _n = 0; //页的数量
  6. Span* _next = nullptr; //双链表结构
  7. Span* _prev = nullptr;
  8. size_t _objSize = 0; //切好的小对象的大小
  9. size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
  10. void* _freeList = nullptr; //切好的小块内存的自由链表
  11. bool _isUse = false; //是否在被使用
  12. };
  13. Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
  14. span->_objSize = size;
  15. static void ConcurrentFree(void* ptr)
  16. {
  17. Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
  18. size_t size = span->_objSize;
  19. if (size > MAX_BYTES) //大于256KB的内存释放
  20. {
  21. PageCache::GetInstance()->_pageMtx.lock();
  22. PageCache::GetInstance()->ReleaseSpanToPageCache(span);
  23. PageCache::GetInstance()->_pageMtx.unlock();
  24. }
  25. else
  26. {
  27. assert(pTLSThreadCache);
  28. pTLSThreadCache->Deallocate(ptr, size);
  29. }
  30. }

优化三 利用基数树代替unordered_map优化

通过调试发现,最消耗效率的是map中的锁,因为每次页号查找span的时候我们都需要进行加锁解锁操作耗费了大量的时间。

在这里我们需要仔细想一想,就是说我们加锁的目的之前已经说过,就是为了防止一边在读,另一边在插入导致unordered_map扩容,数据与内存不匹配的问题,要想解决这个问题,我们主要的方法就是防止扩容。

怎么才能防止扩容,其核心策略就是把所有页号个数的数组开辟出来,这样我们就引入了基数树。

基数数是直接开辟所有可能的页下标数组,是一个固定的结构,数据只有存在和不存在两种状态,而且因为页号固定属于一个span,所以不可能存在更改映射关系的可能,只能在新的span被使用的时候往里面不断写入映射,而上层使用映射来做什么的时候,使用的必定是已经建立好映射的关系。

                那么我们这个树需要开辟多大呢,在32位下,一页8K的话,我们2的32次方÷2的13次方是2的19次方,我们一个数组的一个元素需要存一个span*的指针,也就是4个字节,算下来需要开辟2的19次方*4字节=2M大小,是可以的,但是在64位,下,2的64次方÷2的13次方是2的51次方,一个指针8字节,2的51次方×8大概是2的24次方G,绝对是行不通的,所以这里我们要引入3层基数树。

 我们以32位下,一页8K为例,需要表示页号大概是要19和比特位,我们把这19个比特位进行拆分映射,第一次先拿5个比特位,然后再拿后面5个比特位进行二次映射,最后拿剩下的比特位找到对应的span指针。

其三层的好处是,我们不需要一上来就把2M的空间开辟出来,我们当用到某一个区域的地址页号映射要求的时候,我们才开辟对应的空间,这样又进一步提高了效率。

基数树代码:

  1. //二层基数树
  2. template <int BITS>
  3. class TCMalloc_PageMap2
  4. {
  5. private:
  6. static const int ROOT_BITS = 5; //第一层对应页号的前5个比特位
  7. static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数
  8. static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位
  9. static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数
  10. //第一层数组中存储的元素类型
  11. struct Leaf
  12. {
  13. void* values[LEAF_LENGTH];
  14. };
  15. Leaf* root_[ROOT_LENGTH]; //第一层数组
  16. public:
  17. typedef uintptr_t Number;
  18. explicit TCMalloc_PageMap2()
  19. {
  20. memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理
  21. PreallocateMoreMemory(); //直接将第二层全部开辟
  22. }
  23. void* get(Number k) const
  24. {
  25. const Number i1 = k >> LEAF_BITS; //第一层对应的下标
  26. const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
  27. if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射
  28. {
  29. return NULL;
  30. }
  31. return root_[i1]->values[i2]; //返回该页号对应span的指针
  32. }
  33. void set(Number k, void* v)
  34. {
  35. const Number i1 = k >> LEAF_BITS; //第一层对应的下标
  36. const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
  37. assert(i1 < ROOT_LENGTH);
  38. root_[i1]->values[i2] = v; //建立该页号与对应span的映射
  39. }
  40. //确保映射[start,start_n-1]页号的空间是开辟好了的
  41. bool Ensure(Number start, size_t n)
  42. {
  43. for (Number key = start; key <= start + n - 1;)
  44. {
  45. const Number i1 = key >> LEAF_BITS;
  46. if (i1 >= ROOT_LENGTH) //页号超出范围
  47. return false;
  48. if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟
  49. {
  50. //开辟对应空间
  51. static ObjectPool<Leaf> leafPool;
  52. Leaf* leaf = (Leaf*)leafPool.New();
  53. memset(leaf, 0, sizeof(*leaf));
  54. root_[i1] = leaf;
  55. }
  56. key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
  57. }
  58. return true;
  59. }
  60. void PreallocateMoreMemory()
  61. {
  62. Ensure(0, 1 << BITS); //将第二层的空间全部开辟好
  63. }
  64. };

测试与总结

以下是测试代码,来对比我们写的内存池与malloc和free的效率

  1. // ntimes 一轮申请和释放内存的次数
  2. // rounds 轮次
  3. void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
  4. {
  5. std::vector<std::thread> vthread(nworks);
  6. size_t malloc_costtime = 0;
  7. size_t free_costtime = 0;
  8. for (size_t k = 0; k < nworks; ++k)
  9. {
  10. vthread[k] = std::thread([&, k]() {
  11. std::vector<void*> v;
  12. v.reserve(ntimes);
  13. for (size_t j = 0; j < rounds; ++j)
  14. {
  15. size_t begin1 = clock();
  16. for (size_t i = 0; i < ntimes; i++)
  17. {
  18. //v.push_back(malloc(16));
  19. v.push_back(malloc((16 + i) % 8192 + 1));
  20. }
  21. size_t end1 = clock();
  22. size_t begin2 = clock();
  23. for (size_t i = 0; i < ntimes; i++)
  24. {
  25. free(v[i]);
  26. }
  27. size_t end2 = clock();
  28. v.clear();
  29. malloc_costtime += (end1 - begin1);
  30. free_costtime += (end2 - begin2);
  31. }
  32. });
  33. }
  34. for (auto& t : vthread)
  35. {
  36. t.join();
  37. }
  38. printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
  39. nworks, rounds, ntimes, malloc_costtime);
  40. printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
  41. nworks, rounds, ntimes, free_costtime);
  42. printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
  43. nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
  44. }
  45. // 单轮次申请释放次数 线程数 轮次
  46. void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
  47. {
  48. std::vector<std::thread> vthread(nworks);
  49. size_t malloc_costtime = 0;
  50. size_t free_costtime = 0;
  51. for (size_t k = 0; k < nworks; ++k)
  52. {
  53. vthread[k] = std::thread([&]() {
  54. std::vector<void*> v;
  55. v.reserve(ntimes);
  56. for (size_t j = 0; j < rounds; ++j)
  57. {
  58. size_t begin1 = clock();
  59. for (size_t i = 0; i < ntimes; i++)
  60. {
  61. //v.push_back(ConcurrentAlloc(16));
  62. v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
  63. }
  64. size_t end1 = clock();
  65. size_t begin2 = clock();
  66. for (size_t i = 0; i < ntimes; i++)
  67. {
  68. ConcurrentFree(v[i]);
  69. }
  70. size_t end2 = clock();
  71. v.clear();
  72. malloc_costtime += (end1 - begin1);
  73. free_costtime += (end2 - begin2);
  74. }
  75. });
  76. }
  77. for (auto& t : vthread)
  78. {
  79. t.join();
  80. }
  81. printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
  82. nworks, rounds, ntimes, malloc_costtime);
  83. printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
  84. nworks, rounds, ntimes, free_costtime);
  85. printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
  86. nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
  87. }
  88. int main()
  89. {
  90. size_t n = 10000;
  91. cout << "==========================================================" << endl;
  92. BenchmarkConcurrentMalloc(n, 4, 10);
  93. cout << endl << endl;
  94. BenchmarkMalloc(n, 4, 10);
  95. cout << "==========================================================" << endl;
  96. return 0;
  97. }

 测试中遇到的问题 

     1.总结一下测试过程中的一些问题,刚开始我们是以指定的一个固定大小进行不断的测试,发现效率很低,其根本原因是因为我们指定了固定大小,每次都往那一个桶里拿数据,释放数据,不停的加锁与解锁,影响了效率。

     2.我们没有考虑一种情况,当内存大于128K的时候,我们此时应该给它进行一步优化。

  1. //不再申请threadache,直接找pageache,让它向系统申请
  2. static void* ConcurrentAlloc(size_t size)
  3. {
  4. if (size > MAX_BYTES) //大于256KB的内存申请
  5. {
  6. //计算出对齐后需要申请的页数
  7. size_t alignSize = SizeClass::RoundUp(size);
  8. size_t kPage = alignSize >> PAGE_SHIFT;
  9. //向page cache申请kPage页的span
  10. PageCache::GetInstance()->_pageMtx.lock();
  11. Span* span = PageCache::GetInstance()->NewSpan(kPage);
  12. PageCache::GetInstance()->_pageMtx.unlock();
  13. void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
  14. return ptr;
  15. }
  16. else
  17. {
  18. //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
  19. if (pTLSThreadCache == nullptr)
  20. {
  21. pTLSThreadCache = new ThreadCache;
  22. }
  23. cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
  24. return pTLSThreadCache->Allocate(size);
  25. }
  26. }
  27. //获取向上对齐后的字节数
  28. static inline size_t RoundUp(size_t bytes)
  29. {
  30. if (bytes <= 128)
  31. {
  32. return _RoundUp(bytes, 8);
  33. }
  34. else if (bytes <= 1024)
  35. {
  36. return _RoundUp(bytes, 16);
  37. }
  38. else if (bytes <= 8 * 1024)
  39. {
  40. return _RoundUp(bytes, 128);
  41. }
  42. else if (bytes <= 64 * 1024)
  43. {
  44. return _RoundUp(bytes, 1024);
  45. }
  46. else if (bytes <= 256 * 1024)
  47. {
  48. return _RoundUp(bytes, 8 * 1024);
  49. }
  50. else
  51. {
  52. //大于256KB的按页对齐
  53. return _RoundUp(bytes, 1 << PAGE_SHIFT);
  54. }
  55. }
  56. //pageage 直接找堆申请
  57. Span* PageCache::NewSpan(size_t k)
  58. {
  59. assert(k > 0);
  60. if (k > NPAGES - 1) //大于128页直接找堆申请
  61. {
  62. void* ptr = SystemAlloc(k);
  63. Span* span = new Span;
  64. span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
  65. span->_n = k;
  66. //建立页号与span之间的映射
  67. _idSpanMap[span->_pageId] = span;
  68. return span;
  69. }
  70. }
  71. //释放
  72. static void ConcurrentFree(void* ptr, size_t size)
  73. {
  74. if (size > MAX_BYTES) //大于256KB的内存释放
  75. {
  76. Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
  77. PageCache::GetInstance()->_pageMtx.lock();
  78. PageCache::GetInstance()->ReleaseSpanToPageCache(span);
  79. PageCache::GetInstance()->_pageMtx.unlock();
  80. }
  81. //....
  82. }
  83. void PageCache::ReleaseSpanToPageCache(Span* span)
  84. {
  85. // 大于128 page的直接还给堆
  86. if (span->_n > NPAGES-1)
  87. {
  88. void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
  89. SystemFree(ptr);
  90. //delete span;
  91. _spanPool.Delete(span);
  92. return;
  93. }
  94. inline static void SystemFree(void* ptr)
  95. {
  96. #ifdef _WIN32
  97. VirtualFree(ptr, 0, MEM_RELEASE);
  98. #else
  99. // sbrk unmmap等
  100. #endif
  101. }

我们可以看到,经过上面10轮的申请释放内存,我们写的内存池要比free和malloc的效率更加高。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/907841
推荐阅读
相关标签
  

闽ICP备14008679号