赞
踩
全部解决了
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:470) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-metrics,client-id=consumer-1 at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159) at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77) at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234) at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:680) at org.apache.kafka.common.network.Selector.<init>(Selector.java:140) at org.apache.kafka.common.network.Selector.<init>(Selector.java:147) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:658) ... 11 more Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-metrics,client-id=consumer-1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157) ... 19 more
org.apache.kafka.common.KafkaException: Error unregistering mbean at org.apache.kafka.common.metrics.JmxReporter.unregister(JmxReporter.java:150) at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:155) at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77) at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234) at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLag(Fetcher.java:1143) at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1900(Fetcher.java:1018) at org.apache.kafka.clients.consumer.internals.Fetcher.drainRecords(Fetcher.java:533) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1061) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) Caused by: javax.management.InstanceNotFoundException: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415) at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546) at org.apache.kafka.common.metrics.JmxReporter.unregister(JmxReporter.java:148) ... 12 more
org.apache.flink.util.FlinkException: The assigned slot container_1557122316998_0110_01_000310_3 was removed. at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385) at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825) at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:362) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
解决方案:
前两个报错在网上搜了一堆,jira上也看了问题说明,大概意思是一个job中有多个kafka的输入,也就是同时消费多个topic,对应的client_id相同。代码中设置了不同的client_id后解决了,讲道理这个client_id是自己生成的,不应该人为来指定。
第三个错误是资源问题,之前申请的资源不太合理,在yarn上重新指定了个队列,指定了任务的队列。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。