Our company is building a data middle platform, and offline data ingestion is an important part of it; right now it is built on Alibaba's DataX. DataX's strengths are that it supports synchronization between many kinds of source and target databases, and its log output is very clear, which makes it easy to locate errors. Secondary development on top of DataX is also fairly simple (you implement the Job and Task interfaces).
Its current weakness is that Hive support is limited: officially only HDFS readers and writers are provided. This post describes the approaches I tried for writing into Hive with DataX.
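To give a sense of what "implementing the Job and Task interfaces" involves, here is a minimal, illustrative writer skeleton. The package and class name DemoWriter are placeholders, not part of DataX; the real, complete plugin code appears later in this post.

// Minimal sketch of a DataX writer plugin: the framework drives one Job
// (configuration and splitting) and N Tasks (the actual writing).
package com.alibaba.datax.plugin.writer.demowriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;

import java.util.ArrayList;
import java.util.List;

public class DemoWriter extends Writer {

    public static class Job extends Writer.Job {
        private Configuration conf;

        @Override
        public void init() {
            // Read the writer's parameter block from the job JSON
            this.conf = this.getPluginJobConf();
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // One configuration per channel; each one becomes a Task
            List<Configuration> configs = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configs.add(this.conf);
            }
            return configs;
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {
        @Override
        public void init() {
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            // Records arrive one at a time from the paired reader
            Record record;
            while ((record = lineReceiver.getFromReader()) != null) {
                // write the record to the target storage here
            }
        }

        @Override
        public void destroy() {
        }
    }
}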
With Spark or Flink, you only need to read the data and convert it into the corresponding data structure, and then Spark SQL or Flink SQL can handle multi-source processing. Our previous approach was to run Spark/Flink jar jobs directly, passing in the path of a JSON config to drive the synchronization.
However, DataX plugins are developed separately against the Reader and Writer interfaces, and records are processed one by one, so Spark is clearly not an option. With Flink, you would have to initialize a Flink environment, implement a source that pulls records from the DataX interface, and then sink them into Hive, which feels far too heavyweight.
Flume can write data into Hive tables in real time, but the table must be a transactional table. Using Hive HCatalog streaming I wrote 10 million rows (two columns) in just a few seconds, so this approach looks very feasible.
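For context, the streaming write only works against a transactional Hive table. Below is a minimal sketch, assuming a HiveServer2 at hive-server:10000 and an illustrative table demo_acid, of creating such a table over Hive JDBC; bucketing was mandatory for the older HCatalog streaming API, while Hive 3 ACID tables relax that requirement.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateAcidTable {
    public static void main(String[] args) throws Exception {
        // Assumed HiveServer2 address and credentials; replace with your own
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://hive-server:10000/default";
        try (Connection conn = DriverManager.getConnection(url, "hive", "");
             Statement stmt = conn.createStatement()) {
            // Streaming ingest needs an ORC table with transactions enabled
            stmt.execute("CREATE TABLE IF NOT EXISTS demo_acid (id STRING, name STRING) "
                    + "PARTITIONED BY (ds STRING) "
                    + "CLUSTERED BY (id) INTO 4 BUCKETS "
                    + "STORED AS ORC "
                    + "TBLPROPERTIES ('transactional'='true')");
        }
    }
}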
Loading HDFS files into Hive: DataX already implements writing to HDFS, so all that is missing is a step that imports the HDFS files into Hive. At that point you simply connect to Hive over JDBC and execute a LOAD statement.
The source change that is needed: when DataX writes to HDFS, the target HDFS directory must already exist, so if the configured partition path does not exist it has to be created first. After the task finishes, the data is loaded into Hive via JDBC (a sketch of that step follows the code below).
@Override
public void prepare() {
    // Create the target directory if it does not exist yet
    LOG.info(String.format("没有目录,咱们就创建目录-------[%s]", path.toString()));
    Path add_path = new Path(path);
    if (!hdfsHelper.isPathexists(path)) {
        hdfsHelper.addDir(add_path);
    }
    // If the path already exists, check that it is a directory
    if (hdfsHelper.isPathexists(path)) {
        if (!hdfsHelper.isPathDir(path)) {
            throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                    String.format("您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", path));
        }
        // Handle existing files under the directory according to writeMode
        Path[] existFilePaths = hdfsHelper.hdfsDirList(path, fileName);
        boolean isExistFile = false;
        if (existFilePaths.length > 0) {
            isExistFile = true;
        }
        /**
        if ("truncate".equals(writeMode) && isExistFile) {
            LOG.info(String.format("由于您配置了writeMode truncate, 开始清理 [%s] 下面以 [%s] 开头的内容", path, fileName));
            hdfsHelper.deleteFiles(existFilePaths);
        } else
        */
        if ("append".equalsIgnoreCase(writeMode)) {
            LOG.info(String.format("由于您配置了writeMode append, 写入前不做清理工作, [%s] 目录下写入相应文件名前缀 [%s] 的文件", path, fileName));
        } else if ("nonconflict".equalsIgnoreCase(writeMode) && isExistFile) {
            LOG.info(String.format("由于您配置了writeMode nonConflict, 开始检查 [%s] 下面的内容", path));
            List<String> allFiles = new ArrayList<String>();
            for (Path eachFile : existFilePaths) {
                allFiles.add(eachFile.toString());
            }
            LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ",")));
            throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                    String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path));
        }
    }
}
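The "load into Hive via JDBC" step itself is not shown in the post; the following is a minimal sketch of what it could look like, assuming a HiveServer2 URL, a target table demo_table and a partition ds=20200101 (all illustrative names). In the plugin this would typically run in post(), after all tasks have finished writing to HDFS.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HdfsToHiveLoad {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Assumed HiveServer2 URL and directory that HdfsWriter wrote to
        String url = "jdbc:hive2://hive-server:10000/default";
        String hdfsPath = "/user/datax/demo_table/ds=20200101";
        try (Connection conn = DriverManager.getConnection(url, "hive", "");
             Statement stmt = conn.createStatement()) {
            // Move the files produced by HdfsWriter into the Hive partition
            stmt.execute("LOAD DATA INPATH '" + hdfsPath + "' "
                    + "OVERWRITE INTO TABLE demo_table PARTITION (ds='20200101')");
        }
    }
}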
To build, run the following in the project root directory: mvn -U clean package assembly:assembly -Dmaven.test.skip=true
To speed up the build you can comment out the other modules in pom.xml, but be careful not to remove these two modules.
package com.alibaba.datax.plugin.writer.hivewriter;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class HiveHCatelogWriter extends Writer {

    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration writerSliceConfig = null;
        private String writeMode;
        private String metaStoreUri;
        private String database;
        private String table;
        private String ds;
        private List<String> pars;

        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.validateParameter();
            try {
                HiveMateUtil.createHiveMateUtil(this.metaStoreUri);
                if (!HiveMateUtil.hiveMetaStoreClient.tableExists(this.database, this.table)) {
                    throw DataXException.asDataXException(HiveHcatelogWriterErrorCode.CONNECT_HIVESTREAMING_ERROR,
                            String.format("表不存在:%s.%s", this.database, this.table));
                }
                // In overwrite mode, truncate the target table (or the configured partition) before writing
                if (this.writeMode.equals("overwrite")) {
                    HiveMateUtil.hiveMetaStoreClient.truncateTable(this.database, this.table, this.pars);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
                throw DataXException.asDataXException(HiveHcatelogWriterErrorCode.CONNECT_HIVESTREAMING_ERROR, e.getMessage());
            } finally {
                HiveMateUtil.closeHiveMateUtil();
            }
        }

        private void validateParameter() {
            this.metaStoreUri = this.writerSliceConfig.getNecessaryValue(Key.META_STORE_URI, HiveHcatelogWriterErrorCode.REQUIRED_VALUE);
            this.database = this.writerSliceConfig.getNecessaryValue(Key.DATABASE, HiveHcatelogWriterErrorCode.REQUIRED_VALUE);
            this.table = this.writerSliceConfig.getNecessaryValue(Key.TABLE, HiveHcatelogWriterErrorCode.REQUIRED_VALUE);
            this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HiveHcatelogWriterErrorCode.REQUIRED_VALUE);
            // Optional partition spec, e.g. "dt=20200101,hour=01"; used to limit the overwrite truncate to that partition
            this.ds = this.writerSliceConfig.getString(Key.DS, null);
            this.pars = new ArrayList<String>();
            if (!StringUtils.isEmpty(this.ds)) {
                String par1 = this.ds.replaceAll(",", "/");
                this.pars.add(par1);
            }
        }

        @Override
        public void prepare() {
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(this.writerSliceConfig);
            }
            return configurations;
        }
    }

    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration writerSliceConfig;
        private StreamingConnection connection;
        private HiveConf hiveConf;
        private String metaStoreUri;
        private String databse;
        private String table;
        private String ds;
        private char FieldDelimiter = '\001';

        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.metaStoreUri = this.writerSliceConfig.getString(Key.META_STORE_URI);
            this.databse = this.writerSliceConfig.getString(Key.DATABASE);
            this.table = this.writerSliceConfig.getString(Key.TABLE);
            this.ds = this.writerSliceConfig.getString(Key.DS, null);
            hiveConf = createHiveConf(HiveEndPoint.class, this.metaStoreUri);
            StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                    .withFieldDelimiter(FieldDelimiter)
                    .build();
            try {
                if (StringUtils.isEmpty(this.ds)) {
                    // No partition configured: stream into the table directly
                    connection = HiveStreamingConnection.newBuilder()
                            .withHiveConf(hiveConf)
                            .withDatabase(this.databse)
                            .withTable(this.table)
                            .withAgentInfo("example-agent-1")
                            .withRecordWriter(writer)
                            .connect();
                } else {
                    // Static partition values come from the "ds" spec, e.g. "dt=20200101,hour=01" -> [20200101, 01]
                    List<String> pars = new ArrayList<String>();
                    for (String parStr : ds.split(",")) {
                        pars.add(parStr.split("=")[1]);
                    }
                    connection = HiveStreamingConnection.newBuilder()
                            .withHiveConf(hiveConf)
                            .withDatabase(this.databse)
                            .withTable(this.table)
                            .withStaticPartitionValues(pars)
                            .withAgentInfo("example-agent-1")
                            .withRecordWriter(writer)
                            .connect();
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
                throw DataXException.asDataXException(HiveHcatelogWriterErrorCode.CONNECT_HIVESTREAMING_ERROR, e.getMessage());
            }
        }

        @Override
        public void prepare() {
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            Record record;
            try {
                connection.beginTransaction();
                while ((record = lineReceiver.getFromReader()) != null) {
                    connection.write(recordToBytes(record));
                }
                connection.commitTransaction();
            } catch (Exception e) {
                LOG.error(e.getMessage());
                throw new DataXException(HiveHcatelogWriterErrorCode.HIVE_WRITE_ERROR, "hive写入数据失败");
            }
            LOG.info("end do write");
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

        // Join all columns of a record with the field delimiter and return the raw bytes for the streaming writer
        private byte[] recordToBytes(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return new byte[0];
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(String.valueOf(this.FieldDelimiter));
            }
            sb.setLength(sb.length() - String.valueOf(this.FieldDelimiter).length());
            return sb.toString().getBytes();
        }

        private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
            HiveConf conf = new HiveConf(clazz);
            if (metaStoreUri != null) {
                setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
            }
            setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
            setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
            setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
            // Avoids creating Tez Client sessions internally as it takes much longer currently
            setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
            return conf;
        }

        private void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
            conf.setVar(var, value);
        }

        private void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
            conf.setBoolVar(var, value);
        }
    }
}
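The plugin references a Key constants class, a HiveMateUtil metastore helper and a HiveHcatelogWriterErrorCode enum that are not included in the post. As an illustration, a plausible sketch of the Key class is shown below; only the constant names come from the code above, and the string values are assumptions about how the job JSON is written.

package com.alibaba.datax.plugin.writer.hivewriter;

// Parameter keys looked up from the writer's JSON config; the exact
// string values are assumptions, only the constant names appear in the code above.
public final class Key {
    public static final String META_STORE_URI = "metaStoreUri";
    public static final String DATABASE = "database";
    public static final String TABLE = "table";
    public static final String WRITE_MODE = "writeMode";
    public static final String DS = "ds";

    private Key() {
    }
}

As with any DataX plugin, the new writer also has to be registered through the usual DataX plugin descriptor before a job can reference it by name.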