Required imports:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
Create the Mapper class
Create the Reducer class
Create the Driver
The overall structure is sketched below.
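A minimal sketch of how these three pieces fit together (the class name User2BasicMapReduce is taken from the driver code below; the full Mapper, Reducer, run(), and main() bodies follow in the next sections):

// The job class: Configured carries the Hadoop configuration,
// Tool lets ToolRunner handle generic command-line options.
public class User2BasicMapReduce extends Configured implements Tool {
    // Mapper class: reads rows from the 'user' table
    // Reducer class: writes Puts into the 'basic' table
    // run(): assembles and submits the job
    // main(): entry point via ToolRunner
}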
// Read data from the 'user' table. ImmutableBytesWritable: the row key; Put: one row's cells.
public static class ReadUserMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value,
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
            throws IOException, InterruptedException {
        context.write(row, resultToPut(row, value));
    }

    // Equivalent to the shell command: put 'user','10001','info:address','shanghai'
    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        Put put = new Put(key.get());
        // Copy every cell of the scanned row into the Put
        for (KeyValue kv : result.raw()) {
            put.add(kv);
        }
        return put;
    }
}

public static class WriteBasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
            Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        // Forward each Put to the output table unchanged
        for (Put put : values) {
            context.write(key, put);
        }
    }
}
Job job = Job.getInstance(getConf(), this.getClass().getName());
job.setJarByClass(this.getClass());
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
Scan performs a full-table scan. setCaching sets how many rows each RPC fetches at a time (the default of 1 is far too small for a MapReduce job), and setCacheBlocks controls whether scanned blocks are cached on the region server. A MapReduce job reads each block only once, so block caching must never be enabled; it is set to false.
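The Scan instance can also narrow what the mapper reads. An optional sketch, using the 'info' column family from the put example above (Bytes is org.apache.hadoop.hbase.util.Bytes):

// Restrict the scan to the 'info' column family
scan.addFamily(Bytes.toBytes("info"));
// Or fetch just a single column:
// scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"));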
Set the input and the Mapper
Use TableMapReduceUtil.initTableMapperJob() to set the input table and wire the Mapper into the job.
TableMapReduceUtil.initTableMapperJob(
    "user",                       // input HBase table name
    scan,                         // Scan instance to control CF and attribute selection
    ReadUserMapper.class,         // mapper
    ImmutableBytesWritable.class, // mapper output key (matches ReadUserMapper's declaration)
    Put.class,                    // mapper output value
    job);
Set the output table and the Reducer with TableMapReduceUtil.initTableReducerJob(), then submit the job.
TableMapReduceUtil.initTableReducerJob(
    "basic",                  // output table
    WriteBasicReducer.class,  // reducer class
    job);
// Map-only execution: with 0 reduce tasks the Puts emitted by the mapper
// are written directly by TableOutputFormat, bypassing WriteBasicReducer.
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), this.getClass().getName());
    job.setJarByClass(this.getClass());

    Scan scan = new Scan();
    scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs

    TableMapReduceUtil.initTableMapperJob(
        "user",                       // input HBase table name
        scan,                         // Scan instance to control CF and attribute selection
        ReadUserMapper.class,         // mapper
        ImmutableBytesWritable.class, // mapper output key
        Put.class,                    // mapper output value
        job);
    TableMapReduceUtil.initTableReducerJob(
        "basic",                  // output table
        WriteBasicReducer.class,  // reducer class
        job);
    job.setNumReduceTasks(0);     // map-only: Puts go straight to TableOutputFormat

    boolean b = job.waitForCompletion(true);
    return b ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    Configuration configuration = HBaseConfiguration.create();
    int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
    System.exit(status);
}
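Note that TableOutputFormat writes into an existing table and does not create it, so the output table must exist before the job runs. A sketch, assuming 'basic' uses the same 'info' column family as 'user':

# HBase shell: create the output table first ('info' column family assumed to match 'user')
create 'basic', 'info'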
Export and upload to a directory on the Linux system
Export the project as a jar, upload it to the server (e.g. with FileZilla), and run the following commands to verify that the MapReduce program is correct.
export HBASE_HOME=/opt/sofewares/hbase/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
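Once the job completes, the copy can be verified in the HBase shell; the rows written to 'basic' should mirror the data read from 'user':

# HBase shell: inspect the output table
scan 'basic'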