
Writing to Doris from Python with Stream Load: inserting data extracted from an API into Doris

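0. Doris supports several ways of loading data

For example, see:

Import Overview | Apache Doris

Each method suits a different write scenario.

This article uses a simple example to write into a Doris table with Stream Load, which covers basic write needs.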

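Main contents:

  1. Assembling the URL for the write request
  2. Calling the assembly class
  3. Writing the data
  4. Checking and handling the write result (delete the file on success, send a DingTalk alert on failure)
  5. The difference between passing read_json_by_line and strip_outer_array in condition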

1. Stream Load submits and transfers data over HTTP.

Example:

# General form
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
# Concrete example
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load

For details on the parameters, see Stream load | Apache Doris.
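
To make the HTTP side of that curl call more concrete, here is a minimal sketch that builds the same URL and the Basic authentication header that `-u user:passwd` produces. The function name `build_stream_load_request` and the sample host, port and credentials are illustrative assumptions, not part of Doris. The sketch only prepares the pieces of the request and sends nothing.

```python
import base64


def build_stream_load_request(host, port, db, table, user, password, headers=None):
    """Return (url, headers) for a Stream Load PUT request; nothing is sent."""
    url = f"http://{host}:{port}/api/{db}/{table}/_stream_load"
    # curl's -u option becomes an HTTP Basic Authorization header
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request_headers = {"Authorization": f"Basic {token}"}
    # Extra load parameters (format, columns, ...) travel as ordinary headers
    request_headers.update(headers or {})
    return url, request_headers


# Placeholder values, for illustration only
url, headers = build_stream_load_request(
    "fe_host", 8030, "testDb", "testTbl", "root", "", {"format": "json"}
)
print(url)
print(headers)
```

The file contents would be the body of the PUT request. That part is left to curl in the rest of this article.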

2. Wrapping the HTTP request above in Python code

# -*- coding: utf-8 -*-
# @FileName: stream_load.py
# @Software: PyCharm


class Stream_Load_Helper:
    """Builds the curl command for a Doris Stream Load request."""

    # Initialize the class
    def __init__(self, user, password, filepath, host, port, db, tb, condition=None):
        self.user = user
        self.password = password
        self.filePath = filepath
        self.host = host
        self.port = port
        self.db = db
        self.tb = tb
        # condition maps header names to values; None means no extra headers
        self.condition = condition or {}
        self.user_password = '"' + user + ":" + password + '"'
        # Render each entry as: -H "name:value"
        self.condition_curl = " ".join(
            '-H "' + key + ":" + str(value) + '"' for key, value in self.condition.items()
        )

    # Assemble the curl request command
    def load_command_curl(self):
        url = "http://" + self.host + ":" + str(self.port) + "/api/" + self.db + "/" + self.tb + "/_stream_load"
        command_list = ["curl", "--location-trusted", "-u", self.user_password]
        if self.condition_curl:
            command_list.append(self.condition_curl)
        command_list += ["-T", self.filePath, url]
        return " ".join(command_list)


if __name__ == '__main__':
    stream_load_helper = Stream_Load_Helper(
        user="11",
        password="22",
        filepath="33",
        host="44",
        port="55",
        db="66",
        tb="77",
        condition={"a": "1", "b": "2"}
    )
    command = stream_load_helper.load_command_curl()
    print(command)

Running it prints:

curl --location-trusted -u "11:22" -H "a:1" -H "b:2" -T 33 http://44:55/api/66/77/_stream_load
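
Joining the parts by hand works for simple values, but a password or file path containing spaces or quotes would break the command. One possible hardening, sketched here as a suggestion rather than as part of the original script, is to keep the arguments in a list and let the standard library's `shlex.join` (Python 3.8+) do the quoting. `build_curl_argv` is a hypothetical helper and the sample values are made up.

```python
import shlex


def build_curl_argv(user, password, filepath, host, port, db, tb, condition=None):
    """Return the curl invocation as a list of arguments (no shell quoting yet)."""
    argv = ["curl", "--location-trusted", "-u", f"{user}:{password}"]
    for name, value in (condition or {}).items():
        argv += ["-H", f"{name}:{value}"]
    argv += ["-T", filepath, f"http://{host}:{port}/api/{db}/{tb}/_stream_load"]
    return argv


# Deliberately awkward password and file name to exercise the quoting
argv = build_curl_argv("11", "p w'd", "my file.json", "44", "55", "66", "77", {"format": "json"})
command = shlex.join(argv)  # quotes each argument only where the shell needs it
print(command)
```

The same list can also be passed directly to `subprocess.run(argv, capture_output=True, text=True)`, which bypasses the shell entirely.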

3. Calling the wrapper class:

import configparser
import sys

from stream_load import Stream_Load_Helper


# Stream Load connector: builds the curl command that writes to Doris
def stream_load_curl(filepath, condition, db, tb):
    host, port, user, password = get_db_conf()
    stream_load_helper = Stream_Load_Helper(user=user,
                                            password=password,
                                            filepath=filepath,
                                            host=host,
                                            port=str(port),
                                            db=db,
                                            tb=tb,
                                            condition=condition)
    return stream_load_helper.load_command_curl()


# Read the database connection config file, with separate sections for Windows and Linux
def get_db_conf():
    cf = configparser.ConfigParser()
    cf.read("db.conf")
    # startswith avoids matching "darwin" (macOS), which also contains "win"
    if sys.platform.startswith("win"):
        section = "doris_marketing_advertising_data_win"
    else:
        section = "doris_marketing_advertising_data_linux"
    host = cf.get(section, "host")
    port = cf.get(section, "port")
    user = cf.get(section, "user")
    password = cf.get(section, "password")
    return host, port, user, password
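
When the config section is chosen by platform, note that `sys.platform` is `"darwin"` on macOS, which contains the substring `"win"`. A prefix check avoids picking the Windows section there. The sketch below illustrates this with an in-memory config. `pick_section` and the sample host names are assumptions made for the example.

```python
import configparser
import sys

SAMPLE_CONF = """
[doris_marketing_advertising_data_win]
host = win-host
port = 1123
[doris_marketing_advertising_data_linux]
host = linux-host
port = 1123
"""


def pick_section(platform):
    """Choose the config section name for a given sys.platform value."""
    suffix = "win" if platform.startswith("win") else "linux"
    return "doris_marketing_advertising_data_" + suffix


cf = configparser.ConfigParser()
cf.read_string(SAMPLE_CONF)
for platform in ("win32", "linux", "darwin"):
    section = pick_section(platform)
    print(platform, "->", cf.get(section, "host"), cf.getint(section, "port"))

# The section that would be used on the current machine
print(pick_section(sys.platform))
```

`cf.getint` returns the port as an integer, which is handy if the port is needed as a number rather than as a string for concatenation.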

4. db.conf

[doris_marketing_advertising_data_win]
user = xxx
password = xxx
host = xxx
port = 1123
db = xxx
table = xxx
[doris_marketing_advertising_data_linux]
user = xxx
password = xxx
host = xxx
port = 1123
db = xxx
table = xxx

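5. Writing to Doris

import os
import subprocess
import time

from dingtalkchatbot.chatbot import DingtalkChatbot


# Send a DingTalk alert; the keyword must match the one configured for the robot in DingTalk
def send_dingtalk(mess):
    """
    @param mess: alert text
    @return:
    """
    # WebHook address
    webhook = 'xxx'
    # Initialize the robot
    xiaoding = DingtalkChatbot(webhook)
    # Text message: timestamp on the first line, then the keyword and the alert text
    content = "%s\n%s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "keyword-" + mess)
    xiaoding.send_text(msg=content)


# The name write_to_doris is a placeholder; db and tb are placeholders as well
def write_to_doris(file_path, condition=None):
    """
    @param file_path: name of the JSON file
    @param condition: parameters passed as Stream Load headers
    @return:
    """
    db = "xx"
    tb = "xx"
    if condition is None:
        condition = {"format": "json", "read_json_by_line": "True"}
    result = subprocess.getoutput(stream_load_curl(file_path, condition=condition, db=db, tb=tb))
    print(result)
    # On success, delete the file
    if "\"Status\": \"Success\"" in result:
        os.remove(file_path)
    else:
        send_dingtalk(mess="Data load into Doris failed, please check the script")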

result is returned as JSON (example taken from the official documentation):

{
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}

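The load succeeded when Status is Success.

If the load fails, the response includes an ErrorURL that records the cause; run curl against the ErrorURL to see the failure details.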
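
Matching the literal text `"Status": "Success"` depends on the exact spacing of the response. As a possible alternative, the sketch below pulls the JSON object out of the captured output and inspects the parsed fields. `parse_stream_load_result` is a hypothetical helper, and the sample output is abbreviated from the example above with a made-up first line standing in for curl's progress text.

```python
import json


def parse_stream_load_result(output):
    """Extract the JSON object from curl's output; return a dict, or None if absent."""
    start, end = output.find("{"), output.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        return json.loads(output[start:end + 1])
    except json.JSONDecodeError:
        return None


# Abbreviated sample; the first line imitates non-JSON text printed before the body
sample = '''  % Total    % Received
{
    "TxnId": 1003,
    "Status": "Success",
    "Message": "OK",
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1
}'''
result = parse_stream_load_result(sample)
succeeded = result is not None and result.get("Status") == "Success"
print(succeeded, result.get("NumberFilteredRows") if result else None)
```

With the parsed dict in hand, fields such as NumberFilteredRows or ErrorURL can be included in the alert message instead of a fixed string.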

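6. Other notes:

1. The difference between passing read_json_by_line and strip_outer_array in condition

        1. condition holds the load parameters.
        With condition = {"format": "json", "read_json_by_line": "True"}, the file must contain one JSON object per line:

{"key":"v"}
{"key":"v"}
{"key":"v"}

        2. With condition = {"format": "json", "strip_outer_array": "True"}, the file must contain a single JSON array:

[{"key":"v"},{"key":"v"},{"key":"v"}]

        3. The difference

When Doris parses a JSON file through Stream Load with strip_outer_array, it reads the entire JSON array in one go and then splits it into individual JSON objects to write, so a large file consumes a lot of resources.

With read_json_by_line, Doris splits the input on newlines and parses one JSON object per line, which uses fewer resources.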
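
If the extracted data arrives as a JSON array, it can be rewritten as one object per line before loading so that read_json_by_line applies. The sketch below shows one way to do that with the standard library. `array_to_json_lines` is a hypothetical helper, and it still loads the whole array into memory on the Python side, so it is meant only as an illustration of the two formats.

```python
import json


def array_to_json_lines(array_text):
    """Convert the text of a JSON array into one JSON object per line."""
    records = json.loads(array_text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # Compact separators keep each record on a single line without extra spaces
    return "\n".join(
        json.dumps(record, ensure_ascii=False, separators=(",", ":")) for record in records
    )


lines = array_to_json_lines('[{"key":"v"},{"key":"v"},{"key":"v"}]')
print(lines)
```

The resulting text can be written to the file that is passed to Stream Load with {"format": "json", "read_json_by_line": "True"}.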
 
