赞
踩
Redis在初始化阶段会为每个监听fd注册一个连接处理器,当AeEvent多路io复用模块检测到监听socket收到数据之后就会调用这个处理器–acceptTcpHandler。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); while(max--) { // accept 客户端连接 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) redisLog(REDIS_WARNING, "Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); // 为客户端创建客户端状态(redisClient) acceptCommonHandler(cfd,0); } }
处理逻辑很简单,就是调用accept函数接收一个TCP链接。
初次看代码的时候有个地方比较疑惑,就是这个函数是在一个while循环里边不断调用anetTcpAccept函数,正常不是调一次就可以了吗?猜测这里是为了处理多个连接同时到达的场景,比如有十个连接同时到达等待应用层调用accept,这样通过一个while循环就可以依次处理这十个连接,直到返回一个ANET_ERR错误。
一次调用最多可以处理MAX_ACCEPTS_PER_CALL(1000)个连接。
anetTcpAccept函数会返回一个fd,一个socket,之后Redis会把这个socket封装到一个client结构体中,并为这个fd,或者说这个socket,或者说这个client注册一个读回调函数—readQueryFromClient。这样当下次这个client端往Redis服务器发送数据的时候,数据就会被内核拷贝到这个socket的输入缓冲区,然后aeEvent模块监听到这个事件就会调用readQueryFromClient函数。
static void acceptCommonHandler(int fd, int flags)
{
// 创建客户端
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
......
}
redisClient *createClient(int fd) { // 分配空间 redisClient *c = zmalloc(sizeof(redisClient)); if (fd != -1) { // 非阻塞 anetNonBlock(NULL,fd); // 禁用 Nagle 算法 anetEnableTcpNoDelay(NULL,fd); // 设置 keep alive if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 绑定读事件到事件 loop (开始接收命令请求) if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ...... }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。