This article is a set of study notes on MapReduce, recording what was learned.
Experiment environment:
1. Linux Ubuntu 16.04
2. Hadoop 3.0.0
3. Eclipse 4.5.1
First, start Hadoop:
cd /apps/hadoop/sbin
./start-all.sh
Configure Eclipse as follows:
1. Open Eclipse -> Window -> Preferences.
2. Select Hadoop Map/Reduce, set the Hadoop installation root directory to /apps/hadoop, click Apply, then OK.
3. Click Window -> Show View -> Other -> MapReduce Tools -> Map/Reduce Locations; the corresponding tab then appears.
4. Click icon 1 in the figure from step 3, enter myhadoop as the Location name, enter 8020 as the Port under DFS Master, and click Finish; the page shown on the right of the step 3 figure appears.
5. Click icon 2 in the figure from step 3 and select the content shown in the figure below; the content on the left of the step 3 figure then appears.
This completes the environment configuration.
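Once the location is configured, you can optionally verify from plain Java that the namenode at port 8020 is reachable. Below is a minimal sketch, assuming HDFS is running at hdfs://localhost:8020 (the class name HdfsCheck is hypothetical, used only for illustration):
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HdfsCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the namenode address configured under DFS Master above
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), new Configuration());
        // List the HDFS root directory to confirm the connection works
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}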
Sort.java
package sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Sort {
public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable>{
private IntWritable val = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString().trim();
val.set(Integer.parseInt(line));
context.write(val, NullWritable.get());
}
}
public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable>{
// Rank (1-based) assigned to each value in globally sorted order; incremented per output line
private IntWritable rank = new IntWritable(1);
public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
for (NullWritable value : values) {
context.write(rank, key);
rank.set(rank.get() + 1);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String dir_in = "hdfs://localhost:8020/sort/input";
String dir_out = "hdfs://localhost:8020/sort/output";
Path in = new Path(dir_in);
Path out = new Path(dir_out);
Configuration conf = new Configuration();
Job sortJob = Job.getInstance(conf, "my_sort");
sortJob.setJarByClass(Sort.class);
sortJob.setInputFormatClass(TextInputFormat.class);
sortJob.setMapperClass(IntSortMapper.class);
sortJob.setMapOutputKeyClass(IntWritable.class);
sortJob.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(sortJob, in);
sortJob.setReducerClass(IntSortReducer.class);
sortJob.setNumReduceTasks(1);
sortJob.setOutputKeyClass(IntWritable.class);
sortJob.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(sortJob, out);
sortJob.waitForCompletion(true);
}
}
cp /apps/hadoop/etc/hadoop/{core-site.xml,hdfs-site.xml,log4j.properties} /home/dolphin/workspace/test/src
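Copying core-site.xml and hdfs-site.xml into the source folder puts them on the project classpath, so new Configuration() picks up the cluster settings automatically when the job is launched from Eclipse. A quick sketch to confirm this, assuming the copy above succeeded (ConfCheck is a hypothetical class name):
import org.apache.hadoop.conf.Configuration;
public class ConfCheck {
    public static void main(String[] args) {
        // core-site.xml / hdfs-site.xml are read from the classpath automatically
        Configuration conf = new Configuration();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    }
}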
cd /apps
Create three txt files under /apps, as follows.
file1.txt
2
32
654
32
15
756
65223
file2.txt
5956
22
650
92
file3.txt
26
54
6
Create the HDFS directory /sort/input and upload the three files:
hadoop fs -mkdir -p /sort/input
hadoop fs -put file1.txt /sort/input
hadoop fs -put file2.txt /sort/input
hadoop fs -put file3.txt /sort/input
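Run the Sort job from Eclipse. The mapper emits each number as an IntWritable key, the shuffle phase sorts the keys numerically, and the single reducer assigns an increasing rank, so /sort/output/part-r-00000 should contain roughly the following (rank, then value, tab-separated):
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223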
IntPair.java
package sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class IntPair implements WritableComparable<IntPair>{
private int first;
private int second;
public IntPair(){
}
public IntPair(int left, int right){
set(left, right);
}
public void set(int left, int right){
first = left;
second = right;
}
@Override
public void readFields(DataInput in) throws IOException{
first = in.readInt();
second = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException{
out.writeInt(first);
out.writeInt(second);
}
@Override
public int compareTo(IntPair o)
{
if (first != o.first){
return first < o.first ? -1 : 1;
}else if (second != o.second){
return second < o.second ? -1 : 1;
}else{
return 0;
}
}
@Override
public int hashCode(){
return first * 157 + second;
}
@Override
public boolean equals(Object right){
if (right == null)
return false;
if (this == right)
return true;
if (right instanceof IntPair){
IntPair r = (IntPair) right;
return r.first == first && r.second == second;
}else{
return false;
}
}
public int getFirst(){
return first;
}
public int getSecond(){
return second;
}
}
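IntPair orders keys first by the first field, then by the second, and this ordering is what drives the secondary sort below. A small illustrative sketch of that ordering (IntPairDemo is a hypothetical class name, placed in the same sort package):
package sort;
public class IntPairDemo {
    public static void main(String[] args) {
        IntPair a = new IntPair(40, 10);
        IntPair b = new IntPair(40, 20);
        IntPair c = new IntPair(30, 40);
        System.out.println(a.compareTo(b));                   // -1: same first, smaller second sorts earlier
        System.out.println(a.compareTo(c));                   //  1: larger first sorts later
        System.out.println(a.compareTo(new IntPair(40, 10))); //  0: equal pairs
    }
}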
Rank.java
package sort;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
@SuppressWarnings("deprecation")
public class Rank {
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
int left = 0;
int right = 0;
if (tokenizer.hasMoreTokens()) {
left = Integer.parseInt(tokenizer.nextToken());
if (tokenizer.hasMoreTokens())
right = Integer.parseInt(tokenizer.nextToken());
context.write(new IntPair(left, right), new IntWritable(right));
}
}
}
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
@Override
public int getPartition(IntPair key, IntWritable value,int numPartitions){
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
@SuppressWarnings("rawtypes")
public static class GroupingComparator extends WritableComparator{
protected GroupingComparator(){
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2){
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int l = ip1.getFirst();
int r = ip2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
}
public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(new Text(Integer.toString(key.getFirst())), val);
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "secondarysort");
job.setJarByClass(Rank.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:8020/rank/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/rank/output"));
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(GroupingComparator.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Create a data file rank under /apps with the following content:
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
hadoop fs -mkdir -p /rank/input
hadoop fs -put /apps/rank /rank/input
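FirstPartitioner sends all pairs with the same first value to the same reducer, the shuffle sorts the composite IntPair keys (first ascending, then second ascending), and GroupingComparator merges keys that share the same first value into a single reduce call, so each group's second values arrive already sorted. With the default single reduce task, the output in /rank/output should therefore look roughly like this (first value, then second, tab-separated):
30	10
30	20
30	30
30	40
40	5
40	10
40	20
40	30
50	10
50	20
50	50
50	60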
Rank1.java
package sort;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Rank1 {
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{
private Text keyInfo = new Text();
private Text valueInfo = new Text();
private FileSplit split;
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
split = (FileSplit) context.getInputSplit();
System.out.println("O??"+key);
System.out.println("<"+value);
StringTokenizer itr = new StringTokenizer( value.toString());
while( itr.hasMoreTokens() ){
keyInfo.set( itr.nextToken()+":"+split.getPath().toString());
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
System.out.println("key"+keyInfo);
System.out.println("value"+valueInfo);
}
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private Text info = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString() );
}
int splitIndex = key.toString().indexOf(":");
info.set( key.toString().substring( splitIndex + 1) +":"+sum );
key.set( key.toString().substring(0,splitIndex));
context.write(key, info);
System.out.println("key"+key);
System.out.println("value"+info);
}
}
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// Concatenate the per-file "path:count" entries for this word into one posting list
StringBuilder fileList = new StringBuilder();
for (Text value : values) {
fileList.append(value.toString()).append(";");
}
result.set(fileList.toString());
context.write(key, result);
}
}
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"InvertedIndex");
job.setJarByClass(Rank1.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/rank1/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/rank1/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
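In this job the mapper emits keys of the form word:filepath with the count "1"; the combiner sums those counts per file and rewrites the key to just the word, with filepath:count as the value; the reducer then concatenates the per-file entries into the inverted list. A standalone sketch of the combiner's key rewrite, using an illustrative key (CombinerDemo and the path are hypothetical):
public class CombinerDemo {
    public static void main(String[] args) {
        // Example map-output key: word and file path separated by the first ':'
        String mapKey = "I:hdfs://localhost:8020/rank1/input/text1";
        int sum = 3; // counts summed by the combiner for this word in this file
        int splitIndex = mapKey.indexOf(":");
        String word = mapKey.substring(0, splitIndex);
        String fileWithCount = mapKey.substring(splitIndex + 1) + ":" + sum;
        // The reducer receives ("I", "hdfs://localhost:8020/rank1/input/text1:3")
        System.out.println(word + "\t" + fileWithCount);
    }
}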
Create two data files under /apps, as follows.
text1.txt
I Love Hadoop
I like ZhouSiYuan
I love me
text2.txt
I Love MapReduce
I like NBA
I love Hadoop
hadoop fs -mkdir -p /rank1/input
hadoop fs -put /apps/text1 /rank1/input
hadoop fs -put /apps/text2 /rank1/input
Procedurally, sorting itself is not particularly difficult; the key lies in implementing the sort with MapReduce and making good use of the methods Hadoop provides. After finishing the experiments, it is worth studying the code carefully and adding your own comments to aid understanding and retention.
If anything here is incomplete, feedback and discussion are welcome~
Learning platform: Dolphin Big Data Lab