赞
踩
我们直接跑到最底层,看看kafka的网络层处理是怎么处理的。因为Java的NIO还是偏底层,不能直接用来做应用开发,所以一般都使用像netty的框架或者按照自己的需要封装一些nio,让上层业务不用关心网络处理的细节,只需要创建服务监听端口、接受请求、处理请求、写返回就可以了。我在看netty、thrift等涉及到网络的Java框架时比较喜欢去看他们的nio是怎么封装的,这里也是能够体现作者水平的地方。java nio的基本元素为Selector、Channel、ByteBuffer。
我们从server和client两端分别分析。
kafka server端在org.apache.kafka.common.network中进行了封装。
就像package.html里面写的。
1 2 3 4 5 6 7 8 9 10 11 12 |
|
网络层的启动在SocketServer.kafka中, 属于KafkaServer启动过程中的一部分
首先看一下server.properties中的网络相关配置
1 2 3 4 5 6 7 8 9 10 11 12 |
|
这个类上的注释阐述了kafka server的io线程模型
1 2 3 4 5 6 7 8 |
|
一共三种线程。一个Acceptor线程负责处理新连接请求,会有N个Processor线程,每个都有自己的Selector,负责从socket中读取请求和将返回结果写回。然后会有M个Handler线程,负责处理请求,并且将结果返回给Processor。
将Acceptor和Processor线程分开的目的是为了避免读写频繁影响新连接的接收。
SockerServer创建的时候通过server.properties和默认的配置中获取配置,如numNetworkThread(num.network.threads,也就是线程模型中的N)、
创建processor数组、acceptorMap(因为可能会在多个Endpoint接收请求)、memoryPool(SimpleMemoryPool里主要做的事情是统计监控ByteBuffer的使用)、requestChanne等 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
RequestChannel
因为Nio带来的异步特性,就是在一个连接上可以连续发送多个应用层的请求,每个请求得到是一个返回的Future。RequestChannel中将请求和返回结果放在各自的BlockingQueue中,也就是requestQueue和responseQueue,这里的request指客户端发来的请求。requestQueue的大小是queued.max.requests
定义的,默认500。而每个RequestChannel中有numProcessor大小个responseQueue(无界的LinkedBlockingQueue)。
这样Handler从requestQueue中取request处理得到response然后put到responseQueue中。Processor则把接收到的byte转换成requestput到requestQueue中,并从responseQueue中拉response写回给对应的socket。
startup中创建Processor、Acceptor。创建connectionQuotas, 限制每个客户端ip的最大连接数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
Acceptor创建过程中启动了Processor线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
Acceptor和Processor启动后各自执行自己的loop。
Acceptor只负责接收新连接,并采用round-robin的方式交给各个Processor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
|
Acceptor接收配置socket并传给processor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
Processor的循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
处理新连接 configureNewConnections
Acceptor传过来的新socket放在了一个ConcorrentLinkedQueue中,
congiureNewConnections()负责获取ip端口号等信息然后注册到Processor自己的selector上。这个selector是Kafka封装了一层的KSelector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
processNewResponses
从requestChannel中poll待写回的Response,这里是将Channel的send变量设置为Response.responseSend等待Selector处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
Selector.send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
processCompletedReceives
Selector在接收到请求后,将数据放到一个List中,Processor取出后put到requestChannel的requestQueue中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
processCompletedSends
在Selector发送完成Resposne后,从inflightResponse中remove掉这个connnection -> resposne的键值对,当前inflightResposne只用于验证response的正确性,就是一个Channel写的数据必须在发送后先记录在inflightResponse中
1 2 3 4 5 6 7 8 9 |
|
processDisconnected
写失败的连接和由于各种原因close的连接,需要清理已经占用的内存空间,例如inflightResponses。
1 2 3 4 5 6 7 8 9 10 |
|
至此网络部分基本分析完成,后面有涉及到的要注意的地方会单独介绍。
startup完成后,KafkaServer继续完成其他的startup
clients包里分成主要Send、Receive、KafkaChannel和Selector四部分
Selectable是其中的网络操作的接口, Selector是具体的实现, 包括了发送请求、接收返回、建立连接、断开连接等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
|
Send作为要发送数据的接口, 子类实现complete()方法用于判断是否已经发送完成,实现writeTo(GatheringByteChannel channel)方法来实现写入到Channel中,
size()方法返回要发送的数据的大小
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
以ByteBufferSend实现为例, 保存ByteBuffer数组作为要发送的内容,size就是这些ByteBuffer.remaining()的和,发送只需要委托给channe.write即可,每次发送后检查剩余待发送的大小,当没有待发送的内容并且channel中也都已经发送完成就表示Send已经完成了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
|
NetworkSend类继承了ByteBufferSend,增加了4字节表示内容大小(不包含这4byte)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
与Send对应的是Receive,表示从Channel中读取的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
org.apache.kafka.common.network.Selector类则负责具体的连接写入读取等操作
下面分析下这几个操作的实现
connect过程,由于connect是异步的,所以connect方法返回后不一定已经连接成功了,需要等SelectionKey.isConnectable()后判断一次Channel.finishConnect才算连接成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
|
poll方法,poll方法会调用一次JavaSelector的select方法,然后处理SelectionKey,分成可连接可读可写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
|
pollSelectionKeys
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
|
Selector.send(Send send)方法只需要找到对应的channel然后调用KafkaChanne.setSend(Send send), KafkaChannel中同时只允许写一个Send对象,发送完成才能发送下一个
KafkaClient是Kafka定义的高层的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
关键的接口有send和poll, send方法将要发送的内容保存起来,真正的Channel读写发生在poll方法中
KafkaClient的实现类是NetworkClient。
ClientRequest中通过requetBuilder给不同类型的请求设置不同的请求内容
1 2 3 4 5 6 7 8 |
|
同样的,ClientResponse也有对应各个类型不同的返回体
1 2 3 4 5 6 7 8 9 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
poll的处理流程为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。