Scenario:
Suppose we have two tables: a product table and an order table.
Order table (order.txt), with columns order ID, product ID and quantity:
001 01 2
002 01 1
003 02 1
004 03 2
005 04 1
Product table (production.txt), with columns product ID, product name and unit price:
01 华为Mate40 5999
02 华为笔记本 6999
03 小米笔记本 3999
We are now asked to perform a left join on these two tables.
Approach:
Work backwards: the Reduce stage needs all records with the same product ID to arrive as one group, so the shuffle sort must be primarily by product ID.
Map stage: read both the product file and the order file; sort by product ID first, and within the same product ID place the product record before the order records.
Before the Reduce stage: a custom grouping comparator groups records by product ID.
Reduce stage: join the product information onto each order record and output the result.
Mapper output key: ProductOrderBean, value: NullWritable
(Note: although the grouping appears to be simply "by product ID", Text cannot be used as the Mapper output key. Grouping on a Text key only guarantees that the keys themselves are sorted; here we need a two-level ordering: product ID first, and within the same product ID the product record must sort ahead of the order records.)
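To make the sort and grouping concrete, with the sample data above the group for product ID 01 reaches the Reducer roughly as follows: the product record (01 华为Mate40 5999) comes first, followed by the order records 001 and 002. The Reducer reads the product name and price from that first key and then emits one joined row per order, for example:

001  01  华为Mate40  2  5999.0  11998.0
002  01  华为Mate40  1  5999.0  5999.0

Order 005 references product 04, which has no product record, so it is emitted with an empty name and a zero price, which is the left-join behaviour.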
ProductOrderBean.java
package com.xbz.study.bigdata.hadoop.mr.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProductOrderBean implements WritableComparable<ProductOrderBean> {
    // product fields
    private String productId;
    private String productName;
    private Float price;
    // order fields
    private String orderId;
    private int number;
    private float totalPrice;

    @Override
    public int compareTo(ProductOrderBean o) {
        int productIdOrder = this.getProductId().compareTo(o.getProductId());
        if (productIdOrder == 0) {
            // same product ID: sort so the product record (non-empty name) comes before the order records
            return o.getProductName().compareTo(this.getProductName());
        }
        return productIdOrder;
    }

    @Override
    public String toString() {
        return this.orderId + "\t" + this.productId + "\t" + this.productName + "\t"
                + this.number + "\t" + this.price + "\t" + this.totalPrice + "\t";
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(productId);
        out.writeUTF(productName);
        out.writeFloat(price);
        out.writeUTF(orderId);
        out.writeInt(number);
        out.writeFloat(totalPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        productId = in.readUTF();
        productName = in.readUTF();
        price = in.readFloat();
        orderId = in.readUTF();
        number = in.readInt();
        totalPrice = in.readFloat();
    }

    // getters and setters omitted
}
GroupingComparator.java/Reducer.java/Mapper.java/Driver.java
package com.xbz.study.bigdata.hadoop.mr.reducejoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

public class Driver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);
        job.setMapOutputKeyClass(ProductOrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ProductOrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop-study\\reducejoin\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop-study\\reducejoin\\output"));
        job.setGroupingComparatorClass(ProductIdGroupingComparator.class);
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

    private static class ReduceJoinMapper extends Mapper<LongWritable, Text, ProductOrderBean, NullWritable> {
        private FileSplit fs;
        private ProductOrderBean k = new ProductOrderBean();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            fs = (FileSplit) context.getInputSplit();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String name = fs.getPath().getName();
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            if ("production.txt".equals(name)) {
                k.setProductId(s[0]);
                k.setProductName(s[1]);
                k.setPrice(Float.valueOf(s[2]));
                // Important: reset the fields this record type does not use
                k.setOrderId("");
                k.setNumber(0);
            } else if ("order.txt".equals(name)) {
                k.setOrderId(s[0]);
                k.setProductId(s[1]);
                k.setNumber(Integer.valueOf(s[2]));
                // Important: reset the fields this record type does not use
                k.setProductName("");
                k.setPrice(0f);
            }
            context.write(k, NullWritable.get());
        }
    }

    private static class ReduceJoinReducer extends Reducer<ProductOrderBean, NullWritable, ProductOrderBean, NullWritable> {
        @Override
        protected void reduce(ProductOrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // the product record sorts first within the group, so capture its name and price here
            String productName = key.getProductName();
            Float price = key.getPrice();
            Iterator<NullWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                if (StringUtils.isNotBlank(key.getOrderId())) {
                    key.setProductName(productName);
                    key.setPrice(price);
                    key.setTotalPrice(key.getNumber() * price);
                    context.write(key, NullWritable.get());
                }
            }
        }
    }

    private static class ProductIdGroupingComparator extends WritableComparator {
        public ProductIdGroupingComparator() {
            super(ProductOrderBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            ProductOrderBean pa = (ProductOrderBean) a;
            ProductOrderBean pb = (ProductOrderBean) b;
            return pa.getProductId().compareTo(pb.getProductId());
        }
    }
}
If there is more than one ReduceTask, uneven key distribution can cause data skew, and the skew comes from how the shuffle partitions the map output across reducers. Data skew happens in the Reduce stage rather than the Map stage, because the number of map tasks is simply determined by the input splits.
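To make the partitioning point concrete, below is a minimal sketch of a partitioner that routes every record of a product to the same ReduceTask by hashing the product ID (a hypothetical ProductIdPartitioner, not part of the original code). If one product accounts for most of the orders, the reducer that owns its partition receives most of the data, which is exactly the skew described above.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProductIdPartitioner extends Partitioner<ProductOrderBean, NullWritable> {
    @Override
    public int getPartition(ProductOrderBean key, NullWritable value, int numPartitions) {
        // all records sharing a productId land on the same ReduceTask;
        // a "hot" productId therefore concentrates load on a single reducer
        return (key.getProductId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered on the job with job.setPartitionerClass(ProductIdPartitioner.class).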
To mitigate this skew we can use a Map Join instead, but a Map Join only works when a large table is joined with a table small enough to be cached in memory.
Using the same case as above:
Assume the product table is the small table. When the job starts we add the small table's file to the job's cache files, load it inside the Mapper, and complete the join (the Bean output) entirely in the map stage, with no Reduce stage involved.
Mapper.java/Driver.java
package com.xbz.study.bigdata.hadoop.mr.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class Driver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(ProductOrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ProductOrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // map-only job: the join is finished in the Mapper, so no reduce stage is needed
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop-study\\mapjoin\\input\\order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop-study\\mapjoin\\output"));
        // add the small table (product file) to the distributed cache
        job.addCacheFile(URI.create("file:///D:/hadoop-study/mapjoin/input/production.txt"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

    static class MyMapper extends Mapper<LongWritable, Text, ProductOrderBean, NullWritable> {
        private HashMap<String, ProductOrderBean> productions = new HashMap<>();
        private ProductOrderBean k = new ProductOrderBean();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // load the cached product file into an in-memory map keyed by product ID
            URI[] cacheFiles = context.getCacheFiles();
            URI orderFileUrl = cacheFiles[0];
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            FSDataInputStream inputStream = fileSystem.open(new Path(orderFileUrl));
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String oneLine = null;
            while ((oneLine = bufferedReader.readLine()) != null) {
                String[] s = oneLine.split(" ");
                ProductOrderBean bean = new ProductOrderBean();
                bean.setProductId(s[0]);
                bean.setProductName(s[1]);
                bean.setPrice(Float.valueOf(s[2]));
                productions.put(bean.getProductId(), bean);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            k.setOrderId(s[0]);
            k.setProductId(s[1]);
            k.setNumber(Integer.valueOf(s[2]));
            ProductOrderBean productBean = productions.get(k.getProductId());
            if (productBean != null) {
                k.setProductName(productBean.getProductName());
                k.setPrice(productBean.getPrice());
                k.setTotalPrice(k.getPrice() * k.getNumber());
            } else {
                // Important: reset the fields that have no matching product
                k.setProductName("");
                k.setPrice(0f);
                k.setTotalPrice(0f);
            }
            context.write(k, NullWritable.get());
        }
    }
}
Example
Given the score sheet below, output the information of the top three scorers.
Key point: finish producing the result as quickly as possible; for instance, with 100 million records the program should stop as soon as the top three have been output.
Score sheet (name and score):
张三 60
李四 70
王五 90
赵六 75
孙七 88
Inefficient implementation:
// imports needed by these snippets (shown together here; in a real project they live in separate files)
import lombok.Data;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Use a bean as the key and implement WritableComparable so the sorting
 * (descending by score) is done during the map-reduce shuffle.
 */
@Data
public class ScoreBean implements WritableComparable<ScoreBean> {
    private String name;
    private Float score;

    @Override
    public int compareTo(ScoreBean o) {
        // reverse the comparison so higher scores sort first
        return o.getScore().compareTo(this.getScore());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeFloat(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        score = in.readFloat();
    }
}

// the map stage only assembles the bean and emits it
public static class IMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable> {
    private ScoreBean k = new ScoreBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String oneLine = value.toString();
        String[] s = oneLine.split(" ");
        k.setName(s[0]);
        k.setScore(Float.valueOf(s[1]));
        context.write(k, NullWritable.get());
    }
}

// The reduce stage appears to take only the first three keys, i.e. Top-N --> no problem???
// In fact, if the map side emits 100 million records, reduce() is still invoked for
// every key group; only the first N invocations produce output.
public static class IReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable> {
    private int i = 0;

    @Override
    protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        if (i++ < 3) {
            context.write(key, NullWritable.get());
        }
    }
}
Analysis: if the data set is large, the Reduce stage still has to iterate over every key group before the job finishes, even though we only need the top 3, so most of that work is wasted.
Idea: since the waste comes from the repeated reduce() calls, can we make Reduce run only once, i.e. treat all records as a single group?
Improved version
Do not forget to register the custom grouping comparator on the job (see the driver snippet after the code):
public static class IReducer2 extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable> {
    @Override
    protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // all records fall into one group; take only the first three (already sorted by score)
        Iterator<NullWritable> iterator = values.iterator();
        for (int i = 0; i < 3; i++) {
            if (iterator.hasNext()) {
                iterator.next();
                context.write(key, NullWritable.get());
            }
        }
    }
}

public static class AllOneGroupingComparator extends WritableComparator {
    public AllOneGroupingComparator() {
        super(ScoreBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // every key compares as equal, so all records form a single group
        return 0;
    }
}
// register the custom grouping comparator
job.setGroupingComparatorClass(AllOneGroupingComparator.class);
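For reference, a minimal driver sketch for this improved Top-3 job is shown below. The enclosing class name and the input/output paths are assumptions, and the Mapper/Reducer/comparator classes from the snippets above are assumed to be visible from here. Note that a single ReduceTask is required, otherwise each reducer would produce its own top three.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(IMapper.class);
        job.setReducerClass(IReducer2.class);
        job.setMapOutputKeyClass(ScoreBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ScoreBean.class);
        job.setOutputValueClass(NullWritable.class);
        // treat all records as one group so reduce() runs only once
        job.setGroupingComparatorClass(AllOneGroupingComparator.class);
        // a global Top-3 needs exactly one ReduceTask
        job.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop-study\\topn\\input"));   // assumed path
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop-study\\topn\\output")); // assumed path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}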
Example
Parse the words in a set of files and build an inverted index.
a.txt
hello world every body like bigdata but me only like money
b.txt
hello everyone hello ladies and gentlemen ladies are beautiful
The final output file should contain entries of the form:
[word] -> <[file name],[occurrence count]>
For example:
hello-><a.txt,1> <b.txt,2>
Analysis
The task can be split into two MR jobs.
The first MR job counts the words in each file:
key: file name + word
val: word count
The second MR job:
the Mapper splits the previous key into
key: word
val: file name + occurrence count
and the Reducer aggregates the results by key. A short trace with the sample files is shown below.
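With the sample files above, the intermediate and final records would look roughly like this (the first job joins the file name and the word with the separator "--"):

Output of the first job (file--word, count):
a.txt--hello 1
a.txt--like 2
b.txt--hello 2
b.txt--ladies 2
...

After the second job's Mapper splits the key:
hello -> a.txt,1
hello -> b.txt,2

Output of the second job's Reducer:
hello  <a.txt,1> <b.txt,2>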
Full code
package com.xbz.study.bigdata.hadoop.mr.reverseindex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class Driver {

    public static void main(String[] args) throws Exception {
        // Job 1: count how many times each word appears in each file
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(Driver.class);
        job1.setMapperClass(FirstMapper.class);
        job1.setReducerClass(FirstReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job1, new Path("D:\\hadoop-study\\reverseindex\\input"));
        FileOutputFormat.setOutputPath(job1, new Path("D:\\hadoop-study\\reverseindex\\output"));
        // write job 1's output as a SequenceFile so job 2 can read key/value pairs directly
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        boolean b = job1.waitForCompletion(true);
        if (!b) {
            System.err.println("Job 1 failed");
            System.exit(1);
            return;
        }

        // Job 2: split the word from the file name and aggregate the final result
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, new Path("D:\\hadoop-study\\reverseindex\\output"));
        FileOutputFormat.setOutputPath(job2, new Path("D:\\hadoop-study\\reverseindex\\output2"));
        // read job 1's SequenceFile output
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        boolean b2 = job2.waitForCompletion(true);
        if (!b2) {
            System.err.println("Job 2 failed");
        }
        System.exit(b2 ? 0 : 1);
    }

    static class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private FileSplit fs;
        private Text k = new Text();
        private IntWritable v = new IntWritable(1);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            fs = (FileSplit) context.getInputSplit();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String fileName = fs.getPath().getName();
            String oneLine = value.toString();
            String[] split = oneLine.split("\\s+");
            for (String word : split) {
                k.set(fileName + "--" + word);
                context.write(k, v);
            }
        }
    }

    static class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            AtomicInteger count = new AtomicInteger();
            values.forEach(i -> count.addAndGet(i.get()));
            v.set(count.get());
            context.write(key, v);
        }
    }

    private static class SecondMapper extends Mapper<Text, IntWritable, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            String keyStr = key.toString();
            String[] split = keyStr.split("--");
            k.set(split[1]);
            v.set(split[0] + "," + value.get());
            context.write(k, v);
        }
    }

    private static class SecondReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0, sb.length());
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                Text next = iterator.next();
                sb.append("<").append(next.toString()).append("> ");
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }
}
Example
Find the common friends of every pair of users. Each line of the input lists a user and that user's friend list:
A->C,U,H,I,J,K,O
B->A,D,G,H,I,K,L,M,N,P,Q,R,S,T,U,V,X,Z
C->L
D->Q,R,C,S,F,Z,L
E->Q,B,S,T,U,W,X,H,Y,N,O
F->A,G,H,Z,M,N
G->P,A,S,E,W,K,M,N
I->D,V,K,N
J->A,B,E,F,G,H,L,M,N,O,Q,S,T,U,W,X,Y,Z
K->B,D,E,F,H,J,L,M,O,R,U,W,X,Z
L->Q,A,Z,J,K
M->B,G,H,I,J,K,L,N,P,Q,R,S,T,V,W,X,Z
N->B,C,D,F,H,J,K,L,M,O,P,Q,S,T,X,Z
O->C,D,E,F,G,H,K,L,P,Q,V,W,X,Y,Z
P->A,C,E,G,H,I,J,K,L,N,R,V,X
Q->P,R,D,T,U,W,H,I,J,L,N,O
R->P,Q,S,F,V,H,I,M
S->Q
T->D,F,G,I,L,M,N,Q,R,S,U,W,Z
U->P,B,C,T,E,F,H,X,K,M,N
V->A,C,E,H,J,K,M,R,T,U,W,X,Y
W->B,C,D,I,J,K,L,M,N,O,R,S,X,Y
Y->B,R,C,D,T,E,X,I,K,N
Z->Q,A,T,U,E,F,V,G,I,L
Approach:
First reverse the relationship.
For example, A->B,C,D is the forward direction: user A's friends are B, C and D. The reverse question is: who has B in their friend list?
Step 1: for every user, find everyone who has that user as a friend, i.e.
Map: key -> friend, val -> the current user
Reduce: key -> friend, val -> the set of all users who have that key in their friend list
Step 2: take step 1's output as input and iterate over each value list with a nested loop, emitting key -> user1-user2 (the two users are put into a fixed order), val -> the shared friend (step 1's key).
Map: key -> user1-user2
Reduce: the set of common friends of user1 and user2. A short trace with the sample data is shown below.
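For instance, with the sample data above, A appears in the friend lists of B, F, G, J, L, P, V and Z, so the first job emits (in some order):

A  B F G J L P V Z

The second job's Mapper then pairs up those users and emits A as the value for each ordered pair, e.g. <B,F> -> A, <B,G> -> A, ..., <V,Z> -> A. The Reducer finally concatenates, for each pair such as <B,F>, every shared friend collected from records like this one.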
package com.xbz.study.bigdata.hadoop.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

public class Driver {

    public static void main(String[] args) throws Exception {
        // Job 1: for every user, collect who has that user as a friend
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(Driver.class);
        job1.setMapperClass(FirstMapper.class);
        job1.setReducerClass(FirstReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job1, new Path("D:\\hadoop-study\\friends\\input"));
        FileOutputFormat.setOutputPath(job1, new Path("D:\\hadoop-study\\friends\\output"));
        // write job 1's output as a SequenceFile so job 2 can read key/value pairs directly
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        boolean b = job1.waitForCompletion(true);
        if (!b) {
            System.err.println("Job 1 failed");
            System.exit(1);
            return;
        }

        // Job 2: pair up the users from job 1's value lists and aggregate common friends
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, new Path("D:\\hadoop-study\\friends\\output"));
        FileOutputFormat.setOutputPath(job2, new Path("D:\\hadoop-study\\friends\\output2"));
        // read job 1's SequenceFile output
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        boolean b2 = job2.waitForCompletion(true);
        if (!b2) {
            System.err.println("Job 2 failed");
        }
        System.exit(b2 ? 0 : 1);
    }

    static class FirstMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] split = oneLine.split("->");
            String[] split1 = split[1].split(",");
            // emit (friend, user) so the reducer can collect everyone who has "friend" in their list
            for (String friend : split1) {
                k.set(friend);
                v.set(split[0]);
                context.write(k, v);
            }
        }
    }

    static class FirstReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0, sb.length());
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                Text next = iterator.next();
                sb.append(next.toString() + " ");
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

    private static class SecondMapper extends Mapper<Text, Text, Text, Text> {
        private Text k = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            String userA = null;
            String userB = null;
            // emit every pair of users that share this friend (the input key), in a fixed order
            for (int i = 0; i < s.length; i++) {
                userA = s[i];
                for (int j = i + 1; j < s.length; j++) {
                    userB = s[j];
                    if (userA.compareTo(userB) > 0) {
                        k.set("<" + userB + "," + userA + ">");
                    } else {
                        k.set("<" + userA + "," + userB + ">");
                    }
                    context.write(k, key);
                }
            }
        }
    }

    private static class SecondReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0, sb.length());
            values.forEach(value -> sb.append(value).append(" "));
            v.set(sb.toString());
            context.write(key, v);
        }
    }
}