赞
踩
Doris 提供了多种数据导入方式,如:
| 导入方式 | 说明 |
| --- | --- |
| Spark Load | 通过 Spark 导入外部数据 |
| Broker Load | 通过 Broker 导入外部存储数据 |
| Stream Load | 流式导入数据(本地文件及内存数据) |
| Routine Load | 导入 Kafka 数据 |
| Binlog Load | 采集 MySQL Binlog 导入数据 |
| Insert Into | 外部表通过 INSERT 方式导入数据 |
| S3 Load | S3 协议的对象存储数据导入 |
分别对应不同的写入场景。
本文通过一个简单案例,演示如何使用 Stream Load 方式将数据写入 Doris 表中,可满足基本的写入需求。
主要内容为:
- 1. 拼接写入数据的 curl 命令(URL)
- 2. 调用拼接类
- 3. 写入数据
- 4. 判断写入结果并处理(成功则删除文件,失败则发送钉钉告警)
- 5. condition 中传入 read_json_by_line 与 strip_outer_array 的区别
示例:
- # demo curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
-
- # curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load
参数详解请参阅 Apache Doris 官方文档《Stream Load》。
- # -*- coding: utf-8 -*-
- # @FileName: stream_load.py
- # @Software: PyCharm
-
- import json
- import os
-
-
class Stream_Load_Helper():
    """Build the curl command line for a Doris Stream Load request.

    The command has the documented shape
    ``curl --location-trusted -u "user:passwd" [-H "key:value" ...] -T file http://host:port/api/{db}/{tb}/_stream_load``
    and is returned as a single string intended to be run through a POSIX
    shell (e.g. ``subprocess.getoutput``).
    """

    # Initialise the helper
    def __init__(self, user, password, filepath, host, port, db, tb, condition=None):
        """
        @param user: Doris user name
        @param password: Doris password
        @param filepath: local file to upload (passed to curl -T)
        @param host: FE host
        @param port: FE HTTP port (str or int)
        @param db: target database
        @param tb: target table
        @param condition: optional dict of Stream Load headers, e.g.
            {"format": "json", "read_json_by_line": "True"}; None means no extra headers
        """
        self.user = user
        self.password = password
        self.filePath = filepath
        self.host = host
        self.port = port
        self.db = db
        self.tb = tb
        self.condition = condition

        # Credentials and headers are wrapped in double quotes for the shell. The
        # characters that stay special inside double quotes are escaped, so a value
        # such as p"$x can neither break nor inject into the command.
        self.user_password = self._double_quote(f"{user}:{password}")
        # One `-H "key:value"` per header. The previous version crashed on the
        # default condition=None (None has no .keys()); treat None as "no headers".
        self.condition_curl = " ".join(
            "-H " + self._double_quote(f"{key}:{value}")
            for key, value in (condition or {}).items()
        )

    @staticmethod
    def _double_quote(text):
        """Wrap *text* in POSIX-shell double quotes, escaping \\, ", $ and `."""
        escaped = (
            text.replace("\\", "\\\\")  # backslash first, so later escapes are not doubled
            .replace('"', '\\"')
            .replace("$", "\\$")
            .replace("`", "\\`")
        )
        return '"' + escaped + '"'

    # Assemble the curl request command
    def load_command_curl(self):
        """Return the complete curl command as one shell-ready string."""
        import shlex  # local import keeps this snippet self-contained

        url = f"http://{self.host}:{self.port}/api/{self.db}/{self.tb}/_stream_load"
        parts = ["curl", "--location-trusted", "-u", self.user_password]
        # Leave out the header slot entirely when there are no headers, so the
        # command does not contain a doubled space.
        if self.condition_curl:
            parts.append(self.condition_curl)
        # shlex.quote leaves plain paths/URLs untouched and only quotes unsafe ones
        # (e.g. a file name containing spaces).
        parts += ["-T", shlex.quote(str(self.filePath)), shlex.quote(url)]
        return " ".join(parts)
-
-
if __name__ == '__main__':
    # Demo with dummy connection values: build the Stream Load command and
    # print it rather than executing it.
    demo_args = dict(
        user="11",
        password="22",
        filepath="33",
        host="44",
        port="55",
        db="66",
        tb="77",
        condition={"a": "1", "b": "2"},
    )
    demo_command = Stream_Load_Helper(**demo_args).load_command_curl()
    print(demo_command)
'运行
执行后打印结果为:
curl --location-trusted -u "11:22" -H "a:1" -H "b:2" -T 33 http://44:55/api/66/77/_stream_load
# Stream Load connector: build the curl command that writes a file into Doris.
def stream_load_curl(filepath, condition, db, tb):
    """Return the Stream Load curl command for loading *filepath* into ``db.tb``.

    Connection settings (host, port, user, password) come from get_db_conf();
    *condition* is forwarded unchanged as the dict of extra Stream Load headers.
    """
    conf_host, conf_port, conf_user, conf_password = get_db_conf()
    settings = {
        "user": conf_user,
        "password": conf_password,
        "filepath": filepath,
        "host": conf_host,
        "port": str(conf_port),
        "db": db,
        "tb": tb,
        "condition": condition,
    }
    return Stream_Load_Helper(**settings).load_command_curl()
-
-
- ## 读取数据库连接配置文件,区分win和linux
- def get_db_conf():
- cf = configparser.ConfigParser()
-
- cf.read("db.conf")
- if "win" in sys.platform:
- host = cf.get("doris_marketing_advertising_data_win", "host")
- port = cf.get("doris_marketing_advertising_data_win", "port")
- user = cf.get("doris_marketing_advertising_data_win", "user")
- password = cf.get("doris_marketing_advertising_data_win", "password")
- else:
- host = cf.get("doris_marketing_advertising_data_linux", "host")
- port = cf.get("doris_marketing_advertising_data_linux", "port")
- user = cf.get("doris_marketing_advertising_data_linux", "user")
- password = cf.get("doris_marketing_advertising_data_linux", "password")
- return host,port,user,password
'运行
- [doris_marketing_advertising_data_win]
- user = xxx
- password = xxx
- host = xxx
- port = 1123
- db = xxx
- table= xxx
-
-
- [doris_marketing_advertising_data_linux]
- user = xxx
- password = xxx
- host = xxx
- port = 1123
- db = xxx
- table= xxx
-
- """
- @param file_path: json文件名
- @param condition: 传入的参数
- @return:
- """
- db=xx
- tb=xx
- result = subprocess.getoutput(stream_load_curl(file_path,condition={"format": "json", "read_json_by_line": "True"},db=db,tb=tb)
- print(result)
- #写入成功,删掉文件
- if "\"Status\": \"Success\"" in result:
- os.remove(file_path)
- else:
- send_dingtalk(mess="数据入库失败,请检查脚本")
-
-
-
# Send a DingTalk alert; the message must contain the keyword configured for the robot in DingTalk.
def send_dingtalk(mess, webhook='xxx', keyword="关键词"):
    """
    @param mess: alert text
    @param webhook: DingTalk robot webhook URL (placeholder 'xxx' by default)
    @param keyword: security keyword configured on the robot; prefixed to the message
    @return: None
    """
    import time

    # Initialise the robot
    xiaoding = DingtalkChatbot(webhook)

    # Plain-text message: "<timestamp>\n<keyword>:<mess>".
    # The original `"%s:%s" % ("关键词-" + mess)` supplied one value for two
    # placeholders and always raised TypeError.
    # NOTE(review): the original comment said "@everyone", but is_at_all is not passed.
    content = "%s\n%s:%s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), keyword, mess)
    xiaoding.send_text(msg=content)
result 为 JSON 格式的返回结果(以下示例取自官网):
- {
- "TxnId": 1003,
- "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
- "Status": "Success",
- "ExistingJobStatus": "FINISHED", // optional
- "Message": "OK",
- "NumberTotalRows": 1000000,
- "NumberLoadedRows": 1000000,
- "NumberFilteredRows": 1,
- "NumberUnselectedRows": 0,
- "LoadBytes": 40888898,
- "LoadTimeMs": 2144,
- "BeginTxnTimeMs": 1,
- "StreamLoadPutTimeMs": 2,
- "ReadDataTimeMs": 325,
- "WriteDataTimeMs": 1933,
- "CommitAndPublishTimeMs": 106,
- "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
- }
当 Status 为 Success 时表示导入成功。
若存在被过滤的数据行或导入失败,返回结果中会包含 ErrorURL,可通过 curl 访问该 ErrorURL 查看错误详情。
- {"key":"v"}
- {"key":"v"}
- {"key":"v"}
[{"key":"v"},{"key":"v"},{"key":"v"}]
Doris 使用 Stream Load 解析 JSON 文件时,如果指定 strip_outer_array,会一次性读取整个 JSON 数组,再将其拆解为一个个 JSON 对象写入,因此文件很大时会占用较多资源。
如果指定 read_json_by_line,则直接按换行符逐行解析 JSON 对象,更节约资源。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。