赞
踩
利用LSTM工具的神经网络算法预测未来一天的股票涨跌
最终测试得分,加大数据量可以提高结果精确度,
- ##########################导入所需库##########################
- import pandas as pd
- import numpy as np
- from numpy import log
- from sklearn.preprocessing import MinMaxScaler
- from tensorflow.keras.models import Sequential
- from tensorflow.keras.layers import LSTM, Dense
- #import tensorflow.keras
- from sklearn.metrics import r2_score
- import random
- import matplotlib.pyplot as plt
- #############################################################
-
- ####################设置中文、负号正常显示#####################
- plt.rcParams['font.sans-serif'] = ['SimHei']
- plt.rcParams['axes.unicode_minus'] = False
- #############################################################
-
- ################## 配置文件读写函数库调用封装 ##################
- import os
- import configparser
- def process_config(A, file_name, section_name, key_name, new_value):
- # 获取当前py文件所在的文件夹路径
- current_folder = os.path.dirname(os.path.abspath(__file__))
-
- # 拼接配置文件路径
- config_file_path = os.path.join(current_folder, file_name)
-
- # 创建一个配置文件解析器对象
- config = configparser.ConfigParser()
-
- # 读取配置文件
- config.read(config_file_path)
-
- if A == 'R_value':
- # 获取指定section下的key对应的value
- value = config.get(section_name, key_name)
- return value
- elif A == 'W_value':
- # 修改指定section下的key对应的value
- config.set(section_name, key_name, new_value)
- with open(config_file_path, 'w') as configfile:
- config.write(configfile)
- elif A == 'W_new_section':
- # 添加一个新的section
- config.add_section(section_name)
- with open(config_file_path, 'w') as configfile:
- config.write(configfile)
- elif A == 'W_new_key':
- # 在新的section下添加一个key-value对
- config.set(section_name, key_name, new_value)
- with open(config_file_path, 'w') as configfile:
- config.write(configfile)
- else:
- pass
- # 调用方法:
- # 1. 读取指定section下的key对应的value:`process_config('R_value', 'config.ini', 'section_name', 'key_name', None)`
- # 2. 修改指定section下的key对应的value:`process_config('W_value', 'config.ini', 'section_name', 'key_name', 'new_value')`
- # 3. 添加一个新的section:`process_config('W_new_section', 'config.ini', 'section_name', None, None)`
- # 4. 在新的section下添加一个key-value对:`process_config('W_new_key', 'config.ini', 'section_name', 'key_name', 'new_value')`
- ##############################################################
-
- ############# 全局参数,所有要调整的参数都在这里 ###################################
- token = process_config('R_value', 'config.ini', 'section_token', 'tushare_token', None) #在网站tushare注册用户并获取数据的密钥 注册网址:https://tushare.pro/register?reg=641623
- ts_code = '603178.SH' # 股票代码
- start_date = '19901101' # 起始日期
- end_date = '20241201' # 结束日期
- dim = 300 # 输出维度数,也是LSTM的网络节点数
- epochs = 100 # 训练代数,可以理解为训练次数
- days = 30 # 读取多少天的数据作为一次预测。例如读取20天的历史数据来预测未来1天的情况
- batch_size = days*1 # 训练批次大小,就是一次性读取多少个样本进行一运算,越大运算速度越快,但是占用内存和显存越多,根据自己的机器性能设置。同时该参数还决定梯度下降算法的下降步长数。
- split_ratio = 0.6 #0.9代表选90%作为训练数据,10%测试数据
- ### 开始构建网络 ###
- n_steps = days # 输入张量的维度数
- n_features = 7 # 输入张量的维度
- model_2 = Sequential()
- # 激活函数用relu
- model_2.add(LSTM(dim, activation='relu', input_shape=(n_steps, n_features)))
- # 输出层使用全连接层,只要一个输出节点
- model_2.add(Dense(1))
- # 选择优化器和损失函数,优化器为线性规划算法,损失函数使用的是高维空间测定距离函数
- model_2.compile(optimizer='rmsprop', loss='mse')
- #################################################################################
-
- ######## 从tushare获取数据 ########
- import tushare as ts
- # 封装tushare接口
- def get_stock_data(ts_code, start_date, end_date):
- pro = ts.pro_api(token)
- df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
- return df
-
- data = get_stock_data(ts_code, start_date, end_date)
- data = data.iloc[::-1] # dataframe反转,因为接口的第一条数据是最新行情,我们希望最新行情在后面
- print(data)
- data_ = data[['open', 'close', 'high', 'low','pct_chg','vol']]
- data_['日收益率'] = log(data_[['close']]).diff(-1)
- data_.dropna(axis=0, inplace=True)
- data_.to_csv("123.csv")
- print(data_)
-
-
- ########### 加工数据 ###########
- def processData(data, lb):
- X, Y = [], []
- for i in range(len(data) - lb - 1):
- X.append(data[i:(i + lb), 0])
- try:
- Y.append(data[(i + 2 + lb), 0])
- except:
- Y.append(data[(i + lb), 0])
- return np.array(X), np.array(Y)
-
- def pData(data, lb):
- X = []
- for i in range(len(data) - lb - 1):
- X.append(data[i:(i + lb)])
- return np.array(X)
- ##################################
-
- close = data_['close']
- cl = np.array(close)
- max_close = cl.max() # 数据归一化之前,保存原始数据最大值,恢复元数据时用得到
- min_close = cl.min() # 数据归一化之前,保存原始数据最小值,恢复元数据时用得到
- cl = cl.reshape(cl.shape[0], 1)
- scl = MinMaxScaler()
- sc2 = MinMaxScaler()
- cl = scl.fit_transform(cl)
-
- # 生成标签
- _,y = processData(cl, days)
- X = data_.values
- plt.plot(X[int(X.shape[0] * 0.90):].reshape(-1,1),label='y_test') # 假设 X 是一个包含数据的数组,X.shape[0] 表示 X 的长度
- # plt.legend() # 显示图例 # int(X.shape[0] * 0.90) 表示取 X 中最后 10% 的数据作为绘图数据
- plt.show() # 显示图形 # reshape(-1, 1) 将数据转换为列向量形式
- X = sc2.fit_transform(X) # label='y_test' 为折线图添加标签
- X = pData(X, days)
-
- ########## (split_ratio)训练数据分割,如0.9代表选90%作为训练数据,10%测试数据 ##########
- y_train, y_test = y[:int(y.shape[0] * split_ratio)], y[int(y.shape[0] * split_ratio):]
- x_train, x_test = X[:int(X.shape[0] * split_ratio)], X[int(X.shape[0] * split_ratio):]
- print("++++++++++++++++++++", type(x_test))
- ######################################################################################
-
- History = model_2.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,validation_data=(x_test, y_test), shuffle=False)
- plt.plot(History.history['loss'], label='loss')
- plt.plot(History.history['val_loss'], label='val loss')
- plt.legend(loc='best')
- plt.title('The loss values of real and predict')
- #plt.savefig('D:/mojin/self/try/LSTM/result/真实值与预测值的loss值.jpg')
- #plt.show()
- #####################
-
- #####将测试集中的所有切片以序列的方式进行预测,查看预测结果与真实值的拟合情况#####
- y_pred = model_2.predict(x_test)
- fig = plt.gcf()
- plt.figure(figsize=(8, 4))
- y_test = y_test.reshape(-1,1)
- y_test = np.multiply(y_test, max_close - min_close)
- y_test = np.add(y_test, min_close) # 还原原来数据归一化之前的数值
- y_pred = np.multiply(y_pred, max_close - min_close)
- y_pred = np.add(y_pred, min_close) # 还原原来数据归一化之前的数值
- plt.plot(y_test.reshape(-1,1),label='真实值')
- plt.plot(y_pred,label='预测值')
- plt.title('Predict of test data')
- plt.text(100, 0.4, s='测试集R2:%.3f' % (r2_score(y_test.reshape(-1,1), y_pred)))
- plt.legend(loc='best')
- print("测试最大值:%.2f " % max_close,"测试最小值:%.2f " % min_close)
- plt.show()
- ############################################################################
-
- train_score = np.sqrt(np.mean((model_2.predict(x_train) - y_train) ** 2))
- test_score = np.sqrt(np.mean((y_pred - y_test) ** 2))
- print('训练得分: %.2f RMSE' % train_score)
- print('测试得分: %.2f RMSE' % test_score)
-
-
- ######### 预测数据 ##############
- a = x_test[len(x_test)-1]
- Xt = model_2.predict(a.reshape(1, days, n_features)) #预测数据只需要调用这个函数就行, a 的数值可以任意构建
- print("预测数据: ")
- print(a)
- print("预测结果: ", Xt)
- # 还原原来数据归一化之前的数值
- print("预测归一化之前的数值:", Xt[0][0] * (max_close - min_close) + min_close)
训练结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。