Several ways to run a MapReduce job in Hadoop
Notes:
Test file:
echo -e "aa\tbb \tcc\nbb\tcc\tdd" > 3.txt
hadoop fs -put 3.txt /tmp/3.txt
All of the examples in this article use this file as the test case: counting how many times each word occurs (WordCount).
1. The native way: compile the Java source, package it into a jar, and have the hadoop script submit and run it (a build sketch follows the listing). For example:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        /**
         * LongWritable, IntWritable and Text are Hadoop classes that wrap Java data types.
         * They implement the WritableComparable interface, so they can be serialized for
         * data exchange in a distributed environment; think of them as replacements for
         * long, int and String.
         */
        // IntWritable one corresponds to the Java primitive int value 1
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // map() is called once per input record; here every line is tokenized
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // emit each word with a count of 1, e.g. <word1,1> <word2,1> <word1,1>
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // key/value pairs with the same key are sent to the same reduce call,
            // e.g. <word1,<1,1>> is handled in one call and <word2,<1>> in another
            int sum = 0;
            // the counts for the same key (word) are accumulated into sum
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // after one key has been processed, emit the key (word) and its total count
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // on a multi-queue Hadoop cluster, set the queue to submit to
        conf.set("mapred.job.queue.name", "regular");
        // GenericOptionsParser is used instead of reading args[1] directly so that
        // cluster options (such as the queue parameter) are stripped out, leaving
        // only the user's own arguments, e.g. the input and output paths
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        // mapper, combiner and reducer classes, plus the output key/value types
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // waitForCompletion blocks, so a driver with several sub-jobs
        // can use it to guarantee the sub-jobs run one after another
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
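Before the command in the next step can work, the source above has to be compiled and packaged into /tmp/wordcount.jar. A minimal build sketch, assuming a Hadoop 0.20.x/1.x layout where the core jar sits directly under $HADOOP_HOME (the exact jar name and output paths are assumptions; adjust them to your installation):
mkdir -p wordcount_classes
# compile against the Hadoop core jar
javac -classpath $HADOOP_HOME/hadoop-core-*.jar -d wordcount_classes WordCount.java
# bundle the compiled classes into the jar used below
jar cf /tmp/wordcount.jar -C wordcount_classes .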
Run:
bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5
Result:
hadoop fs -cat /tmp/5/*
aa 1
bb 2
cc 2
dd 1
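Because main() runs the arguments through GenericOptionsParser, a generic option such as the queue does not have to be hard-coded with conf.set; it can be passed on the command line and is stripped out of otherArgs automatically. A sketch using the same queue name as above:
bin/hadoop jar /tmp/wordcount.jar WordCount -Dmapred.job.queue.name=regular /tmp/3.txt /tmp/5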
References:
Hadoop Map/Reduce: understanding the new Hadoop API through the evolution of the WordCount example
http://blog.csdn.net/derekjiang/article/details/6836209
Running and explaining the Hadoop WordCount example program
http://samuschen.iteye.com/blog/763940
The official WordCount v1.0 example
http://hadoop.apache.org/docs/r1.1.1/mapred_tutorial.html#Example%3A+WordCount+v1.0
2. Pig: the same word count written as a Pig Latin script:
A1 = load '/data/3.txt';
A = stream A1 through `sed "s/\t/ /g"`;
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
dump E;
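One way to run the statements above is to save them to a script file and hand it to the pig command; a minimal sketch, assuming the script is saved as wordcount.pig (the file name is an assumption):
# MapReduce mode is the default; add -x local to test against the local filesystem
pig wordcount.pig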
Note: be aware of how different field delimiters affect the load statement and the meaning of $0 afterwards.
3. Hive: load the file into a table and count the words in HiveQL:
create table textlines(text string);
load data inpath '/data/3.txt' overwrite into table textlines;
SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text, '\t+')) wordTable AS wordColumn GROUP BY wordColumn;
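The HiveQL above can be typed into the hive CLI line by line, or saved to a file and run with -f; a minimal sketch, assuming the file is called wordcount.hql (the name is an assumption):
hive -f wordcount.hql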
4. Hadoop Streaming with Python scripts:
map:
#!/usr/bin/python
import os,re,sys
for line in sys.stdin:
    for i in line.strip().split("\t"):
        print i
reduce:
#!/usr/bin/python
import os,re,sys
arr = {}
for words in sys.stdin:
    word = words.strip()
    if word not in arr:
        arr[word] = 1
    else:
        arr[word] += 1
for k, v in arr.items():
    print str(k) + ": " + str(v)
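Streaming scripts are easy to debug without a cluster by imitating the framework with plain pipes (streaming sorts the map output before it reaches the reducer); a sketch, assuming 3.txt is in the current directory:
cat 3.txt | python map.py | sort | python reduce.py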
Finally, run the streaming job from the shell:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
Note: each script must explicitly declare its interpreter on the first line (the shebang) and must be given execute permission.
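For example (a routine sketch, not part of the original post):
chmod +x map.py reduce.py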
5. Hadoop Streaming with shell scripts:
map:
#!/bin/bash
tr '\t' '\n'
reduce:
#!/bin/bash
sort | uniq -c
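Since the map side is just tr and the reduce side is sort | uniq -c, the whole computation can also be reproduced locally as a single pipeline on the test file; a sketch:
tr '\t' '\n' < 3.txt | sort | uniq -c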
Finally, run it from the shell:
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null
12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]
12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:
12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041
12/10/14 21:57:01 INFO streaming.StreamJob: map 0% reduce 0%
12/10/14 21:57:13 INFO streaming.StreamJob: map 67% reduce 0%
12/10/14 21:57:19 INFO streaming.StreamJob: map 100% reduce 0%
12/10/14 21:57:22 INFO streaming.StreamJob: map 100% reduce 22%
12/10/14 21:57:31 INFO streaming.StreamJob: map 100% reduce 100%
12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041
12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop fs -cat /data/py/part-00000
1 aa
1 bb
1 bb
2 cc
1 dd
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
Special note: some of the methods above ignore a trailing space after a field, while others count it as part of the word (compare the two "bb" entries in the streaming output with the single "bb 2" from the Java job); check the results carefully.
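The trailing space is visible in the test data itself: the second token of the first line is "bb " (a space before the tab), which methods that split only on tabs count as a different word from "bb". A quick way to inspect this, using cat -A to mark tabs as ^I and line ends as $:
echo -e "aa\tbb \tcc\nbb\tcc\tdd" | cat -A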