Flink Kafka consumer error: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer


All three errors below have been resolved.

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:470)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-metrics,client-id=consumer-1
	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
	at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
	at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:680)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:140)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:147)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:658)
	... 11 more
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-metrics,client-id=consumer-1
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
	... 19 more
org.apache.kafka.common.KafkaException: Error unregistering mbean
	at org.apache.kafka.common.metrics.JmxReporter.unregister(JmxReporter.java:150)
	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:155)
	at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
	at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLag(Fetcher.java:1143)
	at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1900(Fetcher.java:1018)
	at org.apache.kafka.clients.consumer.internals.Fetcher.drainRecords(Fetcher.java:533)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1061)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Caused by: javax.management.InstanceNotFoundException: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
	at org.apache.kafka.common.metrics.JmxReporter.unregister(JmxReporter.java:148)
	... 12 more
org.apache.flink.util.FlinkException: The assigned slot container_1557122316998_0110_01_000310_3 was removed.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:362)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

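Solution:
For the first two errors I searched widely online and also read the issue descriptions on JIRA. The gist is that a single job has several Kafka inputs, that is, it consumes multiple topics at the same time, and the consumers end up with the same client_id (Kafka's client.id property). Setting a different client_id for each consumer in the code fixed both errors. Strictly speaking, the client_id is generated automatically and should not have to be set by hand.
The third error was a resource problem: the resources I had requested were not sized sensibly. I picked a different queue on YARN and submitted the job to that queue.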
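
The client_id fix for the first two errors could look roughly like the sketch below. It is only an illustration, assuming the job builds one Properties object per Kafka source. The class name KafkaClientIds, the helper consumerProps, the "job-topic" naming scheme, and the broker address, group name, and topic names are all hypothetical. Only client.id, bootstrap.servers, and group.id are real Kafka consumer settings.

```java
import java.util.Properties;

// Hypothetical helper: gives every Kafka source in a job its own client.id.
class KafkaClientIds {

    /** Copies the shared settings and adds a client.id unique to this source. */
    public static Properties consumerProps(Properties base, String jobName, String topic) {
        Properties props = new Properties();
        props.putAll(base);
        // "client.id" is the Kafka consumer config key; the naming scheme is an assumption.
        props.setProperty("client.id", jobName + "-" + topic);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        base.setProperty("group.id", "demo-group");              // placeholder group

        // One Properties object per topic, so no two consumers share a client.id.
        Properties orders = consumerProps(base, "demo-job", "orders");
        Properties clicks = consumerProps(base, "demo-job", "clicks");

        System.out.println(orders.getProperty("client.id"));
        System.out.println(clicks.getProperty("client.id"));
    }
}
```

Each Properties object would then be passed to its own Flink Kafka source. The connector's consumer constructors accept a topic, a deserialization schema, and a Properties object. Because the MBean names in the stack traces above embed the client id, distinct ids should keep the JMX registrations from colliding.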
