赞
踩
在数据量较大的业务场景中,spark在数据处理、传统机器学习训练、
深度学习相关业务,能取得较明显的效率提升。
本篇围绕spark大数据背景下的推理,介绍一些优雅的使用方式。
目前在10亿+数据量的推理场景中使用,需要用户自己实现批数据准备,基于RDD的方法完成模型推理输出。
业务使用中的问题:
spark加速深度学习推理,基本思路为:
广播模型参数,不仅能减少模型重复加载带来的流量和io,而且能加速推理前模型加载的速度。
driver广播模型参数:
# Load ResNet50 on driver node and broadcast its state.
model_state = models.resnet50(pretrained=True).state_dict()
bc_model_state = sc.broadcast(model_state)
worker读取模型参数:
def get_model_for_eval():
"""Gets the broadcasted model."""
model = models.resnet50(pretrained=True)
model.load_state_dict(bc_model_state.value)
model.eval()
return model
目前主流的深度学习框架,dataset的实现大多基于本地存储,在读取分布式存储的场景 需要用户自定义实现。
自定义实现有2个方法:
方法一迭代与使用的存储类型会保持同步,且每次使用前需要明确使用的分布式存储,虽然实现方法容易但是使用流程略显麻烦。
方法二不需要关心分布式存储类型,只要需要获取并解析spark dataframe列传入内容即可。
本文采用方法二实现dataset:
# 从二进制流中解析图片信息 def pil_loader(binary_file): # open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835) image_io = io.BytesIO(binary_file) img = Image.open(image_io) return img.convert('RGB') # Create a custom PyTorch dataset class. class ImageDataset(Dataset): def __init__(self, data, transform=None): self.data = data self.transform = transform def __len__(self): return len(self.data) def __getitem__(self, index): image = pil_loader(self.data[index]) if self.transform is not None: image = self.transform(image) return image
Pandas udf是基于RDD的一个低门槛高性能的实现方法,pandas udf能自定义处理逻辑,以列的方式操作datafrme内容。
这是社区目前推荐的自定义处理方式。
# Define the function for model inference. # PyArrow >= 1.0.0 must be installed; @pandas_udf(ArrayType(FloatType())) def predict_batch_udf(binaray_data: pd.Series) -> pd.Series: transform = transforms.Compose([ transforms.Resize(224), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) images = ImageDataset(binaray_data, transform=transform) loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8) model = get_model_for_eval() model.to(device) all_predictions = [] with torch.no_grad(): for batch in loader: predictions = list(model(batch.to(device)).cpu().numpy()) for prediction in predictions: all_predictions.append(prediction) return pd.Series(all_predictions)
# 调用pandas udf
predictions_df = df. \
select(col("filename"), predict_batch_udf(col("data")).alias("prediction"))
更多代码细节:
https://github.com/Crazybean-lwb/deeplearning-pyspark/blob/master/examples/pytorch-inference.py
打通到模型仓mlflow功能:
# Create the PySpark UDF
import mlflow.pyfunc
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
# 调用pandas udf
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。