赞
踩
pip3 install --index-url https://pypi.tuna.tsinghua.edu.cn/simple pandas
import os
import shutil
import subprocess

import pandas as pd

# --- Configuration --------------------------------------------------------
# Path of the original CSV file to import.
ORIGINAL_FILE = "/root/tar_temp/doris_tool/test.csv"
# Folder where split CSV chunks are written.
SPLIT_FOLDER = "/root/tar_temp/doris_tool/result"
# Maximum size (bytes) of each split file: 2 GiB.
SPLIT_SIZE = 2 * 1024 * 1024 * 1024
# Doris stream-load settings.
DORIS_USERNAME = "root"
DORIS_PASSWORD = "XXX"
DORIS_LABEL = "label3"
DORIS_TIMEOUT = 100
DORIS_COLUMN_SEPARATOR = ","
DORIS_BASE_URL = "http://172.18.1.X:8034"
DORIS_DATABASE_NAME = "test"
DORIS_TABLE_NAME = "insert_test_doris"
# Virtual-memory cap (bytes) applied to each curl process via ulimit: 1 GiB.
MAX_MEMORY = 1 * 1024 * 1024 * 1024


def _stream_load(split_file, label):
    """Stream-load one CSV chunk into Doris via curl, under a memory cap.

    `label` must be unique per load: Doris rejects a stream load that
    reuses the label of a previously successful load.
    """
    url = (
        f"{DORIS_BASE_URL}/api/{DORIS_DATABASE_NAME}/"
        f"{DORIS_TABLE_NAME}/_stream_load"
    )
    cmd = (
        f"curl --location-trusted -u {DORIS_USERNAME}:{DORIS_PASSWORD} "
        f"-H \"label:{label}\" -H \"timeout:{DORIS_TIMEOUT}\" "
        f"-H \"column_separator:{DORIS_COLUMN_SEPARATOR}\" "
        f"-H \"mem_limit:1G\" -T {split_file} {url}"
    )
    # ulimit requires a shell, so run through bash -c; the ulimit caps the
    # curl process's virtual memory before the upload starts.
    subprocess.run(["bash", "-c", f"ulimit -v {MAX_MEMORY} && {cmd}"])


def main():
    """Split ORIGINAL_FILE into <= SPLIT_SIZE chunks and load each into Doris.

    Small files (<= SPLIT_SIZE bytes) are copied to a single chunk; larger
    files are split into row-balanced chunks, one per SPLIT_SIZE worth of
    bytes. All chunk files are deleted after the loads complete.
    """
    os.makedirs(SPLIT_FOLDER, exist_ok=True)
    file_size = os.path.getsize(ORIGINAL_FILE)
    df = pd.read_csv(ORIGINAL_FILE)

    if file_size <= SPLIT_SIZE:
        # File already fits in one chunk: write it out unchanged and load it.
        split_file = os.path.join(SPLIT_FOLDER, "split_0.csv")
        df.to_csv(split_file, index=False)
        _stream_load(split_file, DORIS_LABEL)
    else:
        # Number of chunks needed so each is at most SPLIT_SIZE bytes.
        n_splits = (file_size + SPLIT_SIZE - 1) // SPLIT_SIZE
        # BUG FIX: the original sliced df.iloc with BYTE offsets
        # (i * split_size), which are meaningless as row indices and
        # produce empty chunks for any file over 2 GiB. Split by ROWS,
        # distributing them evenly across the computed chunk count.
        rows_per_split = (len(df) + n_splits - 1) // n_splits
        for i in range(n_splits):
            start = i * rows_per_split
            end = min(start + rows_per_split, len(df))
            if start >= end:
                break  # fewer non-empty chunks than n_splits; done
            split_file = os.path.join(SPLIT_FOLDER, f"split_{i}.csv")
            df.iloc[start:end].to_csv(split_file, index=False)
            # Suffix the label with the chunk index so every load is unique.
            _stream_load(split_file, f"{DORIS_LABEL}_{i}")

    # Remove all chunk files after the import, leaving an empty folder.
    shutil.rmtree(SPLIT_FOLDER)
    os.mkdir(SPLIT_FOLDER)


if __name__ == "__main__":
    main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。