Most of us have used QQ, which from time to time recommends "people you may know" and shows how many friends you have in common with them. Where does that data come from? It is derived by analyzing everyone's friend list.
For example, if A's friend list contains B and C, and B's friend list contains C, D and E, then it makes sense for the system to recommend D and E to A.
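To make the idea concrete, here is a minimal plain-Java sketch (not part of the MapReduce code that follows; the class and variable names are made up for illustration) that computes the candidates for A from the example above:

```java
import java.util.*;

public class FofSketch {
    public static void main(String[] args) {
        // Friend lists from the example: A -> {B, C}, B -> {C, D, E}
        Map<String, Set<String>> friends = new HashMap<>();
        friends.put("A", new HashSet<>(Arrays.asList("B", "C")));
        friends.put("B", new HashSet<>(Arrays.asList("C", "D", "E")));

        // Candidates for A: friends of A's friends, minus A itself and A's direct friends
        Set<String> candidates = new TreeSet<>();
        for (String f : friends.get("A")) {
            candidates.addAll(friends.getOrDefault(f, Collections.emptySet()));
        }
        candidates.remove("A");
        candidates.removeAll(friends.get("A"));

        System.out.println(candidates); // [D, E]
    }
}
```

At QQ's scale this kind of per-user loop over in-memory maps is exactly what stops working, which is where the MapReduce version below comes in.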
QQ's user base is enormous. Suppose each user's friend list is analyzed and stored as text, with records shaped like this:

| User ID | Friend list | Other data |
|---|---|---|
| A | B C D … | … |
Consider QQ's registered user count: roughly 800 million people. If each user's friend list takes about 10 KB, the total comes to roughly 8 TB (8×10^8 users × 10 KB ≈ 8×10^12 bytes). With conventional tools, merely storing that much data is a problem, let alone processing it.
This is exactly the kind of workload HDFS and MapReduce are suited for.
Here is the small sample dataset I prepared:
ID      Friend list
--------------------------
tom cat hadoop hello
hello mr tom world hive
cat tom hive
hive cat hadoop world hello mr
mr hive hello
hadoop tom hive world
world hadoop hive hello
Step 1: Mapper
The mapper processes one line at a time. Take the first line as an example: tom and every user in his friend list are direct friends, while any two users taken from tom's friend list form a candidate pair that shares tom as a common friend.
If we emit <tom-cat, 0> for a direct friendship and <cat-hadoop, 1> for a second-degree relationship, then the pairs whose values are all 1 are exactly the data we need.
One thing to watch out for: <tom-cat, 0> and <cat-tom, 0> would otherwise be two different keys, so the code solves this with a custom key type that normalizes the order (see the sketch below and FoF.java).
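As a concrete trace (a standalone sketch with no Hadoop dependencies; `pair` mirrors the normalization done by the FoF class shown later), this is what the map step emits for the first sample line `tom cat hadoop hello`:

```java
public class MapTrace {
    // Order the two names so that cat-tom and tom-cat become the same key, as FoF does
    static String pair(String a, String b) {
        return a.compareTo(b) > 0 ? b + "-" + a : a + "-" + b;
    }

    public static void main(String[] args) {
        String[] f = "tom cat hadoop hello".split(" ");
        for (int i = 1; i < f.length; i++) {
            // owner + list member: direct friends, value 0
            System.out.println(pair(f[0], f[i]) + "\t0");
            // two list members: a friend-of-friend pair through f[0], value 1
            for (int j = i + 1; j < f.length; j++) {
                System.out.println(pair(f[i], f[j]) + "\t1");
            }
        }
        // Prints: cat-tom 0, cat-hadoop 1, cat-hello 1, hadoop-tom 0, hadoop-hello 1, hello-tom 0
    }
}
```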
Step 2: Reducer
Iterate over each key's values: if any value is 0 the pair is already a direct friendship, so it is dropped; otherwise the 1s are summed to get the number of common friends.
At this point the needed data has been extracted, but a pair such as <tom-cat, 1> goes both ways: the system can recommend cat to tom and tom to cat, and recommendations should be ranked by the number of common friends.
So a second MapReduce job is needed to reorder and consolidate the data.
Step 3: Mapper (second job)
Use a custom key class that implements the required ordering.
Step 4: Reducer (second job)
Simply write out the sorted data.
MapStep01.java
```java
package com.shsxt.mr.friend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.hsqldb.lib.StringUtil;

public class MapStep01 extends Mapper<LongWritable, Text, FoF, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String lines = value.toString();
        // For convenience the fields are separated by a single space; the tabular layout above is only for readability
        String[] friends = StringUtil.split(lines, " ");
        for (int i = 1; i < friends.length; i++) {
            String friend = friends[i];
            // The first field paired with any member of the friend list is a direct friendship, marked with value 0
            context.write(new FoF(friend, friends[0]), new IntWritable(0));
            // Any two members of the friend list form a friend-of-friend pair, marked with value 1
            for (int j = i + 1; j < friends.length; j++) {
                String fri = friends[j];
                // The key is a custom type; its code (FoF.java) is shown below
                context.write(new FoF(fri, friend), new IntWritable(1));
            }
        }
    }
}
```
FoF.java
```java
package com.shsxt.mr.friend;

import org.apache.hadoop.io.Text;

// Extends Text, which already implements WritableComparable, so no extra serialization code is needed
public class FoF extends Text {

    public FoF() {
        super();
    }

    public FoF(String friend01, String friend02) {
        set(getOf(friend01, friend02));
    }

    // Normalize the order of the two names so that e.g. cat-tom and tom-cat produce the same key
    private String getOf(String friend01, String friend02) {
        int c = friend01.compareTo(friend02);
        if (c > 0) {
            return friend02 + "\t" + friend01;
        }
        return friend01 + "\t" + friend02;
    }
}
```
ReduceStep01.java
```java
package com.shsxt.mr.friend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.hsqldb.lib.StringUtil;

public class ReduceStep01 extends Reducer<FoF, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(FoF key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        boolean flag = true;
        // The grouped input looks like: key "cat hadoop", values {0, 0, 1, 1}
        for (IntWritable value : values) {
            // Any 0 means the two users are already direct friends, so the pair is discarded
            if (value.get() == 0) {
                flag = false;
                break;
            }
            // Count the number of common friends
            sum += value.get();
        }
        if (flag) {
            // Pack everything into the output key so the second job can parse it easily
            String msg = StringUtil.split(key.toString(), "\t")[0] + " "
                    + StringUtil.split(key.toString(), "\t")[1] + " " + sum;
            context.write(new Text(msg), NullWritable.get());
        }
    }
}
```
MapStep02.java
```java
package com.shsxt.mr.friend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapStep02 extends Mapper<LongWritable, Text, FriendSort, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse one line of job 1's output: "friend01 friend02 count"
        String lines = value.toString();
        String friend01 = lines.split(" ")[0];
        String friend02 = lines.split(" ")[1];
        int hot = Integer.parseInt(lines.split(" ")[2]);
        System.out.println(friend01 + " " + friend02 + " " + hot);
        System.out.println(friend02 + " " + friend01 + " " + hot);
        // Emit both directions so each user can be recommended the other;
        // FriendSort is a custom key class that handles the ordering (code below)
        context.write(new FriendSort(friend01, friend02, hot), new IntWritable(hot));
        context.write(new FriendSort(friend02, friend01, hot), new IntWritable(hot));
    }
}
```
FriendSort.java
```java
package com.shsxt.mr.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FriendSort implements WritableComparable<FriendSort> {

    private String friend01;
    private String friend02;
    private int hot;

    // Custom ordering: both names lexicographically (ascending), then by common-friend count
    @Override
    public int compareTo(FriendSort friend) {
        int a = friend01.compareTo(friend.getFriend01());
        int b = friend02.compareTo(friend.getFriend02());
        if (a == 0) {
            if (b == 0) {
                // Same pair of names: sort by the number of common friends in descending order
                return -Integer.compare(hot, friend.getHot());
            } else {
                return b;
            }
        }
        return a;
    }

    // Serialization: Hadoop uses its own Writable mechanism, not Java serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(friend01);
        out.writeUTF(friend02);
        out.writeInt(hot);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.friend01 = in.readUTF();
        this.friend02 = in.readUTF();
        this.hot = in.readInt();
    }

    public FriendSort() {
        super();
    }

    public FriendSort(String friend01, String friend02, int hot) {
        super();
        this.friend01 = friend01;
        this.friend02 = friend02;
        this.hot = hot;
    }

    public String getFriend01() { return friend01; }
    public void setFriend01(String friend01) { this.friend01 = friend01; }
    public String getFriend02() { return friend02; }
    public void setFriend02(String friend02) { this.friend02 = friend02; }
    public int getHot() { return hot; }
    public void setHot(int hot) { this.hot = hot; }
}
```
ReduceStep02.java
```java
package com.shsxt.mr.friend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceStep02 extends Reducer<FriendSort, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(FriendSort friend, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            // In practice each key should occur only once here, so this simply picks up the count
            sum = i.get();
        }
        String msg = friend.getFriend01() + " " + friend.getFriend02() + " " + sum;
        System.out.println(msg);
        context.write(new Text(msg), NullWritable.get());
    }
}
```
JobRunner.java
The settings in this class are boilerplate, so I won't go into much detail.
```java
package com.shsxt.mr.friend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobRunner {

    public static void main(String[] args) throws Exception {
        // Run job02 only after job01 has completed successfully
        boolean a = job01();
        if (a) {
            job02();
        }
    }

    static boolean job01() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node02:8020");
        conf.set("yarn.resourcemanager.hostname", "node02:8088");

        Job job = Job.getInstance(conf);
        job.setJobName("job01..");
        job.setJarByClass(JobRunner.class);
        job.setMapperClass(MapStep01.class);
        job.setReducerClass(ReduceStep01.class);
        // Declare the map output types explicitly (the reducer writes Text / NullWritable)
        job.setMapOutputKeyClass(FoF.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/friend/input/data.txt"));
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/friend/output/01/");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true);
    }

    static boolean job02() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node02:8020");
        conf.set("yarn.resourcemanager.hostname", "node02:8088");

        Job job = Job.getInstance(conf);
        job.setJobName("job02..");
        job.setJarByClass(JobRunner.class);
        job.setMapperClass(MapStep02.class);
        job.setReducerClass(ReduceStep02.class);
        job.setMapOutputKeyClass(FriendSort.class);
        job.setMapOutputValueClass(IntWritable.class);

        // The input of job02 is the output directory of job01
        FileInputFormat.addInputPath(job, new Path("/friend/output/01/"));
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/friend/output/02/");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true);
    }
}
```
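Assuming the classes are packaged into a jar (the jar name here is just a placeholder), the two chained jobs can be submitted from a cluster node with the standard command `hadoop jar friend-mr.jar com.shsxt.mr.friend.JobRunner`; job02 starts only if job01 finishes successfully.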
Result of running the two jobs:
cat hadoop 2
cat hello 2
cat mr 1
cat world 1
hadoop cat 2
hadoop hello 3
hadoop mr 1
hello cat 2
hello hadoop 3
hive tom 3
mr cat 1
mr hadoop 1
mr tom 1
mr world 2
tom hive 3
tom mr 1
tom world 2
world cat 1
world mr 2
world tom 2
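As a quick sanity check against the sample input: cat's friends are tom and hive, while hadoop's friends are tom, hive and world, so the two are not direct friends and share two common friends (tom and hive), which matches the first line `cat hadoop 2`.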
The second example: find the two days with the highest temperature in each month of each year. Here is the sample data:
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
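Before the MapReduce version, here is a minimal single-machine sketch of the same computation (plain Java; the class name and parsing details are made up for this example). It keeps the highest reading per day and then prints the two hottest days of each year-month:

```java
import java.util.*;
import java.util.stream.*;

public class TopTwoSketch {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1949-10-01 14:21:02\t34c", "1949-10-01 19:21:02\t38c", "1949-10-02 14:01:02\t36c",
                "1950-01-01 11:21:02\t32c", "1950-10-01 12:21:02\t37c", "1951-12-01 12:21:02\t23c",
                "1950-10-02 12:21:02\t41c", "1950-10-03 12:21:02\t27c",
                "1951-07-01 12:21:02\t45c", "1951-07-02 12:21:02\t46c");

        // Highest temperature seen on each day, e.g. "1949-10-01" -> 38
        Map<String, Integer> maxPerDay = new TreeMap<>();
        for (String line : lines) {
            String day = line.substring(0, 10);
            int t = Integer.parseInt(line.substring(line.indexOf('\t') + 1).replace("c", ""));
            maxPerDay.merge(day, t, Math::max);
        }

        // Group days by year-month and print the two hottest days of each month
        Map<String, List<Map.Entry<String, Integer>>> byMonth = maxPerDay.entrySet().stream()
                .collect(Collectors.groupingBy(e -> e.getKey().substring(0, 7), TreeMap::new, Collectors.toList()));
        byMonth.forEach((month, days) -> days.stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(2)
                .forEach(e -> System.out.println(e.getKey() + " " + e.getValue())));
    }
}
```

The days and temperatures it prints match the MapReduce output shown at the end of this post (the MapReduce version prints dates without zero padding and splits the result across three partition files).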
TqMap.java
```java
package com.shsxt.mr.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.hsqldb.lib.StringUtil;

public class TqMap extends Mapper<LongWritable, Text, TQ, Text> {

    TQ tq = new TQ();
    Text vwd = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value: 1949-10-01 14:21:02 <TAB> 34c
        try {
            String[] lines = StringUtil.split(value.toString(), "\t");
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            Date date = dateFormat.parse(lines[0]);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);
            tq.setYear(calendar.get(Calendar.YEAR));
            tq.setMonth(calendar.get(Calendar.MONTH) + 1);
            tq.setDay(calendar.get(Calendar.DAY_OF_MONTH));
            // Strip the trailing "c" to get the temperature as an int
            int wd = Integer.parseInt(lines[1].substring(0, lines[1].length() - 1));
            tq.setWd(wd);
            vwd.set(wd + "");
            // Wrap the output key in a custom object so it can be sorted, partitioned and grouped later
            context.write(tq, vwd);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
```
TQ.java
```java
package com.shsxt.mr.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd;

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    // Default ordering: year, then month, then day (the job overrides this with TqSortComparator)
    @Override
    public int compareTo(TQ o) {
        int c1 = Integer.compare(this.year, o.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.month, o.getMonth());
            if (c2 == 0) {
                return Integer.compare(this.getDay(), o.getDay());
            }
            return c2;
        }
        return c1;
    }

    public int getYear() {
        return year;
    }

    // remaining getters and setters omitted ...
}
```
TqPartitioner.java
Partitioner configured for the map phase:
```java
package com.shsxt.mr.tq;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TqPartitioner extends Partitioner<TQ, Text> {

    @Override
    public int getPartition(TQ key, Text value, int numPartitions) {
        return key.getYear() % numPartitions;
    }
}
```
TqGroupComparator.java
Custom comparator used for reduce-side grouping:
```java
package com.shsxt.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqGroupComparator extends WritableComparator {

    public TqGroupComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        // Group records by year and month only (day and temperature are ignored)
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}
```
TqSortComparator.java
Custom sort comparator:
```java
package com.shsxt.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqSortComparator extends WritableComparator {

    public TqSortComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // Within the same year and month, sort by temperature in descending order
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }
        return c1;
    }
}
```
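Taken together, the two comparators implement Hadoop's secondary-sort pattern: TqSortComparator orders keys by (year, month, temperature descending) during the shuffle, while TqGroupComparator groups them by (year, month) only. As a result, inside each reduce group the first record belongs to the hottest day of that month, and the reducer shown next only has to scan forward until it meets a record from a different day to find the second hottest day.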
TqReduce.java
```java
package com.shsxt.mr.tq;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TqReduce extends Reducer<TQ, Text, Text, Text> {

    Text rkey = new Text();
    Text rvalue = new Text();

    // Sample input to this reducer:
    // 1949-10-01 34
    // 1949-10-01 38
    // 1949-10-01 37
    // 1949-10-02 39
    @Override
    protected void reduce(TQ key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int flag = 0;
        int day = 0;
        // key 1949-10-01, values {38, 36, 34}
        // key 1949-10-02 falls into the same reduce group, because grouping is by year and month only and the day is ignored
        // So group 1949-10 sees values {38, 36, 34, ...}, while the day field of the key changes as the iteration advances
        for (Text text : values) {
            // The first value in the group is the month's highest temperature
            if (flag == 0) {
                day = key.getDay();
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rvalue.set(key.getWd() + "");
                context.write(rkey, rvalue);
                flag++;
            }
            // The first later record whose day differs from the hottest day gives the second highest temperature
            if (flag != 0 && day != key.getDay()) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rvalue.set(key.getWd() + "");
                context.write(rkey, rvalue);
                break;
            }
        }
    }
}
```
TQJobRunner.java
```java
package com.shsxt.mr.tq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TQJobRunner {

    public static void main(String[] args) throws Exception {
        // 1. Cluster configuration
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://node02:8020");
        conf.set("yarn.resourcemanager.hostname", "node02:8088");

        // 2. Create the job and set the basic job info
        Job job = Job.getInstance(conf);
        job.setJobName("tq");
        job.setJarByClass(TQJobRunner.class);

        // 3. Input path
        FileInputFormat.addInputPaths(job, "/tq/input/tq.txt");

        // 4. Delete the output path if it already exists
        Path output = new Path("/tq/output/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(TqMap.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Reducer settings
        job.setReducerClass(TqReduce.class);
        // Three map-side partitions, so three reducers can work in parallel
        job.setNumReduceTasks(3);

        // 6. Partitioner used during the map phase
        job.setPartitionerClass(TqPartitioner.class);

        // 7. Secondary sort: runs during the shuffle, before reduce, and orders temperatures descending
        job.setSortComparatorClass(TqSortComparator.class);
        // Group records by year and month on the reduce side
        job.setGroupingComparatorClass(TqGroupComparator.class);
        job.setCombinerKeyGroupingComparatorClass(TqGroupComparator.class);

        job.waitForCompletion(true);
        System.out.println("success.....");
    }
}
```
Result of running the job:
1950-1-1 32
1950-10-2 41
1950-10-1 37
1951-7-2 46
1951-7-1 45
1951-12-1 23
1949-10-1 38
1949-10-2 36
The output is split into three files, one per partition (i.e. one per year).