当前位置:   article > 正文

Python-大文件入Doris方案(切割方式)_pydoris

pydoris

环境准备

  1. Doris环境-账户密码
  2. 需要导入的文件地址,能访问到
  3. 内存方案,这里声明内存方式为1G
  4. 环境需要安装python3,pip3,且需要安装pandas模块

pip3 install --index-url https://pypi.tuna.tsinghua.edu.cn/simple pandas

import pandas as pd
import os
import shutil
import subprocess

# 原始csv文件路径
original_file = "/root/tar_temp/doris_tool/test.csv"
# 切割后的文件存储路径
split_folder = "/root/tar_temp/doris_tool/result"
# 每个切割文件的大小(字节)
split_size = 2 * 1024 * 1024 * 1024

# Doris导入相关配置
doris_username = "root"
doris_password = "XXX"
doris_label = "label3"
doris_timeout = 100
doris_column_separator = ","
doris_base_url = "http://172.18.1.X:8034"
doris_database_name = "test"
doris_table_name = "insert_test_doris"

# 设置最大内存限制(字节)
max_memory = 1 * 1024 * 1024 * 1024

# 读入原始csv文件
df = pd.read_csv(original_file)

# 如果文件大小低于2G,直接将整个文件作为一个切割文件
if os.path.getsize(original_file) <= split_size:
    split_df = df
    split_file = os.path.join(split_folder, "split_0.csv")
    split_df.to_csv(split_file, index=False)

    # 调用Doris的load命令将切割文件导入分区表
    doris_load_url = f"{doris_base_url}/api/{doris_database_name}/{doris_table_name}/_stream_load"
    doris_load_cmd = f"curl --location-trusted -u {doris_username}:{doris_password} " \
                     f"-H \"label:{doris_label}\" -H \"timeout:{doris_timeout}\" " \
                     f"-H \"column_separator:{doris_column_separator}\" -T {split_file} {doris_load_url}"

    # 使用subprocess模块运行curl命令,并设置最大内存限制
    subprocess.run(["bash", "-c", f"ulimit -v {max_memory} && {doris_load_cmd}"])

# 如果文件大小大于2G,按照原有逻辑进行切割
else:
    # 计算切割文件的数量
    n_splits = int((os.path.getsize(original_file) + split_size - 1) / split_size)

    # 切割并保存文件
    for i in range(n_splits):
        start_index = i * split_size
        end_index = min((i + 1) * split_size, len(df))
        split_df = df.iloc[start_index:end_index]
        split_file = os.path.join(split_folder, f"split_{i}.csv")
        split_df.to_csv(split_file, index=False)

        # 如果是最后一个文件,需要重新计算文件大小
        if i == n_splits - 1:
            split_size = end_index - start_index

        # 调用Doris的load命令将切割文件导入分区表
        doris_load_url = f"{doris_base_url}/api/{doris_database_name}/{doris_table_name}/_stream_load"
        doris_load_cmd = f"curl --location-trusted -u {doris_username}:{doris_password} " \
                         f"-H \"label:{doris_label}\" -H \"timeout:{doris_timeout}\" " \
                         f"-H \"column_separator:{doris_column_separator}\" -H \"mem_limit:1G\" -T {split_file} {doris_load_url}"

        # 使用subprocess模块运行curl命令,并设置最大内存限制
        subprocess.run(["bash", "-c", f"ulimit -v {max_memory} && {doris_load_cmd}"])

# 导入完成后删除切割文件夹中的所有文件
shutil.rmtree(split_folder)
os.mkdir(split_folder)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/973137
推荐阅读
相关标签
  

闽ICP备14008679号