In real-world deployments, Hadoop usually plays the role of providing distributed storage (HDFS) and distributed resource management (YARN), while Spark relies on Hadoop and runs the actual computation on top of it.
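As a rough sketch (not from the original post; the HDFS input path is assumed, and the NameNode address hadoop01:9000 is taken from the cluster configuration shown later), the same kind of program running on a Hadoop cluster would read its input from HDFS and leave the master to be supplied by spark-submit (e.g. --master yarn) instead of hard-coding local mode:

# Scala sketch: reading from HDFS when running on a Hadoop/YARN cluster
import org.apache.spark.{SparkConf, SparkContext}

object WordCountOnCluster {
  def main(args: Array[String]): Unit = {
    // No setMaster here: on a cluster the master (e.g. yarn) usually comes from spark-submit
    val conf = new SparkConf().setAppName("wordCount")
    val sc = new SparkContext(conf)
    // The input lives in HDFS, the distributed storage that Hadoop provides
    val linesRDD = sc.textFile("hdfs://hadoop01:9000/input/hello.txt")
    println(linesRDD.count())
    sc.stop()
  }
}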
# Scala version
import org.apache.spark.{SparkConf, SparkContext}

object WordCountScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount").setMaster("local")
    val context = new SparkContext(conf)
    // Read the input file; every element of linesRDD is one line of text
    val linesRDD = context.textFile("D:\\hadoop\\logs\\hello.txt")
    // Split each line into individual words
    val wordsRDD = linesRDD.flatMap(line => line.split(" "))
    // Turn every word into a (word, 1) pair
    val pairRDD = wordsRDD.map(word => (word, 1))
    // Aggregate by key to get the count for each word
    val wordCountRDD = pairRDD.reduceByKey(_ + _)
    // Print the result to the console
    wordCountRDD.foreach(wordCount => println(wordCount._1 + "---" + wordCount._2))
    context.stop()
  }
}
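Note that foreach runs on the executors: in local mode the println output appears in the console, but on a cluster it would land in the executor logs. A common alternative (a minimal sketch reusing the wordCountRDD above) is to collect the result to the driver first, which is only safe for small result sets:

// Bring the (word, count) pairs back to the driver before printing (small results only)
wordCountRDD.collect().foreach { case (word, count) => println(word + "---" + count) }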
# Java version
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("wordCount").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("D:\\hadoop\\logs\\hello.txt");
        // Split each line into individual words.
        // The first type parameter is the input type, the second is the output type.
        JavaRDD<String> wordRDD = stringJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Map every word to a (word, 1) pair.
        // The first type parameter is the input type; the next two are the key and value types of the output.
        JavaPairRDD<String, Integer> pairRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        // Aggregate by key to get the count for each word
        JavaPairRDD<String, Integer> wordCountRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Print the result to the console
        wordCountRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2._1 + "=:=" + tuple2._2);
            }
        });
        javaSparkContext.stop();
    }
}
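With Java 8 lambdas the anonymous inner classes above can be written much more compactly, for example stringJavaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); the expanded form is kept here because it makes the input and output types of each operator explicit.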
Next, configure the Spark history server so that finished applications can still be viewed:
[root@hadoop04 conf]# vim spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop01:9000/tmp/logs/root/logs"
[root@hadoop04 conf]# vim spark-defaults.conf
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://hadoop01:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://hadoop01:9000/tmp/logs/root/logs
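Note that the HDFS directory referenced above has to exist before the history server is started (it can be created with hdfs dfs -mkdir -p /tmp/logs/root/logs); otherwise the history server will typically fail to start.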
# Start the history server
[root@hadoop04 conf]# ../sbin/start-history-server.sh
# Access the web UI
http://hadoop04:18080/
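Only applications submitted after event logging is enabled (spark.eventLog.enabled=true) write their event logs to this directory, so earlier runs of the WordCount jobs above will not appear in the history UI.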