赞
踩
原题目如上,本文只是针对题目简单应用了 LSTM、SVM,未做深入探究。
LSTM的Python代码如下:
# -*- coding:utf-8 -*-
"""LSTM demo: windowed, roll-forward forecasting of one CSV column.

NOTE: the original listing contained the IPython magic ``%matplotlib inline``.
It is not valid Python, so it was removed; when working in a notebook, run the
magic in its own cell instead.
"""
import os
import time
import warnings

import numpy as np
import pandas as pd
from numpy import newaxis

# Set before Keras (and its backend) is imported lazily in build_model().
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
warnings.filterwarnings("ignore")

# Default location of the price CSV used by the original article.
DATA_PATH = r'D:\chenxinyi\信息系统项目实践\EURUSD1.csv'


def load_data(seq_len, normalise_window, interval, start_time, end_time,
              data_path=DATA_PATH):
    """Load the CSV, build sliding windows and split them into train/test.

    Args:
        seq_len: number of input steps per sample; every window holds
            ``seq_len + 1`` values (inputs followed by the target).
        normalise_window: when true, each window is rescaled with
            ``normalise_windows``.
        interval: sampling interval; ``len(df) / interval`` evenly spaced
            rows are kept.
        start_time: first index label of the slice that is used.
        end_time: last index label of the slice that is used (inclusive).
        data_path: CSV file to read; its first column becomes the index.

    Returns:
        ``[x_train, y_train, x_test, y_test]``. The first 80% of the windows
        form the (shuffled) training set; ``x_*`` have shape
        ``(samples, seq_len, 1)``.

    Raises:
        ValueError: if the selected slice is too short to build one window.
    """
    # Open the handle ourselves and close it deterministically. The encoding
    # has to be given here: read_csv cannot re-decode an already opened text
    # handle.
    with open(data_path, encoding='gbk') as f:
        df = pd.read_csv(f, index_col=0)

    # Keep evenly spaced rows. np.linspace yields floats, which iloc rejects,
    # so truncate them to integer positions first.
    keep_idx = np.linspace(0, len(df) - 1, int(len(df) / interval)).astype(int)
    df = df.iloc[keep_idx, :]
    data = df.loc[start_time:end_time]
    print(data.head())

    data = data.values
    sequence_length = seq_len + 1
    # Column 3 is the series being modelled.
    # NOTE(review): presumably the 'Close' price - confirm against the CSV.
    result = [data[index: index + sequence_length, 3]
              for index in range(len(data) - sequence_length)]
    if not result:
        raise ValueError(
            "not enough rows (%d) to build a window of length %d"
            % (len(data), sequence_length))

    if normalise_window:
        result = normalise_windows(result)
    result = np.array(result)

    row = int(round(0.8 * result.shape[0]))
    train = result[:row, :]
    # Shuffles the training rows in place; the test rows keep their order.
    np.random.shuffle(train)
    x_train = train[:, :-1]   # every column but the last one
    y_train = train[:, -1]    # the last column is the target
    x_test = result[row:, :-1]
    y_test = result[row:, -1]

    # Add the trailing feature axis: (samples, steps, 1).
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))
    return [x_train, y_train, x_test, y_test]


def normalise_windows(window_data):
    """Rescale every window relative to its first value: ``p / w[0] - 1``.

    Returns a list of lists of floats, one inner list per input window.
    """
    normalised_data = []
    for window in window_data:
        base = float(window[0])
        normalised_data.append([(float(p) / base) - 1 for p in window])
    return normalised_data


def build_model(layers):
    """Build and compile the two-layer LSTM regression model.

    Args:
        layers: ``[n_features, n_steps, lstm2_units, n_outputs]``. As in the
            original code, ``layers[1]`` is used both as the number of time
            steps and as the size of the first LSTM layer.

    Returns:
        The compiled Keras ``Sequential`` model (MSE loss, RMSprop).
    """
    # Imported lazily so the data helpers work without Keras installed.
    # The Keras 1 modules keras.layers.core / keras.layers.recurrent and the
    # ``output_dim`` keyword were replaced by their Keras 2 equivalents.
    from keras.layers import Activation, Dense, Dropout, LSTM
    from keras.models import Sequential

    model = Sequential()
    model.add(LSTM(
        units=layers[1],
        input_shape=(layers[1], layers[0]),
        return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(
        layers[2],
        return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(units=layers[3]))
    model.add(Activation("linear"))

    start = time.time()
    model.compile(loss="mse", optimizer="rmsprop")
    print("> Compilation Time : ", time.time() - start)
    return model


def predict_sequences_multiple(model, data, window_size, prediction_len):
    """Predict ``prediction_len`` steps ahead from every ``prediction_len``-th window.

    Args:
        model: object with a Keras-style ``predict`` method.
        data: array of input windows, shape ``(samples, window_size, 1)``.
        window_size: number of steps in one input window.
        prediction_len: number of steps predicted per sequence.

    Returns:
        A list of lists; each inner list holds the rolled-forward predictions
        of one sequence.
    """
    prediction_seqs = []
    # Sliding start position: one sequence per block of prediction_len windows.
    for i in range(len(data) // prediction_len):
        curr_frame = data[i * prediction_len]
        predicted = []
        # Rolling forecast: each prediction is fed back as the newest input.
        for _ in range(prediction_len):
            # newaxis adds the batch dimension expected by predict().
            predicted.append(model.predict(curr_frame[newaxis, :, :])[0, 0])
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(
                curr_frame, [window_size - 1], predicted[-1], axis=0)
        prediction_seqs.append(predicted)
    return prediction_seqs


def plot_results_multiple(predicted_data, true_data, prediction_len):
    """Plot the true series and every predicted sequence at its own offset."""
    # Imported lazily so the module can be used on machines without matplotlib.
    import matplotlib.pyplot as plt

    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    for i, data in enumerate(predicted_data):
        # Left-pad with None so the sequence starts at its position in time.
        padding = [None] * (i * prediction_len)
        plt.plot(padding + data, label='Prediction')
    plt.legend()
    plt.show()


if __name__ == '__main__':
    global_start_time = time.time()
    epochs = 1
    seq_len = 50

    print('> Loading data... ')
    X_train, y_train, X_test, y_test = load_data(
        seq_len, True, 1, '2003.12.01 03:45:00', '2003.12.03 22:25:00')
    print('> Data Loaded. Compiling...')

    model = build_model([1, 50, 100, 1])
    model.fit(
        X_train,
        y_train,
        batch_size=512,
        epochs=epochs,  # Keras 2 name of the former ``nb_epoch`` argument
        # validation_split: fraction (0-1) of the training data held out
        # for validation.
        validation_split=0.05)

    predictions = predict_sequences_multiple(model, X_test, seq_len, 50)
    print('Training duration (s) : ', time.time() - global_start_time)
    plot_results_multiple(predictions, y_test, 50)
SVM代码如下:
# -*- coding:utf-8 -*-
"""SVM demo: walk-forward prediction of price direction (SVC) or value (SVR).

NOTE: the original listing contained the IPython magic ``%matplotlib inline``.
It is not valid Python, so it was removed; when working in a notebook, run the
magic in its own cell instead.
"""
import time
import warnings

import numpy as np
# pandas is used to load the CSV data.
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn", lineno=193)

# Default location of the price CSV used by the original article.
DATA_PATH = r'D:\chenxinyi\信息系统项目实践\EURUSD1.csv'


def forecast(myType=1, interval=1, start_time='2003.12.01 03:45:00',
             end_time='2003.12.03 22:25:00', data_path=DATA_PATH):
    """Run a walk-forward SVM forecast over the selected slice of the CSV.

    The first 80% of the rows form the initial training window. For every
    remaining row a fresh model is fitted on the preceding window of that
    same length and asked to predict the row.

    Args:
        myType: 0 to predict direction (1 = up or unchanged, 0 = down) with
            an SVC, 1 to predict the 'Close' value with an SVR.
        interval: sampling interval; ``len(df) / interval`` evenly spaced
            rows are kept (e.g. 1, 5 or 30 for minute data).
        start_time: first index label of the slice that is used.
        end_time: last index label of the slice that is used (inclusive).
        data_path: CSV file to read; its first column becomes the index and
            it must contain a 'Close' column.

    Returns:
        ``(accuracy_percent, seconds)`` for ``myType == 0`` and
        ``(mse, seconds)`` for ``myType == 1``.

    Raises:
        ValueError: if ``myType`` is neither 0 nor 1. The original loop never
            advanced for other values and therefore never terminated.
    """
    if myType not in (0, 1):
        raise ValueError("myType must be 0 (direction) or 1 (value), got %r" % (myType,))

    # Imported lazily so the helpers below work without scikit-learn.
    from sklearn import preprocessing, svm

    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    start = time.perf_counter()

    # The path contains non-ASCII characters, so the file is opened first.
    # The encoding has to be given here: read_csv cannot re-decode an already
    # opened text handle. ``with`` closes the handle again.
    with open(data_path, encoding='gbk') as f:
        df = pd.read_csv(f, index_col=0)

    # Keep evenly spaced rows. np.linspace yields floats, which iloc rejects,
    # so truncate them to integer positions first.
    keep_idx = np.linspace(0, len(df) - 1, int(len(df) / interval)).astype(int)
    df = df.iloc[keep_idx, :]
    # Copy so that adding the 'Value' column does not write into a view of df.
    data = df.loc[start_time:end_time].copy()
    print(data.head())

    if myType == 0:
        # Direction label derived from the change since the previous row.
        value = data['Close'] - data['Close'].shift(1)
        value = value.bfill()  # the first row has no predecessor
        value[value >= 0] = 1
        value[value < 0] = 0
    else:
        value = data['Close']
    data['Value'] = value
    # Fill gaps with the next available value.
    data = data.bfill()
    data = data.astype('float64')

    # Standardise the features.
    # NOTE(review): 'Close' stays among the features although the target is
    # derived from it - confirm this is intended.
    data_X = preprocessing.scale(data.drop(['Value'], axis=1))
    # Plain array of targets (taken after gap filling) so the positional
    # indexing below never depends on the label index.
    targets = data['Value'].to_numpy()

    # 80% of the rows form the training window, the rest is predicted.
    L = len(data)
    window = int(L * 0.8)
    total_predict_data = L - window

    predictions = []
    real = []
    correct = 0
    # Walk forward, predicting one row at a time.
    for pos in range(window, L):
        X_train = data_X[pos - window:pos]
        y_train = targets[pos - window:pos]
        X_next = data_X[pos:pos + 1]
        value_real = targets[pos]

        # Other kernels that can be tried: 'poly', 'linear', 'rbf'.
        if myType == 0:
            classifier = svm.SVC(C=1.0, kernel='sigmoid')
            classifier.fit(X_train, y_train)
            value_predict = classifier.predict(X_next)[0]
            print("value_real=%d value_predict=%d" % (value_real, value_predict))
            if value_real == int(value_predict):
                correct = correct + 1
        else:
            classifier = svm.SVR(kernel='rbf')
            classifier.fit(X_train, y_train)
            value_predict = classifier.predict(X_next)[0]
            predictions.append(value_predict)
            real.append(value_real)
            print("value_real=%f value_predict=%f error=%f" % (
                value_real, value_predict, abs(value_real - value_predict)))

    if myType == 0:
        # Accuracy in percent.
        correct = correct * 100 / total_predict_data
        print("Correct=%.2f%%" % correct)
    else:
        plot_results(predictions, real)
        # Mean squared error on the test rows.
        val_loss = get_mse(real, predictions)
        print("val_loss=%e" % (val_loss))

    end = time.perf_counter()
    print('Running time: %s Seconds' % (end - start))
    if myType == 0:
        return correct, end - start
    return val_loss, end - start


def plot_results(predicted_data, true_data):
    """Plot the true series and the predicted series on one figure."""
    # Imported lazily so the module can be used on machines without matplotlib.
    import matplotlib.pyplot as plt

    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    plt.plot(predicted_data, label='Prediction')
    plt.legend()
    plt.show()


def get_mse(records_real, records_predict):
    """Return the mean squared error, ignoring the last real record.

    The final element of ``records_real`` is dropped and the predictions are
    truncated to the same length before averaging.

    Raises:
        ZeroDivisionError: if fewer than two real records are given.
    """
    records_real = records_real[:-1]
    records_predict = records_predict[:len(records_real)]
    squared = sum((x - y) ** 2 for x, y in zip(records_real, records_predict))
    return squared / len(records_real)


if __name__ == '__main__':
    forecast(0, 1, '2003.12.01 03:45:00', '2003.12.03 22:25:00')
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。