赞
踩
Doris版本:0.15.0-rc04
当MySQL端批量进行Delete或Update操作,产生大量Binlog,进入到Flink实时同步任务中,Flink实时同步任务通过拼装INSERT INTO语句,批量执行数据同步,这时,就有可能会导致Doris的数据版本超过了最大的限制,后续操作将会被拒绝
官网说明:
Q4. tablet writer write failed, tablet_id=27306172, txn_id=28573520, err=-235 or -215 or -238
这个错误通常发生在数据导入操作中。新版错误码为 -235,老版本错误码可能是 -215。
这个错误的含义是,对应tablet的数据版本超过了最大限制(默认500,由 BE 参数 max_tablet_version_num 控制),后续写入将被拒绝。
比如问题中这个错误,即表示 27306172 这个tablet的数据版本超过了限制。
这个错误通常是因为导入的频率过高,大于后台数据的compaction速度,导致版本堆积并最终超过了限制。此时,我们可以先通过show tablet 27306172 语句,然后执行结果中的 show proc 语句,查看tablet各个副本的情况。
结果中的 versionCount即表示版本数量。如果发现某个副本的版本数量过多,则需要降低导入频率或停止导入,并观察版本数是否有下降。
如果停止导入后,版本数依然没有下降,则需要去对应的BE节点查看be.INFO日志,搜索tablet id以及 compaction关键词,检查compaction是否正常运行。
关于compaction调优相关,可以参阅 Apache Doris 公众号文章:Doris 最佳实践-Compaction调优(3)
-238 错误通常出现在同一批导入数据量过大的情况,从而导致某一个 tablet 的 Segment 文件过多(默认是 200,由 BE 参数 max_segment_num_per_rowset 控制)。此时建议减少一批次导入的数据量,或者适当提高 BE 配置参数值来解决。
SET show_hidden_columns=true
,之后使用desc tablename
,如果输出中有__DORIS_DELETE_SIGN__
列则支持,如果没有则不支持。启用批量删除支持有以下两种形式:
ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE"
来启用批量删除。本操作本质上是一个schema change 操作,操作立即返回,可以通过show alter table column 来确认操作是否完成

import com.alibaba.fastjson.JSONObject;
import com.platform.common.model.RespContent;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * Utility for writing JSON batches into Doris through the HTTP Stream Load API.
 *
 * Flow: PUT to the FE, which answers 307 with the location of a BE; the payload
 * is then re-sent to that BE and the JSON response is parsed into {@link RespContent}.
 */
@Slf4j
public class DorisUltils {

    // Statuses Doris reports for a load that was (eventually) committed.
    private final static List<String> DORIS_SUCCESS_STATUS =
            new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    // Template: http://{feHostPort}/api/{db}/{table}/_stream_load
    public static String loadUrlStr = "http://%s/api/%s/%s/_stream_load";

    // Timestamp part of the load label, e.g. 20220728_173414 (thread-safe, cached).
    private static final DateTimeFormatter LABEL_TS = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss");

    /**
     * Sends one batch of rows to Doris via Stream Load.
     *
     * @param data         JSON array of row objects, e.g. [{"a":"1","b":"2"}]
     * @param jsonformat   jsonpaths expression; currently unused because the
     *                     "jsonpaths" header is commented out in getConnection
     * @param columns      comma-separated target columns, e.g. "a,b"
     * @param loadUrlStr   full FE stream-load URL
     * @param mergeType    APPEND (insert) or DELETE (batch delete)
     * @param db           database name (used only for logging)
     * @param tbl          table name (used only for logging)
     * @param authEncoding Base64 of "user:password" for HTTP Basic auth
     */
    public static void loadBatch(String data, String jsonformat, String columns, String loadUrlStr,
                                 String mergeType, String db, String tbl, String authEncoding) {
        // Globally unique label so Doris can de-duplicate retried loads.
        String label = String.format("flink_import_%s_%s",
                LocalDateTime.now().format(LABEL_TS),
                UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // Step 1: ask the FE which BE should receive the data.
            feConn = getConnection(loadUrlStr, label, columns, jsonformat, mergeType, authEncoding);
            int status = feConn.getResponseCode();
            // FE must answer 307 (temporary redirect) carrying the BE location.
            if (status != 307) {
                log.warn("status is not TEMPORARY_REDIRECT 307, status: {},url:{}", status, loadUrlStr);
                return;
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                log.warn("redirect location is null");
                return;
            }
            // Step 2: send the payload to the redirected BE location.
            beConn = getConnection(location, label, columns, jsonformat, mergeType, authEncoding);
            // try-with-resources: stream is closed even if write() throws (was leaked before).
            try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
                // Explicit UTF-8: the original used the platform default charset.
                bos.write(data.getBytes(StandardCharsets.UTF_8));
            }
            status = beConn.getResponseCode();
            StringBuilder response = new StringBuilder();
            // Reader is now closed deterministically as well (was leaked before).
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(beConn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    response.append(line);
                }
            }
            String respContent = response.toString();
            RespContent respContentResult = JSONObject.parseObject(respContent, RespContent.class);
            if (status == 200) {
                if (!checkStreamLoadStatus(respContentResult)) {
                    log.warn("Stream {} '{}.{}' Date Fail:{},失败连接:{},失败原因:{}",
                            mergeType, db, tbl, data, loadUrlStr, respContent);
                } else {
                    log.info("Stream {} '{}.{}' Data Success : {}条,用时{}毫秒,数据是:{}",
                            mergeType, db, tbl, respContentResult.getNumberLoadedRows(),
                            respContentResult.getLoadTimeMs(), data);
                }
            } else {
                log.warn("Stream Load Request failed: {},message:{}", status, respContentResult.getMessage());
            }
        } catch (Exception e) {
            // Log with the full stack trace instead of printStackTrace() + a dropped cause
            // (the original called log.warn(err, respMsg), which discards the exception).
            log.warn("failed to load audit via AuditLoader plugin with label: {}", label, e);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

    /**
     * Builds a stream-load PUT connection with the required Doris headers.
     * Redirects are NOT followed automatically so the 307 from the FE can be
     * handled manually (re-sending the body to the BE location).
     *
     * @param urlStr e.g. http://192.168.1.1:8030/api/db_name/table_name/_stream_load
     * @throws IOException if the connection cannot be opened
     */
    private static HttpURLConnection getConnection(String urlStr, String label, String columns,
                                                   String jsonformat, String mergeType,
                                                   String authEncoding) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        // Reject the load if any row is filtered out.
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "false");
        conn.addRequestProperty("columns", columns); // e.g. id,field_1,field_2,field_3
        conn.addRequestProperty("format", "json");
        // conn.addRequestProperty("jsonpaths", jsonformat);
        // Input is a JSON array of rows; Doris unwraps the outer array.
        conn.addRequestProperty("strip_outer_array", "true");
        conn.addRequestProperty("merge_type", mergeType); // e.g. APPEND(增)或 DELETE(删)
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    /**
     * 判断StreamLoad是否成功
     *
     * A load is considered successful only if the status is accepted AND every
     * submitted row was actually loaded (no silently filtered rows).
     *
     * @param respContent streamload返回的响应信息(JSON格式)
     * @return true when the load fully succeeded
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    public static void main(String[] args) {
        // Placeholder credentials: the original referenced undefined username/password
        // variables (a compile error). Replace with real values before running.
        String username = "root";
        String password = "";
        String authEncoding = Base64.getEncoder().encodeToString(
                String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8));
        loadBatch("[{\"a\":\"1\",\"b\":\"2\"},{\"a\":\"3\",\"b\":\"4\"}]", "", "a,b",
                "http://192.168.1.1:8030/api/db_name/table_name/_stream_load",
                "APPEND", "db_name", "table_name", authEncoding);
    }
}
/**
 * Response payload returned by a Doris Stream Load request.
 *
 * Mapped from the JSON body the BE returns (see the sample response in the
 * article: Status, NumberTotalRows, NumberLoadedRows, Message, LoadTimeMs, ...).
 */
public class RespContent {

    // Load status reported by Doris, e.g. "Success" or "Publish Timeout".
    private String status;
    // Rows actually loaded into the table.
    private long numberLoadedRows;
    // Rows contained in the submitted batch.
    private long numberTotalRows;
    // Human-readable message, "OK" on success.
    private String message;
    // Total load time in milliseconds.
    private long loadTimeMs;

    /**
     * No-arg constructor for JSON deserialization.
     * NOTE(review): fastjson's JSONObject.parseObject normally instantiates
     * beans through a default constructor; the original class only declared
     * the all-args constructor, which can make deserialization fail — this
     * backward-compatible addition fixes that. Verify against the fastjson
     * version actually in use.
     */
    public RespContent() {
    }

    /**
     * All-args constructor.
     *
     * @param status           load status, e.g. "Success"
     * @param numberLoadedRows rows loaded
     * @param numberTotalRows  rows submitted
     * @param message          status message
     * @param loadTimeMs       load duration in milliseconds
     */
    public RespContent(String status, long numberLoadedRows, long numberTotalRows,
                       String message, long loadTimeMs) {
        this.status = status;
        this.numberLoadedRows = numberLoadedRows;
        this.numberTotalRows = numberTotalRows;
        this.message = message;
        this.loadTimeMs = loadTimeMs;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public long getNumberLoadedRows() {
        return numberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        this.numberLoadedRows = numberLoadedRows;
    }

    public long getNumberTotalRows() {
        return numberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        this.numberTotalRows = numberTotalRows;
    }

    public long getLoadTimeMs() {
        return loadTimeMs;
    }

    public void setLoadTimeMs(long loadTimeMs) {
        this.loadTimeMs = loadTimeMs;
    }
}
注意:
数据源:
[
{
"a":"name1",
"b":"17868"
},
{
"a":"name2",
"b":"17870"
}
]
返回结果:
{ "TxnId":7149254, "Label":"flink_import_20220728_173414_073e6992f5504e408bf530154ad5808d", "Status":"Success", "Message":"OK", "NumberTotalRows":800, "NumberLoadedRows":800, "NumberFilteredRows":0, "NumberUnselectedRows":0, "LoadBytes":12801, "LoadTimeMs":42, "BeginTxnTimeMs":0, "StreamLoadPutTimeMs":0, "ReadDataTimeMs":0, "WriteDataTimeMs":22, "CommitAndPublishTimeMs":19, "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }
Stream load 导入结果参数解释说明:
注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。
doris文档地址 :https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。