当前位置:   article > 正文

【Spark】Spark SQL基础使用详解和案例_spark sql的学习文档

spark sql的学习文档

Spark SQL是Apache Spark的一个模块,它提供了一种基于结构化数据的编程接口。
Spark SQL支持结构化数据的处理,包括数据的读取、转换和查询。它可以将传统的基于表和SQL的操作和Spark的分布式计算相结合,提供强大的数据处理和分析能力。
Spark SQL也可以与其他Spark组件集成,如MLlib和GraphX,以支持更广泛的数据处理场景。

  1. 读入数据
val spark: SparkSession = SparkSession.builder().master("local").appName("agent_log_df").getOrCreate()

val fileRDD: RDD[String] = spark.sparkContext.textFile("datas/agent.log")
val rowRDD: RDD[Row] = fileRDD.map(_.split(" ")).map(
line => Row(line(0), line(1).toInt, line(2).toInt)
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 创建表结构
// 定义表结构
val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,
StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
)
  • 1
  • 2
  • 3
  • 4
  1. 创建临时表
df.createTempView("tmp_table")
  • 1
  1. sql逻辑
val sql =
  """
    |select t1,t2,t3
    |from (
    |select t1, sum(t2) as t2, sum(t3) as t3 from tmp_table group by t1
    |) t
    |order by t2 desc,t3 desc
    |limit 10
    |""".stripMargin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. sql执行
val result: DataFrame = spark.sql(sql)
  • 1
  1. 结果展示
result.show()
  • 1

上述中有几个关键的类和方法:

  1. sqlContext

Spark的SQLContext是负责Spark SQL操作的上下文对象,它提供了许多与SQL相关的功能,包括读取和处理各种数据源中的数据、执行SQL查询、创建数据框架和表等等。
通过SQLContext,用户可以使用DataFrame API来以结构化和类型安全的方式处理数据,并可以使用SQL语言和Spark SQL的内置函数来进行数据分析和查询。
总体来说,Spark的SQLContext是非常强大和灵活的,可以适应各种数据处理和分析需求,并且在处理大规模数据时具有出色的性能和扩展性。
在使用Spark的SqlContext之前,需要首先初始化一个SparkContext对象并创建一个RDD。

使用SqlContext需要进行以下步骤:

创建一个SparkConf对象,并设置一些参数,如AppName和Master。
使用SparkConf对象创建一个SparkContext对象。
通过SparkContext对象创建一个SqlContext对象。
使用SqlContext对象加载数据,并将其转换为DataFrame类型。
以下是具体的代码示例:

from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf

#创建SparkConf
conf = SparkConf().setAppName("sql_example").setMaster("local[*]")

#创建SparkContext
sc = SparkContext(conf=conf)

#创建SqlContext
sqlContext = SQLContext(sc)

#读取数据文件
people = sc.textFile("people.txt")

#将数据转换为一个DataFrame对象
people_df = sqlContext.createDataFrame(people.map(lambda row: row.split(",")), ["name", "age"])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在以上示例中,我们首先创建了一个SparkConf对象,并设置了AppName和Master属性。然后使用SparkConf对象创建了一个SparkContext对象,并将其传递给SqlContext构造函数。接着读取了一个数据文件,并使用SqlContext对象将数据转换成DataFrame对象。

注意:使用SqlContext时需要将数据转换成DataFrame对象,而不是RDD对象。如果需要在SqlContext中使用RDD对象,可以将其转换为DataFrame对象,再进行操作。

  1. StructType

Spark的StructType是一种定义结构化数据的数据类型。
它类似于SQL表的结构,每个StructType都由一组结构字段组成,每个结构字段都有一个名称和数据类型。
使用StructType,用户可以对结构化数据进行索引、查询和分析。
StructType被广泛应用于Spark中的DataFrame API和SQL查询中。

使用方法:

导入 Spark SQL 的相关包
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
定义 StructType
例如,假设要定义一个包含 name 和 age 两个字段的 StructType。则可以按照以下方式定义:

val schema = StructType(
    StructField("name", StringType, true) ::
    StructField("age", IntegerType, true) :: Nil
)
// 其中,StructType 用于表示整个数据结构,StructField 用于表示每个字段的信息,StringType 用于表示字段类型为字符串类,IntegerType 用于表示字段类型为整数。

// 使用 StructType
// 在创建 DataFrame 时,可以通过传递定义好的 StructType 对象来指定 DataFrame 的结构。例如:

val data = spark.sparkContext.parallelize(Seq(("John", 25), ("Mary", 30), ("Jack", 22)))
val df = spark.createDataFrame(data).toDF("name", "age")

df.printSchema()
df.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

输出结果为:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+----+---+
|name|age|
+----+---+
|John| 25|
|Mary| 30|
|Jack| 22|
+----+---+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

栗子

给一个日志文件,过滤出两张表,然后设计表结构,使用Spark SQL实现两张表的连接

object spark_sql_code_1 {
  def main(args: Array[String]): Unit = {
    // TODO 1: 创建spark环境
    val spark: SparkSession = SparkSession.builder().master("local").appName("spark sql code").getOrCreate()

    // TODO 2: 读取数据
    val rowRDD: RDD[Row] = spark.sparkContext.textFile("datas/agent.log")
      .map(line => {
        val words: Array[String] = line.split(" ");
        Row(words(1), words(2).toInt, words(3).toInt)
      })

    rowRDD.persist()

    val tableRDD1: RDD[Row] = rowRDD.filter(row => {
      row.getInt(1) % 2 == 0
    })

    val tableRDD2: RDD[Row] = rowRDD.filter(row => {
      row.getInt(2) % 2 == 0
    })


    // TODO 3: 创建表结构和临时表
    // 定义表结构
    val df1: DataFrame = spark.sqlContext.createDataFrame(tableRDD1,
      StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
    )

    df1.createTempView("t")

    val df2: DataFrame = spark.sqlContext.createDataFrame(tableRDD2,
      StructType(Seq(StructField("r1", StringType), StructField("r2", IntegerType), StructField("r3", IntegerType)))
    )

    df2.createTempView("r")

    // TODO 4: sql逻辑
    val sql: String =
      """
        |select r1 as t1, r2 as t2, r3 as t3, 'r' as tp from r
        |""".stripMargin


    // TODO 5: 执行sql
    val result: DataFrame = spark.sql(sql)

    // TODO 6: 结果显示
    result.show()

    // TODO 7: 关闭spark环境
    spark.stop()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/589139
推荐阅读
相关标签
  

闽ICP备14008679号