
Predicting Heartbeat Time-Series Data with an LSTM Model (Python code, ipynb environment)

Module versions used:

  1. matplotlib==3.7.1
  2. numpy==1.24.4
  3. pandas==1.5.3
  4. scikit_learn==1.2.2
  5. scipy==1.10.1
  6. seaborn==0.12.2
  7. statsmodels==0.14.0
  8. torch==1.13.1
  9. torch==2.0.1
  10. wfdb==4.1.2

Main code:

import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Full code: mbd.pub/o/bread/mbd-ZpWUmZ1x
import wfdb
import torch
from torch import nn

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Specify the path to your downloaded data
path_to_data = 'file_resource/ecg-id-database-1.0.0/Person_03/'
# The record name is the filename without the extension
record_name = 'rec_1'
# Use the 'rdrecord' function to read the ECG data
record = wfdb.rdrecord(f'{path_to_data}/{record_name}')

# Plot the second channel of the ECG record
plt.figure(figsize=(10, 4))
plt.plot(record.p_signal[:, 1])
plt.title('ECG Signal')
plt.xlabel('Time (samples)')
plt.ylabel('Amplitude')
plt.show()

# Save the signal to CSV and keep the first 10,000 samples for modelling
pd.DataFrame(record.p_signal[:, 1], columns=["hr"]).to_csv("./P3_rec_1.csv")
hr2 = pd.DataFrame(record.p_signal[:, 1], columns=["hr"])[0:10000]
# hr2["index"] = hr2.index
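If the records are not yet on disk, wfdb can fetch them straight from PhysioNet. This is a hedged sketch, not part of the original code: the 'ecgiddb' database slug and the record list are assumptions to be checked against PhysioNet.

# Download Person_03's record from the ECG-ID database on PhysioNet
# ('ecgiddb' is assumed to be the PhysioNet slug; adjust dl_dir to match path_to_data)
wfdb.dl_database('ecgiddb', dl_dir='file_resource/ecg-id-database-1.0.0',
                 records=['Person_03/rec_1'])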
df = hr2.copy()

target_sensor = "hr"
# features = list(df.columns.difference([target_sensor]))
features = ["hr"]
batch_size = 32
forecast_lead = 5    # predict the value forecast_lead samples ahead
forecast_step = 1    # number of target steps per training sample

# Build the lead target: row t holds the value observed at t + forecast_lead
target = f"{target_sensor}_lead{forecast_lead}"
df[target] = df[target_sensor].shift(-forecast_lead)
df = df.iloc[:-forecast_lead]

# Train/test split (the test window overlaps by forecast_lead samples)
df_train = df.loc[:8000].copy()
df_test = df.loc[8000 - forecast_lead:].copy()
print("Train/test size ratio:", len(df_train) / len(df_test))

# Standardize both splits with statistics computed on the training set only
target_mean = df_train[target].mean()
target_stdev = df_train[target].std()
for c in df_train.columns:
    mean = df_train[c].mean()
    stdev = df_train[c].std()
    df_train[c] = (df_train[c] - mean) / stdev
    df_test[c] = (df_test[c] - mean) / stdev
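As a quick illustration of why the loop above scales the test split with the training statistics (a toy sketch, not part of the original pipeline): the test value is transformed with the train mean and standard deviation, so no test information leaks into preprocessing.

s_train = pd.Series([1.0, 2.0, 3.0])      # train column: mean 2.0, std 1.0
s_test = pd.Series([4.0])                 # unseen test value
mean, stdev = s_train.mean(), s_train.std()
print((s_test - mean) / stdev)            # 2.0: scaled with the *train* statistics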
from torch.utils.data import Dataset

# Parse the data into sliding windows: each sample pairs a window of
# sequence_head past feature values (left-padded at the start of the
# series) with the next sequence_length target values.
class SequenceDataset(Dataset):
    def __init__(self, dataframe, target, features, sequence_head=10, sequence_length=5):
        self.features = features
        self.target = target
        self.sequence_head = sequence_head
        self.sequence_length = sequence_length
        self.y = torch.tensor(dataframe[target].values).float()
        self.X = torch.tensor(dataframe[features].values).float()

    def __len__(self):
        return self.X.shape[0] - self.sequence_length + 1

    def __getitem__(self, i):
        if i >= self.sequence_head - 1:
            i_start = i - self.sequence_head + 1
            x = self.X[i_start:(i + 1), :]
        else:
            # Not enough history yet: pad on the left with copies of the first row
            padding = self.X[0].repeat(self.sequence_head - i - 1, 1)
            x = self.X[0:(i + 1), :]
            x = torch.cat((padding, x), 0)
        return x.to(device), self.y[i:i + self.sequence_length].to(device)

train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_head=forecast_lead,
    sequence_length=forecast_step,
)
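To make the windowing concrete, here is a small sanity check (a sketch using the class above and the device defined earlier; the toy values are illustrative only). With sequence_head=3, index 0 is left-padded with the first row, while later indices get a full window of history:

toy = pd.DataFrame({"hr": [0.0, 1.0, 2.0, 3.0, 4.0],
                    "hr_lead": [2.0, 3.0, 4.0, 5.0, 6.0]})
toy_ds = SequenceDataset(toy, target="hr_lead", features=["hr"],
                         sequence_head=3, sequence_length=1)
x0, y0 = toy_ds[0]   # x0 -> [[0.], [0.], [0.]] (padded), y0 -> [2.]
x3, y3 = toy_ds[3]   # x3 -> [[1.], [2.], [3.]],          y3 -> [5.]
print(x0.squeeze(-1), y0)
print(x3.squeeze(-1), y3)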
from torch.utils.data import DataLoader

torch.manual_seed(99)
# Quick smoke test: iterate a few batches to confirm the loader works
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
for i, (X, y) in enumerate(train_loader):
    if i > 3:
        break

torch.manual_seed(101)
# Define the datasets and loaders used for training and evaluation
train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_head=forecast_lead,
    sequence_length=forecast_step,
)
test_dataset = SequenceDataset(
    df_test,
    target=target,
    features=["hr"],
    sequence_head=forecast_lead,
    sequence_length=forecast_step,
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

X, y = next(iter(train_loader))
print("Features shape:", X.shape)
print("Target shape:", y.shape)
# Define the LSTM model
class ShallowRegressionLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_directions = 1  # unidirectional LSTM
        self.batch_size = batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_seq):
        batch_size, seq_len = input_seq.shape[0], input_seq.shape[1]
        # Randomly initialized hidden and cell states (as in the original code)
        h_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size).to(device)
        output, _ = self.lstm(input_seq, (h_0, c_0))
        pred = self.linear(output)
        pred = pred[:, -1, :]  # keep only the prediction at the last time step
        return pred
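A quick shape check on dummy input (a sketch, assuming the class and device above): a batch of 8 windows of length 5 with one feature should produce one output_size-dimensional prediction per window.

m = ShallowRegressionLSTM(input_size=1, hidden_size=16, num_layers=1,
                          output_size=1, batch_size=8).to(device)
dummy = torch.randn(8, 5, 1).to(device)  # (batch, seq_len, n_features)
print(m(dummy).shape)                    # torch.Size([8, 1])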
class ShallowRegressionRNN(nn.Module):
    def __init__(self, num_sensors, hidden_units):
        super().__init__()
        self.num_sensors = num_sensors  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 1
        self.rnn = nn.RNN(
            input_size=num_sensors,
            hidden_size=hidden_units,
            batch_first=True,
            num_layers=self.num_layers,
        )
        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_().to(device)
        _, hn = self.rnn(x, h0)
        out = self.linear(hn).flatten()
        return out
# Instantiate the model
num_hidden_units = 512
# loss_function = nn.MSELoss()
model_lstm = ShallowRegressionLSTM(
    input_size=len(features),
    hidden_size=num_hidden_units,
    num_layers=1,
    output_size=forecast_step,
    batch_size=batch_size,
)

# RMSE loss built on top of nn.MSELoss
class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()

    def forward(self, yhat, y):
        return torch.sqrt(self.mse(yhat, y))

loss_function = RMSELoss()
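RMSELoss simply takes the square root of the mean squared error, which is easy to verify on a hand-computed example (toy sketch):

crit = RMSELoss()
yhat = torch.tensor([0.0, 0.0])
y = torch.tensor([3.0, 4.0])
print(crit(yhat, y))  # sqrt((9 + 16) / 2) = sqrt(12.5) ≈ 3.5355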
def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    total_loss = 0
    model.to(device)
    model.train()
    for X, y in data_loader:
        output = model(X)
        loss = loss_function(output, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")

def test_model(data_loader, model, loss_function):
    num_batches = len(data_loader)
    total_loss = 0
    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            output = model(X)
            total_loss += loss_function(output, y).item()
    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    return avg_loss
print("Untrained test\n--------")
# test_model(test_loader, model_lstm, loss_function)

# Train the model, keeping the checkpoint with the lowest test loss
avg_loss = 1
model_lstm.to(device)
learning_rate = 5e-4
optimizer = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
for ix_epoch in range(150):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model_lstm, loss_function, optimizer=optimizer)
    temp = test_model(test_loader, model_lstm, loss_function)
    if temp < avg_loss:
        avg_loss = temp
        torch.save(model_lstm.state_dict(), "model_lstm_%s_%s.pt" % (forecast_lead, forecast_step))
    # Optional: decay the learning rate every 20 epochs
    # if ix_epoch % 20 == 0:
    #     learning_rate = learning_rate * 0.6
    #     optimizer = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
    #     print(learning_rate)
    print()

# Reload the best checkpoint
model_lstm.load_state_dict(torch.load("model_lstm_%s_%s.pt" % (forecast_lead, forecast_step)))
# Generate one-step-ahead predictions over a whole loader
def predict(data_loader, model):
    output = torch.tensor([]).to(device)
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            y_star = model(X)
            output = torch.cat((output, y_star), 0)
    return output

train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
ystar_col = "Model forecast"
pre = predict(train_eval_loader, model_lstm).cpu().numpy()
print(pre.shape)
# Flatten the (N, 1) prediction arrays to 1-D before column assignment
df_train[ystar_col] = pre.reshape(-1)
df_test[ystar_col] = predict(test_loader, model_lstm).cpu().numpy().reshape(-1)
df_out = pd.concat((df_train, df_test))[[target, ystar_col]]

# Undo the standardization so the forecasts are on the original scale
for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean
print(df_out)
# Recursive forecasting: feed each prediction back in as the next input
def predict_window(data_loader, model, forecast_step=2000):
    output = torch.tensor([]).to(device)
    model.eval()
    count = 0
    with torch.no_grad():
        # Seed the output buffer with predictions on real test windows
        for X, _ in data_loader:
            y_star = model(X)
            output = torch.cat((output, y_star), 0)
            count += 1
            if count > forecast_lead:
                break
        # Roll the window forward on the model's own outputs
        for i in range(forecast_step - 1):
            y_star = model(output[output.shape[0] - forecast_lead:].reshape(1, forecast_lead, 1))
            # print(output)
            # print(output[output.shape[0] - forecast_lead:])
            output = torch.cat((output, y_star), 0)
            if i > 10:  # debug limit from the original code; remove to run the full horizon
                break
    return output

res = predict_window(test_loader, model_lstm).cpu().numpy()
print(res)
plt.plot(res)
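Since the model operates on standardized values, the rolled-out forecast can be mapped back to the original ECG scale with the training statistics (a small sketch reusing the names defined above):

res_rescaled = res * target_stdev + target_mean  # undo the z-score normalization
plt.figure(figsize=(10, 4))
plt.plot(res_rescaled)
plt.title("Recursive LSTM rollout (original scale)")
plt.show()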
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

fig, ax = plt.subplots(figsize=(12, 6))
df_out[8000:].plot(ax=ax)
ax.set_title("LSTM model forecast")
ax.set_ylabel("ECG")
ax.set_xlabel("Time")
plt.show()

# Calculate the error metrics, including the AIC
def calculate_aic(y_true, y_pred, num_params):
    mse = mean_squared_error(y_true, y_pred)
    aic = len(y_true) * np.log(mse) + 2 * num_params
    return aic

mse = mean_squared_error(df_out[target], df_out[ystar_col])
mae = mean_absolute_error(df_out[target], df_out[ystar_col])
r2 = r2_score(df_out[target], df_out[ystar_col])
mape = mean_absolute_percentage_error(df_out[target], df_out[ystar_col])
print(f"R2: {r2:.6f}")
print(f"MAPE: {mape:.6f}")
print(f"MAE: {mae:.6f}")
print(f"RMSE: {np.sqrt(mse):.6f}")
print(f"MSE: {mse:.6f}")
print(f"AIC: {calculate_aic(df_out[target], df_out[ystar_col], 1):.6f}")
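The AIC helper implements n·ln(MSE) + 2k with k model parameters, which can be sanity-checked on a toy pair of series (sketch):

y_true = np.array([1.0, 2.0, 3.0, 4.0])
y_pred = np.array([1.1, 1.9, 3.2, 3.8])
# MSE = (0.01 + 0.01 + 0.04 + 0.04) / 4 = 0.025
print(calculate_aic(y_true, y_pred, num_params=1))  # 4 * ln(0.025) + 2 ≈ -12.76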
# RNN baseline: same training pipeline with a plain RNN and MSE loss
num_hidden_units = 128
loss_function = nn.MSELoss()
# loss_function = RMSELoss()
model = ShallowRegressionRNN(num_sensors=len(features), hidden_units=num_hidden_units)
model.to(device)

avg_loss = 1
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for ix_epoch in range(500):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model, loss_function, optimizer=optimizer)
    temp = test_model(test_loader, model, loss_function)
    if temp < avg_loss:
        avg_loss = temp
        torch.save(model.state_dict(), "model_RNN.pt")
    print()

# Load the best checkpoint
model.load_state_dict(torch.load("model_RNN.pt"))
print(avg_loss)

res = predict_window(test_loader, model).cpu().numpy()
print(len(res))
plt.plot(res)
# Reuse predict() for the RNN's one-step forecasts
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).cpu().numpy().reshape(-1)
df_test[ystar_col] = predict(test_loader, model).cpu().numpy().reshape(-1)
df_out = pd.concat((df_train, df_test))[[target, ystar_col]]
# Note: unlike the LSTM section, these forecasts are left on the standardized scale
# for c in df_out.columns:
#     df_out[c] = df_out[c] * target_stdev + target_mean
print(df_out)

fig, ax = plt.subplots(figsize=(12, 6))
df_out[8000:].plot(ax=ax)
ax.set_title("RNN model forecast")
ax.set_ylabel("ECG")
ax.set_xlabel("Time")
plt.show()
mse = mean_squared_error(df_out[target], df_out[ystar_col])
mae = mean_absolute_error(df_out[target], df_out[ystar_col])
r2 = r2_score(df_out[target], df_out[ystar_col])
mape = mean_absolute_percentage_error(df_out[target], df_out[ystar_col])
print(f"R2: {r2:.6f}")
print(f"MAPE: {mape:.6f}")
print(f"MAE: {mae:.6f}")
print(f"RMSE: {np.sqrt(mse):.6f}")
print(f"MSE: {mse:.6f}")
print(f"AIC: {calculate_aic(df_out[target], df_out[ystar_col], 1):.6f}")
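The title mentions heart-rate anomaly detection; one common way to turn the forecasts above into an anomaly detector (a hedged sketch, not part of the original code) is to flag samples whose forecast residual exceeds a few standard deviations:

residuals = (df_out[target] - df_out[ystar_col]).abs()
threshold = residuals.mean() + 3 * residuals.std()  # 3-sigma rule of thumb
anomalies = df_out[residuals > threshold]
print(f"{len(anomalies)} candidate anomalies out of {len(df_out)} samples")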

The author holds a Ph.D. in Engineering and serves as a reviewer for journals such as Mechanical System and Signal Processing. Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time-series analysis, and equipment defect detection, anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).
