赞
踩
想,全是问题;做,全是办法。
想深入交流doris的私聊我,加微信
做实时数仓的同学对目前比较流行的KFC(Kafka\Flink\ClickHouse)套餐非常熟悉,其实KFD也不错。
大数据组件越来越丰富,但是还没有出现一个兼容OLAP和OLTP的工具,即满足DB和日志的实时存储和复杂查询,又能满足在此基础上的数仓建设,我们尝试过ClickHouse,缺点在于难维护、实时写入效率低,内部碎片合并和数据走zk难以实现大量数据的实时存储;之后使用过impala+kudu,缺点是impala实在是太占用内存,两者结合用起来比较费劲,也是开发了实时同步DB的工具,维护成本太高,也放弃了;最终在参考百度的doris和作业帮的资料下,正式的开始使用Doris,实现了log和DB(包含分表合并)的准实时同步,以及基于doris的数仓建模。
接下来我会就Doris的实时写入部分简单的说一下实现方式,代码和注释为主
不要把字段设计成"not null",好处在于后期改表(加字段)不会影响正常的数据,其他的暂时不方便透露,之后会慢慢讲
为什么选择StreamLoad呢?一开始使用的是insert into,insert into是使用的FE资源的,导致FE繁忙,后期数据量上来会出问题,而streamload不存在这个问题,官方是这么说的(0.12的文档):
Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
- Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
之后参考京东做法,不断的load小文件实现,实时的数据插入。
踩坑:
实现起来比较简单,无非是在flinkSink代码中嵌入一段执行curl的代码
## 原curl
curl --location-trusted -u 用户名:密码 -T /xxx/test -H "format: json" -H "strip_outer_array: true" http://doris_fe:8030/api/{database}/{table}/_stream_load
## -u 不用解释了,用户名和密码
## -T json文件的地址,内容为[json,json,json],就是jsonlist
## -H 指定参数
## http 指定库名和表名
步骤:生成临时文件createFile
,将数据写入临时文件mappedFile
,执行execCurl
, 删除临时文件deleteFile
(简化版)
/**
* 创建临时内存文件
* @param fileName
* @throws IOException
*/
public static void createFile(String fileName) throws IOException {
File testFile = new File(fileName);
File fileParent = testFile.getParentFile();
if (!fileParent.exists()) {
fileParent.mkdirs();
}
if (!testFile.exists())
testFile.createNewFile();
}
/**
* 删除临时内存文件
* @param fileName
* @return
*/
public static boolean deleteFile(String fileName) {
boolean flag = false;
File file = new File(fileName);
// 路径为文件且不为空则进行删除
if (file.isFile() && file.exists()) {
file.delete();
flag = true;
}
return flag;
}
/**
* 写入内存文件
* @param data
* @param path
*/
public static void mappedFile(String data, String path) {
CharBuffer charBuffer = CharBuffer.wrap(data);
try {
FileChannel fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.getBytes().length*4);
if (mappedByteBuffer != null) {
mappedByteBuffer.clear();
mappedByteBuffer.put(Charset.forName("UTF-8").encode(charBuffer));
}
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 执行curl
* @param curl
* @return
*/
public static String execCurl(String[] curl) {
ProcessBuilder process = new ProcessBuilder(curl);
Process p;
try {
p = process.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
StringBuilder builder = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
builder.append(line);
builder.append(System.getProperty("line.separator"));
}
return builder.toString();
} catch (IOException e) {
System.out.print("error");
e.printStackTrace();
}
return null;
}
/**
* 生成Culr
* @param filePath
* @param databases
* @param table
* @return
*/
public static String[] createCurl(String filePath, String databases, String table){
String[] curl = {"curl","--location-trusted", "-u", "用户名:密码", "-T",filePath, "-H","format: json", "-H", "strip_outer_array: true", "http://doris_fe:8030/api/"+databases+"/"+table+"/_stream_load"};
return curl;
}
实现自定义Sink比较简单,这里就简单的分享一下我的怎么写的(简化版)。
class LogCurlSink(insertTimenterval:Long,
insertBatchSize:Int) extends RichSinkFunction[(String, Int, Long, String)] with Serializable{
private val Logger = LoggerFactory.getLogger(this.getClass)
private val mesList = new java.util.ArrayList[String]()
private var lastInsertTime = 0L
override def open(parameters: Configuration): Unit ={
val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
CurlUtils.createFile(path)
Logger.warn(s"init and create $topic filePath!!!")
}
// (topic,partition,offset,jsonstr)
override def invoke(value: (String, Int, Long, String), context: SinkFunction.Context[_]): Unit = {
if(mesList.size >= this.insertBatchSize || isTimeToDoInsert){
//存入
insertData(mesList)
//此处可以进行受到维护offset
mesList.clear()
this.lastInsertTime = System.currentTimeMillis()
}
mesList.add(value._4)
}
override def close(): Unit = {
val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
CurlUtils.deleteFile(path)
Logger.warn("close and delete filePath!!!")
}
/**
* 执行插入操作
* @param dataList
*/
private def insertData(dataList: java.util.ArrayList[String]): Unit ={
略
}
/**
* 根据时间判断是否插入数据
*
* @return
*/
private def isTimeToDoInsert = {
val currTime = System.currentTimeMillis
currTime - this.lastInsertTime >= this.insertCkTimenterval
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。