(Note: this is written from a beginner's perspective. Like me, many people who have just started with convolutional neural networks mainly want to know how this code feeds the signal in, how to read the output, and how to compare the output against the test-set labels to get the test results; all of that is explained below. Links to the data and source code I used are at the end of this post. Do look at the data format first: when I started I had no idea what format the data in the spreadsheet was supposed to be, and it cost me a long time! If you run into problems, leave a comment and I will do my best to answer.)
Editor: Anaconda + Jupyter
Environment: Python 3.7.10
TensorFlow 2.3.0
The code is as follows:
import math

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers import *
from keras.models import *
from keras.optimizers import *

# Training-set CSV. Point this at your own signal data; the data is linked
# below so you can check the format.
MANIFEST_DIR = r'C:\Users\Administrator\Desktop\test\frftdata\train\frfttrain1.0.csv'
Batch_size = 30
Long = 800    # total number of rows in the training CSV
Lens = 200

def convert2oneHot(index, lens):
    """Encode a class index as a one-hot vector of length `lens`."""
    hot = np.zeros((lens,))
    hot[int(index)] = 1
    return hot

def xs_gen(path=MANIFEST_DIR, batch_size=Batch_size, train=True, Lens=Lens):
    """Yield (batch_x, batch_y) batches; the first `Lens` rows are used for
    training, the remaining rows for validation."""
    img_list = pd.read_csv(path)
    if train:
        img_list = np.array(img_list)[:Lens]
        print("Found %s train items." % len(img_list))
        print("list 1 is", img_list[0, -1])
        steps = math.ceil(len(img_list) / batch_size)
    else:
        img_list = np.array(img_list)[Lens:]
        print("Found %s test items." % len(img_list))
        print("list 1 is", img_list[0, -1])
        steps = math.ceil(len(img_list) / batch_size)
    while True:
        for i in range(steps):
            batch_list = img_list[i * batch_size: i * batch_size + batch_size]
            np.random.shuffle(batch_list)
            # column 0 is the sample id, the last column is the label
            batch_x = np.array([file for file in batch_list[:, 1:-1]])
            batch_y = np.array([convert2oneHot(label, 4) for label in batch_list[:, -1]])
            yield batch_x, batch_y

# Test-set CSV.
TEST_MANIFEST_DIR = r'C:\Users\Administrator\Desktop\test\frftdata\test\frfttest1.0.csv'

def ts_gen(path=TEST_MANIFEST_DIR, batch_size=Batch_size):
    """Yield unlabeled test batches (the test CSV has no label column)."""
    img_list = pd.read_csv(path)
    img_list = np.array(img_list)[:Lens]
    print("Found %s test items." % len(img_list))
    print("list 1 is", img_list[0, -1])
    steps = math.ceil(len(img_list) / batch_size)
    while True:
        for i in range(steps):
            batch_list = img_list[i * batch_size: i * batch_size + batch_size]
            batch_x = np.array([file for file in batch_list[:, 1:]])
            yield batch_x

TIME_PERIODS = 5000    # data points per sample signal

def build_model(input_shape=(TIME_PERIODS,), num_classes=4):
    model = Sequential()
    model.add(Reshape((TIME_PERIODS, 1), input_shape=input_shape))
    model.add(Conv1D(16, 8, strides=2, activation='relu', input_shape=(TIME_PERIODS, 1)))
    model.add(Conv1D(16, 8, strides=2, activation='relu', padding="same"))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(64, 4, strides=2, activation='relu', padding="same"))
    model.add(Conv1D(64, 4, strides=2, activation='relu', padding="same"))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(256, 4, strides=2, activation='relu', padding="same"))
    model.add(Conv1D(256, 4, strides=2, activation='relu', padding="same"))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(512, 2, strides=1, activation='relu', padding="same"))
    model.add(Conv1D(512, 2, strides=1, activation='relu', padding="same"))
    model.add(MaxPooling1D(2))
    # Alternative dense head (commented out):
    # model.add(Flatten())
    # model.add(Dropout(0.3))
    # model.add(Dense(256, activation='relu'))
    model.add(GlobalAveragePooling1D())
    model.add(Dropout(0.3))
    model.add(Dense(num_classes, activation='softmax'))
    return model

Train = True

if __name__ == "__main__":
    if Train:
        train_iter = xs_gen()
        val_iter = xs_gen(train=False)
        ckpt = keras.callbacks.ModelCheckpoint(
            filepath='best_model.{epoch:02d}-{val_loss:.4f}.h5',
            monitor='val_loss', save_best_only=True, verbose=1)
        model = build_model()
        opt = Adam(0.0002)
        model.compile(loss='categorical_crossentropy',
                      optimizer=opt, metrics=['accuracy'])
        print(model.summary())
        train_history = model.fit_generator(
            generator=train_iter,
            steps_per_epoch=Lens // Batch_size,
            epochs=25,
            initial_epoch=0,
            validation_data=val_iter,
            validation_steps=(Long - Lens) // Batch_size,
            callbacks=[ckpt],
        )
        model.save("finishModel.h5")
    else:
        test_iter = ts_gen()
        model = load_model("best_model.49-0.00.h5")
        pres = model.predict_generator(generator=test_iter,
                                       steps=math.ceil(Lens / Batch_size), verbose=1)
        print(pres.shape)
        ohpres = np.argmax(pres, axis=1)
        print(ohpres.shape)
        df = pd.DataFrame()
        df["id"] = np.arange(1, len(ohpres) + 1)
        df["label"] = ohpres
        df.to_csv("predicts.csv", index=None)
        # plot the first test signal as a sanity check
        test_iter = ts_gen()
        for x in test_iter:
            x1 = x[0]
            break
        plt.plot(x1)
        plt.show()

# ---- the cells below were run in Jupyter after training (Train = True) ----

def show_train_history(train_history, train, validation):
    plt.plot(train_history.history[train])
    plt.plot(train_history.history[validation])
    plt.title('Train History')
    plt.ylabel(train)
    plt.xlabel('Epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

show_train_history(train_history, 'accuracy', 'val_accuracy')
show_train_history(train_history, 'loss', 'val_loss')

plt.figure(figsize=(6, 4))
plt.plot(train_history.history['accuracy'], "g--", label="training accuracy")
plt.plot(train_history.history['val_accuracy'], "g", label="validation accuracy")
plt.plot(train_history.history['loss'], "r--", label="training loss")
plt.plot(train_history.history['val_loss'], "r", label="validation loss")
plt.title('Model accuracy and loss', fontsize=14)
plt.ylabel('Accuracy / loss', fontsize=12)
plt.xlabel('Epoch', fontsize=12)
plt.ylim(0)
plt.legend()
plt.show()

# Label table for the test set: the fault labels of the 200 test samples,
# used to score the network's predictions and to build the confusion matrix.
file = r"C:\Users\Administrator\Desktop\shiyong22.csv"
all_df = pd.read_csv(file)
ndarray = all_df.values
ndarray[:2]

test_iter = ts_gen()
pres = model.predict_generator(generator=test_iter,
                               steps=math.ceil(Lens / Batch_size), verbose=1)
print(pres.shape)
print(ndarray.shape)
ohpres = np.argmax(pres, axis=1)
print(ohpres.shape)
ohpres = ohpres[:200]    # keep exactly the 200 test samples
ohpres

from sklearn.metrics import confusion_matrix

def cm_plot(original_label, predict_label, pic=None):
    cm = confusion_matrix(original_label, predict_label)
    plt.matshow(cm, cmap=plt.cm.GnBu)
    plt.colorbar()
    for x in range(len(cm)):
        for y in range(len(cm)):
            # cm[x, y] counts samples with true label x and predicted label y,
            # so the annotation goes at column y, row x
            plt.annotate(cm[x, y], xy=(y, x),
                         horizontalalignment='center', verticalalignment='center')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.title('Confusion Matrix')
    if pic is not None:
        plt.savefig(str(pic) + '.jpg')
    plt.show()

cm_plot(ndarray, ohpres)

from sklearn.metrics import accuracy_score
accuracy_score(ndarray, ohpres)

# inspect the raw history values (notebook cells)
train_history.history['loss']
train_history.history['val_loss']
train_history.history['val_accuracy']
train_history.history['accuracy']
Data download link:
https://gitee.com/wjj_xiaoxiansheng/cnn_-frft_-data
Data description
Classes: labels 0, 1, 2, and 3 stand for the normal state, inner-race fault, outer-race fault, and rolling-element fault, respectively;
Signals: each sample contains 5,000 data points, and there are 1,000 samples in total. 800 samples are drawn at random as the training set and the remaining 200 serve as the test set. A sketch of the expected spreadsheet layout follows.
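Since the spreadsheet layout is what tripped me up, here is a small sketch that writes a dummy CSV in exactly the shape the code reads: one header row, a sample id in the first column, the 5,000 signal points in the middle columns, and the class label (0-3) in the last column; the test CSV is the same minus the label column. The random numbers are placeholders rather than the real FrFT data, and the p0, p1, ... column names are my own choice (the code only cares about column positions).

import numpy as np
import pandas as pd

n_samples, n_points = 800, 5000
signals = np.random.randn(n_samples, n_points)             # stand-in for real signals
labels = np.random.randint(0, 4, size=n_samples)           # fault class 0-3

df = pd.DataFrame(signals, columns=[f"p{i}" for i in range(n_points)])
df.insert(0, "id", np.arange(1, n_samples + 1))            # first column: sample id
df["label"] = labels                                       # last column: class label
df.to_csv("frfttrain_demo.csv", index=False)               # layout xs_gen expects

# Test CSV: the same layout without the "label" column.
df.drop(columns="label").iloc[:200].to_csv("frfttest_demo.csv", index=False)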
Note: I train and test on fractional Fourier transform (FrFT) results. The orders run from 0 to 1 in steps of 0.05; order 0 is the original time-domain signal and order 1 is the ordinary Fourier transform, as the folder names indicate. You can start by trying just order 0.0, i.e., the raw time-domain signals.
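If you want to sweep all the orders rather than a single file, here is a sketch of how the per-order file names could be enumerated. Only the 1.0 file names actually appear in the code above; the names for the other orders are my assumption based on that pattern, so adjust them to match the folders you downloaded.

orders = [round(0.05 * k, 2) for k in range(21)]   # 0.0, 0.05, ..., 1.0
for q in orders:
    # assumed naming pattern, generalizing frfttrain1.0.csv / frfttest1.0.csv
    train_csv = rf"C:\Users\Administrator\Desktop\test\frftdata\train\frfttrain{q}.csv"
    test_csv = rf"C:\Users\Administrator\Desktop\test\frftdata\test\frfttest{q}.csv"
    print(train_csv, test_csv)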