当前位置:   article > 正文

用Gradio实现PaddleTS时序预测全流程可视化:以风电预测任务为例_paddle load_from_dataframe

paddle load_from_dataframe

★★★ 本文源自AlStudio社区精品项目,【点击此处】查看更多精品内容 >>>

0 项目背景

很多了解飞桨的同学们都知道,在庞大的飞桨工具库套件家族中,有一款知名的“零代码”全流程开发工具——PaddleX的图形化开发客户端。

但有些遗憾的是,PaddleX的全流程开发长期以来只提供了面向CV领域即图像分类、目标检测、图像分割的解决方案。

而飞桨其它方向的工具库,一直以来在“零代码”开发客户端方面还是有所欠缺的。随着Streamlit和Gradio等机器学习可视化web框架的引入,以及各类飞桨工具库的调用API愈加清晰、整齐,现在,给这些工具库插上“零代码”的翅膀,让更多初学者参与到深度学习应用开发中来,已不再是特别困难的事情了。

另一方面,本次“中国软件杯”大学生软件设计大赛——龙源风电赛道的区域赛阶段,也要求选手实现模型算法的落地。基于Gradio快速实现轻量级部署,也是选手们在设计时可以考虑的一种方向。

因此,本文就基于AI Studio提供的Gradio应用部署解决方案,以“中国软件杯”大学生软件设计大赛——龙源风电赛道的场景为例,介绍如何实现一个面向通用场景的PaddleTS全流程零代码开发案例。

参考资料:

1 环境准备

!pip install paddlets
  • 1

2 功能设计

任何一个软件开发工具的实现,都离不开前期对整体结构的设计。根据一贯的从易到难迭代原则,本项目首先聚焦实现PaddleTS时间序列预测任务的全流程“零代码”开发。

注:红色字体为后续迭代内容。

下面,就逐一介绍一下工具箱重点实现的内容。

2.1 数据集参数设置

由于风电预测这个任务,数据集完全来自真实产业场景,前期数据预处理的任务非常关键,如:

  • 时间戳重复
  • 日期格式特殊(日-月-年)
  • 部分数据字段为整数(与PaddleTS对协变量格式要求冲突)
  • 预测时间不连续(根据前日5:00前数据预测后一天风机实际功率)

因此,我们结合该项目设计的数据集参数选项,将会非常有代表性,基本覆盖时间序列任务可能要面临的各种情况。

在数据集参数设置中,最需要关注的是多列变量如何传参与后台处理——如:有多个协变量、多个预测值。

本项目采用的是手动设置分隔符,再在后台处理的方式。用户只需在不同字段名间输入空格,后台就通过split()函数将字符串切成字典,从而让传入数据符合PaddleTS处理的格式要求。

2.2 训练参数设置

时间序列预测任务的训练参数设置也是比较固定的,因为PaddleTS模型在这方面已经抽象得非常到位了。

以初始化MLPRegressor模型为例,只有两个必传的参数:

  1. in_chunk_len: 输入时序窗口的大小,代表每次训练以及预测时候输入到模型中的前序的时间步长。

  2. out_chunk_len: 输出时序窗口的大小,代表每次训练以及预测时候输出的后续的时间步长。

大部分时间序列预测模型的传参非常一致。

为简化页面复杂度,本项目只提取了其中最常用、最经常需要调整的batch_sizelrepochs进行界面传参,其余使用默认设置。

2.3 模型训练和加载

众所周知,深度学习模型训练都需要比较长的时间,尽管时间序列预测任务和CV、大模型类训练时长相比,已经算是“极速”了,但是在Gradio前端应用使用时,由于训练过程我们暂时无法实现可视化,一直等待页面加载模型训练结果会非常枯燥。而且,模型加载后的预测时间,相对于模型训练时间又非常地“微不足道”,要使得界面展示更加简洁美观,还需要进一步优化设计。

因此,本项目做了两个处理。

  • gr.Accordion()折叠了未训练时的可视化结果展示页签
  • gr.Tab()把模型训练和模型加载完全分开

3 开发过程

尽管部署Gradio应用只需启动一个*.gradio.py的脚本文件,但是本项目涉及到的输入输出和函数处理特别多,因此,把fn函数独立出来开发,会使得gradio脚本更加简洁。

另外,既然是面对通用时间序列预测任务的工具箱,在设计函数处理时,就应该尽可能避免单个场景数据的影响。时序训练的关键函数代码如下:

def get_ts_df(upfile, time_colunm, dayfirst, observed_colunms, trans_colunms,
              pred_colunms, time_freq, train_ratio, test_ratio, in_chunk_len,
              out_chunk_len, skip_chunk_len, choose_model, max_epochs,
              learning_rate, batch_size):

    df = pd.read_csv(upfile.name,
                     parse_dates=[time_colunm],
                     infer_datetime_format=True,
                     dayfirst=dayfirst)
    df = df.drop_duplicates(subset=[time_colunm], keep='first')
    fig = altair.Chart(df[:1000]).mark_line().encode(x=time_colunm,
                                                    y=pred_colunms.split()[-1]).configure_point(
    size=1200
)
    df[trans_colunms.split()] = df[trans_colunms.split()].fillna(
        0).astype(np.float64())
    target_cov_dataset = TSDataset.load_from_dataframe(
        df,
        time_col=time_colunm,
        target_cols=pred_colunms.split(),
        observed_cov_cols=observed_colunms.split(),
        freq=time_freq,
        fill_missing_dates=True,
        fillna_method='pre')
    train_dataset, val_test_dataset = target_cov_dataset.split(train_ratio)
    val_dataset, test_dataset = val_test_dataset.split(test_ratio)
    scaler = StandardScaler()
    scaler.fit(train_dataset)
    ts_train_scaled = scaler.transform(train_dataset)
    ts_val_scaled = scaler.transform(val_dataset)
    ts_test_scaled = scaler.transform(test_dataset)
    if choose_model == 'MLP':
        model = MLPRegressor(
            in_chunk_len=int(in_chunk_len),
            out_chunk_len=int(out_chunk_len),
            skip_chunk_len=int(skip_chunk_len),
            max_epochs=int(max_epochs),
            batch_size=int(batch_size),
            optimizer_params=dict(learning_rate=learning_rate),
        )
    elif choose_model == 'LSTM':
        model = LSTNetRegressor(
            in_chunk_len=int(in_chunk_len),
            out_chunk_len=int(out_chunk_len),
            skip_chunk_len=int(skip_chunk_len),
            max_epochs=int(max_epochs),
            batch_size=int(batch_size),
            optimizer_params=dict(learning_rate=learning_rate),
        )
    elif choose_model == 'RNN':
        model = RNNBlockRegressor(
            in_chunk_len=int(in_chunk_len),
            out_chunk_len=int(out_chunk_len),
            skip_chunk_len=int(skip_chunk_len),
            max_epochs=int(max_epochs),
            batch_size=int(batch_size),
            optimizer_params=dict(learning_rate=learning_rate),
        )
    elif choose_model == 'TCN':
        model = TCNRegressor(
            in_chunk_len=int(in_chunk_len),
            out_chunk_len=int(out_chunk_len),
            skip_chunk_len=int(skip_chunk_len),
            max_epochs=int(max_epochs),
            batch_size=int(batch_size),
            optimizer_params=dict(learning_rate=learning_rate),
        )
    elif choose_model == 'Transformer':
        model = TransformerModel(
            in_chunk_len=int(in_chunk_len),
            out_chunk_len=int(out_chunk_len),
            skip_chunk_len=int(skip_chunk_len),
            max_epochs=int(max_epochs),
            batch_size=int(batch_size),
            optimizer_params=dict(learning_rate=learning_rate),
        )
    model.fit(ts_train_scaled, ts_val_scaled)
    if os.path.exists("model"):
        os.remove("model")
        os.remove("model_model_meta")
        os.remove("model_network_statedict")
    model.save("model")
    subset_test_pred_dataset = model.predict(ts_val_scaled)
    subset_test_dataset, _ = ts_test_scaled.split(len(subset_test_pred_dataset.target))
    mae = MAE()
    mae(subset_test_dataset, subset_test_pred_dataset)
    test_dataset = scaler.inverse_transform(ts_test_scaled)
    fig2 = altair.Chart(
        model.predict(test_dataset).to_dataframe().reset_index().rename(
            columns={"index": time_colunm})).mark_line().encode(
                x=time_colunm, y=pred_colunms.split()[-1])
    result_df = model.predict(test_dataset).to_dataframe()
    result_df.reset_index().rename(
            columns={"index": time_colunm}).to_csv('res.csv')
    return df.head(), fig, fig2, mae(subset_test_dataset, subset_test_pred_dataset), result_df.reset_index().rename(
fig, fig2, mae(subset_test_dataset, subset_test_pred_dataset), result_df.reset_index().rename(
            columns={"index": time_colunm}).head()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

完整代码实现读者可以查阅项目的gradio_fns.pybase.gradio.py文件。

4 注意事项

由于AI Studio上的Gradio做了一些网络限制,以及一些还需要完善的部署问题,实际开发时,还有以下注意事项:

  • AI Studio上Gradio报错机制较弱,因此调试代码时要逐一生成中间文件,以帮助定位出问题的代码行
  • 训练时间过长时(或页面等待时间超过一分钟),Notebook内部的Gradio调试脚本不会返回结果页面的刷新,需要综合监控GPU/CPU性能变化,以及是否生成模型文件辅助判断训练是否成功
  • fn函数处理输入输出任务较多时,建议读者先在Notebook中手动传参调试
  • Gradio组件回传的参数格式可能需要调整,如in_chunk_len通过gr.Number()获取后需要强制转换为int格式
  • 尽量不要调整skip_chunk_len参数,否则计算MAE时会出现错误

5 实现效果

读者可以到应用界面测试体验“零代码”时间序列预测效果。

6 小结

本项目是基于PaddleTS和Gradio基本实现了时间序列预测任务的全流程可视化开发,在后续迭代中,将主要进行以下几方面的优化:

  • 增加集成学习的适配
  • 针对各种报错,通过gr.Error()进行信息提示
  • 增加时序异常、时序分类等其它PaddleTS任务
  • 实现飞桨工具库的Gradio可视化开发

此文章为搬运
原项目链接

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/106329
推荐阅读
相关标签
  

闽ICP备14008679号