赞
踩
```python import os import pyarrow.parquet as pq from concurrent.futures import ThreadPoolExecutor import csv import time # 定义一个函数来处理单个 Parquet 文件 def process_parquet_file(parquet_file, csv_file): try: start_time = time.time() # 记录开始时间 # 打开 CSV 文件,使用追加模式 # csv_file_handle = open(csv_file, 'a', newline='') csv_file_handle = open(csv_file, 'a', newline='', encoding='utf-8') # 获取 Parquet 文件的字段名 parquet_data = pq.ParquetFile(parquet_file) field_names = parquet_data.schema.names # 写入 CSV 文件的标题行(只在文件创建时写入一次) if os.path.getsize(csv_file) == 0: csv_writer = csv.writer(csv_file_handle) csv_writer.writerow(field_names) # 逐块读取 Parquet 数据并写入 CSV 文件 for i in range(parquet_data.num_row_groups): row_group = parquet_data.read_row_group(i) df = row_group.to_pandas() # 过滤特殊字符或替换为合适的占位符 df = df.apply(lambda x: x.replace('\ue108', '') if isinstance(x, str) else x) df.to_csv(csv_file_handle, mode='a', header=False, index=False) # 关闭文件句柄 csv_file_handle.close() end_time = time.time() # 记录结束时间 elapsed_time = end_time - start_time print(f"{parquet_file} 处理完毕,用时 {elapsed_time:.2f} 秒") except Exception as e: print(f"处理 {parquet_file} 时出现错误: {str(e)}") finally: print(f"{parquet_file} 处理完毕,用时 {elapsed_time:.2f} 秒") if __name__ == "__main__": datadir = './' # 包含 Parquet 文件的文件夹 parquet_files = [os.path.join(datadir, f) for f in os.listdir(datadir) if f.endswith('.parquet')] # 创建线程池并行处理 Parquet 文件 with ThreadPoolExecutor(max_workers=4) as executor: for parquet_file in parquet_files: # 生成输出 CSV 文件名(根据 Parquet 文件名) csv_file = os.path.splitext(parquet_file)[0] + '.csv' # 使用线程池处理 Parquet 文件 executor.submit(process_parquet_file, parquet_file, csv_file) print("所有文件处理完毕")
注意parquet文件存放路径。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。