当前位置:   article > 正文

Spark操作Hive分区表_df.write.mode(append)模式需要先建表吗

df.write.mode(append)模式需要先建表吗

原作者写的比较清楚了,特别是DDL建了表后,又用Spark向表里写数据常常写不进去,会报异常。

原文地址:https://dongkelun.com/2018/12/04/sparkHivePatition/

前言

前面学习总结了Hive分区表,现在学习总结一下Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,并记录一下遇到的问题以及如何解决。

1、Spark创建分区表

只写主要代码,完整代码见附录

  1. val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
  2. val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
  3. //可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表
  4. df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
  • 1
  • 2
  • 3
  • 4

然后在Hive命令行里看一下,新建的表是否有分区字段year
用命令

desc new_test_partition;
  • 1

show create table new_test_partition;
  • 1

根据下面的结果可以看到新建的表确实有分区字段year

  1. hive> desc new_test_partition;
  2. OK
  3. id string
  4. name string
  5. age int
  6. year string
  7. # Partition Information
  8. # col_name data_type comment
  9. year string
  10. Time taken: 0.432 seconds, Fetched: 9 row(s)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、向已存在的表插入数据

2.1 Spark创建的分区表

  • 这种情况其实和建表语句一样就可以了
  • 不需要开启动态分区
df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
  • 1

当然也有其他方式插入数据,会在后面讲到。

2.2 在Hive命令行创建的表

  • 这里主要指和Spark创建的表的文件格式不一样,Spark默认的文件格式为PARQUET,为在命令行Hive默认的文件格式为TEXTFILE,这种区别,也导致了异常的出现。
  • 需要开启动态分区
  • 不开启会有异常:
Exception in thread "main" org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
  • 1

2.2.1 建表

Hive分区表学习总结的建表语句建表(之前已经建过就不用重复建了)。

  1. create table test_partition (
  2. id string comment 'ID',
  3. name string comment '名字',
  4. age int comment '年龄'
  5. )
  6. comment '测试分区'
  7. partitioned by (year int comment '年')
  8. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.2.2 异常

试着用上面的插入语句插入数据

df.write.mode("append").partitionBy("year").saveAsTable("test_partition")
  • 1

抛出异常

  1. Exception in thread "main" org.apache.spark.sql.AnalysisException: The format of the existing table dkl.test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
  • 1

原因就是上面说的文件格式不一致造成的。

2.2.3 解决办法

用fomat指定格式

df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")
  • 1

2.3 其他方法

  1. df.createOrReplaceTempView("temp_table")
  2. sql("insert into test_partition select * from temp_table")
  3. df.write.insertInto("test_partition")
  • 1
  • 2
  • 3

其中insertInto不需要也不能将df进行partitionBy,否则会抛出异常

  1. df.write.partitionBy("year").insertInto("test_partition")
  2. Exception in thread "main" org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;
  • 1
  • 2

3、完整代码

  1. package com.dkl.blog.spark.hive
  2. import org.apache.spark.sql.SparkSession
  3. /**
  4. * 博客:Spark操作Hive分区表
  5. * https://dongkelun.com/2018/12/04/sparkHivePatition/
  6. *
  7. */
  8. object SparkHivePatition {
  9. def main(args: Array[String]): Unit = {
  10. val spark = SparkSession
  11. .builder()
  12. .appName("SparkHive")
  13. .master("local")
  14. .config("spark.sql.parquet.writeLegacyFormat", true)
  15. .enableHiveSupport()
  16. .getOrCreate()
  17. import spark.sql
  18. val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
  19. val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
  20. //创建临时表
  21. df.createOrReplaceTempView("temp_table")
  22. //切换hive的数据库
  23. sql("use dkl")
  24. // 1、创建分区表,可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表
  25. df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
  26. //2、向Spark创建的分区表写入数据
  27. df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
  28. sql("insert into new_test_partition select * from temp_table")
  29. df.write.insertInto("new_test_partition")
  30. //开启动态分区
  31. sql("set hive.exec.dynamic.partition.mode=nonstrict")
  32. //3、向在Hive里用Sql创建的分区表写入数据,抛出异常
  33. // df.write.mode("append").partitionBy("year").saveAsTable("test_partition")
  34. // 4、解决方法
  35. df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")
  36. sql("insert into test_partition select * from temp_table")
  37. df.write.insertInto("test_partition")
  38. //这样会抛出异常
  39. // df.write.partitionBy("year").insertInto("test_partition")
  40. spark.stop
  41. }
  42. }
  •  
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号