当前位置:   article > 正文

Redis学习笔记-TCP连接处理器acceptTcpHandler

accepttcphandler

Redis在初始化阶段会为每个监听fd注册一个连接处理器,当AeEvent多路io复用模块检测到监听socket收到数据之后就会调用这个处理器–acceptTcpHandler。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态(redisClient)
        acceptCommonHandler(cfd,0);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

处理逻辑很简单,就是调用accept函数接收一个TCP链接。
初次看代码的时候有个地方比较疑惑,就是这个函数是在一个while循环里边不断调用anetTcpAccept函数,正常不是调一次就可以了吗?猜测这里是为了处理多个连接同时到达的场景,比如有十个连接同时到达等待应用层调用accept,这样通过一个while循环就可以依次处理这十个连接,直到返回一个ANET_ERR错误。
一次调用最多可以处理MAX_ACCEPTS_PER_CALL(1000)个连接。
anetTcpAccept函数会返回一个fd,一个socket,之后Redis会把这个socket封装到一个client结构体中,并为这个fd,或者说这个socket,或者说这个client注册一个读回调函数—readQueryFromClient。这样当下次这个client端往Redis服务器发送数据的时候,数据就会被内核拷贝到这个socket的输入缓冲区,然后aeEvent模块监听到这个事件就会调用readQueryFromClient函数。

static void acceptCommonHandler(int fd, int flags) 
{

    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
......
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
redisClient *createClient(int fd) 
{

    // 分配空间
    redisClient *c = zmalloc(sizeof(redisClient));

    if (fd != -1) {
        // 非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法
        anetEnableTcpNoDelay(NULL,fd);
        // 设置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 绑定读事件到事件 loop (开始接收命令请求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ......
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号