当前位置:   article > 正文

云计算复习之MapReduce & YARN期末复习整理_mapreduce期末考试题

mapreduce期末考试题

自定义分区

一个例题:

最终输出:

如何实现呢?

思考之后发现这个就是把表中的商品id和点击次数去进行一个排序再输出。主要难点在于怎么去构造map和reduce的过程。先来看看MapReduce的流程:
分布式程序有三个流程:(1)MRAppMaster
                                        (2)MapTask
                                        (3)ReduceTask
说白了就是首先申请资源,然后执行分而治之的过程,把一个大任务拆分成小的然后重新组合,再输出给HDFS

下面我给出MapReduce的执行图

(本来做了个图结果没保存md)

Map

1.逻辑切片,即考虑出解决问题应当启动几个MapTask,默认情况下:

切片大小=底层文件存储的块大小(128M)

eg:有两个文件a.txt,b.txt,分别装了300M,200M的东西,a将切成两个128M和一个44M大小的块,b切成两个块,所以总共用5个块,也就是5个MapTask处理

2.TextInputFormat,用来按行读数据的,对切片的数据返回<key,value>,key就是行号,value就是本行的内容

3.调用map进行处理,图上很清晰,就是buffer缓存,spill溢出的数据写入缓冲区,溢出到磁盘上,sort排序,merge合并。这边举个例子来解释为什么要搞个缓冲区:

有一个水龙头打开防水,一下子放太多冲击力太大了,所以用一个水杯来缓冲一下水流,倒满水杯溢出的水再流出出水口去,起了缓冲效果

 Reduce过程

这边注意!ReduceTask主动从MapTask复制拉取所需数据

把来的数据merge,sort,

分组,如果排序号的数据键相等的分为一组,最后写入HDFS中。

上图中红色框框的为shuffle过程,意思是洗牌,将map端的无序输出变成有序输出

但是由于这个步骤繁多,磁盘之间多次往复,所以导致了速度慢

以一个简单的例子来说明MapReduce中的Shuffle过程。
假设我们有一个任务,需要计算某个地区所有用户的平均年龄。数据存储在多个文件中,每个文件包含一部分用户的年龄信息。
Map阶段:
每个Map任务读取一个文件,将文件中的每一行(即每个用户的年龄)作为输入。
Map任务将输入数据转换为键值对的形式,例如(用户ID, 年龄)。
Map任务将处理后的键值对输出到本地磁盘上的环形缓冲区。
Shuffle阶段:
在Shuffle阶段,MapReduce框架从每个Map任务的输出中提取键值对,并根据键(这里是用户ID)进行排序和分组。
对于每个键,Shuffle过程会找到所有与之相关的值(这里是年龄),并将它们聚集到一个位置。
最终,Shuffle过程会根据用户ID将相关的年龄值分组到一起。
Reduce阶段:
Reduce任务接收Shuffle阶段的输出,即按照用户ID分组的相关年龄值。
Reduce任务计算每个用户的平均年龄,并将结果输出到最终的输出文件中。

再举个例子:

YARN

YARN意思是另一种资源协调者,通用资源管理系统与调度平台。

通用:支持各种计算程序。

YARN理解为横跨多台计算机的操作系统平台。

分类

有RM,NM,AM(一个大哥带三个小弟),RM决定程序间资源分配的最终权限,仲裁者。

NM,干活的,分配回收资源,可以启动container容器,监视容器

AM是应用层面,程序内部资源,监督程序,随着程序而出现。

这八步就是MR和YARN的交互流程。

回到本题来,本题分为四个函数来解决,自定义key,自定义分区函数类,map部分,reduce部分。用到了MapReduce的自定键值对,函数。

一个排序函数就是要进行两次排序,就是先判断点击次数是不是一样,一样的情况下再判断商品id,排序。

  1. public static class IntPair implements WritableComparable<IntPair>
  2. {
  3. int first; //第一个成员变量
  4. int second; //第二个成员变量
  5. public void set(int left, int right)
  6. {
  7. first = left;
  8. second = right;
  9. }
  10. public int getFirst()
  11. {
  12. return first;
  13. }
  14. public int getSecond()
  15. {
  16. return second;
  17. }
  18. @Override
  19. //反序列化,从流中的二进制转换成IntPair
  20. public void readFields(DataInput in) throws IOException
  21. {
  22. // TODO Auto-generated method stub
  23. first = in.readInt();
  24. second = in.readInt();
  25. }
  26. @Override
  27. //序列化,将IntPair转化成使用流传送的二进制
  28. public void write(DataOutput out) throws IOException
  29. {
  30. // TODO Auto-generated method stub
  31. out.writeInt(first);
  32. out.writeInt(second);
  33. }
  34. @Override
  35. //key的比较
  36. public int compareTo(IntPair o)
  37. {
  38. // TODO Auto-generated method stub
  39. if (first != o.first)
  40. {
  41. return first < o.first ? 1 : -1;
  42. }
  43. else if (second != o.second)
  44. {
  45. return second < o.second ? -1 : 1;
  46. }
  47. else
  48. {
  49. return 0;
  50. }
  51. }
  52. @Override
  53. public int hashCode()
  54. {
  55. return first * 157 + second;
  56. }
  57. @Override
  58. public boolean equals(Object right)
  59. {
  60. if (right == null)
  61. return false;
  62. if (this == right)
  63. return true;
  64. if (right instanceof IntPair)
  65. {
  66. IntPair r = (IntPair) right;
  67. return r.first == first && r.second == second;
  68. }
  69. else
  70. {
  71. return false;
  72. }
  73. }
  74. }

(我看不太懂,反正就是比较)所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的,并重载方法。该类中包含以下几种方法:1.反序列化,从流中的二进制转换成IntPair 方法为public void readFields(DataInput in) throws IOException 2.序列化,将IntPair转化成使用流传送的二进制 方法为public void write(DataOutput out)3. key的比较 public int compareTo(IntPair o) 另外新定义的类应该重写的两个方法 public int hashCode() 和public boolean equals(Object right) 。

 以下是分区函数部分,parirition

  1. public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
  2. {
  3. @Override
  4. public int getPartition(IntPair key, IntWritable value,int numPartitions)
  5. {
  6. return Math.abs(key.getFirst() * 127) % numPartitions;
  7. }
  8. }

对key进行分区,根据自定义key中first乘以127取绝对值在对numPartions取余来进行分区。这主要是为实现第一次排序。

分组函数类代码

  1. public static class GroupingComparator extends WritableComparator
  2. {
  3. protected GroupingComparator()
  4. {
  5. super(IntPair.class, true);
  6. }
  7. @Override
  8. //Compare two WritableComparables.
  9. public int compare(WritableComparable w1, WritableComparable w2)
  10. {
  11. IntPair ip1 = (IntPair) w1;
  12. IntPair ip2 = (IntPair) w2;
  13. int l = ip1.getFirst();
  14. int r = ip2.getFirst();
  15. return l == r ? 0 : (l < r ? -1 : 1);
  16. }
  17. }

这些也是属于基础的一个分组。

Map部分

  1. public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
  2. {
  3. //自定义map
  4. private final IntPair intkey = new IntPair();
  5. private final IntWritable intvalue = new IntWritable();
  6. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  7. {
  8. String line = value.toString();
  9. StringTokenizer tokenizer = new StringTokenizer(line);
  10. int left = 0;
  11. int right = 0;
  12. if (tokenizer.hasMoreTokens())
  13. {
  14. left = Integer.parseInt(tokenizer.nextToken());
  15. if (tokenizer.hasMoreTokens())
  16. right = Integer.parseInt(tokenizer.nextToken());
  17. intkey.set(right, left);
  18. intvalue.set(left);
  19. context.write(intkey, intvalue);
  20. }
  21. }
  22. }

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key实现compareTo方法。在本例子中,使用了IntPair实现RcompareTo方法。

Reduce部分

  1. public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
  2. {
  3. private final Text left = new Text();
  4. private static final Text SEPARATOR = new Text("------------------------------------------------");
  5. public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
  6. {
  7. context.write(SEPARATOR, null);
  8. left.set(Integer.toString(key.getFirst()));
  9. System.out.println(left);
  10. for (IntWritable val : values)
  11. {
  12. context.write(left, val);
  13. //System.out.println(val);
  14. }
  15. }
  16. }

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的key和它的value迭代器。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

源码:

  1. package mapreduce;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.util.StringTokenizer;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.io.WritableComparable;
  12. import org.apache.hadoop.io.WritableComparator;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Partitioner;
  16. import org.apache.hadoop.mapreduce.Reducer;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  21. public class SecondarySort
  22. {
  23. public static class IntPair implements WritableComparable<IntPair>
  24. {
  25. int first;
  26. int second;
  27. public void set(int left, int right)
  28. {
  29. first = left;
  30. second = right;
  31. }
  32. public int getFirst()
  33. {
  34. return first;
  35. }
  36. public int getSecond()
  37. {
  38. return second;
  39. }
  40. @Override
  41. public void readFields(DataInput in) throws IOException
  42. {
  43. // TODO Auto-generated method stub
  44. first = in.readInt();
  45. second = in.readInt();
  46. }
  47. @Override
  48. public void write(DataOutput out) throws IOException
  49. {
  50. // TODO Auto-generated method stub
  51. out.writeInt(first);
  52. out.writeInt(second);
  53. }
  54. @Override
  55. public int compareTo(IntPair o)
  56. {
  57. // TODO Auto-generated method stub
  58. if (first != o.first)
  59. {
  60. return first < o.first ? 1 : -1;
  61. }
  62. else if (second != o.second)
  63. {
  64. return second < o.second ? -1 : 1;
  65. }
  66. else
  67. {
  68. return 0;
  69. }
  70. }
  71. @Override
  72. public int hashCode()
  73. {
  74. return first * 157 + second;
  75. }
  76. @Override
  77. public boolean equals(Object right)
  78. {
  79. if (right == null)
  80. return false;
  81. if (this == right)
  82. return true;
  83. if (right instanceof IntPair)
  84. {
  85. IntPair r = (IntPair) right;
  86. return r.first == first && r.second == second;
  87. }
  88. else
  89. {
  90. return false;
  91. }
  92. }
  93. }
  94. public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
  95. {
  96. @Override
  97. public int getPartition(IntPair key, IntWritable value,int numPartitions)
  98. {
  99. return Math.abs(key.getFirst() * 127) % numPartitions;
  100. }
  101. }
  102. public static class GroupingComparator extends WritableComparator
  103. {
  104. protected GroupingComparator()
  105. {
  106. super(IntPair.class, true);
  107. }
  108. @Override
  109. //Compare two WritableComparables.
  110. public int compare(WritableComparable w1, WritableComparable w2)
  111. {
  112. IntPair ip1 = (IntPair) w1;
  113. IntPair ip2 = (IntPair) w2;
  114. int l = ip1.getFirst();
  115. int r = ip2.getFirst();
  116. return l == r ? 0 : (l < r ? -1 : 1);
  117. }
  118. }
  119. public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
  120. {
  121. private final IntPair intkey = new IntPair();
  122. private final IntWritable intvalue = new IntWritable();
  123. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  124. {
  125. String line = value.toString();
  126. StringTokenizer tokenizer = new StringTokenizer(line);
  127. int left = 0;
  128. int right = 0;
  129. if (tokenizer.hasMoreTokens())
  130. {
  131. left = Integer.parseInt(tokenizer.nextToken());
  132. if (tokenizer.hasMoreTokens())
  133. right = Integer.parseInt(tokenizer.nextToken());
  134. intkey.set(right, left);
  135. intvalue.set(left);
  136. context.write(intkey, intvalue);
  137. }
  138. }
  139. }
  140. public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
  141. {
  142. private final Text left = new Text();
  143. private static final Text SEPARATOR = new Text("------------------------------------------------");
  144. public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
  145. {
  146. context.write(SEPARATOR, null);
  147. left.set(Integer.toString(key.getFirst()));
  148. System.out.println(left);
  149. for (IntWritable val : values)
  150. {
  151. context.write(left, val);
  152. //System.out.println(val);
  153. }
  154. }
  155. }
  156. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
  157. {
  158. Configuration conf = new Configuration();
  159. Job job = new Job(conf, "secondarysort");
  160. job.setJarByClass(SecondarySort.class);
  161. job.setMapperClass(Map.class);
  162. job.setReducerClass(Reduce.class);
  163. job.setPartitionerClass(FirstPartitioner.class);
  164. job.setGroupingComparatorClass(GroupingComparator.class);
  165. job.setMapOutputKeyClass(IntPair.class);
  166. job.setMapOutputValueClass(IntWritable.class);
  167. job.setOutputKeyClass(Text.class);
  168. job.setOutputValueClass(IntWritable.class);
  169. job.setInputFormatClass(TextInputFormat.class);
  170. job.setOutputFormatClass(TextOutputFormat.class);
  171. String[] otherArgs=new String[2];
  172. otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2";
  173. otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out";
  174. FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
  175. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  176. System.exit(job.waitForCompletion(true) ? 0 : 1);
  177. }
  178. }

弄懂了Map和Reduce的流程,这个简单的例题应该也能看懂,有不懂的可以一起探讨以下

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/1002006
推荐阅读
相关标签
  

闽ICP备14008679号