Download and install Flink, then make the following configuration changes:
1. Add the following to sql-client-defaults.yaml:
- catalogs:
-   - name: myhive
-     type: hive
-     hive-conf-dir: /usr/local/hive/conf
-     hive-version: 2.3.6
2. Modify Hive's hive-site.xml:
- <configuration>
- <property>
- <name>hive.metastore.warehouse.dir</name>
- <value>/user/hive/warehouse</value>
- </property>
- <property>
- <name>hive.metastore.local</name>
- <value>false</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionURL</name>
- <value>jdbc:mysql://hadoop2:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionDriverName</name>
- <value>com.mysql.jdbc.Driver</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionUserName</name>
- <value>root</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionPassword</name>
- <value>123456</value>
- </property>
- <property>
- <name>hive.metastore.schema.verification</name>
- <value>true</value>
- </property>
- <property>
- <name>hive.metastore.uris</name>
- <value>thrift://hadoop2:9083</value>
- </property>
- </configuration>
3. Start the Hive metastore so that port 9083 is open:
- Foreground: hive --service metastore
- Background: nohup hive --service metastore &
4. Start the Flink SQL client to verify the setup: sql-client.sh embedded
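Once the SQL client starts, a quick sanity check of the Hive catalog might look like the following (a minimal sketch; the catalog name myhive and the database testflink come from the configuration and code in this post):
- show catalogs;
- use catalog myhive;
- show databases;
- use testflink;
- show tables;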
The complete code is as follows:
- // Imports used by this snippet (Flink 1.11.x style APIs: blink planner, Scala bridge)
- import java.time.Duration
- import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
- import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
- import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
- import org.apache.flink.table.catalog.hive.HiveCatalog
-
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // Enable checkpointing with the given interval in milliseconds (checkpointing is disabled by default)
- env.enableCheckpointing(5000L)
- // Maximum number of checkpoints that may be in progress at the same time
- env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
- // Set the checkpointing mode; the default is EXACTLY_ONCE
- env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
- // Set the checkpoint timeout; the default is 10 minutes
- env.getCheckpointConfig.setCheckpointTimeout(60000)
- // Minimum pause between two checkpoints (ms). This keeps checkpoints from piling up when a single checkpoint
- // takes too long (e.g. because of large state), which would leave the job constantly checkpointing and hurt overall performance
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
-
- val settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode()
- .build()
- val tableEnv = StreamTableEnvironment.create(env, settings)
-
- tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
- tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
- // tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
-
- // Set up the Hive catalog
- // Catalog name
- val name = "myhive"
- // Default database name
- val defaultDatabase = "testflink"
- // Directory containing hive-site.xml
- val hiveConfDir = "E:\\workspace\\FlinkDemo11\\FlinkSQL\\src\\main\\resources"
- // Hive version
- val version = "2.3.6"
- val hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
- tableEnv.registerCatalog(name, hiveCatalog)
- tableEnv.useCatalog(name)
- tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
- tableEnv.executeSql("show tables")
- tableEnv.executeSql(
- """
- |CREATE TABLE input (
- | userId STRING,
- | userName STRING,
- | r_t TIMESTAMP(3),
- | WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND
- |) WITH (
- | 'connector' = 'kafka', -- using kafka connector
- | 'topic' = 'flinksource', -- kafka topic
- | 'scan.startup.mode' = 'latest-offset', -- start reading from the latest offset
- | 'properties.group.id' = 'group1',
- | 'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092', -- kafka broker address
- | 'format' = 'json' -- the data format is json
- |)
- |
- """.stripMargin)
-
-
- tableEnv.executeSql(
- """
- |CREATE TABLE output_print2(
- | userId STRING,
- | userName STRING,
- | dt STRING,
- | hr STRING,
- | mm STRING
- |) WITH (
- | 'connector' = 'print'
- |)
- """.stripMargin)
- val insertSql1 = "insert into output_print2 SELECT userId, userName, " +
- "DATE_FORMAT(r_t, 'yyyy-MM-dd') as dt," +
- "DATE_FORMAT(r_t, 'HH') as hr," +
- "DATE_FORMAT(r_t, 'mm') as mm " +
- "FROM input"
- tableEnv.executeSql(insertSql1)
-
-
- tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
- tableEnv.executeSql(
- """
- |CREATE TABLE output(
- | userId STRING,
- | userName STRING
- |) PARTITIONED BY (dt STRING, hr STRING) TBLPROPERTIES (
- | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
- | 'sink.partition-commit.trigger'='partition-time',
- | 'sink.partition-commit.delay'='0s',
- | 'sink.partition-commit.policy.kind'='metastore,success-file',
- | 'partition.time-extractor.kind'='custom',
- | 'partition.time-extractor.class'='MyPartTimeExtractor'
- |)
- """.stripMargin)
-
- val insertSql = "insert into output SELECT userId, userName, " +
- "DATE_FORMAT(r_t, 'yyyy-MM-dd') as dt," +
- "DATE_FORMAT(r_t, 'HH') as hr " +
- "FROM input"
- tableEnv.executeSql(insertSql)
- }
Note the following points:
1. new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
name: the catalog name, i.e. the catalog name set in sql-client-defaults.yaml.
defaultDatabase: the name of the Hive database to use.
hiveConfDir: the path to the directory that contains hive-site.xml.
version: the Hive version.
2. When creating the Kafka table, use
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
and when creating the Hive table, use
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
otherwise the DDL fails with a syntax error.
3. Hive table properties:
partition.time-extractor.timestamp-pattern: how to assemble a timestamp from the partition values of an hour-level partition
sink.partition-commit.trigger: use the time extracted from the partition, together with the watermark, to decide when a partition is committed
sink.partition-commit.delay: the commit delay; with an hour-level delay, a partition is committed once watermark > partition time + 1 hour (the example above uses 0s)
sink.partition-commit.policy.kind: the commit policy, here: first update the metastore (add the partition), then write a success file (_SUCCESS by default)
partition.time-extractor.kind / partition.time-extractor.class: use a custom partition-time extractor; a sketch of such a class follows below
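The DDL above points partition.time-extractor.class at MyPartTimeExtractor, but the post does not show that class. Below is a minimal sketch of what it could look like, assuming Flink 1.11.x (where the interface lives in org.apache.flink.table.filesystem; later releases moved it) and the (dt, hr) partition layout used above; the real class would need to be on the job classpath and be referenced by its fully qualified name:
- import java.time.LocalDateTime
- import java.time.format.DateTimeFormatter
- import java.util.{List => JList}
- import org.apache.flink.table.filesystem.PartitionTimeExtractor
-
- // Hypothetical implementation of the extractor referenced by 'partition.time-extractor.class'
- class MyPartTimeExtractor extends PartitionTimeExtractor {
-   private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
-
-   // partitionValues arrive in the order of the PARTITIONED BY columns, here (dt, hr)
-   override def extract(partitionKeys: JList[String], partitionValues: JList[String]): LocalDateTime = {
-     val dt = partitionValues.get(0) // e.g. 2021-06-01
-     val hr = partitionValues.get(1) // e.g. 13
-     LocalDateTime.parse(s"$dt $hr:00:00", formatter)
-   }
- }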
4. If data is being written but the table shows no data when queried in Hive, checkpointing may not be enabled: partitions are only committed (and files finalized) when a checkpoint completes.
A few problems that may come up along the way:
1. When the hive-exec 2.3.6 dependency is added to a Maven project, the following error can appear:
Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
To fix it, add the following mirrors to Maven's settings.xml:
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>spring-plugin</name>
- <url>https://maven.aliyun.com/repository/spring-plugin</url>
- </mirror>
-
- <mirror>
- <id>repo2</id>
- <name>Mirror from Maven Repo2</name>
- <url>https://repo.spring.io/plugins-release/</url>
- <mirrorOf>central</mirrorOf>
- </mirror>
2. Error: java: cannot access org.apache.hadoop.mapred.JobConf; class file for org.apache.hadoop.mapred.JobConf not found
Add the following dependencies:
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
3. Make sure you have set a valid value for hive.metastore.uris
Add the following to hive-site.xml:
- <property>
- <name>hive.metastore.local</name>
- <value>false</value>
- </property>
-
- <property>
- <name>hive.metastore.uris</name>
- <value>thrift://hadoop2:9083</value>
- </property>
4. Exception in thread "main" org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
Start the Hive metastore so that port 9083 is listening:
- Foreground: hive --service metastore
- Background: nohup hive --service metastore &
5. A java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration error, with a stack trace like the following:
- Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
- at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
- Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
- at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
- at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
- at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
- at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
- Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
- at org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:378)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626)
- at java.util.HashMap.forEach(HashMap.java:1289)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
- at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
- at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
- ... 3 more
- Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
- at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
- at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
- ... 14 more
Solution:
Starting with Flink 1.11, the community no longer maintains the flink-shaded-hadoop-2-uber jar. You can still use the shaded jar from an earlier release, but the community recommends putting Hadoop on the classpath instead by running:
export HADOOP_CLASSPATH=`hadoop classpath`
If the problem appears when the job is submitted to a Flink cluster, download flink-shaded-hadoop-2-uber-2.7.5-10.0.jar from the Maven repository, place it in the cluster's lib directory, and restart the cluster.
6. Table options do not contain an option key 'connector' for discovering a connector.
This error can appear when an INSERT statement is executed. Either the Hive-related options in sql-client-defaults.yaml are not configured correctly, or the target table was created in the default Flink SQL dialect rather than as a Hive table; create the table in Hive (using the Hive dialect).
7. Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. Details:
- Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
- at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
- at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
- at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
- at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
- at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
- at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
- at java.lang.Thread.run(Thread.java:748)
- Caused by: java.io.StreamCorruptedException: unexpected block data
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1581)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
- at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
- at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
- at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
- at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
- at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
- at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
- ... 10 more
Solution:
Add the following to flink-conf.yaml and restart the cluster:
classloader.resolve-order: parent-first