朱金华 njzhujinhua 2017/12/03
This series is based on Spark 2.2.0.
This first part uses the Spark interactive shell to get familiar with how Spark computes.
Before version 2.0, Spark's main programming interface was the RDD (Resilient Distributed Dataset). Since 2.0 the recommended interface is the Dataset, which is strongly typed like the RDD but better optimized. The RDD interface is still supported, but for better performance the Dataset is preferred.
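A minimal sketch of the two APIs side by side in the shell (the sample data here is made up for illustration):

// RDD: created through the SparkContext (sc), plain JVM objects
val rdd = sc.parallelize(Seq("a", "b", "c"))

// Dataset: created through the SparkSession (spark); strongly typed like the RDD,
// but its operations go through Spark SQL's optimizer
// (spark-shell imports spark.implicits._ automatically; the explicit import is for clarity)
import spark.implicits._
val ds = Seq("a", "b", "c").toDS()

rdd.count()   // 3
ds.count()    // 3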
Run bin/spark-shell from the Spark installation directory; alternatively, set the installation directory as the SPARK_HOME environment variable and add $SPARK_HOME/bin to PATH, after which spark-shell can be started from any directory.
An RDD can be created from Hadoop InputFormats (such as HDFS files), from local files, or by transforming another RDD; a Dataset has the same properties. Taking file reading as an example: with the RDD API a file is read in the shell via sc.textFile(filename), whereas with the Dataset API it is read via spark.read.textFile(filename).
scala> val rdd1=sc.textFile("README.md")
rdd1: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[72] at textFile at <console>:24
Here sc is the SparkContext.
As an aside, look at the implementation of textFile: it supports reading from HDFS, the local file system, and any other Hadoop-supported file system URI, and returns an RDD of strings. When reading a local file in a program, note that the file must exist on all nodes, because the task may run on any executor.
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 * @param path path to the text file on a supported file system
 * @param minPartitions suggested minimum number of partitions for the resulting RDD
 * @return RDD of lines of the text file
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
scala> val dataset1=spark.read.textFile("README.md")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/hadoop/README.md;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:360)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:623)
at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:657)
at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:632)
... 48 elided
scala> val dataset1=spark.read.textFile("/webdev/hadoop/dev/spark/README.md")
dataset1: org.apache.spark.sql.Dataset[String] = [value: string]

This experiment shows that spark.read does not consult the SPARK_HOME environment variable and look under the actual Spark installation directory; it resolves the relative path against the current working directory, where the file was not found, hence the AnalysisException above. Reading succeeds once the absolute path is given.
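A small workaround sketch (assuming SPARK_HOME is exported in the environment the shell was started from; the val names are made up): build an absolute path explicitly instead of relying on the working directory.

// Resolve README.md under SPARK_HOME if the variable is set,
// otherwise fall back to the relative name.
val readmePath = sys.env.get("SPARK_HOME")
  .map(home => s"$home/README.md")
  .getOrElse("README.md")

val readmeDs = spark.read.textFile(readmePath)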
Spark SQL is a module for working with structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and of the computation being performed. You can interact with Spark SQL either through SQL or through the Dataset API; running a SQL query returns the result as a Dataset/DataFrame.
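As a quick illustration of the SQL side (a minimal sketch; the view name "lines" is made up, and dataset1 is the Dataset read above):

// Register the Dataset as a temporary view, then query it with SQL.
// The result comes back as a DataFrame (Dataset[Row]).
dataset1.createOrReplaceTempView("lines")
val lineCount = spark.sql("SELECT count(*) AS line_count FROM lines")
lineCount.show()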
The return types in the experiments above also make it clear that Dataset lives under the org.apache.spark.sql package, and that spark here is of type org.apache.spark.sql.SparkSession.
spark.read actually returns a DataFrameReader:
/**
 * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
 * `DataFrame`.
 * {{{
 *   sparkSession.read.parquet("/path/to/file.parquet")
 *   sparkSession.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @since 2.0.0
 */
def read: DataFrameReader = new DataFrameReader(self)
The implementation of DataFrameReader.textFile is:
/**
 * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
 * contains a single string column named "value".
 *
 * If the directory structure of the text files contains partitioning information, those are
 * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
 *
 * Each line in the text files is a new element in the resulting Dataset. For example:
 * {{{
 *   // Scala:
 *   spark.read.textFile("/path/to/spark/README.md")
 *
 *   // Java:
 *   spark.read().textFile("/path/to/spark/README.md")
 * }}}
 *
 * @param paths input path
 * @since 2.0.0
 */
@scala.annotation.varargs
def textFile(paths: String*): Dataset[String] = {
  assertNoSpecifiedSchema("textFile")
  text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}

It loads text files and returns a Dataset of String that contains only a single column named "value". If the directory structure of the text files carries partitioning information, that information is ignored in the resulting Dataset; to include it as columns, use the text API instead. As the implementation above shows, textFile itself is built on text, selecting only the "value" column.
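A minimal sketch of the difference (reusing the absolute README.md path from above): text returns an untyped DataFrame, while textFile returns a typed Dataset[String].

// text(): DataFrame with a "value" column (plus partition columns, if any)
val df = spark.read.text("/webdev/hadoop/dev/spark/README.md")

// textFile(): Dataset[String], i.e. just the "value" column as strings
val ds1 = spark.read.textFile("/webdev/hadoop/dev/spark/README.md")

df.printSchema()   // a single string column named "value"
ds1.printSchema()  // same schema, but the result is typed as Dataset[String]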
To inspect the contents, use dataset1.collect().foreach(println) or dataset1.take(10).foreach(println); collect returns all records, while take(n) returns the first n records.
scala> dataset1.take(3)
res13: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides)
Next, demonstrate splitting each line into words. For simplicity, assume words are separated by spaces and ignore punctuation for now; the take(3) output above already shows the format of the input.
Then use flatMap for the one-to-many transformation, turning each line string into an array of words.
scala> val words=dataset1.flatMap(line=>line.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
Look at the first ten elements of the result:
scala> words.take(10)
res14: Array[String] = Array(#, Apache, Spark, "", Spark, is, a, fast, and, general)
Further, count gives the number of elements:
scala> words.count
res47: Long = 566
Dataset has no reduceByKey interface, but it does have groupByKey. A word count can therefore be implemented as follows:
scala> val groupwords=words.groupByKey(a=>a)
groupwords: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@59de6c1f
scala> val cnt=groupwords.count()
cnt: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
scala> cnt.collect()
res53: Array[(String, Long)] = Array((online,1), (graphs,1), (["Parallel,1), (["Building,1), (thread,1), (documentation,3), (command,,2), (abbreviated,1), (overview,1), (rich,1), (set,2), (-DskipTests,1), (name,1), (page](http://spark.apache.org/documentation.html).,1), (["Specifying,1), (stream,1), (run:,1), (not,1), (programs,2), (tests,2), (./dev/run-tests,1), (will,1), ([run,1), (particular,2), (option,1), (Alternatively,,1), (by,1), (must,1), (using,5), (you,4), (MLlib,1), (DataFrames,,1), (variable,1), (Note,1), (core,1), (more,1), (protocols,1), (guidance,2), (shell:,2), (can,7), (site,,1), (systems.,1), (Maven,1), ([building,1), (configure,1), (for,12), (README,1), (Interactive,2), (how,3), ([Configuration,1), (Hive,2), (system,1), (provides,1), (Hadoop-supported,1), (pre-built,...
Chaining the above steps together:
scala> val ds=spark.read.textFile("README.md").flatMap(a=>a.split("\\s+")).groupByKey(a=>a).count
ds: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
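As an aside, KeyValueGroupedDataset also offers reduceGroups, the closest typed analogue to the RDD's reduceByKey. A sketch of the same count with it, reusing the words Dataset from above (the val name is made up):

// Map each word to a count of 1, then reduce the values within each group.
// reduceGroups yields Dataset[(String, Long)].
val wcReduce = words
  .groupByKey(identity)
  .mapValues(_ => 1L)
  .reduceGroups(_ + _)

wcReduce.take(5).foreach(println)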
If you prefer to fall back to the classic RDD approach from a Dataset, you can write:
scala> val ds2rdd1=spark.read.textFile("README.md").rdd.flatMap(a=>a.split("\\s+")).map(a=>(a,1)).reduceByKey(_+_)
ds2rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[128] at reduceByKey at <console>:23
scala> val ds2rdd2=spark.read.textFile("README.md").flatMap(a=>a.split("\\s+")).map(a=>(a,1)).rdd.reduceByKey(_+_)
ds2rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[135] at reduceByKey at <console>:23
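As a quick cross-check (a sketch, not authoritative): since all three pipelines split on "\s+", they should agree on the number of distinct words.

// Number of distinct words from each pipeline; the values should match.
println(ds.count())        // rows of the Dataset[(String, Long)] result
println(ds2rdd1.count())   // elements of the RDD[(String, Int)] result
println(ds2rdd2.count())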