
MapReduce: Reading Text and Sorting Word Counts in Descending Order

Contents

1. Maven: import the hadoop-client dependency

2. core-site.xml configuration

3. log4j.properties configuration

4. Top5.java (main code)

5. Test data

6. Run results


1. Maven: import the hadoop-client dependency

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>
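
In a Maven project this snippet goes inside the <dependencies> element of pom.xml. Version 2.7.3 is the release used throughout this article; a newer 2.x hadoop-client should behave the same way.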

2. core-site.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
    <!--
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:8020</value>
        <description>Point at HDFS; if left unset, the local (Windows) filesystem is used by default</description>
    </property>
    -->
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
        <description>Use the local Windows filesystem</description>
    </property>
</configuration>
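
Whether the job reads from HDFS or the local disk is decided entirely by fs.defaultFS. As a quick sanity check, a minimal sketch like the following (hypothetical class name FsCheck, assuming core-site.xml is on the classpath) prints which filesystem the configuration resolves to:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCheck {
    public static void main(String[] args) throws Exception {
        // new Configuration() loads core-site.xml from the classpath automatically
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        System.out.println("fs.defaultFS resolves to: " + fs.getUri());
    }
}

With the configuration above it should print file:///; with the commented-out block enabled instead, hdfs://localhost:8020.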

3. log4j.properties configuration

hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
log4j.threshold=ALL
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

# Rolling file appender
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

# Daily rolling file appender
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

log4j.logger.org.apache.hadoop.conf.Configuration=ERROR

# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

# Default values for the task log appender
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

# Security audit logging
hadoop.security.logger=INFO,NullAppender
hadoop.security.log.maxfilesize=256MB
hadoop.security.log.maxbackupindex=20
log4j.category.SecurityLogger=${hadoop.security.logger}
hadoop.security.log.file=SecurityAuth-${user.name}.audit
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender
log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}
log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd

# HDFS audit logging
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

# MapReduce audit logging
mapred.audit.logger=INFO,NullAppender
mapred.audit.log.maxfilesize=256MB
mapred.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}

# Job summary logging
hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
hadoop.mapreduce.jobsummary.log.maxbackupindex=20
log4j.appender.JSA=org.apache.log4j.RollingFileAppender
log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

# ResourceManager application summary logging
# The next two properties are referenced below; stock Hadoop normally supplies them at daemon startup
yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger}
yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log
log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB
log4j.appender.RMSUMMARY.MaxBackupIndex=20
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
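
When running the job from an IDE, place log4j.properties under src/main/resources so it lands on the classpath; without it, log4j 1.x prints "no appenders" warnings and you lose the job's console progress output.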

4. Top5.java (main code)

package com.gxwz.mapreduce;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce: read text and emit word counts sorted in descending order.
 * @author com
 * @date 2019-09-28
 */
public class Top5 extends Configured implements Tool {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text outkey = new Text();
        IntWritable outval = new IntWritable(1);
        String[] line = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value.toString().split("\t");
            // Skip blank lines: Arrays.toString of an empty split is "[]" (length 2)
            if (null != line && line.length > 0 && Arrays.toString(line).length() > 2) {
                for (String s : line) {
                    outkey.set(s);
                    context.write(outkey, outval);
                }
            }
        }
    }

    // Buffers all counts in memory and sorts them in cleanup(); this assumes
    // a single reducer (the default), otherwise each reducer would emit its
    // own independently sorted partition.
    public static class MyReduce extends Reducer<Text, IntWritable, Text, LongWritable> {
        Text outkey = new Text();
        LongWritable outval = new LongWritable();
        int sum = 0; // running count for the current key
        Map<String, Long> map = new HashMap<String, Long>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            map.put(key.toString(), (long) sum);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            List<Map.Entry<String, Long>> list = new LinkedList<Map.Entry<String, Long>>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Long>>() {
                @Override
                public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
                    // Long.compare avoids the overflow risk of subtracting and casting to int
                    return Long.compare(o2.getValue(), o1.getValue());
                }
            });
            for (Entry<String, Long> entry : list) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
                outkey.set(entry.getKey());
                outval.set(entry.getValue());
                context.write(outkey, outval);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the configuration (loads core-site.xml from the classpath)
        Configuration conf = this.getConf();
        // 2. Get a handle on the filesystem
        FileSystem fs = FileSystem.get(conf);
        // 3. Define the job's input and output paths
        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);
        // 4. Delete the output path if it already exists
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
            System.out.println("The old path has been deleted!");
        }
        // 5. Get a Job instance (pass conf so the configuration above is actually applied)
        Job job = Job.getInstance(conf);
        // 6. Set the jar class for the job
        job.setJarByClass(Top5.class);
        // 7. Set the Mapper and Reducer classes
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        // 8. Set the input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 9. The Mapper's output types differ from the Reducer's, so declare both explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 10. Set the job's input and output paths
        FileInputFormat.addInputPath(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        // 11. Submit the job and wait for completion
        int result = job.waitForCompletion(true) ? 0 : 1;
        return result;
    }

    // Example args: C:\Users\com\Desktop\mr\top10\ C:\Users\com\Desktop\mr\top10\output\
    public static void main(String[] args) {
        String[] path = new String[2];
        path[0] = "C:\\Users\\com\\Desktop\\mr\\top10";         // input path
        path[1] = "C:\\Users\\com\\Desktop\\mr\\top10\\output"; // output path
        try {
            int result = ToolRunner.run(new Top5(), path);
            String msg = result == 0 ? "job finished!" : "job failed!";
            System.out.println(msg);
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
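
Note that the descending order is produced entirely by the in-memory sort in cleanup(); the shuffle phase only sorts keys lexicographically, never by count, which is why all counts are buffered in the HashMap first. The following standalone sketch (hypothetical class SortSketch, with a few values borrowed from the test data below) isolates just that sorting step:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class SortSketch {
    public static void main(String[] args) {
        // Sample counts standing in for what reduce() accumulates
        Map<String, Long> counts = new HashMap<>();
        counts.put("小蓝", 8L);
        counts.put("小红", 7L);
        counts.put("小明", 1L);
        List<Map.Entry<String, Long>> list = new LinkedList<>(counts.entrySet());
        // Same descending comparison as cleanup()
        list.sort((o1, o2) -> Long.compare(o2.getValue(), o1.getValue()));
        for (Map.Entry<String, Long> e : list) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}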

5. Test data (columns are tab-separated, matching the mapper's split("\t"))

小明 小绿 小黑
小红 小红 小白
小蓝 小蓝 小蓝
小黑 小白 小黑
小红 小红 小黄
小黑 小白 小绿
小红 小蓝 小蓝
小红 小红 小黄
小绿 小蓝 小蓝
小黑 小白 小蓝

6. Run results

小蓝 8
小红 7
小黑 5
小白 4
小绿 3
小黄 2
小明 1

 
