赞
踩
http://blog.csdn.net/john_zzl/article/category/1231787
#define AE_SETSIZE (1024*10) /* Max number of fd supported */
redis的网络模型处理的fd必须小于2048(在events结构体中放不下);
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
将fd交给eventloop托管,直接通过aeEventLoop.events[fd]找到托管aeFileEvent;
每个fd关联一个mask(托管事件readable or writable),读函数(可读时调用)和写函数(可写时调用),以及clientData(传给读写函数);
aeFireEvent类似epoll的epoll_event;
首先,通过epoll_wait拿到events,再转存到fired数组中;
通过fired数组中的fd找到托管的aeFileEvent,获得读函数、写函数及clientData,然后,根据fired数组中的mask调用读写函数;
之所以引入aeFireEvent,是因为redis的网络模型不仅仅支持epoll,还支持select和kqueue,需要一个中间抽象层;
除了托管fd外,还支持timer,timer都存储在链表timeEventHead,每轮poll后,会check一下timer;
aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,acceptTcpHandler,NULL);
acceptTcpHandler每accept到一个clientfd,会为该fd调用createClient(有maxclients限制可配);
redisClient *createClient(int fd)
1、nonblock&tcpnodelay设置;
2、aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)将fd交给网络模型托管,当readable时,调用readQueryFromClient处理;
3、初始化;
4、将c加入server.clients链表;
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
读数据到redisClient::querybuf;
读到数据超过client_max_querybuf_len时,直接freeClient(c);
processInputBuffer(c);
1、输入以*开头,则reqtype为REDIS_REQ_MULTIBULK,调用processMultibulkBuffer(c);
2、否则,reqtype为REDIS_REQ_INLINE,调用processInlineBuffer(c);
3、调用processCommand(c);
int processInlineBuffer(redisClient *c)
strstr(c->querybuf,"\r\n")找行尾;
argv = sdssplitlen(c->querybuf,querylen," ",1,&argc),按空格分隔每个参数;
更新c->querybuf;
根据argv设置c->argv;
int processMultibulkBuffer(redisClient *c)
解析请求,请求格式如下:
*multibulklen\r\n
$bulklen\r\n
..........\r\n
$bulklen\r\n
..........\r\n
multibulklen指定有多少个bulk,bulklen指定每个bulk的长度;
multibulklen范围[0, 1024*1024];
bulklen范围[0,512*1024*1024];
更新c->querybuf;
根据argv设置c->argv;
int processCommand(redisClient *c)
处理client发送的命令;
bio开线程来执行close和fdatasync操作;
采用的是job list方式(每个线程一个job list,线程从job list取job执行),也就是生产者-消费者模型;
aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, vmThreadedIOCompletedJob, NULL)
主线程和后台线程通过pipe进行通信:
后台线程每处理完一个job,就会给server.io_ready_pipe_write发送一个字节数据;
主线程监听server.io_ready_pipe_read,当有数据可读时,表明有job处理完成,调用vmThreadedIOCompletedJob处理;
int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db)
提交一个job给后台线程,将val交换到磁盘;
job类型为REDIS_IOJOB_PREPARE_SWAP;
val的storage更新为REDIS_VM_SWAPPING;
调用queueIOJob将job append到server.io_newjobs队列,后台线程将从这个队列取job执行;
queueIOJob时,如果server.io_active_threads < server.vm_max_threads,将会调用spawnIOThread(),创建后台工作线程;
void *IOThreadEntryPoint(void *arg)
后台工作线程函数;
从server.io_newjobs队首取下job,将其放入server.io_processing队尾,表示当前job正在处理;
处理job:
REDIS_IOJOB_LOAD,vmReadObjectFromSwap将val从文件读入;
REDIS_IOJOB_PREPARE_SWAP,开启swap过程,本次job计算存储val需要多少page,本次job成功后,转入REDIS_IOJOB_DO_SWAP类型job完成swap;
REDIS_IOJOB_DO_SWAP,vmWriteObjectOnSwap将val存储到文件;
job独立完后,将job从server.io_processing队列取下,放入server.io_processed队尾,然后通过pipe(server.io_ready_pipe_write)给主线程发送一个字节数据("x")以通知主线程调用vmThreadedIOCompletedJob;
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask)
主线程收到server.io_ready_pipe_read可读事件时调用;
读一个字节;
从server.io_processed队首取下一个已完成job;
如果job被cancel,直接忽略;
job类型为REDIS_IOJOB_LOAD:
表明val从磁盘读入到内存;
释放其占用的pages;
将value从vmpointer替换为object;
处理所有阻塞在该key上面的client,如果client需要的所有key都内存就绪,将其加入server.io_ready_clients队列;
job类型为REDIS_IOJOB_PREPARE_SWAP:
保存val需要的page数目已经计算完毕;
如果不能swapout或者vmFindContiguousPages(&j->page,j->pages)失败,则swap过程失败,终止swap过程,将storage设置为REDIS_VM_MEMORY;
如果能够swapout且找到了块存储val,则调用vmMarkPagesUsed(j->page,j->pages)标记这些page已经被占用,将job类型更新为REDIS_IOJOB_DO_SWAP,调用queueIOJob提交job;
job类型为REDIS_IOJOB_DO_SWAP:
表明val已经成功存储到磁盘;
将value从object替换为vmpointer,释放object;
本文剖析redis的ziplist的实现。
ziplist是一个存储高效的双链表,存储的元素类型有字符串和整数;虽然存储高效,但每次插入或删除ziplist中的元素都会引起重新分配内存,所以,ziplist作为大型只读表非常高效,频繁的插入或删除ziplist不太合适;
ziplist的内部结构:
<zlbytes><zltail><zllen><entry><entry><zlend>
entry内部结构:
<prelen><curlen><body>
prelen有两种存储方式:
curlen有六种存储方式:
元素为字符串类型:
- 字符串长度小于或等于0xFFFFFF(63),用一个字节表示,|00pppppp|,00是type,pppppp是值;
- 字符串长度大于0xFFFFFF(63)小于或等于0xFFFFFFFFFFFFFF(16383),用两个字节表示,|01pppppp|qqqqqqqq|,01是type,ppppppqqqqqqqq是值;
- 字符串长度大于0xFFFFFFFFFFFFFF(16383),用五个字节表示,|10______|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt|,10是type,后面四个字节是值;
元素为整数类型:
- 一个字节表示,|1100____|表示整数用int16_t存储,隐含值为2;
- 一个字节表示,|1101____|表示整数用int32_t存储,隐含值为4;
- 一个字节表示,|1110____|表示整数用int64_t存储,隐含值为8;
因为ziplist是紧凑的存储方式,所有东西都存放在连续的内存中,所以,插入删除元素特别费劲,需要重新分配内存;
插入元素:
对字符串元素尝试压缩为整数;
需要的内存量:lensize(prevlen) + lensize(curlen) + curlen + nextdiff;
对于下一个entry来说,前一个entry发生变化,需要更新prelen;
因为prelen采用变长存储,所以lensize(prelen)可能发生变动,这样会导致下一个entry自身的大小也发生变化;
这个更新可能会继续下去直到链尾或者收敛;
nextdiff的含义就是下一个entry的大小变化情况,值为:<调整后大小> - <调整前大小>;
当nextdiff为0时,调整收敛;
删除元素:
元素被删除,需要move后续元素以保持内存紧凑;
对被删除元素的下一个entry来说,可能需要更新prelen,这个更新也是级联的;
元素的插入和删除都需要更新zltail;
zipmap是用连续内存保存key,value对的结构;
因为是连续内存保存的,所以每次插入或删除操作都可能会导致重新分配内存;
为了缓解重新分配内存压力,为每个value保留一个free字段,表明可用空闲字节数(4);
存储结构:
<zmlen><len>"foo"<len><free>"bar"<len>"hello"<len><free>"world"<ZIPMAP_END>
每次查询key对应的value,都得遍历zipmap;
插入、删除元素也得move及重新分配内存;
当插入元素存在且既有内存符合要求(放得下且不会浪费太多)时,不需要重新分配内存;
intset结构体:
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
encoding保存编码方式:INTSET_ENC_INT16、INTSET_ENC_INT32和INTSET_ENC_INT64;
length保存元素个数;
contents保存实际数组,int16_t[]、int32_t[]或int64_t[];
元素是有序保存的数组;
元素插入:
当插入的元素不能用intset->encoding方式保存时,也就是说新元素超出intset->encoding所能表示的范围,则需要升级intset->encoding到新元素对应的编码方式;
其他情况,直接二分查找到要插入的位置,resize && move && set;
元素删除:
首先,编码方式过滤;
二分查找元素位置,move && resize;
redis的dict是自动rehash的hash表,为了平衡性能,rehash不是一次做完的,而是分散到每次交互操作来做;
typedef struct dictEntry {
} dict;
dict包含两个hash表,当rehash时,逐步将ht[0]中元素挪至ht[1],全部挪完后,ht[0]=ht[1],结束rehash;
int dictRehash(dict *d, int n)
将ht[0]的n个非空桶的元素rehash到ht[1],更新rehashidx;
如果所有元素都已经rehash,则ht[0]=ht[1],reset(ht[1]),设置rehashidx为-1;
触发_dictRehashStep(在没有iterator的时候,挪元素)的操作有:dictAdd、dictReplace、dictGenericDelete、dictDelete、dictDeleteNoFree、dictFind;
dict *dictCreate(dictType *type, void *privDataPtr); // 创建dict
int dictExpand(dict *d, unsigned long size); // 当ht[0].table为NULL时,创建hashtable;其他时候,创建ht[1],设置rehashidx为0,开始rehash;
int dictAdd(dict *d, void *key, void *val);
判断是否需要rehash,触发条件为:元素个数大于或等于桶个数且设置了可以rehash,或者元素个数是桶个数的5倍以上;
从ht[0]中查询是否有key存在,如果在rehash过程中,另需判断key是否在ht[1]中存在,如果存在,则添加失败;
如果在rehash过程中,将元素添加到ht[1],否则,添加到ht[0];
int dictReplace(dict *d, void *key, void *val);
先调用dictAdd,如果成功,直接返回;
失败则表明,key已经存在,调用dictFind获得dictEntry,将dictEntry->val替换掉;
static int dictGenericDelete(dict *d, const void *key, int nofree);
先尝试从ht[0]中删除key元素;
若ht[0]中没有key元素且在rehash过程中,则尝试从ht[1]中删除元素key;
void dictRelease(dict *d); // 释放dict
dictEntry * dictFind(dict *d, const void *key);
先从ht[0]找key,找到直接返回;
若ht[0]中没找到key元素且在rehash过程中,则尝试从ht[1]中找key;
dictIterator *dictGetSafeIterator(dict *d); // 创建迭代器
dictEntry *dictNext(dictIterator *iter); // 迭代元素
与普通hash表迭代器区别在于,如果dict处于rehash过程中,迭代完ht[0]后,会继续迭代ht[1];
在有迭代器迭代dict时,是不允许从ht[0]挪元素到ht[1]的;
dictEntry *dictGetRandomKey(dict *d);
从dict中随机获取一个元素;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。