赞
踩
将ct列对应成每五分钟一条数据的对齐
import pandas as pd from datetime import datetime def fill_data(df): # 均值填充pv df['pv'].fillna(value=df.mean(), inplace=True) # 前向填充 后向填充 _id pid (均值填充id会出问题) df.fillna(method='bfill', inplace=True) df.fillna(method='ffill', inplace=True) return df def merge_type_transform(df, tran_list): # 处理merge后的类型变化问题 float >>>> str df[tran_list] = df[tran_list].astype('str') for i in tran_list: df[i] = df[i].apply(lambda x: x.replace('.0', '').strip()) # 一些价格数据或者短数字转字符串可用以下数据 # 方法一 # df['price'] = df['price'].map(lambda x:str(x)))) # 方法二 # df['price'] = df['price'].astype('str')) return df def time_seq_map(df, ymd_time=None): """ 时间对齐 数据填充 """ # 手动传入时间 time = pd.date_range(ymd_time + ' 00:00:00', ymd_time + ' 23:59:59', freq="5min") # 自动提取时间 # extract_date_df = df.loc[:1, 'ct'].apply(lambda x: x.strftime('%Y-%m-%d')) # time = pd.date_range(extract_date_df[0] + ' 00:00:00', extract_date_df[0] + ' 23:59:59', freq="5min") # 生成一个标准每五分钟一行的时间序列dataframe df1 = pd.DataFrame({'ct1': time}) # 与原始数据合并 df = df.merge(df1, left_on='ct', right_on='ct1', how='right', sort='ct1') del df['ct'] df.rename(columns={'ct1': 'ct'}, inplace=True) # 数据填充 seq_map_df = fill_data(df) # 数据类型转化 fina_df = merge_type_transform(seq_map_df, tran_list=['pid']) return fina_df def _time_standardize_strategy(x): """ 自定义的一个时间函数: 时间区间数据对应 主要是将把每五分钟区间内的数据对应到五分钟倍数的区间上:example 将0,1 ,2分钟数据对应到00:00, 将3, 4, 5到10分钟之间的数据,将5, 6 ,7分钟数据对应到第五分钟,将7, 8 ,9, 10, 11 12 分钟数据对应到第10分钟, 13, 14, 15, 16, 17对应到第15分钟 依此类推 """ hour = x.hour minute = x.minute second_ret = 0 if minute % 5 >= 3: minute_new = (minute // 5 + 1) * 5 else: minute_new = (minute // 5) * 5 if minute_new == 60: minute_ret = 0 hour_ret = hour + 1 if hour_ret == 24: # 只是针对当天(最大时间23:55:00) example: 2021-07-26 23:59:59 ==>> 2021-07-26 23:55:00 hour_ret = 23 minute_ret = 55 day_ret = x.day # day是连续性天数 example: 2021-07-26 23:59:59 ==>> 2021-07-27 00:00:00 # hour_ret = 0 # day_ret = x.day + 1 else: day_ret = x.day else: minute_ret = minute_new hour_ret = hour day_ret = x.day return datetime(x.year, x.month, day_ret, int(hour_ret), int(minute_ret), int(second_ret)) def detail_time_series(df, null_thre=0.9): """ 时间归一化主函数 """ # 先对数据做时间序列去重 df = df.copy() df = df.drop_duplicates(subset=['ct']) # 将ct变为datetime df.loc[:, 'ct'] = pd.to_datetime(df['ct']) df['ct'] = df['ct'].apply(_time_standardize_strategy) # 时间序列字段去重 df = df.drop_duplicates(subset=['ct']) print(df) # 将索引类型改成 DatetimeIndex 将ct设置程索引 采样统计需要时间序列作为索引 df.index = pd.DatetimeIndex(df.ct, name='index') # 空值统计 设定阈值 df1 = df.resample('24H').count() # 将空值少于百分之10的时间list提取出来 index_list = df1.index[df1['ct'] > int(len(df1['ct']) * null_thre)] df_list = [] for time_str in index_list: ymd_time = time_str.strftime('%Y-%m-%d') day_data = df[ymd_time] stand_time_dfs = day_data.reset_index(drop=True) # 时间序列对其 空值用均值填充 final_df = time_seq_map(stand_time_dfs, ymd_time) df_list.append(final_df) return df_list # 读取原始数据 df = pd.read_csv('./bbb.csv') df_list = detail_time_series(df) # 将多个id对应好的数据合并成一个DataFrame df = pd.concat(df_list, ignore_index=True) print('最终数据', df)
import pandas as pd def fill_data(df): # 均值填充pv df['pv'].fillna(value=df.mean(), inplace=True) # 前向填充 后向填充 _id pid (均值填充id会出问题) df.fillna(method='bfill', inplace=True) df.fillna(method='ffill', inplace=True) return df def merge_type_transform(df, tran_list): # 处理merge后的类型变化问题 float >>>> str df[tran_list] = df[tran_list].astype('str') for i in tran_list: df[i] = df[i].apply(lambda x: x.replace('.0', '').strip()) # 一些价格数据或者短数字转字符串可用以下数据 # 方法一 # df['price'] = df['price'].map(lambda x:str(x)))) # 方法二 # df['price'] = df['price'].astype('str')) return df def time_seq_map(df, ymd_time=None): """ 时间对齐 数据填充 """ # 手动传入时间 time = pd.date_range(ymd_time + ' 00:00:00', ymd_time + ' 23:59:59', freq="5min") # 自动提取时间 # extract_date_df = df.loc[:1, 'ct'].apply(lambda x: x.strftime('%Y-%m-%d')) # time = pd.date_range(extract_date_df[0] + ' 00:00:00', extract_date_df[0] + ' 23:59:59', freq="5min") # 生成一个标准每五分钟一行的时间序列dataframe df1 = pd.DataFrame({'ct1': time}) # 与原始数据合并 df = df.merge(df1, left_on='ct', right_on='ct1', how='right', sort='ct1') del df['ct'] df.rename(columns={'ct1': 'ct'}, inplace=True) seq_map_df = fill_data(df) fina_df = merge_type_transform(seq_map_df, tran_list=['pid']) return fina_df.drop_duplicates(subset=['ct']) # return fina_df def detail_time_series(df, null_thre): """ 时间归一化 """ # 先对数据做时间序列去重 df = df.copy() df = df.drop_duplicates(subset=['ct']) # 将ct变为datetime df.loc[:, 'ct'] = pd.to_datetime(df['ct']) # 将索引类型改成 DatetimeIndex df.index = pd.DatetimeIndex(df.ct) # 做第一遍判断 df1 = df.resample('24H').count() index_list = df1.index[df1['pid'] > int(len(df1['pid']) * null_thre)] time_list = [x.strftime('%Y-%m-%d') for x in index_list] print(time_list) df_list = [] for time_str in index_list: ymd_time = time_str.strftime('%Y-%m-%d') day_data = df[ymd_time] # 时间归一化 间隔5分钟 stand_time_df = day_data.resample(rule='5T').ffill() del stand_time_df['ct'] # 将时间序列索引变成列 在重置索引 stand_time_dfs = stand_time_df.rename_axis('ct').reset_index() # 时间序列对其 空值用均值填充 final_df = time_seq_map(stand_time_dfs, ymd_time) df_list.append(final_df) return df_list df = pd.read_csv('./bbb.csv') df_list = detail_time_series(df, 0.9) # 空值少于10%均值填充 df = pd.concat(df_list, ignore_index=True) print(df)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。