当前位置:   article > 正文

MapReduce 初识+案例(词频统计)_mapreduce词频统计

mapreduce词频统计

1. MapReduce

1.1 MapReduce 是什么

MapReduce:是 Hadoop 中的一个分布式计算框架,基于 MapReduce 写出的应用程序能够运行在大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数据集。

一个 MapReduce 作业(Job)通常会把输入的数据切分为若干个独立的数据块,由 Map 任务(Task)以完全并行的方式处理它们。框架会对 Map 的输出先进行排序,然后把结果输入给 Reduce 任务,通常作业的输入和输出都会被存储在文件系统中。

整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

1.2 优点/缺点

1.2.1 优点

  • 易于编程

    MapReduce 将所有的计算抽象为 Map(映射)和 Reduce(聚合)两个阶段,只需要继承并实现 Mapper 和 Reduce 类,就可以完成高性能的分布式程序

  • 高扩展性

    MapReduce 通过将多台机器的计算能力(CPU、内存),提供海量级的计算

  • 高容错性

    高并发(多线程)的分布式程序运行过程中,如果一些线程出现错误或机器故障,MapReduce 可以自动的启动错误重试机制,或将任务转移到其他机器运行,能够保证程序最终正确执行

  • 适用于海量级数据

    HDFS 可以存储的数据量级,MapReduce 可以使用应用程序对其完成计算

1.2.2 缺点

MapReduce 不擅长做实际计算、流式计算、DAG(有向图)计算。

MapReduce 的任务表达能力有限,一个 MapReduce 只能完成一次映射和聚合,像 DAG 任务就需要多次聚合,那就需要将任务拆成多个 MapReduce ,每个MapReduce 任务都需要大量的磁盘 IO,将导致性能低下。

1.3 MapReduce 运行阶段

第一阶段: Map Task 并发实例,完全并行运行,不互相干

第二阶段: Reduce Task 并发实例,获取上一阶段的输出作为本阶段的输入

1.4 MapReduce 进程

MrAppMaster:负责整个程序的过程调度及状态调度

MapTask:负责 Map 阶段的整个数据处理流程

ReduceTask:负责 Reduce 阶段的整个数据处理流程

2. Java 词频统计

在理解 MapReduce 之前,不如先用 Java 实现一个词频统计的实例。

public static void main(String[] args) throws IOException {
    // 1. 创建容器存储结果
    HashMap<String, Integer> map = new HashMap<>();

    // 2. 读取文件
    File file = new File("...");
    String encoding = "utf8";
    List<String> lines = FileUtils.readLines(file, encoding);

    // 3. 遍历每一行
    for (String line : lines) {

        // 4. 切分出每个单词
        String[] words = line.split("\\s+");
        for (String word : words) {

            // 5. 替换掉特殊字符
            String w = word.toLowerCase().replace("\\W", "");

            // 6. 每出现一个单词进行数量 + 1
            if (!w.isEmpty()){
                map.put(w, map.getOrDefault(w,0) + 1 );
            }
        }
    }

    // 7. 将统计结果进行排序
    ArrayList<Map.Entry<String, Integer>> entries = new ArrayList<>(map.entrySet());

    entries.sort(new Comparator<Map.Entry<String, Integer>>() {
        @Override
        public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
            return o2.getValue() - o1.getValue();
        }
    });

    for (Map.Entry<String, Integer> entry : entries) {
        System.out.printf("单词:%s \t出现的个数为 %d\n", entry.getKey(), entry.getValue());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

3. MapReduce 编程规范

利用 MapReduce 实现词频统计之前还需要了解 MapReduce 的编程规范。

通常我们编写一个 MapReduce 程序,会将其分解为三个部分:Mapper、Reduce、Driver。

**Mapper: **

  1. 自定义一个类,并继承 Mapper,定义输入输出键值对的泛型
  2. 实现父类的 map() 方法(MapTask进程),定义键值对的参数类型及上下文对象 context
  3. 编写 map 的具体实现,最后通过 context 对象将映射结果写入 MapReduce 框架

**Reduce: **

  1. 自定义一个类,并继承 Reduce,定义输入输出键值对的泛型
  2. 实现父类的 reduce() 方法(ReduceTask进程),定义键值对的参数类型及上下文对象 context
  3. 编写 reduce的具体实现,最后通过 context 对象将聚合结果写入 MapReduce 框架

Driver:

  1. 这是一个包含 main 方法的 MapReduce 任务的入口
  2. 实例化 Job 对象,可选择性的添加各种配置
  3. 将 Job 任务提交到集群

4. MapReduce 词频统计

4.1 Mapper

我们需要继承 Mapper 类,来自于:org.apache.hadoop.mapreduce.Mapper

4.1.1 窥见源码

4.1.1.1 Mapper 类

Mapper 类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {...}
  • 1

⚠️ 注意:这里我们不适用 Java 提供的类型,而是使用 Hadoop 实现的序列化类型,公共接口为 Writable

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/747951
推荐阅读
相关标签