赞
踩
Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在最新版Flink1.10版本,标志着对 Blink的整合宣告完成,达到了对 Hive 的生产级别集成,Hive作为数据仓库系统的绝对核心,承担着绝大多数的离线数据ETL计算和数据管理,期待Flink未来对Hive的完美支持。
而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。
要与Hive集成,需要在Flink的lib目录下添加额外的依赖jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以将这些依赖项放在文件夹中,并分别使用Table API程序或SQL Client 的-C
或-l
选项将它们添加到classpath中。本文使用第一种方式,即将jar包直接复制到$FLINK_HOME/lib目录下。本文使用的Hive版本为2.3.4(对于不同版本的Hive,可以参照官网选择不同的jar包依赖),总共需要3个jar包,如下:
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
hive-exec-2.3.4.jar
其中hive-exec-2.3.4.jar在hive的lib文件夹下,另外两个需要自行下载,下载地址:flink-connector-hive_2.11-1.10.0.jar
[https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/]
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
[https://maven.aliyun.com/mvn/search]
切莫拔剑四顾心茫然,话不多说,直接上代码。
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-hive_2.11artifactId> <version>1.10.0version> <scope>providedscope>dependency><dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-api-java-bridge_2.11artifactId> <version>1.10.0version> <scope>providedscope>dependency><dependency> <groupId>org.apache.hivegroupId> <artifactId>hive-execartifactId> <version>${hive.version}version> <scope>providedscope>dependency>
package com.flink.sql.hiveintegration;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.catalog.hive.HiveCatalog;/** * @Created with IntelliJ IDEA. * @author : jmx * @Date: 2020/3/31 * @Time: 13:22 * */public class FlinkHiveIntegration { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inBatchMode() // Batch模式,默认为StreamingMode .build(); //使用StreamingMode /* EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inStreamingMode() // StreamingMode .build();*/ TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; // Catalog名称,定义一个唯一的名称表示 String defaultDatabase = "qfbap_ods"; // 默认数据库名称 String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; // hive-site.xml路径 String version = "2.3.4"; // Hive版本号 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog("myhive", hive); tableEnv.useCatalog("myhive"); // 创建数据库,目前不支持创建hive表 String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123"; tableEnv.sqlUpdate(createDbSql); }}
Flink的表和SQL API可以处理用SQL语言编写的查询,但是这些查询需要嵌入到用Java或Scala编写的程序中。此外,这些程序在提交到集群之前需要与构建工具打包。这或多或少地限制了Java/Scala程序员对Flink的使用。
SQL客户端旨在提供一种简单的方式,无需一行Java或Scala代码,即可将表程序编写、调试和提交到Flink集群。Flink SQL客户端CLI允许通过命令行的形式运行分布式程序。使用Flink SQL cli访问Hive,需要配置sql-client-defaults.yaml文件。
目前 HiveTableSink 不支持流式写入(未实现 AppendStreamTableSink)。需要将执行模式改成 batch
模式,否则会报如下错误:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
需要修改的配置内容如下:
#...省略的配置项...#==============================================================================# Catalogs#==============================================================================# 配置catalogs,可以配置多个.catalogs: # empty list - name: myhive type: hive hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf hive-version: 2.3.4 default-database: qfbap_ods#...省略的配置项...#==============================================================================# Execution properties#==============================================================================# Properties that change the fundamental execution behavior of a table program.execution: # select the implementation responsible for planning table programs # possible values are 'blink' (used by default) or 'old' planner: blink # 'batch' or 'streaming' execution type: batch
bin/sql-client.sh embedded
在启动之前,确保Hive的metastore已经开启了,否则会报Failed to create Hive Metastore client异常。启动成功,如下图:
启动之后,就可以在此Cli下执行SQL命令访问Hive的表了,基本的操作如下:
-- 命令行帮助Flink SQL> help-- 查看当前会话的catalog,其中myhive为自己配置的,default_catalog为默认的Flink SQL> show catalogs;default_catalogmyhive-- 使用catalogFlink SQL> use catalog myhive;-- 查看当前catalog的数据库Flink SQL> show databases;-- 创建数据库Flink SQL> create database testdb;-- 删除数据库Flink SQL> drop database testdb;-- 创建表Flink SQL> create table tbl(id int,name string);-- 删除表Flink SQL> drop table tbl;-- 查询表Flink SQL> select * from code_city;-- 插入数据Flink SQL> insert overwrite code_city select id,city,province,event_time from code_city_delta ;Flink SQL> INSERT into code_city values(1,'南京','江苏','');
本文以最新版本的Flink为例,对Flink集成Hive进行了实操。首先通过代码的方式与Hive进行集成,然后介绍了如何使用Flink SQL 客户端访问Hive,并对其中会遇到的坑进行了描述,最后给出了Flink SQL Cli的详细使用。相信在未来的版本中Flink SQL会越来越完善,期待Flink未来对Hive的完美支持。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。