当前位置:   article > 正文

Parquet文件推送数据到OSS_pyarrow.lib.arrowtypeerror expected a string or by

pyarrow.lib.arrowtypeerror expected a string or bytes dtype, got int64

1. 任务背景

  • 任务说明:公司 saas 数据分析类产品,客户需要把行为数据回传到客户指定文件系统中(oss)
  • 周期:T+1
  • 数据格式:parquet
  • 数据范围:部分表全量,部分表增量
  • 其他要求:

需要历史数据,部分应用部分周期需要全量一个文件,部分历史需要每天一个文件,新的数据 T+1 。

每个文件上传成功后需要一个状态空文件 _SUCCESS 文件

2. 任务分析

a. 分析

  • 本数据平台 impala+kudu+hive 架构,impala-shll可导出 csv文件。

注:如果系统简单,要求简单,也可以选择数据存成hive表形式,直接文件可parquet文件。这里选择用 impala- shell 导出方式是客户有复杂的要求等等。

  • csv 转 parquet ,并且 parquet文件需要携带 schema 信息。
  • 脚本需要支持按某一时间段的每天处理。

b. 方案

(1)python 做脚本

(2)impala-shell 命令方式导出符合要求的csv文件

(3)pandas 和 pyarraow.parquet 结合操作csv

(4)发送标准文件到 OSS

欢迎关注,一起学习

3. 开发

(1)导出csv文件

  1. impala-shell
  2. -i {imp_host}
  3. -d {imp_database}
  4. -q " {sql} "
  5. -B --output_delimiter="\\t"
  6. --print_header
  7. -o /data/xx.csv

(2) 环境执行

os.system(cmd)

(3) csv 转 parquet

  1. import pyarrow as pa
  2. import pyarrow.parquet as pq
  3. csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)
  4. user_schema = pa.schema([
  5. ('user_id', pa.int64()),
  6. ('name', pa.string())
  7. ])
  8. table = pa.Table.from_pandas(csv_file,schema=user_schema)
  9. pq.write_table(table, "/data/xxxx.parquet")

(4) 发送到OSS

  1. import oss2
  2. auth_endpoint = 'auth_endpoint'
  3. access_key_id = 'access_key_id'
  4. access_key_secret = 'auth_endpoint'
  5. bucket_name = 'bucket_name'
  6. auth = oss2.Auth(access_key_id, access_key_secret)
  7. bucket = oss2.Bucket(auth,auth_endpoint, bucket_name)
  8. with open(output_file, 'rb') as file_obj:
  9. # 上传具体数据
  10. put_object = bucket.put_object(oss_path, file_obj)
  11. if put_object and put_object.status == 200:
  12. # 上传_success文件夹
  13. put_success=bucket.put_object('{0}/_SUCCESS'.format(oss_success_path), '')
  14. if put_success and put_success.status == 200:
  15. print("oss data success file upload success")
  16. else:
  17. print("oss success data upload fail")
  18. else:
  19. print("oss success data upload fail")

(5)日期循环

  1. import datetime
  2. startTime = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y%m%d")
  3. endTime = (datetime.datetime.now()).strftime("%Y%m%d")
  4. if (is_today == 0):
  5. startTime = datetime.datetime.strptime('2025-05-09 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
  6. endTime = datetime.datetime.strptime('2025-08-17 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
  7. while startTime < endTime:
  8. print('逻辑处理')
  9. startTime = (datetime.datetime.strptime(startTime, '%Y%m%d') + datetime.timedelta(days=1)).strftime(
  10. '%Y%m%d')

4. 遇到的问题

(1) out of memory

Error tokenizing data. C error: out of memory

因为读取 CSV文件过大,内存放不下,设置 low_memory=False,机器内存小于文件大小,那是倒不出来的,除非分块读取

  1. file_index = 0
  2. for file in pd.read_csv(csvFile,
  3. delimiter='\t',
  4. low_memory=False,
  5. chunksize=20000):
  6. table = pa.Table.from_pandas(file, schema=schema)
  7. pq.write_table(table, '/data/xxxx_{1}.parquet'.format(file_index))
  8. file_index+=1

(2) 数据类型转换问题

  1. pyarrow.lib.ArrowTypeError:
  2. ('Expected a string or bytes dtype, got int64',
  3. 'Conversion failed for column xxxx with type int64')

当数据列出现 Null 的时候,转换 int64 转化不过去,就会报错,需要转为 str 同理一样,其他数据类型也会出现这种情况,当schema定义某列为int64,32,16 .... 的时候,如果出现Null,会自定识别为 float 类型。

  1. csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)
  2. csv_file['xxxx'] = csv_file['xxxx'].astype(str)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/516041
推荐阅读
相关标签
  

闽ICP备14008679号