赞
踩
阻塞IO、非阻塞IO、异步 IO 。
同步阻塞 模型,一个客户单对应一个链接的处理线程
缺点:
1、IO 中如果进行 read 是阻塞操作,如果请求的链接操作不做任何操作,也会导致线程阻塞,浪费线程资源
2、如果线程很多,会导致服务器压力增加,比如 C10K问题
引用场景:
BIO 方式运用数目比较小且固定的架构,这种方式对服务器资源要求比较高,但是程序简单容易理解。
同步非阻塞,是服务器实现的模式是一个线程可以处理多个请求(链接),客户端发送的链接都会注册到多路复用器 selector 上,多路复用器轮训到介入的所有 IO 请求进行处理。
应用场景:
NIO方式适用于链接数目多(轻操作) 的架构,比如聊天服务器,弹幕系统,服务器间通讯,编程比较复杂。Java NIO 模型如下图所示:
总结:
NIO 的三大核心组件:Channel(通道)、Buffer (缓冲区)、Selector (多路复用器)
1、Channel 类似流,每个 Channel 对应一个 buffer 缓冲区。
2、Channel 组册到 Selector 上,由 Selecotor 根据 Channel 读写事件发生时交给空闲线程处理。
3、NIO 中 Buffer 与 Channel 都是可读可写的。
在 linux 系统中是通过调用系统内核函数来创建 socket ,selecotor 对应操作系统的 epoll 描述符。可以将 socket 的连接文件描述符绑定到 epoll 文件描述符上,进行事件的异步通知,实现一个线程处理,并且减少大量的无效遍历,事件处理交给了操作系统的内核,提升效率。
Redis 是一个典型的基于 epoll 的 nio 线程模型, epoll 实例手机所有的事件(连接与读事件)由一个服务线程处理所有命令。
Redis 底层相关的 epoll 的源码实现在 src/ae_epoll.c 文件中。
异步非阻塞、由于操作系统完成后回调通知程序启动线程去处理,一般适用于链接较多且链接时间较长的应用。
应用场景:
AIO 方式适用于链接数目多且比较长(重操作),比如设备每间隔 2秒上报状态。
BIO | NIO | AIO | |
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 复杂 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 高 | 高 | 高 |
处理流程:
优点和缺点:
服务端(基于 netty):
- // 基于 Java 代码为例
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 4096)
- .childHandler(new JkvServerInitalizer());
-
- ChannelFuture f = b.bind(SERVER_PORT).sync();
- f.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
客户端(基于 netty):
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- try {
-
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- .handler(new MyChatClientInitializer());
-
- Channel channel = bootstrap.connect("localhost",SERVER_PORT).sync().channel();
-
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- for (;;) {
- channel.writeAndFlush(br.readLine() + "\r\n");
- }
- }finally {
- eventLoopGroup.shutdownGracefully();
- }
Redis 采用的是单线程 Reactor。单机压测QPS可以达到10w , 因为 Redis 主要是以内存读写为主,效率是非常高的。
Redis 服务器是一个事件驱动的程序,服务器需要处理一下两类事件:
1、文件事件(file event): Redis 服务器通过套接字与客户端(或者其他 Redis 服务器)进行连接,而文件事件就是服务器对套接字操作的抽。服务器与客户端(或者其他服务器)的通讯都会产生相应的文件事件,而服务器则通过监听并且处理这些事件来完成一些列网络通讯操作
2、 事件事件(time event): Redis 服务器中国呢的一些操作(比如 serverCron 函数)需要在给定的事件点执行,而时间事件就是服务器对着咧定时操作的抽象。
Redis基于Reactor模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler)
文件事件构成,文件事件处理器的4个部分: 套接字、 I/O 多路复用程序、文件事件派发器(dispatcher)、以及事件处理器。
多路复用器, 的所有功能都是通过包装常见的 select、epoll 、evport 和 kququee 这些 i/o 多路复用函数库来实现了,每个 i/o 多路复用器在 redis 中都对应一个单独的文件比如:src\ae_epoll.c、src\ac_evport.c、src\ac_kqueue.c、src\ac_select.c 等。
因为 Redis 每个 I/O 多路复用函数库都实现了相同的 API , 所以 I/O 多路复用程序的底层实现是可以互换的。
Redis 在 I/O 多路复用程序实现源码中通过 #include 宏定义了相应的谷子额,程序会在编译期间自动选择系统中性能最高的 I/O 多路复用函数库来作为 Redis 的 I/O多路复用程序的底层实现:
- /* Include the best multiplexing layer supported by this system.
- * The following should be ordered by performances, descending. */
- #ifdef HAVE_EVPORT
- #include "ae_evport.c"
- #else
- #ifdef HAVE_EPOLL
- #include "ae_epoll.c"
- #else
- #ifdef HAVE_KQUEUE
- #include "ae_kqueue.c"
- #else
- #include "ae_select.c"
- #endif
- #endif
- #endif
I/O 多路复用程序可以监听多个套接字的 ae.h/AE_READABLE 事件和 ae.h/AE_WRITABLE 事件,这两类事件和套接字操作之间的对应关系如下:
如果套接字同时可读可写,那么服务器先读套接字,后写套接字。
1、连接应答处理器
networking.c/acceptTcpHandler函数是Redis的连接应答处理器,这个处理器用于对连接服务器监听套接字的客户端进行应答,具体实现为 sys/socket.h/accept 函数的包装。
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
- char cip[NET_IP_STR_LEN];
- UNUSED(el);
- UNUSED(mask);
- UNUSED(privdata);
-
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- if (cfd == ANET_ERR) {
- if (errno != EWOULDBLOCK)
- serverLog(LL_WARNING,
- "Accepting client connection: %s", server.neterr);
- return;
- }
- anetCloexec(cfd);
- serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
- acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
- }
- }
2、命令请求处理器
networking.c/readQueryFromClient 函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容,具体实现为 unistd.h/read 函数的包装。
- void readQueryFromClient(connection *conn) {
- client *c = connGetPrivateData(conn);
- int nread, readlen;
- size_t qblen;
-
- /* Check if we want to read from the client later when exiting from
- * the event loop. This is the case if threaded I/O is enabled. */
- if (postponeClientRead(c)) return;
-
- /* Update total number of reads on server */
- atomicIncr(server.stat_total_reads_processed, 1);
-
- readlen = PROTO_IOBUF_LEN;
- /* If this is a multi bulk request, and we are processing a bulk reply
- * that is large enough, try to maximize the probability that the query
- * buffer contains exactly the SDS string representing the object, even
- * at the risk of requiring more read(2) calls. This way the function
- * processMultiBulkBuffer() can avoid copying buffers to create the
- * Redis Object representing the argument. */
- if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
- && c->bulklen >= PROTO_MBULK_BIG_ARG)
- {
- ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
-
- /* Note that the 'remaining' variable may be zero in some edge case,
- * for example once we resume a blocked client after CLIENT PAUSE. */
- if (remaining > 0 && remaining < readlen) readlen = remaining;
- }
-
- qblen = sdslen(c->querybuf);
- if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
- c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
- nread = connRead(c->conn, c->querybuf+qblen, readlen);
- if (nread == -1) {
- if (connGetState(conn) == CONN_STATE_CONNECTED) {
- return;
- } else {
- serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
- freeClientAsync(c);
- return;
- }
- } else if (nread == 0) {
- serverLog(LL_VERBOSE, "Client closed connection");
- freeClientAsync(c);
- return;
- } else if (c->flags & CLIENT_MASTER) {
- /* Append the query buffer to the pending (not applied) buffer
- * of the master. We'll use this buffer later in order to have a
- * copy of the string applied by the last command executed. */
- c->pending_querybuf = sdscatlen(c->pending_querybuf,
- c->querybuf+qblen,nread);
- }
- sdsIncrLen(c->querybuf,nread);
- c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
- atomicIncr(server.stat_net_input_bytes, nread);
- if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
- sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
- bytes = sdscatrepr(bytes,c->querybuf,64);
- serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
- sdsfree(ci);
- sdsfree(bytes);
- freeClientAsync(c);
- return;
- }
- /* There is more data in the client input buffer, continue parsing it
- * in case to check if there is a full command to execute. */
- processInputBuffer(c);
- }
3、命令回复处理器
networking.c/sendReplyToClient 函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write 函数的包装。
- /* Write event handler. Just send data to the client. */
- void sendReplyToClient(connection *conn) {
- client *c = connGetPrivateData(conn);
- writeToClient(c,1);
- }
实际上redis支持的是周期任务事件,即执行完之后不会删除,而是在重新插入链表。
定时器采用链表的方式进行管理,新定时任务插入链表表头。
- if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
- serverPanic("Can't create event loop timers.");
- exit(1);
- }
具体定时事件处理如下:
- /* Process time events */
- static int processTimeEvents(aeEventLoop *eventLoop) {
- int processed = 0;
- aeTimeEvent *te;
- long long maxId;
-
- te = eventLoop->timeEventHead;
- maxId = eventLoop->timeEventNextId-1;
- monotime now = getMonotonicUs();
-
- //删除定时器
- while(te) {
- long long id;
-
- /* Remove events scheduled for deletion. */
- // 下一轮中事件进行删除
- if (te->id == AE_DELETED_EVENT_ID) {
- aeTimeEvent *next = te->next;
- /* If a reference exists for this timer event,
- * don't free it. This is currently incremented
- * for recursive timerProc calls */
- if (te->refcount) {
- te = next;
- continue;
- }
- if (te->prev)
- te->prev->next = te->next;
- else
- eventLoop->timeEventHead = te->next;
- if (te->next)
- te->next->prev = te->prev;
- if (te->finalizerProc) {
- te->finalizerProc(eventLoop, te->clientData);
- now = getMonotonicUs();
- }
- zfree(te);
- te = next;
- continue;
- }
- /* Make sure we don't process time events created by time events in
- * this iteration. Note that this check is currently useless: we always
- * add new timers on the head, however if we change the implementation
- * detail, this check may be useful again: we keep it here for future
- * defense. */
- if (te->id > maxId) {
- te = te->next;
- continue;
- }
-
- if (te->when <= now) {
- int retval;
-
- id = te->id;
- te->refcount++;
- // timeProc 返回值 retval 为事件事件执行的间隔
- retval = te->timeProc(eventLoop, id, te->clientData);
- te->refcount--;
- processed++;
- now = getMonotonicUs();
- if (retval != AE_NOMORE) {
- te->when = now + retval * 1000;
- } else {
- // 如果超时,那么标记为删除
- te->id = AE_DELETED_EVENT_ID;
- }
- }
- te = te->next;
- }
- return processed;
- }
作者:心城以北
原文链接:
https://juejin.cn/post/7069279726036058142
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。