赞
踩
前两篇介绍了redis的初始化过程,以及事件循环。本篇来看一下客户端的连接建立与请求处理。
- for (j = 0; j < server.ipfd_count; j++) {
- if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
- acceptTcpHandler,NULL) == AE_ERR)
- {
- redisPanic(
- "Unrecoverable error creating server.ipfd file event.");
- }
- }
监听socket的读事件就是有客户端连接请求过来,对应的事件处理函数是acceptTcpHandler,这个函数就是用于处理客户端连接建立。函数如下:
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
- char cip[REDIS_IP_STR_LEN];
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
- REDIS_NOTUSED(privdata);
-
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- if (cfd == ANET_ERR) {
- if (errno != EWOULDBLOCK)
- redisLog(REDIS_WARNING,
- "Accepting client connection: %s", server.neterr);
- return;
- }
- redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
- acceptCommonHandler(cfd,0);
- }
- }
当这个函数被调用时(对应的监听socket的读事件发生),已经有客户端完成三次握手建立连接。函数anetTcpAccept用于accept客户端的连接,其返回值是客户端对应的socket。然后,会调用acceptCommonHandler对连接以及客户端进行初始化。这部分逻辑是在一个while循环中,最多迭代MAX_ACCEPTS_PER_CALL(1000)次,也就是说每次事件循环最多可以处理1000个客户端的连接。
- static void acceptCommonHandler(int fd, int flags) {
- redisClient *c;
- if ((c = createClient(fd)) == NULL) {
- redisLog(REDIS_WARNING,
- "Error registering fd event for the new client: %s (fd=%d)",
- strerror(errno),fd);
- close(fd); /* May be already closed, just ignore errors */
- return;
- }
- /* If maxclient directive is set and this is one client more... close the
- * connection. Note that we create the client instead to check before
- * for this condition, since now the socket is already set in non-blocking
- * mode and we can send an error for free using the Kernel I/O */
- if (listLength(server.clients) > server.maxclients) {
- char *err = "-ERR max number of clients reached\r\n";
-
- /* That's a best effort error message, don't check write errors */
- if (write(c->fd,err,strlen(err)) == -1) {
- /* Nothing to do, Just to avoid the warning... */
- }
- server.stat_rejected_conn++;
- freeClient(c);
- return;
- }
- server.stat_numconnections++;
- c->flags |= flags;
- }
这个函数主要调用createClient初始化客户端相关数据结构以及对应的socket,初始化后会判断当前连接的客户端是否超过最大值,如果超过的话,会拒绝这次连接。否则,更新客户端连接数的计数。
- if (fd != -1) {
- anetNonBlock(NULL,fd);
- // <MM>
- // 关闭Nagle算法,提升响应速度
- // </MM>
- anetEnableTcpNoDelay(NULL,fd);
- if (server.tcpkeepalive)
- anetKeepAlive(NULL,fd,server.tcpkeepalive);
- if (aeCreateFileEvent(server.el,fd,AE_READABLE,
- readQueryFromClient, c) == AE_ERR)
- {
- close(fd);
- zfree(c);
- return NULL;
- }
- }
- server.current_client = c;
- readlen = REDIS_IOBUF_LEN;
这段代码重新设置读取数据的大小,避免频繁拷贝数据。如果当前请求是一个multi bulk类型的,并且要处理的bulk的大小大于REDIS_MBULK_BIG_ARG(32KB),则将读取数据大小设置为该bulk剩余数据的大小。
- /* If this is a multi bulk request, and we are processing a bulk reply
- * that is large enough, try to maximize the probability that the query
- * buffer contains exactly the SDS string representing the object, even
- * at the risk of requiring more read(2) calls. This way the function
- * processMultiB
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。