当前位置:   article > 正文

eclipse开发Mapreduce初_一、实验目的及要求 1、在eclipse环境下编写一个mapreduce程序,掌握mapredcue

一、实验目的及要求 1、在eclipse环境下编写一个mapreduce程序,掌握mapredcue编程

环境

名称版本地址
eclipseeclipse-jee-neon-1a-win32-x86_64www.baidu.com
hadoophadoop-2.6.0-cdh5.8.2www.baidu.com
eclipse-hadoop插件hadoop-2.6.0 64x下载地址

目录

1.eclipse安装hadoop插件

2.编写mapreduce代码

3.mapreduce原理分析

插件安装

1.下载插件安装到“红色框框”标识目录下即可

2.启动eclipse并配置hadoop插件

3.创建JAVA项目导入Hadoop包

如果不知道导入那些包就将“下图”路径的所有目录下的jar添加进来
  • 1

4.创建class编写mapreduce代码

package mapreduce.xiaozhang.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class wc {

    public static void main(String[] args) throws Exception {

        /*
         * String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
         * if (otherArgs.length != 2) {
         *  System.err.println("Usage: wordcount <in> <out>");
         *  System.exit(2);
         * }
         * 
         *   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
         *   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
         * 
         */

        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        //添加mapreduce Main入口类
        job.setJarByClass(wc.class);
        //设置名称
        job.setJobName("wc");
        //设置 mapreduce的 map 和 reduce 入口
        job.setMapperClass(wc.Mymapper.class);
        job.setReducerClass(wc.MyReduce.class);
        //这是输出数据的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //需要分析数据的目录
        FileInputFormat.addInputPath(job, new Path("/input/"));
        //分析完数据输出路径
        Path outpath = new Path("/output");
        //输出目录是否存在  存在就删除
        if (fs.exists(outpath)) {
            fs.delete(outpath);
        }
        FileOutputFormat.setOutputPath(job, outpath);
        //校验mapreduce是否执行完
        boolean f = job.waitForCompletion(true);
        if (f) {
            System.out.println("任务执行完成!\n");
        }

    }

    public static class Mymapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            String[] words = StringUtils.split(value.toString(), '\t');
            for (String var : words) {
                context.write(new Text(var), one);
            }
        }
    }

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96

5.打包
将完成的代码打成JAR包

6.发送到hadoop namenode节点上

7.执行mapreduce
我们先查看需要分析的数据

执行命令
[root@zero239 test]# hadoop jar /opt/test/wc.jar mapreduce.xiaozhang.wc.wc

参数介绍: hadoop jar [jar目录] [Main方法类]
  • 1

8.查看分析结果
[root@zero239 test]# hadoop jar /opt/test/wc.jar mapreduce.xiaozhang.wc.wc
16/12/11 01:44:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
16/12/11 01:44:26 INFO client.RMProxy: Connecting to ResourceManager at zero239/172.19.4.239:8032
16/12/11 01:44:28 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/12/11 01:44:29 INFO input.FileInputFormat: Total input paths to process : 1
16/12/11 01:44:30 INFO mapreduce.JobSubmitter: number of splits:1
16/12/11 01:44:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1481429333693_0018
16/12/11 01:44:32 INFO impl.YarnClientImpl: Submitted application application_1481429333693_0018
16/12/11 01:44:32 INFO mapreduce.Job: The url to track the job: http://zero239:8088/proxy/application_1481429333693_0018/
16/12/11 01:44:32 INFO mapreduce.Job: Running job: job_1481429333693_0018
16/12/11 01:44:58 INFO mapreduce.Job: Job job_1481429333693_0018 running in uber mode : false
16/12/11 01:44:58 INFO mapreduce.Job: map 0% reduce 0%
16/12/11 01:45:08 INFO mapreduce.Job: map 100% reduce 0%
16/12/11 01:45:19 INFO mapreduce.Job: map 100% reduce 100%
16/12/11 01:45:20 INFO mapreduce.Job: Job job_1481429333693_0018 completed successfully
16/12/11 01:45:20 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=103
FILE: Number of bytes written=231397
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=163
HDFS: Number of bytes written=62
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7600
Total time spent by all reduces in occupied slots (ms)=8729
Total time spent by all map tasks (ms)=7600
Total time spent by all reduce tasks (ms)=8729
Total vcore-seconds taken by all map tasks=7600
Total vcore-seconds taken by all reduce tasks=8729
Total megabyte-seconds taken by all map tasks=7782400
Total megabyte-seconds taken by all reduce tasks=8938496
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=87
Map output materialized bytes=103
Input split bytes=96
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=103
Reduce input records=5
Reduce output records=4
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=506
CPU time spent (ms)=4300
Physical memory (bytes) snapshot=308449280
Virtual memory (bytes) snapshot=3017609216
Total committed heap usage (bytes)=163450880
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=67
File Output Format Counters
Bytes Written=62
任务执行完成!

执行命令查询结果
[root@zero239 test]# hadoop dfs -cat /output/part-r-00000
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

16/12/11 02:10:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
holle hadoop    2
holle mapreduce 1
holle zhangzhang        1
word cc 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

原理分析

1、向client端提交MapReduce job.
  2、随后yarn的ResourceManager进行资源的分配.
  3、由NodeManager进行加载与监控containers.
  4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.
  5、通过hdfs进行job配置文件、jar包的各节点分发。

Job 提交过程
  job的提交通过调用submit()方法创建一个JobSubmitter实例,并调用submitJobInternal()方法。整个job的运行过程如下:
  1、向ResourceManager申请application ID,此ID为该MapReduce的jobId。
  2、检查output的路径是否正确,是否已经被创建。
  3、计算input的splits。
  4、拷贝运行job 需要的jar包、配置文件以及计算input的split 到各个节点。
  5、在ResourceManager中调用submitAppliction()方法,执行job
Job 初始化过程
  1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配Container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。
  2、由applicationMaster决定如何运行tasks,如果job数据量比较小,applicationMaster便选择将tasks运行在一个JVM中。那么如何判别这个job是大是小呢?当一个job的mappers数量小于10个,只有一个reducer或者读取的文件大小要小于一个HDFS block时,(可通过修改配置项mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 进行调整)
  3、在运行tasks之前,applicationMaster将会调用setupJob()方法,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)
Task 任务分配
  1、接下来applicationMaster向ResourceManager请求containers用于执行map与reduce的tasks(step 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)
  2、运行tasks的是需要消耗内存与CPU资源的,默认情况下,map和reduce的task资源分配为1024MB与一个核,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)
Task 任务执行
  1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个主函数为YarnChild的Java application运行,但在运行task之前,首先定位task需要的jar包、配置文件以及加载在缓存中的文件。
  2、YarnChild运行于一个专属的JVM中,所以任何一个map或reduce任务出现问题,都不会影响整个nodemanager的crash或者hang。
  3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。
Mapreduce数据流
运行进度与状态更新
  1、MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及每个task都有一个包含job(running,successfully completed,failed)的状态,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?
  2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个map或reduce任务执行时,子进程会持续每三秒钟与applicationMaster进行交互。
Job 完成
   最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMaster与task containers被清空。

Shuffle与Sort
  从map到reduce的过程,被称之为shuffle过程,MapReduce使到reduce的数据一定是经过key的排序的,那么shuffle是如何运作的呢?
  当map任务将数据output时,不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类。
  
  1、The Map Side
  首先map任务的output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),便开始写入磁盘。
  在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce.在每个partition中,后台线程将会在内存中进行Key的排序,(如果代码中有combiner方法,则会在output时就进行sort排序,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)
  这里将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销(配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)
   随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置。
  2、The Reduce Side
  首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是,需要等到所有map任务结束后,reduce才会对map的结果进行拷贝,由于reduce函数有少数几个复制线程,以至于它可以同时拉取多个map的输出结果。默认的为5个线程(可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)
  这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢? 
  当所有map的任务结束后,applicationMaster通过心跳机制(heartbeat mechanism),由它知道mapping的输出结果与机器host,所以reducer会定时的通过一个线程访问applicationmaster请求map的输出结果。
  Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会除法合并(merged)随之写入磁盘。
  这时要注意,所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/851347
推荐阅读
  

闽ICP备14008679号