Input data (the numbers to sort, spread across multiple input files):
2
32
654
32
15
756
65223
5956
22
650
92
26
54
6
Expected output (rank and value):
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
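For reference, the whole job amounts to "sort the numbers, then attach 1-based ranks". A minimal plain-Scala sketch of that transformation on the sample data above (purely illustrative, separate from the jobs below):

// Sort the sample numbers and print "rank<TAB>value"; this reproduces the expected output above.
val input = Seq(2, 32, 654, 32, 15, 756, 65223, 5956, 22, 650, 92, 26, 54, 6)
input.sorted.zipWithIndex.foreach { case (value, i) => println(s"${i + 1}\t$value") }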
A plain map and reduce are already enough to complete this task, mainly because the MapReduce framework sorts map output by key during the shuffle: the mapper only has to emit each number as the key, and the reducer only has to attach the sequence numbers.
2. MapReduce Implementation
Map code
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    // The value is never used; the number itself is emitted as the key.
    private Text val = new Text("");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Skip blank lines; emitting the number as an IntWritable key lets the shuffle sort it numerically.
        if (line.trim().length() > 0) {
            context.write(new IntWritable(Integer.valueOf(line.trim())), val);
        }
    }
}
Reduce code
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<IntWritable, Text, IntWritable, IntWritable> {

    // Running rank; keys reach the (single) reducer in ascending order.
    private IntWritable num = new IntWritable(1);

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Emit one (rank, number) pair per occurrence, so duplicate numbers get separate ranks.
        for (Text val : values) {
            context.write(num, key);
            num = new IntWritable(num.get() + 1);
        }
    }
}
Driver (program entry point)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    // Usage: hadoop jar <jar containing these classes> JobMain <input dir> <output dir>
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "sort-job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // A single reducer (also the default) keeps the output globally sorted and globally numbered.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete an existing output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(configuration);
        Path outputDir = new Path(args[1]);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. Spark Implementation

// sc is the SparkContext (e.g. the one provided by spark-shell); the input files live under /tmp/spark/three.
val three = sc.textFile("/tmp/spark/three", 3)

import org.apache.spark.HashPartitioner

// Force everything into a single partition so one sequential counter can number the sorted values.
var idx = 0
three.filter(_.trim().length > 0)
  .map(num => (num.trim.toInt, ""))
  .partitionBy(new HashPartitioner(1))
  .sortByKey()
  .map(t => { idx += 1; (idx, t._1) })   // runs in the single partition, so the ranks come out 1..n
  .collect
  .foreach(x => println(x._1 + "\t" + x._2))
Because there are multiple input files, the data initially lands in several partitions; to generate the sequence numbers, a HashPartitioner is used to pull the intermediate RDD together into a single partition.
The final result is the same as the MapReduce version.
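The mutable idx counter only works because everything sits in one partition. As a rough alternative sketch (assuming the same /tmp/spark/three input), the RDD method zipWithIndex can attach indices that follow a global sort, so no shared counter is needed:

// Same input and output format, but ranks come from zipWithIndex instead of a mutable counter.
val ranked = sc.textFile("/tmp/spark/three", 3)
  .filter(_.trim.nonEmpty)
  .map(_.trim.toInt)
  .sortBy(x => x)                                // global ascending sort
  .zipWithIndex()                                // 0-based index following the sort order
  .map { case (value, i) => (i + 1, value) }     // shift to 1-based ranks

ranked.collect.foreach { case (rank, value) => println(rank + "\t" + value) }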