当前位置:   article > 正文

Python 将parquet文件转换为csv文件

parquet文件转换为csv

Python 将parquet文件转换为csv文件

使用pyarrow插件将parquet文件转换为csv


```python
import os
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor
import csv
import time

# 定义一个函数来处理单个 Parquet 文件
def process_parquet_file(parquet_file, csv_file):
    try:
        start_time = time.time()  # 记录开始时间

        # 打开 CSV 文件,使用追加模式
        # csv_file_handle = open(csv_file, 'a', newline='')
        csv_file_handle = open(csv_file, 'a', newline='', encoding='utf-8')

        # 获取 Parquet 文件的字段名
        parquet_data = pq.ParquetFile(parquet_file)
        field_names = parquet_data.schema.names

        # 写入 CSV 文件的标题行(只在文件创建时写入一次)
        if os.path.getsize(csv_file) == 0:
            csv_writer = csv.writer(csv_file_handle)
            csv_writer.writerow(field_names)

        # 逐块读取 Parquet 数据并写入 CSV 文件
        for i in range(parquet_data.num_row_groups):
            row_group = parquet_data.read_row_group(i)
            df = row_group.to_pandas()
            # 过滤特殊字符或替换为合适的占位符
            df = df.apply(lambda x: x.replace('\ue108', '') if isinstance(x, str) else x)
            df.to_csv(csv_file_handle, mode='a', header=False, index=False)

        # 关闭文件句柄
        csv_file_handle.close()

        end_time = time.time()  # 记录结束时间
        elapsed_time = end_time - start_time
        print(f"{parquet_file} 处理完毕,用时 {elapsed_time:.2f} 秒")
    except Exception as e:
        print(f"处理 {parquet_file} 时出现错误: {str(e)}")
    finally:
        print(f"{parquet_file} 处理完毕,用时 {elapsed_time:.2f} 秒")

if __name__ == "__main__":
    datadir = './'  # 包含 Parquet 文件的文件夹
    parquet_files = [os.path.join(datadir, f) for f in os.listdir(datadir) if f.endswith('.parquet')]

    # 创建线程池并行处理 Parquet 文件
    with ThreadPoolExecutor(max_workers=4) as executor:
        for parquet_file in parquet_files:
            # 生成输出 CSV 文件名(根据 Parquet 文件名)
            csv_file = os.path.splitext(parquet_file)[0] + '.csv'

            # 使用线程池处理 Parquet 文件
            executor.submit(process_parquet_file, parquet_file, csv_file)

    print("所有文件处理完毕")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

注意parquet文件存放路径。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/515976
推荐阅读
相关标签
  

闽ICP备14008679号