当前位置:   article > 正文

MapReduce流程(WordCount案例实现)_mapreduce的wordcount案例

mapreduce的wordcount案例

1 MapReduce概述

设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:

Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。

以WordCount为例:

  • Map: ( k1 ; v1 ) →[ ( k2 ; v2 ) ]

    map过程

  • Shuffle过程(不需要我们写)

    shuffle过程

  • Reduce:( k2 ; [ v2 ] )→[ ( k3 , v3 ) ]

Reduce过程

实例进程

实例进程分类

一个完整的mapreduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster负责整个程序的过程调度及状态协调
  2. MapTask负责map阶段的整个数据处理流程
  3. ReduceTask负责reduce阶段的整个数据处理流程

完整执行过程

  1. Client提交计算任务

  2. 启动AppMaster进程

  3. AppMaster请求分配资源

  4. ResourceManager回复资源列表

  5. AppMaster要求NodeManager分配资源

  6. NodeManager执行具体的计算任务

  7. NodeManager将计算状态和结果汇报给AppMaster

  8. AppMaster汇报计算结果

image-20210811172904602

总结

ResourceManager分配任务

NodeManager实际执行任务

2 MapReduce编程规范

Map阶段2个步骤

  1. 设置InputFormat类(设置文件读取方式),将数据切分为Key-Value(K1和V1)对(改行文本的偏移量,该行数据),输入到第二步
  2. 自定义Map逻辑,将第一步的结果转换成另外的Key-Value (K2和V2)对,输出结果

Shuffle阶段4个步骤

  1. 对输出的Key-Value对进行分区
  2. 对不同分区的数据按照相同的Key排序
  3. (可选)对分组过的数据初步规约,降低数据的网络拷贝
  4. 对数据进行分组,相同Key的value放入一个集合中

Reduce阶段2个步骤

  1. 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-value进行处理,转为新的Key-value (K3和V3)输出
  2. 设置OutputFormat处理并保存Reduce输出的Key-Value数据

MapReduce流程

3.实现WordCount案例

3.1准备工作

1)在hadoop集群上上传文件至hdfs

#新建一个文件
vim wordcount.txt

#在文件中写入实例数据 按i键进入文件写入模式,写入后按esc,输入:wq!保存更改
hello,world,hadoop
hive,hello,tom
love,hadoop
hdfs

#上传到HDFS
## 在HDFS文件系统中增加目录wordcount
hdfs dfs -mkdir /wordcount/
## 将刚刚创建的文件上传至该文件夹,登录web界面(50070)可以看到成功上传
hdfs dfs -put wordcount.txt /wordcount
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2)导入依赖pom.xml

注意查看自己的hadoop版本

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency> 
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3)测试hadoop连接环境

可以使用BigData插件测试,参照文章https://blog.csdn.net/weixin_44155966/article/details/108820920

3.2Map代码编写

package com.hunan.MapReduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
四个泛型解释:
KEYIN:K1的类型      偏移量
VALUEIN:V1的类型    每行字符串

KEYOUT:K2的类型      单词
VALUEOUT:V2的类型    固定值1

 */
//hadoop自己的类型
public class WordCountMapper extends Mapper<LongWritable,Text, Text,LongWritable> {

    //map方法就是将K1,V1转为K2,V2
    /*
        参数:
            key   :  K1  行偏移量
            value :  V1  每一行的文本数据
            context: 表示上下文对象,将各个流程连在一起
     */
    /*
            如何将K1,V1转换为K2,V2
            K1       V1
            0        hello,world,hadoop
            19       hive,hello,tom
            ----------------------
            K2       V2
            hello    1
            world    1
            hadoop   1
            hive     1
            hello    1
            tom      1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        //1.将一行的文本数据进行拆分
        String[] split = value.toString().split(",");
        //2.遍历数组,组装K2,V2
        for (String s : split) {
            //3.将K2,V2写入上下文中
            //context.write(new Text(s), new LongWritable(1));
            text.set(s);
            longWritable.set(1);
            context.write(text,longWritable);
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

3.3Reduce代码编写

package com.hunan.MapReduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/*
四个泛型解释:
KEYIN:K2的类型      单词
VALUEIN:V2的类型    1

KEYOUT:K3的类型      单词
VALUEOUT:V3的类型    个数

 */
public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
    //map方法就是将新K2,V2转为K3,V3
    /*
        参数:
            key: 新K2
            values: 集合 新V2
            context:上下文对象
     */
    /*
            如何将新K2,V2转为K3,V3
            新K2       新V2
            hello     <1,1>
            world    <1>
            hadoop   <1,1,1>
            ---------------
            K3         V3
            hello       2
            world       1
            hadoop      3
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count=0;
        //1.遍历集合,将集合中数字相加,得到V3
        for (LongWritable value : values) {
            count+=value.get();
        }
        //2.将K2和V3写入上下文中
        context.write(key,new LongWritable(count));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

3.4任务类编写

package com.hunan.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    //该方法用于指定一个job任务,从提交到结果保存的整个任务
    public int run(String[] args) throws Exception {
        //1.创建一个job任务对象
        Job job = Job.getInstance(super.getConf(), "WordConut");

        //2.配置job任务对象(八个步骤)

        //第一步:指定文件读取方式和读取路径
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://master:9000/wordcount"));
        TextInputFormat.addInputPath(job,new Path("file:///C:\\wordcount"));
        //第二步:指定Map阶段的处理方式和数据类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);//设置Map阶段K2的类型
        job.setMapOutputKeyClass(LongWritable.class);//设置Map阶段V2的类型
        //第三,四,五,六步   采用默认的shuffle阶段处理
        //第七步:指定ruduce阶段的处理方式和数据类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);//设置Reduce阶段K2的类型
        job.setMapOutputKeyClass(LongWritable.class);//设置Reduce阶段V2的类型
        //第八步:设置输出类型和输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        //TextOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/wordcount_out"));
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount_output"));
        //等待任务结束
        boolean b = job.waitForCompletion(true);

        return b? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //configuration.set("mapred.job.tracker", "192.168.60.101:9000");
        //启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

4.MapReduce运行模式

4.1 集群运行模式

  1. 将MapReduce程序提交给Yarn集群,分发到很多的节点上并发执行

  2. 处理的数据和输出结果应该位于HDFS文件系统

  3. 提交集群的实现步骤:将程序打成JAR包(双击右侧maven-Lifecycle-package),并上传至节点,然后在集群上用hadoop命令启动。

  4. 两个参数分别为jar包名,main方法的路径

hadoop jar hadoop_learning-1.0-SNAPSHOT.jar com.hunan.MapReduce.JobMain
  • 1

4.2 本地运行模式

  1. MapReduce程序在本地以单进程的形式运行
  2. 处理的数据及输出结果在本地文件系统。
TextInputFormat.addInputPath(job,new Path("file:///d:\\data\\wordcount"));
TextOutputFormat.setOutputPath(job,new Path("file:///d:\\data\\wordcount_output"));
  • 1
  • 2
hadoop jar hadoop_learning-1.0-SNAPSHOT.jar com.hunan.MapReduce.JobMain
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/506364
推荐阅读
  

闽ICP备14008679号