当前位置:   article > 正文

【MapReduce】 MR初识_代码mr是什么意思

代码mr是什么意思


一、定义

1.MR是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架

2.MR的核心功能将用户编写的业务逻辑代码和自带默认组件合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

返回顶部


二、优缺点

优点

  • 易于编程 : 简单的实现一些接口就可以完成分布式程序 ---- 与简单的串行程序一样。
    * 良好的扩展性 : 可以通过增加机器提高运行。
    * 高容错性:可以将故障机器上的计算任务转移至另一个节点上运行,完全有Hadoop内部自动完成。
    * 适合PB级以上海量数据的离线处理

缺点

  • 不擅长实时计算:无法像mysql一样,在毫秒或者秒级内返回结果。
    * 不擅长流式计算:流式计算的输入数据是动态的,而MR的数据数据集是静态的,不能动态变化。
    * 不擅长DAG有向图的计算:多个应用程序之间存在依赖关系,后一个应用程序的输入为前一个的输出。此情况下,MR可以做,但是会造成大量的磁盘IO,性能低下。

返回顶部


三、MR核心编程思想 — 案例WordCount

在这里插入图片描述

  • MR运算程序分为两个部分:MapReduce

  • Map数据的划分:按照块级划分,默认每128M为一块

  • 每一块数据集对应启动一个MapTask,实行完全并行计算,互不干扰。

    Task处理时,读取数据并按行处理按空格切分
    形成(K,V)键值对的形式
    将所有的键值对按照单词首字母,分成两个分区溢写到磁盘

  • Reduce阶段的并发ReduceTask,完全互不相犯。但是数据来源于上一个阶段所有MapTask并发实例的输出

  • MR编程模型只能包含一个M端和一个R端,若业务逻辑复杂,可采用多个MR程序串行运行。

返回顶部


四、MR进程

一个完整的MR程序在分布式运行时有三类实例进程:

MrAppMaster负责成个程序的过程调度及状态协调 (准备一些资源 ~ 内存~ CPU~ 开启后续进程等等,job级别)
MapTask : 负责Map阶段的整个数据处理流程 —
ReduceTask : 负责Reduce阶段的整个数据处理流程 —

返回顶部


五、MR编程规范

三部分:Map 、 Reduce 、 Driver

  • Map阶段
    - 用户自定义的Mapper继承自己的父类
    - Mapper的输入、输出数据都是KV对的形式,且类型可自定义
    - Mapper中的业务逻辑写在map()方法中
    - map()方法(MapTask进程)对每一个<K,V>只调用一次 — 对输入行级数据只调用一次,处理后输出

  • Reduce阶段
    - 用户自定义的Reducer继承自己的父类
    - Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对Reducer的数入,就是Mapper的输出
    - Reducer中的业务逻辑写在Reducer()方法中
    - ReduceTask进程对每一组相同 K 的 <K,V> 组调用一次reduce()方法

  • Driver阶段
    - 相当于Yarn集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MR程序相关运行参数的Job对象

在这里插入图片描述

返回顶部


六、wordCount案例

① 创建工程

▲创建Maven项目

在这里插入图片描述
在这里插入图片描述
返回顶部


▲pom.xml文件添加依赖
<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

返回顶部


▲配置日志文件

在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

返回顶部


在这里插入图片描述

② 代码实现

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 *  Mapper 阶段
 *  KEYIN 输入数据的key类型
 *  VALUEIN 输入数据的value类型
 *  KEYOUT 输出数据的key类型
 *  VALUEOUT 输出数据的value类型
 */
public class wordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    // 创建对象
    Text k = new Text();
    IntWritable v = new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.获取一行数据
        // atguigu atguigu
        String line = value.toString();

        // 2.切分
        String[] words = line.split(" ");

        // 3.循环写出
        for (String word:words){
            // 设置键 atguigu
            k.set(word);
            // 设置词频为 1 , 也可以在上面创建对象时默认为1
            v.set(1);
            // 生成键值对 (atguigu,1)
            context.write(k,v);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 *  Reducer 阶段
 *  KEYIN ,VALUEIN  Reducer阶段输入(Mapper阶段输出)数据的类型
 *  KEYOUT 最终输出数据的key类型
 *  VALUEOUT 最终输出数据的value类型
 */
public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    // Iterable<IntWritable> values 对key的value值进行迭代实现词频统计
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // atguigu,1
        // atguigu,1

        // 1.累加求和
        int sum = 0;
        for (IntWritable value:values){
            // value是IntWritable类型数据,通过get转为int型,才好计算
            sum  += value.get();
        }
        // 2.写出结果
        v.set(sum);
        context.write(key,v);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class wordCountDriver {

    public static void main(String[] args) {

        Configuration conf = new Configuration();
        Job job = null;
        try {
            // 1.获取job对象
            job = Job.getInstance(conf);
            // 2.设置jar存储位置
            job.setJarByClass(wordCountDriver.class);
            // 3.关联map、reduce类
            job.setMapperClass(wordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            // 4.设置Mapper阶段输出数据的key、value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5.设置Reducer阶段输出数据的key、value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // 6.设置输入、出路径
//            FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\dataset\\"));
//            FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\output\\"));
            // 打jar包
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            // 7.提交job
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

返回顶部


③ 本地运行

  • 设置输入输出路径为本地路径
// 6.设置输入、出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\dataset\\"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\output\\"));
  • 1
  • 2
  • 3
  • 运行结果
    在这里插入图片描述

返回顶部


④ 集群运行

▲环境配置 — 打jar包Maven依赖
 <plugin>
     <artifactId>maven-assembly-plugin </artifactId>
       <configuration>
           <descriptorRefs>
               <descriptorRef>jar-with-dependencies</descriptorRef>
           </descriptorRefs>
           <archive>
               <manifest>
                   <mainClass>第一章_MR概述.wordCountDriver</mainClass> // 这里要对应自己的主类路径进行修改
               </manifest>
           </archive>
       </configuration>
       <executions>
           <execution>
               <id>make-assembly</id>
               <phase>package</phase>
               <goals>
                   <goal>single</goal>
               </goals>
           </execution>
       </executions>
   </plugin>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

返回顶部


▲生成jar包

在这里插入图片描述
返回顶部


▲具体执行
  • 上传文件到hdfs,以及通过Xshell上传jar包至虚拟机~
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 输出文件不需要创建,在执行jar包时会自动创建,如不删掉,会报文件夹已存在的异常。所以在上面删除了output文件夹~
  • 执行jar包运行命令
  • hadoop jar jar包名 主类 输入文件路径 输出文件路径
    在这里插入图片描述

返回顶部


▲查看结果

在这里插入图片描述
在这里插入图片描述

返回顶部


<!-- 这里是以后会用的一些jar的总配置,仅供参考!!! -->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zyx</groupId>
    <artifactId>MR</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!--   测试     -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <!--   日志     -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--    hadoop    -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!--  jdbc     -->
<!--        <dependency>-->
<!--            <groupId>mysql</groupId>-->
<!--            <artifactId>mysql-connector-java</artifactId>-->
<!--            <version>5.1.40</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.19</version>
        </dependency>
        <!--   分词器     -->
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>

        <!--   fastJson 依赖   -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <extdirs>libs</extdirs>
                        <!--  rt包没有打到项目中去 -->
                        <verbose />
                        <!-- C:/Program Files/Java/jdk1.8.0_201 是我本地安装的jdk家目录,rt.jar等jar 我在 jdk家目录下的 /jre/lib/ 目录中有发现存在,你们需要注意确认自己的实际情况,Windows分隔符英文分号,linux分隔符英文冒号  -->
                        <bootclasspath>G:/Projects/jdk1.8.0_202/jre/lib/rt.jar;G:/Projects/jdk1.8.0_202/jre/lib/jce.jar;G:/Projects/jdk1.8.0_202/jre/lib/jsse.jar</bootclasspath>
                    </compilerArguments>
                </configuration>
            </plugin>
<!--            <plugin>-->
<!--                <artifactId>maven-compiler-plugin</artifactId>-->
<!--                <version>2.3.2</version>-->
<!--                <configuration>-->
<!--                    <source>1.8</source>-->
<!--                    <target>1.8</target>-->
<!--                </configuration>-->
<!--            </plugin>-->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>通信数据处理.DriverTest</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly-->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/777837
推荐阅读
相关标签
  

闽ICP备14008679号