Project source: https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr-mysql
Goal: run a word count over the values in the HBase student table and write the result into MySQL.
Prerequisites:
Add the MySQL JDBC driver to the project's Maven dependencies:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.25</version>
</dependency>
If the MapReduce job fails with java.io.IOException: com.mysql.jdbc.Driver, the program usually cannot find the MySQL driver jar. The fix is to make sure every tasktracker that runs the MapReduce tasks can find the driver jar.
There are two ways to add the jar:
> For CDH, copy it into /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib on every node.
> Ship it with the job via the distributed cache. First upload the jar to the cluster:
$ hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/
Then, before the MR program submits the job, add:
DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
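Note that DistributedCache is deprecated on Hadoop 2.x; the same thing can be done directly on the Job object. A minimal sketch, assuming the jar was uploaded to /hdfsPath/ as above:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

// Hadoop 2.x style: put the driver jar on the job's classpath via the distributed cache
Job job = Job.getInstance(conf);
job.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"));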
The MySQL database and table must be created in advance.
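A minimal DDL sketch for that table; the company database and the name/sumcnt columns are taken from the insert statement in MysqlOutputFormat below, while the column types are assumptions:

CREATE DATABASE IF NOT EXISTS company;
USE company;
CREATE TABLE IF NOT EXISTS statresult (
    name   VARCHAR(255),  -- the word
    sumcnt INT            -- its total count
);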
To let MapReduce access relational databases (MySQL, Oracle) directly, Hadoop provides two classes, DBInputFormat and DBOutputFormat: DBInputFormat reads database table data into HDFS, and DBOutputFormat exports the MapReduce result set into a database table.
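This project writes to MySQL through a hand-rolled MysqlOutputFormat (shown below) rather than DBOutputFormat. For comparison, a minimal sketch of the built-in route inside run(); the connection settings and table/column names are reused from this project, but this sketch is not part of its code:

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

// Store the JDBC settings in the job configuration
DBConfiguration.configureDB(job.getConfiguration(),
        "com.mysql.jdbc.Driver",
        "jdbc:mysql://192.168.139.101:3306/company",
        "root", "123456");
// Declare the target table and columns; this call also sets DBOutputFormat as the output format.
// The job's output key class must implement DBWritable, which CacheData below does not --
// one reason this article rolls its own OutputFormat instead.
DBOutputFormat.setOutput(job, "statresult", "name", "sumcnt");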
The code:
Main class
package com.cw.bigdata.mr3;

import com.cw.bigdata.mr3.tool.HBase2MysqlTool;
import org.apache.hadoop.util.ToolRunner;

public class HBase2MysqlApplication {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HBase2MysqlTool(), args);
    }
}
Tool class
package com.cw.bigdata.mr3.tool;

import com.cw.bigdata.mr3.bean.CacheData;
import com.cw.bigdata.mr3.format.MysqlOutputFormat;
import com.cw.bigdata.mr3.mapper.ScanHbaseMapper;
import com.cw.bigdata.mr3.reducer.Hbase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

public class HBase2MysqlTool implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(HBase2MysqlTool.class);

        // mapper: scan the HBase "student" table; this call also sets
        // the map output key/value classes (Text, CacheData)
        TableMapReduceUtil.initTableMapperJob(
                "student",
                new Scan(),
                ScanHbaseMapper.class,
                Text.class,
                CacheData.class,
                job
        );

        // reducer
        job.setReducerClass(Hbase2MysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CacheData.class); // reduce output value class
        job.setOutputFormatClass(MysqlOutputFormat.class);

        return job.waitForCompletion(true)
                ? JobStatus.State.SUCCEEDED.getValue()
                : JobStatus.State.FAILED.getValue();
    }

    public void setConf(Configuration configuration) {
    }

    public Configuration getConf() {
        return null;
    }
}
Mapper class
package com.cw.bigdata.mr3.mapper;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class ScanHbaseMapper extends TableMapper<Text, CacheData> {

    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context)
            throws IOException, InterruptedException {
        // Each cell value becomes a word, emitted with an initial count of 1
        for (Cell cell : result.rawCells()) {
            String name = Bytes.toString(CellUtil.cloneValue(cell));

            CacheData data = new CacheData();
            data.setName(name);
            data.setCount(1);

            System.out.println(name);
            context.write(new Text(name), data);
        }
    }
}
Reducer class
package com.cw.bigdata.mr3.reducer;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Hbase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {

    @Override
    protected void reduce(Text key, Iterable<CacheData> datas, Context context)
            throws IOException, InterruptedException {
        // Sum the per-mapper counts for each word
        int sum = 0;
        for (CacheData data : datas) {
            sum += data.getCount();
        }

        CacheData sumData = new CacheData();
        sumData.setName(key.toString());
        sumData.setCount(sum);

        System.err.println(sumData.getName() + ":" + sumData.getCount());
        context.write(key, sumData);
    }
}
Bean class
package com.cw.bigdata.mr3.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CacheData implements WritableComparable<CacheData> {

    private String name;
    private int count;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public int compareTo(CacheData data) {
        return name.compareTo(data.name);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }
}
OutputFormat class
package com.cw.bigdata.mr3.format;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {

    class MysqlRecordWriter extends RecordWriter<Text, CacheData> {

        private static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
        //private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/company?useUnicode=true&characterEncoding=UTF-8";
        private static final String MYSQL_URL = "jdbc:mysql://192.168.139.101:3306/company?useUnicode=true&characterEncoding=UTF-8";
        private static final String MYSQL_USERNAME = "root";
        private static final String MYSQL_PASSWORD = "123456";

        private Connection connection;

        public MysqlRecordWriter() {
            try {
                Class.forName(MYSQL_DRIVER_CLASS);
                connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void write(Text key, CacheData data) throws IOException, InterruptedException {
            String sql = "insert into statresult (name,sumcnt) values(?,?)";
            PreparedStatement preparedStatement = null;
            try {
                preparedStatement = connection.prepareStatement(sql);
                preparedStatement.setObject(1, key.toString());
                preparedStatement.setObject(2, data.getCount());
                preparedStatement.executeUpdate();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new MysqlRecordWriter();
    }

    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
    }

    private FileOutputCommitter committer = null;

    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        if (committer == null) {
            Path output = getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    public static Path getOutputPath(JobContext job) {
        String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        return name == null ? null : new Path(name);
    }
}
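One design note on MysqlRecordWriter: each reduce task opens its own connection and issues one insert per key, which is fine for a small word-count result. For larger outputs, a batched variant is possible; the following sketch (an assumption, not part of the original project) prepares the statement once in the constructor and flushes it in close():

// In the constructor, after getConnection():
// statement = connection.prepareStatement("insert into statresult (name,sumcnt) values(?,?)");
private PreparedStatement statement;

public void write(Text key, CacheData data) throws IOException {
    try {
        statement.setString(1, key.toString());
        statement.setInt(2, data.getCount());
        statement.addBatch(); // buffer the row instead of one round trip per record
    } catch (SQLException e) {
        throw new IOException(e);
    }
}

public void close(TaskAttemptContext taskAttemptContext) throws IOException {
    try {
        statement.executeBatch(); // submit all buffered rows
        statement.close();
        connection.close();
    } catch (SQLException e) {
        throw new IOException(e);
    }
}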