当前位置:   article > 正文

分布式系统开发实战:分布式计算,实战:基于Spark词频统计_分布式词频统计代码

分布式词频统计代码

实战:基于Spark词频统计

下面,我们将演示基于Spark框架来实现词频统计功能。

项目概述

我们将创建一个名为“spark-word-count”的应用。在该应用中,我们将使用Spark来实现对文章中单词的出现频率进行统计。

为了能够正常运行该应用,需要在应用中添加以下Spark依赖。

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <spark.version>2.3.0</spark.version>
  4. </properties>
  5. <dependencies>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-core_2.11</artifactId>
  9. <version>${spark.version}</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-sql_2.11</artifactId>
  14. <version>${spark.version}</version>
  15. </dependency>
  16. </dependencies>

项目配置

我们事先在D盘下准备了一个TXT文本文件——rfc7230.txt。该文件是HTTP规范RFC 7230的全文内容。

当我们的应用启动之后,会读取该文件的内容,作为词频统计的基础。

编码实现

基于Spark的词频统计程序将会变得非常简单。以下是应用

JavaWordCount的所有内容。

  1. package com.waylau.spark;
  2. import scala.Tuple2;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.sql.SparkSession;
  6. import java.util.Arrays;
  7. import java.util.List;
  8. import java.util.regex.Pattern;
  9. public final class JavaWordCount {
  10. private static final Pattern SPACE = Pattern.compile(" ");
  11. public static void main(String[] args) throws Exception {
  12. if (args.length < 1) {
  13. System.err.println("Usage: JavaWordCount <file>");
  14. System.exit(1);
  15. }
  16. SparkSession spark = SparkSession.builder().appName("JavaWordCount").
  17. getOrCreate();
  18. JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
  19. JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).
  20. iterator());
  21. JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
  22. JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
  23. List<Tuple2<String, Integer>> output = counts.collect();
  24. for (Tuple2<?, ?> tuple : output) {
  25. System.out.println(tuple._1() + ": " + tuple._2());
  26. }
  27. spark.stop();
  28. }
  29. }

运行

为了能够正常运行该程序,我们要在应用启动参数中指定待统计的文件rfc7230.txt所在的路径。同时,设置程序为local模式。启动参数设置如图13-2所示。

图13-2 启动参数设置

应用正常启动之后,应能在控制台看到以下词频统计信息。

  1. Unfortunately,: 2
  2. .................................................56: 1
  3. constraints: 1
  4. retry.: 2
  5. Saurabh: 1
  6. "accelerator": 1
  7. desirable: 1
  8. listening: 5
  9. components.: 1
  10. GmbH: 1
  11. order: 29
  12. 7234,: 1
  13. Compression: 2
  14. Supported: 1
  15. behind: 2
  16. merge: 1
  17. end: 6
  18. been: 64evaluating: 1
  19. Failures: 2
  20. accomplished: 2
  21. "?": 8
  22. A.2.: 2
  23. clients: 18
  24. 9.: 2
  25. knows: 2
  26. selective: 1
  27. less: 2
  28. Reed,: 1
  29. supporting: 2
  30. 64]: 1
  31. expanded.: 1
  32. Nathan: 1
  33. RWS: 12
  34. ignore: 13
  35. entry: 2
  36. (DQUOTE: 1
  37. are: 145
  38. "path-abempty",: 1
  39. 2.: 5
  40. Nilsson,: 1
  41. Isomaki,: 1
  42. Content-Type:: 1
  43. consists: 4
  44. undesirable: 1
  45. Miles: 1
  46. qvalues: 1
  47. records: 1
  48. different: 11
  49. Smuggling: 2
  50. trailer-part: 5
  51. necessitated: 1
  52. ...

当然,词频统计列表较长,这里只展示了列表中的部分单词。

本节示例,可以在spark-word-count项目下找到。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/758616
推荐阅读
相关标签
  

闽ICP备14008679号