
Paimon: errors when reading data locally with Flink

1. Reading Paimon locally from IDEA

When reading a Paimon table locally from IDEA, you need more than the Hadoop-related and paimon-flink dependencies listed in the official docs; a few extra dependencies are required as well. Here is my code first:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(
                "CREATE CATALOG my_jdbc WITH (\n" +
                "  'type' = 'paimon',\n" +
                "  'metastore' = 'jdbc',\n" +
                "  'uri' = 'jdbc:mysql://test3:3306/paimon',\n" +
                "  'jdbc.user' = 'root',\n" +
                "  'jdbc.password' = 'N^57F9m2RI#rp8',\n" +
                "  'catalog-key' = 'jdbc',\n" +
                "  'warehouse' = 'hdfs://test1:8020/user/paimon'\n" +
                ")");
        tableEnv.executeSql("USE CATALOG my_jdbc");
        tableEnv.sqlQuery("SELECT * FROM test").execute().print();
    }
}

The code above uses a JDBC catalog to read data stored on HDFS. These were my initial dependencies:

<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-flink-1.18</artifactId>
    <version>0.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>

2. Error 1

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory
    at org.apache.paimon.flink.source.DataTableSource.getScanRuntimeProvider(DataTableSource.java:192)
    at org.apache.paimon.flink.source.table.BaseTableSource.getScanRuntimeProvider(BaseTableSource.java:42)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3997)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2867)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2427)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2341)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2286)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:723)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:709)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3843)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:617)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
    at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
    at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
    at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
    at kafka.Test.main(Test.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 25 more

The stack trace shows that the class org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory cannot be found. A quick lookup shows this class lives in the flink-connector-base artifact, so add that dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>
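To double-check that a fix like this actually landed on the runtime classpath (IDEA sometimes keeps a stale dependency list until you re-import the Maven project), a tiny probe class can help. This is my own sketch, not part of Paimon or Flink; the class name ClasspathCheck is made up for illustration:

```java
// ClasspathCheck.java -- hypothetical helper for verifying a class is resolvable.
public class ClasspathCheck {
    /** Returns true if the named class can be loaded from the current classpath. */
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Prints whether the class from Error 1 is now loadable; once
        // flink-connector-base is on the classpath this should report true.
        System.out.println(isPresent(
                "org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory"));
    }
}
```

Running it before and after editing the pom makes it obvious whether the new jar is really being picked up.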

3. Error 2

With that dependency added, the job failed again, this time with: NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIterator

Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
    at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
    at kafka.Test.main(Test.java:25)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
    ... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIterator
    at org.apache.paimon.flink.source.FileStoreSourceReader.<init>(FileStoreSourceReader.java:55)
    at org.apache.paimon.flink.source.FlinkSource.createReader(FlinkSource.java:59)
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:718)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIterator
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.file.src.reader.BulkFormat$RecordIterator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 11 more

From this trace it is clear that the flink-connector-files dependency is missing. After adding it, the job ran successfully:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.18.0</version>
</dependency>
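For reference, here is the complete dependency list after both fixes. It is simply the union of the snippets above; note that I pin flink-connector-files to 1.18.0 (a release version, rather than a SNAPSHOT, so it resolves from Maven Central) to match the other Flink 1.18 artifacts. Adjust versions to your own Flink/Paimon setup:

```xml
<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-flink-1.18</artifactId>
    <version>0.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- fix for Error 1: provides HybridSource$SourceFactory -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- fix for Error 2: provides BulkFormat$RecordIterator -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.18.0</version>
</dependency>
```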
