赞
踩
直接上代码:Mapper
- package com.winksi.dotstat;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.mapreduce.Mapper;
-
- public class DotProducedMapper extends Mapper<LongWritable, Text, Text, Text>{
- @Override
- public void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException{
- String line = value.toString();
- String fileds[] = line.split(",");
- String imsi = fileds[0];
- String adccompany = fileds[1];
- String phone = fileds[2];
- String cur_time = fileds[3];
- String call_length = fileds[4];
- String call_type = fileds[5];
- String show_length = fileds[6];
- String pkgname = fileds[7];
- String ip = fileds[8];
- String model = fileds[9];
- String ossdk = fileds[10];
- String av = fileds[11];
- String cityId = fileds[12];
- StringBuffer strKey = new StringBuffer();
- strKey.append(model).append(",").append(adccompany).append(",")
- .append(ossdk.toUpperCase()).append(",").append(av).append(",")
- .append(phone);
- context.write(new Text(strKey.toString()), new Text(imsi));
- }
- }
-
Reducer:
- package com.winksi.dotstat;
-
- import java.io.IOException;
- import java.util.Date;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.Set;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- public class DotProducedReducer extends Reducer<Text, Text, Text, Text>{
- public void reduce(Text key, Iterator<Text> values,
- Context context)
- throws IOException, InterruptedException {
- int count = 0;
- Set<String> set = new HashSet<String>();
- while (values.hasNext()) {
- set.add(values.next().toString());
- count++;
- }
- StringBuffer sb = new StringBuffer();
- sb.append(count).append(",").append(set.size());
- Date yesterdayDate = new Date(new Date().getTime() - 1*24*60*60*1000);
- String yesterday = DotStatPvUv.getCurrentDay(yesterdayDate);
- StringBuffer sbKey = new StringBuffer();
- sbKey.append(key.toString()).append(",").append(yesterday);
- context.write(new Text(sbKey.toString()), new Text(sb.toString()));
- }
- }
-
Spring配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring for Apache Hadoop context. The hadoop namespace is the default namespace,
  so <configuration>, <job> and <job-runner> are written without a prefix.
-->
<beans:beans
        xmlns="http://www.springframework.org/schema/hadoop"
        xmlns:beans="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <!-- Resolves the ${hd.fs}, ${input.path} and ${output.path} placeholders used below. -->
    <context:property-placeholder location="classpath:config.properties" />

    <!-- Hadoop configuration: address of the file system the job talks to. -->
    <configuration>
        fs.default.name=${hd.fs}
    </configuration>

    <!-- The MapReduce job: input/output locations plus the mapper and reducer classes. -->
    <job id="dotProductJob"
         mapper="com.winksi.dotstat.DotProducedMapper"
         reducer="com.winksi.dotstat.DotProducedReducer"
         input-path="${input.path}"
         output-path="${output.path}" />

    <!-- Runs dotProductJob as soon as the application context starts. -->
    <job-runner id="runner"
                job-ref="dotProductJob"
                run-at-startup="true" />

</beans:beans>
Maven 依赖(pom.xml):
<!-- Spring for Apache Hadoop (CDH4 build): provides the hadoop XML namespace used in the Spring config. -->
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>1.0.2.RELEASE-cdh4</version>
</dependency>

<!-- Hadoop core classes; the version is taken from the hadoop.core.version property. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>${hadoop.core.version}</version>
</dependency>
java代码:
/**
 * Launcher for the MapReduce job defined in {@code hadoop-mr.xml}.
 */
public class Test {

    /**
     * Loads the Spring context and then closes it again.
     *
     * <p>The job itself is triggered by the {@code job-runner} bean
     * ({@code run-at-startup="true"}) while the context is being created.
     * NOTE(review): this relies on the runner waiting for job completion, which is the
     * documented default of Spring for Apache Hadoop - confirm if that setting is changed.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("hadoop-mr.xml");
        // Release the context's resources instead of leaving it open until the JVM exits.
        ctx.close();
    }
}
运行报错:
Exception in thread "main" java.io.IOException: Cannot run program "cygpath": CreateProcess error=2, 系统找不到指定的文件。
at java.lang.ProcessBuilder.start(ProcessBuilder.java:459)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:201)
at org.apache.hadoop.util.Shell.run(Shell.java:183)
at org.apache.hadoop.fs.FileUtil$CygPathCommand.<init>(FileUtil.java:413)
at org.apache.hadoop.fs.FileUtil.makeShellPath(FileUtil.java:439)
at org.apache.hadoop.fs.FileUtil.makeShellPath(FileUtil.java:466)
at org.apache.hadoop.fs.RawLocalFileSystem.execCommand(RawLocalFileSystem.java:559)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:551)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:355)
at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:212)
原因是:尽管连接的hadoop平台在linux上,但我们的eclipse调试环境在windows上,hadoop在本地需要调用“cygpath”等linux命令,因此需要安装一个linux模拟环境“cygwin”来支持程序的运行。
在windows开发服务器上安装好cygwin,然后在环境变量中添加cygwin的bin目录,比如“D:\Program\cygwin\bin”,问题得以解决。
注意:是在系统环境变量的Path中加入“D:\Program\cygwin\bin”,这样我们就可以像在linux上一样,在windows的dos窗口下执行ls、cd等命令。刚开始我一直是在系统环境变量的classpath中加的“D:\Program\cygwin\bin”,尝试了半天都失败,后来才发现是加错了地方。加完之后,先在cmd窗口中试一下ls、cd等命令;成功后重启myeclipse再运行代码,就没有问题了。
------------------------------------------------------------------------------
之前纠结继承哪个Mapper和Reducer的问题(有两个同名的类可以继承),具体问题见:
http://wuyanzan60688.blog.163.com/blog/static/127776163201310164244955/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。