
MapReduce Programming: Sorting (Lab Assignment: Secondary Sort)


Preface

This post is a set of study notes from learning MapReduce, written down to keep a record of what was covered.
Lab environment:
1. Linux Ubuntu 16.04

2. Hadoop 3.0.0

3. Eclipse 4.5.1


I. Start Hadoop

  1. Go to the Hadoop startup directory: cd /apps/hadoop/sbin
  2. Start Hadoop: ./start-all.sh
  3. Run jps; after startup it shows the information below
    (screenshot: jps output)
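    If HDFS and YARN started correctly, the jps listing typically includes the following processes (process IDs will differ):
NameNode
DataNode
SecondaryNameNode
ResourceManager
NodeManager
Jps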

II. Environment Setup

  1. Open Eclipse -> Window -> Preferences;

  2. Select Hadoop Map/Reduce, set the Hadoop installation directory to /apps/hadoop, click Apply, then OK;

  3. Click Window -> Show View -> Other -> MapReduce Tools -> Map/Reduce Locations; the corresponding tab appears;
    (screenshot: Map/Reduce Locations view)

  4. Click icon 1 in the tab from step 3, enter myhadoop as the location name, set Port under DFS Master to 8020 (this must match fs.defaultFS in core-site.xml), and click Finish; the right-hand panel from the step-3 figure appears;
    (screenshot: New Hadoop location dialog)

  5. Click icon 2 in the tab from step 3 and select the options shown; the directory tree on the left of the step-3 figure appears.
    (screenshot: DFS Locations tree)
    This completes the environment configuration.

III. Simple Sort Experiment

  1. Create a new project (File -> New -> Project, choose Map/Reduce Project) named test; inside it create a package sort and a class Sort. The sorting code is as follows:
package sort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Sort {
    // Map: emit each number as the key; the shuffle sorts IntWritable keys in ascending order.
    public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable>{
        private IntWritable val = new IntWritable();        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString().trim();
            if (line.isEmpty()) return; // skip blank lines to avoid NumberFormatException
            val.set(Integer.parseInt(line));
            context.write(val, NullWritable.get());
        }
    }   
    // Reduce: keys arrive already sorted ascending; emit a running rank, then the number.
    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable,IntWritable>{
        private IntWritable rank = new IntWritable(0);
        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
            for (NullWritable value : values) {
                rank.set(rank.get() + 1); // increment per value so duplicate numbers each get a rank
                context.write(rank, key);
            }
        }
    }    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:8020/sort/input";
        String dir_out = "hdfs://localhost:8020/sort/output";
        Path in = new Path(dir_in);
        Path out = new Path(dir_out);        
        Configuration conf = new Configuration();
        Job sortJob = Job.getInstance(conf, "my_sort"); // Job.getInstance replaces the deprecated Job constructor
        sortJob.setJarByClass(Sort.class);
        sortJob.setInputFormatClass(TextInputFormat.class);
        sortJob.setMapperClass(IntSortMapper.class);
        sortJob.setMapOutputKeyClass(IntWritable.class);
        sortJob.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(sortJob, in);
        sortJob.setReducerClass(IntSortReducer.class);
        sortJob.setNumReduceTasks(1); // a single reducer produces one globally sorted output file
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(sortJob, out);
        sortJob.waitForCompletion(true);
    }
}
  2. Configure the files the program needs at run time by copying the Hadoop configuration files into the project's src folder from a terminal: cp /apps/hadoop/etc/hadoop/{core-site.xml,hdfs-site.xml,log4j.properties} /home/dolphin/workspace/test/src
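    A note on why this matters: the hdfs://localhost:8020 URLs hard-coded in Sort.java only resolve if they match the NameNode address configured in the copied core-site.xml. For reference (your file may differ), the relevant property has this shape:
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://localhost:8020</value>
</property>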
  3. Create and upload the program's input files. Go to the apps directory (cd /apps) and create three txt files with the contents below.
file1.txt
2
32
654
32
15
756
65223
file2.txt
5956
22
650
92
file3.txt
26
54
6
  4. With the local files in place, use Eclipse to create the /sort/input/ directory on HDFS to hold them: right-click the filesystem node under myhadoop in DFS Locations, click Create new directory, and create /sort/input.
    (screenshot: creating /sort/input under DFS Locations)
  5. Upload the three files:
hadoop fs -put file1.txt /sort/input
hadoop fs -put file2.txt /sort/input
hadoop fs -put file3.txt /sort/input
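    Optionally, verify the upload from the terminal:
hadoop fs -ls /sort/input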
  6. Run the program: open Sort.java in Eclipse, right-click in the editor, and choose Run As -> Java Application. Refresh the DFS tree afterwards; as shown below, the sort succeeded.
    (screenshot: sorted output in the DFS tree)
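    Since the reducer keeps a running rank (the common variant of this example that resets the counter to 1 for every key would label every line 1; the listing above increments it instead), the output for the fourteen numbers in the three files should look like this:
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223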

IV. Secondary Sort Experiment

  1. Create the IntPair and Rank classes in the sort package; the implementations are listed below:

IntPair.java

package sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Composite key for the secondary sort: ordered by `first`, then by `second`.
public class IntPair implements WritableComparable<IntPair>{
    private int first;
    private int second;
    
    public IntPair(){
    }    
    public IntPair(int left, int right){
        set(left, right);
    }    
    public void set(int left, int right){
        first = left;
        second = right;
    }    
    @Override
    public void readFields(DataInput in) throws IOException{
        first = in.readInt();
        second = in.readInt();
    }   
    @Override
    public void write(DataOutput out) throws IOException{
        out.writeInt(first);
        out.writeInt(second);
    }    
    // Sort order used during the shuffle: ascending by first, ties broken by second.
    @Override
    public int compareTo(IntPair o)
    {
        if (first != o.first){
            return first < o.first ? -1 : 1;
        }else if (second != o.second){
            return second < o.second ? -1 : 1;
        }else{
            return 0;
        }
    }    
    @Override
    public int hashCode(){
        return first * 157 + second;
    }   
    @Override
    public boolean equals(Object right){
        if (right == null)
            return false;
        if (this == right)
            return true;
        if (right instanceof IntPair){
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
        }else{
            return false;
        }
    }    
    public int getFirst(){
        return first;
    }   
    public int getSecond(){
        return second;
    }
}
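A quick way to convince yourself of this ordering is to exercise compareTo off-cluster. The sketch below is a hypothetical helper class, not part of the lab code:

package sort;

// Hypothetical helper: prints IntPair ordering results off-cluster.
public class IntPairCheck {
    public static void main(String[] args) {
        System.out.println(new IntPair(30, 20).compareTo(new IntPair(30, 10))); //  1: same first, 20 > 10
        System.out.println(new IntPair(30, 10).compareTo(new IntPair(30, 20))); // -1: same first, 10 < 20
        System.out.println(new IntPair(30, 20).compareTo(new IntPair(40, 5)));  // -1: 30 < 40 decides it
    }
}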

Rank.java

package sort;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Rank {
    // Map: parses two integers per line and emits (IntPair(first, second), second),
    // so the shuffle sorts records by the full composite key.
    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                context.write(new IntPair(left, right), new IntWritable(right));
            }
        }
    }
    // Partition by `first` only, so all pairs sharing a first value reach the same reducer.
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
        @Override
        public int getPartition(IntPair key, IntWritable value,int numPartitions){
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }
    // Group reducer input by `first` only, so one reduce() call sees every second for a given first.
    @SuppressWarnings("rawtypes")
    public static class GroupingComparator extends WritableComparator{
        protected GroupingComparator(){
            super(IntPair.class, true);
        }
        
        @Override
        public int compare(WritableComparable w1, WritableComparable w2){
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }
    
    // Reduce: values arrive already sorted by `second`; write them out in order under their first.
    public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        
        public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(new Text(Integer.toString(key.getFirst())), val);
            }
        }
    }
    
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();       
        Job job = Job.getInstance(conf, "secondarysort"); // Job.getInstance replaces the deprecated Job constructor
        job.setJarByClass(Rank.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:8020/rank/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/rank/output"));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  2. In the apps directory, create a file named rank as the input, write the following data into it, and save:
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
  3. Create the input directory on HDFS: hadoop fs -mkdir -p /rank/input
  4. Upload the local input data to HDFS: hadoop fs -put /apps/rank /rank/input
  5. Run Rank.java; the secondary sort result and the resulting directory tree are shown below.
    (screenshot: secondary sort output and directory tree)
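    Three pieces cooperate to produce this result: IntPair.compareTo sorts map output by (first, second); FirstPartitioner routes records by first alone; and GroupingComparator merges all pairs sharing a first into a single reduce() call, whose values then arrive in ascending order of second. Assuming a single reducer (the job does not set a reducer count, and a local Eclipse run defaults to one), the output for the sample data should be:
30	10
30	20
30	30
30	40
40	5
40	10
40	20
40	30
50	10
50	20
50	50
50	60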

V. Inverted Index Experiment

  1. Create a class Rank1.java (in the sort package), and enter and save the following code:
package sort;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Rank1 {
    // Map: for every token, emit key "word:filepath" with value "1".
    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{

        private Text keyInfo = new Text(); 
        private Text valueInfo = new Text(); 
        private FileSplit split; 
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            split = (FileSplit) context.getInputSplit(); // identifies which input file this record came from
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }
    // Combiner: sums the per-file count for one "word:filepath" key,
    // then rewrites the pair to (word, "filepath:count").
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
        private Text info = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString() );
            }

            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            key.set(key.toString().substring(0, splitIndex)); // keep only the word as the key
            context.write(key, info);
        }
    }
    // Reduce: concatenate every "filepath:count" entry for a word into one semicolon-separated list.
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{

        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder fileList = new StringBuilder();
            for (Text value : values) {
                fileList.append(value.toString()).append(';');
            }
            result.set(fileList.toString());

            context.write(key, result);
        }

    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf,"InvertedIndex");
            job.setJarByClass(Rank1.class);
            job.setMapperClass(InvertedIndexMapper.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/rank1/input"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/rank1/output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IllegalStateException | IllegalArgumentException | ClassNotFoundException
                | IOException | InterruptedException e) {
            e.printStackTrace();
        }

    }
}
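    A note on the combiner design: unusually, the combiner rewrites its key, splitting "word:filepath" into the word (new key) and "filepath:count" (new value). Tracing the word Love through the job, with file paths abbreviated:

map     -> ("Love:.../text1.txt", "1") and ("Love:.../text2.txt", "1")
combine -> ("Love", ".../text1.txt:1") and ("Love", ".../text2.txt:1")
reduce  -> ("Love", ".../text1.txt:1;.../text2.txt:1;")

    Because partitioning is computed from the original map output key (word:path) rather than the rewritten one, this classic textbook trick is only safe when a single reducer is used, which is the case here.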
  2. In the apps folder, create the data files text1.txt and text2.txt with the following contents:
text1.txt
I Love Hadoop
I like ZhouSiYuan
I love me
text2.txt
I Love MapReduce
I like NBA
I love Hadoop
  3. Create the input directory on HDFS: hadoop fs -mkdir -p /rank1/input
  4. Upload the data files to HDFS (note the .txt extensions, matching the files created above):
hadoop fs -put /apps/text1.txt /rank1/input
hadoop fs -put /apps/text2.txt /rank1/input
  5. Run Rank1.java; the result is shown below:
    (screenshot: inverted index output)
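    The expected output (paths abbreviated to the file names; the order of entries within a line may vary between runs) should resemble:
Hadoop	text1.txt:1;text2.txt:1;
I	text1.txt:3;text2.txt:3;
Love	text1.txt:1;text2.txt:1;
MapReduce	text2.txt:1;
NBA	text2.txt:1;
ZhouSiYuan	text1.txt:1;
like	text1.txt:1;text2.txt:1;
love	text1.txt:1;text2.txt:1;
me	text1.txt:1;
    Note that StringTokenizer is case-sensitive, so Love and love are indexed as different words; the actual paths are full HDFS URLs such as hdfs://localhost:8020/rank1/input/text1.txt.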

Summary

Procedurally there is nothing especially difficult about these sorts; the crux is understanding how sorting is realized on top of MapReduce and how to use the hooks Hadoop provides (comparators, partitioners, grouping). After finishing the experiments, it is worth studying the code carefully and annotating it yourself to aid understanding and memory.
Questions and corrections are welcome!

Learning platform: 海豚大数据实验室 (Dolphin Big Data Lab)
