当前位置:   article > 正文

Spark 用 scala 实现读取 hive 表进行读、写等操作_scala语言 spark写入hive表数据的方法 无账号密码

scala语言 spark写入hive表数据的方法 无账号密码

   spark 目前较为基础且常用的场景应该就是对 hive 表进行读写操作 ,尤其通过使用spark sql 实现数据分析、关联等操作
  常用的方式是直接采用spark on hive的方式,在创建SparkSession时开启enableHiveSupport。连接上hive之后,可以利用Spark sql对hive表进行读、写、join等操作,官方也推荐Spark sql模式,因为其支持对dataframe(Dataset的特列,DataFrame=Dataset[Row] )进行操作,很多数据分析人员习惯使用python,而python没有dataset,而且sql方式对数据进行批处理方式更为直观。

1.环境准备

   这里就不进行赘述,简单说明下:

  • 1)将hive-site.xml拷贝到项目代码的resources目录下
  • 2)在hive库中新建测试表;提前准好测试数据,通过load方式将文本数据加载到hive表
    加载服务器数据文件sql语句,具体看下面sql
    【补充】文件路径有:服务器本地文件、hdfs文件;加载方式:overwrite into(覆盖写)、into(追加写)
-- 加载服务器本地文件,以覆盖写的方式
load data local inpath '/home/test_a.txt' overwrite into table zero_test_a partition(partition_name='20220518');
-- 加载hdfs文件,以追加写的方式
load data inpath '/user/hdfs/test_b.txt' into table zero_test_a partition(partition_name='20220518');
  • 1
  • 2
  • 3
  • 4

2.实操代码

2.1 spark连接hive

连接hive时候注意开启 enableHiveSupport() 配置,另外本地测试需要加上 master(“local[*]”) 或者 master(“local[]”)

 val spark = SparkSession
      .builder()
      //本地idea测试时候需要加上
      .master("local[*]")
      .appName("spark_opts_hive")
      // 开启Hive支持
      .enableHiveSupport()
      //配置metastore.uris
      .config("hive.metastore.uris","thrift://xxx.xxx.xxx.xxx:9083")
      // 配置Hivewarehouse地址
      .config("hive.metastore.warehouse.dir","/user/hive/warehouse")
      // 如果设置了账号和密码的需要配置
      .config("username","zero")
      .config("password","zero")
      .getOrCreate()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.2 操作hive表

完成连接之后,可以通过spark.sql()方式实现读取、关联等操作,具体请看代码示例。

package com.zero.scala.sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
 * sparksql 对hive表进行join
 * 使用sparksession
 */
object SparkHiveOps {
  var sparkSession:SparkSession = _
  private def create_sparksession():SparkSession = {
    val hiveMetaUris = "thrift://xxx.xxx.xxx.xxx:9083"
    SparkSession.builder().master("local[*]")
      .appName("createSparkSession")
      .enableHiveSupport()
      .config("hive.metastore.warehouse.dir","/user/hive/warehouse")
      .config("hive.metastore.uris","hiveMetaUris")
      .getOrCreate()
  }
  private def init() :Unit = {
    sparkSession = create_sparksession()
    sparkSession.sql("use zeroDb")
  }
  private  def destroy_sparkSession(ss:SparkSession) : Unit ={
    if(ss != null) ss.close()
  }
  /**
   * join
   */
  private def joinOps():DataFrame = {
    sparkSession.sql("select a.c1,a.c2,b.c2 from zero_test_a a left join zero_test_b b on a.c1=b.c1 where b.c2='xxx'")
  }

  def main(args: Array[String]): Unit = {
    init()
    joinOps().show(5)
    destroy_sparkSession(sparkSession)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号