
Autoencoder-Based ECG Signal Anomaly Detection (Python)

The dataset comes from the PTB Diagnostic ECG Database and contains 14,552 ECG records in two classes, normal heartbeats and abnormal heartbeats, sampled at 125 Hz.

import os
import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae

matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")

isGPU = tf.config.list_physical_devices('GPU')

# List the files in the dataset directory
directory_path = r'ECG Heartbeat Categorization Dataset'
for dirname, _, filenames in os.walk(directory_path):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Load the beats; the last column is the class label, which we drop here
normal_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, :-1]
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)
# Shape of Normal data (4045, 187)
# Shape of Abnormal data (10505, 187)

def plot_sample(normal, anomaly):
    # Plot two randomly chosen beats from each class side by side
    index = np.random.randint(0, len(normal), 2)
    fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
    ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[0].set_title("Normal")
    ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[1].set_title("Anomaly")
    plt.tight_layout()
    plt.show()

plot_sample(normal_df, anomaly_df)
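Note that no explicit scaling step appears anywhere in this pipeline; the Kaggle release of this dataset is typically already normalized to the [0, 1] range and zero-padded to a fixed beat length (an assumption about the data source, not something verified in the original post). A quick check can confirm:

# Sanity check: beats should already lie in [0, 1] if the CSVs are the
# pre-scaled Kaggle release (assumption, not verified in the original post)
print(normal_df.values.min(), normal_df.values.max())
print(anomaly_df.values.min(), anomaly_df.values.max())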

CLASS_NAMES = ["Normal", "Anomaly"]
normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))
normal_df_copy = normal_df_copy.set_axis(range(1, 188), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 188), axis=1)
normal_df_copy = normal_df_copy.assign(target=CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target=CLASS_NAMES[1])
df = pd.concat((normal_df_copy, anomaly_df_copy))

def plot_smoothed_mean(data, class_name="normal", step_size=5, ax=None):
    # Rolling mean of the class-average beat, with a +/- 3 std band
    df = pd.DataFrame(data)
    roll_df = df.rolling(step_size)
    smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)
    smoothed_std = roll_df.std().dropna().reset_index(drop=True)
    margin = 3 * smoothed_std
    lower_bound = (smoothed_mean - margin).values.flatten()
    upper_bound = (smoothed_mean + margin).values.flatten()
    ax.plot(smoothed_mean.index, smoothed_mean)
    ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")
    ax.set_title(class_name, fontsize=9)

fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
for i, label in enumerate(CLASS_NAMES, start=1):
    data_group = df.groupby("target")
    data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()
    plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i - 1])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()

normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()

# Train only on normal beats; the anomalous beats are held out entirely
X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
# Train shape: (3438, 187), Test shape: (607, 187), anomaly shape: (10505, 187)

class AutoEncoder(Model):
    def __init__(self, input_dim, latent_dim):
        super(AutoEncoder, self).__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1D
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
        ])
        # Previously, I was using UpSampling. I am trying Transposed Convolution this time around.
        self.decoder = tf.keras.Sequential([
            layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
            # layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
            # layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
            # layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(input_dim)
        ])

    def call(self, X):
        encoded = self.encoder(X)
        decoded = self.decoder(encoded)
        return decoded

input_dim = X_train.shape[-1]
latent_dim = 32
model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()

Model: "auto_encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #
=================================================================
 sequential (Sequential)     (None, 24, 32)            63264
 sequential_1 (Sequential)   (None, 187)               640603
=================================================================
Total params: 703867 (2.69 MB)
Trainable params: 702715 (2.68 MB)
Non-trainable params: 1152 (4.50 KB)
_________________________________________________________________

epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)
history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
                    validation_split=0.1, callbacks=[early_stopping])
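The commented-out UpSampling1D lines hint at the decoder the author says they used previously. For reference, here is a sketch of what that variant might look like, reusing layers, latent_dim, and input_dim from above; the shape comments are my inference from the encoder's pooling arithmetic (187 -> 94 -> 47 -> 24), not output from the original post:

# Sketch of the earlier UpSampling-based decoder (my reconstruction from
# the commented-out lines, not the author's final model)
decoder_upsampling = tf.keras.Sequential([
    layers.Input(shape=(24, latent_dim)),  # encoder output length for input_dim=187
    layers.Conv1DTranspose(latent_dim, 3, activation='relu', padding="same"),
    layers.UpSampling1D(2),   # 24 -> 48
    layers.BatchNormalization(),
    layers.Conv1DTranspose(128, 3, activation='relu', padding="same"),
    layers.UpSampling1D(2),   # 48 -> 96
    layers.BatchNormalization(),
    layers.Conv1DTranspose(128, 3, activation='relu', padding="same"),
    layers.UpSampling1D(2),   # 96 -> 192
    layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(input_dim)   # project back to the 187-sample beat
])

Note that upsampling 24 by three factors of 2 gives 192, not 187, which is why the Flatten plus Dense(input_dim) projection at the end is still needed in either variant.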

plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training and validation loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()

# Mean reconstruction error on each split
train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly, anomaly, verbose=0)
print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anomaly dataset error: ", anomaly_mae)

Training dataset error:  0.014224529266357422
Testing dataset error:  0.01488062646239996
Anomaly dataset error:  0.043484628200531006

def predict(model, X):
    pred = model.predict(X, verbose=False)
    loss = mae(pred, X)  # per-sample mean absolute error
    return pred, loss
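As a quick usage check on a single beat (assuming the X_test split from earlier): model.predict returns the reconstruction, and mae reduces over the last axis, so there is one loss value per sample.

sample = X_test[:1]                          # shape (1, 187)
recon, sample_loss = predict(model, sample)
print(recon.shape, float(sample_loss[0]))    # (1, 187) and one scalar MAE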

_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)

# Threshold to separate normal from anomalous beats: mean + 1 std of the training loss
threshold = np.mean(train_loss) + np.std(train_loss)

bins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="Anomaly")
ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold + 0.009, ylim[-1]),
             arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()
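The mean-plus-one-standard-deviation threshold is a heuristic computed from normal training beats only. If some labeled anomalies are available, an alternative (not part of the original post) is to sweep candidate thresholds and keep the one that maximizes F1:

# Sketch: pick the threshold maximizing F1 on labeled losses.
# Uses test_loss/anomaly_loss from above; label 1 = normal, 0 = anomaly,
# matching the convention used later in this article.
losses = np.concatenate([np.asarray(test_loss), np.asarray(anomaly_loss)])
labels = np.concatenate([np.ones(len(test_loss), dtype=int), np.zeros(len(anomaly_loss), dtype=int)])
best_t, best_f1 = None, -1.0
for t in np.linspace(losses.min(), losses.max(), 200):
    preds = (losses <= t).astype(int)  # at or below threshold -> predicted normal
    score = f1_score(labels, preds)
    if score > best_f1:
        best_t, best_f1 = t, score
print(f"Best threshold: {best_t:.4f} (F1 = {best_f1:.2%})")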

def plot_examples(model, data, ax, title):
    # Overlay one actual beat with its reconstruction and shade the gap
    pred, loss = predict(model, data)
    ax.plot(data.flatten(), label="Actual")
    ax.plot(pred[0], label="Predicted")
    ax.fill_between(range(1, 188), data.flatten(), pred[0], alpha=0.3, color="r")
    ax.legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=7)
    ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)

fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)
for i, idx in enumerate(random_indexes):
    data = X_train[[idx]]
    plot_examples(model, data, ax=axes[0, i], title="Normal")
for i, idx in enumerate(random_indexes):
    data = anomaly[[idx]]
    plot_examples(model, data, ax=axes[1, i], title="Anomaly")
plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png")
plt.show()

Model Evaluation

def evaluate_model(model, data):
    pred, loss = predict(model, data)
    # Anomalous beats count as correct when their loss exceeds the
    # threshold; normal beats when it does not.
    if id(data) == id(anomaly):
        accuracy = np.sum(loss > threshold) / len(data)
    else:
        accuracy = np.sum(loss <= threshold) / len(data)
    return f"Accuracy: {accuracy:.2%}"

print("Training", evaluate_model(model, X_train))
print("Testing", evaluate_model(model, X_test))
print("Anomaly", evaluate_model(model, anomaly))
# Training Accuracy: 88.66%
# Testing Accuracy: 84.51%
# Anomaly Accuracy: 77.34%

def prepare_labels(model, train, test, anomaly, threshold=threshold):
    # Label 1 = normal, 0 = anomaly; a beat is predicted normal when its
    # reconstruction loss is at or below the threshold.
    ytrue = np.concatenate((np.ones(len(train) + len(test), dtype=int), np.zeros(len(anomaly), dtype=int)))
    _, train_loss = predict(model, train)
    _, test_loss = predict(model, test)
    _, anomaly_loss = predict(model, anomaly)
    train_pred = (train_loss <= threshold).numpy().astype(int)
    test_pred = (test_loss <= threshold).numpy().astype(int)
    anomaly_pred = (anomaly_loss <= threshold).numpy().astype(int)
    ypred = np.concatenate((train_pred, test_pred, anomaly_pred))
    return ytrue, ypred

def plot_confusion_matrix(model, train, test, anomaly, threshold=threshold):
    ytrue, ypred = prepare_labels(model, train, test, anomaly, threshold=threshold)
    accuracy = accuracy_score(ytrue, ypred)
    precision = precision_score(ytrue, ypred)
    recall = recall_score(ytrue, ypred)
    f1 = f1_score(ytrue, ypred)
    print(f"""\
Accuracy: {accuracy:.2%}
Precision: {precision:.2%}
Recall: {recall:.2%}
f1: {f1:.2%}\n
""")
    cm = confusion_matrix(ytrue, ypred)
    cm_norm = confusion_matrix(ytrue, ypred, normalize="true")
    data = np.array([f"{count}\n({pct:.2%})" for count, pct in zip(cm.ravel(), cm_norm.ravel())]).reshape(cm.shape)
    labels = ["Anomaly", "Normal"]  # class 0 = anomaly, class 1 = normal
    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=data, fmt="", xticklabels=labels, yticklabels=labels)
    plt.ylabel("Actual")
    plt.xlabel("Predicted")
    plt.title("Confusion Matrix", weight="bold")
    plt.tight_layout()

plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
# Accuracy: 80.32%
# Precision: 59.94%
# Recall: 88.03%
# f1: 71.32%
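One caveat: evaluate_model relies on id(data) == id(anomaly), which silently takes the wrong branch if a copy of the anomaly array is passed. A safer variant with an explicit flag (my suggestion, not from the original post) might look like this:

def evaluate_model_flagged(model, data, is_anomaly=False):
    # Same accuracy logic as evaluate_model above, with the branch made
    # explicit instead of comparing object identities
    _, loss = predict(model, data)
    correct = loss > threshold if is_anomaly else loss <= threshold
    return f"Accuracy: {np.mean(np.asarray(correct)):.2%}"

print("Anomaly", evaluate_model_flagged(model, anomaly, is_anomaly=True))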

ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
# Class 0 is anomaly and class 1 is normal, so the target names must be
# passed in that order (CLASS_NAMES would mislabel the two rows)
print(classification_report(ytrue, ypred, target_names=["Anomaly", "Normal"]))

              precision    recall  f1-score   support

     Anomaly       0.94      0.77      0.85     10505
      Normal       0.60      0.88      0.71      4045

    accuracy                           0.80     14550
   macro avg       0.77      0.83      0.78     14550
weighted avg       0.85      0.80      0.81     14550
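All of the numbers above depend on the single threshold chosen earlier. A threshold-free summary is the ROC-AUC computed directly from the reconstruction losses (a supplementary check, not in the original post):

from sklearn.metrics import roc_auc_score

# Higher reconstruction loss should indicate an anomaly, so the loss itself
# serves as the anomaly score (positive class = anomaly here)
scores = np.concatenate([np.asarray(train_loss), np.asarray(test_loss), np.asarray(anomaly_loss)])
is_anomaly = np.concatenate([np.zeros(len(train_loss) + len(test_loss)), np.ones(len(anomaly_loss))])
print(f"ROC-AUC: {roc_auc_score(is_anomaly, scores):.4f}")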

About the author: PhD in Engineering, reviewer for journals including Mechanical Systems and Signal Processing, Proceedings of the CSEE, and Control and Decision. Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time series analysis, equipment defect detection, equipment anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).
