赞
踩
Kafka服务端,即Broker,负责消息的持久化,是个不断接收外部请求、处理请求,然后发送处理结果的Java进程。
Broker的高处理性能在于高效保存排队中的请求。
Broker底层请求对象的建模
请求队列的实现原理
Broker请求处理方面的核心监控指标。
Broker与Clients主要基于Request/Response机制交互,所以看看如何建模或定义Request和Response。
定义了Kafka Broker支持的各类请求。
Request才是真正的定义各类Clients端或Broker端请求的实现类。
Processor线程的序号,即该请求由哪个Processor线程接收处理。
num.network.threads
控制Broker每个监听器上创建的Processor线程数假设listeners配置为PLAINTEXT://localhost:9092,SSL://localhost:9093
,则默认情况下Broker启动时会创建6个Processor线程,每3个为一组,分别给listeners参数中设置的两个监听器使用,每组的序号分别是0、1、2。
当Request被后面的I/O线程处理完成后,还要依靠Processor线程发送Response给请求方,因此,Request必须记录它之前被哪个Processor线程接收。
Processor线程只是网络接收线程,并不会执行真正的I/O线程才负责的Request请求处理逻辑。
维护Request对象被创建的时间,用于计算各种时间统计指标。
请求对象中的很多JMX(Java Management Extensions)指标,特别是时间类统计指标,都需要startTimeNanos字段,纳秒单位的时间戳信息,可实现细粒度时间统计精度。
一个非阻塞式内存缓冲区,用于避免Request对象无限使用内存。
内存缓冲区的接口类MemoryPool,实现类SimpleMemoryPool。可重点关注下SimpleMemoryPool#tryAllocate,怎么为Request对象分配内存。
@Override
public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes < 1)
throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
if (sizeBytes > maxSingleAllocationSize)
throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
long available;
boolean success = false;
//in strict mode we will only allocate memory if we have at least the size required.
//in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
//can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
long threshold = strict ? sizeBytes : 1;
while ((available = availableMemory.get()) >= threshold) {
success = availableMemory.compareAndSet(available, available - sizeBytes);
if (success)
break;
}
if (success) {
maybeRecordEndOfDrySpell();
} else {
if (oomTimeSensor != null) {
startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
}
log.trace("refused to allocate buffer of size {}", sizeBytes);
return null;
}
ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
bufferToBeReturned(allocated);
return allocated;
}
真正保存Request对象内容的字节缓冲区。Request发送方须按Kafka RPC协议规定格式向该缓冲区写入字节,否则抛InvalidRequestException。
当Broker接收到ApiVersionsRequest,它会返回Broker当前支持的请求类型列表,包括请求类型名称、支持的最早版本号和最新版本号。查看Kafka的bin目录,能找到kafka-broker-api-versions.sh
脚本工具。它就是,构造ApiVersionsRequest对象,然后发送给对应的Broker。
若是ApiVersions类型请求,代码中为什么要判断一下它的版本呢?
和处理其他类型请求不同,Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑客户端和服务器端版本兼容。客户端发请求给Broker,可能不知道Broker到底支持哪些版本请求,它需使用ApiVersionsRequest去获取完整请求版本支持列表。若不做该判断,Broker可能无法处理客户端发送的ApiVersionsRequest。
metrics是Request相关的各种监控指标的一个管理类。它构建了一个Map,封装了所有请求JMX指标。
定义了与Request对应的各类响应。
正常需要发送Response。
无需发送Response。
标识关闭连接通道的Response。
后两个Response类不常用,仅在对Socket连接进行限流时,才会使用。
Response代码
abstract class Response(val request: Request) {
locally {
val nowNs = Time.SYSTEM.nanoseconds
request.responseCompleteTimeNanos = nowNs
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = nowNs
}
def processor: Int = request.processor
def responseString: Option[String] = Some("")
def onComplete: Option[Send => Unit] = None
override def toString: String
}
该抽象类只有一个属性字段:request。即每个Response对象都要保存它对应的Request对象。
onComplete方法是调用指定回调逻辑的地方。
SendResponse类就重写了该方法:
class SendResponse(request: Request,
val responseSend: Send,
val responseAsString: Option[String],
val onCompleteCallback: Option[Send => Unit])
extends Response(request) {
......
// 指定输入参数onCompleteCallback
override def onComplete: Option[Send => Unit] = onCompleteCallback
}
onComplete方法把函数赋值给另一个函数,并作为结果返回。好处在于可以灵活变更onCompleteCallback实现不同回调逻辑。
实现了Kafka Request队列。传输Request/Response的通道。有了Request和Response的基础,下面我们可以学习RequestChannel类的实现了。
RequestChannel类实现KafkaMetricsGroup trait,后者封装许多实用指标监控方法:
每个RequestChannel对象实例创建时,会定义队列保存Broker接收到的各类请求,这个队列被称为请求队列或Request队列。
Kafka使用Java提供的阻塞队列ArrayBlockingQueue实现请求队列,并利用它天然提供的线程安全保证多个线程能够并发安全高效地访问请求队列。
Request队列的最大长度。当Broker启动时,SocketServer组件会创建RequestChannel对象
并把Broker端参数queued.max.requests
赋值给queueSize。默认情况每个RequestChannel上的队列长度500。
封装RequestChannel的Processor线程池。每个Processor线程负责具体的请求处理逻辑。
刚才的processors即是被创建的Processor线程池,使用Java#ConcurrentHashMap保存:
因此当前Kafka Broker端所有网络线程都是在RequestChannel中维护的。
分别实现增加和移除线程。每当Broker启动,都会调用addProcessor方法,向RequestChannel对象添加num.network.threads
个Processor线程。
num.network.threads
这个参数的更新模式(Update Mode)是Cluster-wide,即Kafka允许你动态修改此参数值。比如,Broker启动时指定num.network.threads
为8,之后你通过kafka-configs命令将其修改为3。显然该操作会减少Processor线程池中的线程数量。在这个场景下,removeProcessor方法会被调用。
即收发Request和发送Response。
sendRequest和receiveRequest:
整个流程其实就是“生产者-消费者”模式,依靠ArrayBlockingQueue的线程安全确保整个过程的线程安全
没有所谓的接收Response,只有发送Response,即sendResponse方法。sendResponse是啥意思呢?其实就是把Response对象发送出去,也就是将Response添加到Response队列的过程。
当Processor处理完某个Request后,会把自己的序号封装进对应的Response对象。
一旦找出之前是由哪个Processor线程处理,代码直接调用该Processor的enqueueResponse方法,将Response放入Response队列中,等待后续发送。
RequestChannel类定义封装了与Request队列相关的重要监控指标,以实时动态地监测Request和Response的性能表现。
object RequestMetrics {
val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
val RequestsPerSec = "RequestsPerSec"
val RequestQueueTimeMs = "RequestQueueTimeMs"
val LocalTimeMs = "LocalTimeMs"
val RemoteTimeMs = "RemoteTimeMs"
val ThrottleTimeMs = "ThrottleTimeMs"
val ResponseQueueTimeMs = "ResponseQueueTimeMs"
val ResponseSendTimeMs = "ResponseSendTimeMs"
val TotalTimeMs = "TotalTimeMs"
val RequestBytes = "RequestBytes"
val MessageConversionsTimeMs = "MessageConversionsTimeMs"
val TemporaryMemoryBytes = "TemporaryMemoryBytes"
val ErrorsPerSec = "ErrorsPerSec"
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。