
Flink in Practice: Connecting Flink 1.11 to Hive 2.3.6

Covers setup and troubleshooting, including "Failed to create Hive Metastore client".

1. Setup steps

        Download and install Flink, then apply the following configuration:

        1. Add the following to sql-client-defaults.yaml:

    catalogs:
      - name: myhive
        type: hive
        hive-conf-dir: /usr/local/hive/conf
        hive-version: 2.3.6

        2. Edit Hive's hive-site.xml:

    <configuration>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
      </property>
      <property>
        <name>hive.metastore.local</name>
        <value>false</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop2:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
      </property>
      <property>
        <name>hive.metastore.schema.verification</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop2:9083</value>
      </property>
    </configuration>

        3. Start the Hive metastore service (it listens on port 9083):

    Foreground: hive --service metastore
    Background: nohup hive --service metastore &
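
        To confirm the metastore is actually listening before moving on, a quick check on the Hive host (assuming the net-tools package is installed; this check is not part of the original steps):

    netstat -nltp | grep 9083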

        4. Start the Flink SQL client to verify: sql-client.sh embedded
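
        If the catalog is wired up correctly, the following statements in the SQL client should list the myhive catalog and the tables of your Hive database (a minimal smoke test using only the configuration above):

    SHOW CATALOGS;
    USE CATALOG myhive;
    SHOW TABLES;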

2. Reading from Kafka and writing to Hive with Flink on Windows

        The complete code is as follows:

    import java.time.Duration

    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api.config.ExecutionCheckpointingOptions
    import org.apache.flink.table.catalog.hive.HiveCatalog

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      // Enable checkpointing and set the trigger interval in milliseconds (disabled by default).
      env.enableCheckpointing(5000L)
      // Maximum number of checkpoints that may be in flight at the same time.
      env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
      // Checkpointing semantics; EXACTLY_ONCE is the default.
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
      // Checkpoint timeout; the default is 10 minutes.
      env.getCheckpointConfig.setCheckpointTimeout(60000)
      // Minimum pause between two checkpoints (milliseconds), so that large state does not
      // make checkpoints pile up and consume resources the job itself needs.
      env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      val tableEnv = StreamTableEnvironment.create(env, settings)
      tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
      tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
      // tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

      // Build the Hive catalog.
      // Catalog name (the same name used in sql-client-defaults.yaml).
      val name = "myhive"
      // Default Hive database.
      val defaultDatabase = "testflink"
      // Directory containing hive-site.xml.
      val hiveConfDir = "E:\\workspace\\FlinkDemo11\\FlinkSQL\\src\\main\\resources"
      // Hive version.
      val version = "2.3.6"
      val hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
      tableEnv.registerCatalog(name, hiveCatalog)
      tableEnv.useCatalog(name)

      // Use the default (Flink) dialect for the Kafka source and print sink.
      tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
      tableEnv.executeSql("show tables")
      tableEnv.executeSql(
        """
          |CREATE TABLE input (
          |  userId STRING,
          |  userName STRING,
          |  r_t TIMESTAMP(3),
          |  WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND
          |) WITH (
          |  'connector' = 'kafka',                  -- use the Kafka connector
          |  'topic' = 'flinksource',                -- Kafka topic
          |  'scan.startup.mode' = 'latest-offset',  -- start reading from the latest offsets
          |  'properties.group.id' = 'group1',
          |  'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092', -- Kafka broker addresses
          |  'format' = 'json'                       -- the data format is JSON
          |)
          |""".stripMargin)
      tableEnv.executeSql(
        """
          |CREATE TABLE output_print2 (
          |  userId STRING,
          |  userName STRING,
          |  dt STRING,
          |  hr STRING,
          |  mm STRING
          |) WITH (
          |  'connector' = 'print'
          |)
          |""".stripMargin)
      val insertSql1 = "insert into output_print2 SELECT userId, userName, " +
        "DATE_FORMAT(r_t, 'yyyy-MM-dd') as dt," +
        "DATE_FORMAT(r_t, 'HH') as hr," +
        "DATE_FORMAT(r_t, 'mm') as mm " +
        "FROM input"
      tableEnv.executeSql(insertSql1)

      // Switch to the Hive dialect for the Hive table. Note that partition columns are
      // declared only in PARTITIONED BY and must not be repeated in the column list.
      tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
      tableEnv.executeSql(
        """
          |CREATE TABLE output (
          |  userId STRING,
          |  userName STRING
          |) PARTITIONED BY (dt STRING, hr STRING) TBLPROPERTIES (
          |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
          |  'sink.partition-commit.trigger'='partition-time',
          |  'sink.partition-commit.delay'='0s',
          |  'sink.partition-commit.policy.kind'='metastore,success-file',
          |  'partition.time-extractor.kind'='custom',
          |  'partition.time-extractor.class'='MyPartTimeExtractor'
          |)
          |""".stripMargin)
      val insertSql = "insert into output SELECT userId, userName, " +
        "DATE_FORMAT(r_t, 'yyyy-MM-dd') as dt," +
        "DATE_FORMAT(r_t, 'HH') as hr " +
        "FROM input"
      tableEnv.executeSql(insertSql)
    }
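
        For reference, messages in the flinksource topic have to match the schema of the input table. A hypothetical sample record (assuming the JSON format's default SQL-style timestamp representation; adjust to whatever your producer actually sends):

    {"userId": "u001", "userName": "alice", "r_t": "2020-09-01 12:00:00.000"}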

A few things to note:

1. new HiveCatalog(name, defaultDatabase, hiveConfDir, version)

        name: the catalog name, i.e. the catalog name set in sql-client-defaults.yaml.

        defaultDatabase: the Hive database to use.

        hiveConfDir: the directory containing hive-site.xml.

        version: the Hive version.

2. When creating Kafka (or other non-Hive) tables, use

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

        and when creating Hive tables, use

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

        otherwise the statements fail with syntax errors.

3. Hive table properties (the custom extractor referenced by partition.time-extractor.class is sketched after this list):

  • partition.time-extractor.timestamp-pattern: how to build the partition timestamp from the partition values, here at hour granularity
  • sink.partition-commit.trigger: commit a partition based on the time extracted from the partition combined with the watermark
  • sink.partition-commit.delay: the commit delay; with a delay of one hour, a partition is committed once watermark > partition time + 1 hour (the example above uses 0s)
  • sink.partition-commit.policy.kind: the commit policy, here first update the metastore (add the partition) and then write a _SUCCESS file
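
        The DDL above points partition.time-extractor.class at a custom MyPartTimeExtractor, which the article does not show. A minimal sketch of what such a class could look like, assuming the org.apache.flink.table.filesystem.PartitionTimeExtractor interface shipped with Flink 1.11 (this is an illustrative implementation, not the author's original):

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.util.{List => JList}

    import org.apache.flink.table.filesystem.PartitionTimeExtractor

    // Illustrative sketch: maps partition values such as dt=2020-09-01, hr=13
    // to the partition time 2020-09-01T13:00:00 used for partition-time commits.
    class MyPartTimeExtractor extends PartitionTimeExtractor {
      override def extract(partitionKeys: JList[String], partitionValues: JList[String]): LocalDateTime = {
        val dt = partitionValues.get(partitionKeys.indexOf("dt")) // e.g. 2020-09-01
        val hr = partitionValues.get(partitionKeys.indexOf("hr")) // e.g. 13
        LocalDateTime.parse(s"$dt $hr:00:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      }
    }

        The class (here in the default package, matching the unqualified name in the DDL) must be on the job's classpath, otherwise the sink cannot instantiate the extractor.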

 4. If data is being written but nothing shows up when querying Hive, checkpointing is probably not enabled; files are only finalized and partitions committed when checkpoints complete.

3. Problems encountered in practice

1. After adding the hive-exec 2.3.6 dependency to a Maven project, the build can fail with:

Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

        Add the following mirrors to Maven's settings.xml:

    <mirror>
      <id>aliyunmaven</id>
      <mirrorOf>*</mirrorOf>
      <name>spring-plugin</name>
      <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </mirror>
    <mirror>
      <id>repo2</id>
      <name>Mirror from Maven Repo2</name>
      <url>https://repo.spring.io/plugins-release/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>

2. Error: java: cannot access org.apache.hadoop.mapred.JobConf; class file for org.apache.hadoop.mapred.JobConf not found

        Add the following dependencies:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
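
        These snippets assume a hadoop.version property is defined in the POM, for example (2.7.5 is only a placeholder chosen to match the flink-shaded-hadoop-2-uber-2.7.5 jar mentioned in problem 5; use your cluster's actual Hadoop version):

    <properties>
      <hadoop.version>2.7.5</hadoop.version>
    </properties>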

3. Make sure you have set a valid value for hive.metastore.uris

        Add the following to hive-site.xml:

    <property>
      <name>hive.metastore.local</name>
      <value>false</value>
    </property>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://hadoop2:9083</value>
    </property>

4. Exception in thread "main" org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

        Start the Hive metastore service (port 9083):

    Foreground: hive --service metastore
    Background: nohup hive --service metastore &

5. java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration, with a stack trace like the following:

    Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
    Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
    Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
        at org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:378)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
        ... 3 more
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 14 more

        Solution:

        Starting with Flink 1.11, the flink-shaded-hadoop-2-uber jar is no longer maintained by the community. You can still use the shaded jar from an earlier release, but the recommended approach is to put Hadoop on the classpath:

export HADOOP_CLASSPATH=`hadoop classpath`

        If the problem occurs when submitting to a Flink cluster, download flink-shaded-hadoop-2-uber-2.7.5-10.0.jar from the Maven repository, place it in the lib directory, and restart the cluster.

6. Table options do not contain an option key 'connector' for discovering a connector.

        This error can appear when executing an INSERT statement. Either the Hive-related options in the .yaml file are not configured correctly, or the table was created as a plain Flink SQL table rather than as a Hive table; in that case the table needs to be created in Hive (i.e. with the Hive dialect).

7. Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function, with a stack trace like the following:

    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.StreamCorruptedException: unexpected block data
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1581)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
        ... 10 more

        Solution:

        Add the following to flink-conf.yaml and restart the cluster:

classloader.resolve-order: parent-first
