当前位置:   article > 正文

Spark与Iceberg整合写操作-INSERT INTO,MERGE INTO,INSERT OVERWRITE,DELETE FROM,UPDATE,sparkstreaming整合iceber...

merge into table is not supported temporarily

1.8.7 Spark与Iceberg整合写操作

1.8.7.1 INSERT INTO

"insert into"是向Iceberg表中插入数据,有两种语法形式:"INSERT INTO tbl VALUES (1,"zs",18),(2,"ls",19)"、"INSERT INTO tbl SELECT ...",以上两种方式比较简单,这里不再详细记录。

1.8.7.2 MERGE INTO

Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。"merge into"语法如下:

  1. MERGE INTO tbl t
  2. USING (SELECT ...) s
  3. ON t.id = s.id
  4. WHEN MATCHED AND ... THEN DELETE //删除
  5. WHEN MATCHED AND ... THEN UPDATE SET ... //更新
  6. WHEN MATCHED AND ... AND ... THEN UPDATE SET ... //多条件更新
  7. WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)//匹配不上向目标表插入数据

具体案例如下:

  1. 首先创建a表和b表,并插入数据
  1. val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  2. //指定hadoop catalog,catalog名称为hadoop_prod
  3. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  4. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  5. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  6. .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  7. .getOrCreate()
  8. //创建一张表 a ,并插入数据
  9. spark.sql(
  10. """
  11. |create table hadoop_prod.default.a (id int,name string,age int) using iceberg
  12. """.stripMargin)
  13. spark.sql(
  14. """
  15. |insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
  16. """.stripMargin)
  17. //创建另外一张表b ,并插入数据
  18. spark.sql(
  19. """
  20. |create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
  21. """.stripMargin)
  22. spark.sql(
  23. """
  24. |insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
  25. """.stripMargin)
  1. 使用MERGE INTO 语法向目标表更新、删除、新增数据

这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:

  1. //将表b 中与表a中相同id的数据更新到表a,表a中没有表b中有的id对应数据写入增加到表a
  2. spark.sql(
  3. """
  4. |merge into hadoop_prod.default.a t1
  5. |using (select id,name ,age,tp from hadoop_prod.default.b) t2
  6. |on t1.id = t2.id
  7. |when matched and t2.tp = 'delete' then delete
  8. |when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age
  9. |when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)
  10. """.stripMargin)
  11. spark.sql("""select * from hadoop_prod.default.a """).show()

最终结果如下:

image.png

注意:更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。

关于我的 INSERT INTO 测试代码和结果如下:

  1. package com.shujia.spark.iceberg
  2. import org.apache.spark.sql.SparkSession
  3. object MergeInto {
  4. def main(args: Array[String]): Unit = {
  5. /**
  6. *
  7. * MERGE INTO tbl t
  8. * USING (SELECT ...) s
  9. * ON t.id = s.id
  10. * WHEN MATCHED AND ... THEN DELETE //删除
  11. * WHEN MATCHED AND ... THEN UPDATE SET ... //更新
  12. * WHEN MATCHED AND ... AND ... THEN UPDATE SET ... //多条件更新
  13. * WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)//匹配不上向目标表插入数据
  14. *
  15. */
  16. val spark: SparkSession = SparkSession
  17. .builder()
  18. .appName("test")
  19. //指定hadoop catalog,catalog名称为hadoop_prod
  20. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  21. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  22. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://master:9000/spark")
  23. .getOrCreate()
  24. //首先创建a表和b表,并插入数据
  25. //创建一张表 a ,并插入数据
  26. spark.sql(
  27. """
  28. |create table if not exists hadoop_prod.default.a (id int,name string,age int) using iceberg
  29. """.stripMargin)
  30. spark.sql(
  31. """
  32. |insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
  33. |
  34. """.stripMargin)
  35. //创建另外一张表b ,并插入数据
  36. spark.sql(
  37. """
  38. |create table if not exists hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
  39. """.stripMargin)
  40. spark.sql(
  41. """
  42. |insert into hadoop_prod.default.b values
  43. | (1,"zs",30,"delete")
  44. |,(2,"李四",31,"update")
  45. |,(4,"王五",32,"add")
  46. |
  47. """.stripMargin)
  48. /**
  49. *
  50. * 使用MERGE INTO 语法向目标表更新、删除、新增数据
  51. *
  52. * 这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,
  53. * 如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,
  54. * 那么将b表中的数据插入到a表中,具体操作如下:
  55. */
  56. //将表b 中与表a中相同id的数据更新到表a,表a中没有表b中有的id对应数据写入增加到表a
  57. spark.sql(
  58. """
  59. |merge into hadoop_prod.default.a t1
  60. |using (select id,name ,age,tp from hadoop_prod.default.b) t2
  61. |on t1.id = t2.id
  62. |when matched and t2.tp = 'delete' then delete
  63. |when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age
  64. |when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)
  65. |
  66. """.stripMargin)
  67. spark.sql("""select * from hadoop_prod.default.a """).show()
  68. /**
  69. *
  70. * 进过测试报错如下:
  71. *Exception in thread "main" java.lang.UnsupportedOperationException:
  72. * MERGE INTO TABLE is not supported temporarily.
  73. *
  74. * 翻译:
  75. *Exception in thread "main" java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
  76. *线程“main”java.lang.UnsupportedOperationException:暂时不支持MERGE INTO TABLE。
  77. *
  78. */
  79. //spark 提交任务的命令
  80. //spark-submit --master yarn --class com.shujia.spark.iceberg.MergeInto spark-1.0.jar
  81. }
  82. }
1.8.7.3 INSERT OVERWRITE

"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。

对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。

  • 动态分区覆盖:

动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。

  • 静态分区覆盖:

静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。具体操作如下:

  1. 创建三张表

创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。

  1. //创建 test1 分区表,并插入数据
  2. spark.sql(
  3. """
  4. |create table hadoop_prod.default.test1 (id int,name string,loc string)
  5. |using iceberg
  6. |partitioned by (loc)
  7. """.stripMargin)
  8. spark.sql(
  9. """
  10. |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
  11. """.stripMargin)
  12. //创建 test2 普通表,并插入数据
  13. spark.sql(
  14. """
  15. |create table hadoop_prod.default.test2 (id int,name string,loc string)
  16. |using iceberg
  17. """.stripMargin)
  18. spark.sql(
  19. """
  20. |insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan")
  21. """.stripMargin)
  22. //创建 test3 普通表,并插入数据
  23. spark.sql(
  24. """
  25. |create table hadoop_prod.default.test3 (id int,name string,loc string)
  26. |using iceberg
  27. """.stripMargin)
  28. spark.sql(
  29. """
  30. |insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou")
  31. """.stripMargin)
  1. 使用insert overwrite 读取test3表中的数据覆盖到test2表中
  1. //使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
  2. spark.sql(
  3. """
  4. |insert overwrite hadoop_prod.default.test2
  5. |select id,name,loc from hadoop_prod.default.test3
  6. """.stripMargin)
  7. //查询 test2 表中的数据
  8. spark.sql(
  9. """
  10. |select * from hadoop_prod.default.test2
  11. """.stripMargin).show()

Iceberg 表 test2结果如下:

image.png

  1. 使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1
  1. // 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
  2. spark.sql(
  3. """
  4. |insert overwrite hadoop_prod.default.test1
  5. |select id,name,loc from hadoop_prod.default.test3
  6. """.stripMargin)
  7. //查询 test1 表数据
  8. spark.sql(
  9. """
  10. |select * from hadoop_prod.default.test1
  11. """.stripMargin).show()

Iceberg 表 test1结果如下:

image.png

  1. 静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中

这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。

  1. //删除表test1,重新创建表test1 分区表,并插入数据
  2. spark.sql(
  3. """
  4. |drop table hadoop_prod.default.test1
  5. """.stripMargin)
  6. spark.sql(
  7. """
  8. |create table hadoop_prod.default.test1 (id int,name string,loc string)
  9. |using iceberg
  10. |partitioned by (loc)
  11. """.stripMargin)
  12. spark.sql(
  13. """
  14. |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
  15. """.stripMargin)
  16. spark.sql("select * from hadoop_prod.default.test1").show()

Iceberg 表 test1结果如下:

image.png

//注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复

  1. spark.sql(
  2. """
  3. |insert overwrite hadoop_prod.default.test1
  4. |partition (loc = "jiangsu")
  5. |select id,name from hadoop_prod.default.test3
  6. """.stripMargin)
  7. //查询 test1 表数据
  8. spark.sql(
  9. """
  10. |select * from hadoop_prod.default.test1
  11. """.stripMargin).show()

Iceberg 表 test1结果如下:

image.png

注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。

进过测试 INSERT OVERWRITE 我的测试代码及结果如下:

  1. package com.shujia.spark.iceberg
  2. import org.apache.spark.sql.SparkSession
  3. object InsertOverwrite {
  4. def main(args: Array[String]): Unit = {
  5. /*
  6. "insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,
  7. 建议如果有部分数据替换操作可以使用"merge into"操作。
  8. 对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。
  9. 动态分区覆盖:
  10. 动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。
  11. 静态分区覆盖:
  12. 静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,
  13. 那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,
  14. 那么相当于给Iceberg表增加了个一个分区。具体操作如下:
  15. */
  16. val spark: SparkSession = SparkSession
  17. .builder()
  18. .appName("test")
  19. //指定hadoop catalog,catalog名称为hadoop_prod
  20. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  21. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  22. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://master:9000/spark")
  23. .getOrCreate()
  24. /**
  25. *
  26. * 删除表
  27. */
  28. //删除表test1,重新创建表test1 分区表,并插入数据
  29. spark.sql(
  30. """
  31. |drop table if exists hadoop_prod.default.test1
  32. """.stripMargin)
  33. spark.sql(
  34. """
  35. |drop table if exists hadoop_prod.default.test2
  36. """.stripMargin)
  37. spark.sql(
  38. """
  39. |drop table if exists hadoop_prod.default.test3
  40. """.stripMargin)
  41. //创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。
  42. //创建 test1 分区表,并插入数据
  43. spark.sql(
  44. """
  45. |create table if not exists hadoop_prod.default.test1 (id int,name string,loc string)
  46. |using iceberg
  47. |partitioned by (loc)
  48. """.stripMargin)
  49. spark.sql(
  50. """
  51. |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
  52. """.stripMargin)
  53. //创建 test2 普通表,并插入数据
  54. spark.sql(
  55. """
  56. |create table if not exists hadoop_prod.default.test2 (id int,name string,loc string)
  57. |using iceberg
  58. """.stripMargin)
  59. spark.sql(
  60. """
  61. |insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan")
  62. """.stripMargin)
  63. //创建 test3 普通表,并插入数据
  64. spark.sql(
  65. """
  66. |create table if not exists hadoop_prod.default.test3 (id int,name string,loc string)
  67. |using iceberg
  68. """.stripMargin)
  69. spark.sql(
  70. """
  71. |insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou")
  72. """.stripMargin)
  73. //使用insert overwrite 读取test3表中的数据覆盖到test2表中 -- 非分区插入非分区
  74. spark.sql(
  75. """
  76. |insert overwrite hadoop_prod.default.test2
  77. |select id,name,loc from hadoop_prod.default.test3
  78. |
  79. """.stripMargin)
  80. //查询 test2 表中的数据
  81. spark.sql(
  82. """
  83. |select * from hadoop_prod.default.test2
  84. |
  85. """.stripMargin).show()
  86. // 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1 -- 非分区插入分区表
  87. spark.sql(
  88. """
  89. |insert overwrite hadoop_prod.default.test1
  90. |select id,name,loc from hadoop_prod.default.test3
  91. """.stripMargin)
  92. //查询 test1 表数据
  93. spark.sql(
  94. """
  95. |select * from hadoop_prod.default.test1
  96. """.stripMargin).show()
  97. //spark 提交任务的命令
  98. //spark-submit --master yarn --class com.shujia.spark.iceberg.InsertOverwrite spark-1.0.jar
  99. }
  100. }
  101. =======第二部分==========
  102. package com.shujia.spark.iceberg
  103. import org.apache.spark.sql.SparkSession
  104. object InsertOverwrite2 {
  105. def main(args: Array[String]): Unit = {
  106. /*
  107. 静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中
  108. 这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。
  109. 另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。
  110. */
  111. val spark: SparkSession = SparkSession
  112. .builder()
  113. .appName("test")
  114. //指定hadoop catalog,catalog名称为hadoop_prod
  115. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  116. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  117. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://master:9000/spark")
  118. .getOrCreate()
  119. //删除表test1,重新创建表test1 分区表,并插入数据
  120. spark.sql(
  121. """
  122. |drop table hadoop_prod.default.test1
  123. """.stripMargin)
  124. spark.sql(
  125. """
  126. |create table hadoop_prod.default.test1 (id int,name string,loc string)
  127. |using iceberg
  128. |partitioned by (loc)
  129. """.stripMargin)
  130. spark.sql(
  131. """
  132. |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
  133. """.stripMargin)
  134. spark.sql("select * from hadoop_prod.default.test1").show()
  135. //spark 提交任务的命令
  136. //spark-submit --master yarn --class com.shujia.spark.iceberg.InsertOverwrite2 spark-1.0.jar
  137. }
  138. }
1.8.7.4 DELETE FROM

Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会重写受影响行所在的数据文件。具体操作如下:

  1. //创建表 delete_tbl ,并加载数据
  2. spark.sql(
  3. """
  4. |create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg
  5. |""".stripMargin)
  6. spark.sql(
  7. """
  8. |insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)
  9. """.stripMargin)
  10. //根据条件范围删除表 delete_tbl 中的数据
  11. spark.sql(
  12. """
  13. |delete from hadoop_prod.default.delete_tbl where id >3 and id <6
  14. """.stripMargin)
  15. spark.sql("select * from hadoop_prod.default.delete_tbl").show()

Iceberg 表 delete_tbl结果如下:

image.png

  1. //根据条件删除表 delete_tbl 中的一条数据
  2. spark.sql(
  3. """
  4. |delete from hadoop_prod.default.delete_tbl where id = 2
  5. """.stripMargin)
  6. spark.sql("select * from hadoop_prod.default.delete_tbl").show()

Iceberg 表 delete_tbl结果如下:

image.png

测试DELECT FROM 测试代码及结果如下:

  1. package com.shujia.spark.iceberg
  2. import org.apache.spark.sql.SparkSession
  3. object DeleteFrom {
  4. def main(args: Array[String]): Unit = {
  5. /*
  6. Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。
  7. 如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,\
  8. 则Iceberg会重写受影响行所在的数据文件。具体操作如下:
  9. */
  10. val spark: SparkSession = SparkSession
  11. .builder()
  12. .appName("test")
  13. //指定hadoop catalog,catalog名称为hadoop_prod
  14. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  15. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  16. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://master:9000/spark")
  17. .getOrCreate()
  18. //创建表 delete_tbl ,并加载数据
  19. spark.sql(
  20. """
  21. |create table if not exists hadoop_prod.default.delete_tb2 (id int,name string,age int) using iceberg
  22. |""".stripMargin)
  23. spark.sql(
  24. """
  25. |insert into hadoop_prod.default.delete_tb2 values
  26. |(1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23),(7,"gx",00)
  27. |
  28. """.stripMargin)
  29. spark.sql("select * from hadoop_prod.default.delete_tb2").show()
  30. //根据条件范围删除表 delete_tbl 中的数据
  31. // spark.sql(
  32. // """
  33. // |
  34. // |delete from hadoop_prod.default.delete_tb2 where id >3 and id <6
  35. // |
  36. // """.stripMargin)
  37. spark.sql("select * from hadoop_prod.default.delete_tb2").show()
  38. //根据条件删除表 delete_tbl 中的一条数据
  39. spark.sql(
  40. """
  41. |delete from hadoop_prod.default.delete_tb2 where id = 2
  42. """.stripMargin)
  43. spark.sql("select * from hadoop_prod.default.delete_tb2").show()
  44. //spark 提交任务的命令
  45. //spark-submit --master yarn --class com.shujia.spark.iceberg.DeleteFrom spark-1.0.jar
  46. /**
  47. *
  48. * 进过测试:报错-错误信息如下:
  49. *
  50. * Exception in thread "main" org.apache.spark.sql.AnalysisException:
  51. * Cannot delete from table hadoop_prod.default.delete_tb2 where [GreaterThan(id,3), LessThan(id,6)]
  52. *
  53. * 翻译:
  54. * 线程“main”org.apache.spark.sql中出现异常。AnalysisException:
  55. * 无法从表hadoop_prod.default中删除。delete_tb2其中[大于(id,3),小于(id,6)]
  56. *
  57. *
  58. */
  59. }
  60. }
1.8.7.5 UPDATE

Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。操作如下:

  1. //创建表 delete_tbl ,并加载数据
  2. spark.sql(
  3. """
  4. |create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg
  5. |""".stripMargin)
  6. spark.sql(
  7. """
  8. |insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)
  9. """.stripMargin)

通过“update”更新表中id小于等于3的数据name列改为“zhangsan”,age列改为30,操作如下:

  1. //更新 delete_tbl 表
  2. spark.sql(
  3. """
  4. |update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30
  5. |where id <=3
  6. """.stripMargin)
  7. spark.sql(
  8. """
  9. |select * from hadoop_prod.default.update_tbl
  10. """.stripMargin).show()

Iceberg 表 update_tbl结果如下:

image.png

1.8.7.6 DataFrame API 写入Iceberg表

Spark向Iceberg中写数据时不仅可以使用SQL方式,也可以使用DataFrame Api方式操作Iceberg,建议使用SQL方式操作。

DataFrame创建Iceberg表分为创建普通表和分区表,创建分区表时需要指定分区列,分区列可以是多个列。创建表的语法如下:

  1. df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ...
  2. df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ...
  3. df.write(tbl).append() 相当于 INSERT INTO ...
  4. df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...

具体操作如下:

  1. //1.准备数据,使用DataFrame Api 写入Iceberg表及分区表
  2. val nameJsonList = List[String](
  3. "{\"id\":1,\"name\":\"zs\",\"age\":18,\"loc\":\"beijing\"}",
  4. "{\"id\":2,\"name\":\"ls\",\"age\":19,\"loc\":\"shanghai\"}",
  5. "{\"id\":3,\"name\":\"ww\",\"age\":20,\"loc\":\"beijing\"}",
  6. "{\"id\":4,\"name\":\"ml\",\"age\":21,\"loc\":\"shanghai\"}")
  7. import spark.implicits._
  8. val df: DataFrame = spark.read.json(nameJsonList.toDS)
  9. //创建普通表df_tbl1,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列
  10. df.writeTo("hadoop_prod.default.df_tbl1").create()
  11. //查询表 hadoop_prod.default.df_tbl1 中的数据,并查看数据存储结构
  12. spark.read.table("hadoop_prod.default.df_tbl1").show()

Iceberg 表 df_tbl1结果如下:

image.png

Iceberg 表 df_tbl1存储如下:

image.png

  1. //创建分区表df_tbl2,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列
  2. df.sortWithinPartitions($"loc")//写入分区表,必须按照分区列进行排序
  3. .writeTo("hadoop_prod.default.df_tbl2")
  4. .partitionedBy($"loc")//这里可以指定多个列为联合分区
  5. .create()
  6. //查询分区表 hadoop_prod.default.df_tbl2 中的数据,并查看数据存储结构
  7. spark.read.table("hadoop_prod.default.df_tbl2").show()

Iceberg 分区表 df_tbl2结果如下:

image.png

Iceberg 分区表 df_tbl2存储如下:

image.png

1.8.8 Structured Streaming实时写入Iceberg

目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。

  1. 创建Kafka topic

启动Kafka集群,创建“kafka-iceberg-topic”

[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic kafka-iceberg-topic  --partitions 3 --replication-factor 3
  1. 编写向Kafka生产数据代码
  1. /**
  2. * 向Kafka中写入数据
  3. */
  4. object WriteDataToKafka {
  5. def main(args: Array[String]): Unit = {
  6. val props = new Properties()
  7. props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
  8. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  9. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  10. val producer = new KafkaProducer[String,String](props)
  11. var counter = 0
  12. var keyFlag = 0
  13. while(true){
  14. counter +=1
  15. keyFlag +=1
  16. val content: String = userlogs()
  17. producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
  18. //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
  19. if(0 == counter%100){
  20. counter = 0
  21. Thread.sleep(5000)
  22. }
  23. }
  24. producer.close()
  25. }
  26. def userlogs()={
  27. val userLogBuffer = new StringBuffer("")
  28. val timestamp = new Date().getTime();
  29. var userID = 0L
  30. var pageID = 0L
  31. //随机生成的用户ID
  32. userID = Random.nextInt(2000)
  33. //随机生成的页面ID
  34. pageID = Random.nextInt(2000);
  35. //随机生成Channel
  36. val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
  37. val channel = channelNames(Random.nextInt(10))
  38. val actionNames = Array[String]("View", "Register")
  39. //随机生成action行为
  40. val action = actionNames(Random.nextInt(2))
  41. val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
  42. userLogBuffer.append(dateToday)
  43. .append("\t")
  44. .append(timestamp)
  45. .append("\t")
  46. .append(userID)
  47. .append("\t")
  48. .append(pageID)
  49. .append("\t")
  50. .append(channel)
  51. .append("\t")
  52. .append(action)
  53. System.out.println(userLogBuffer.toString())
  54. userLogBuffer.toString()
  55. }
  56. }
  1. 编写Structured Streaming读取Kafka数据实时写入Iceberg
  1. object StructuredStreamingSinkIceberg {
  2. def main(args: Array[String]): Unit = {
  3. //1.准备对象
  4. val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
  5. //指定hadoop catalog,catalog名称为hadoop_prod
  6. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  7. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  8. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
  9. .getOrCreate()
  10. // spark.sparkContext.setLogLevel("Error")
  11. //2.创建Iceberg 表
  12. spark.sql(
  13. """
  14. |create table if not exists hadoop_prod.iceberg_db.iceberg_table (
  15. | current_day string,
  16. | user_id string,
  17. | page_id string,
  18. | channel string,
  19. | action string
  20. |) using iceberg
  21. """.stripMargin)
  22. val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"
  23. val bootstrapServers = "node1:9092,node2:9092,node3:9092"
  24. //多个topic 逗号分开
  25. val topic = "kafka-iceberg-topic"
  26. //3.读取Kafka读取数据
  27. val df = spark.readStream
  28. .format("kafka")
  29. .option("kafka.bootstrap.servers", bootstrapServers)
  30. .option("auto.offset.reset", "earliest")
  31. .option("group.id", "iceberg-kafka")
  32. .option("subscribe", topic)
  33. .load()
  34. import spark.implicits._
  35. import org.apache.spark.sql.functions._
  36. val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  37. .as[(String, String)].toDF("id", "data")
  38. val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))
  39. .withColumn("ts", split(col("data"), "\t")(1))
  40. .withColumn("user_id", split(col("data"), "\t")(2))
  41. .withColumn("page_id", split(col("data"), "\t")(3))
  42. .withColumn("channel", split(col("data"), "\t")(4))
  43. .withColumn("action", split(col("data"), "\t")(5))
  44. .select("current_day", "user_id", "page_id", "channel", "action")
  45. //结果打印到控制台,Default trigger (runs micro-batch as soon as it can)
  46. // val query: StreamingQuery = transDF.writeStream
  47. // .outputMode("append")
  48. // .format("console")
  49. // .start()
  50. //4.流式写入Iceberg表
  51. val query = transDF.writeStream
  52. .format("iceberg")
  53. .outputMode("append")
  54. //每分钟触发一次Trigger.ProcessingTime(1, TimeUnit.MINUTES)
  55. //每10s 触发一次 Trigger.ProcessingTime(1, TimeUnit.MINUTES)
  56. .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  57. .option("path", "hadoop_prod.iceberg_db.iceberg_table")
  58. .option("fanout-enabled", "true")
  59. .option("checkpointLocation", checkpointPath)
  60. .start()
  61. query.awaitTermination()
  62. }
  63. }

注意:以上代码执行时由于使用的Spark版本为3.1.2,其依赖的Hadoop版本为Hadoop3.2版本,所以需要在本地Window中配置Hadoop3.1.2的环境变量以及将对应的hadoop.dll放入window "C:\Windows\System32"路径下。

Structuerd Streaming向Iceberg实时写入数据有以下几个注意点:

  • 写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
  • 向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
  • 写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。
  • 实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。
  1. 查看Iceberg中数据结果

启动向Kafka生产数据代码,启动向Iceberg中写入数据的Structured Streaming程序,执行以下代码来查看对应的Iceberg结果:

  1. //1.准备对象
  2. val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
  3. //指定hadoop catalog,catalog名称为hadoop_prod
  4. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  5. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  6. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
  7. .getOrCreate()
  8. //2.读取Iceberg 表中的数据结果
  9. spark.sql(
  10. """
  11. |select * from hadoop_prod.iceberg_db.iceberg_table
  12. """.stripMargin).show()
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/853125
推荐阅读
相关标签
  

闽ICP备14008679号