  1. import pandas as pd
  2. import tensorflow as tf
  3. import numpy as np
  4. import tensorflow.keras as keras
  5. from tensorflow.keras import Sequential
  6. from tensorflow.keras.layers import Conv1D
  7. from tensorflow.keras.layers import MaxPooling1D
  8. from tensorflow.keras.layers import Flatten
  9. from tensorflow.keras.layers import Dense
  10. from tensorflow.keras.layers import Dropout
  11. from tensorflow.keras.layers import BatchNormalization
  12. from tensorflow.keras.layers import Softmax
  13. from sklearn.metrics import classification_report
  14. from keras.utils.vis_utils import plot_model
  15. from sklearn.preprocessing import StandardScaler
  16. import matplotlib.pyplot as plt
  17. import pickle
  18. from scipy.stats import zscore
  19. from sklearn.model_selection import train_test_split
  20. from sklearn import preprocessing
  21. import gc
  22. from keras.callbacks import ReduceLROnPlateau
  23. from collections import Counter
  24. import seaborn as sns
  25. from sklearn.metrics import confusion_matrix
  26. from sklearn.utils.class_weight import compute_class_weight
  27. from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN


  1. all_sub_data = np.load("/content/drive/MyDrive/major project/all_sub_data.npy")
  2. print(all_sub_data.shape)



  1. for sub in range(all_sub_data.shape[0]):
  2. all_sub_data[sub] = zscore(all_sub_data[sub], axis = 1) #zscore normalize each channel
  3. gc.collect()



  1. if(inp_choice == 'm'):
  2. labels = pd.read_excel("/content/drive/MyDrive/major project/metadata/Labels.xls")
  3. #for multiclass classification
  4. sub_labels = labels["Valence-Arousal Model Quadrant"].astype('int')
  5. gc.collect()
  6. sub_labels
  7. if(inp_choice == 'm'):
  8. labels = pd.read_excel("/content/drive/MyDrive/major project/metadata/Labels.xls")
  9. #for multiclass classification
  10. sub_labels = labels["Valence-Arousal Model Quadrant"].astype('int')
  11. gc.collect()
  12. sub_labels
  13. #Distribution of Multi-Class Labels
  14. if(inp_choice == 'm'):
  16. # Add frequency bar plot here for dataset label distribution
  17. c = Counter(sub_labels)
  18. print(c)
  19. plt.figure()
  20. plt.bar([0,1,2,3], [c[0], c[1], c[2], c[3]])
  21. plt.show()
  22. #Distribution of Binary Class Labels
  23. if(inp_choice == 'b'):
  24. # Add frequency bar plot here for dataset label distribution
  25. c = Counter(sub_labels)
  26. print(c)
  27. plt.figure()
  28. plt.bar([0,1], [c[0], c[1]])
  29. plt.show()
  30. ### Label Binarization of multi-class labels
  31. if(inp_choice == 'm'):
  32. multi_class_weights = compute_class_weights("balanced", classes = [0,1,2,3], y=sub_labels)
  33. print(multi_class_weights)
  34. d_multi_class_weights = dict(enumerate(multi_class_weights))
  35. print(d_multi_class_weights)
  36. if(inp_choice == 'm'):
  37. lb = preprocessing.LabelBinarizer()
  38. sub_labels = lb.fit_transform(sub_labels)
  39. print(lb.classes_)
  40. print(sub_labels.shape)
  41. print(sub_labels)
  42. ### Label Binarization of 2/Binary labels
  43. if(inp_choice == 'b'):
  44. bin_class_weights = compute_class_weight("balanced", classes = [0,1], y=sub_labels)
  45. print(bin_class_weights)
  46. d_bin_class_weights = dict(enumerate(bin_class_weights))
  47. print(d_bin_class_weights)
  48. def encode(x):
  49. if(x==1):
  50. return [0,1]
  51. elif(x==0):
  52. return [1,0]
  53. else:
  54. print("invalid value")
  55. return None
  56. if(inp_choice == 'b'):
  57. sub_labels_bin = np.array(list(map(encode, sub_labels)))
  58. print(sub_labels_bin.shape)
  59. print(sub_labels_bin[:6])
  60. if(inp_choice == 'b'):
  61. sub_labels_bin[0]
  62. if(inp_choice == 'b'):
  63. sub_labels = sub_labels_bin
  64. gc.collect()
  65. print(sub_labels.shape)
  66. def inv_bin(x):
  67. if(x == [0,1]):
  68. return 1
  69. elif(x==[1,0]):
  70. return 0


  1. X_train, X_test, y_train, y_test = train_test_split(all_sub_data, sub_labels, test_size =
  2. 0.1, random_state = 42,shuffle = True,
  3. stratify = sub_labels)
  4. X_train.shape, y_train.shape, X_test.shape, y_test.shape




  1. #12,6 and 4 subsignals are generated from 8064 length EEG signal, labels repeated accordingly
  2. y_train_12 = np.repeat(y_train, 12, axis = 0)
  3. #y_train_6 = np.repeat(y_train, 6, axis = 0)
  4. #y_train_4 = np.repeat(y_train, 4, axis = 0)
  5. del(y_train)
  6. gc.collect()
  7. #print(y_train_12.shape, y_train_6.shape, y_train_4.shape)
  8. print(y_train_12.shape)
  9. try:
  10. del y_train_6, y_train_4
  11. except:
  12. pass
  13. gc.collect()
  14. try:
  15. del c_train, c_test, c_train_12, c
  16. except:
  17. pass
  18. gc.collect()
  1. ## Loading Training data with different window sizes
  2. channel_wise = np.transpose(X_train, (1,0,2))
  3. del(X_train)
  4. gc.collect()
  5. def process_input(instances, sub_signals):
  6. #instances must be channel wise of shape (32, -1, 8064)
  7. samples = int(8064/sub_signals)
  8. transformed = []
  9. for i in range(instances.shape[0]):
  10. transformed.append(np.reshape(instances[i], (-1,samples,1)))
  11. transformed = np.array(transformed)
  12. print(transformed.shape, 'is the shape obtained.')
  13. gc.collect()
  14. return transformed

### 12 sub signals of length 672 each, total 13824 instances

  1. #X_train_12 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_12.npy")
  2. #print(X_train_12.shape, 'Shape of Training Data')
  3. X_train_12 = process_input(channel_wise, 12)
  4. gc.collect()


  1. #X_train_6 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_6.npy")
  2. #print(X_train_6.shape, 'Shape of Training Data')
  3. #gc.collect()
  4. #X_train_4 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_4.npy")
  5. #print(X_train_4.shape, 'Shape of Training Data')
  6. #gc.collect()


  1. def process_input_ensemble(instances, sub_signals):
  2. """
  3. This Function explicity adds a dimension for the sub_signals, hence is used for ensembel modeling to traverse that dimension.
  4. Otherwise also we can simply assume, since the dataset is ordered that the groups of len(sub_signals) are obtained from one EEG signal
  5. """
  6. #instances must be channel wise of shape (32, -1, 8064)
  7. gc.collect()
  8. samples = int(8064/sub_signals)
  9. transformed = []
  10. for i in range(instances.shape[0]):
  11. transformed.append(np.reshape(instances[i], (-1,sub_signals,samples,1)))
  12. gc.collect()
  13. transformed = np.array(transformed)
  14. print(transformed.shape, 'is the shape obtained.')
  15. gc.collect()
  16. #output shape will be (len(intances), 32, sub_signals, samples, 1)
  17. return transformed
  18. X_test_channels = np.transpose(X_test, (1,0,2))
  19. x_test_12 = process_input(X_test_channels, 12)
  20. y_test_12 = np.repeat(np.array(y_test), 12, axis = 0)
  21. print(x_test_12.shape, y_test_12.shape)
  22. """
  23. x_test_6 = process_input(X_test_channels, 6)
  24. y_test_6 = np.repeat(np.array(y_test), 6, axis = 0)
  25. print(x_test_6.shape, y_test_6.shape)
  26. """
  27. """
  28. x_test_6 = process_input(X_test_channels, 6)
  29. y_test_6 = np.repeat(np.array(y_test), 6, axis = 0)
  30. print(x_test_6.shape, y_test_6.shape)
  31. """


  1. def plot_evaluation_curves(history, EPOCHS, mtrc):
  2. acc = history.history[mtrc[0]]
  3. val_acc = history.history[mtrc[1]]
  4. loss = history.history['loss']
  5. val_loss = history.history['val_loss']
  6. epochs_range = range(EPOCHS)
  7. plt.figure(figsize=(10, 6))
  8. plt.subplot(1, 2, 1)
  9. plt.plot(epochs_range, acc, label='Training ' + mtrc[0])
  10. plt.plot(epochs_range, val_acc, label='Validation '+ mtrc[0])
  11. plt.legend(loc='lower right')
  12. plt.title('Training and Validation ' + mtrc[0])
  13. plt.subplot(1, 2, 2)
  14. plt.plot(epochs_range, loss, label='Training Loss')
  15. plt.plot(epochs_range, val_loss, label='Validation Loss')
  16. plt.legend(loc='upper right')
  17. plt.title('Training and Validation Loss')
  18. plt.show()
  19. def make_confusion_matrix(cf,
  20. group_names=None,
  21. categories='auto',
  22. count=True,
  23. percent=True,
  24. cbar=True,
  25. xyticks=True,
  26. xyplotlabels=True,
  27. sum_stats=True,
  28. figsize=None,
  29. cmap='Blues',
  30. title=None):
  31. '''
  32. This function will make a pretty plot of an sklearn Confusion Matrix cm using a Seaborn heatmap visualization.
  33. Arguments
  34. ---------
  35. cf: confusion matrix to be passed in
  36. group_names: List of strings that represent the labels row by row to be shown in each square.
  37. categories: List of strings containing the categories to be displayed on the x,y axis. Default is 'auto'
  38. count: If True, show the raw number in the confusion matrix. Default is True.
  39. normalize: If True, show the proportions for each category. Default is True.
  40. cbar: If True, show the color bar. The cbar values are based off the values in the confusion matrix.
  41. Default is True.
  42. xyticks: If True, show x and y ticks. Default is True.
  43. xyplotlabels: If True, show 'True Label' and 'Predicted Label' on the figure. Default is True.
  44. sum_stats: If True, display summary statistics below the figure. Default is True.
  45. figsize: Tuple representing the figure size. Default will be the matplotlib rcParams value.
  46. cmap: Colormap of the values displayed from matplotlib.pyplot.cm. Default is 'Blues'
  47. See http://matplotlib.org/examples/color/colormaps_reference.html
  48. title: Title for the heatmap. Default is None.
  49. '''
  51. blanks = ['' for i in range(cf.size)]
  52. if group_names and len(group_names)==cf.size:
  53. group_labels = ["{}\n".format(value) for value in group_names]
  54. else:
  55. group_labels = blanks
  56. if count:
  57. group_counts = ["{0:0.0f}\n".format(value) for value in cf.flatten()]
  58. else:
  59. group_counts = blanks
  60. if percent:
  61. group_percentages = ["{0:.2%}".format(value) for value in cf.flatten()/np.sum(cf)]
  62. else:
  63. group_percentages = blanks
  64. box_labels = [f"{v1}{v2}{v3}".strip() for v1, v2, v3 in zip(group_labels,group_counts,group_percentages)]
  65. box_labels = np.asarray(box_labels).reshape(cf.shape[0],cf.shape[1])
  67. if sum_stats:
  68. #Accuracy is sum of diagonal divided by total observations
  69. accuracy = np.trace(cf) / float(np.sum(cf))
  70. #if it is a binary confusion matrix, show some more stats
  71. if len(cf)==2:
  72. #Metrics for Binary Confusion Matrices
  73. precision = cf[1,1] / sum(cf[:,1])
  74. recall = cf[1,1] / sum(cf[1,:])
  75. f1_score = 2*precision*recall / (precision + recall)
  76. stats_text = "\n\nAccuracy={:0.3f}\nPrecision={:0.3f}\nRecall={:0.3f}\nF1 Score={:0.3f}".format(
  77. accuracy,precision,recall,f1_score)
  78. else:
  79. stats_text = "\n\nAccuracy={:0.3f}".format(accuracy)
  80. else:
  81. stats_text = ""
  83. if figsize==None:
  84. #Get default figure size if not set
  85. figsize = plt.rcParams.get('figure.figsize')
  86. if xyticks==False:
  87. #Do not show categories if xyticks is False
  88. categories=False
  90. plt.figure(figsize=figsize)
  91. sns.heatmap(cf,annot=box_labels,fmt="",cmap=cmap,cbar=cbar,xticklabels=categories,yticklabels=categories)
  92. if xyplotlabels:
  93. plt.ylabel('True label')
  94. plt.xlabel('Predicted label' + stats_text)
  95. else:
  96. plt.xlabel(stats_text)
  97. if title:
  98. plt.title(title)
  99. def show_test_metrics(x_test, y_test, model):
  100. print("Testing Set MEASURES: ")
  101. #Testing Data metrics
  102. pred_y = model.predict(x_test)
  103. c_pred = Counter(np.argmax(pred_y, axis = 1))
  104. print(c_pred, "Predicted Distribution of Testing Dataset")
  105. c_true = Counter(np.argmax(y_test, axis = 1))
  106. print(c_true, "Actual Distribution of Testing Dataset")
  107. print(model.evaluate(x_test, y_test), "Testing Measures of Model.")
  108. print("Classification Report of Model on Testing Data")
  109. print(classification_report(np.argmax(y_test, axis =1), np.argmax(model.predict(x_test), axis = 1) , digits = 4) )
  110. make_confusion_matrix(confusion_matrix(np.argmax(y_test, axis = 1), np.argmax(model.predict(x_test), axis = 1)))
  111. print("------------------------------------------------------------\n")
  112. def show_metrics(x_train=None, x_val=None, y_train=None, y_val=None, model=None):
  113. print("TRAINING MEASURES: ")
  114. #Training Data metrics
  115. pred_y = model.predict(x_train)
  116. c_pred = Counter(np.argmax(pred_y, axis = 1))
  117. print(c_pred, "Predicted Distribution of Training Dataset")
  118. c_true = Counter(np.argmax(y_train, axis = 1))
  119. print(c_true, "Actual Distribution of Training Dataset")
  120. print(model.evaluate(x_train, y_train), "Training Measures of Model.")
  121. print("Classification Report of Model on Training Data")
  122. print(classification_report(np.argmax(y_train, axis =1), np.argmax(model.predict(x_train), axis = 1) , digits = 4) )
  123. make_confusion_matrix(confusion_matrix(np.argmax(y_train, axis = 1), np.argmax(model.predict(x_train), axis = 1)))
  124. print("\n------------------------------------------------------------------------------------------\n")
  125. #Validation Data Metrics
  126. print("VALIDATION MEASURES: ")
  127. pred_y = model.predict(x_val)
  128. c_pred = Counter(np.argmax(pred_y, axis = 1))
  129. print(c_pred, "Predicted Distribution of Validation Dataset")
  130. c_true = Counter(np.argmax(y_val, axis = 1))
  131. print(c_true, "Actual Distribution of Validation Dataset")
  132. print(model.evaluate(x_val, y_val), "Validation Measures of Model.")
  133. print("Classification Report of Model on Validation Data")
  134. print(classification_report(np.argmax(y_val, axis = 1), np.argmax(model.predict(x_val), axis = 1), digits = 4))
  135. make_confusion_matrix(confusion_matrix(np.argmax(y_val, axis = 1), np.argmax(model.predict(x_val), axis =1)) )
  136. print("\n------------------------------------------------------------------------------------------\n")
  137. gc.collect()


  1. METRICS = [
  2. keras.metrics.TruePositives(name='tp'),
  3. keras.metrics.FalsePositives(name='fp'),
  4. keras.metrics.TrueNegatives(name='tn'),
  5. keras.metrics.FalseNegatives(name='fn'),
  6. keras.metrics.BinaryAccuracy(name='accuracy'),
  7. keras.metrics.Precision(name='precision'),
  8. keras.metrics.Recall(name='recall'),
  9. keras.metrics.AUC(name='auc'),
  10. ]
  11. def dummy_models(sub_signals=12, metrics = METRICS):
  12. sample_size = int(8064/sub_signals)
  13. models = [0]*32
  14. for i in range(32):
  15. models[i] = Sequential()
  16. models[i].add(Dense(100, activation = 'relu', input_shape = (sample_size, 1)))
  17. models[i].add(Flatten())
  18. models[i].add(Dense(40, activation = 'relu'))
  19. try:
  20. #fc2 and softmax
  21. if (inp_choice == 'b'):
  22. #binary classification
  23. models[i].add(Dense(2, activation = 'softmax'))
  24. models[i].compile(optimizer= tf.keras.optimizers.Adam(learning_rate = 1e-4) , loss = tf.keras.losses.BinaryCrossentropy(), metrics= metrics)
  25. elif (inp_choice == 'm'):
  26. #mutliclass classification
  27. models[i].add(Dense(4, activation = 'softmax'))
  28. models[i].compile(optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4) , loss = tf.keras.losses.CategoricalCrossentropy(), metrics= metrics)
  29. except ValueError:
  30. raise ValueError("ValueError exception thrown. Invalid Classfication choice, No Model was Created")
  31. print("All models defined.")
  32. return models
  33. null_data = np.zeros((13824, 672, 1))
  34. def create_models(dense_par=20, sub_signals=12, metrics = METRICS):
  35. sample_size = int(8064/sub_signals)
  36. models = [0]*32
  37. for i in range(32):
  38. models[i] = Sequential()
  39. #block 1
  40. models[i].add(Conv1D(filters=32, kernel_size=5,strides = 3, input_shape=(sample_size, 1)))
  41. models[i].add(BatchNormalization())
  42. models[i].add(tf.keras.layers.Activation('relu'))
  43. #block 2
  44. models[i].add(Conv1D(filters=24, kernel_size=3,strides = 2))
  45. models[i].add(BatchNormalization())
  46. models[i].add(tf.keras.layers.Activation('relu'))
  47. #block 3
  48. models[i].add(Conv1D(filters=16, kernel_size=3,strides = 2))
  49. models[i].add(BatchNormalization())
  50. models[i].add(tf.keras.layers.Activation('relu'))
  51. #block 4
  52. models[i].add(Conv1D(filters=8, kernel_size=3,strides = 2))
  53. models[i].add(BatchNormalization())
  54. models[i].add(tf.keras.layers.Activation('relu'))
  55. #fc-1
  56. models[i].add(Flatten())
  57. models[i].add(Dense(dense_par, activation='relu'))
  58. #dropout
  59. models[i].add(Dropout(rate = 0.5))
  60. try:
  61. #fc2 and softmax
  62. if (inp_choice == 'b'):
  63. #binary classification
  64. models[i].add(Dense(2, activation = 'softmax'))
  65. models[i].compile(optimizer= tf.keras.optimizers.Adam(learning_rate = 1e-4) , loss = tf.keras.losses.BinaryCrossentropy(), metrics= metrics)
  66. elif (inp_choice == 'm'):
  67. #mutliclass classification
  68. models[i].add(Dense(4, activation = 'softmax'))
  69. models[i].compile(optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4) , loss = tf.keras.losses.CategoricalCrossentropy(), metrics= metrics)
  70. except ValueError:
  71. raise ValueError("ValueError exception thrown. Invalid Classfication choice, No Model was Created")
  72. print("All models defined.")
  73. return models


# 10.1 First validation run

  1. baseline_models = dummy_models(sub_signals = 12, metrics = METRICS)
  2. history = [0]*32
  3. epochs = 500
  4. #range will be 32 in actual code
  5. for j in range(1):
  6. print(f'Individual Net : {j+1}')
  7. x_train, x_val, y_train, y_val = train_test_split( X_train_12[j],y_train_12, test_size = 0.1, random_state = 42, shuffle = True, stratify = y_train_12)
  8. #Input Dependent dummy model
  9. history[j] = baseline_models[j].fit(x_train, y_train, batch_size = 2048, epochs = epochs, \
  10. validation_data = (x_val, y_val), shuffle = True, verbose = 0)
  11. show_metrics(x_train, x_val, y_train, y_val, baseline_models[j])
  12. show_test_metrics(x_test_12[j], y_test_12, baseline_models[j])
  13. plot_evaluation_curves(history[j], epochs, ('accuracy', 'val_accuracy'))
  14. plot_evaluation_curves(history[j], epochs, ('precision', 'val_precision'))
  15. plot_evaluation_curves(history[j], epochs, ('recall', 'val_recall'))
  16. plot_evaluation_curves(history[j], epochs, ('auc', 'val_auc'))
  17. print("\n-----------------------------------------------------------------------------------------")
  18. baseline_models[j].save("/content/drive/MyDrive/major project/1D-CNN Models/Vanilla Models/bin_12_baseline")
  19. gc.collect()
