赞
踩
需要历史数据,部分应用部分周期需要全量一个文件,部分历史需要每天一个文件,新的数据 T+1 。
每个文件上传成功后需要一个状态空文件 _SUCCESS 文件
注:如果系统简单,要求简单,也可以选择数据存成hive表形式,直接文件可parquet文件。这里选择用 impala- shell 导出方式是客户有复杂的要求等等。
(1)python 做脚本
(2)impala-shell 命令方式导出符合要求的csv文件
(3)pandas 和 pyarraow.parquet 结合操作csv
(4)发送标准文件到 OSS
欢迎关注,一起学习
(1)导出csv文件
- impala-shell
- -i {imp_host}
- -d {imp_database}
- -q " {sql} "
- -B --output_delimiter="\\t"
- --print_header
- -o /data/xx.csv
(2) 环境执行
os.system(cmd)
(3) csv 转 parquet
- import pyarrow as pa
- import pyarrow.parquet as pq
-
- csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)
-
- user_schema = pa.schema([
- ('user_id', pa.int64()),
- ('name', pa.string())
- ])
-
- table = pa.Table.from_pandas(csv_file,schema=user_schema)
-
- pq.write_table(table, "/data/xxxx.parquet")
(4) 发送到OSS
- import oss2
-
- auth_endpoint = 'auth_endpoint'
-
- access_key_id = 'access_key_id'
-
- access_key_secret = 'auth_endpoint'
-
- bucket_name = 'bucket_name'
-
- auth = oss2.Auth(access_key_id, access_key_secret)
-
- bucket = oss2.Bucket(auth,auth_endpoint, bucket_name)
-
-
- with open(output_file, 'rb') as file_obj:
- # 上传具体数据
- put_object = bucket.put_object(oss_path, file_obj)
- if put_object and put_object.status == 200:
- # 上传_success文件夹
- put_success=bucket.put_object('{0}/_SUCCESS'.format(oss_success_path), '')
- if put_success and put_success.status == 200:
- print("oss data success file upload success")
- else:
- print("oss success data upload fail")
- else:
- print("oss success data upload fail")
(5)日期循环
- import datetime
-
- startTime = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y%m%d")
- endTime = (datetime.datetime.now()).strftime("%Y%m%d")
-
- if (is_today == 0):
- startTime = datetime.datetime.strptime('2025-05-09 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
- endTime = datetime.datetime.strptime('2025-08-17 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
-
- while startTime < endTime:
-
- print('逻辑处理')
-
- startTime = (datetime.datetime.strptime(startTime, '%Y%m%d') + datetime.timedelta(days=1)).strftime(
- '%Y%m%d')
(1) out of memory
Error tokenizing data. C error: out of memory
因为读取 CSV文件过大,内存放不下,设置 low_memory=False,机器内存小于文件大小,那是倒不出来的,除非分块读取
- file_index = 0
-
- for file in pd.read_csv(csvFile,
- delimiter='\t',
- low_memory=False,
- chunksize=20000):
-
- table = pa.Table.from_pandas(file, schema=schema)
-
- pq.write_table(table, '/data/xxxx_{1}.parquet'.format(file_index))
-
- file_index+=1
(2) 数据类型转换问题
- pyarrow.lib.ArrowTypeError:
- ('Expected a string or bytes dtype, got int64',
- 'Conversion failed for column xxxx with type int64')
当数据列出现 Null 的时候,转换 int64 转化不过去,就会报错,需要转为 str 同理一样,其他数据类型也会出现这种情况,当schema定义某列为int64,32,16 .... 的时候,如果出现Null,会自定识别为 float 类型。
- csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)
-
- csv_file['xxxx'] = csv_file['xxxx'].astype(str)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。