The simplest MapReduce application contains at least three parts: a Map function, a Reduce function, and a main function. When a MapReduce job runs, the computation is divided into two phases, a map phase and a reduce phase, and each phase uses key/value pairs as its input and output. The main function ties job control together with file input/output.
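Before the environment setup, here is a minimal in-memory sketch of that key/value flow for the wordcount task this section builds (plain Java, illustration only; the class and variable names are made up for the example and are not part of the Hadoop job developed below):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the map -> shuffle -> reduce key/value flow (illustration only).
public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = { "hello tom", "hello world" };

        // "map" phase: emit a <word, 1> pair for every token of every line
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // "shuffle" + "reduce" phases: group pairs by key and sum the values per key
        Map<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapped) {
            reduced.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        System.out.println(reduced); // {hello=2, tom=1, world=1}
    }
}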
See the earlier sections on Hadoop cluster setup, deploying the Hadoop package on Windows, and HDFS API development;
An Eclipse JavaSE development environment, with the hadoop-eclipse-plugin prepared;
Hadoop jar preparation:
In the root directory of the Hadoop deployment on Windows (Hadoop development package download), the following jars are needed:
hadoop-2.6.5\share\hadoop\hdfs\hadoop-hdfs-2.6.5.jar
hadoop-2.6.5\share\hadoop\hdfs\lib\ (all jars)
hadoop-2.6.5\share\hadoop\common\hadoop-common-2.6.5.jar
hadoop-2.6.5\share\hadoop\common\lib\ (all jars)
hadoop-2.6.5\share\hadoop\mapreduce\ (all jars except hadoop-mapreduce-examples-2.6.5.jar)
hadoop-2.6.5\share\hadoop\mapreduce\lib\ (all jars)
Create a new user library from the jars above and import it into Eclipse. (The Hadoop source package can also be downloaded if you want to study the framework's internal implementation.)
Start the Zookeeper cluster, the Hadoop HA cluster, and the MapReduce cluster.
Count the total number of occurrences of each word in the entire data set (wordcount). The test input is the single line:
MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * KEYIN:    type of the key in the input kv pairs
 * VALUEIN:  type of the value in the input kv pairs
 * KEYOUT:   type of the key in the output kv pairs
 * VALUEOUT: type of the value in the output kv pairs
 */
public class Mapwc extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
     * The map method is called by the map task process, once for every line
     * of text it reads. Arguments passed by the map task:
     *   key:   the byte offset of the line within the file (LongWritable)
     *   value: the text content of the line (Text)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer words = new StringTokenizer(line);
        while (words.hasMoreTokens()) {
            // Emit <word, 1> for every token in the line
            context.write(new Text(words.nextToken()), new IntWritable(1));
        }
    }
}
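To see exactly which pairs this mapper emits, the same tokenization logic can be run standalone (a sketch with a made-up class name; StringTokenizer splits on whitespace by default, so each token becomes one <word, 1> pair):

import java.util.StringTokenizer;

// Standalone check of the tokenization used inside Mapwc (illustration only).
public class TokenizeDemo {
    public static void main(String[] args) {
        StringTokenizer words = new StringTokenizer("MapReduce is a programming model");
        while (words.hasMoreTokens()) {
            // Mapwc wraps each token in Text and pairs it with IntWritable(1)
            System.out.println("<" + words.nextToken() + ", 1>");
        }
    }
}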
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducewc extends Reducer<Text, IntWritable, Text, IntWritable> {

    /*
     * The reduce method is called by the reduce task process.
     * The reduce task aggregates the large number of kv pairs handed to it by
     * the shuffle phase, grouping pairs that share the same key, and then calls
     * our custom reduce method once per group. For example, given
     *   <hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>
     * reduce is called once for the hello group and once for the tom group.
     * Arguments:
     *   words:  the key shared by the group
     *   values: an iterator over all values of the group
     */
    @Override
    protected void reduce(Text words, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable s : values) {
            sum += s.get();
        }
        context.write(words, new IntWritable(sum));
    }
}
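The per-group summation can be sanity-checked outside the cluster with plain ints standing in for IntWritable (a trivial sketch of one reduce invocation; the class name is made up):

import java.util.Arrays;
import java.util.List;

// Simulates one reduce call for the group <hello, [1, 1, 1]> (illustration only).
public class ReduceDemo {
    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 1, 1); // the values shuffled to key "hello"
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println("<hello, " + sum + ">"); // prints <hello, 3>
    }
}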
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Jobwc {
    public static void main(String[] args) throws IOException {
        // 1. Configuration: point the client at the HDFS NameNode and the YARN ResourceManager
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        // yarn.resourcemanager.hostname takes a hostname only; 8088 is the RM web UI
        // port and does not belong in this property
        configuration.set("yarn.resourcemanager.hostname", "node02");

        // 2. Describe the job
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Jobwc.class);
        job.setJobName("wc");
        job.setMapperClass(Mapwc.class);
        job.setReducerClass(Reducewc.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Also declare the final (reducer) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. Input: read the data file from HDFS
        FileInputFormat.addInputPaths(job, "/wc/input/word.txt");

        // 4. Output: the output directory must not already exist, so delete it if it does
        Path path = new Path("/wc/output");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // 5. Submit the job and wait for it to finish
        try {
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job success!");
            } else {
                System.out.println("job failed");
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
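For debugging inside Eclipse without touching the cluster, the job can also run in-process through Hadoop's LocalJobRunner. A hedged sketch of the alternative configuration (the property names are standard Hadoop 2.x; the helper class is made up for illustration):

import org.apache.hadoop.conf.Configuration;

// Builds a configuration that runs the job in-process via LocalJobRunner
// instead of submitting it to YARN (illustration only).
public class LocalConfDemo {
    public static Configuration localConf() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "file:///");          // use the local filesystem
        configuration.set("mapreduce.framework.name", "local"); // run with LocalJobRunner
        return configuration;
    }
}

Running the real Jobwc against the cluster on the test sentence above produces the output below.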
MapReduce       1
a               3
algorithm       1
an              1
and             2
associated      1
big             1
cluster         1
data            1
distributed     1
for             1
generating      1
implementation  1
is              1
model           1
on              1
parallel,       1
processing      1
programming     1
sets            1
with            1