当前位置:   article > 正文

pandas resample时间序列数据处理(时间序列归一化,对齐,映射,空值填充)_pandas resample补全指定范围时间序列

pandas resample补全指定范围时间序列

pandas时间序列数据处理(时间序列归一化,对齐,映射,空值填充)

实现功能
  1. 将下面的csv数据对应到每间隔五分钟的时间序列上保证数据完整
  2. 详细描述: 时间区间数据对应 主要是将把每五分钟区间内的数据对应到五分钟倍数的区间上:example 将0,1 ,2分钟数据对应到00:00, 将3, 4, 5到10分钟之间的数据,将5, 6 ,7分钟数据对应到第五分钟,将7, 8 ,9, 10, 11 12 分钟数据对应到第10分钟, 13, 14, 15, 16, 17对应到第15分钟 依此类推
原始数据

在这里插入图片描述
将ct列对应成每五分钟一条数据的对齐

处理后数据

在这里插入图片描述

方法一 自定义时间判断函数

import pandas as pd
from datetime import datetime


def fill_data(df):
    # 均值填充pv
    df['pv'].fillna(value=df.mean(), inplace=True)
    # 前向填充 后向填充  _id  pid (均值填充id会出问题)
    df.fillna(method='bfill', inplace=True)
    df.fillna(method='ffill', inplace=True)
    return df


def merge_type_transform(df, tran_list):
    # 处理merge后的类型变化问题 float >>>> str
    df[tran_list] = df[tran_list].astype('str')
    for i in tran_list:
        df[i] = df[i].apply(lambda x: x.replace('.0', '').strip())
    #  一些价格数据或者短数字转字符串可用以下数据
    # 方法一
    # df['price'] = df['price'].map(lambda x:str(x))))
    # 方法二
    # df['price'] = df['price'].astype('str'))
    return df


def time_seq_map(df, ymd_time=None):
    """
    时间对齐  数据填充
    """
    # 手动传入时间
    time = pd.date_range(ymd_time + ' 00:00:00', ymd_time + ' 23:59:59', freq="5min")
    # 自动提取时间
    # extract_date_df = df.loc[:1, 'ct'].apply(lambda x: x.strftime('%Y-%m-%d'))
    # time = pd.date_range(extract_date_df[0] + ' 00:00:00', extract_date_df[0] + ' 23:59:59', freq="5min")
    # 生成一个标准每五分钟一行的时间序列dataframe
    df1 = pd.DataFrame({'ct1': time})
    # 与原始数据合并
    df = df.merge(df1, left_on='ct', right_on='ct1', how='right', sort='ct1')
    del df['ct']
    df.rename(columns={'ct1': 'ct'}, inplace=True)
    # 数据填充
    seq_map_df = fill_data(df)
    # 数据类型转化
    fina_df = merge_type_transform(seq_map_df, tran_list=['pid'])
    return fina_df


def _time_standardize_strategy(x):
    """
    自定义的一个时间函数:
    时间区间数据对应 主要是将把每五分钟区间内的数据对应到五分钟倍数的区间上:example 将0,1 ,2分钟数据对应到00:00, 将3, 4, 5到10分钟之间的数据,将5, 6 ,7分钟数据对应到第五分钟,将7, 8 ,9, 10, 11 12 分钟数据对应到第10分钟, 13, 14, 15, 16, 17对应到第15分钟 依此类推
    """
    hour = x.hour
    minute = x.minute
    second_ret = 0
    if minute % 5 >= 3:
        minute_new = (minute // 5 + 1) * 5
    else:
        minute_new = (minute // 5) * 5

    if minute_new == 60:
        minute_ret = 0
        hour_ret = hour + 1
        if hour_ret == 24:
            # 只是针对当天(最大时间23:55:00) example: 2021-07-26 23:59:59 ==>> 2021-07-26 23:55:00
            hour_ret = 23
            minute_ret = 55
            day_ret = x.day
            # day是连续性天数  example: 2021-07-26 23:59:59 ==>> 2021-07-27 00:00:00
            # hour_ret = 0
            # day_ret = x.day + 1
        else:
            day_ret = x.day
    else:
        minute_ret = minute_new
        hour_ret = hour
        day_ret = x.day
    return datetime(x.year, x.month, day_ret, int(hour_ret), int(minute_ret), int(second_ret))



def detail_time_series(df, null_thre=0.9):
    """
    时间归一化主函数
    """
    # 先对数据做时间序列去重
    df = df.copy()
    df = df.drop_duplicates(subset=['ct'])
    # 将ct变为datetime
    df.loc[:, 'ct'] = pd.to_datetime(df['ct'])

    df['ct'] = df['ct'].apply(_time_standardize_strategy)
    # 时间序列字段去重
    df = df.drop_duplicates(subset=['ct'])
    print(df)
    # 将索引类型改成 DatetimeIndex  将ct设置程索引  采样统计需要时间序列作为索引
    df.index = pd.DatetimeIndex(df.ct, name='index')
    # 空值统计 设定阈值
    df1 = df.resample('24H').count()
    # 将空值少于百分之10的时间list提取出来
    index_list = df1.index[df1['ct'] > int(len(df1['ct']) * null_thre)]
    df_list = []
    for time_str in index_list:
        ymd_time = time_str.strftime('%Y-%m-%d')
        day_data = df[ymd_time]
        stand_time_dfs = day_data.reset_index(drop=True)
        # 时间序列对其 空值用均值填充
        final_df = time_seq_map(stand_time_dfs, ymd_time)
        df_list.append(final_df)
    return df_list

# 读取原始数据
df = pd.read_csv('./bbb.csv')

df_list = detail_time_series(df)
# 将多个id对应好的数据合并成一个DataFrame
df = pd.concat(df_list, ignore_index=True)
print('最终数据', df)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119

方法二 pandas resample

import pandas as pd

def fill_data(df):
    # 均值填充pv
    df['pv'].fillna(value=df.mean(), inplace=True)
    # 前向填充 后向填充  _id  pid (均值填充id会出问题)
    df.fillna(method='bfill', inplace=True)
    df.fillna(method='ffill', inplace=True)
    return df


def merge_type_transform(df, tran_list):
    # 处理merge后的类型变化问题 float >>>> str
    df[tran_list] = df[tran_list].astype('str')
    for i in tran_list:
        df[i] = df[i].apply(lambda x: x.replace('.0', '').strip())
    #  一些价格数据或者短数字转字符串可用以下数据
    # 方法一
    # df['price'] = df['price'].map(lambda x:str(x))))
    # 方法二
    # df['price'] = df['price'].astype('str'))
    return df


def time_seq_map(df, ymd_time=None):
    """
    时间对齐  数据填充
    """
    # 手动传入时间
    time = pd.date_range(ymd_time + ' 00:00:00', ymd_time + ' 23:59:59', freq="5min")
    # 自动提取时间
    # extract_date_df = df.loc[:1, 'ct'].apply(lambda x: x.strftime('%Y-%m-%d'))
    # time = pd.date_range(extract_date_df[0] + ' 00:00:00', extract_date_df[0] + ' 23:59:59', freq="5min")
    # 生成一个标准每五分钟一行的时间序列dataframe
    df1 = pd.DataFrame({'ct1': time})
    # 与原始数据合并
    df = df.merge(df1, left_on='ct', right_on='ct1', how='right', sort='ct1')
    del df['ct']
    df.rename(columns={'ct1': 'ct'}, inplace=True)
    seq_map_df = fill_data(df)
    fina_df = merge_type_transform(seq_map_df, tran_list=['pid'])
    return fina_df.drop_duplicates(subset=['ct'])
    # return fina_df


def detail_time_series(df, null_thre):
    """
    时间归一化
    """
    # 先对数据做时间序列去重
    df = df.copy()
    df = df.drop_duplicates(subset=['ct'])
    # 将ct变为datetime
    df.loc[:, 'ct'] = pd.to_datetime(df['ct'])
    # 将索引类型改成 DatetimeIndex
    df.index = pd.DatetimeIndex(df.ct)
    # 做第一遍判断
    df1 = df.resample('24H').count()
    index_list = df1.index[df1['pid'] > int(len(df1['pid']) * null_thre)]
    time_list = [x.strftime('%Y-%m-%d') for x in index_list]
    print(time_list)

    df_list = []
    for time_str in index_list:
        ymd_time = time_str.strftime('%Y-%m-%d')
        day_data = df[ymd_time]
        # 时间归一化 间隔5分钟
        stand_time_df = day_data.resample(rule='5T').ffill()
        del stand_time_df['ct']
        # 将时间序列索引变成列 在重置索引
        stand_time_dfs = stand_time_df.rename_axis('ct').reset_index()
        # 时间序列对其 空值用均值填充
        final_df = time_seq_map(stand_time_dfs, ymd_time)
        df_list.append(final_df)
    return df_list


df = pd.read_csv('./bbb.csv')

df_list = detail_time_series(df, 0.9)  # 空值少于10%均值填充
df = pd.concat(df_list, ignore_index=True)
print(df)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/230243
推荐阅读
相关标签
  

闽ICP备14008679号