当前位置:   article > 正文

Hadoop-4.MapReduce的应用_实验2:mapreduce的应用

实验2:mapreduce的应用

目录:

  1. MapReduce简介
  2. MapReduce应用场景
  3. MapReduce环境搭建
  4. Java操作MapReduce
  5. MapReduce程序运行
  6. 序列化
  7. RPC协议
  8. MapReduce输入处理类
  9. MapReduce执行过程MapReduce执行过程简概

 

一.MapReduce简介

1.概念

2.概述

 

二.MapReduce应用场景

 

三.MapReduce环境搭建

伪分布式环境搭建:在hdfs环境搭建好的前提下,官网文档:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html

  1. 配置
    第一步:先修改文件名,因为hadoop目录下只有mapred-site.xml.template文件:
    mv mapred-site.xml.template mapred-site.xml
    第二步:修改hadoop安装目录下etc/hadoop/mapred-site.xml:
    1. <configuration>
    2. <property>
    3. <name>mapreduce.framework.name</name>
    4. <value>yarn</value>
    5. </property>
    6. </configuration>

    第三步:修改hadoop安装目录下etc/hadoop/yarn-site.xml:

    1. <configuration>
    2. <property>
    3. <name>yarn.nodemanager.aux-services</name>
    4. <value>mapreduce_shuffle</value>
    5. </property>
    6. </configuration>
  2. 启动
    1. 先启动hdfs:start-dfs.sh
      验证:
    2. 启动yarn,mapReduce建立在yarn框架之上,启动yam就启动了mapReduce:
      1. 在hadoop安装目录下的sbin目录下 start-yarn.sh
      2. 验证:
  3. 测试:
    例如:http://localhost:8088/
    说明:为什么是http,而不是https,https比http协议更安全,有转密技术,https需要资金购买,所以用https作为测试不现实

 

四.Java操作MapReduce



需求:

开发步骤:

  1. 将 bbb.txt 上传到hdfs下,对它进行操作
    1. [root@kd01 test]# vi bbb.txt
    2. [root@kd01 test]# hdfs dfs -put bbb.txt /user/yiguang/
  2. 创建一个maven项目,在pom.xml文件中添加依赖,是在hdfs的基础上,所以hdfs的依赖包也需要

    pom.xml
    1. <dependencies>
    2. <dependency>
    3. <groupId>junit</groupId>
    4. <artifactId>junit</artifactId>
    5. <version>3.8.1</version>
    6. <scope>test</scope>
    7. </dependency>
    8. <!-- hdfs依赖包 -->
    9. <dependency>
    10. <groupId>org.apache.hadoop</groupId>
    11. <artifactId>hadoop-hdfs</artifactId>
    12. <version>2.7.3</version>
    13. </dependency>
    14. <dependency>
    15. <groupId>org.apache.hadoop</groupId>
    16. <artifactId>hadoop-common</artifactId>
    17. <version>2.7.3</version>
    18. </dependency>
    19. <dependency>
    20. <groupId>org.apache.hadoop</groupId>
    21. <artifactId>hadoop-client</artifactId>
    22. <version>2.7.3</version>
    23. </dependency>
    24. <!-- mapreduce依赖包 -->
    25. <dependency>
    26. <groupId>org.apache.hadoop</groupId>
    27. <artifactId>hadoop-mapreduce-client-core</artifactId>
    28. <version>2.7.3</version>
    29. </dependency>
    30. <dependency>
    31. <groupId>org.apache.hadoop</groupId>
    32. <artifactId>hadoop-mapreduce-client-common</artifactId>
    33. <version>2.7.3</version>
    34. </dependency>
    35. </dependencies>
  3. 编写map函数
    1. package com.yiguang.test.mapreduceTest;
    2. import java.io.IOException;
    3. import org.apache.hadoop.io.IntWritable;
    4. import org.apache.hadoop.io.LongWritable;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Mapper;
    7. public class WordMap extends Mapper<LongWritable, Text, Text, LongWritable>{
    8. @Override
    9. protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
    10. /*
    11. * key: 输入的key
    12. * value: 数据
    13. * context: Map上下文
    14. */
    15. //得到一行的值
    16. //将拿到的一行数据通过空格分割为一个一个的元素
    17. String[] str=value.toString().split(" ");
    18. for(String str1:str) {
    19. //将每一个元素发送到reduce
    20. context.write(new Text(str1),new LongWritable(1));
    21. }
    22. }
    23. }
  4. 编写reduce函数
    1. package com.yiguang.test.mapreduceTest;
    2. import java.io.IOException;
    3. import org.apache.hadoop.io.IntWritable;
    4. import org.apache.hadoop.io.LongWritable;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Reducer;
    7. public class WordReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
    8. @Override
    9. protected void reduce(Text key,Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
    10. int sum=0;
    11. for(LongWritable iw:values) {
    12. sum+=iw.get();
    13. }
    14. //输出
    15. context.write(key, new LongWritable(sum));
    16. }
    17. }
  5. 编写main函数,注意:引入类时不要引入了其他的类,否则会出错,例如:Hadoop执行job出错:java.lang.ClassCastException
    1. package com.yiguang.test.mapreduceTest;
    2. import java.io.IOException;
    3. import org.apache.hadoop.conf.Configuration;
    4. import org.apache.hadoop.fs.Path;
    5. import org.apache.hadoop.io.LongWritable;
    6. import org.apache.hadoop.io.Text;
    7. import org.apache.hadoop.mapreduce.Job;
    8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    10. public class WordJob {
    11. public static void main(String[] args) throws Exception{
    12. //创建一个job=map+reduce
    13. Job job=Job.getInstance(new Configuration());
    14. job.setJobName("kd01");
    15. //指定任务的入口
    16. job.setJarByClass(WordJob.class);
    17. //指定job的mapper
    18. job.setMapperClass(WordMap.class);
    19. job.setMapOutputKeyClass(Text.class);
    20. job.setMapOutputValueClass(LongWritable.class);
    21. //指定job的reducer
    22. job.setReducerClass(WordReduce.class);
    23. job.setOutputKeyClass(Text.class);
    24. job.setOutputValueClass(LongWritable.class);
    25. //指定任务的输入:原始数据的存放位置
    26. FileInputFormat.addInputPath(job, new Path("/user/yiguang/bbb.txt"));
    27. //指定任务的输出:指定一个空目录或者一个不存在的目录
    28. FileOutputFormat.setOutputPath(job, new Path("/kd01_out"));
    29. //提交任务
    30. job.waitForCompletion(true);
    31. }
    32. }
  6. 运行:第五个知识点

 

五.MapReduce程序运行

注意:在项目编译的时候jdk的版本与在linux中发布hadoop时,hadoop关联的jdk的版本,两者要一致,如果两者不同,可以修改项目编译的版本,在pom.xml中通过插件的方式,修改maven项目jdk编译的版本。指定maven项目编译JDK版本

发布:

[root@kd01 home]# hadoop jar mapreduceTest-0.0.1-SNAPSHOT.jar com.yiguang.test.mapreduceTest.WordJob

查看:

 

六.序列化



 

七.RPC协议


例如:zookeeper+dubbo,发布接口,供使用者调用
介绍及使用:https://www.jb51.net/article/127852.htm
https://blog.csdn.net/zhaowen25/article/details/45443951

 

八.MapReduce输入处理类

类结构

InputFormat:

  • 决定读取数据的格式,可以是文件或数据库等
  • 作用
    1. 验证作业输入的正确性,如格式等
    2. 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的Map任务
    3. 提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用
  • 方法:
    1. List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
    2. RecordReader <k,v>createRecordReader():</k,v> 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题

FileInputFormat:

  • FileInputFormat是所有以文件为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类 ------TextInputFormat进行实现的。
  • InputForamt负责处理MR的输入部分。

TextInputFormat 

  • TextInputFormat 是默认的处理类,处理普通文本文件
  • 输入文件中的每一行就是一个记录,Key是这一行的byte offset(起始偏移量),而value是这一行的内容
  • 默认以\n 或回车键作为一行记录
  • TextInputFormat继承了FileInputFormat

InputSplit(重点)

  • 代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法
  • Split内有Location信息,利于数据局部化
  • 一个InputSplit给一个单独的Map处理
  • 在执行mapreduce 之前,原始数据被分割成若干个split,每个split作为一个map任务的输入,在map执行过程中split被分解为一个个记录(key- value对),map会依次处理每一个记录。
    FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat 划分的结果是这个文件或这个文件的一部分。
    如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小小文件效率高的原因。
    当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat 不会对小文件进行划分,所以每一个小文件都会被当做一个split 并分配一个map任务,导致效率低下。

RecordReader:将InputSplit拆分成一个个<key,value>对给Map处理,也是实际的文件读取分隔对象</key,value>

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/429260
推荐阅读
相关标签
  

闽ICP备14008679号