赞
踩
背景:flink_2.12-1.14.3
kafka_2.12-3.1.0
#kafka的读取ddl CREATE TABLE source ( name STRING, age STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test', 'properties.bootstrap.servers' = 'ip:prot', 'properties.group.id' = 'group_test', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); #kafka的写入ddl CREATE TABLE sink ( name STRING, sex STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test1', 'properties.bootstrap.servers' = 'ip:prot', 'format' = 'json' ); #业务逻辑实现 insert into sink select * from source;
!!!可能出现的问题:
1.Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
这是缺少kafka-clients的jar包,只需将下载的kafka-clients jar包放在lib目录下重启flink,提交任务
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147) at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:104) at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:117) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:379) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:396) at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:324) at org.apache.flink.table.client.cli.CliClient.executeFile(CliClient.java:314) at org.apache.flink.table.client.cli.CliClient.executeInNonInteractiveMode(CliClient.java:230) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:153) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) ... 1 more Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 37 more
2.org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
这是因为kafka的导入的包与版本不一致导致的错误,按照安装的版本重新下载jar包即可
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101) ~[flink-dist_2.12-1.14.3.jar:1.14.3] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:273) ~[flink-dist_2.12-1.14.3.jar:1.14.3] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:563) ~[flink-dist_2.12-1.14.3.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:237) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ... 3 more Caused by: java.lang.RuntimeException: Failed to get metadata for topics [test]. at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:222) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ... 3 more Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1667810025114, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。