赞
踩
1 环境准备
准备大数据集群 .安装HDFS ,HIVE,SAPRK ,FLINK
下载运行集群环境运行是需要的jar包
下载地址:http://iceberg.apache.org/releases/
2 spark将读写iceberg表中数据
准备spark集群, 并配置iceberg环境
在spark的配置目录中添加hadoop和hive的配置文件 方便spark与hadoop和hive整合
在spark的jars包中添加下载好的iceberg的jar包
2.1 catalog为hadoop
2.1.1 sparksql操作
spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
--conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hadoop_prod.type=hadoop \
--conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://linux01:8020/doit/iceberg/warehouse
创建表
-- 使用系统默认的数据源 会将表生成在默认的本地文件夹中
spark-sql> create table tb_test1(id int ,name string) using iceberg ;
-- 切换启动SQL脚本是自己设置的数据源hadoop_prod
use hadoop_prod.default ;
-- 创建表
create table tb_test1(id int ,name string) using iceberg ;
查看表的位置在HDFS上
插入入数据到iceberg表中
insert into tb_test1 values(1,'马云'),(2,'马蓉'),(3,'马保国') ;
select * from tb_test1 ;
1 马云
2 马蓉
3 马保国
2.1.2 sparkshell操作
-- 登录shell客户端
spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
--conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hadoop_prod.type=hadoop \
--conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://linux01:8020/doit/iceberg/warehouse
直接读取上面在sparksql中创建的表中的数据
scala> spark.read.format("iceberg").load("hdfs://linux01:8020//doit/iceberg/warehouse/default/tb_test1").show
+---+------+
| id| name|
+---+------+
| 1| 马云|
| 2| 马蓉|
| 3|马保国|
+---+------+
使用sparkAPI建表和插入数据在后面的API中在详细介绍
2.1.3 IDEA spark项目操作
使用IDEA创建maven项目 , 添加依赖
pom.xml文件
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.12</scala.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>3.1.1</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!--JDBC驱动包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<!--hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>0.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark3-runtime -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 指定编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
val spark: SparkSession = SparkSession
.builder()
.config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 设置数据源类别为hadoop
.config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 设置数据源位置
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
// 获取表结构信息
val df = spark.table("hadoop_prod.default.tb_test1")
df.printSchema()
// 读取指定表下的数据
//spark.read.format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show()
//3372567346381641315
/**
* select snapshot_id from hadoop_prod.default.tb_test1.snapshots ;
* select * from hadoop_prod.default.tb_test1.snapshots ;
*/
// 读取指定快照下的数据
spark.read.option("snapshot-id", 3372567346381641315l).format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show
2.2 catalog为hive
spark可以使用sparkshell , sparksql 和idea中创建iceberg表 ,在hive中使用iceberg支持以后可以再hive中对数据进行分析处理 , 但是不能对数据进行修改和创建表等操作 , 也就是说暂且还不支持写操作 !
2.2.1 sparksql操作
spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
-- 在hive中创建iceberg的数据库
create database spark_catalog.hang ;
-- 切换数据源数据库
use spark_catalog.hang ;
-- 创建iceberg表
create table tb_hang(id int , name string) using iceberg ;
在hive的工作目录中查看 出现如下目录 :
向表中插入数据 -----> 在hive客户端查询数据
-- 开启hive对iceberg的支持
SET iceberg.mr.catalog=hive;
set iceberg.engine.hive.enabled=true ;
-- 在hive端查看当前的数据库
show database ;
+----------------+
| database_name |
+----------------+
| db_doit19 |
| db_icer |
| default |
| hang |
+----------------+
切换 数据库
use hang ;
select * from tb_hang ;
+-------------+---------------+
| tb_hang.id | tb_hang.name |
+-------------+---------------+
| 2 | hangge |
| 1 | hang |
+-------------+---------------+
————————————————
版权声明:本文为CSDN博主「白眼黑刺猬」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_37933018/article/details/110452480
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。