在实际应用中,我们常常会碰到各种各样的时间序列预测问题,如商场人流量的预测、商品价格的预测、股价的预测,等等。TensorFlow新引入了一个TensorFlow Time Series库(以下简称为TFTS),它可以帮助我们在TensorFlow中快速搭建高性能的时间序列预测系统,并提供了包括AR、LSTM在内的多个模型。
- NumpyReader:用于从Numpy数组中读入数据
- CSVReader:用于从CSV文件中读入数据
import numpy as np
import matplotlib
matplotlib.use('agg')  # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader

# Synthetic series of 1000 points: a sine wave plus a linear trend plus
# uniform noise in [-0.2, 0.2).
x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')

# Pack the time stamps and the observed values into the dict that is
# handed to NumpyReader.
data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}
reader = NumpyReader(data)
首先把x和y变成Python中的字典(变量data)。上面的定义直接写成“data = {'times': x, 'values': y}”也是可以的。写成比较复杂的形式是为了和源码中的写法保持一致。
with tf.Session() as sess:
    full_data = reader.read_full()
    # Calling read_full() creates a reading queue; the queue must be
    # started with tf.train.start_queue_runners before it can be read.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(full_data))
    # Ask the queue-runner threads to stop, then wait for them to finish
    # while the session is still open.
    coord.request_stop()
    coord.join(threads)

# Input function that samples random windows from the series for training.
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=2, window_size=10)
以batch_size=2, window_size=10为例,可以打印出一个batch的数据:
with tf.Session() as sess:
    batch_data = train_input_fn.create_batch()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # create_batch() returns an indexable result; element 0 is the
    # feature dict that gets printed below.
    one_batch = sess.run(batch_data[0])
    # Ask the queue-runner threads to stop, then wait for them to finish
    # while the session is still open.
    coord.request_stop()
    coord.join(threads)

print('one_batch_data:', one_batch)
# one_batch_data: {'times': array([[11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
#        [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]]), 'values': array([[[0.33901882],
#         [0.29966548],
#         [0.64006627],
#         [0.35204604],
#         [0.66049626],
#         [0.57470108],
#         [0.68309054],
#         [0.46613038],
#         [0.60309193],
#         [0.84166497]],
#
#        [[0.77312242],
#         [0.82185951],
#         [0.71022706],
#         [0.63987861],
#         [0.7011966 ],
#         [0.84051192],
#         [1.05796465],
#         [0.92981324],
#         [1.0542786 ],
#         [0.89828743]]])}
原先的数据是长度为1000的时间序列(x=np.array(range(1000)))。使用tf.contrib.timeseries.RandomWindowInputFn并指定window_size=10、batch_size=2,其作用是在这个长度为1000的时间序列中随机选取长度为10的序列,并在每个batch里包含两个这样的序列。这也可以从打印出的数据中看出来。
import tensorflow as tf

csv_file_name = './period_trend.csv'
reader = tf.contrib.timeseries.CSVReader(csv_file_name)

# Read the complete CSV content once and print it.
with tf.Session() as sess:
    data = reader.read_full()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(data))
    # Ask the queue-runner threads to stop, then wait for them to finish
    # while the session is still open.
    coord.request_stop()
    coord.join(threads)

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=4, window_size=16)

# Fetch two batches from the same batch op and print both.
with tf.Session() as sess:
    data = train_input_fn.create_batch()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    batch1 = sess.run(data[0])
    batch2 = sess.run(data[0])
    coord.request_stop()
    coord.join(threads)

print('batch1:', batch1)
print('batch2:', batch2)
自回归模型(Autoregressive model,简称为AR模型)是统计学上处理时间序列的基本方法之一。TFTS中已经实现了一个自回归模型,我们只需要对其进行调用即可使用。
# Synthetic series of 1000 points: a sine wave plus a linear trend plus
# uniform noise in [-0.2, 0.2).
x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}
reader = NumpyReader(data)

# Each training window holds 40 points; this equals
# input_window_size + output_window_size of the regressor below (30 + 10).
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=16, window_size=40)

ar = tf.contrib.timeseries.ARRegressor(
    periodicities=200,
    input_window_size=30,
    output_window_size=10,
    num_features=1,
    loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS)
- periodicities:序列的规律性周期。在定义数据时使用的语句是“y=np.sin(np.pi * x /100)+x /200.+noise”,因此周期为200
- input_window_size:模型每次输入的值的个数(即输入窗口的长度)
- output_window_size:模型每次输出的值的个数(即输出窗口的长度)
- num_features:表示在一个时间点上观察到的数的维度。这里每一步都是一个单独的值,所以num_features=1
- model_dir:模型训练好后保存的地址,如果不指定的话,会随机分配一个临时地址
input_window_size和output_window_size加起来必须等于train_input_fn中总的window_size。总的window_size为40, input_window_size为30,output_window_size为10;也就是说,一个batch内每个序列的长度为40,其中前30个数被当作模型的输入值,后面10个数为这些输入对应的目标输出值。
ar.train(input_fn=train_input_fn, steps=6000)

# Evaluate on the whole series in a single step.
evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
# keys of evaluation: ['covariance', 'loss', 'mean', 'observed', 'start_tuple', 'times', 'global_step']
evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1)

# Continue from the evaluation result for 250 further steps. The
# one-element unpacking fails unless predict() yields exactly one item.
(predictions,) = tuple(ar.predict(
    input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=250)))

# Plot the original series, the evaluation output and the prediction
# on one figure.
plt.figure(figsize=(15, 5))
plt.plot(data['times'].reshape(-1), data['values'].reshape(-1),
         label='origin')
plt.plot(evaluation['times'].reshape(-1), evaluation['mean'].reshape(-1),
         label='evaluation')
plt.plot(predictions['times'].reshape(-1), predictions['mean'].reshape(-1),
         label='prediction')
plt.xlabel('time_step')
plt.ylabel('values')
plt.legend(loc=4)
plt.savefig('predict_result.jpg')

# coding: utf-8 from __future__ import print_function import numpy as np import matplotlib matplotlib.use('agg') import matplotlib.pyplot as plt import tensorflow as tf from tensorflow.contrib.timeseries.python.timeseries import NumpyReader def main(_): x = np.array(range(1000)) noise = np.random.uniform(-0.2, 0.2, 1000) y = np.sin(np.pi * x / 100) + x / 200. + noise plt.plot(x, y) plt.savefig('timeseries_y.jpg') data = { tf.contrib.timeseries.TrainEvalFeatures.TIMES: x, tf.contrib.timeseries.TrainEvalFeatures.VALUES: y, } reader = NumpyReader(data) train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( reader, batch_size=16, window_size=40) ar = tf.contrib.timeseries.ARRegressor( periodicities=200, input_window_size=30, output_window_size=10, num_features=1, loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS) ar.train(input_fn=train_input_fn, steps=6000) evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader) # keys of evaluation: ['covariance', 'loss', 'mean', 'observed', 'start_tuple', 'times', 'global_step'] evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1) (predictions,) = tuple(ar.predict( input_fn=tf.contrib.timeseries.predict_continuation_input_fn( evaluation, steps=250))) plt.figure(figsize=(15, 5)) plt.plot(data['times'].reshape(-1), data['values'].reshape(-1), label='origin') plt.plot(evaluation['times'].reshape(-1), evaluation['mean'].reshape(-1), label='evaluation') plt.plot(predictions['times'].reshape(-1), predictions['mean'].reshape(-1), label='prediction') plt.xlabel('time_step') plt.ylabel('values') plt.legend(loc=4) plt.savefig('predict_result.jpg') if __name__ == '__main__': tf.logging.set_verbosity(tf.logging.INFO) tf.app.run()
# Synthetic series of 1000 points: a sum of three sinusoids plus uniform
# noise in [-0.2, 0.2).
x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = (np.sin(np.pi * x / 50) + np.cos(np.pi * x / 50)
     + np.sin(np.pi * x / 25) + noise)

data = {
    tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
    tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}
reader = NumpyReader(data)

train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=4, window_size=100)

estimator = ts_estimators.TimeSeriesRegressor(
    model=_LSTMModel(num_features=1, num_units=128),
    optimizer=tf.train.AdamOptimizer(0.001))

# Train the model.
estimator.train(input_fn=train_input_fn, steps=2000)
# Evaluation input: the whole data set.
evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
# Run the evaluation and keep its result.
evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
# Predict 200 steps following the evaluation.
(predictions,) = tuple(estimator.predict(
    input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=200)))

# Index 0 selects the first entry along the leading axis of the
# evaluation arrays.
observed_times = evaluation["times"][0]
observed = evaluation["observed"][0, :, :]
evaluated_times = evaluation["times"][0]
evaluated = evaluation["mean"][0]
predicted_times = predictions['times']
predicted = predictions["mean"]

plt.figure(figsize=(15, 5))
# Dotted vertical line at t = 999, the last time stamp of the input series.
plt.axvline(999, linestyle="dotted", linewidth=4, color='r')
observed_lines = plt.plot(
    observed_times, observed, label="observation", color="k")
evaluated_lines = plt.plot(
    evaluated_times, evaluated, label="evaluation", color="g")
predicted_lines = plt.plot(
    predicted_times, predicted, label="prediction", color="r")
plt.legend(
    handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
    loc="upper left")
plt.savefig('predict_result.jpg')

import numpy as np
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import estimators as ts_estimators
from tensorflow.contrib.timeseries.python.timeseries import model as ts_model
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader
import matplotlib
matplotlib.use("agg")  # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt


class _LSTMModel(ts_model.SequentialTimeSeriesModel):
    """A time series model-building example using an RNNCell."""

    def __init__(self, num_units, num_features, dtype=tf.float32):
        """Initialize/configure the model object.

        Note that we do not start graph building here. Rather, this object is
        a configurable factory for TensorFlow graphs which are run by an
        Estimator.

        Args:
          num_units: The number of units in the model's LSTMCell.
          num_features: The dimensionality of the time series (features per
            timestep).
          dtype: The floating point data type to use.
        """
        super(_LSTMModel, self).__init__(
            # Pre-register the metrics we'll be outputting (just a mean here).
            train_output_names=["mean"],
            predict_output_names=["mean"],
            num_features=num_features,
            dtype=dtype)
        self._num_units = num_units
        # Filled in by initialize_graph()
        self._lstm_cell = None
        self._lstm_cell_run = None
        self._predict_from_lstm_output = None

    def initialize_graph(self, input_statistics):
        """Save templates for components, which can then be used repeatedly.

        This method is called every time a new graph is created. It's safe to
        start adding ops to the current default graph here, but the graph
        should be constructed from scratch.

        Args:
          input_statistics: A math_utils.InputStatistics object.
        """
        super(_LSTMModel, self).initialize_graph(
            input_statistics=input_statistics)
        self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units)
        # Create templates so we don't have to worry about variable reuse.
        self._lstm_cell_run = tf.make_template(
            name_="lstm_cell",
            func_=self._lstm_cell,
            create_scope_now_=True)
        # Transforms LSTM output into mean predictions.
        self._predict_from_lstm_output = tf.make_template(
            name_="predict_from_lstm_output",
            func_=lambda inputs: tf.layers.dense(
                inputs=inputs, units=self.num_features),
            create_scope_now_=True)

    def get_start_state(self):
        """Return initial state for the time series model."""
        return (
            # Keeps track of the time associated with this state for error
            # checking.
            tf.zeros([], dtype=tf.int64),
            # The previous observation or prediction.
            tf.zeros([self.num_features], dtype=self.dtype),
            # The state of the RNNCell (batch dimension removed since this
            # parent class will broadcast).
            [tf.squeeze(state_element, axis=0)
             for state_element
             in self._lstm_cell.zero_state(batch_size=1, dtype=self.dtype)])

    def _transform(self, data):
        """Normalize data based on input statistics to encourage stable training.

        Note that this divides by the variance (not the standard deviation);
        `_de_transform` applies the exact inverse.
        """
        mean, variance = self._input_statistics.overall_feature_moments
        return (data - mean) / variance

    def _de_transform(self, data):
        """Transform data back to the input scale (inverse of `_transform`)."""
        mean, variance = self._input_statistics.overall_feature_moments
        return data * variance + mean

    def _filtering_step(self, current_times, current_values, state,
                        predictions):
        """Update model state based on observations.

        Note that we don't do much here aside from computing a loss. In this
        case it's easier to update the RNN state in _prediction_step, since
        that covers running the RNN both on observations (from this method)
        and our own predictions. This distinction can be important for
        probabilistic models, where repeatedly predicting without filtering
        should lead to low-confidence predictions.

        Args:
          current_times: A [batch size] integer Tensor.
          current_values: A [batch size, self.num_features] floating point
            Tensor with new observations.
          state: The model's state tuple.
          predictions: The output of the previous `_prediction_step`.
        Returns:
          A tuple of new state and a predictions dictionary updated to include
          a loss (note that we could also return other measures of goodness of
          fit, although only "loss" will be optimized).
        """
        state_from_time, prediction, lstm_state = state
        with tf.control_dependencies(
                [tf.assert_equal(current_times, state_from_time)]):
            transformed_values = self._transform(current_values)
            # Use mean squared error across features for the loss.
            predictions["loss"] = tf.reduce_mean(
                (prediction - transformed_values) ** 2, axis=-1)
            # Keep track of the new observation in model state. It won't be
            # run through the LSTM until the next _prediction_step, which
            # feeds this state element to the cell (_imputation_step passes
            # the state through unchanged).
            new_state_tuple = (current_times, transformed_values, lstm_state)
        return (new_state_tuple, predictions)

    def _prediction_step(self, current_times, state):
        """Advance the RNN state using a previous observation or prediction."""
        _, previous_observation_or_prediction, lstm_state = state
        lstm_output, new_lstm_state = self._lstm_cell_run(
            inputs=previous_observation_or_prediction, state=lstm_state)
        next_prediction = self._predict_from_lstm_output(lstm_output)
        new_state_tuple = (current_times, next_prediction, new_lstm_state)
        # The state keeps the prediction in the normalized scale; the
        # returned "mean" is mapped back to the input scale.
        return new_state_tuple, {"mean": self._de_transform(next_prediction)}

    def _imputation_step(self, current_times, state):
        """Advance model state across a gap."""
        # Does not do anything special if we're jumping across a gap. More
        # advanced models, especially probabilistic ones, would want a special
        # case that depends on the gap size.
        return state

    def _exogenous_input_step(
            self, current_times, current_exogenous_regressors, state):
        """Update model state based on exogenous regressors."""
        raise NotImplementedError(
            "Exogenous inputs are not implemented for this example.")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    # Synthetic series of 1000 points: a sum of three sinusoids plus uniform
    # noise in [-0.2, 0.2).
    x = np.array(range(1000))
    noise = np.random.uniform(-0.2, 0.2, 1000)
    y = (np.sin(np.pi * x / 50) + np.cos(np.pi * x / 50)
         + np.sin(np.pi * x / 25) + noise)

    data = {
        tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
        tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
    }
    reader = NumpyReader(data)

    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=4, window_size=100)

    estimator = ts_estimators.TimeSeriesRegressor(
        model=_LSTMModel(num_features=1, num_units=128),
        optimizer=tf.train.AdamOptimizer(0.001))

    estimator.train(input_fn=train_input_fn, steps=2000)
    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
    # Predict starting after the evaluation
    (predictions,) = tuple(estimator.predict(
        input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
            evaluation, steps=200)))

    # Index 0 selects the first entry along the leading axis of the
    # evaluation arrays.
    observed_times = evaluation["times"][0]
    observed = evaluation["observed"][0, :, :]
    evaluated_times = evaluation["times"][0]
    evaluated = evaluation["mean"][0]
    predicted_times = predictions['times']
    predicted = predictions["mean"]

    plt.figure(figsize=(15, 5))
    # Dotted vertical line at t = 999, the last time stamp of the input series.
    plt.axvline(999, linestyle="dotted", linewidth=4, color='r')
    observed_lines = plt.plot(
        observed_times, observed, label="observation", color="k")
    evaluated_lines = plt.plot(
        evaluated_times, evaluated, label="evaluation", color="g")
    predicted_lines = plt.plot(
        predicted_times, predicted, label="prediction", color="r")
    plt.legend(
        handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
        loc="upper left")
    plt.savefig('predict_result.jpg')
0 0.926906299771 1.99107237682 2.56546245685 3.07914768197 4.04839057867
1 0.108010001864 1.41645361423 2.1686839775 2.94963962176 4.1263503303
2 -0.800567600028 1.0172132907 1.96434754116 2.99885333086 4.04300485864
3 0.0607042871898 0.719540073421 1.9765012584 2.89265588817 4.0951014426
99 0.987764008058 1.85581989607 2.84685706149 2.94760204892 6.0212151724
csv_file_name = path.join("./data/multivariate_periods.csv")
# Column layout passed to the reader: one time column followed by five
# value columns.
reader = tf.contrib.timeseries.CSVReader(
    csv_file_name,
    column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,)
                  + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) * 5))
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
    reader, batch_size=4, window_size=32)

# num_features=5 matches the five value columns declared above.
estimator = ts_estimators.TimeSeriesRegressor(
    model=_LSTMModel(num_features=5, num_units=128),
    optimizer=tf.train.AdamOptimizer(0.001))

from os import path
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import estimators as ts_estimators
from tensorflow.contrib.timeseries.python.timeseries import model as ts_model
import matplotlib
matplotlib.use("agg")  # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt


class _LSTMModel(ts_model.SequentialTimeSeriesModel):
    """A time series model-building example using an RNNCell."""

    def __init__(self, num_units, num_features, dtype=tf.float32):
        """Initialize/configure the model object.

        Note that we do not start graph building here. Rather, this object is
        a configurable factory for TensorFlow graphs which are run by an
        Estimator.

        Args:
          num_units: The number of units in the model's LSTMCell.
          num_features: The dimensionality of the time series (features per
            timestep).
          dtype: The floating point data type to use.
        """
        super(_LSTMModel, self).__init__(
            # Pre-register the metrics we'll be outputting (just a mean here).
            train_output_names=["mean"],
            predict_output_names=["mean"],
            num_features=num_features,
            dtype=dtype)
        self._num_units = num_units
        # Filled in by initialize_graph()
        self._lstm_cell = None
        self._lstm_cell_run = None
        self._predict_from_lstm_output = None

    def initialize_graph(self, input_statistics):
        """Save templates for components, which can then be used repeatedly.

        This method is called every time a new graph is created. It's safe to
        start adding ops to the current default graph here, but the graph
        should be constructed from scratch.

        Args:
          input_statistics: A math_utils.InputStatistics object.
        """
        super(_LSTMModel, self).initialize_graph(
            input_statistics=input_statistics)
        self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units)
        # Create templates so we don't have to worry about variable reuse.
        self._lstm_cell_run = tf.make_template(
            name_="lstm_cell",
            func_=self._lstm_cell,
            create_scope_now_=True)
        # Transforms LSTM output into mean predictions.
        self._predict_from_lstm_output = tf.make_template(
            name_="predict_from_lstm_output",
            func_=lambda inputs: tf.layers.dense(
                inputs=inputs, units=self.num_features),
            create_scope_now_=True)

    def get_start_state(self):
        """Return initial state for the time series model."""
        return (
            # Keeps track of the time associated with this state for error
            # checking.
            tf.zeros([], dtype=tf.int64),
            # The previous observation or prediction.
            tf.zeros([self.num_features], dtype=self.dtype),
            # The state of the RNNCell (batch dimension removed since this
            # parent class will broadcast).
            [tf.squeeze(state_element, axis=0)
             for state_element
             in self._lstm_cell.zero_state(batch_size=1, dtype=self.dtype)])

    def _transform(self, data):
        """Normalize data based on input statistics to encourage stable training.

        Note that this divides by the variance (not the standard deviation);
        `_de_transform` applies the exact inverse.
        """
        mean, variance = self._input_statistics.overall_feature_moments
        return (data - mean) / variance

    def _de_transform(self, data):
        """Transform data back to the input scale (inverse of `_transform`)."""
        mean, variance = self._input_statistics.overall_feature_moments
        return data * variance + mean

    def _filtering_step(self, current_times, current_values, state,
                        predictions):
        """Update model state based on observations.

        Note that we don't do much here aside from computing a loss. In this
        case it's easier to update the RNN state in _prediction_step, since
        that covers running the RNN both on observations (from this method)
        and our own predictions. This distinction can be important for
        probabilistic models, where repeatedly predicting without filtering
        should lead to low-confidence predictions.

        Args:
          current_times: A [batch size] integer Tensor.
          current_values: A [batch size, self.num_features] floating point
            Tensor with new observations.
          state: The model's state tuple.
          predictions: The output of the previous `_prediction_step`.
        Returns:
          A tuple of new state and a predictions dictionary updated to include
          a loss (note that we could also return other measures of goodness of
          fit, although only "loss" will be optimized).
        """
        state_from_time, prediction, lstm_state = state
        with tf.control_dependencies(
                [tf.assert_equal(current_times, state_from_time)]):
            transformed_values = self._transform(current_values)
            # Use mean squared error across features for the loss.
            predictions["loss"] = tf.reduce_mean(
                (prediction - transformed_values) ** 2, axis=-1)
            # Keep track of the new observation in model state. It won't be
            # run through the LSTM until the next _prediction_step, which
            # feeds this state element to the cell (_imputation_step passes
            # the state through unchanged).
            new_state_tuple = (current_times, transformed_values, lstm_state)
        return (new_state_tuple, predictions)

    def _prediction_step(self, current_times, state):
        """Advance the RNN state using a previous observation or prediction."""
        _, previous_observation_or_prediction, lstm_state = state
        lstm_output, new_lstm_state = self._lstm_cell_run(
            inputs=previous_observation_or_prediction, state=lstm_state)
        next_prediction = self._predict_from_lstm_output(lstm_output)
        new_state_tuple = (current_times, next_prediction, new_lstm_state)
        # The state keeps the prediction in the normalized scale; the
        # returned "mean" is mapped back to the input scale.
        return new_state_tuple, {"mean": self._de_transform(next_prediction)}

    def _imputation_step(self, current_times, state):
        """Advance model state across a gap."""
        # Does not do anything special if we're jumping across a gap. More
        # advanced models, especially probabilistic ones, would want a special
        # case that depends on the gap size.
        return state

    def _exogenous_input_step(
            self, current_times, current_exogenous_regressors, state):
        """Update model state based on exogenous regressors."""
        raise NotImplementedError(
            "Exogenous inputs are not implemented for this example.")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    csv_file_name = path.join("./data/multivariate_periods.csv")
    # Column layout passed to the reader: one time column followed by five
    # value columns.
    reader = tf.contrib.timeseries.CSVReader(
        csv_file_name,
        column_names=((tf.contrib.timeseries.TrainEvalFeatures.TIMES,)
                      + (tf.contrib.timeseries.TrainEvalFeatures.VALUES,) * 5))
    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=4, window_size=32)

    # num_features=5 matches the five value columns declared above.
    estimator = ts_estimators.TimeSeriesRegressor(
        model=_LSTMModel(num_features=5, num_units=128),
        optimizer=tf.train.AdamOptimizer(0.001))

    estimator.train(input_fn=train_input_fn, steps=200)
    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
    # Predict starting after the evaluation
    (predictions,) = tuple(estimator.predict(
        input_fn=tf.contrib.timeseries.predict_continuation_input_fn(
            evaluation, steps=100)))

    # Index 0 selects the first entry along the leading axis of the
    # evaluation arrays.
    observed_times = evaluation["times"][0]
    observed = evaluation["observed"][0, :, :]
    evaluated_times = evaluation["times"][0]
    evaluated = evaluation["mean"][0]
    predicted_times = predictions['times']
    predicted = predictions["mean"]

    plt.figure(figsize=(15, 5))
    # Dotted vertical line at t = 99.
    plt.axvline(99, linestyle="dotted", linewidth=4, color='r')
    observed_lines = plt.plot(
        observed_times, observed, label="observation", color="k")
    evaluated_lines = plt.plot(
        evaluated_times, evaluated, label="evaluation", color="g")
    predicted_lines = plt.plot(
        predicted_times, predicted, label="prediction", color="r")
    # plt.plot returns a list of line handles; only the first of each
    # group is used for the legend.
    plt.legend(
        handles=[observed_lines[0], evaluated_lines[0], predicted_lines[0]],
        loc="upper left")
    plt.savefig('predict_result.jpg')