当前位置:   article > 正文

看完这篇文章,你就能熟练运用SparkSql_熟练使用sql进行数据提取和分析,熟悉hivesql、sparksql等开发和调优

熟练使用sql进行数据提取和分析,熟悉hivesql、sparksql等开发和调优

花了几天休息的时间整理了这篇文章,就为了让你读完就能深入了解并熟练运用Spark SQL!如果你觉得有用的话请收藏加关注,你的转发和点赞是我最大的动力!原创不易,转载请注明出处!

 

本文基于Spark官方网站(spark.apache.org),加上自己的理解和实验编写。文中Demo基于Spark2.4。

1、概述

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了关于数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和数据集API。在计算结果时,使用相同的执行引擎,而不依赖于用于表示计算的API/语言。这种统一意味着开发人员可以很容易地在不同的api之间来回切换,这些api提供了最自然的方式来表达给定的转换。

1.1、SQL

Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有Hive中读取数据。在另一种编程语言中运行SQL时,结果将作为数据集/DataFrame返回。您还可以使用命令行或JDBC/ODBC与SQL接口交互。

1.2、Datasets and DataFrames

Dataset是数据的分布式集合。Dataset是Spark 1.6中添加的新接口,它提供了RDDs(强类型、使用强大lambda函数的能力)和Spark SQL优化执行引擎的优点。可以从JVM对象构造数据集,然后使用功能 transformations (map, flatMap, filter, etc.).进行操作。 Dataset API在Scala和Java中可用。Python不支持 Dataset API。但是由于Python的动态特性, Dataset API的许多优点已经可用(例如,您可以按名称自然地访问行。columnname的字段)。R的情况类似。 DataFrame是组织到命名列中的Dataset 。它在概念上等价于关系数据库中的表或R/Python中的数据框架,但是在底层有更丰富的优化。DataFrames可以从许多数据源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的rdd。DataFrame API在Scala、Java、Python和r中都可以使用。在Scala和Java中,DataFrame 由行Dataset表示。在Scala API中,DataFrame只是Dataset[Row]的类型别名。而在Java API中,用户需要使用Dataset来表示一个DataFrame。 在整个文档中,我们经常将Scala/Java Datasets of Rows 称为DataFrames。

2、开始入门

2.1、使用SparkSession开始着手

 
  1. SparkSession spark = SparkSession

  2.  .builder()

  3.  .appName("Java Spark SQL basic example")

  4.  .config("spark.some.config.option", "some-value")

  5.  .getOrCreate();

  6.  

  7. Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

  8.  

  9. // Displays the content of the DataFrame to stdout

  10. df.show();

  11. // +----+-------+

  12. // | age|   name|

  13. // +----+-------+

  14. // |null|Michael|

  15. // |  30|   Andy|

  16. // |  19| Justin|

  17. // +----+-------+

在上例中,使用sparksession.read.json方法读取一个java文件,并返回一个dataset对象。通过调用dataset的方法可以进行数据的操作。

2.2、非类型化DataSet操作(即DataFrame操作)

DataFrames为Scala、Java、Python和R中的结构化数据操作提供了一种特定于领域的语言。 如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中的行DataSet。与强类型Scala/Java数据集提供的“类型化转换”相比,这些操作也称为“非类型化转换”。 以下是一些使用数据集进行结构化数据处理的基本例子:

 
  1. // col("...") is preferable to df.col("...")

  2. import static org.apache.spark.sql.functions.col;

  3.  

  4. // Print the schema in a tree format

  5. df.printSchema();

  6. // root

  7. // |-- age: long (nullable = true)

  8. // |-- name: string (nullable = true)

  9.  

  10. // Select only the "name" column

  11. df.select("name").show();

  12. // +-------+

  13. // |   name|

  14. // +-------+

  15. // |Michael|

  16. // |   Andy|

  17. // | Justin|

  18. // +-------+

  19.  

  20. // Select everybody, but increment the age by 1

  21. df.select(col("name"), col("age").plus(1)).show();

  22. // +-------+---------+

  23. // |   name|(age + 1)|

  24. // +-------+---------+

  25. // |Michael|     null|

  26. // |   Andy|       31|

  27. // | Justin|       20|

  28. // +-------+---------+

  29.  

  30. // Select people older than 21

  31. df.filter(col("age").gt(21)).show();

  32. // +---+----+

  33. // |age|name|

  34. // +---+----+

  35. // | 30|Andy|

  36. // +---+----+

  37.  

  38. // Count people by age

  39. df.groupBy("age").count().show();

  40. // +----+-----+

  41. // | age|count|

  42. // +----+-----+

  43. // |  19|    1|

  44. // |null|    1|

  45. // |  30|    1|

  46. // +----+-----+

除了简单的列引用和表达式外,数据集还具有丰富的函数库,包括字符串操作、日期算法、常见的数学操作等等。完整的列表可以在DataFrame函数引用中获得。(http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html)

2.3、以编程方式运行SQL查询

SparkSession上的sql函数允许应用程序以编程方式运行sql查询,并以Dataset的形式返回结果。

 
  1. import org.apache.spark.sql.Dataset;

  2. import org.apache.spark.sql.Row;

  3.  

  4. // Register the DataFrame as a SQL temporary view

  5. df.createOrReplaceTempView("people");

  6.  

  7. Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");

  8. sqlDF.show();

  9. // +----+-------+

  10. // | age|   name|

  11. // +----+-------+

  12. // |null|Michael|

  13. // |  30|   Andy|

  14. // |  19| Justin|

  15. // +----+-------+

2.4、全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,它将消失。如果您希望所有会话之间共享一个临时视图,并在Spark应用程序终止之前保持活动状态,那么您可以创建一个全局临时视图。全局临时视图被绑定到系统保存的数据库globaltemp,我们必须使用限定名来引用它,例如,SELECT * FROM globaltemp.view1.

 
  1. // Register the DataFrame as a global temporary view

  2. df.createGlobalTempView("people");

  3.  

  4. // Global temporary view is tied to a system preserved database `global_temp`

  5. spark.sql("SELECT * FROM global_temp.people").show();

  6. // +----+-------+

  7. // | age|   name|

  8. // +----+-------+

  9. // |null|Michael|

  10. // |  30|   Andy|

  11. // |  19| Justin|

  12. // +----+-------+

  13.  

  14. // Global temporary view is cross-session

  15. spark.newSession().sql("SELECT * FROM global_temp.people").show();

  16. // +----+-------+

  17. // | age|   name|

  18. // +----+-------+

  19. // |null|Michael|

  20. // |  30|   Andy|

  21. // |  19| Justin|

  22. // +----+-------+

2.5、创建Datasets

Dataset类似于RDDs,但是,它们不使用Java序列化或Kryo,而是使用专门的编码器序列化对象,以便通过网络进行处理或传输。虽然编码器和标准序列化都负责将对象转换成字节,但编码器是动态生成的代码,使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化回对象。

 
  1. import java.util.Arrays;

  2. import java.util.Collections;

  3. import java.io.Serializable;

  4.  

  5. import org.apache.spark.api.java.function.MapFunction;

  6. import org.apache.spark.sql.Dataset;

  7. import org.apache.spark.sql.Row;

  8. import org.apache.spark.sql.Encoder;

  9. import org.apache.spark.sql.Encoders;

  10.  

  11. public static class Person implements Serializable {

  12.  private String name;

  13.  private int age;

  14.  

  15.  public String getName() {

  16.    return name;

  17.  }

  18.  

  19.  public void setName(String name) {

  20.    this.name = name;

  21.  }

  22.  

  23.  public int getAge() {

  24.    return age;

  25.  }

  26.  

  27.  public void setAge(int age) {

  28.    this.age = age;

  29.  }

  30. }

  31.  

  32. // Create an instance of a Bean class

  33. Person person = new Person();

  34. person.setName("Andy");

  35. person.setAge(32);

  36.  

  37. // Encoders are created for Java beans

  38. Encoder<Person> personEncoder = Encoders.bean(Person.class);

  39. Dataset<Person> javaBeanDS = spark.createDataset(

  40.  Collections.singletonList(person),

  41.  personEncoder

  42. );

  43. javaBeanDS.show();

  44. // +---+----+

  45. // |age|name|

  46. // +---+----+

  47. // | 32|Andy|

  48. // +---+----+

  49.  

  50. // Encoders for most common types are provided in class Encoders

  51. Encoder<Integer> integerEncoder = Encoders.INT();

  52. Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);

  53. Dataset<Integer> transformedDS = primitiveDS.map(

  54.    (MapFunction<Integer, Integer>) value -> value + 1,

  55.    integerEncoder);

  56. transformedDS.collect(); // Returns [2, 3, 4]

  57.  

  58. // DataFrames can be converted to a Dataset by providing a class. Mapping based on name

  59. String path = "examples/src/main/resources/people.json";

  60. Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);

  61. peopleDS.show();

  62. // +----+-------+

  63. // | age|   name|

  64. // +----+-------+

  65. // |null|Michael|

  66. // |  30|   Andy|

  67. // |  19| Justin|

  68. // +----+-------+

2.6、进行RDD操作

Spark SQL支持两种不同的方法来将现有的RDDs转换为数据集。第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,并且当您在编写Spark应用程序时已经知道模式时,这种方法可以很好地工作。 创建数据集的第二种方法是通过编程接口,该接口允许您构造模式,然后将其应用于现有的RDD。虽然这个方法比较冗长,但它允许您在运行时才知道列及其类型时构造数据集。

  • 使用反射推断模式

 
  1. import org.apache.spark.api.java.JavaRDD;

  2. import org.apache.spark.api.java.function.Function;

  3. import org.apache.spark.api.java.function.MapFunction;

  4. import org.apache.spark.sql.Dataset;

  5. import org.apache.spark.sql.Row;

  6. import org.apache.spark.sql.Encoder;

  7. import org.apache.spark.sql.Encoders;

  8.  

  9. // Create an RDD of Person objects from a text file

  10. JavaRDD<Person> peopleRDD = spark.read()

  11.  .textFile("examples/src/main/resources/people.txt")

  12.  .javaRDD()

  13.  .map(line -> {

  14.    String[] parts = line.split(",");

  15.    Person person = new Person();

  16.    person.setName(parts[0]);

  17.    person.setAge(Integer.parseInt(parts[1].trim()));

  18.    return person;

  19.  });

  20.  

  21. // Apply a schema to an RDD of JavaBeans to get a DataFrame

  22. Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);

  23. // Register the DataFrame as a temporary view

  24. peopleDF.createOrReplaceTempView("people");

  25.  

  26. // SQL statements can be run by using the sql methods provided by spark

  27. Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

  28.  

  29. // The columns of a row in the result can be accessed by field index

  30. Encoder<String> stringEncoder = Encoders.STRING();

  31. Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(

  32.    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),

  33.    stringEncoder);

  34. teenagerNamesByIndexDF.show();

  35. // +------------+

  36. // |       value|

  37. // +------------+

  38. // |Name: Justin|

  39. // +------------+

  40.  

  41. // or by field name

  42. Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(

  43.    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),

  44.    stringEncoder);

  45. teenagerNamesByFieldDF.show();

  46. // +------------+

  47. // |       value|

  48. // +------------+

  49. // |Name: Justin|

  50. // +------------+

  • 以编程方式指定模式 当不能预先定义JavaBean类时(例如,记录的结构编码在字符串中,或者解析文本数据集,并且针对不同的用户以不同的方式投影字段),可以通过三个步骤以编程方式创建数据集。 1.从原始RDD中创建行RDD; 2.创建由一个StructType表示的模式,该结构与步骤1中创建的RDD中的行结构匹配。 3.通过SparkSession提供的createDataFrame方法将模式应用到行RDD。

 
  1. import java.util.ArrayList;

  2. import java.util.List;

  3.  

  4. import org.apache.spark.api.java.JavaRDD;

  5. import org.apache.spark.api.java.function.Function;

  6.  

  7. import org.apache.spark.sql.Dataset;

  8. import org.apache.spark.sql.Row;

  9.  

  10. import org.apache.spark.sql.types.DataTypes;

  11. import org.apache.spark.sql.types.StructField;

  12. import org.apache.spark.sql.types.StructType;

  13.  

  14. // Create an RDD

  15. JavaRDD<String> peopleRDD = spark.sparkContext()

  16.  .textFile("examples/src/main/resources/people.txt", 1)

  17.  .toJavaRDD();

  18.  

  19. // The schema is encoded in a string

  20. String schemaString = "name age";

  21.  

  22. // Generate the schema based on the string of schema

  23. List<StructField> fields = new ArrayList<>();

  24. for (String fieldName : schemaString.split(" ")) {

  25.  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);

  26.  fields.add(field);

  27. }

  28. StructType schema = DataTypes.createStructType(fields);

  29.  

  30. // Convert records of the RDD (people) to Rows

  31. JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {

  32.  String[] attributes = record.split(",");

  33.  return RowFactory.create(attributes[0], attributes[1].trim());

  34. });

  35.  

  36. // Apply the schema to the RDD

  37. Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

  38.  

  39. // Creates a temporary view using the DataFrame

  40. peopleDataFrame.createOrReplaceTempView("people");

  41.  

  42. // SQL can be run over a temporary view created using DataFrames

  43. Dataset<Row> results = spark.sql("SELECT name FROM people");

  44.  

  45. // The results of SQL queries are DataFrames and support all the normal RDD operations

  46. // The columns of a row in the result can be accessed by field index or by field name

  47. Dataset<String> namesDS = results.map(

  48.    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),

  49.    Encoders.STRING());

  50. namesDS.show();

  51. // +-------------+

  52. // |        value|

  53. // +-------------+

  54. // |Name: Michael|

  55. // |   Name: Andy|

  56. // | Name: Justin|

  57. // +-------------+

2.7、聚集操作

内置的DataFrames函数提供常见的聚合,如count()、countDistinct()、avg()、max()、min()等。此外,用户不受预定义聚合函数的限制,可以创建自己的聚合函数。

  • 非类型化用户定义的聚合函数 用户必须扩展UserDefinedAggregateFunction抽象类来实现自定义非类型化聚合函数。例如,用户定义的平均值可以如下所示:

 
  1. import java.util.ArrayList;

  2. import java.util.List;

  3.  

  4. import org.apache.spark.sql.Dataset;

  5. import org.apache.spark.sql.Row;

  6. import org.apache.spark.sql.SparkSession;

  7. import org.apache.spark.sql.expressions.MutableAggregationBuffer;

  8. import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;

  9. import org.apache.spark.sql.types.DataType;

  10. import org.apache.spark.sql.types.DataTypes;

  11. import org.apache.spark.sql.types.StructField;

  12. import org.apache.spark.sql.types.StructType;

  13.  

  14. public static class MyAverage extends UserDefinedAggregateFunction {

  15.  

  16.  private StructType inputSchema;

  17.  private StructType bufferSchema;

  18.  

  19.  public MyAverage() {

  20.    List<StructField> inputFields = new ArrayList<>();

  21.    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));

  22.    inputSchema = DataTypes.createStructType(inputFields);

  23.  

  24.    List<StructField> bufferFields = new ArrayList<>();

  25.    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));

  26.    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));

  27.    bufferSchema = DataTypes.createStructType(bufferFields);

  28.  }

  29.  // Data types of input arguments of this aggregate function

  30.  public StructType inputSchema() {

  31.    return inputSchema;

  32.  }

  33.  // Data types of values in the aggregation buffer

  34.  public StructType bufferSchema() {

  35.    return bufferSchema;

  36.  }

  37.  // The data type of the returned value

  38.  public DataType dataType() {

  39.    return DataTypes.DoubleType;

  40.  }

  41.  // Whether this function always returns the same output on the identical input

  42.  public boolean deterministic() {

  43.    return true;

  44.  }

  45.  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to

  46.  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides

  47.  // the opportunity to update its values. Note that arrays and maps inside the buffer are still

  48.  // immutable.

  49.  public void initialize(MutableAggregationBuffer buffer) {

  50.    buffer.update(0, 0L);

  51.    buffer.update(1, 0L);

  52.  }

  53.  // Updates the given aggregation buffer `buffer` with new input data from `input`

  54.  public void update(MutableAggregationBuffer buffer, Row input) {

  55.    if (!input.isNullAt(0)) {

  56.      long updatedSum = buffer.getLong(0) + input.getLong(0);

  57.      long updatedCount = buffer.getLong(1) + 1;

  58.      buffer.update(0, updatedSum);

  59.      buffer.update(1, updatedCount);

  60.    }

  61.  }

  62.  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`

  63.  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {

  64.    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);

  65.    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);

  66.    buffer1.update(0, mergedSum);

  67.    buffer1.update(1, mergedCount);

  68.  }

  69.  // Calculates the final result

  70.  public Double evaluate(Row buffer) {

  71.    return ((double) buffer.getLong(0)) / buffer.getLong(1);

  72.  }

  73. }

  74.  

  75. // Register the function to access it

  76. spark.udf().register("myAverage", new MyAverage());

  77.  

  78. Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");

  79. df.createOrReplaceTempView("employees");

  80. df.show();

  81. // +-------+------+

  82. // |   name|salary|

  83. // +-------+------+

  84. // |Michael|  3000|

  85. // |   Andy|  4500|

  86. // | Justin|  3500|

  87. // |  Berta|  4000|

  88. // +-------+------+

  89.  

  90. Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");

  91. result.show();

  92. // +--------------+

  93. // |average_salary|

  94. // +--------------+

  95. // |        3750.0|

  96. // +--------------+

  • 类型安全的用户定义聚合函数 强类型数据集的用户定义聚合围绕聚合器抽象类。例如,类型安全的用户定义平均值可以如下所示:

 
  1. import java.io.Serializable;

  2.  

  3. import org.apache.spark.sql.Dataset;

  4. import org.apache.spark.sql.Encoder;

  5. import org.apache.spark.sql.Encoders;

  6. import org.apache.spark.sql.SparkSession;

  7. import org.apache.spark.sql.TypedColumn;

  8. import org.apache.spark.sql.expressions.Aggregator;

  9.  

  10. public static class Employee implements Serializable {

  11.  private String name;

  12.  private long salary;

  13.  

  14.  // Constructors, getters, setters...

  15.  

  16. }

  17.  

  18. public static class Average implements Serializable  {

  19.  private long sum;

  20.  private long count;

  21.  

  22.  // Constructors, getters, setters...

  23.  

  24. }

  25.  

  26. public static class MyAverage extends Aggregator<Employee, Average, Double> {

  27.  // A zero value for this aggregation. Should satisfy the property that any b + zero = b

  28.  public Average zero() {

  29.    return new Average(0L, 0L);

  30.  }

  31.  // Combine two values to produce a new value. For performance, the function may modify `buffer`

  32.  // and return it instead of constructing a new object

  33.  public Average reduce(Average buffer, Employee employee) {

  34.    long newSum = buffer.getSum() + employee.getSalary();

  35.    long newCount = buffer.getCount() + 1;

  36.    buffer.setSum(newSum);

  37.    buffer.setCount(newCount);

  38.    return buffer;

  39.  }

  40.  // Merge two intermediate values

  41.  public Average merge(Average b1, Average b2) {

  42.    long mergedSum = b1.getSum() + b2.getSum();

  43.    long mergedCount = b1.getCount() + b2.getCount();

  44.    b1.setSum(mergedSum);

  45.    b1.setCount(mergedCount);

  46.    return b1;

  47.  }

  48.  // Transform the output of the reduction

  49.  public Double finish(Average reduction) {

  50.    return ((double) reduction.getSum()) / reduction.getCount();

  51.  }

  52.  // Specifies the Encoder for the intermediate value type

  53.  public Encoder<Average> bufferEncoder() {

  54.    return Encoders.bean(Average.class);

  55.  }

  56.  // Specifies the Encoder for the final output value type

  57.  public Encoder<Double> outputEncoder() {

  58.    return Encoders.DOUBLE();

  59.  }

  60. }

  61.  

  62. Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);

  63. String path = "examples/src/main/resources/employees.json";

  64. Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);

  65. ds.show();

  66. // +-------+------+

  67. // |   name|salary|

  68. // +-------+------+

  69. // |Michael|  3000|

  70. // |   Andy|  4500|

  71. // | Justin|  3500|

  72. // |  Berta|  4000|

  73. // +-------+------+

  74.  

  75. MyAverage myAverage = new MyAverage();

  76. // Convert the function to a `TypedColumn` and give it a name

  77. TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");

  78. Dataset<Double> result = ds.select(averageSalary);

  79. result.show();

  80. // +--------------+

  81. // |average_salary|

  82. // +--------------+

  83. // |        3750.0|

  84. // +--------------+

3、数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用关系转换操作,也可以用来创建临时视图。将DataFrame注册为临时视图允许对其数据运行SQL查询。本节描述使用Spark数据源加载和保存数据的一般方法,然后讨论内置数据源可用的特定选项。

3.1、通用的加载/保存功能

在最简单的形式中,默认数据源为parquet(除非将spark.sql.sources.default手动指定选项)将用于所有操作。

 
  1. Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");

  2. usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

  • 手动指定选项 您还可以手动指定要使用的数据源以及希望传递给数据源的任何额外选项。数据源由其完全限定的名称(即但是对于内置的源代码,您也可以使用它们的短名称(json、parquet、jdbc、orc、libsvm、csv、text)。从任何数据源类型加载的DataFrames都可以使用此语法转换为其他类型。

要加载JSON文件,可以使用:

 
  1. Dataset<Row> peopleDF =

  2.  spark.read().format("json").load("examples/src/main/resources/people.json");

  3. peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

要加载CSV文件,你可以使用:

 
  1. Dataset<Row> peopleDFCsv = spark.read().format("csv")

  2.  .option("sep", ";")

  3.  .option("inferSchema", "true")

  4.  .option("header", "true")

  5.  .load("examples/src/main/resources/people.csv");

额外的选项也在写操作期间使用。例如,可以控制ORC数据源的Bloom过滤器和字典编码。下面的ORC示例将在 favoritecolor上创建Bloom过滤器,并对name and favoritecolor使用字典编码。对于parquet,也存在parquet.enable.dictionary选项。要了解更多关于额外ORC/Parquet选项的详细信息,请访问官方的Apache ORC/Parquet网站。

 
  1. usersDF.write.format("orc")

  2.  .option("orc.bloom.filter.columns", "favorite_color")

  3.  .option("orc.dictionary.key.threshold", "1.0")

  4.  .save("users_with_options.orc")

  • 直接对文件运行SQL 您也可以使用SQL直接查询文件,而不是使用read-api将文件加载到数据框中并进行查询。

 
  1. Dataset<Row> sqlDF =

  2.  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

  • 保存模式 保存操作可以选择采用保存模式,该模式指定如何处理现有数据(如果存在)。重要的是要认识到这些保存模式不使用任何锁定,也不是原子的。此外,在执行覆盖时,数据将在写入新数据之前被删除。 

  • 保存到持久表 也可以使用saveastable命令将数据帧作为持久表保存到hive元存储中。请注意,使用此功能不需要现有的配置单元部署。Spark将为您创建一个默认的本地Hive元存储(使用Derby)。与creatorreplacetempview命令不同,saveastable将具体化数据帧的内容,并创建指向配置单元元存储中数据的指针。即使在spark程序重新启动之后,只要您保持与同一个元存储的连接,持久表仍然存在。通过在具有表名称的SparkSession上调用Table方法,可以创建持久表的数据帧。

对于基于文件的数据源,例如文本、拼花、JSON等,可以通过路径选项指定自定义表路径,例如df.write.option(“path”,“/some/path”).saveastable(“t”)。删除表时,自定义表路径将不会被删除,表数据仍然存在。如果没有指定自定义表路径,spark会将数据写入仓库目录下的默认表路径。删除表时,默认的表路径也将被删除。

从spark 2.1开始,持久数据源表在hive元存储中存储了每个分区的元数据。这带来了几个好处:

由于元存储只能返回查询所需的分区,因此不再需要查找表的第一个查询上的所有分区。 配置单元DDL,如改变表分区…集合位置现在可用于使用数据源API创建的表。 请注意,在创建外部数据源表(具有路径选项的表)时,默认情况下不会收集分区信息。要同步元存储中的分区信息,可以调用MSCK REPAIR TABLE.

  • 包装、分类和分区 对于基于文件的数据源,也可以对输出进行存储、排序或分区。存储桶和排序仅适用于持久表:

 
  1. peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

而在使用数据集API时,分区可以与save和saveastable一起使用。

 
  1. usersDF

  2.  .write()

  3.  .partitionBy("favorite_color")

  4.  .format("parquet")

  5.  .save("namesPartByColor.parquet");

对于单个表,可以同时使用分区和bucketing:

 
  1. peopleDF

  2.  .write()

  3.  .partitionBy("favorite_color")

  4.  .bucketBy(42, "name")

  5.  .saveAsTable("people_partitioned_bucketed");

PartitionBy创建一个目录结构,如分区发现部分所述。因此,它对于具有高基数的列的适用性有限。与此相反,bucketby将数据分布在固定数量的存储桶中,并且可以在多个唯一值未绑定时使用。

3.2、Parquet文件

parquet是许多其他数据处理系统支持的柱状格式。Spark SQL支持读取和写入自动保留原始数据模式的拼花文件。写入拼花文件时,由于兼容性原因,所有列都自动转换为可以为空。

  • 以编程方式加载数据 使用上述示例中的数据:

 
  1. import org.apache.spark.api.java.function.MapFunction;

  2. import org.apache.spark.sql.Encoders;

  3. import org.apache.spark.sql.Dataset;

  4. import org.apache.spark.sql.Row;

  5.  

  6. Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

  7.  

  8. // DataFrames can be saved as Parquet files, maintaining the schema information

  9. peopleDF.write().parquet("people.parquet");

  10.  

  11. // Read in the Parquet file created above.

  12. // Parquet files are self-describing so the schema is preserved

  13. // The result of loading a parquet file is also a DataFrame

  14. Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

  15.  

  16. // Parquet files can also be used to create a temporary view and then used in SQL statements

  17. parquetFileDF.createOrReplaceTempView("parquetFile");

  18. Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");

  19. Dataset<String> namesDS = namesDF.map(

  20.    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),

  21.    Encoders.STRING());

  22. namesDS.show();

  23. // +------------+

  24. // |       value|

  25. // +------------+

  26. // |Name: Justin|

  27. // +------------+

  • 分区发现 表分区是类似hive的系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。所有内置文件源(包括text/csv/json/orc/parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将以前使用过的所有人口数据存储到一个分区表中,并使用两个额外的列(gender和country)作为分区列:

 
  1. path

  2. └── to

  3.    └── table

  4.        ├── gender=male

  5.        │   ├── ...

  6.        │   │

  7.        │   ├── country=US

  8.        │   │   └── data.parquet

  9.        │   ├── country=CN

  10.        │   │   └── data.parquet

  11.        │   └── ...

  12.        └── gender=female

  13.            ├── ...

  14.            │

  15.            ├── country=US

  16.            │   └── data.parquet

  17.            ├── country=CN

  18.            │   └── data.parquet

  19.            └── ...

通过将path/to/table传递给sparksession.read.parquet或sparksession.read.load,spark sql将自动从路径中提取分区信息。现在,返回的数据帧的模式变为:

 
  1. root

  2. |-- name: string (nullable = true)

  3. |-- age: long (nullable = true)

  4. |-- gender: string (nullable = true)

  5. |-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。目前,支持数字数据类型、日期、时间戳和字符串类型。有时用户可能不想自动推断分区列的数据类型。对于这些用例,可以通过spark.sql.sources.partitionColumnTypeInterrusion.enabled配置自动类型推断,默认值为true。当类型推断被禁用时,字符串类型将用于分区列。

从spark 1.6.0开始,默认情况下,分区发现仅查找给定路径下的分区。对于上面的示例,如果用户将path/to/table/gender=male传递给sparksession.read.parquet或sparksession.read.load,则gender不会被视为分区列。如果用户需要指定分区发现应该使用的基本路径,则可以在数据源选项中设置基本路径。例如,当path/to/table/gender=male是数据的路径,并且用户将basepath设置为path/to/table/时,gender将是分区列。

  • 模式合并 与Protocol Buffer, Avro, and Thrift一样,parquet也支持模式演化。用户可以从一个简单的模式开始,并根据需要逐步向该模式添加更多列。这样,用户最终可能会得到多个具有不同但相互兼容模式的parquet文件。parquet数据源现在能够自动检测到这种情况并合并所有这些文件的模式。

由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,所以我们默认从1.5.0开始将其关闭。您可以通过

在读取拼花文件时(如下面的示例所示),将数据源选项mergeschema设置为true,或者 将全局SQL选项spark.sql.parquet.mergeschema设置为true。

 
  1. import java.io.Serializable;

  2. import java.util.ArrayList;

  3. import java.util.Arrays;

  4. import java.util.List;

  5.  

  6. import org.apache.spark.sql.Dataset;

  7. import org.apache.spark.sql.Row;

  8.  

  9. public static class Square implements Serializable {

  10.  private int value;

  11.  private int square;

  12.  

  13.  // Getters and setters...

  14.  

  15. }

  16.  

  17. public static class Cube implements Serializable {

  18.  private int value;

  19.  private int cube;

  20.  

  21.  // Getters and setters...

  22.  

  23. }

  24.  

  25. List<Square> squares = new ArrayList<>();

  26. for (int value = 1; value <= 5; value++) {

  27.  Square square = new Square();

  28.  square.setValue(value);

  29.  square.setSquare(value * value);

  30.  squares.add(square);

  31. }

  32.  

  33. // Create a simple DataFrame, store into a partition directory

  34. Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);

  35. squaresDF.write().parquet("data/test_table/key=1");

  36.  

  37. List<Cube> cubes = new ArrayList<>();

  38. for (int value = 6; value <= 10; value++) {

  39.  Cube cube = new Cube();

  40.  cube.setValue(value);

  41.  cube.setCube(value * value * value);

  42.  cubes.add(cube);

  43. }

  44.  

  45. // Create another DataFrame in a new partition directory,

  46. // adding a new column and dropping an existing column

  47. Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);

  48. cubesDF.write().parquet("data/test_table/key=2");

  49.  

  50. // Read the partitioned table

  51. Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");

  52. mergedDF.printSchema();

  53.  

  54. // The final schema consists of all 3 columns in the Parquet files together

  55. // with the partitioning column appeared in the partition directory paths

  56. // root

  57. //  |-- value: int (nullable = true)

  58. //  |-- square: int (nullable = true)

  59. //  |-- cube: int (nullable = true)

  60. //  |-- key: int (nullable = true)

  • Hive/Parquet Schema Reconciliation 从表模式处理的角度来看,hive和parquet有两个关键区别。 1.Hive不区分大小写,而parquet不区分大小写 2.hive认为所有的列都可以为空,而拼花地板中的空性是很重要的。

因此,在将Hive元存储parquet表转换为Spark SQL parquet表时,必须将Hive元存储模式与parquet模式协调一致。调节规则为:

  1. 两个架构中具有相同名称的字段必须具有相同的数据类型,而不管是否可以为空。协调字段应具有parquet侧的数据类型,以便考虑为空性。

  2. 已协调的架构正好包含在配置单元元存储架构中定义的字段。

    • 只出现在parquet图模式中的任何字段都将在已协调的模式中删除。

    • 只出现在配置单元元存储架构中的任何字段都将作为可空字段添加到已协调的架构中。

  • Metadata Refreshing Spark SQL缓存parquet元数据以获得更好的性能。当启用hive metastore parquet表转换时,也会缓存这些转换表的元数据。如果这些表是由配置单元或其他外部工具更新的,则需要手动刷新它们以确保元数据的一致性。

 
  1. // spark is an existing SparkSession

  2. spark.catalog().refreshTable("my_table");

  • Configuration parquet的配置可以使用sparksession上的setconf方法完成,也可以使用sql运行set key=value命令。 

3.3 ORC文件

自Spark 2.3以来,Spark支持一种针对ORC文件的带有新ORC文件格式的矢量化ORC读卡器。为此,新添加了以下配置。当spark.sql.orc.impl设置为native并且spark.sql.orc.enableVectorizedReader设置为true时,矢量化读卡器用于本机ORC表(例如,使用ORC子句创建的表)。对于hive orc serde表(例如,使用hive选项(fileformat'orc')的子句创建的表),当spark.sql.hive.convertmetastoreorc也设置为true时,将使用矢量化读卡器。

3.4 JSON文件

spark sql可以自动推断JSON数据集的模式,并将其作为DataSet加载。可以在DataSet或JSON文件上使用sparksession.read().json()完成此转换。

请注意,作为JSON文件提供的文件不是典型的JSON文件。每一行必须包含一个独立的、自包含的有效JSON对象。有关详细信息,请参阅(http://jsonlines.org/)

对于常规的多行JSON文件,将multiLine选项设置为true。

 
  1. import org.apache.spark.sql.Dataset;

  2. import org.apache.spark.sql.Row;

  3.  

  4. // A JSON dataset is pointed to by path.

  5. // The path can be either a single text file or a directory storing text files

  6. Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

  7.  

  8. // The inferred schema can be visualized using the printSchema() method

  9. people.printSchema();

  10. // root

  11. //  |-- age: long (nullable = true)

  12. //  |-- name: string (nullable = true)

  13.  

  14. // Creates a temporary view using the DataFrame

  15. people.createOrReplaceTempView("people");

  16.  

  17. // SQL statements can be run by using the sql methods provided by spark

  18. Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

  19. namesDF.show();

  20. // +------+

  21. // |  name|

  22. // +------+

  23. // |Justin|

  24. // +------+

  25.  

  26. // Alternatively, a DataFrame can be created for a JSON dataset represented by

  27. // a Dataset<String> storing one JSON object per string.

  28. List<String> jsonData = Arrays.asList(

  29.        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");

  30. Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());

  31. Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);

  32. anotherPeople.show();

  33. // +---------------+----+

  34. // |        address|name|

  35. // +---------------+----+

  36. // |[Columbus,Ohio]| Yin|

  37. // +---------------+----+

3.5 Hive Tables

Spark SQL还支持读取和写入存储在Apache配置单元中的数据。但是,由于hive有大量依赖项,因此这些依赖项不包括在默认的spark分布中。如果在类路径上可以找到hive依赖项,spark将自动加载它们。请注意,这些配置单元依赖项也必须存在于所有工作节点上,因为它们需要访问配置单元序列化和反序列化库(SERDE),以便访问存储在配置单元中的数据。

配置配置单元是通过将hive-site.xml、core-site.xml(用于安全配置)和hdfs-site.xml(用于hdfs配置)文件放在conf/中完成的。

使用hive时,必须使用hive支持实例化sparksession,包括与持久hive元存储的连接、对hive serdes的支持以及hive用户定义函数。没有现有配置单元部署的用户仍然可以启用配置单元支持。当hive-site.xml未配置时,上下文会自动在当前目录中创建metastore_db,并创建由spark.sql.warehouse.dir配置的目录,该目录默认为当前目录中启动spark应用程序的目录spark warehouse。注意,hive-site.xml中的hive.metastore.warehouse.dir属性由于spark 2.0.0而被弃用。相反,使用spark.sql.warehouse.dir指定数据库在仓库中的默认位置。您可能需要向启动Spark应用程序的用户授予写权限。

 
  1. import java.io.File;

  2. import java.io.Serializable;

  3. import java.util.ArrayList;

  4. import java.util.List;

  5.  

  6. import org.apache.spark.api.java.function.MapFunction;

  7. import org.apache.spark.sql.Dataset;

  8. import org.apache.spark.sql.Encoders;

  9. import org.apache.spark.sql.Row;

  10. import org.apache.spark.sql.SparkSession;

  11.  

  12. public static class Record implements Serializable {

  13.  private int key;

  14.  private String value;

  15.  

  16.  public int getKey() {

  17.    return key;

  18.  }

  19.  

  20.  public void setKey(int key) {

  21.    this.key = key;

  22.  }

  23.  

  24.  public String getValue() {

  25.    return value;

  26.  }

  27.  

  28.  public void setValue(String value) {

  29.    this.value = value;

  30.  }

  31. }

  32.  

  33. // warehouseLocation points to the default location for managed databases and tables

  34. String warehouseLocation = new File("spark-warehouse").getAbsolutePath();

  35. SparkSession spark = SparkSession

  36.  .builder()

  37.  .appName("Java Spark Hive Example")

  38.  .config("spark.sql.warehouse.dir", warehouseLocation)

  39.  .enableHiveSupport()

  40.  .getOrCreate();

  41.  

  42. spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");

  43. spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

  44.  

  45. // Queries are expressed in HiveQL

  46. spark.sql("SELECT * FROM src").show();

  47. // +---+-------+

  48. // |key|  value|

  49. // +---+-------+

  50. // |238|val_238|

  51. // | 86| val_86|

  52. // |311|val_311|

  53. // ...

  54.  

  55. // Aggregation queries are also supported.

  56. spark.sql("SELECT COUNT(*) FROM src").show();

  57. // +--------+

  58. // |count(1)|

  59. // +--------+

  60. // |    500 |

  61. // +--------+

  62.  

  63. // The results of SQL queries are themselves DataFrames and support all normal functions.

  64. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

  65.  

  66. // The items in DataFrames are of type Row, which lets you to access each column by ordinal.

  67. Dataset<String> stringsDS = sqlDF.map(

  68.    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),

  69.    Encoders.STRING());

  70. stringsDS.show();

  71. // +--------------------+

  72. // |               value|

  73. // +--------------------+

  74. // |Key: 0, Value: val_0|

  75. // |Key: 0, Value: val_0|

  76. // |Key: 0, Value: val_0|

  77. // ...

  78.  

  79. // You can also use DataFrames to create temporary views within a SparkSession.

  80. List<Record> records = new ArrayList<>();

  81. for (int key = 1; key < 100; key++) {

  82.  Record record = new Record();

  83.  record.setKey(key);

  84.  record.setValue("val_" + key);

  85.  records.add(record);

  86. }

  87. Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);

  88. recordsDF.createOrReplaceTempView("records");

  89.  

  90. // Queries can then join DataFrames data with data stored in Hive.

  91. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();

  92. // +---+------+---+------+

  93. // |key| value|key| value|

  94. // +---+------+---+------+

  95. // |  2| val_2|  2| val_2|

  96. // |  2| val_2|  2| val_2|

  97. // |  4| val_4|  4| val_4|

  98. // ...

  • 指定hive表的存储格式 当您创建一个hive表时,您需要定义这个表应该如何从文件系统读/写数据,即“输入格式”和“输出格式”。您还需要定义此表应如何将数据反序列化为行,或将行序列化为数据,即“serde”。以下选项可用于指定存储格式(“serde”、“input format”、“output format”),例如,使用hive选项(fileformat“parquet”)创建table src(id int)。默认情况下,我们将以纯文本形式读取表文件。注意,在创建表时还不支持Hive存储处理程序,您可以使用Hive端的存储处理程序创建表,并使用Spark SQL来读取它。

  • 与不同版本的 Hive Metastore交互 Spark SQL的Hive支持中最重要的部分之一是与Hive元存储的交互,这使得Spark SQL能够访问Hive表的元数据。从spark 1.4.0开始,可以使用下面描述的配置,使用spark SQL的单个二进制构建来查询不同版本的hive元存储。注意,与用于与元存储对话的Hive版本无关,内部Spark SQL将根据Hive 1.2.1编译,并将这些类用于内部执行(serdes、udfs、udafs等)。 以下选项可用于配置用于检索元数据的配置单元版本:

3.6 JDBC To Other Databases

Spark SQL还包括一个数据源,可以使用JDBC从其他数据库中读取数据。与使用JDBCRDD相比,应该优先使用此功能。这是因为结果以DataFrame 的形式返回,并且可以在Spark SQL中轻松地处理它们,或者与其他数据源联接。JDBC数据源也更容易从Java或Python中使用,因为它不需要用户提供CLSASTG。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。

要开始,您需要在spark类路径上包含特定数据库的JDBC驱动程序。例如,要从spark shell连接到postgres,您将运行以下命令:

 
  1. bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可以使用数据源API将远程数据库中的表作为数据帧或Spark SQL临时视图加载。用户可以在数据源选项中指定JDBC连接属性。用户和密码通常作为登录到数据源的连接属性提供。除了连接属性外,Spark还支持以下不区分大小写的选项:

 
  1. // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods

  2. // Loading data from a JDBC source

  3. Dataset<Row> jdbcDF = spark.read()

  4.  .format("jdbc")

  5.  .option("url", "jdbc:postgresql:dbserver")

  6.  .option("dbtable", "schema.tablename")

  7.  .option("user", "username")

  8.  .option("password", "password")

  9.  .load();

  10.  

  11. Properties connectionProperties = new Properties();

  12. connectionProperties.put("user", "username");

  13. connectionProperties.put("password", "password");

  14. Dataset<Row> jdbcDF2 = spark.read()

  15.  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

  16.  

  17. // Saving data to a JDBC source

  18. jdbcDF.write()

  19.  .format("jdbc")

  20.  .option("url", "jdbc:postgresql:dbserver")

  21.  .option("dbtable", "schema.tablename")

  22.  .option("user", "username")

  23.  .option("password", "password")

  24.  .save();

  25.  

  26. jdbcDF2.write()

  27.  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

  28.  

  29. // Specifying create table column data types on write

  30. jdbcDF.write()

  31.  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")

  32.  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

3.7 Apache Avro Data Source Guide

  • 部署

spark avro模块是外部的,默认情况下不包括在spark submit或spark shell中。 与任何Spark应用程序一样,Spark Submit用于启动应用程序。spark-avro_2.11及其依赖项可以直接添加到spark submit中,使用--packages,例如,

 
  1. ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 ...

For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-avro_2.11 and its dependencies directly,

 
  1. ./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0 ...

  • Load and Save Functions 因为spark avro模块是外部的,所以在dataframereader或dataframewriter中没有.avro API。

要以avro格式加载/保存数据,需要将数据源选项格式指定为avro(或org.apache.spark.sql.avro)。

 
  1. Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");

  2. usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");

  • toavro() and fromavro() Avro包提供函数toavro以Avro格式将列编码为二进制,fromavro()将Avro二进制数据解码为列。这两个函数都将一列转换为另一列,输入/输出SQL数据类型可以是复杂类型或原语类型。 当从Kafka这样的流源读取或写入数据时,使用Avro记录作为列非常有用。每个Kafka键值记录都将添加一些元数据,例如Kafka中的摄入时间戳、Kafka中的偏移量等等。 如果包含数据的“value”字段位于Avro中,那么可以使用fromavro()来提取数据、丰富数据、清理数据,然后再将其向下推到Kafka或将其写入文件。 toavro()可用于将结构体转换为Avro记录。当您在向Kafka写入数据时希望将多个列重新编码为单个列时,此方法尤其有用。 这两个函数目前只在Scala和Java中可用。

 
  1. import org.apache.spark.sql.avro.*;

  2.  

  3. // `from_avro` requires Avro schema in JSON string format.

  4. String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

  5.  

  6. Dataset<Row> df = spark

  7.  .readStream()

  8.  .format("kafka")

  9.  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  10.  .option("subscribe", "topic1")

  11.  .load();

  12.  

  13. // 1. Decode the Avro data into a struct;

  14. // 2. Filter by column `favorite_color`;

  15. // 3. Encode the column `name` in Avro format.

  16. Dataset<Row> output = df

  17.  .select(from_avro(col("value"), jsonFormatSchema).as("user"))

  18.  .where("user.favorite_color == \"red\"")

  19.  .select(to_avro(col("user.name")).as("value"));

  20.  

  21. StreamingQuery query = output

  22.  .writeStream()

  23.  .format("kafka")

  24.  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  25.  .option("topic", "topic2")

  26.  .start();

  • Data Source Option 可以使用DataFrameReader或DataFrameWriter上的.option方法设置Avro的数据源选项。

  • 配置 可以使用SparkSession上的setConf方法或使用SQL运行SET key=value命令来配置Avro。

  • 与Databricks spark-avro的兼容性 这个Avro数据源模块最初来自Databricks的开源存储库spark-avro,并与之兼容。 默认情况下,SQL配置为spark.sql.legacy.replaceDatabricksSparkAvro。启用,数据源提供程序com. database .spark。avro映射到这个内置的avro模块。对于使用提供程序属性com. database . Spark创建的Spark表。在catalog元存储中,如果您正在使用这个内置的avro模块,那么映射对于加载这些表是必不可少的。 注意,在Databricks的spark-avro中,为快捷函数.avro()创建了隐式类AvroDataFrameWriter和AvroDataFrameReader。在这个内置但外部的模块中,这两个隐式类都被删除了。请在DataFrameWriter或DataFrameReader中使用.format("avro"),这样应该足够干净和良好。 如果您喜欢使用自己构建的spark-avro jar文件,您可以简单地禁用配置spark.sql.legacy.replaceDatabricksSparkAvro。启用,并在部署应用程序时使用选项jar。有关更多细节,请参阅应用程序提交指南中的高级依赖项管理部分。

  • Supported types for Avro -> Spark SQL conversion 目前Spark支持读取Avro记录下的所有原始类型和复杂类型。除了上面列出的类型之外,它还支持读取union类型。以下三种类型被认为是基本联合类型: 1.union(int, long)将映射到LongType。 2.union(float, double)将映射到DoubleType。 3.union(something, null),其中的something是任何受支持的Avro类型。这将映射到与其他类型相同的Spark SQL类型,并将nullable设置为true。所有其他联合类型都被认为是复杂的。它们将被映射到StructType,其中字段名是member0、member1等,根据union的成员。这与在Avro和parquet之间转换时的行为是一致的。

它还支持读取以下Avro逻辑类型:

  • 支持类型的Spark SQL -> Avro转换 Spark支持将所有Spark SQL类型写入Avro。对于大多数类型,从Spark类型到Avro类型的映射是直接的(例如,IntegerType转换为int);然而,以下列出了一些特殊情况: 您还可以使用avroSchema选项指定整个输出Avro模式,这样Spark SQL类型就可以转换为其他Avro类型。以下转换在默认情况下不应用,需要用户指定的Avro模式: 

3.8 故障排查

  • JDBC驱动程序类必须在客户端会话和所有执行器上对原始类加载程序可见。这是因为Java的DRIVER管理器类做了一个安全检查,导致在打开连接时忽略原始类加载器不可见的所有驱动程序。实现这一点的一个方便方法是修改所有工作节点上的compute_classpath.sh以包括驱动程序jar。

  • 一些数据库(如h2)将所有名称转换为大写。您需要使用大写来引用Spark SQL中的那些名称。

  • 用户可以在数据源选项中指定特定于供应商的JDBC连接属性来进行特殊处理。例如,spark.read.format(“jdbc”).option(“url”,oracle jdbc url).option(“oracle.jdbc.mapDateTimeStamp”,“false”)。Oracle.jdbc.mapDateTimeStamp默认为true,用户通常需要禁用此标志,以避免将Oracle日期解析为时间戳。

4、性能调优

4.1、在内存中缓存数据

Spark SQL可以通过调用Spark .catalog. cachetable(“tableName”)或dataFrame.cache()来使用内存中的柱状格式缓存表。然后Spark SQL将只扫描所需的列,并自动调优压缩,以最小化内存使用和GC压力。可以调用spark.catalog. unachetable(“tableName”)从内存中删除表。 可以使用SparkSession上的setConf方法或使用SQL运行SET key=value命令来配置内存缓存。

4.2、其他配置选项

还可以使用以下选项来优化查询执行的性能。随着更多优化的自动执行,这些选项可能会在未来的版本中被弃用。

4.3、用于SQL查询的广播提示

当将每个指定的表与另一个表或视图连接时,BROADCAST提示引导Spark广播它们。当Spark决定连接方法时,广播散列连接(即,最好是BHJ),即使统计数据高于配置spark.sq . autobroadcastjointhreshold。当指定连接的两边时,Spark广播统计信息较低的一方。注意Spark并不保证总是选择BHJ,因为并非所有情况(例如完全外部连接)都支持BHJ。在选择broadcast嵌套循环连接时,仍然遵循提示。

 
  1. import static org.apache.spark.sql.functions.broadcast;

  2. broadcast(spark.table("src")).join(spark.table("records"), "key").show();

5、分布式的SQL引擎

Spark SQL还可以使用JDBC/ODBC或命令行接口充当分布式查询引擎。在这种模式下,终端用户或应用程序可以直接与Spark SQL交互来运行SQL查询,而不需要编写任何代码。

5.1、运行Thrift JDBC/ODBC服务器

这里实现的Thrift JDBC/ODBC服务器对应于Hive 1.2.1中的HiveServer2。您可以使用Spark或Hive 1.2.1附带的beeline脚本测试JDBC服务器。 要启动JDBC/ODBC服务器,请在Spark目录中运行以下命令:

 
  1. ./sbin/start-thriftserver.sh

这个脚本接受所有bin/spark-submit命令行选项,加上一个——hiveconf选项来指定Hive属性。您可以运行./sbin/start-thriftserver.sh——帮助获取所有可用选项的完整列表。默认情况下,服务器监听localhost:10000。您可以通过任何环境变量覆盖此行为,即:

 
  1. export HIVE_SERVER2_THRIFT_PORT=<listening-port>

  2. export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>

  3. ./sbin/start-thriftserver.sh \

  4.  --master <master-uri> \

  5.  ...

或系统属性:

 
  1. ./sbin/start-thriftserver.sh \

  2.  --hiveconf hive.server2.thrift.port=<listening-port> \

  3.  --hiveconf hive.server2.thrift.bind.host=<listening-host> \

  4.  --master <master-uri>

  5.  ...

现在您可以使用beeline来测试Thrift JDBC/ODBC服务器:

 
  1. ./bin/beeline

  2.  

  3. #Connect to the JDBC/ODBC server in beeline with:

  4. beeline> !connect jdbc:hive2://localhost:10000

Beeline会询问您的用户名和密码。在非安全模式下,只需在计算机上输入用户名和空白密码。 Hive的配置是通过放置hive-site.xml, core-site.xml and hdfs-site.xml到conf/中。 您还可以使用Hive附带的beeline脚本。 Thrift JDBC server还支持通过HTTP传输发送Thrift RPC消息。使用以下设置将HTTP模式作为系统属性或在hive-site中启用。conf/中的xml文件:

 
  1. hive.server2.transport.mode - Set this to value: http

  2. hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001

  3. hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要进行测试,请使用beeline在http模式下连接JDBC/ODBC服务器,使用:

 
  1. beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2、运行Spark SQL CLI

Spark SQL CLI是一种方便的工具,可以在本地模式下运行Hive metastore服务并执行命令行输入的查询。请注意,Spark SQL CLI不能与Thrift JDBC服务器通信。 要启动Spark SQL CLI,请在Spark目录中运行以下命令:

 
  1. ./bin/spark-sql

Hive的配置是通过放置hive-site.xml, core-site.xml and hdfs-site.xml到conf/中。

6、参考

6.1、数据类型

Spark SQL和DataFrames支持以下数据类型:

  • 数值类型 ByteType:表示1字节有符号整数。数字的范围是从-128到127。 ShortType:表示2字节有符号整数。数字的范围从-32768到32767。 IntegerType:表示4字节有符号整数。数字的范围是从-2147483648到2147483647。 LongType:表示8字节有符号整数。数字范围从-9223372036854775808到9223372036854775807。 FloatType:表示4字节的单精度浮点数。 DoubleType:表示8字节双精度浮点数。 DecimalType:表示任意精度的带符号小数。内部支持java.math.BigDecimal。BigDecimal由一个任意精度的整数无标度值和一个32位整数标度组成。

  • 字符串类型 StringType:表示字符串值。

  • 二进制类型 BinaryType:表示字节序列值。

  • 布尔类型 BooleanType:表示布尔值。

  • Datetime类型 TimestampType:表示值,包括字段year、month、day、hour、minute和second的值。 DateType:表示值,其中包含字段year、month、day的值。

  • 复杂类型 ArrayType(elementType, containsNull):表示包含元素序列的值,元素的类型为elementType。containsNull用于指示ArrayType值中的元素是否可以具有空值。 MapType(keyType, valueType, valueContainsNull):表示由一组键-值对组成的值。键的数据类型由键类型描述,值的数据类型由valueType描述。对于MapType值,键值不允许为空。valueContainsNull用于指示MapType值的值是否可以为空值。 StructType(fields):用StructFields (fields)序列描述的结构表示值。 StructField(name, dataType, nullable):表示StructType中的一个字段。字段的名称由名称表示。字段的数据类型由数据类型表示。nullable用于指示此字段的值是否可以为空值。

Spark SQL的所有数据类型都位于org.apache.spark.sql.types的包中。要访问或创建数据类型,请使用org.apache.spark.sql.types.DataTypes中提供的工厂方法。

6.1、NaN语义

在处理与标准浮点语义不完全匹配的浮点类型或双类型时,对非数字(NaN)有特殊的处理。具体地说: 1.NaN = NaN返回true。 2.在聚合中,所有NaN值都分组在一起。 3.NaN在连接键中被视为一个正常值。 4.NaN值按升序排列,比任何其他数字值都大,持续时间最长

6.2、算术运算

对数值类型(小数除外)执行的操作不检查溢出。这意味着如果一个操作导致溢出,结果与Java/Scala程序中返回的操作相同。如果两个整数的和大于可表示的最大值,则结果为负数)。

原创不易,转载请注明出处。

 

欢迎扫码关注
获取更多干货

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/397808
推荐阅读
  

闽ICP备14008679号