赞
踩
spark 目前较为基础且常用的场景应该就是对 hive 表进行读写操作 ,尤其通过使用spark sql 实现数据分析、关联等操作
常用的方式是直接采用spark on hive的方式,在创建SparkSession时开启enableHiveSupport。连接上hive之后,可以利用Spark sql对hive表进行读、写、join等操作,官方也推荐Spark sql模式,因为其支持对dataframe(Dataset的特列,DataFrame=Dataset[Row] )进行操作,很多数据分析人员习惯使用python,而python没有dataset,而且sql方式对数据进行批处理方式更为直观。
这里就不进行赘述,简单说明下:
-- 加载服务器本地文件,以覆盖写的方式
load data local inpath '/home/test_a.txt' overwrite into table zero_test_a partition(partition_name='20220518');
-- 加载hdfs文件,以追加写的方式
load data inpath '/user/hdfs/test_b.txt' into table zero_test_a partition(partition_name='20220518');
连接hive时候注意开启 enableHiveSupport() 配置,另外本地测试需要加上 master(“local[*]”) 或者 master(“local[]”)
val spark = SparkSession
.builder()
//本地idea测试时候需要加上
.master("local[*]")
.appName("spark_opts_hive")
// 开启Hive支持
.enableHiveSupport()
//配置metastore.uris
.config("hive.metastore.uris","thrift://xxx.xxx.xxx.xxx:9083")
// 配置Hivewarehouse地址
.config("hive.metastore.warehouse.dir","/user/hive/warehouse")
// 如果设置了账号和密码的需要配置
.config("username","zero")
.config("password","zero")
.getOrCreate()
完成连接之后,可以通过spark.sql()方式实现读取、关联等操作,具体请看代码示例。
package com.zero.scala.sparkSql import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.mutable.ArrayBuffer /** * sparksql 对hive表进行join * 使用sparksession */ object SparkHiveOps { var sparkSession:SparkSession = _ private def create_sparksession():SparkSession = { val hiveMetaUris = "thrift://xxx.xxx.xxx.xxx:9083" SparkSession.builder().master("local[*]") .appName("createSparkSession") .enableHiveSupport() .config("hive.metastore.warehouse.dir","/user/hive/warehouse") .config("hive.metastore.uris","hiveMetaUris") .getOrCreate() } private def init() :Unit = { sparkSession = create_sparksession() sparkSession.sql("use zeroDb") } private def destroy_sparkSession(ss:SparkSession) : Unit ={ if(ss != null) ss.close() } /** * join */ private def joinOps():DataFrame = { sparkSession.sql("select a.c1,a.c2,b.c2 from zero_test_a a left join zero_test_b b on a.c1=b.c1 where b.c2='xxx'") } def main(args: Array[String]): Unit = { init() joinOps().show(5) destroy_sparkSession(sparkSession) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。