当前位置:   article > 正文

SparkSQL操作Hive Table(enableHiveSupport())

enablehivesupport

Spark SQL支持对Hive的读写操作。然而因为Hive有很多依赖包,所以这些依赖包没有包含在默认的Spark包里面。如果Hive依赖的包能在classpath找到,Spark将会自动加载它们。需要注意的是,这些Hive依赖包必须复制到所有的工作节点上,因为它们为了能够访问存储在Hive的数据,会调用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xmlcore-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目录下面。
当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。
注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。

import java.io.Serializable;  
import java.util.ArrayList;  
import java.util.List;  

import org.apache.spark.api.java.function.MapFunction;  
import org.apache.spark.sql.Dataset;  
import org.apache.spark.sql.Encoders;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.SparkSession;  

public static class Record implements Serializable {  
  private int key;  
  private String value;  

  public int getKey() {  
    return key;  
  }  

  public void setKey(int key) {  
    this.key = key;  
  }  

  public String getValue() {  
    return value;  
  }  

  public void setValue(String value) {  
    this.value = value;  
  }  
}  

// warehouseLocation points to the default location for managed databases and tables  
String warehouseLocation = "/spark-warehouse";  
// init spark session with hive support  
SparkSession spark = SparkSession  
  .builder()  
  .appName("Java Spark Hive Example")  
  .master("local[*]")  
  .config("spark.sql.warehouse.dir", warehouseLocation)  
  .enableHiveSupport()  
  .getOrCreate();  

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");  
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");  

// Queries are expressed in HiveQL  
spark.sql("SELECT * FROM src").show();  
// +---+-------+  
// |key|  value|  
// +---+-------+  
// |238|val_238|  
// | 86| val_86|  
// |311|val_311|  
// ...  
// only showing top 20 rows  

// Aggregation queries are also supported.  
spark.sql("SELECT COUNT(*) FROM src").show();  
// +--------+  
// |count(1)|  
// +--------+  
// |    500 |  
// +--------+  

// The results of SQL queries are themselves DataFrames and support all normal functions.  
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");  

// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.  
Dataset<String> stringsDS = sqlDF.map(row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING());  
stringsDS.show();  
// +--------------------+  
// |               value|  
// +--------------------+  
// |Key: 0, Value: val_0|  
// |Key: 0, Value: val_0|  
// |Key: 0, Value: val_0|  
// ...  

// You can also use DataFrames to create temporary views within a SparkSession.  
List<Record> records = new ArrayList<Record>();  
for (int key = 1; key < 100; key++) {  
  Record record = new Record();  
  record.setKey(key);  
  record.setValue("val_" + key);  
  records.add(record);  
}  
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);  
recordsDF.createOrReplaceTempView("records");  

// Queries can then join DataFrames data with data stored in Hive.  
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();  
// +---+------+---+------+  
// |key| value|key| value|  
// +---+------+---+------+  
// |  2| val_2|  2| val_2|  
// |  2| val_2|  2| val_2|  
// |  4| val_4|  4| val_4|  
// ...  
// only showing top 20 rows  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

如果使用eclipse运行上述代码的话需要添加spark-hive的jars,下面是maven的配置:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 -->  
<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-hive_2.11</artifactId>  
    <version>2.1.0</version>  
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

否则的话会遇到下面错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.  
    at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:815)  
    at JavaSparkHiveExample.main(JavaSparkHiveExample.java:17)  
  • 1
  • 2
  • 3

与不同版本Hive Metastore的交互

Spark SQL对Hive的支持其中一个最重要的部分是与Hive metastore的交互,使得Spark SQL可以访问Hive表的元数据。从Spark 1.4.0版本开始,Spark SQL使用下面的配置可以用于查询不同版本的Hive metastores。需要注意的是,本质上Spark SQL会使用编译后的Hive 1.2.1版本的那些类来用于内部操作(serdes、UDFs、UDAFs等等)。

这里写图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/869724
推荐阅读
相关标签
  

闽ICP备14008679号