- import pandas as pd
- import tensorflow as tf
- import numpy as np
- import tensorflow.keras as keras
- from tensorflow.keras import Sequential
- from tensorflow.keras.layers import Conv1D
- from tensorflow.keras.layers import MaxPooling1D
- from tensorflow.keras.layers import Flatten
- from tensorflow.keras.layers import Dense
- from tensorflow.keras.layers import Dropout
- from tensorflow.keras.layers import BatchNormalization
- from tensorflow.keras.layers import Softmax
- from sklearn.metrics import classification_report
- from keras.utils.vis_utils import plot_model
- from sklearn.preprocessing import StandardScaler
- import matplotlib.pyplot as plt
- import pickle
- from scipy.stats import zscore
- from sklearn.model_selection import train_test_split
- from sklearn import preprocessing
- import gc
- from keras.callbacks import ReduceLROnPlateau
- from collections import Counter
- import seaborn as sns
- from sklearn.metrics import confusion_matrix
- from sklearn.utils.class_weight import compute_class_weight
- from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
# Load the stacked recordings from Drive and report the array shape.
all_sub_data = np.load("/content/drive/MyDrive/major project/all_sub_data.npy")
print(all_sub_data.shape)

# Z-score each first-axis slice along its axis 1, writing the result back in
# place. The original note describes this as per-channel normalisation, i.e.
# it presumes slices laid out as (channel, sample) -- TODO confirm.
for sub_idx, sub_block in enumerate(all_sub_data):
    all_sub_data[sub_idx] = zscore(sub_block, axis=1)
gc.collect()
# Multi-class run: read the quadrant labels from the metadata sheet.
if inp_choice == 'm':
    labels = pd.read_excel("/content/drive/MyDrive/major project/metadata/Labels.xls")
    # Integer quadrant of the valence-arousal model, one entry per recording.
    sub_labels = labels["Valence-Arousal Model Quadrant"].astype('int')
    gc.collect()
    sub_labels  # notebook display only; a no-op when run as a script

# Distribution of multi-class labels (bar chart of the four class counts).
if inp_choice == 'm':
    c = Counter(sub_labels)
    print(c)
    plt.figure()
    plt.bar([0, 1, 2, 3], [c[label] for label in (0, 1, 2, 3)])
    plt.show()

# Distribution of binary class labels (bar chart of the two class counts).
# NOTE(review): no code visible here loads `sub_labels` for the binary case;
# presumably that happens elsewhere -- confirm before running with 'b'.
if inp_choice == 'b':
    c = Counter(sub_labels)
    print(c)
    plt.figure()
    plt.bar([0, 1], [c[label] for label in (0, 1)])
    plt.show()
### Class weights and label binarization of multi-class labels
if inp_choice == 'm':
    # Weights are derived from the integer labels, so this must run before
    # the labels are one-hot encoded below.
    # `classes` is passed as an ndarray, which is what scikit-learn documents.
    multi_class_weights = compute_class_weight(
        "balanced", classes=np.array([0, 1, 2, 3]), y=sub_labels
    )
    print(multi_class_weights)
    # Keras-style mapping {class index: weight}.
    d_multi_class_weights = dict(enumerate(multi_class_weights))
    print(d_multi_class_weights)

if inp_choice == 'm':
    # One-hot encode the quadrant labels; `lb` is kept so the encoding can be
    # inverted later through lb.classes_.
    lb = preprocessing.LabelBinarizer()
    sub_labels = lb.fit_transform(sub_labels)
    print(lb.classes_)
    print(sub_labels.shape)
    print(sub_labels)

### Class weights for the 2-class / binary labels
if inp_choice == 'b':
    bin_class_weights = compute_class_weight(
        "balanced", classes=np.array([0, 1]), y=sub_labels
    )
    print(bin_class_weights)
    d_bin_class_weights = dict(enumerate(bin_class_weights))
    print(d_bin_class_weights)
def encode(x):
    """Return the one-hot pair for a binary label.

    1 maps to [0, 1] and 0 maps to [1, 0]. Any other value prints
    "invalid value" and yields None. A new list is built on every call.
    """
    if x == 1:
        return [0, 1]
    if x == 0:
        return [1, 0]
    # Neither class matched: report it and fall through to None.
    print("invalid value")
    return None
# Binary run: one-hot encode every label with encode() and make the encoded
# array the working label set.
if inp_choice == 'b':
    sub_labels_bin = np.array([encode(label) for label in sub_labels])
    print(sub_labels_bin.shape)
    print(sub_labels_bin[:6])
    sub_labels_bin[0]  # notebook display only; a no-op when run as a script
    sub_labels = sub_labels_bin
gc.collect()
print(sub_labels.shape)
def inv_bin(x):
    """Invert the one-hot encoding produced by ``encode``.

    Returns 1 for [0, 1], 0 for [1, 0] and None for anything else.

    ``np.array_equal`` is used instead of ``==`` because comparing a NumPy
    row with a list yields an element-wise boolean array, and using that
    array in an ``if`` raises ValueError. Plain lists behave as before.
    """
    if np.array_equal(x, [0, 1]):
        return 1
    if np.array_equal(x, [1, 0]):
        return 0
    return None
# Hold out 10% of the recordings as the test set. The split is shuffled with
# a fixed seed and stratified on the labels.
X_train, X_test, y_train, y_test = train_test_split(
    all_sub_data,
    sub_labels,
    test_size=0.1,
    random_state=42,
    shuffle=True,
    stratify=sub_labels,
)
X_train.shape, y_train.shape, X_test.shape, y_test.shape  # notebook display only
# 12, 6 or 4 sub-signals are generated from each 8064-sample EEG signal, so
# every label is repeated once per sub-signal to stay aligned with them.
y_train_12 = np.repeat(y_train, 12, axis=0)
#y_train_6 = np.repeat(y_train, 6, axis = 0)
#y_train_4 = np.repeat(y_train, 4, axis = 0)
del y_train
gc.collect()
#print(y_train_12.shape, y_train_6.shape, y_train_4.shape)
print(y_train_12.shape)

# Best-effort cleanup of names that exist only when the optional cells were
# run. Only NameError (name not defined) is ignored; a bare `except` would
# also swallow KeyboardInterrupt and genuine errors.
try:
    del y_train_6, y_train_4
except NameError:
    pass
gc.collect()

try:
    del c_train, c_test, c_train_12, c
except NameError:
    pass
gc.collect()

## Loading training data with different window sizes
# Swap the first two axes so the first axis of `channel_wise` can be indexed
# directly by process_input (which expects shape (32, -1, 8064)).
channel_wise = np.transpose(X_train, (1, 0, 2))
del X_train
gc.collect()
def process_input(instances, sub_signals):
    """Cut every first-axis slice of ``instances`` into fixed-length windows.

    Arguments
    ---------
    instances:   array indexed along its first axis; the original note
                 requires a channel-wise layout of shape (32, -1, 8064).
    sub_signals: number of windows per 8064-sample signal; the window length
                 is int(8064 / sub_signals).

    Returns an array of shape (instances.shape[0], -1, window, 1). The
    resulting shape is printed before returning.
    """
    window = int(8064 / sub_signals)
    # Reshape each slice independently, then stack them along a new first axis.
    per_slice = [
        np.reshape(instances[idx], (-1, window, 1))
        for idx in range(instances.shape[0])
    ]
    transformed = np.array(per_slice)
    print(transformed.shape, 'is the shape obtained.')
    gc.collect()
    return transformed
### 12 sub signals of length 672 each, total 13824 instances
# Alternative kept for reference: load pre-computed windows from Drive
# instead of rebuilding them.
#X_train_12 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_12.npy")
#print(X_train_12.shape, 'Shape of Training Data')

# Build the 12-window training set from the channel-wise training data.
X_train_12 = process_input(instances=channel_wise, sub_signals=12)
gc.collect()

# Disabled 6- and 4-window variants.
#X_train_6 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_6.npy")
#print(X_train_6.shape, 'Shape of Training Data')
#gc.collect()
#X_train_4 = np.load("/content/drive/MyDrive/major project/data_augmentation/channel_wise_4.npy")
#print(X_train_4.shape, 'Shape of Training Data')
#gc.collect()
def process_input_ensemble(instances, sub_signals):
    """Window ``instances`` while keeping a separate axis for the sub-signals.

    Unlike a flat windowing, the windows cut from one signal stay grouped on
    their own axis, so ensemble code can traverse that axis explicitly rather
    than relying on the ordering of the dataset.

    Arguments
    ---------
    instances:   array indexed along its first axis; the original note
                 requires a channel-wise layout of shape (32, -1, 8064).
    sub_signals: number of windows per 8064-sample signal; the window length
                 is int(8064 / sub_signals).

    Returns an array of shape
    (instances.shape[0], -1, sub_signals, window, 1). The resulting shape is
    printed before returning.
    """
    gc.collect()
    window = int(8064 / sub_signals)
    target_shape = (-1, sub_signals, window, 1)

    pieces = []
    for idx in range(instances.shape[0]):
        pieces.append(np.reshape(instances[idx], target_shape))
        # Collect after every slice to keep peak memory down.
        gc.collect()

    transformed = np.array(pieces)
    print(transformed.shape, 'is the shape obtained.')
    gc.collect()
    return transformed
# Prepare the held-out test set the same way as the training set: swap the
# first two axes, cut 12 windows per signal, and repeat each label 12 times
# so labels stay aligned with the windows.
X_test_channels = np.transpose(X_test, (1, 0, 2))
x_test_12 = process_input(X_test_channels, 12)
y_test_12 = np.repeat(np.array(y_test), 12, axis=0)
print(x_test_12.shape, y_test_12.shape)

# Disabled 6-window variant of the test set. It was previously present twice
# as identical no-op string literals; it is kept once, as a comment.
# x_test_6 = process_input(X_test_channels, 6)
# y_test_6 = np.repeat(np.array(y_test), 6, axis = 0)
# print(x_test_6.shape, y_test_6.shape)
def plot_evaluation_curves(history, EPOCHS, mtrc):
    """Plot one metric and the loss, training vs. validation, side by side.

    Arguments
    ---------
    history: object whose ``.history`` mapping holds the per-epoch series
             (as returned by ``model.fit``).
    EPOCHS:  number of points on the x axis; drawn as range(EPOCHS).
    mtrc:    pair (training key, validation key) into ``history.history``,
             e.g. ('accuracy', 'val_accuracy').
    """
    train_key, val_key = mtrc[0], mtrc[1]
    logs = history.history
    # Look up every series first so a missing key fails before any drawing.
    metric_train = logs[train_key]
    metric_val = logs[val_key]
    loss_train = logs['loss']
    loss_val = logs['val_loss']
    x_axis = range(EPOCHS)

    plt.figure(figsize=(10, 6))

    # Left panel: the requested metric.
    plt.subplot(1, 2, 1)
    plt.plot(x_axis, metric_train, label='Training ' + train_key)
    plt.plot(x_axis, metric_val, label='Validation ' + train_key)
    plt.legend(loc='lower right')
    plt.title('Training and Validation ' + train_key)

    # Right panel: the loss.
    plt.subplot(1, 2, 2)
    plt.plot(x_axis, loss_train, label='Training Loss')
    plt.plot(x_axis, loss_val, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')

    plt.show()
def make_confusion_matrix(cf,
                          group_names=None,
                          categories='auto',
                          count=True,
                          percent=True,
                          cbar=True,
                          xyticks=True,
                          xyplotlabels=True,
                          sum_stats=True,
                          figsize=None,
                          cmap='Blues',
                          title=None):
    '''
    Draw a confusion matrix as an annotated Seaborn heatmap.

    Arguments
    ---------
    cf:            confusion matrix to be passed in (2-D NumPy array).
    group_names:   List of strings that represent the labels row by row to be
                   shown in each square. Ignored unless its length equals cf.size.
    categories:    List of strings containing the categories to be displayed on
                   the x,y axis. Default is 'auto'.
    count:         If True, show the raw number in each cell. Default is True.
    percent:       If True, show each cell as a fraction of the grand total of
                   the matrix. Default is True.
    cbar:          If True, show the color bar. The cbar values are based off
                   the values in the confusion matrix. Default is True.
    xyticks:       If True, show x and y ticks. Default is True.
    xyplotlabels:  If True, show 'True label' and 'Predicted label' on the
                   figure. Default is True.
    sum_stats:     If True, display summary statistics below the figure.
                   Default is True.
    figsize:       Tuple representing the figure size. Default will be the
                   matplotlib rcParams value.
    cmap:          Colormap of the values displayed from matplotlib.pyplot.cm.
                   Default is 'Blues'.
                   See http://matplotlib.org/examples/color/colormaps_reference.html
    title:         Title for the heatmap. Default is None.
    '''
    # --- Text shown inside every cell -----------------------------------
    flat = cf.flatten()
    blanks = [''] * cf.size

    if group_names and len(group_names) == cf.size:
        group_labels = ["{}\n".format(value) for value in group_names]
    else:
        group_labels = blanks

    if count:
        group_counts = ["{0:0.0f}\n".format(value) for value in flat]
    else:
        group_counts = blanks

    if percent:
        group_percentages = ["{0:.2%}".format(value) for value in flat / np.sum(cf)]
    else:
        group_percentages = blanks

    box_labels = [
        f"{v1}{v2}{v3}".strip()
        for v1, v2, v3 in zip(group_labels, group_counts, group_percentages)
    ]
    box_labels = np.asarray(box_labels).reshape(cf.shape[0], cf.shape[1])

    # --- Summary statistics appended to the x-axis label ----------------
    if sum_stats:
        # Accuracy is the sum of the diagonal divided by total observations.
        accuracy = np.trace(cf) / float(np.sum(cf))

        if len(cf) == 2:
            # Binary matrix: class 1 is treated as the positive class.
            precision = cf[1, 1] / sum(cf[:, 1])
            recall = cf[1, 1] / sum(cf[1, :])
            f1_score = 2 * precision * recall / (precision + recall)
            stats_text = "\n\nAccuracy={:0.3f}\nPrecision={:0.3f}\nRecall={:0.3f}\nF1 Score={:0.3f}".format(
                accuracy, precision, recall, f1_score)
        else:
            stats_text = "\n\nAccuracy={:0.3f}".format(accuracy)
    else:
        stats_text = ""

    # --- Figure parameters ----------------------------------------------
    # Identity test: `== None` is ambiguous for array-like figure sizes.
    if figsize is None:
        figsize = plt.rcParams.get('figure.figsize')

    if not xyticks:
        # Passing False as tick labels makes seaborn hide the categories.
        categories = False

    # --- Draw -------------------------------------------------------------
    plt.figure(figsize=figsize)
    sns.heatmap(cf, annot=box_labels, fmt="", cmap=cmap, cbar=cbar,
                xticklabels=categories, yticklabels=categories)

    if xyplotlabels:
        plt.ylabel('True label')
        plt.xlabel('Predicted label' + stats_text)
    else:
        plt.xlabel(stats_text)

    if title:
        plt.title(title)
def show_test_metrics(x_test, y_test, model):
    """Print and plot the evaluation of ``model`` on the test set.

    Arguments
    ---------
    x_test: inputs accepted by ``model.predict`` / ``model.evaluate``.
    y_test: one-hot labels; the class index is taken with argmax on axis 1.
    model:  object exposing ``predict`` and ``evaluate``.

    Output: predicted and actual class distributions, the result of
    ``model.evaluate``, a classification report and a confusion-matrix plot.
    """
    print("Testing Set MEASURES: ")
    # Predict once and reuse the result; the same inference was previously
    # repeated for the counter, the report and the confusion matrix.
    pred_classes = np.argmax(model.predict(x_test), axis=1)
    true_classes = np.argmax(y_test, axis=1)

    print(Counter(pred_classes), "Predicted Distribution of Testing Dataset")
    print(Counter(true_classes), "Actual Distribution of Testing Dataset")
    print(model.evaluate(x_test, y_test), "Testing Measures of Model.")
    print("Classification Report of Model on Testing Data")
    print(classification_report(true_classes, pred_classes, digits=4))
    make_confusion_matrix(confusion_matrix(true_classes, pred_classes))
    print("------------------------------------------------------------\n")
def _report_split(x, y, model, split_name):
    """Print and plot the metrics of ``model`` on one split ('Training' or 'Validation')."""
    # Predict once and reuse the result for the counter, report and matrix.
    pred_classes = np.argmax(model.predict(x), axis=1)
    true_classes = np.argmax(y, axis=1)

    print(Counter(pred_classes), f"Predicted Distribution of {split_name} Dataset")
    print(Counter(true_classes), f"Actual Distribution of {split_name} Dataset")
    print(model.evaluate(x, y), f"{split_name} Measures of Model.")
    print(f"Classification Report of Model on {split_name} Data")
    print(classification_report(true_classes, pred_classes, digits=4))
    make_confusion_matrix(confusion_matrix(true_classes, pred_classes))
    print("\n------------------------------------------------------------------------------------------\n")


def show_metrics(x_train=None, x_val=None, y_train=None, y_val=None, model=None):
    """Print and plot the evaluation of ``model`` on training and validation data.

    Arguments
    ---------
    x_train, x_val: inputs accepted by ``model.predict`` / ``model.evaluate``.
    y_train, y_val: one-hot labels; the class index is taken with argmax on
                    axis 1.
    model:          object exposing ``predict`` and ``evaluate``.

    All parameters default to None only for keyword-style calls; each must
    be supplied for the function to work.
    """
    _report_split(x_train, y_train, model, "Training")
    _report_split(x_val, y_val, model, "Validation")
    gc.collect()
# Metrics tracked while fitting each model. The names set here ('accuracy',
# 'precision', 'recall', 'auc', ...) are the keys later looked up in the
# training history when plotting.
# The `METRICS = [` opener was missing, leaving an unmatched `]` and an
# undefined name used as the default `metrics` argument of the model builders.
METRICS = [
    keras.metrics.TruePositives(name='tp'),
    keras.metrics.FalsePositives(name='fp'),
    keras.metrics.TrueNegatives(name='tn'),
    keras.metrics.FalseNegatives(name='fn'),
    keras.metrics.BinaryAccuracy(name='accuracy'),
    keras.metrics.Precision(name='precision'),
    keras.metrics.Recall(name='recall'),
    keras.metrics.AUC(name='auc'),
]
def dummy_models(sub_signals=12, metrics=METRICS):
    """Build 32 identical dense-only baseline models, one per list slot.

    Arguments
    ---------
    sub_signals: number of windows per 8064-sample signal; the model input
                 shape is (int(8064 / sub_signals), 1).
    metrics:     metrics passed to ``compile`` for every model.

    Returns a list of 32 compiled ``Sequential`` models.

    Raises ValueError when the global ``inp_choice`` is neither 'b' (binary)
    nor 'm' (multi-class). Previously such a value fell through silently and
    returned uncompiled models with no output layer, while unrelated
    ValueErrors from Keras were rewritten into this message.
    """
    # Resolve the classification mode once, before building anything.
    if inp_choice == 'b':
        # binary classification
        n_classes, loss_cls = 2, tf.keras.losses.BinaryCrossentropy
    elif inp_choice == 'm':
        # multi-class classification
        n_classes, loss_cls = 4, tf.keras.losses.CategoricalCrossentropy
    else:
        raise ValueError("ValueError exception thrown. Invalid Classfication choice, No Model was Created")

    sample_size = int(8064 / sub_signals)
    models = []
    for _ in range(32):
        model = Sequential()
        model.add(Dense(100, activation='relu', input_shape=(sample_size, 1)))
        model.add(Flatten())
        model.add(Dense(40, activation='relu'))
        # fc2 and softmax
        model.add(Dense(n_classes, activation='softmax'))
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                      loss=loss_cls(),
                      metrics=metrics)
        models.append(model)

    print("All models defined.")
    return models
# All-zero array of shape (13824, 672, 1). It is not referenced by any other
# code visible here; presumably a placeholder input for a baseline -- TODO confirm.
null_data = np.zeros(shape=(13824, 672, 1))
def create_models(dense_par=20, sub_signals=12, metrics=METRICS):
    """Build 32 identical 1D-CNN models, one per list slot.

    Each model stacks four Conv1D -> BatchNormalization -> ReLU blocks
    (32, 24, 16 and 8 filters), then Flatten, a dense layer of ``dense_par``
    units, dropout of 0.5 and a softmax output layer.

    Arguments
    ---------
    dense_par:   number of units in the dense layer after the conv blocks.
    sub_signals: number of windows per 8064-sample signal; the model input
                 shape is (int(8064 / sub_signals), 1).
    metrics:     metrics passed to ``compile`` for every model.

    Returns a list of 32 compiled ``Sequential`` models.

    Raises ValueError when the global ``inp_choice`` is neither 'b' (binary)
    nor 'm' (multi-class). Previously such a value fell through silently and
    returned uncompiled models with no output layer, while unrelated
    ValueErrors from Keras were rewritten into this message.
    """
    # Resolve the classification mode once, before building anything.
    if inp_choice == 'b':
        # binary classification
        n_classes, loss_cls = 2, tf.keras.losses.BinaryCrossentropy
    elif inp_choice == 'm':
        # multi-class classification
        n_classes, loss_cls = 4, tf.keras.losses.CategoricalCrossentropy
    else:
        raise ValueError("ValueError exception thrown. Invalid Classfication choice, No Model was Created")

    sample_size = int(8064 / sub_signals)
    # (filters, kernel_size, strides) of the four convolutional blocks.
    conv_blocks = [(32, 5, 3), (24, 3, 2), (16, 3, 2), (8, 3, 2)]

    models = []
    for _ in range(32):
        model = Sequential()
        for block_idx, (filters, kernel_size, strides) in enumerate(conv_blocks):
            if block_idx == 0:
                # Only the first layer declares the input shape.
                model.add(Conv1D(filters=filters, kernel_size=kernel_size,
                                 strides=strides, input_shape=(sample_size, 1)))
            else:
                model.add(Conv1D(filters=filters, kernel_size=kernel_size,
                                 strides=strides))
            model.add(BatchNormalization())
            model.add(tf.keras.layers.Activation('relu'))
        # fc-1
        model.add(Flatten())
        model.add(Dense(dense_par, activation='relu'))
        # dropout
        model.add(Dropout(rate=0.5))
        # fc2 and softmax
        model.add(Dense(n_classes, activation='softmax'))
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                      loss=loss_cls(),
                      metrics=metrics)
        models.append(model)

    print("All models defined.")
    return models
# Train and evaluate the dense baseline on the 12-window data.
baseline_models = dummy_models(sub_signals=12, metrics=METRICS)
history = [0] * 32
epochs = 500

# range will be 32 in actual code
for j in range(1):
    print(f'Individual Net : {j+1}')
    net = baseline_models[j]

    # Carve a stratified 10% validation set out of this slot's training windows.
    x_train, x_val, y_train, y_val = train_test_split(
        X_train_12[j],
        y_train_12,
        test_size=0.1,
        random_state=42,
        shuffle=True,
        stratify=y_train_12,
    )

    # Input-dependent dummy model
    history[j] = net.fit(
        x_train,
        y_train,
        batch_size=2048,
        epochs=epochs,
        validation_data=(x_val, y_val),
        shuffle=True,
        verbose=0,
    )

    show_metrics(x_train, x_val, y_train, y_val, net)
    show_test_metrics(x_test_12[j], y_test_12, net)

    # One figure per tracked metric, each paired with its validation series.
    for metric_name in ('accuracy', 'precision', 'recall', 'auc'):
        plot_evaluation_curves(history[j], epochs, (metric_name, 'val_' + metric_name))
    print("\n-----------------------------------------------------------------------------------------")

    net.save("/content/drive/MyDrive/major project/1D-CNN Models/Vanilla Models/bin_12_baseline")
    gc.collect()
gc.collect()
