赞
踩
转载于:http://www.thebigdata.cn/Hadoop/10150.html
1、创建sc对象
创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:
val sc = new SparkContext(args(0), “WordCount”,
System.getenv(“SPARK_HOME”), Seq(System.getenv(“SPARK_TEST_JAR”)))
2、读取数据
读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:
val textFile = sc.textFile(args(1))
当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])
或者直接创建一个HadoopRDD对象:
var hadoopRdd = new HadoopRDD(sc, conf,
classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])
3、RDD转换算子操作和转换RDD
对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:
val result = hadoopRdd.flatMap{
case(key, value) => value.toString().split(“\\s+”);
}.map(word => (word, 1)). reduceByKey (_ + _)
其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation。
4、将产生的RDD数据集保存到HDFS上
可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:
result.saveAsSequenceFile(args(2))
当然,一般我们写Spark程序时,需要包含以下两个头文件:
import org.apache.spark._
import SparkContext._
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。