当前位置:   article > 正文

Redis 源码分析 I/O 模型详解_redisio模型源码

redisio模型源码

主流 I/O 模型

阻塞IO、非阻塞IO、异步 IO 。

BIO 模型

同步阻塞 模型,一个客户单对应一个链接的处理线程

缺点:

1、IO 中如果进行 read 是阻塞操作,如果请求的链接操作不做任何操作,也会导致线程阻塞,浪费线程资源

2、如果线程很多,会导致服务器压力增加,比如 C10K问题

引用场景:

BIO 方式运用数目比较小且固定的架构,这种方式对服务器资源要求比较高,但是程序简单容易理解。

NIO 模型

同步非阻塞,是服务器实现的模式是一个线程可以处理多个请求(链接),客户端发送的链接都会注册到多路复用器 selector 上,多路复用器轮训到介入的所有 IO 请求进行处理。

应用场景:

NIO方式适用于链接数目多(轻操作) 的架构,比如聊天服务器,弹幕系统,服务器间通讯,编程比较复杂。Java NIO 模型如下图所示:

总结:

NIO 的三大核心组件:Channel(通道)、Buffer (缓冲区)、Selector (多路复用器)

1、Channel 类似流,每个 Channel 对应一个 buffer 缓冲区。

2、Channel 组册到 Selector 上,由 Selecotor 根据 Channel 读写事件发生时交给空闲线程处理。

3、NIO 中 Buffer 与 Channel 都是可读可写的。

NIO 模型实现

在 linux 系统中是通过调用系统内核函数来创建 socket ,selecotor 对应操作系统的 epoll 描述符。可以将 socket 的连接文件描述符绑定到 epoll 文件描述符上,进行事件的异步通知,实现一个线程处理,并且减少大量的无效遍历,事件处理交给了操作系统的内核,提升效率。

Redis 线程模型

Redis 是一个典型的基于 epoll 的 nio 线程模型, epoll 实例手机所有的事件(连接与读事件)由一个服务线程处理所有命令。

Redis 底层相关的 epoll 的源码实现在 src/ae_epoll.c 文件中。

AIO 模型

异步非阻塞、由于操作系统完成后回调通知程序启动线程去处理,一般适用于链接较多且链接时间较长的应用。

应用场景:

AIO 方式适用于链接数目多且比较长(重操作),比如设备每间隔 2秒上报状态。

三种 I/O 模型对比

BIO

NIO

AIO

IO模型

同步阻塞

同步非阻塞(多路复用)

异步非阻塞

编程难度

简单

复杂

复杂

可靠性

吞吐量

Redis 线程模型

1、交互模型

2、Reactor 模型

处理流程:

  • 主线程往 epoll 内核事件表注册 socket 上的读事件。
  • 主线程调用 epoll_wait 等待 socket 上数据可读。
  • 当 socket 可读的时候 ,epoll_wait 通知主线程,主线程则将 socket 可读事件放入请求队列。
  • 睡眠在请求队列上的某个工作线程被唤醒,他从 socket 读取数据,并且处理用户请求,然后往 epoll 内核事件表中注册 socket 写就绪事件。
  • 主线程 epoll_wart 等待 socket 可写
  • 当 socket 可写时, epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
  • 睡眠在请求队列中的某个线程被唤醒,他往 socket 上写服务端处理客户端请求的结果。

优点和缺点:

  • 优点
    • 响应快,不必为单个同步操作阻塞,也不用考虑 fd 跨线程问题。
    • 可拓展性,可以很方便的通过 reactor 实例(如 multi reactor)个数来利用 cpu 资源;
    • 可复用性,reacotor 本身与具体事件处理逻辑无关,方便复用。
  • 缺点
    • 共享同一个 reactor 时,若出现较长的读写,会影响该 reactor 的响应时间,此时可以考虑 thread-per-connection

3、Reactor 模型示例

服务端(基于 netty):

  1. // 基于 Java 代码为例
  2. EventLoopGroup bossGroup = new NioEventLoopGroup();
  3. EventLoopGroup workerGroup = new NioEventLoopGroup();
  4. try {
  5. ServerBootstrap b = new ServerBootstrap();
  6. b.group(bossGroup, workerGroup)
  7. .channel(NioServerSocketChannel.class)
  8. .option(ChannelOption.SO_BACKLOG, 4096)
  9. .childHandler(new JkvServerInitalizer());
  10. ChannelFuture f = b.bind(SERVER_PORT).sync();
  11. f.channel().closeFuture().sync();
  12. } finally {
  13. bossGroup.shutdownGracefully();
  14. workerGroup.shutdownGracefully();
  15. }

客户端(基于 netty):

  1. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  2. try {
  3. Bootstrap bootstrap = new Bootstrap();
  4. bootstrap.group(eventLoopGroup)
  5. .channel(NioSocketChannel.class)
  6. .handler(new MyChatClientInitializer());
  7. Channel channel = bootstrap.connect("localhost",SERVER_PORT).sync().channel();
  8. BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
  9. for (;;) {
  10. channel.writeAndFlush(br.readLine() + "\r\n");
  11. }
  12. }finally {
  13. eventLoopGroup.shutdownGracefully();
  14. }

Redis 的网络模型

Redis 采用的是单线程 Reactor。单机压测QPS可以达到10w , 因为 Redis 主要是以内存读写为主,效率是非常高的。

Redis 服务器是一个事件驱动的程序,服务器需要处理一下两类事件:

1、文件事件(file event): Redis 服务器通过套接字与客户端(或者其他 Redis 服务器)进行连接,而文件事件就是服务器对套接字操作的抽。服务器与客户端(或者其他服务器)的通讯都会产生相应的文件事件,而服务器则通过监听并且处理这些事件来完成一些列网络通讯操作

2、 事件事件(time event): Redis 服务器中国呢的一些操作(比如 serverCron 函数)需要在给定的事件点执行,而时间事件就是服务器对着咧定时操作的抽象。

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler)

  • 文件事件处理器使用I/O多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
  • 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。

文件事件构成,文件事件处理器的4个部分: 套接字、 I/O 多路复用程序、文件事件派发器(dispatcher)、以及事件处理器。

多路复用器, 的所有功能都是通过包装常见的 select、epoll 、evport 和 kququee 这些 i/o 多路复用函数库来实现了,每个 i/o 多路复用器在 redis 中都对应一个单独的文件比如:src\ae_epoll.c、src\ac_evport.c、src\ac_kqueue.c、src\ac_select.c 等。

因为 Redis 每个 I/O 多路复用函数库都实现了相同的 API , 所以 I/O 多路复用程序的底层实现是可以互换的。

Redis 在 I/O 多路复用程序实现源码中通过 #include 宏定义了相应的谷子额,程序会在编译期间自动选择系统中性能最高的 I/O 多路复用函数库来作为 Redis 的 I/O多路复用程序的底层实现:

  1. /* Include the best multiplexing layer supported by this system.
  2. * The following should be ordered by performances, descending. */
  3. #ifdef HAVE_EVPORT
  4. #include "ae_evport.c"
  5. #else
  6. #ifdef HAVE_EPOLL
  7. #include "ae_epoll.c"
  8. #else
  9. #ifdef HAVE_KQUEUE
  10. #include "ae_kqueue.c"
  11. #else
  12. #include "ae_select.c"
  13. #endif
  14. #endif
  15. #endif

事件的类型

I/O 多路复用程序可以监听多个套接字的 ae.h/AE_READABLE 事件和 ae.h/AE_WRITABLE 事件,这两类事件和套接字操作之间的对应关系如下:

  • 当套接字变得可读时(客户端对套接字执行 write 操作,或者执行 close 操作),或者有新的可应答(acceptable)套接字出现时(客户端对服务器的监听套接字执行connect操作),套接字产生 AE_READABLE 事件。
  • 当套接字变得可写时(客户端对套接字执行 read 操作),套接字产生AE_WRITABLE事件。

如果套接字同时可读可写,那么服务器先读套接字,后写套接字。

文件事件处理器

1、连接应答处理器


networking.c/acceptTcpHandler函数是Redis的连接应答处理器,这个处理器用于对连接服务器监听套接字的客户端进行应答,具体实现为 sys/socket.h/accept 函数的包装。

  1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  2. int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
  3. char cip[NET_IP_STR_LEN];
  4. UNUSED(el);
  5. UNUSED(mask);
  6. UNUSED(privdata);
  7. while(max--) {
  8. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
  9. if (cfd == ANET_ERR) {
  10. if (errno != EWOULDBLOCK)
  11. serverLog(LL_WARNING,
  12. "Accepting client connection: %s", server.neterr);
  13. return;
  14. }
  15. anetCloexec(cfd);
  16. serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
  17. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
  18. }
  19. }

2、命令请求处理器


networking.c/readQueryFromClient 函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容,具体实现为 unistd.h/read 函数的包装。

  1. void readQueryFromClient(connection *conn) {
  2. client *c = connGetPrivateData(conn);
  3. int nread, readlen;
  4. size_t qblen;
  5. /* Check if we want to read from the client later when exiting from
  6. * the event loop. This is the case if threaded I/O is enabled. */
  7. if (postponeClientRead(c)) return;
  8. /* Update total number of reads on server */
  9. atomicIncr(server.stat_total_reads_processed, 1);
  10. readlen = PROTO_IOBUF_LEN;
  11. /* If this is a multi bulk request, and we are processing a bulk reply
  12. * that is large enough, try to maximize the probability that the query
  13. * buffer contains exactly the SDS string representing the object, even
  14. * at the risk of requiring more read(2) calls. This way the function
  15. * processMultiBulkBuffer() can avoid copying buffers to create the
  16. * Redis Object representing the argument. */
  17. if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
  18. && c->bulklen >= PROTO_MBULK_BIG_ARG)
  19. {
  20. ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
  21. /* Note that the 'remaining' variable may be zero in some edge case,
  22. * for example once we resume a blocked client after CLIENT PAUSE. */
  23. if (remaining > 0 && remaining < readlen) readlen = remaining;
  24. }
  25. qblen = sdslen(c->querybuf);
  26. if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
  27. c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
  28. nread = connRead(c->conn, c->querybuf+qblen, readlen);
  29. if (nread == -1) {
  30. if (connGetState(conn) == CONN_STATE_CONNECTED) {
  31. return;
  32. } else {
  33. serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
  34. freeClientAsync(c);
  35. return;
  36. }
  37. } else if (nread == 0) {
  38. serverLog(LL_VERBOSE, "Client closed connection");
  39. freeClientAsync(c);
  40. return;
  41. } else if (c->flags & CLIENT_MASTER) {
  42. /* Append the query buffer to the pending (not applied) buffer
  43. * of the master. We'll use this buffer later in order to have a
  44. * copy of the string applied by the last command executed. */
  45. c->pending_querybuf = sdscatlen(c->pending_querybuf,
  46. c->querybuf+qblen,nread);
  47. }
  48. sdsIncrLen(c->querybuf,nread);
  49. c->lastinteraction = server.unixtime;
  50. if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
  51. atomicIncr(server.stat_net_input_bytes, nread);
  52. if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
  53. sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
  54. bytes = sdscatrepr(bytes,c->querybuf,64);
  55. serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
  56. sdsfree(ci);
  57. sdsfree(bytes);
  58. freeClientAsync(c);
  59. return;
  60. }
  61. /* There is more data in the client input buffer, continue parsing it
  62. * in case to check if there is a full command to execute. */
  63. processInputBuffer(c);
  64. }

3、命令回复处理器


networking.c/sendReplyToClient 函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write 函数的包装。

  1. /* Write event handler. Just send data to the client. */
  2. void sendReplyToClient(connection *conn) {
  3. client *c = connGetPrivateData(conn);
  4. writeToClient(c,1);
  5. }

定时事件

实际上redis支持的是周期任务事件,即执行完之后不会删除,而是在重新插入链表。

定时器采用链表的方式进行管理,新定时任务插入链表表头。

  1. if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  2. serverPanic("Can't create event loop timers.");
  3. exit(1);
  4. }

具体定时事件处理如下:

  1. /* Process time events */
  2. static int processTimeEvents(aeEventLoop *eventLoop) {
  3. int processed = 0;
  4. aeTimeEvent *te;
  5. long long maxId;
  6. te = eventLoop->timeEventHead;
  7. maxId = eventLoop->timeEventNextId-1;
  8. monotime now = getMonotonicUs();
  9. //删除定时器
  10. while(te) {
  11. long long id;
  12. /* Remove events scheduled for deletion. */
  13. // 下一轮中事件进行删除
  14. if (te->id == AE_DELETED_EVENT_ID) {
  15. aeTimeEvent *next = te->next;
  16. /* If a reference exists for this timer event,
  17. * don't free it. This is currently incremented
  18. * for recursive timerProc calls */
  19. if (te->refcount) {
  20. te = next;
  21. continue;
  22. }
  23. if (te->prev)
  24. te->prev->next = te->next;
  25. else
  26. eventLoop->timeEventHead = te->next;
  27. if (te->next)
  28. te->next->prev = te->prev;
  29. if (te->finalizerProc) {
  30. te->finalizerProc(eventLoop, te->clientData);
  31. now = getMonotonicUs();
  32. }
  33. zfree(te);
  34. te = next;
  35. continue;
  36. }
  37. /* Make sure we don't process time events created by time events in
  38. * this iteration. Note that this check is currently useless: we always
  39. * add new timers on the head, however if we change the implementation
  40. * detail, this check may be useful again: we keep it here for future
  41. * defense. */
  42. if (te->id > maxId) {
  43. te = te->next;
  44. continue;
  45. }
  46. if (te->when <= now) {
  47. int retval;
  48. id = te->id;
  49. te->refcount++;
  50. // timeProc 返回值 retval 为事件事件执行的间隔
  51. retval = te->timeProc(eventLoop, id, te->clientData);
  52. te->refcount--;
  53. processed++;
  54. now = getMonotonicUs();
  55. if (retval != AE_NOMORE) {
  56. te->when = now + retval * 1000;
  57. } else {
  58. // 如果超时,那么标记为删除
  59. te->id = AE_DELETED_EVENT_ID;
  60. }
  61. }
  62. te = te->next;
  63. }
  64. return processed;
  65. }


作者:心城以北
原文链接:
https://juejin.cn/post/7069279726036058142

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/786670
推荐阅读
相关标签
  

闽ICP备14008679号