赞
踩
处理目标:读取800多个excel中存储的各个城市一段时间的企业信息(每个城市都至少有一个excel的数据),统计每个城市2012-2023年每年各个二级制造业的企业数量
数据大小:800多个excel,共计45GB大小,单个excel大小在1MB-250MB之间
需求分析:由于需要二级制造业和年份两个维度,加上excel中的行和列,不难联想到pandas中的Dataframe;除此之外还需要考虑到大量数据下,普通性能的笔记本要如何简化处理流程,缩短程序的运行时间,字符串的处理和输入、处理、输出的细节;最后代码编写成功后需要先对单个excel进行测试,再对多个excel进行测试,最后加上一些输出信息(为了监控程序的运行进度),再对整体数据跑,最终得出结果
程序设计:为了缩短程序的运行时间,所以先将所有需要的数据在内存中处理好,最后一次性输出到excel中;按照信息专家的设计模式来说,年份和制造子行业的信息都是城市所拥有的信息,所以需要城市类;其次还需要一些筛选符合条件的数据的函数;以及处理单个excel的函数和持久化存储的函数
先看处理单个excel的函数,读入excel之前就要确定excel的列有两种情况,所以先用
pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()
读取出列并存为列表,将分成两中处理方式,在将excel中的数据读入时,有个很有效提高效率的方式,就是只读取需要的列,read_excel函数的usercols参数就起到这个作用,只有它含有的列才会被读入存储为dataframe;filtered_df生成的方式是原df符合要求的数据才会被保留,.str[:4]指只取前4位,.fillna('0') 指的是空置添0 ,.astype(int)指的是转换成int型变量,.between(2012,2023) 当前列在这个范围中的数据才会被保留,&连接下一个条件,当条件被写成函数时,要用df[column].apply(function_name)来应用函数,数据会被作为参数传入
再按照行业分类和年份作为列和行创建元素全为0的Dataframe,对其进行初始化;对filtered_df逐行遍历按照其中的一列数据为行,一列数据为列的原则对应在result_df上计数,最后返回result_df,这就是处理单个excel的逻辑
- # 对每一个excel进行处理
- def data_analyse(file_path):
- print(file_path)
- cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()
- # 存在二级行业分类的excel
- if '二级行业分类' in cols:
- df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列
- # 只要12-23年和31个制造子行业的数据
- filtered_df = df[
- df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply(
- is_manufacture_industry)]
- # 计算企业总数量
- count = len(filtered_df)
- # 建立行业为col和年份为index且初始元素都为0的dataframe
- index_years = list(range(2012, 2024))
- columns_names = manufacturing_industries
- result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
- # 遍历符合要求的每行数据并用计数df进行计数
- for index, row in filtered_df.iterrows():
- year = int(row['成立日期'][:4])
- sub_industry = row['二级行业分类']
- result_df.at[year, sub_industry] += 1
- # 对result_df的每一行求和,计算当前城市的年度制造业总量
- result_df['年度制造业总量'] = result_df.sum(axis=1)
- return result_df, count
- # 没有二级行业分类只有所属行业的excel
- else:
- df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列
- # 只要12-23年和31个制造子行业的数据
- filtered_df = df[
- df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply(
- is_manufacture_industry)]
- # 计算企业总数量
- count = len(filtered_df)
- # 建立行业为col和年份为index且初始元素都为0的dataframe
- index_years = list(range(2012, 2024))
- columns_names = manufacturing_industries
- result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
- # 遍历符合要求的每行数据并用计数df进行计数
- for index, row in filtered_df.iterrows():
- year = int(row['成立日期'][:4])
- sub_industry = row['所属行业']
- result_df.at[year, sub_industry] += 1
- # 对result_df的每一行求和,计算当前城市的年度制造业总量
- result_df['年度制造业总量'] = result_df.sum(axis=1)
- return result_df, count
再看主程序
city_flag是一个开关变量,其作用是判断相邻的两个excel是否是同一个城市的数据,判断的函数是check_cities,其中对excel文件名进行判断,使用全局变量last_city记录上一个excel的数据所属城市,若城市相同则对两个excel的result_df的对应元素相加,使用city_merge函数实现;若不同则创建新的城市对象加入城市列表中,最后完成之后进行持久化存储
- if __name__ == '__main__':
- # 用以区别是否为一同城市的不同数据表
- city_flag = False # 默认是不同的城市
- last_city = ''
- city_list = [] # 所有城市对象的容器
- directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据'
- all_files = os.listdir(directory_path)
-
- # 开始循环处理目录中的文件
- for file in all_files:
- # 检查城市是否已经有存在的数据了,并调整city_flag
- last_city = check_cities(file)
- # 文件绝对路径
- file_path = os.path.join(directory_path, file)
- # 数量矩阵
- result_df, count = data_analyse(file_path)
- if city_flag: # 不同的excel,相同的城市,进行df对应元素相加
- city_merge(result_df, count)
- else:
- # 新的城市,创建对象,并加入城市列表
- temp_city = City(last_city, result_df, count)
- city_list.append(temp_city)
-
- # 进行excel结果输出
- save_as_excel()
- print('done!')
check_cities函数,对城市名称进行检查,并更新last_city;提取城市名时使用了正则表达式,表达式会匹配所有的汉字,返回值为更新后的last_city
- # 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity
- def check_cities(file):
- global city_flag # 声明全局变量
- global last_city
- # 提取城市名
- simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file))
- if last_city != simplified_name:
- city_flag = False
- last_city = simplified_name
- else:
- city_flag = True
- return last_city
对相同的城市数据进行合并,若有相同的城市,说明城市列表中最后一个元素与当前城市相同,所以对city_list的最后一个元素的result_df进行修改,加上当前的result_df,就完成了合并
- # 相同城市,对应元素合并
- def city_merge(result_df, count):
- global city_list
- city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加
- city_list[-1].count = city_list[-1].count + count
持久化存存储结果
最后的内存中的结果是一个city_list,主要是一些格式上的要求:dataframe的前一行要有city_name,且只有第一个dataframe需要列名(因为excel中只有第一行有列名);主要用到两种写法:1.ws.cell,直接对单元格进行写入 2.dataframe.to_excel函数可以整体的将dataframe写入excel
- # 存储到excel中
- def save_as_excel():
- with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer:
- startrow = 0
- for idx, city in enumerate(city_list):
-
- # 将city_name写入到当前的startrow位置
- ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None
- if not ws:
- ws = writer.book.create_sheet('Sheet1')
- ws.cell(row=startrow + 1, column=1, value=city.city_name)
-
- # 对每个城市的12-23年11年制造业总量求和
- # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量')
- # ws.cell(row=startrow + 1, column=3, value=city.count)
-
- # 如果是第一个DataFrame,写入header
- if idx == 0:
- header = True
- else:
- header = False
-
- # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置
- startrow += 1
- city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header)
-
- # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置
- startrow += city.result_df.shape[0] + 1
所有源码:
- import os
- import re
- import pandas as pd
-
- # 制造业二级分类
- manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业',
- '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业',
- '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业',
- '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业',
- '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业',
- '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业',
- '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业',
- '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业',
- '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业',
- '废弃资源综合利用业', '金属制品、机械和设备修理业']
-
-
- # 城市类
- class City:
- def __init__(self, city_name, result_df, count):
- self.city_name = city_name
- self.result_df = result_df
- self.result_df.index.name = city_name # 给这个城市的df标注城市名称
- self.count = count # 12-23年该城市的制造业企业总数
-
-
- # 判断时候为制造业,返回值为布尔值
- def is_manufacture_industry(value):
- # 31个制造子行业
- manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业',
- '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业',
- '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业',
- '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业',
- '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业',
- '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业',
- '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业',
- '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业',
- '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业',
- '废弃资源综合利用业', '金属制品、机械和设备修理业']
- return value in manufacturing_industries
-
-
- # 对每一个excel进行处理
- def data_analyse(file_path):
- print(file_path)
- cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()
- # 存在二级行业分类的excel
- if '二级行业分类' in cols:
- df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列
- # 只要12-23年和31个制造子行业的数据
- filtered_df = df[
- df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply(
- is_manufacture_industry)]
- # 计算企业总数量
- count = len(filtered_df)
- # 建立行业为col和年份为index且初始元素都为0的dataframe
- index_years = list(range(2012, 2024))
- columns_names = manufacturing_industries
- result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
- # 遍历符合要求的每行数据并用计数df进行计数
- for index, row in filtered_df.iterrows():
- year = int(row['成立日期'][:4])
- sub_industry = row['二级行业分类']
- result_df.at[year, sub_industry] += 1
- # 对result_df的每一行求和,计算当前城市的年度制造业总量
- result_df['年度制造业总量'] = result_df.sum(axis=1)
- return result_df, count
- # 没有二级行业分类只有所属行业的excel
- else:
- df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列
- # 只要12-23年和31个制造子行业的数据
- filtered_df = df[
- df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply(
- is_manufacture_industry)]
- # 计算企业总数量
- count = len(filtered_df)
- # 建立行业为col和年份为index且初始元素都为0的dataframe
- index_years = list(range(2012, 2024))
- columns_names = manufacturing_industries
- result_df = pd.DataFrame(0, index=index_years, columns=columns_names)
- # 遍历符合要求的每行数据并用计数df进行计数
- for index, row in filtered_df.iterrows():
- year = int(row['成立日期'][:4])
- sub_industry = row['所属行业']
- result_df.at[year, sub_industry] += 1
- # 对result_df的每一行求和,计算当前城市的年度制造业总量
- result_df['年度制造业总量'] = result_df.sum(axis=1)
- return result_df, count
-
-
- # 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity
- def check_cities(file):
- global city_flag # 声明全局变量
- global last_city
- # 提取城市名
- simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file))
- if last_city != simplified_name:
- city_flag = False
- last_city = simplified_name
- else:
- city_flag = True
- return last_city
-
-
- # 相同城市,对应元素合并
- def city_merge(result_df, count):
- global city_list
- city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加
- city_list[-1].count = city_list[-1].count + count
-
-
- # 存储到excel中
- def save_as_excel():
- with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer:
- startrow = 0
- for idx, city in enumerate(city_list):
-
- # 将city_name写入到当前的startrow位置
- ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None
- if not ws:
- ws = writer.book.create_sheet('Sheet1')
- ws.cell(row=startrow + 1, column=1, value=city.city_name)
-
- # 对每个城市的12-23年11年制造业总量求和
- # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量')
- # ws.cell(row=startrow + 1, column=3, value=city.count)
-
- # 如果是第一个DataFrame,写入header
- if idx == 0:
- header = True
- else:
- header = False
-
- # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置
- startrow += 1
- city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header)
-
- # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置
- startrow += city.result_df.shape[0] + 1
-
-
- if __name__ == '__main__':
- # 用以区别是否为一同城市的不同数据表
- city_flag = False # 默认是不同的城市
- last_city = ''
- city_list = [] # 所有城市对象的容器
- directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据'
- all_files = os.listdir(directory_path)
-
- # 开始循环处理目录中的文件
- for file in all_files:
- # 检查城市是否已经有存在的数据了,并调整city_flag
- last_city = check_cities(file)
- # 文件绝对路径
- file_path = os.path.join(directory_path, file)
- # 数量矩阵
- result_df, count = data_analyse(file_path)
- if city_flag: # 不同的excel,相同的城市,进行df对应元素相加
- city_merge(result_df, count)
- else:
- # 新的城市,创建对象,并加入城市列表
- temp_city = City(last_city, result_df, count)
- city_list.append(temp_city)
-
- # 进行excel结果输出
- save_as_excel()
- print('done!')
- # 进行 时间优化,然后开始运行
改进思路:
对于excel的统计尤其是数据量比较大的excel,实际上是CPU密集型的;本程序单核运行,最后完成这45个g的数据的执行时间是20小时左右;CPU密集型的操作适合使用多进程编程来充分利用计算机的性能
用一个简单的多进程例子展示使用python的multiprocessing库来完成多进程编程
在进程函数中,由于使用了共享数据结构,所以在将结果添加到共享数据结构时,要使用.get_lock()方法加锁,来保证当某一个进程向共享数据结构中添加元素时,其他进程不能同时对其进行操作
- import multiprocessing
-
- # 步骤 2:定义一个函数,接受参数并执行任务
- def process_function(arg1, shared_list):
- # 在这里执行任务,可以使用参数 arg1
- result = arg1 * 2
-
- # 将结果添加到共享列表
- with shared_list.get_lock():
- shared_list.append(result)
-
- if __name__ == "__main__":
- # 步骤 1:导入 multiprocessing 模块
-
- # 步骤 3:创建一个共享列表
- manager = multiprocessing.Manager()
- shared_list = manager.list()
-
- # 步骤 4:创建一个进程池
- num_processes = multiprocessing.cpu_count()
- pool = multiprocessing.Pool(processes=num_processes)
-
- # 步骤 5:使用进程池提交任务,传递参数和共享列表
- arg1_values = [1, 2, 3, 4, 5]
-
- # 使用 partial 函数创建一个包装函数,传递共享列表作为参数
- from functools import partial
- process_function_with_shared_list = partial(process_function, shared_list=shared_list)
-
- # 使用 map 方法执行任务
- pool.map(process_function_with_shared_list, arg1_values)
-
- # 步骤 6:等待所有任务完成,关闭进程池
- pool.close()
- pool.join()
-
- # 打印共享列表的内容
- print("Shared List:", shared_list)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。