赞
踩
DataFrame 将数据写入hive中时,默认的是hive默认数据库,insert into没有指定数据库的参数,数据写入hive表或者hive表分区中:
1、将DataFrame数据写入到hive表中
从DataFrame类中可以看到与hive表有关的写入API有一下几个:
- registerTempTable(tableName:String):Unit,
- inserInto(tableName:String):Unit
- insertInto(tableName:String,overwrite:Boolean):Unit
- saveAsTable(tableName:String,source:String,mode:SaveMode,options:Map[String,String]):Unit
有很多重载函数,不一一列举
registerTempTable函数是创建spark临时表
insertInto函数是向表中写入数据,可以看出此函数不能指定数据库和分区等信息,不可以直接写入。
向hive数据仓库写入数据必须指定数据库,hive数据表建立可以在hive上建立,或者使用hiveContext.sql("create table .....")
下面语句是向指定数据库数据表中写入数据:
- case class Person(name:String,col1:Int,col2:String)
-
- val sc = new org.apache.spark.SparkContext
- val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
-
- import hiveContext.implicits._
- hiveContext.sql("use DataBaseName")
- val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
- data.toDF().insertInto("tableName")
创建一个case类将RDD中数据类型转为case类类型,然后通过toDF转换DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName") 语句,就可以将DataFrame数据写入hive数据表中了。
2、将DataFrame数据写入hive指定数据表的分区中
hive数据表建立可以在hive上建立,或者使用hiveContext.sql("create table....") ,使用saveAsTable时数据存储格式有限,默认格式为parquet,将数据写入分区的思路是:首先将DataFrame数据写入临时表,之后由hiveContext.sql语句将数据写入hive分区表中,具体操作如下:
- case class Person(name:String,col1:Int,col2:String)
-
- val sc = new org.apache.spark.SparkContext
- val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
-
- import hiveContext.implicits._
- hiveContext.sql("use DataBaseName")
- val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
- data.toDF().registerTempTable("table1")
- hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
上面代码是spark版本1.6
下面代码是spark版本2.0及以上版本
- val session = SparkSession.builder().appName("WarehouseInventoryByNewMysqlSnap").enableHiveSupport().getOrCreate()
- val sc: SparkContext=session.sparkContext
- session.sql("use bi_work")
- import session.implicits._
- val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
- data.toDF().registerTempTable("table1")
- session.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
这边捎带介绍一下hive创建分区表:
hive分区表:是指在创建表时指定的partition的分区空间,若需要创建有分区的表,需要在create表的时候调用可选参数partitioned by。
注意:
一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下
hive的表和列名不区分大小写
分区是以字段的形式在表的结构中存在,通过desc table_name 命令可以查看到字段存在,该字段是分区的标识
建表的语句:
- CREATE EXTERNAL TABLE bi_work.`dw_inventory_snap`
- (`warehouse_id` string COMMENT '',
- `internal_id` string COMMENT '',
- `logical_inventory` string COMMENT '',
- `create_time` timestamp COMMENT '')
- PARTITIONED BY (`snap_time` string) --指定分区
- row format delimited fields terminated by '\t'
— THE END —
◤半年文章精选系列◥
Flink从入门到放弃之源码解析系列
大数据成神之路-基础篇
大数据成神之路-进阶篇
Flink入门系列
Flink高级进阶
Hadoop生态圈系列
《如何从根源上解决 HDFS 小文件问题》(https://dwz.cn/FqDPpRUc)
《Hadoop解决小文件存储思路》(https://dwz.cn/2oCdmCkw)
《一个知识点将你拒之门外之Hbase的二级索引》(https://dwz.cn/umfBOZ5l)
实时计算系列(spark、kafka等)
规范和系统设计
杂谈
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。