当前位置:   article > 正文

Python 多进程+分组数据流式加载,实现数据的低内存多进程处理_python joblib多进程

python joblib多进程

Python使用多进程时,内存占用会成倍增加。为了降低内存的消耗,采用在硬盘上对中间表持久化在读入的方式降低内存的占用,实现数据处理时的低内存占用多进程加速。同时也可以绕开一些不可pickle的参数,实现多进程
本文实现了如下业务场景:
文件A中数据用来进行模型训练、文件B中数据用来进行预测和查询。其中group_col是一个两张表共有的分类字段,如“省市”或者“颜色”等。B表中group_col仅仅与A表中同组数据有关系,例如A表是苹果的味道,group_col是颜色,那么B表中的苹果红色的苹果只和A表中红色的苹果进行关联分析。
方法:
将两个表按共有字段group_col分组,分组后的每组数据作为中间表持久化为pkl文件。多进程读取pkl文件,然后进行计算。这样做的好处有2个:
一是降低内存占用,数据较大时避免了内存溢出导致程序崩溃。
二是避免了中间存在不可pickle对象导致无法使用多进程。例如上面例子中通过A数据训练了一个模型,该模型是不可pickle对象,这时不能使用多进程。但是通过将文件分割,每个分割的文件分别在多进程内部训练模型,就可以避免传递不可pickle的模型。模型和数据的大小相关时,效果最佳;因为模型和数据的大小相关时分割数据后,单个进程训练模型的时长会降低,训练模型的总耗时不会比一次性读取数据进行训练的耗时增加多少;如果模型和数据的大小无关那么多个进程会大大增加模型的训练时长。

一、文件分割

通过建立新文件夹,将原文件通过分组分割存储为读写非常快的pkl文件,实现中间表的持久化。

1、建立一个新文件夹,文件夹为在原文件存放目录下增加一个以某个文件名,命名的文件
例如path=r"D://a.xlxs" 则创建了一个文件夹path=r"D://a"

def new_dir(file_path):
    #获取文件夹名称和文件名(fileame带后缀)
    dir_path, file_name = os.path.split(file_path)
    #去掉文件名后缀(filename不带后缀)
    filename=os.path.splitext(file_name)[0]
    #生成新的文件路径
    new_path=os.path.join(dir_path, filename )
    #建立新文件夹
    os.makedirs(new_path, exist_ok=True)
    return new_path
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2、将文件分组后,存为pkl,放于path文件夹

def group_pickle(df, group_col, path):
    lst=[]
    for k, v in df.groupby(group_col):
        name=f"{group_col}_{k}.pkl"
        file_name = os.path.join(path,name)
        v.to_pickle(file_name)
        lst.append(name)        
    return lst
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

二、实现多进程计算

1、建立一个元祖,元祖的两个元素分别是路径名+文件名

lst_dir=[(os.path.join(dir_s1,i),os.path.join(dir_s2,i)) for i in lst_pkl]
  • 1

2、通过joblib库进行多进程计算

from joblib import Parallel, delayed

def func(x, y):
    return x+y
    
results = Parallel(n_jobs=-1)(delayed(func)(*c) for c in C)
print(results)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

三、整体代码

import os
import pandas as pd
from pandasrw import load,dump
from joblib import Parallel, delayed

#建立一个新文件夹,文件夹为在原文件存放目录下增加一个以某个文件名,命名的文件
def new_dir(file_path):
    #获取文件夹名称和文件名(fileame带后缀)
    dir_path, file_name = os.path.split(file_path)
    #去掉文件名后缀(filename不带后缀)
    filename=os.path.splitext(file_name)[0]
    #生成新的文件路径
    new_path=os.path.join(dir_path, filename )
    #建立新文件夹
    os.makedirs(new_path, exist_ok=True)
    return new_path

#将文件分组后,存为pkl,放于path文件夹
def group_pickle(df, group_col, path):
    lst=[]
    for k, v in df.groupby(group_col):
        name=f"{group_col}_{k}.pkl"
        file_name = os.path.join(path,name)
        v.to_pickle(file_name)
        lst.append(name)
    return lst

if __name__ == '__main__':


    #读取数据和生成新文件夹
    path_s1=r"xx"
    df1 = load(path_s1)
    dir_s1 = new_dir(path_s1)

    path_s2 = r"xx"
    dir_s2 = new_dir(path_s2)
    df2 = load(path_s1)

    #数据分组,并返回文件名(不包括路径)列表。
    #在本程序中df1和df2文件名列表分割后是一致的都是分组字段的值。
    group_col="xx"
    lst_pkl = group_pickle(df1, group_col, dir_s2)
    lst_pkl = group_pickle(df2, group_col, dir_s2)

    #在文件名上添加路径新,生成可读取列表,用于进行多进行
    lst_dir = [(os.path.join(dir_s1, i), os.path.join(dir_s2, i)) for i in lst_pkl]

    #多进程进行处理
    def func(x, y):
        return x + y

    results = Parallel(n_jobs=-1)(delayed(func)(*c) for c in C)
    print(results)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/927147
推荐阅读
相关标签
  

闽ICP备14008679号