
Using Hadoop MapReduce


1. MapReduce Framework Structure

A complete MapReduce program runs as three kinds of instance processes in distributed mode:

1. MRAppMaster: responsible for scheduling the whole job and coordinating its state

2. MapTask: responsible for the entire data-processing flow of the map phase

3. ReduceTask: responsible for the entire data-processing flow of the reduce phase

2. MapReduce Programming Conventions and Example

2.1 Programming Conventions

1. Write a class (MyMapper) that extends the Hadoop framework's Mapper class; this class is the map task. We only need to override its map method, which defines how each input record is processed.

2. Write a class (MyReducer) that extends the Hadoop framework's Reducer class; this class is the reduce task. We only need to override its reduce method, which defines how the outputs of all the map tasks are aggregated.

3. Write an ordinary class (for example Demo). In the Demo class, create a Job object. This object pairs the MyMapper class with the MyReducer class, and it also configures MyMapper's output types and input source, MyReducer's output types and output destination, and so on.

2.2 Writing the WordCount Example

Requirement: given a set of text files, count and output the total number of occurrences of each word.

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Demo1_wordcount {

        private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            IntWritable v = new IntWritable(1);

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Split the line on whitespace and emit (word, 1) for every word.
                String[] arr = value.toString().split("\\s+");
                for (String item : arr) {
                    value.set(item); // reuse the input Text object as the output key
                    context.write(value, v);
                }
            }
        }

        private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            IntWritable v = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
                // Sum all the 1s emitted for this word.
                int count = 0;
                Iterator<IntWritable> it = value.iterator();
                while (it.hasNext()) {
                    IntWritable next = it.next();
                    count += next.get();
                }
                v.set(count);
                context.write(key, v);
            }
        }

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            // Specify the jar that contains this program.
            job.setJarByClass(Demo1_wordcount.class);
            // Specify which Mapper class to use.
            job.setMapperClass(MyMapper.class);
            // Specify which Reducer class to use.
            job.setReducerClass(MyReducer.class);
            // Specify the mapper's output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Specify the reducer's output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Specify the input source.
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/in"));
            // Specify the output destination.
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out"));
            // Submit the job and wait; job.submit() alone shows no progress logs.
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }

3. MapReduce Program Run Modes

3.1 Local Run Mode

If you configure nothing and simply run the program from your IDE, it runs in local mode by default.

Prerequisite: on Windows, unpack the Windows build of Hadoop and configure the corresponding environment variables.

    FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/in"));
    FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out"));
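
If you prefer to make local mode explicit instead of relying on the defaults, it can also be set on the job's Configuration. A minimal sketch of the first lines of the driver's main method, assuming the standard mapreduce.framework.name and fs.defaultFS properties (Configuration is org.apache.hadoop.conf.Configuration; the paths are the same sample directories as above):

    Configuration conf = new Configuration();
    // Run the MapReduce framework locally instead of on YARN (local is also the default).
    conf.set("mapreduce.framework.name", "local");
    // Read input and write output through the local file system.
    conf.set("fs.defaultFS", "file:///");

    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/in"));
    FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out"));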

3.2 Cluster Run Mode

1. Note that the input source must point to a path on the HDFS cluster, and the output destination must also be a cluster path.

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

2. Package the project that contains the MR program, and specify the main class (the class containing the main method).

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
            <archive>
                <manifest>
                    <addClasspath>true</addClasspath>
                    <classpathPrefix>lib/</classpathPrefix>
                    <mainClass>com.hadoop.Demo1_wordcount</mainClass>
                </manifest>
            </archive>
        </configuration>
    </plugin>

3. Upload the built jar to the Linux system.

4. Create the input data in the HDFS cluster in advance, and start the HDFS and YARN clusters first:

start-all.sh

5. hadoop jar hadoop-1.0-SNAPSHOT.jar /wc/in /wc/out

Here /wc/in is passed to args[0] and /wc/out is passed to args[1].

6. View the YARN cluster web UI: http://node1:8088

4. Serialization

4.1 The Writable Serialization Interface

Custom objects that travel between map and reduce must implement Hadoop's Writable interface so the framework can serialize and deserialize them. The FlowBean below carries the upstream and downstream traffic of one record:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class FlowBean implements Writable {
        private long upflow;   // upstream traffic
        private long downflow; // downstream traffic

        public long getUpflow() {
            return upflow;
        }
        public void setUpflow(long upflow) {
            this.upflow = upflow;
        }
        public long getDownflow() {
            return downflow;
        }
        public void setDownflow(long downflow) {
            this.downflow = downflow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // Serialize upflow first...
            out.writeLong(this.upflow);
            // ...then downflow.
            out.writeLong(this.downflow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // The first readLong returns upflow, because upflow was serialized first;
            // fields must be read back in exactly the order they were written.
            this.upflow = in.readLong();
            this.downflow = in.readLong();
        }

        @Override
        public String toString() {
            return this.upflow + "\t" + this.downflow + "\t" + (this.upflow + this.downflow);
        }
    }
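
Why must readFields consume the fields in exactly the order write produced them? Because the framework simply replays the same byte stream. A small round-trip check, not part of the original post, sketched with plain JDK streams (it assumes the FlowBean class above is on the classpath):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            FlowBean original = new FlowBean();
            original.setUpflow(100);
            original.setDownflow(200);

            // Serialize the way the framework would: write() into a byte stream.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // Deserialize with readFields(); the fields come back in the order they were written.
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            System.out.println(copy); // expected output: 100   200   300
        }
    }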

4.2 Flow Summation

    public class Demo02_FlowSum {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJarByClass(Demo02_FlowSum.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/in"));
            FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out1"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            private Text k = new Text();
            private FlowBean v = new FlowBean();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // arr[1] is the phone number; the third- and second-to-last columns are the up and down traffic.
                String[] arr = value.toString().split("\\s+");
                v.setUpflow(Integer.parseInt(arr[arr.length - 3]));
                v.setDownflow(Integer.parseInt(arr[arr.length - 2]));
                k.set(arr[1]);
                context.write(k, v);
            }
        }

        public static class MyReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
            private FlowBean flowBean = new FlowBean();

            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
                // Reset the accumulator for each phone number, then sum all of its records.
                flowBean.setUpflow(0);
                flowBean.setDownflow(0);
                for (FlowBean value : values) {
                    flowBean.setUpflow(value.getUpflow() + flowBean.getUpflow());
                    flowBean.setDownflow(value.getDownflow() + flowBean.getDownflow());
                }
                context.write(key, flowBean);
            }
        }
    }

5. Custom Sorting

5.1 Requirement

On top of computing each user's (phone number's) total upstream traffic, total downstream traffic, and overall total, add one more requirement: sort the results by total traffic in descending order.

5.2 Sorting Code

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class FlowBean1 implements WritableComparable<FlowBean1> {
        private String phone;
        private int upflow;   // upstream traffic
        private int downflow; // downstream traffic

        public String getPhone() {
            return phone;
        }
        public void setPhone(String phone) {
            this.phone = phone;
        }
        public int getUpflow() {
            return upflow;
        }
        public void setUpflow(int upflow) {
            this.upflow = upflow;
        }
        public int getDownflow() {
            return downflow;
        }
        public void setDownflow(int downflow) {
            this.downflow = downflow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.phone);
            // Serialize upflow first...
            out.writeInt(this.upflow);
            // ...then downflow.
            out.writeInt(this.downflow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Read the fields back in exactly the order they were written.
            this.phone = in.readUTF();
            this.upflow = in.readInt();
            this.downflow = in.readInt();
        }

        @Override
        public String toString() {
            return this.phone + "\t" + this.upflow + "\t" + this.downflow + "\t" + (this.upflow + this.downflow);
        }

        /*
         * 1. Sort by total traffic, from largest to smallest.
         * 2. If the totals are equal, sort by phone number in dictionary order.
         */
        @Override
        public int compareTo(FlowBean1 o) {
            int byTotal = Integer.compare(o.upflow + o.downflow, this.upflow + this.downflow);
            if (byTotal != 0) {
                return byTotal;
            }
            return this.phone.compareTo(o.phone);
        }
    }
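
Before wiring the bean into a job, the comparator can be sanity-checked with ordinary java.util sorting, since MapReduce applies the same compareTo ordering to the map output keys. A small check sketch (the class name and sample numbers are made up for illustration):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class FlowBean1SortCheck {
        public static void main(String[] args) {
            List<FlowBean1> beans = new ArrayList<>();
            beans.add(bean("13500000000", 10, 20));    // total 30
            beans.add(bean("13400000000", 100, 200));  // total 300
            beans.add(bean("13600000000", 200, 100));  // total 300, tie broken by phone

            Collections.sort(beans); // uses FlowBean1.compareTo
            for (FlowBean1 b : beans) {
                System.out.println(b);
            }
            // Expected order: 13400000000 (300), 13600000000 (300), 13500000000 (30)
        }

        private static FlowBean1 bean(String phone, int up, int down) {
            FlowBean1 b = new FlowBean1();
            b.setPhone(phone);
            b.setUpflow(up);
            b.setDownflow(down);
            return b;
        }
    }

The driver and mapper below then only need to emit FlowBean1 as the map output key; the shuffle's sort takes care of the ordering.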
    public class Demo03_FlowSumSort {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJarByClass(Demo03_FlowSumSort.class);
            job.setMapOutputKeyClass(FlowBean1.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(FlowBean1.class);
            job.setOutputValueClass(NullWritable.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // The input is the output of the flow-summation job.
            FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/out1"));
            FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out2"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class MyMapper extends Mapper<LongWritable, Text, FlowBean1, NullWritable> {
            private FlowBean1 bean = new FlowBean1();
            private NullWritable v = NullWritable.get();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Each input line is "phone \t upflow \t downflow \t total".
                String[] arr = value.toString().split("\\s+");
                bean.setPhone(arr[0]);
                bean.setUpflow(Integer.parseInt(arr[1]));
                bean.setDownflow(Integer.parseInt(arr[2]));
                context.write(bean, v);
            }
        }

        public static class MyReducer extends Reducer<FlowBean1, NullWritable, FlowBean1, NullWritable> {
            private NullWritable v = NullWritable.get();

            @Override
            protected void reduce(FlowBean1 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                // The keys arrive already sorted by compareTo, so just write them out.
                context.write(key, v);
            }
        }
    }

6. Custom Partitioning

6.1 Requirement

Write the flow summary results to different files according to the province that each phone number belongs to.

6.2 Code

    public class Demo04_Partitioner {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJarByClass(Demo04_Partitioner.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Use our own partitioner class.
            job.setPartitionerClass(MyPartitioner.class);
            // Set as many reduce tasks as there are partitions: fewer typically fails with an
            // illegal-partition error at runtime, more just produces empty output files.
            job.setNumReduceTasks(3);
            FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/out2"));
            FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out3"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        public static class MyPartitioner extends Partitioner<Text, NullWritable> {
            @Override
            public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
                // Route records to partitions by the phone number prefix.
                String phone = text.toString().substring(0, 3);
                switch (phone) {
                    case "134":
                    case "135":
                    case "136":
                        return 0;
                    case "137":
                    case "138":
                        return 1;
                }
                return 2;
            }
        }

        public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
            private NullWritable v = NullWritable.get();

            @Override
            protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                context.write(key, v);
            }
        }
    }

7. Using a Combiner

Each map task can produce a large amount of local output. The combiner's job is to pre-merge the map-side output so that less data is transferred between the map and reduce nodes, which improves network IO; it is one of MapReduce's optimization techniques.

A combiner's parent class is also Reducer, so in many cases the MyReducer class can be reused directly as the combiner. Note: the combiner's output types must match the mapper's output types, and a combiner is only safe when the aggregation is commutative and associative (such as the summation here), because the framework may run it zero or more times.

Code:

    public class Demo05_WordCount {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            // Use MyCombiner to pre-aggregate the map output before the shuffle.
            job.setCombinerClass(MyCombiner.class);
            // Specify the jar that contains this program.
            job.setJarByClass(Demo05_WordCount.class);
            // Specify the input source.
            FileInputFormat.setInputPaths(job, new Path("adata/mr/wordcount/in"));
            // Specify the output destination.
            FileOutputFormat.setOutputPath(job, new Path("adata/mr/wordcount/out4"));
            // Specify which Mapper class to use.
            job.setMapperClass(MyMapper.class);
            // Specify which Reducer class to use.
            job.setReducerClass(MyReducer.class);
            // Specify the map output types (the combiner must use the same types).
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Specify the reduce output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Submit the job and wait; job.submit() alone shows no progress logs.
            boolean b = job.waitForCompletion(true);
            // Exit code 0 means the job finished normally; any other value means failure.
            System.exit(b ? 0 : 1);
        }

        public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable v = new IntWritable(1);
            private Text k = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] arr = value.toString().split("\\s+");
                for (String str : arr) {
                    k.set(str);
                    context.write(k, v);
                }
            }
        }

        public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable v = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                // Partial sum on the map side, so fewer records cross the network.
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
                v.set(count);
                context.write(key, v);
            }
        }

        public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable v = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                // The reducer must still sum: the combiner only pre-aggregates per map task and
                // is not guaranteed to run, so a key may arrive with several partial counts.
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
                v.set(count);
                context.write(key, v);
            }
        }
    }
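
To confirm the combiner actually reduced the shuffle volume, the job's built-in counters can be read after waitForCompletion returns. A hedged sketch, assuming the standard org.apache.hadoop.mapreduce.TaskCounter enum (the helper class and method name are made up for illustration):

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class CombinerEffect {
        // Call this after job.waitForCompletion(true) has returned.
        static void printCombinerEffect(Job job) throws Exception {
            Counters counters = job.getCounters();
            long combineIn = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
            long combineOut = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
            // A large drop from input to output records means the combiner collapsed
            // many (word, 1) pairs before they were shuffled to the reducers.
            System.out.println("combine input records : " + combineIn);
            System.out.println("combine output records: " + combineOut);
        }
    }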

 
