当前位置:   article > 正文

Kafka从上手到实践 - Kafka集群:Kafka Listeners | 凌云时刻

inter.broker.listener.name

凌云时刻 · 技术

导读:这一章节主要对和Listener相关的四个配置项做以详细解释。

作者 | 计缘

来源 | 凌云时刻(微信号:linuxpk)

概述

listenersadvertised.listenerslistener.security.protocol.mapinter.broker.listener.name这四个配置项可能是大家最容易混淆和最不容易理解的。

在解释这些配置项之前,我们先来明确几个概念。

  • 部署Broker的阿里云ECS称为Host Machine。

  • 在阿里云ECS里启动的Producer或者Consumer,比如使用Kafka CLI启动的称为Internal Client。

  • 在大家的IDEA中使用Java编写的,或者第三方的Producer/Consumer,称为External Client。

  • Host Machine具有外网IP和内网IP。

  • Internal Client可以同时和Host Machine的外网IP及内网IP通信。

  • External Client只能和Host Machine的外网IP通信。

  • 多个阿里云ECS之间可以同时通过外网IP及内网IP通信。

    • 既在这个特定的场景下,Host Machine之间可以同时通过外网IP及内网IP通信。

    • 再换句话说就是不同Host Machine上的Broker之间可以同时通过外网IP及内网IP通信。

如上图所示,是一个很常见的Kafka集群场景,涵盖了上述的概念。图中那些通信虚线箭头就是靠Kafka的Listener建立的,并且是通过Kafka中不同的Listener建立的,这些Listener分为Internal Listener和External Listener。如下图所示:

那么这些Listener的创建以及内外部如何通信都是由上面那四个配置项决定的。

listener.security.protocol.map


先来看listener.security.protocol.map配置项,在上一章节中介绍过,它是配置监听者的安全协议的,比如PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL。因为它是以Key/Value的形式配置的,所以往往我们也使用该参数给Listener命名:

listener.security.protocol.map=EXTERNAL_LISTENER_CLIENTS:SSL,INTERNAL_LISTENER_CLIENTS:PLAINTEXT,INTERNAL_LISTENER_BROKER:PLAINTEXT

使用Key作为Listener的名称。就如上图所示,Internal Producer、External Producer、Internal Consumer、External Consumer和Broker通信以及Broker之间互相通信时都很有可能使用不同的Listener。这些不同的Listener有监听内网IP的,有监听外网IP的,还有不同安全协议的,所以使用Key来表示更加直观。当然这只是一种非官方的用法,Key本质上还是代表了安全协议,如果只有一个安全协议,多个Listener的话,那么这些Listener所谓的名称肯定都是相同的。

listeners

listeners就是主要用来定义Kafka Broker的Listener的配置项。

listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092,INTERNAL_LISTENER_CLIENTS://阿里云ECS内网IP:9093,INTERNAL_LISTENER_BROKER://阿里云ECS内网IP:9094

上面的配置表示,这个Broker定义了三个Listener,一个External Listener,用于External Producer和External Consumer连接使用。也许因为业务场景的关系,Internal Producer和Broker之间使用不同的安全协议进行连接,所以定义了两个不同协议的Internal Listener,分别用于Internal Producer和Broker之间连接使用。

通过之前的章节,我们知道Kafka是由Zookeeper进行管理的,由Zookeeper负责Leader选举,Broker Rebalance等工作。所以External Producer和External Consumer其实是通过Zookeeper中提供的信息和Broker通信交互的。所以listeners中配置的信息都会发布到Zookeeper中,但是这样就会把Broker的所有Listener信息都暴露给了外部Clients,在安全上是存在隐患的,我们希望只把给外部Clients使用的Listener暴露出去,此时就需要用到下面这个配置项了。

advertised.listeners

advertised.listeners参数的作用就是将Broker的Listener信息发布到Zookeeper中,供Clients(Producer/Consumer)使用。如果配置了advertised.listeners,那么就不会将listeners配置的信息发布到Zookeeper中去了:

advertised.listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092

这里在Zookeeper中发布了供External Clients(Producer/Consumer)使用的ListenerEXTERNAL_LISTENER_CLIENTS。所以advertised.listeners配置项实现了只把给外部Clients使用的Listener暴露出去的需求。

inter.broker.listener.name

这个配置项从名称就可以看出它的作用了,就是指定一个listener.security.protocol.map配置项中配置的Key,或者说指定一个或一类Listener的名称,将它作为Internal Listener。这个Listener专门用于Kafka集群中Broker之间的通信

inter.broker.listener.name=INTERNAL_LISTENER_BROKER

listener 和 advertised.listeners 的关系

先来看看KafkaConfig.scalaSocketServer.scala源码中的这几行代码片段:

  1. // KafkaConfig.scala
  2. ...
  3. val ListenersProp = "listeners"
  4. ...
  5. def dataPlaneListeners: Seq[EndPoint] = {
  6. Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
  7. case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
  8. case None => listeners
  9. }
  10. }
  11. ...
  12. def listeners: Seq[EndPoint] = {
  13. Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
  14. CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
  15. }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
  16. }
  17. // SocketServer.scala
  18. def startup(startupProcessors: Boolean = true) {
  19. this.synchronized {
  20. connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
  21. createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
  22. createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
  23. if (startupProcessors) {
  24. startControlPlaneProcessor()
  25. startDataPlaneProcessors()
  26. }
  27. }
  28. ...
  29. private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
  30. endpoints: Seq[EndPoint]): Unit = synchronized {
  31. endpoints.foreach { endpoint =>
  32. val dataPlaneAcceptor = createAcceptor(endpoint)
  33. addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
  34. KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
  35. dataPlaneAcceptor.awaitStartup()
  36. dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
  37. info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
  38. }
  39. }

startup()方法是Kafka Broker创建启动Socket连接的入口,既用来创建Acceptor线程的入口,该线程负责处理Socket连接。 createDataPlaneAcceptorsAndProcessors()方法的第二个参数config.dataPlaneListeners可以看到取的就是listeners配置项的内容。

 

  1. /**
  2. * Create a server socket to listen for connections on.
  3. */
  4. private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  5. val socketAddress =
  6. if (host == null || host.trim.isEmpty)
  7. new InetSocketAddress(port)
  8. else
  9. new InetSocketAddress(host, port)
  10. val serverChannel = ServerSocketChannel.open()
  11. serverChannel.configureBlocking(false)
  12. if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
  13. serverChannel.socket().setReceiveBufferSize(recvBufferSize)
  14. try {
  15. serverChannel.socket.bind(socketAddress)
  16. info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
  17. } catch {
  18. case e: SocketException =>
  19. throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
  20. }
  21. serverChannel
  22. }

跟到里面,可以看到如果没有配置listeners,那么会使用网卡地址创建Socket连接,对于阿里云ECS,就是内网IP。

再来看看KafkaServer.scala源码中的这几行代码片段:

  1. ...
  2. val brokerInfo = createBrokerInfo
  3. val brokerEpoch = zkClient.registerBroker(brokerInfo)
  4. ...
  5. private[server] def createBrokerInfo: BrokerInfo = {
  6. val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
  7. zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
  8. val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
  9. require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
  10. s" advertised listeners are already registered by broker ${broker.id}")
  11. }
  12. val listeners = config.advertisedListeners.map { endpoint =>
  13. if (endpoint.port == 0)
  14. endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
  15. else
  16. endpoint
  17. }
  18. val updatedEndpoints = listeners.map(endpoint =>
  19. if (endpoint.host == null || endpoint.host.trim.isEmpty)
  20. endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
  21. else
  22. endpoint
  23. )
  24. val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
  25. BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)
  26. }

从上面的代码可以看到,advertised.listeners主要用于向Zookeeper注册Broker的连接信息,但是不参与创建Socket连接。

所以从这几处源码内容可以得出结论,Kafka Broker真正建立通信连接使用的是listeners配置项里的内容,而advertised.listeners只用于向Zookeeper注册Broker的连接信息,既向Client暴露Broker对外的连接信息(Endpoint)。

另外在KafkaConfig.scala源码中还有有这么几行代码:

 

  1. val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
  2. val listenerNames = listeners.map(_.listenerName).toSet
  3. require(advertisedListenerNames.contains(interBrokerListenerName),
  4. s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
  5. s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
  6. require(advertisedListenerNames.subsetOf(listenerNames),
  7. s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
  8. s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
  9. s"are ${listenerNames.map(_.value).mkString(",")}"

从上面的代码片段可以得出两个结论:

  • advertised.listeners配置项中配置的Listener名称或者说安全协议必须在listeners中存在。因为真正创建连接的是listeners中的信息。

  • inter.broker.listener.name配置项中配置的Listener名称或者说安全协议必须在advertised.listeners中存在。因为Broker之间也是要通过advertised.listeners配置项获取Internal Listener信息的。

小结

这一章节主要大家详细解释了Broker几个比较容易混淆和不好理解的配置项,解释了什么是内外部Listener,如何暴露Listener等。这些配置在我们搭建Kafka集群时至关重要。希望能给小伙伴们带来帮助。

  

 

  

END

往期精彩文章回顾

Kafka从上手到实践 - Kafka集群:配置Broker

Kafka从上手到实践:搭建Zookeeper集群

Kafka从上手到实践-Zookeeper CLI:CRUD zNode

Kafka从上手到实践 - 初步认知:Zookeeper

Kafka从上手到实践:Kafka Java Consumer

Kafka从上手到实践:Kafka Java Producer

Kafka CLI:Reseting Offset & Config CLI

Kafka CLI:Consumer CLI & Producer CLI

Kafka CLI:Topic CLI & Producer CLI

Kafka从上手到实践 - 实践真知:搭建单机Kafka

Kafka从上手到实践 - 庖丁解牛:Consumer

Kafka从上手到实践 - 庖丁解牛:Producer

Kafka从上手到实践 - 庖丁解牛:Partition

Kafka从上手到实践 - 庖丁解牛:Topic & Broker

Kafka从上手到实践 - 初步认知:MQ系统

长按扫描二维码关注凌云时刻

每日收获前沿技术与科技洞见

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/822392
推荐阅读
相关标签
  

闽ICP备14008679号