当前位置:   article > 正文

Python pandas大批量处理多个excel,并进行处理、统计和改进思路_pandas打开多个excel文件

pandas打开多个excel文件

处理目标:读取800多个excel中存储的各个城市一段时间的企业信息(每个城市都至少有一个excel的数据),统计每个城市2012-2023年每年各个二级制造业的企业数量

数据大小:800多个excel,共计45GB大小,单个excel大小在1MB-250MB之间

需求分析:由于需要二级制造业和年份两个维度,加上excel中的行和列,不难联想到pandas中的Dataframe;除此之外还需要考虑到大量数据下,普通性能的笔记本要如何简化处理流程,缩短程序的运行时间,字符串的处理和输入、处理、输出的细节;最后代码编写成功后需要先对单个excel进行测试,再对多个excel进行测试,最后加上一些输出信息(为了监控程序的运行进度),再对整体数据跑,最终得出结果

程序设计:为了缩短程序的运行时间,所以先将所有需要的数据在内存中处理好,最后一次性输出到excel中;按照信息专家的设计模式来说,年份和制造子行业的信息都是城市所拥有的信息,所以需要城市类;其次还需要一些筛选符合条件的数据的函数;以及处理单个excel的函数和持久化存储的函数

先看处理单个excel的函数,读入excel之前就要确定excel的列有两种情况,所以先用

pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()

读取出列并存为列表,将分成两中处理方式,在将excel中的数据读入时,有个很有效提高效率的方式,就是只读取需要的列,read_excel函数的usercols参数就起到这个作用,只有它含有的列才会被读入存储为dataframe;filtered_df生成的方式是原df符合要求的数据才会被保留,.str[:4]指只取前4位,.fillna('0') 指的是空置添0 ,.astype(int)指的是转换成int型变量,.between(2012,2023) 当前列在这个范围中的数据才会被保留,&连接下一个条件,当条件被写成函数时,要用df[column].apply(function_name)来应用函数,数据会被作为参数传入

再按照行业分类和年份作为列和行创建元素全为0的Dataframe,对其进行初始化;对filtered_df逐行遍历按照其中的一列数据为行,一列数据为列的原则对应在result_df上计数,最后返回result_df,这就是处理单个excel的逻辑

  1. # 对每一个excel进行处理
  2. def data_analyse(file_path):
  3. print(file_path)
  4. cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()
  5. # 存在二级行业分类的excel
  6. if '二级行业分类' in cols:
  7. df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列
  8. # 只要12-23年和31个制造子行业的数据
  9. filtered_df = df[
  10. df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply(
  11. is_manufacture_industry)]
  12. # 计算企业总数量
  13. count = len(filtered_df)
  14. # 建立行业为col和年份为index且初始元素都为0的dataframe
  15. index_years = list(range(2012, 2024))
  16. columns_names = manufacturing_industries
  17. result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
  18. # 遍历符合要求的每行数据并用计数df进行计数
  19. for index, row in filtered_df.iterrows():
  20. year = int(row['成立日期'][:4])
  21. sub_industry = row['二级行业分类']
  22. result_df.at[year, sub_industry] += 1
  23. # 对result_df的每一行求和,计算当前城市的年度制造业总量
  24. result_df['年度制造业总量'] = result_df.sum(axis=1)
  25. return result_df, count
  26. # 没有二级行业分类只有所属行业的excel
  27. else:
  28. df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列
  29. # 只要12-23年和31个制造子行业的数据
  30. filtered_df = df[
  31. df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply(
  32. is_manufacture_industry)]
  33. # 计算企业总数量
  34. count = len(filtered_df)
  35. # 建立行业为col和年份为index且初始元素都为0的dataframe
  36. index_years = list(range(2012, 2024))
  37. columns_names = manufacturing_industries
  38. result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
  39. # 遍历符合要求的每行数据并用计数df进行计数
  40. for index, row in filtered_df.iterrows():
  41. year = int(row['成立日期'][:4])
  42. sub_industry = row['所属行业']
  43. result_df.at[year, sub_industry] += 1
  44. # 对result_df的每一行求和,计算当前城市的年度制造业总量
  45. result_df['年度制造业总量'] = result_df.sum(axis=1)
  46. return result_df, count

再看主程序

city_flag是一个开关变量,其作用是判断相邻的两个excel是否是同一个城市的数据,判断的函数是check_cities,其中对excel文件名进行判断,使用全局变量last_city记录上一个excel的数据所属城市,若城市相同则对两个excel的result_df的对应元素相加,使用city_merge函数实现;若不同则创建新的城市对象加入城市列表中,最后完成之后进行持久化存储

  1. if __name__ == '__main__':
  2. # 用以区别是否为一同城市的不同数据表
  3. city_flag = False # 默认是不同的城市
  4. last_city = ''
  5. city_list = [] # 所有城市对象的容器
  6. directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据'
  7. all_files = os.listdir(directory_path)
  8. # 开始循环处理目录中的文件
  9. for file in all_files:
  10. # 检查城市是否已经有存在的数据了,并调整city_flag
  11. last_city = check_cities(file)
  12. # 文件绝对路径
  13. file_path = os.path.join(directory_path, file)
  14. # 数量矩阵
  15. result_df, count = data_analyse(file_path)
  16. if city_flag: # 不同的excel,相同的城市,进行df对应元素相加
  17. city_merge(result_df, count)
  18. else:
  19. # 新的城市,创建对象,并加入城市列表
  20. temp_city = City(last_city, result_df, count)
  21. city_list.append(temp_city)
  22. # 进行excel结果输出
  23. save_as_excel()
  24. print('done!')

check_cities函数,对城市名称进行检查,并更新last_city;提取城市名时使用了正则表达式,表达式会匹配所有的汉字,返回值为更新后的last_city

  1. # 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity
  2. def check_cities(file):
  3. global city_flag # 声明全局变量
  4. global last_city
  5. # 提取城市名
  6. simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file))
  7. if last_city != simplified_name:
  8. city_flag = False
  9. last_city = simplified_name
  10. else:
  11. city_flag = True
  12. return last_city

对相同的城市数据进行合并,若有相同的城市,说明城市列表中最后一个元素与当前城市相同,所以对city_list的最后一个元素的result_df进行修改,加上当前的result_df,就完成了合并

  1. # 相同城市,对应元素合并
  2. def city_merge(result_df, count):
  3. global city_list
  4. city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加
  5. city_list[-1].count = city_list[-1].count + count

持久化存存储结果

最后的内存中的结果是一个city_list,主要是一些格式上的要求:dataframe的前一行要有city_name,且只有第一个dataframe需要列名(因为excel中只有第一行有列名);主要用到两种写法:1.ws.cell,直接对单元格进行写入  2.dataframe.to_excel函数可以整体的将dataframe写入excel

  1. # 存储到excel中
  2. def save_as_excel():
  3. with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer:
  4. startrow = 0
  5. for idx, city in enumerate(city_list):
  6. # 将city_name写入到当前的startrow位置
  7. ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None
  8. if not ws:
  9. ws = writer.book.create_sheet('Sheet1')
  10. ws.cell(row=startrow + 1, column=1, value=city.city_name)
  11. # 对每个城市的12-23年11年制造业总量求和
  12. # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量')
  13. # ws.cell(row=startrow + 1, column=3, value=city.count)
  14. # 如果是第一个DataFrame,写入header
  15. if idx == 0:
  16. header = True
  17. else:
  18. header = False
  19. # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置
  20. startrow += 1
  21. city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header)
  22. # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置
  23. startrow += city.result_df.shape[0] + 1

所有源码:

  1. import os
  2. import re
  3. import pandas as pd
  4. # 制造业二级分类
  5. manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业',
  6. '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业',
  7. '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业',
  8. '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业',
  9. '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业',
  10. '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业',
  11. '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业',
  12. '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业',
  13. '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业',
  14. '废弃资源综合利用业', '金属制品、机械和设备修理业']
  15. # 城市类
  16. class City:
  17. def __init__(self, city_name, result_df, count):
  18. self.city_name = city_name
  19. self.result_df = result_df
  20. self.result_df.index.name = city_name # 给这个城市的df标注城市名称
  21. self.count = count # 12-23年该城市的制造业企业总数
  22. # 判断时候为制造业,返回值为布尔值
  23. def is_manufacture_industry(value):
  24. # 31个制造子行业
  25. manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业',
  26. '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业',
  27. '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业',
  28. '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业',
  29. '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业',
  30. '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业',
  31. '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业',
  32. '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业',
  33. '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业',
  34. '废弃资源综合利用业', '金属制品、机械和设备修理业']
  35. return value in manufacturing_industries
  36. # 对每一个excel进行处理
  37. def data_analyse(file_path):
  38. print(file_path)
  39. cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()
  40. # 存在二级行业分类的excel
  41. if '二级行业分类' in cols:
  42. df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列
  43. # 只要12-23年和31个制造子行业的数据
  44. filtered_df = df[
  45. df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply(
  46. is_manufacture_industry)]
  47. # 计算企业总数量
  48. count = len(filtered_df)
  49. # 建立行业为col和年份为index且初始元素都为0的dataframe
  50. index_years = list(range(2012, 2024))
  51. columns_names = manufacturing_industries
  52. result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
  53. # 遍历符合要求的每行数据并用计数df进行计数
  54. for index, row in filtered_df.iterrows():
  55. year = int(row['成立日期'][:4])
  56. sub_industry = row['二级行业分类']
  57. result_df.at[year, sub_industry] += 1
  58. # 对result_df的每一行求和,计算当前城市的年度制造业总量
  59. result_df['年度制造业总量'] = result_df.sum(axis=1)
  60. return result_df, count
  61. # 没有二级行业分类只有所属行业的excel
  62. else:
  63. df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列
  64. # 只要12-23年和31个制造子行业的数据
  65. filtered_df = df[
  66. df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply(
  67. is_manufacture_industry)]
  68. # 计算企业总数量
  69. count = len(filtered_df)
  70. # 建立行业为col和年份为index且初始元素都为0的dataframe
  71. index_years = list(range(2012, 2024))
  72. columns_names = manufacturing_industries
  73. result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
  74. # 遍历符合要求的每行数据并用计数df进行计数
  75. for index, row in filtered_df.iterrows():
  76. year = int(row['成立日期'][:4])
  77. sub_industry = row['所属行业']
  78. result_df.at[year, sub_industry] += 1
  79. # 对result_df的每一行求和,计算当前城市的年度制造业总量
  80. result_df['年度制造业总量'] = result_df.sum(axis=1)
  81. return result_df, count
  82. # 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity
  83. def check_cities(file):
  84. global city_flag # 声明全局变量
  85. global last_city
  86. # 提取城市名
  87. simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file))
  88. if last_city != simplified_name:
  89. city_flag = False
  90. last_city = simplified_name
  91. else:
  92. city_flag = True
  93. return last_city
  94. # 相同城市,对应元素合并
  95. def city_merge(result_df, count):
  96. global city_list
  97. city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加
  98. city_list[-1].count = city_list[-1].count + count
  99. # 存储到excel中
  100. def save_as_excel():
  101. with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer:
  102. startrow = 0
  103. for idx, city in enumerate(city_list):
  104. # 将city_name写入到当前的startrow位置
  105. ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None
  106. if not ws:
  107. ws = writer.book.create_sheet('Sheet1')
  108. ws.cell(row=startrow + 1, column=1, value=city.city_name)
  109. # 对每个城市的12-23年11年制造业总量求和
  110. # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量')
  111. # ws.cell(row=startrow + 1, column=3, value=city.count)
  112. # 如果是第一个DataFrame,写入header
  113. if idx == 0:
  114. header = True
  115. else:
  116. header = False
  117. # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置
  118. startrow += 1
  119. city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header)
  120. # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置
  121. startrow += city.result_df.shape[0] + 1
  122. if __name__ == '__main__':
  123. # 用以区别是否为一同城市的不同数据表
  124. city_flag = False # 默认是不同的城市
  125. last_city = ''
  126. city_list = [] # 所有城市对象的容器
  127. directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据'
  128. all_files = os.listdir(directory_path)
  129. # 开始循环处理目录中的文件
  130. for file in all_files:
  131. # 检查城市是否已经有存在的数据了,并调整city_flag
  132. last_city = check_cities(file)
  133. # 文件绝对路径
  134. file_path = os.path.join(directory_path, file)
  135. # 数量矩阵
  136. result_df, count = data_analyse(file_path)
  137. if city_flag: # 不同的excel,相同的城市,进行df对应元素相加
  138. city_merge(result_df, count)
  139. else:
  140. # 新的城市,创建对象,并加入城市列表
  141. temp_city = City(last_city, result_df, count)
  142. city_list.append(temp_city)
  143. # 进行excel结果输出
  144. save_as_excel()
  145. print('done!')
  146. # 进行 时间优化,然后开始运行

改进思路:

对于excel的统计尤其是数据量比较大的excel,实际上是CPU密集型的;本程序单核运行,最后完成这45个g的数据的执行时间是20小时左右;CPU密集型的操作适合使用多进程编程来充分利用计算机的性能

用一个简单的多进程例子展示使用python的multiprocessing库来完成多进程编程

在进程函数中,由于使用了共享数据结构,所以在将结果添加到共享数据结构时,要使用.get_lock()方法加锁,来保证当某一个进程向共享数据结构中添加元素时,其他进程不能同时对其进行操作

  1. import multiprocessing
  2. # 步骤 2:定义一个函数,接受参数并执行任务
  3. def process_function(arg1, shared_list):
  4. # 在这里执行任务,可以使用参数 arg1
  5. result = arg1 * 2
  6. # 将结果添加到共享列表
  7. with shared_list.get_lock():
  8. shared_list.append(result)
  9. if __name__ == "__main__":
  10. # 步骤 1:导入 multiprocessing 模块
  11. # 步骤 3:创建一个共享列表
  12. manager = multiprocessing.Manager()
  13. shared_list = manager.list()
  14. # 步骤 4:创建一个进程池
  15. num_processes = multiprocessing.cpu_count()
  16. pool = multiprocessing.Pool(processes=num_processes)
  17. # 步骤 5:使用进程池提交任务,传递参数和共享列表
  18. arg1_values = [1, 2, 3, 4, 5]
  19. # 使用 partial 函数创建一个包装函数,传递共享列表作为参数
  20. from functools import partial
  21. process_function_with_shared_list = partial(process_function, shared_list=shared_list)
  22. # 使用 map 方法执行任务
  23. pool.map(process_function_with_shared_list, arg1_values)
  24. # 步骤 6:等待所有任务完成,关闭进程池
  25. pool.close()
  26. pool.join()
  27. # 打印共享列表的内容
  28. print("Shared List:", shared_list)

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号