赞
踩
转化为HBase表的三大来源:RDB Table、Client API、Files
如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。
首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>
的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:
rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){
List<Put> batch = rdb.nextBatch();
hbase.putBatch(batch);
}
hbase.close();
rdb.close();
内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
public interface RDB extends Com {
// 要提升性能,需要使用批处理
boolean hasNextBatch() throws SQLException;// 是否存在下一个批次
List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
public class RDBImpl implements RDB { private static Logger logger = Logger.getLogger(RDBImpl.class); // JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集 private Properties config; /** * 它们需要设置成全局变量的原因是它们需要共享 */ private Connection con; private PreparedStatement pst; private ResultSet rst; // 定义每个批次处理的记录数的最大数量 private int batchSize; // hbase的行键对应rdb的列的列名 private String hbaseRowKeyRdbCol; private Map<String,Map<String,String>> hbaseRdbColMapping; // RDB配置可以灵活地从外部传入(构造方法),从内部读取(config()) public RDBImpl(Properties config) { this.config = config; } @Override public Properties config() { return config; } /** * 内部资源初始化 */ @Override public void init() throws Exception{ con = getConnection(); logger.info("RDB 创建 [ 连接 ] 对象成功"); pst = getStatement(con); logger.info("RDB 创建 [ 执行 ] 对象成功"); rst = getResult(pst); logger.info("RDB 创建 [ 结果集 ] 成功"); batchSize = batchSize(); hbaseRdbColMapping = hbaseRdbColumnsMapping(); } @Override public void close() { closeAll(rst,pst,con); } private String driver(){ return checkAndGetConfig("rdb.driver"); } private String url(){ return checkAndGetConfig("rdb.url"); } private String username(){ return checkAndGetConfig("rdb.username"); } private String password(){ return checkAndGetConfig("rdb.password"); } private String sql(){ return checkAndGetConfig("rdb.sql"); } private int batchSize(){ return Integer.parseInt(checkAndGetConfig("rdb.batchSize")); } // java.sql下的Connection private Connection getConnection() throws ClassNotFoundException, SQLException { // 装载驱动 Class.forName(driver()); // 获取并返回连接对象 return DriverManager.getConnection(url(),username(),password()); } private PreparedStatement getStatement(Connection con) throws SQLException { return con.prepareStatement(sql()); } private ResultSet getResult(PreparedStatement statement) throws SQLException { return statement.executeQuery(); } /** * hbase 列族和列与rdb中列的映射关系 * hbase列族 hbase列 rdb列 * @return Map<String,Map<String,String>> */ private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){ String mapping = checkAndGetConfig("rdb.hbase.columns.mapping"); Map<String,Map<String,String>> map = new HashMap<>(); String[] pss = mapping.split(","); for(String ps : pss){ String[] pp = ps.split("->"); String[] p = pp[0].split(":"); String rdbCol = pp[1],hbaseColFamily,hbaseColName; if(p.length==1){ hbaseRowKeyRdbCol = pp[1]; }else { hbaseColFamily = p[0]; hbaseColName = p[1]; if(!map.containsKey(hbaseColFamily)){ map.put(hbaseColFamily,new HashMap<>()); } map.get(hbaseColFamily).put(hbaseColName,rdbCol); } } return map; } /** * 将RDB的列转化为字节数组(需要确定列的数据类型) * @param rdbColumn * @return * @throws SQLException */ private byte[] toBytesFromRdb(String rdbColumn) throws SQLException { Object obj = rst.getObject(rdbColumn); if(obj instanceof String){ return Bytes.toBytes((String)obj); } else if(obj instanceof Float){ return Bytes.toBytes(((Float)obj).floatValue()); } else if(obj instanceof Double){ return Bytes.toBytes(((Double)obj).doubleValue()); } else if(obj instanceof BigDecimal){ return Bytes.toBytes((BigDecimal)obj); } else if(obj instanceof Short){ return Bytes.toBytes(((Short) obj).shortValue()); } else if(obj instanceof Integer){ return Bytes.toBytes(((Integer)obj).intValue()); } else if(obj instanceof Boolean){ return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue()); } else { throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName()); } } /** * 将HBase的列名或列族名转化为字节数组 * @param name * @return */ private byte[] toBytes(String name){ return Bytes.toBytes(name); } // 最后一个批次的数据最少有一条 @Override public boolean hasNextBatch() throws SQLException{ return rst.next(); } @Override public List<Put> nextBatch() throws SQLException{ // 预先分配容量 List<Put> list = new ArrayList<>(batchSize); int count = 0; do{ /** * 如何将一行解析为多个put(结合配置文件) * 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名 */ Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol)); for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) { String columnFamily = e.getKey(); for (Map.Entry<String, String> s : e.getValue().entrySet()) { String hbaseColumn = s.getKey(); String rdbColumn = s.getValue(); // 需要将内容转变为字节数组传入方法 put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn)); } } list.add(put); }while(++count<batchSize && rst.next()); return list; } }
如何理解一行转化为多个put?
结果集的实质?
rst.next() 的两个作用
rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行
a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()
进行资源的初始化和释放,checkAndGetConfig()
根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties
,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。
public interface HBase extends Com {
// RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。
void putBatch(List<Put> batch) throws IOException;
}
public class HBaseImpl implements HBase { private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class); private Properties config; private Connection con; private Table hbaseTable; public HBaseImpl(Properties config) { this.config = config; } @Override public Properties config() { return config; } @Override public void init() throws Exception { con = getCon(); loggerHBase.info("HBase 创建 [ 连接 ] 成功"); hbaseTable = checkAndGetTable(con); loggerHBase.info("HBase 创建 [ 数据表 ] 成功"); } @Override public void close() { closeAll(hbaseTable,con); } private String tableName(){ return checkAndGetConfig("hbase.table.name"); } private String zkUrl(){ return checkAndGetConfig("hbase.zk"); } private Connection getCon() throws IOException { // hadoop.conf的configuration Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum",zkUrl()); return ConnectionFactory.createConnection(config); } private Table checkAndGetTable(Connection con) throws IOException { /** * Admin : HBase DDL */ Admin admin = con.getAdmin(); TableName tableName = TableName.valueOf(tableName()); // 通过tableName判定表是否存在 if(!admin.tableExists(tableName)){ throw new IOException("HBase表不存在异常:"+tableName); } /** * Table : HBase DML & DQL */ // 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发) return con.getTable(tableName); } @Override public void putBatch(List<Put> batch) throws IOException{ hbaseTable.put(batch); } }
HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。
public interface Com { Logger logger = Logger.getLogger(Com.class); // 获取配置对象 Properties config(); // 初始化资源 void init() throws Exception; // 释放资源 void close(); default String checkAndGetConfig(String key){ if(!config().containsKey(key)){ // 因为该方法可能被用于HBase和RDB throw new RuntimeException("配置项缺失异常:"+key); } String item = config().getProperty(key); logger.info(String.format("获取配置项 %s : %s",key,item)); return item; } default void closeAll(AutoCloseable...acs){ for (AutoCloseable ac : acs) { if (Objects.nonNull(ac)) { try { ac.close(); logger.info(String.format("释放 %s 成功",ac.getClass().getName())); } catch (Exception e) { logger.error("释放资源异常:"+e); } } } } }
在Com接口中,设计了一些普通方法config()
实现配置的导出,init()、close()
资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()
方法。这些方法适用于RDB和HBase的实现类。
public interface RDBToHBase {
// 创建一个RDB对象
void setRDB(RDB rdb);
// 创建一个HBase对象
void setHBase(HBase hbase);
// 进行数据的传输
void startTransfer();
}
public class RDBToHBaseImpl implements RDBToHBase { // 日志显示 private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class); private RDB rdb; private HBase hbase; @Override public void setRDB(RDB rdb) { this.rdb = rdb; } @Override public void setHBase(HBase hbase) { this.hbase = hbase; } @Override public void startTransfer() { try { rdb.init(); loggerRH.info("RDB 初始化成功"); hbase.init(); loggerRH.info("HBase 初始化成功"); loggerRH.info("数据从 RDB 迁移至 HBase 开始..."); int count = 0; while (rdb.hasNextBatch()) { final List<Put> batch = rdb.nextBatch(); hbase.putBatch(batch); loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size())); } loggerRH.info("数据从 RDB 迁移至 HBase 结束..."); } catch (Exception e){ loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e); } finally{ hbase.close(); rdb.close(); } } }
public class AppRDBToHBase { private static Logger logger = Logger.getLogger(AppRDBToHBase.class); private static void start(String[] args){ try { if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) { throw new NullPointerException("配置文件路径空指针异常"); } final String PATH = args[0]; final File file = new File(PATH); if (!file.exists() || file.length() == 0 || !file.canRead()) { throw new IOException("配置文件不存在、不可读、空白"); } Properties config = new Properties(); // final String path = args[0]; config.load(new FileReader(file)); RDB rdb = new RDBImpl(config); HBase hBase = new HBaseImpl(config); RDBToHBase rdbToHBase = new RDBToHBaseImpl(); rdbToHBase.setRDB(rdb); rdbToHBase.setHBase(hBase); rdbToHBase.startTransfer(); }catch(Exception e){ logger.error("配置异常",e); } } public static void main( String[] args ) { start(args); } }
对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输
logger.error()、logger.info()、logger.warn()
,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。