Problems encountered when integrating Flink with Hive, and their solutions

How to resolve dependency conflicts when connecting Flink to Hive

Versions:
1) Hadoop: 2.7.2
2) Hive: 2.3.4
3) Flink: 1.11.2
4) OS: CentOS 7.5

1. Integrating Hadoop first

1.1 Add the following dependencies. Jar conflicts are common here, so pay attention to each dependency's scope and its exclusions (both shown in the POM below); otherwise you will hit errors along the lines of hadoop.config classes not being found.

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>1.11.2</version>
        <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.4</version>
        <exclusions>
            <exclusion>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>janino</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>commons-compiler</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.11.2</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>

</dependencies>
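
After packaging, it is worth verifying that the exclusions actually took effect. This is a sketch using standard Maven and JDK tools; the jar name is a placeholder for your own artifact:

# build the job jar, then confirm no janino classes were bundled into it
mvn clean package
jar tf target/your-job-1.0.jar | grep -i janino   # should print nothing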

1.2 You also need to put the matching version of flink-shaded-hadoop-2-uber-2.7.2-10.0.jar under ${FLINK_HOME} (typically in lib/). Flink no longer ships this jar as of 1.11, so you have to build it yourself; the build process is described in:
(1) https://blog.csdn.net/Charlie_jun/article/details/108362658
(2) https://www.bookstack.cn/read/flink-1.11.1-zh/d7dedd0862dd0121.md#providing-hadoop-classes
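
If you do not want to build the shaded jar, the second link above also documents an alternative: expose an existing Hadoop installation to Flink through HADOOP_CLASSPATH. A sketch, assuming the hadoop command is available on every Flink node:

# make Hadoop's classes visible to Flink without the uber jar
export HADOOP_CLASSPATH=`hadoop classpath`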

2. Place the hive-site.xml file under the resources directory of your Maven project.
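
For the client side, a minimal hive-site.xml only needs the metastore address. The URI below reuses this article's example host and must be replaced with your own:

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://myv175.mlamp.cn:9083</value>
    </property>
</configuration>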

3. Error: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        ... 10 more
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) ~[data-flow-1.0.jar:?]
        at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:171) ~[flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:157) ~[flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar:1.11.0]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveAndHbase$.main(FromKafkaSinkHiveAndHbase.scala:27) ~[data-flow-1.0.jar:?]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveAndHbase.main(FromKafkaSinkHiveAndHbase.scala) ~[data-flow-1.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]

Cause: Flink's Hive integration does not support the embedded metastore; you must run a standalone Hive metastore service and configure it in the conf files. On the server side, hive.metastore.uris stays unset (none); on the client side, hive.metastore.uris must be set to the real address, e.g. thrift://myv175.mlamp.cn:9083. Start the metastore first, then HiveServer2:

nohup hive --service metastore >> ~/metastore.log 2>&1 &
nohup hive --service hiveserver2 >> ~/hiveserver2.log 2>&1 &
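
For reference, the catalog registration that performs this check looks roughly like the following Java sketch (the catalog name and hiveConfDir are placeholders; hiveConfDir must contain the client-side hive-site.xml with hive.metastore.uris set):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogDemo {
    public static void main(String[] args) {
        // the Blink planner is the one documented for Hive integration in Flink 1.11
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";                 // arbitrary catalog name
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/hive/conf";  // assumed path to hive-site.xml

        // HiveCatalog reads hive.metastore.uris from hive-site.xml under hiveConfDir;
        // if it resolves to an embedded metastore, the constructor throws the
        // "Embedded metastore is not allowed" IllegalArgumentException seen above
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
    }
}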

References:
(1)https://blog.csdn.net/aA518189/article/details/103622261
(2)https://blog.csdn.net/xianpanjia4616/article/details/107603028
(3)http://apache-flink.147419.n8.nabble.com/flink1-11-sink-hive-error-td4939.html

4. Error: The main method caused an error: Unable to instantiate java compiler

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
    at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
    at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at realtime.log.reports.RealtimeUserReport$.active_user_10min(RealtimeUserReport.scala:245)
    at realtime.log.reports.RealtimeUserReport$.main(RealtimeUserReport.scala:163)
    at realtime.log.reports.RealtimeUserReport.main(RealtimeUserReport.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 11 more
Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)

Cause: a jar conflict. The janino bundled inside hive-exec clashes with the one Calcite expects, so note the exclusions on the imported dependency:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
        <version>2.3.4</version>
    <exclusions>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </exclusion>
    </exclusions>
</dependency>
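
If the ClassCastException persists after this change, some other dependency is probably still pulling in its own janino. Standard Maven can locate the culprit:

mvn dependency:tree -Dincludes=org.codehaus.janino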


5. Error: Cannot instantiate user function.

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
    ... 5 more

Cause: Flink's default classloading setting loads classes from the user-code jar first (child-first), which means the user jar can contain, and load, dependencies that differ (transitively) from the ones Flink itself uses. Here the same class, org.apache.commons.collections.map.LinkedMap, ends up loaded by two different classloaders, hence the ClassCastException above. Fix: add the following to conf/flink-conf.yaml and restart Flink.

classloader.resolve-order: parent-first
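
A narrower alternative, which I have not tested in this setup: keep the default child-first order and only force the conflicting package to load from Flink's parent classpath:

classloader.parent-first-patterns.additional: org.apache.commons.collections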