赞
踩
在 KafkaServer 中,会将 SocketServer 的请求通道传给 Kafka 请求处理线程 KafkaRequestHandler 和 KafkaApis
// 请求通道会保存全局的请求队列和每个处理器对应的响应队列 class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil def addResponseListener(onResponse: Int => Unit) { responseListeners ::= onResponse } // queueSize 默认 500,全局的请求队列 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) // 默认情况下有 3个 processor 线程,每个处理器都有一个响应队列 private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) for(i <- 0 until numProcessors) responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]() /** Get the next request or block until specified time has elapsed */ // 获取下一个请求或块,直到经过指定的时间 // 处理器从请求队列中取出请求,队列为空会阻塞,直到有处理器加入新的请求 def receiveRequest(timeout: Long): RequestChannel.Request = { // 从队列里面获取 request 对象 requestQueue.poll(timeout, TimeUnit.MILLISECONDS) } /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ // 发送一个待处理的请求,可能会阻塞,直到队列中有空间容纳该请求 // 如果请求队列满了,这个方法会阻塞在这里,直到有处理器取走一个请求 def sendRequest(request: RequestChannel.Request) { requestQueue.put(request) } /** Get a response for the given processor if there is one */ // 获取给定处理器的响应(如果有的话) def receiveResponse(processor: Int): RequestChannel.Response = { // 获取对应线程的对应队列里面的响应对象 val response = responseQueues(processor).poll() if (response != null) response.request.responseDequeueTimeMs = SystemTime.milliseconds response } /** Send a response back to the socket server to be sent over the network */ // 将响应发送回要通过网络发送的套接字服务器 // 发送响应给 SocketServer,并最终通过网络返回给客户端 def sendResponse(response: RequestChannel.Response) { // 响应存入一个队列里面,先从数组里面先取出对应的 Processor 队列,然后把响应放到这个队列里面 responseQueues(response.processor).put(response) for(onResponse <- responseListeners) onResponse(response.processor) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。