赞
踩
A time series database (TSDB) is a software system that is optimized for handling time series data, arrays of numbers indexed by time (a datetime or a datetime range)
以上是维基百科对于时序数据库的定义。可以把它拆解成3个方面来看:时序特性,数据特性,数据库特性。
时序特性:
数据特性:
数据库特性(CRUD)
虽然通用关系数据库可以存储时序数据,但是由于缺乏针对时间的特殊优化,比如按时间间隔存储和检索数据等等,因此在处理这些数据时效率相对不高。
第一代时序数据典型来源于监控领域,直接基于平板文件的简单存储工具成为这类数据的首先存储方式。
以RRDTool,Wishper为代表,通常这类系统处理的数据模型比较单一,单机容量受限,并且内嵌于监控告警方案。
伴随着大数据和Hadoop的发展,时序数据量开始迅速增长,系统业务对于处理时序数据的扩展性等方面提出更多的要求。
基于通用存储而专门构建的时间序列数据库开始出现,它可以按时间间隔高效地存储和处理这些数据。像OpenTSDB,KairosDB等等。
这类时序数据库在继承通用存储优势的基础上,利用时序的特性规避部分通用存储的劣势,并且在数据模型,聚合分析方面做了贴合时序的大量创新。
比如OpenTSDB继承了HBase的宽表属性结合时序设计了偏移量的存储模型,利用salt缓解热点问题等等。
然而它也有诸多不足之处,比如低效的全局UID机制,聚合数据的加载不可控,无法处理高基数标签查询等等。
随着docker,kubernetes, 微服务等技术的发展,以及对于IoT的发展预期越来越强烈。
在数据随着时间而增长的过程中,时间序列数据成为增长最快的数据类型之一。
高性能,低成本的垂直型时序数据库开始诞生,以InfluxDB为代表的具有时序特征的数据存储引擎逐步引领市场。
它们通常具备更加高级的数据处理能力,高效的压缩算法和符合时序特征的存储引擎。
比如InfluxDB的基于时间的TSMT存储,Gorilla压缩,面向时序的窗口计算函数p99,rate,自动rollup等等。
同时由于索引分离的架构,在膨胀型时间线,乱序等场景下依然面临着很大的挑战。
目前,DB-Engines把时间序列数据库作为独立的目录来分类统计,下图就是2018年业内流行的时序数据库的关注度排名和最近5年的变化趋势。
公有云
AWS Timestream
2018.11 Amazon在AWS re Invent大会发布Timestream预览版。适用于 IoT 和运营应用程序等场景。提供自适应查询处理引擎快速地分析数据,自动对数据进行汇总、保留、分层和压缩处理。按照写入流量,存储空间,查询数据量的方式计费,以serverless的形式做到最低成本管理。
Azure Series Insights
2017.4 Microsoft发布时序见解预览版,提供的完全托管、端到端的存储和查询高度情景化loT时序数据解决方案。强大的可视化效果用于基于资产的数据见解和丰富的交互式临时数据分析。
针对数据类型分为暖数据分析和原始数据分析,按照存储空间和查询量分别计费。
开源
OpenTSDB
OpenTSDB是一个分布式的、可伸缩的时间序列数据库. 引入metric,tags等概念设计了一套针对时序场景的数据模型,底层采用HBase作为存储,利用时序场景的特性,采用特殊的rowkey方式,来提高时序的聚合和查询能力。
Prometheus
Prometheus会将所有采集到的样本数据以时间序列(time-series)的方式保存在内存数据库中,并且定时保存到硬盘上。需要远端存储来保证可靠和扩展性。
InfluxDB
InfluxDB是单机开源的时序数据库,由Go语言编写,无需特殊的环境依赖,简单方便。采用独有的TSMT结构实现高性能的读写。分布式需要商业化支持。
Timescale
面向SQL生态的时序数据库,固定Schema,底层基于PG,按时间管理chunk table。
学术
BTrDB
BtrDB面向高精度时序数据的存储应用,设计并提出了 “time-partitioning version-annotated copy-on-write tree” 的数据结构,为每一条时间线构建了一棵树,并且引入版本的概念处理数据的乱序场景
Confluo
Confluo设计了新型的数据结构”Atomic MultiLog“,采用现代CPU硬件支持的原子指令集,支持百万级数据点高并发写入,毫秒级在线查询,占用很少的的CPU资源实现即席查询
Chronixdb
ChronixDB基于Solr提供了时序存储,并且实现了特有的无损压缩算法,可以与Spark集成,提供丰富的时序分析能力。
商业&工业
PI
PI是OSI软件公司开发的大型实时数据库,广泛应用于电力,化工等行业,采用了旋转门压缩专利技术和独到的二次过滤技术,使进入到PI数据库的数据经过了最有效的压缩,极大地节省了硬盘空间
KDB
KDB是Kx System开发的时间序列数据库,通常用于处理交易行情相关数据。支持流、内存计算和实时分析Billion级别的记录以及快速访问TB级别的历史数据。
Gorilla
Gorilla是Facebook的一个基于内存的时序数据库,采用了一种新的时间序列压缩算法。
可以将数据从16字节压缩到平均1.37字节,缩小12倍.并且设计了针对压缩算法的内存数据结构.在保持对单个时间序列进行时间段查找的同时也能快速和高效的进行全数据扫描。
通过将时间序列数据写到不同地域的主机中,容忍单节点故障,网络切换,甚至是整个数据中心故障。
投资市场
业界典型时序数据库解析
近2年来时序数据库正处于高速发展的阶段。国内外云市场各大主流厂商已经从整个时序生态的不同角度切入,形成各自特色的解决方案完成布局,开始抢占流量。
而以Facebook Gorilla为代表的优秀的时序数据库则是脱胎于满足自身业务发展的需要。学术上,在时序领域里面更是涌现了一大批黑科技,把时序数据的技术深度推向更高的台阶。
阿里巴巴的TSDB团队自2016年第一版时序数据库落地后,逐步服务于DBPaaS,Sunfire等等集团业务,在2017年中旬公测后,于2018年3月底正式商业化。
在此过程中,TSDB在技术方面不断吸纳时序领域各家之长,开启了自研的时序数据库发展之路。
数据地址:https://archive.ics.uci.edu/ml/datasets/Beijing%20PM2.5%20Data
目标:预测未来北京PM2.5的值
优 35微克(ug)/每立方
良 35~75微克(ug)/每立方
轻度污染 75~115微克(ug)/每立方
中度污染 115~150微克(ug)/每立方
重度污染 150~250微克(ug)/每立方
严重污染 250及以上微克(ug)/每立方
DEWP: 露点
TEMP: 温度
PRES:气压
cbwd: 组合风向
Iws: 风速
Is:积累雪量
Ir:积累雨量
from math import sqrt from numpy import concatenate from matplotlib import pyplot as plt from pandas import read_csv from pandas import DataFrame from pandas import concat from sklearn.preprocessing import MinMaxScaler from sklearn.preprocessing import LabelEncoder from sklearn.metrics import mean_squared_error from keras.models import Sequential from keras.layers import Dense from keras.layers import LSTM # convert series to supervised learning def series_to_supervised(data, n_in=1, n_out=1, dropnan=True): n_vars = 1 if type(data) is list else data.shape[1] df = DataFrame(data) cols, names = list(), list() # input sequence (t-n, ... t-1) for i in range(n_in, 0, -1): cols.append(df.shift(i)) names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)] # forecast sequence (t, t+1, ... t+n) for i in range(0, n_out): cols.append(df.shift(-i)) if i == 0: names += [('var%d(t)' % (j+1)) for j in range(n_vars)] else: names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)] # put it all together agg = concat(cols, axis=1) agg.columns = names # drop rows with NaN values if dropnan: agg.dropna(inplace=True) return agg # load dataset dataset = read_csv('pollution.csv') dataset = dataset.drop(['date','wnd_dir'], axis=1) values = dataset.values # integer encode direction encoder = LabelEncoder() values[:,4] = encoder.fit_transform(values[:,4]) # ensure all data is float values = values.astype('float32') # normalize features scaler = MinMaxScaler(feature_range=(0, 1)) scaled = scaler.fit_transform(values) # specify the number of lag hours n_hours = 3 n_features = 8 # frame as supervised learning reframed = series_to_supervised(scaled, n_hours, 1) print(reframed.shape) # split into train and test sets values = reframed.values n_train_hours = 365 * 24 train = values[:n_train_hours, :] test = values[n_train_hours:, :] # split into input and outputs n_obs = n_hours * n_features train_X, train_y = train[:, :n_obs], train[:, -n_features] test_X, test_y = test[:, :n_obs], test[:, -n_features] print(train_X.shape, len(train_X), train_y.shape) # reshape input to be 3D [samples, timesteps, features] train_X = train_X.reshape((train_X.shape[0], n_hours, n_features)) test_X = test_X.reshape((test_X.shape[0], n_hours, n_features)) print(train_X.shape, train_y.shape, test_X.shape, test_y.shape) # design network model = Sequential() model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2]))) model.add(Dense(1)) model.compile(loss='mae', optimizer='adam') # fit network history = model.fit(train_X, train_y, epochs=50, batch_size=72, validation_data=(test_X, test_y), verbose=2, shuffle=False) # plot history plt.plot(history.history['loss'], label='train') plt.plot(history.history['val_loss'], label='test') plt.legend() plt.show() # make a prediction yhat = model.predict(test_X) test_X = test_X.reshape((test_X.shape[0], n_hours*n_features)) # invert scaling for forecast inv_yhat = concatenate((yhat, test_X[:, -6:]), axis=1) inv_yhat = scaler.inverse_transform(inv_yhat) inv_yhat = inv_yhat[:,0] # invert scaling for actual test_y = test_y.reshape((len(test_y), 1)) inv_y = concatenate((test_y, test_X[:, -6:]), axis=1) inv_y = scaler.inverse_transform(inv_y) inv_y = inv_y[:,0] # calculate RMSE rmse = sqrt(mean_squared_error(inv_y, inv_yhat)) print('Test RMSE: %.3f' % rmse) # Test RMSE: 2.167
数据地址:https://tianchi.aliyun.com/competition/entrance/231573/information
数据集一共包括4张表:用户基本信息数据、用户申购赎回数据、收益率表和银行间拆借利率表。
2.8万用户,284万行为数据,294天拆解利率,427天收益率
时间:2013-07-01到2014-08-31
预测2014年9月的申购和赎回
""" reference:https://machinelearningmastery.com/time-series-prediction-lstm-recurrent-neural-networks-python-keras/ Note: 1.LSTMs are sensitive to the scale of the input data, specifically when the sigmoid (default) or tanh activation functions are used. It can be a good practice to rescale the data to the range of 0-to-1, also called normalizing. 2.The LSTM network expects the input data (X) to be provided with a specific array structure in the form of: [samples, time steps, features]. """ import math import numpy import pandas from keras.layers import LSTM, RNN, GRU, SimpleRNN from keras.layers import Dense, Dropout from keras.callbacks import EarlyStopping import matplotlib.pyplot as plt from keras.models import Sequential from sklearn.preprocessing import MinMaxScaler import os numpy.random.seed(2019) class RNNModel(object): def __init__(self, look_back=1, epochs_purchase=20, epochs_redeem=40, batch_size=1, verbose=2, patience=10, store_result=False): self.look_back = look_back self.epochs_purchase = epochs_purchase self.epochs_redeem = epochs_redeem self.batch_size = batch_size self.verbose = verbose self.store_result = store_result self.patience = patience self.purchase = df_tmp.values[:, 0:1] self.redeem = df_tmp.values[:, 1:2] def access_data(self, data_frame): # load the data set data_set = data_frame data_set = data_set.astype('float32') # LSTMs are sensitive to the scale of the input data, specifically when the sigmoid (default) or tanh activation functions are used. It can be a good practice to rescale the data to the range of 0-to-1, also called normalizing. scaler = MinMaxScaler(feature_range=(0, 1)) data_set = scaler.fit_transform(data_set) # reshape into X=t and Y=t+1 train_x, train_y, test = self.create_data_set(data_set) # reshape input to be [samples, time steps, features] train_x = numpy.reshape(train_x, (train_x.shape[0], 1, train_x.shape[1])) return train_x, train_y, test, scaler # convert an array of values into a data set matrix def create_data_set(self, data_set): data_x, data_y = [], [] for i in range(len(data_set)-self.look_back - 30): a = data_set[i:(i + self.look_back), 0] data_x.append(a) data_y.append(list(data_set[i + self.look_back: i + self.look_back + 30, 0])) # print(numpy.array(data_y).shape) return numpy.array(data_x), numpy.array(data_y), data_set[-self.look_back:, 0].reshape(1, 1, self.look_back) def rnn_model(self, train_x, train_y, epochs): model = Sequential() model.add(LSTM(64, input_shape=(1, self.look_back), return_sequences=True)) model.add(LSTM(32, return_sequences=False)) model.add(Dense(32)) model.add(Dense(30)) model.compile(loss='mean_squared_error', optimizer='adam') model.summary() early_stopping = EarlyStopping('loss', patience=self.patience) history = model.fit(train_x, train_y, epochs=epochs, batch_size=self.batch_size, verbose=self.verbose, callbacks=[early_stopping]) return model def predict(self, model, data): prediction = model.predict(data) return prediction def plot_show(self, predict): predict = predict[['purchase', 'redeem']] predict.plot() plt.show() def run(self): purchase_train_x, purchase_train_y, purchase_test, purchase_scaler = self.access_data(self.purchase) redeem_train_x, redeem_train_y, redeem_test, redeem_scaler = self.access_data(self.redeem) purchase_model = self.rnn_model(purchase_train_x, purchase_train_y, self.epochs_purchase) redeem_model = self.rnn_model(redeem_train_x, redeem_train_y, self.epochs_redeem) purchase_predict = self.predict(purchase_model, purchase_test) redeem_predict = self.predict(redeem_model, redeem_test) test_user = pandas.DataFrame({'report_date': [20140900 + i for i in range(1, 31)]}) purchase = purchase_scaler.inverse_transform(purchase_predict).reshape(30, 1) redeem = redeem_scaler.inverse_transform(redeem_predict).reshape(30, 1) test_user['purchase'] = purchase test_user['redeem'] = redeem print(test_user) """Store submit file""" if self.store_result is True: test_user.to_csv('submit_lstm.csv', encoding='utf-8', index=None, header=None) """plot result picture""" self.plot_show(test_user) if __name__ == '__main__': initiation = RNNModel(look_back=40, epochs_purchase=150, epochs_redeem=230, batch_size=16, verbose=2, patience=50, store_result=True) initiation.run()
1.时序数据库
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。