当前位置:   article > 正文

sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL

sparksql,将dataframe保存为csv文件并保存在hdfs上面
首发于公众号“大数据风控与机器学习”。

Spark SQL 是 Spark 处理结构化数据的一个模块, 与基础的 Spark RDD API 不同, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式可以跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 无论使用哪种 API / 语言都可以快速的计算。

SQL

Spark SQL 的功能之一是执行 SQL 查询,Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据。当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回,也可以使用 命令行或者通过 JDBC/ODBC与 SQL 接口交互.

DataFrames

从RDD里可以生成类似大家在pandas中的DataFrame,同时可以方便地在上面完成各种操作。

1.构建SparkSession

Spark SQL中所有功能的入口点是 SparkSession 类. 要创建一个 SparkSession, 仅使用 SparkSession.builder()就可以了:

from 

2.创建 DataFrames

在一个 SparkSession中, 应用程序可以从一个 已经存在的 RDD 或者 hive表, 或者从Spark数据源中创建一个DataFrames.

举个例子, 下面就是基于一个JSON文件创建一个DataFrame:

  1. df = spark.read.json("data/people.json")
  2. df.show()#必须使用show()不然不会打印

4c2e1623f0b5825569d5b51d4a00348a.png

3.DataFrame 操作

DataFrames 提供了一个特定的语法用在 Scala, Java, Python and R中机构化数据的操作。

在Python中,可以通过(df.age) 或者(df['age'])来获取DataFrame的列. 虽然前者便于交互式操作, 但是还是建议用户使用后者, 这样不会破坏列名,也能引用DataFrame的类。

通过以下操作进行select

  1. #查看字段属性
  2. df.printSchema()

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

df.select("name").show()

10d35fcb71b1c499cdd6ece20222e539.png
df.select(["name",'age']).show()

38abc8fafedd410548badf0532897eab.png
df.select(df['name'], df['age'] + 1).show()

f8d288705be6db4429467ae9a90802e3.png

以下操作的filter做条件过滤

df.filter(df['age'] > 21).show()

a1ed0a2c674f1bc89c3ca4196a867bc4.png
df.groupBy("age").count().show()

5a129bb2800d699e830da4737426e71b.png

还可以创建视图,然后使用SQL语句进行处理。得到的也是dataframe。

  1. df.createOrReplaceTempView("people")
  2. sqlDF = spark.sql("SELECT * FROM people")
  3. sqlDF.show()

de8e71806aecafddba534dc01715cf3f.png

spark DataFrame与RDD交互

Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset

第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可以让你的代码更简洁.

第二种用于创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它允许你去构造 Dataset.

当数据不规整,无法像csv或者excel等文件一样直接读取时,可以通过如下形式自定义dataframe样式。

  1. from pyspark.sql import Row
  2. sc = spark.sparkContext
  3. lines = sc.textFile("data/people.txt")
  4. parts = lines.map(lambda l: l.split(","))
  5. people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
  6. # Infer the schema, and register the DataFrame as a table.
  7. schemaPeople = spark.createDataFrame(people)
  8. schemaPeople.createOrReplaceTempView("people")
  9. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  10. type(teenagers)

pyspark.sql.dataframe.DataFrame

type(teenagers.rdd)

pyspark.rdd.RDD

teenagers.rdd.first()

Row(name='Justin')

  1. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
  2. for name in teenNames:
  3. print(name)

Name: Justi

以编程的方式指定Schema

也可以通过以下的方式去初始化一个 DataFrame。

  • RDD从原始的RDD创建一个RDD的toples或者一个列表;
  • Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的结构.
  • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD .
  1. from pyspark.sql.types import *
  2. sc = spark.sparkContext
  3. # Load a text file and convert each line to a Row.
  4. lines = sc.textFile("data/people.txt")
  5. parts = lines.map(lambda l: l.split(","))
  6. # Each line is converted to a tuple.
  7. people = parts.map(lambda p: (p[0], p[1].strip()))
  8. # The schema is encoded in a string.
  9. schemaString = "name age"
  10. fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
  11. schema = StructType(fields)
  12. # Apply the schema to the RDD.
  13. schemaPeople = spark.createDataFrame(people, schema)
  14. schemaPeople.createOrReplaceTempView("people")
  15. results = spark.sql("SELECT name FROM people")
  16. results.show()

f9ec30a6851b5bf1c928de185ce6f35f.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/609222
推荐阅读
相关标签
  

闽ICP备14008679号