当前位置:   article > 正文

【BI学习作业09-时间序列分析】_hadoop时序预测

hadoop时序预测


1.思考题

1.1今天讲解了时间序列预测的两种方式,实际上在数据库内建时间属性后,可以产生时序数据库,请思考什么是时序数据库?为什么时间序列数据成为增长最快的数据类型之一

在这里插入图片描述
A time series database (TSDB) is a software system that is optimized for handling time series data, arrays of numbers indexed by time (a datetime or a datetime range)

以上是维基百科对于时序数据库的定义。可以把它拆解成3个方面来看:时序特性,数据特性,数据库特性。

时序特性:

  • 时间戳:通用的业务场景内以秒和毫秒精度为主,在一些遥感等高频采集领域,时间戳可以达到纳秒级别。时间戳种类包括unix系统时间戳和Calendar, 并且支持时区的自动适配。
  • 采样频率:采集频率一般有2种,一种是周期性的时间采样频率,比如服务器性能相关的定期汇总指标。另外一种是离散型的采样,比如网站的访问等等

数据特性:

  • 数据顺序追加
  • 数据可多维关联
  • 通常高频访问热数据
  • 冷数据需要降维归档
  • 数据主要覆盖数值,状态,事件

数据库特性(CRUD)

  • 写入速率稳定并且远远大于读取
  • 按照时间窗口访问数据
  • 极少更新,存在一定窗口期的覆盖写
  • 批量删除
  • 具备通用数据库要求的高可用,高可靠,可伸缩特性
  • 通常不需要具备事务的能力
1.1.1时序数据库发展简史

在这里插入图片描述

1.1.2第一代时序数据存储系统

虽然通用关系数据库可以存储时序数据,但是由于缺乏针对时间的特殊优化,比如按时间间隔存储和检索数据等等,因此在处理这些数据时效率相对不高。

第一代时序数据典型来源于监控领域,直接基于平板文件的简单存储工具成为这类数据的首先存储方式。

以RRDTool,Wishper为代表,通常这类系统处理的数据模型比较单一,单机容量受限,并且内嵌于监控告警方案。

1.1.3基于通用存储的时序数据库

伴随着大数据和Hadoop的发展,时序数据量开始迅速增长,系统业务对于处理时序数据的扩展性等方面提出更多的要求。

基于通用存储而专门构建的时间序列数据库开始出现,它可以按时间间隔高效地存储和处理这些数据。像OpenTSDB,KairosDB等等。

这类时序数据库在继承通用存储优势的基础上,利用时序的特性规避部分通用存储的劣势,并且在数据模型,聚合分析方面做了贴合时序的大量创新。

比如OpenTSDB继承了HBase的宽表属性结合时序设计了偏移量的存储模型,利用salt缓解热点问题等等。

然而它也有诸多不足之处,比如低效的全局UID机制,聚合数据的加载不可控,无法处理高基数标签查询等等。

1.1.4垂直型时序数据库的出现

随着docker,kubernetes, 微服务等技术的发展,以及对于IoT的发展预期越来越强烈。

在数据随着时间而增长的过程中,时间序列数据成为增长最快的数据类型之一。

高性能,低成本的垂直型时序数据库开始诞生,以InfluxDB为代表的具有时序特征的数据存储引擎逐步引领市场。

它们通常具备更加高级的数据处理能力,高效的压缩算法和符合时序特征的存储引擎。

比如InfluxDB的基于时间的TSMT存储,Gorilla压缩,面向时序的窗口计算函数p99,rate,自动rollup等等。

同时由于索引分离的架构,在膨胀型时间线,乱序等场景下依然面临着很大的挑战。

1.1.5时序数据库发展现状

目前,DB-Engines把时间序列数据库作为独立的目录来分类统计,下图就是2018年业内流行的时序数据库的关注度排名和最近5年的变化趋势。
在这里插入图片描述
公有云

  • AWS Timestream
    2018.11 Amazon在AWS re Invent大会发布Timestream预览版。适用于 IoT 和运营应用程序等场景。提供自适应查询处理引擎快速地分析数据,自动对数据进行汇总、保留、分层和压缩处理。按照写入流量,存储空间,查询数据量的方式计费,以serverless的形式做到最低成本管理。

  • Azure Series Insights
    2017.4 Microsoft发布时序见解预览版,提供的完全托管、端到端的存储和查询高度情景化loT时序数据解决方案。强大的可视化效果用于基于资产的数据见解和丰富的交互式临时数据分析。
    针对数据类型分为暖数据分析和原始数据分析,按照存储空间和查询量分别计费。

开源

  • OpenTSDB
    OpenTSDB是一个分布式的、可伸缩的时间序列数据库. 引入metric,tags等概念设计了一套针对时序场景的数据模型,底层采用HBase作为存储,利用时序场景的特性,采用特殊的rowkey方式,来提高时序的聚合和查询能力。

  • Prometheus
    Prometheus会将所有采集到的样本数据以时间序列(time-series)的方式保存在内存数据库中,并且定时保存到硬盘上。需要远端存储来保证可靠和扩展性。

  • InfluxDB
    InfluxDB是单机开源的时序数据库,由Go语言编写,无需特殊的环境依赖,简单方便。采用独有的TSMT结构实现高性能的读写。分布式需要商业化支持。

  • Timescale
    面向SQL生态的时序数据库,固定Schema,底层基于PG,按时间管理chunk table。

学术

  • BTrDB
    BtrDB面向高精度时序数据的存储应用,设计并提出了 “time-partitioning version-annotated copy-on-write tree” 的数据结构,为每一条时间线构建了一棵树,并且引入版本的概念处理数据的乱序场景

  • Confluo
    Confluo设计了新型的数据结构”Atomic MultiLog“,采用现代CPU硬件支持的原子指令集,支持百万级数据点高并发写入,毫秒级在线查询,占用很少的的CPU资源实现即席查询

  • Chronixdb
    ChronixDB基于Solr提供了时序存储,并且实现了特有的无损压缩算法,可以与Spark集成,提供丰富的时序分析能力。

商业&工业

  • PI
    PI是OSI软件公司开发的大型实时数据库,广泛应用于电力,化工等行业,采用了旋转门压缩专利技术和独到的二次过滤技术,使进入到PI数据库的数据经过了最有效的压缩,极大地节省了硬盘空间

  • KDB
    KDB是Kx System开发的时间序列数据库,通常用于处理交易行情相关数据。支持流、内存计算和实时分析Billion级别的记录以及快速访问TB级别的历史数据。

  • Gorilla
    Gorilla是Facebook的一个基于内存的时序数据库,采用了一种新的时间序列压缩算法。

可以将数据从16字节压缩到平均1.37字节,缩小12倍.并且设计了针对压缩算法的内存数据结构.在保持对单个时间序列进行时间段查找的同时也能快速和高效的进行全数据扫描。
通过将时间序列数据写到不同地域的主机中,容忍单节点故障,网络切换,甚至是整个数据中心故障。

投资市场

  • 2018年时序数据库创业公司在投资市场有2笔著名的投资。
  • Timescale获得了来自Benchmark Capital的$12.4M Series A轮融资。
  • InfluxDB获得了来自Sapphire Ventures的$35M C轮融资。

业界典型时序数据库解析
近2年来时序数据库正处于高速发展的阶段。国内外云市场各大主流厂商已经从整个时序生态的不同角度切入,形成各自特色的解决方案完成布局,开始抢占流量。

而以Facebook Gorilla为代表的优秀的时序数据库则是脱胎于满足自身业务发展的需要。学术上,在时序领域里面更是涌现了一大批黑科技,把时序数据的技术深度推向更高的台阶。

阿里巴巴的TSDB团队自2016年第一版时序数据库落地后,逐步服务于DBPaaS,Sunfire等等集团业务,在2017年中旬公测后,于2018年3月底正式商业化。

在此过程中,TSDB在技术方面不断吸纳时序领域各家之长,开启了自研的时序数据库发展之路。

2.编程题

2.1PM2.5数据集

数据地址:https://archive.ics.uci.edu/ml/datasets/Beijing%20PM2.5%20Data
目标:预测未来北京PM2.5的值

2.1.1PM2.5的分级标准为
优 35微克(ug)/每立方
良 35~75微克(ug)/每立方
轻度污染 75~115微克(ug)/每立方
中度污染 115~150微克(ug)/每立方
重度污染 150~250微克(ug)/每立方
严重污染 250及以上微克(ug)/每立方
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
2.1.2数据集说明

在这里插入图片描述

DEWP: 露点
TEMP: 温度
PRES:气压
cbwd: 组合风向
Iws: 风速
Is:积累雪量
Ir:积累雨量

from math import sqrt
from numpy import concatenate
from matplotlib import pyplot as plt
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM

# convert series to supervised learning
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
	n_vars = 1 if type(data) is list else data.shape[1]
	df = DataFrame(data)
	cols, names = list(), list()
	# input sequence (t-n, ... t-1)
	for i in range(n_in, 0, -1):
		cols.append(df.shift(i))
		names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
	# forecast sequence (t, t+1, ... t+n)
	for i in range(0, n_out):
		cols.append(df.shift(-i))
		if i == 0:
			names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
		else:
			names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
	# put it all together
	agg = concat(cols, axis=1)
	agg.columns = names
	# drop rows with NaN values
	if dropnan:
		agg.dropna(inplace=True)
	return agg


# load dataset
dataset = read_csv('pollution.csv')
dataset = dataset.drop(['date','wnd_dir'], axis=1)
values = dataset.values
# integer encode direction
encoder = LabelEncoder()

values[:,4] = encoder.fit_transform(values[:,4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# specify the number of lag hours
n_hours = 3
n_features = 8
# frame as supervised learning
reframed = series_to_supervised(scaled, n_hours, 1)
print(reframed.shape)
 
# split into train and test sets
values = reframed.values
n_train_hours = 365 * 24
train = values[:n_train_hours, :]
test = values[n_train_hours:, :]
# split into input and outputs
n_obs = n_hours * n_features
train_X, train_y = train[:, :n_obs], train[:, -n_features]
test_X, test_y = test[:, :n_obs], test[:, -n_features]
print(train_X.shape, len(train_X), train_y.shape)
# reshape input to be 3D [samples, timesteps, features]
train_X = train_X.reshape((train_X.shape[0], n_hours, n_features))
test_X = test_X.reshape((test_X.shape[0], n_hours, n_features))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
 
# design network
model = Sequential()
model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# fit network
history = model.fit(train_X, train_y, epochs=50, batch_size=72, validation_data=(test_X, test_y), verbose=2, shuffle=False)
# plot history
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.legend()
plt.show()
 
# make a prediction
yhat = model.predict(test_X)
test_X = test_X.reshape((test_X.shape[0], n_hours*n_features))
# invert scaling for forecast
inv_yhat = concatenate((yhat, test_X[:, -6:]), axis=1)
inv_yhat = scaler.inverse_transform(inv_yhat)
inv_yhat = inv_yhat[:,0]
# invert scaling for actual
test_y = test_y.reshape((len(test_y), 1))
inv_y = concatenate((test_y, test_X[:, -6:]), axis=1)
inv_y = scaler.inverse_transform(inv_y)
inv_y = inv_y[:,0]
# calculate RMSE
rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
print('Test RMSE: %.3f' % rmse) # Test RMSE: 2.167
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

在这里插入图片描述

2.2资金流入流出预测

数据地址:https://tianchi.aliyun.com/competition/entrance/231573/information
数据集一共包括4张表:用户基本信息数据、用户申购赎回数据、收益率表和银行间拆借利率表。

2.8万用户,284万行为数据,294天拆解利率,427天收益率
时间:2013-07-01到2014-08-31
预测2014年9月的申购和赎回

"""
reference:https://machinelearningmastery.com/time-series-prediction-lstm-recurrent-neural-networks-python-keras/
Note:
1.LSTMs are sensitive to the scale of the input data, specifically when the sigmoid (default) or tanh activation functions are used. It can be a good practice to rescale the data to the range of 0-to-1, also called normalizing.
2.The LSTM network expects the input data (X) to be provided with a specific array structure in the form of: [samples, time steps, features].
"""

import math
import numpy
import pandas
from keras.layers import LSTM, RNN, GRU, SimpleRNN
from keras.layers import Dense, Dropout
from keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
from keras.models import Sequential
from sklearn.preprocessing import MinMaxScaler
import os

numpy.random.seed(2019)


class RNNModel(object):
    def __init__(self, look_back=1, epochs_purchase=20, epochs_redeem=40, batch_size=1, verbose=2, patience=10, store_result=False):
        self.look_back = look_back
        self.epochs_purchase = epochs_purchase
        self.epochs_redeem = epochs_redeem
        self.batch_size = batch_size
        self.verbose = verbose
        self.store_result = store_result
        self.patience = patience
        self.purchase = df_tmp.values[:, 0:1]
        self.redeem = df_tmp.values[:, 1:2]
        
    def access_data(self, data_frame):
        # load the data set
        data_set = data_frame
        data_set = data_set.astype('float32')

        # LSTMs are sensitive to the scale of the input data, specifically when the sigmoid (default) or tanh activation functions are used. It can be a good practice to rescale the data to the range of 0-to-1, also called normalizing.
        scaler = MinMaxScaler(feature_range=(0, 1))
        data_set = scaler.fit_transform(data_set)

        # reshape into X=t and Y=t+1
        train_x, train_y, test = self.create_data_set(data_set)

        # reshape input to be [samples, time steps, features]
        train_x = numpy.reshape(train_x, (train_x.shape[0], 1, train_x.shape[1]))
        return train_x, train_y, test, scaler

    # convert an array of values into a data set matrix
    def create_data_set(self, data_set):
        data_x, data_y = [], []
        for i in range(len(data_set)-self.look_back - 30):
            a = data_set[i:(i + self.look_back), 0]
            data_x.append(a)
            data_y.append(list(data_set[i + self.look_back: i + self.look_back + 30, 0]))
        # print(numpy.array(data_y).shape)
        return numpy.array(data_x), numpy.array(data_y), data_set[-self.look_back:, 0].reshape(1, 1, self.look_back)

    def rnn_model(self, train_x, train_y, epochs):
        model = Sequential()
        model.add(LSTM(64, input_shape=(1, self.look_back), return_sequences=True))
        model.add(LSTM(32, return_sequences=False))
        model.add(Dense(32))
        model.add(Dense(30))
        model.compile(loss='mean_squared_error', optimizer='adam')
        model.summary()
        early_stopping = EarlyStopping('loss', patience=self.patience)
        history = model.fit(train_x, train_y, epochs=epochs, batch_size=self.batch_size, verbose=self.verbose, callbacks=[early_stopping])
        return model

    def predict(self, model, data):
        prediction = model.predict(data)
        return prediction

    def plot_show(self, predict):
        predict = predict[['purchase', 'redeem']]
        predict.plot()
        plt.show()

    def run(self):
        purchase_train_x, purchase_train_y, purchase_test, purchase_scaler = self.access_data(self.purchase)
        redeem_train_x, redeem_train_y, redeem_test, redeem_scaler = self.access_data(self.redeem)

        purchase_model = self.rnn_model(purchase_train_x, purchase_train_y, self.epochs_purchase)
        redeem_model = self.rnn_model(redeem_train_x, redeem_train_y, self.epochs_redeem)

        purchase_predict = self.predict(purchase_model, purchase_test)
        redeem_predict = self.predict(redeem_model, redeem_test)

        test_user = pandas.DataFrame({'report_date': [20140900 + i for i in range(1, 31)]})

        purchase = purchase_scaler.inverse_transform(purchase_predict).reshape(30, 1)
        redeem = redeem_scaler.inverse_transform(redeem_predict).reshape(30, 1)

        test_user['purchase'] = purchase
        test_user['redeem'] = redeem
        print(test_user)

        """Store submit file"""
        if self.store_result is True:
            test_user.to_csv('submit_lstm.csv', encoding='utf-8', index=None, header=None)
            
        """plot result picture"""
        self.plot_show(test_user)
        
if __name__ == '__main__':
    initiation = RNNModel(look_back=40, epochs_purchase=150, epochs_redeem=230, batch_size=16, verbose=2, patience=50, store_result=True)
    initiation.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

参考资料

1.时序数据库

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/951949
推荐阅读
相关标签
  

闽ICP备14008679号