当前位置:   article > 正文

全方位揭秘!大数据从0到1的完美落地之WordCount案例_拥抱大数据 csdn

拥抱大数据 csdn

cb8065380cd79123adbc76c018a3fd85b3b78041

MapReduce的编程规范

用户编写MapReduce程序的时候,需要设计至少三个类: Mapper、Reducer、Driver(用于提交MR的任务)

Mapper
  1. 自定义类,继承 org.apache.hadoop.mapreduce.Mapper 类型
  2. 定义K1,V1,K2,V2的泛型(K1,V1是Mapper的输入数据类型,K2,V2是Mapper的输出数据类型)
  3. 重写map方法(处理逻辑)

iShot2022-01-27 22.24.46

注意: Map方法,每一个键值对都会调用一次。

Reducer
  1. 自定义类,继承 org.apache.hadoop.mapreduce.Reducer 类型
  2. 定义K2,V2,K3,V3的泛型(K2,V2是Reducer的输入数据类型,K3,V3是Reducer的输出数据类型)
  3. 重写reduce方法的处理逻辑

iShot2022-01-27 22.30.38

注意: reduce方法,默认按key分组,每一组都调用一次。

Driver

MapReduce的程序,需要进行执行之前的属性配置与任务的提交,这些操作都需要在Driver类中来完成。

WordCount案例演示

案例需求

给定一个路径,统计这个路径下所有的文件中的每一个单词出现的次数。

放轻松,这个需求非常的简单,相当于我们在学习Java的时候写的Hello World程序那么简单,是一个入门案例。

MapReduce

测试数据
a.txt
hello qianfeng hello 1999 hello beijing hello 
world hello hello java good

b.txt
hello xisanqi hello bingbing 
hello chenchen hello 
ACMilan hello china

c.txt
hello hadoop hello java hello storm hello spark hello redis hello zookeeper
hello hive hello hbase hello flume
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
pom文件
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.1</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
定义Mapper类
package wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @company 北京千锋互联科技有限公司
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * Map阶段,每当读取到一行的数据,都会触发这个方法
     * 注意: LongWritable、Text、IntWritable都是Hadoop中的序列化的数据类型,相当于Java中的long、String、int类型。这部分内容在后续的序列化部分讲解。
     *
     * @param key 读取到数据的行偏移量,表示这一行的数据的首字符在这个数据块中是第几个字符
     * @param value 读取到的一行的数据
     * @param context 操作上下文
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 1. 获取读取到的每一行的数据
        String line = value.toString();
        // 2. 将每一行的数据进行切割,得到每一个单词
        String[] words = line.split("\\s+");
        // 3. 将每一个单词与数字1拼接成键值对,写出
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
定义Reducer类
package wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @company 北京千锋互联科技有限公司
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * 经过Shuffle阶段的处理,Map阶段写出的所有键值对按照Key进行了分组,并将所有的值都存入一个集合中。形成了 <K2, Iterable<V2>>的键值对。每一个Key都会触发一次这个方法。
     *
     * @param key K2,在这个需求中就是每一个单词
     * @param values V2,一个单词出现的所有的次数
     * @param context 操作上下文
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 遍历这个Key对应的所有的次数,累加到一起即可得到单词出现的总次数
        // 1. 定义一个变量,用来记录总的次数
        int times = 0;
        // 2. 遍历每一个次数
        for (IntWritable value : values) {
            // 3. 累加次数,因为value是IntWritable类型的,因此需要使用get获取到包装的整型的值
            times += value.get();
        }
        // 4. 将最终的结果写出
        context.write(key, new IntWritable(times));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
定义Driver类
package wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @company 北京千锋互联科技有限公司
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取配置信息
        //    加载默认的配置,即core-default.xml、hdfs-default.xml、mapred-default.xml和yarn-default.xml中的配置信息
        //    然后读取项目中的core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml中配置的信息,更新某些属性的值
        Configuration conf = new Configuration();
        // 2. 如果需要继续更新某些属性的值,可以在代码中更新
        conf.set("fs.defaultFS", "hdfs://192.168.10.101:9820");
        // 3. 创建Job对象
        Job job = Job.getInstance(conf);

        // 4. 设置Mapper类型
        job.setMapperClass(WordCountMapper.class);
        // 5. 设置Reducer类型
        job.setReducerClass(WordCountReducer.class);
        // 6. 设置驱动类型
        job.setJarByClass(WordCountDriver.class);

        // 7. 设置Map阶段输出的键值对类型
        //    如果Map阶段输出的键值对类型与Reduce阶段输出的键值对类型相同,则可以省略这个设置
        //    例如: 现在的Map阶段输出是<Text, IntWritable>类型的,与Reduce阶段的数据类型相同,因此可以省略不写
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);

        // 8. 设置Reduce阶段输出的键值对类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 9. 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 10. 提交Job
        System.exit(job.waitForCompletion(true) ? 0 : -1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
打jar包

打包操作:

image-20220127230146326

jar包位置

image-20220127230313517

测试
  1. 在本地创建好测试数据,将其上传到HDFS

    hdfs dfs -put ~/input /wordcount
    
    • 1
  2. 检查YARN是否开启,如果没有开启,将其开启

    start-yarn.sh
    
    • 1
  3. 执行程序

    # hadoop jar jar包名称 驱动类全名 输入路径 输出路径
    # 注意,输出路径不能存在,必须由程序自己创建。如果输出路径存在,会出现异常
    hadoop jar xxxxx.jar com.qianfeng.wordcount.WordCount.Driver /wordcount/input /wordcount/output
    
    • 1
    • 2
    • 3
查看结果
hdfs dfs -cat /wordcount/output/*
  • 1

image-20220628181555290

作业提交全过程详解
(1)作业提交第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新YARN中的任务将其进度和状态(包括counter)返回给应用管理器,客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新,展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外,客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后,应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/401851
推荐阅读
相关标签
  

闽ICP备14008679号