
PyTorch in Practice: Bearing Fault Diagnosis with a 1DCNN Optimized by the Whale Optimization Algorithm (WOA)

Contents

0. Introduction

1. Key Point

2. WOA-Optimized 1DCNN Hyperparameters in Practice

2.1 Data Preparation

2.2 1DCNN Fault Diagnosis Modeling

2.3 Optimizing the 1DCNN Hyperparameters with WOA

3. Code


0. Introduction

        This post builds a bearing fault diagnosis model with a 1DCNN and uses the Whale Optimization Algorithm (WOA) to tune its hyperparameters for higher accuracy. The network has two convolutional blocks, and the optimized parameters are: the learning rate, the number of training epochs, the batch size, the kernel count and kernel size of conv layer 1, the kernel size of pooling layer 1, the kernel count and kernel size of conv layer 2, the kernel size of pooling layer 2, and the node counts of fully connected layers 1 and 2, for 11 hyperparameters in total.
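
For concreteness, the search space can be written as lower- and upper-bound vectors over those 11 values. The post does not state the exact ranges, so the bounds below (and the decode helper) are illustrative assumptions only:

# Hypothetical bounds for the 11 hyperparameters searched by WOA; the post
# does not give the exact ranges, so these values are illustrative only.
import numpy as np

# order: [lr, epochs, batch, nk1, k1, pk1, nk2, k2, pk2, fc1, fc2]
lb = np.array([1e-4,  10,  16,  4, 2, 2,  4, 2, 2,  32,  16])   # lower bounds
ub = np.array([1e-2, 100, 128, 32, 9, 5, 64, 9, 5, 512, 256])   # upper bounds

def decode(pop):
    """Map a continuous WOA position vector to concrete hyperparameters."""
    return dict(learning_rate=float(pop[0]), num_epochs=int(pop[1]),
                batch_size=int(pop[2]), nk1=int(pop[3]), k1=int(pop[4]),
                pk1=int(pop[5]), nk2=int(pop[6]), k2=int(pop[7]),
                pk2=int(pop[8]), fc1=int(pop[9]), fc2=int(pop[10]))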

1. Key Point

        In PyTorch, convolution and pooling layers do not offer TensorFlow's padding="same" mode (at least not in older PyTorch versions), so the output size of every layer has to be computed by hand, and the input node count of the first fully connected layer must be worked out exactly; if the sizes do not match, the model raises an error. Since the layer parameters here are chosen by an optimization algorithm and are therefore not fixed, the first step is to build convolution and pooling layers that behave like TensorFlow's "same" padding. The code is as follows:

import torch
import torch.nn as nn
import torch.nn.functional as F

class Conv1d(torch.nn.Module):
    """Conv1d with TensorFlow-style 'same' padding: output length equals input length."""
    def __init__(self, in_channels, out_channels, kernel_size, bias=True, padding_layer=nn.ReflectionPad1d):
        super().__init__()
        ka = kernel_size // 2
        kb = ka - 1 if kernel_size % 2 == 0 else ka
        self.net = torch.nn.Sequential(
            padding_layer((ka, kb)),
            nn.Conv1d(in_channels, out_channels, kernel_size, bias=bias)
        )
    def forward(self, x):
        return self.net(x)

class MaxPool1d(torch.nn.Module):
    """MaxPool1d that zero-pads its output back to the input length ('same' behavior)."""
    def __init__(self, kernel_size):
        super().__init__()
        self.net = torch.nn.MaxPool1d(kernel_size=kernel_size)
    def forward(self, x):
        x1 = self.net(x)
        padsize = x.size(2) - x1.size(2)
        ka = padsize // 2
        kb = ka if padsize % 2 == 0 else ka + 1
        return F.pad(x1, (ka, kb))

net1 = Conv1d(1, 3, 10)   # input channels, output channels, kernel size
net2 = MaxPool1d(3)       # pooling kernel size
dummy = torch.rand(16, 1, 101)
print(net1(dummy).size())
print(net2(net1(dummy)).size())
# torch.Size([16, 3, 101])
# torch.Size([16, 3, 101])

As the output shows, whatever the input length and the convolution/pooling parameters, the output length always equals the input length.
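
To check the arithmetic: a convolution with kernel size k maps a length-L input to length L - k + 1, so padding a total of k - 1 points (ka = k // 2 on the left, kb = k - 1 - ka on the right) restores the length to L. With L = 101 and k = 10, for instance: ka = 5, kb = 4, and the output length is 101 + 5 + 4 - 10 + 1 = 101. The MaxPool1d wrapper takes the opposite route: it pools first, then zero-pads the shortened output back up to the input length.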

A two-layer 1DCNN built from these layers looks like this:

class ConvNet(torch.nn.Module):
    def __init__(self, num_input, nk1, k1, pk1, nk2, k2, pk2, fc1, fc2, num_classes):
        super(ConvNet, self).__init__()
        # 1D-CNN; input is a 1 x num_input vibration signal
        self.net = nn.Sequential(
            Conv1d(1, nk1, kernel_size=k1),
            MaxPool1d(kernel_size=pk1),
            nn.ReLU(),
            nn.BatchNorm1d(nk1),
            Conv1d(nk1, nk2, kernel_size=k2),
            MaxPool1d(kernel_size=pk2),
            nn.ReLU(),
            nn.BatchNorm1d(nk2)
        )
        self.feature_extractor = nn.Sequential(
            nn.Linear(num_input * nk2, fc1),
            nn.ReLU(),
            # nn.Dropout(0.5),
            nn.Linear(fc1, fc2))
        self.classifier = nn.Sequential(
            nn.ReLU(),
            nn.Linear(fc2, num_classes),
        )
    def forward(self, x):
        x = self.net(x)                         # convolution + pooling to extract vibration-signal features
        x = x.view(-1, x.size(1) * x.size(2))   # flatten the feature maps into a vector
        feature = self.feature_extractor(x)     # fully connected feature layers
        logits = self.classifier(feature)       # fully connected classification layer
        probas = F.softmax(logits, dim=1)       # softmax classifier
        return logits, probas

net = ConvNet(101, 8, 3, 3, 16, 3, 4, 128, 128, 10)
dummy = torch.rand(16, 1, 101)
print(net(dummy)[0].size())
# torch.Size([16, 10])
net = ConvNet(111, 8, 7, 3, 16, 7, 4, 256, 128, 10)
dummy = torch.rand(16, 1, 111)
print(net(dummy)[0].size())
# torch.Size([16, 10])

As shown, whatever the input length and the convolution/pooling parameters, the output is always 16x10 (16 is the batch size, 10 the number of classes). Because the "same" layers preserve the signal length, the flattened feature size is simply num_input*nk2, so no per-layer size bookkeeping is needed when the optimizer changes the kernel sizes.

2. WOA-Optimized 1DCNN Hyperparameters in Practice

2.1 Data Preparation

        The data is again the 48k drive-end bearing fault data, with 200 samples per fault class and 1024 points per sample, split into training, validation, and test sets at a 7:2:1 ratio.

# coding:utf-8
from scipy.io import loadmat
from scipy.io import savemat
import numpy as np
import os
from sklearn import preprocessing  # one-hot encoding
from sklearn.model_selection import StratifiedShuffleSplit  # stratified split, keeps class proportions

def prepro(d_path, length=864, number=1000, normal=True, rate=[0.7, 0.2, 0.1], enc=True, enc_step=28):
    """Preprocess the data; returns train_X, train_Y, valid_X, valid_Y, test_X, test_Y.

    :param d_path: source data directory
    :param length: signal length per sample (default 864, about two signal periods)
    :param number: number of samples per class (10 classes in total, default 1000 each)
    :param normal: whether to standardize the data. True or False, default True
    :param rate: train/validation/test split ratio, default [0.7, 0.2, 0.1]; must sum to 1
    :param enc: whether to augment the training data. Bool, default True
    :param enc_step: sliding step between augmented samples
    :return: Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y

    ```
    import preprocess.preprocess_nonoise as pre
    train_X, train_Y, valid_X, valid_Y, test_X, test_Y = pre.prepro(d_path=path,
                                                                    length=864,
                                                                    number=1000,
                                                                    normal=False,
                                                                    rate=[0.5, 0.25, 0.25],
                                                                    enc=True,
                                                                    enc_step=28)
    ```
    """
    # list all .mat files in the directory
    filenames = os.listdir(d_path)

    def capture(original_path):
        """Read the .mat files and return a dict of drive-end (DE) signals.

        :param original_path: read path
        :return: data dict
        """
        files = {}
        for i in filenames:
            # file path
            file_path = os.path.join(d_path, i)
            file = loadmat(file_path)
            file_keys = file.keys()
            for key in file_keys:
                if 'DE' in key:
                    files[i] = file[key].ravel()
        return files

    def slice_enc(data, slice_rate=rate[1] + rate[2]):
        """Split each signal into a training part and a held-out part.

        :param data: dict of single signals
        :param slice_rate: fraction reserved for validation and test
        :return: sliced samples
        """
        keys = data.keys()
        Train_Samples = {}
        Test_Samples = {}
        for i in keys:
            slice_data = data[i]
            all_length = len(slice_data)
            end_index = int(all_length * (1 - slice_rate))
            samp_train = int(number * (1 - slice_rate))  # e.g. 700 when number=1000
            Train_sample = []
            Test_Sample = []
            if enc:
                enc_time = length // enc_step
                samp_step = 0  # counts training samples drawn so far
                for j in range(samp_train):
                    random_start = np.random.randint(low=0, high=(end_index - 2 * length))
                    label = 0
                    for h in range(enc_time):
                        samp_step += 1
                        random_start += enc_step
                        sample = slice_data[random_start: random_start + length]
                        Train_sample.append(sample)
                        if samp_step == samp_train:
                            label = 1
                            break
                    if label:
                        break
            else:
                for j in range(samp_train):
                    random_start = np.random.randint(low=0, high=(end_index - length))
                    sample = slice_data[random_start:random_start + length]
                    Train_sample.append(sample)
            # draw the held-out samples
            for h in range(number - samp_train):
                random_start = np.random.randint(low=end_index, high=(all_length - length))
                sample = slice_data[random_start:random_start + length]
                Test_Sample.append(sample)
            Train_Samples[i] = Train_sample
            Test_Samples[i] = Test_Sample
        return Train_Samples, Test_Samples

    # sampling done; attach labels
    def add_labels(train_test):
        X = []
        Y = []
        label = 0
        for i in filenames:
            x = train_test[i]
            X += x
            lenx = len(x)
            Y += [label] * lenx
            label += 1
        return X, Y

    # one-hot encoding
    def one_hot(Train_Y, Test_Y):
        Train_Y = np.array(Train_Y).reshape([-1, 1])
        Test_Y = np.array(Test_Y).reshape([-1, 1])
        Encoder = preprocessing.OneHotEncoder()
        Encoder.fit(Train_Y)
        Train_Y = Encoder.transform(Train_Y).toarray()
        Test_Y = Encoder.transform(Test_Y).toarray()
        Train_Y = np.asarray(Train_Y, dtype=np.int32)
        Test_Y = np.asarray(Test_Y, dtype=np.int32)
        return Train_Y, Test_Y

    def scalar_stand(Train_X, Test_X):
        # standardize train and held-out sets with the training-set statistics
        scalar = preprocessing.StandardScaler().fit(Train_X)
        Train_X = scalar.transform(Train_X)
        Test_X = scalar.transform(Test_X)
        return Train_X, Test_X

    def valid_test_slice(Test_X, Test_Y):
        test_size = rate[2] / (rate[1] + rate[2])
        ss = StratifiedShuffleSplit(n_splits=1, test_size=test_size)
        for train_index, test_index in ss.split(Test_X, Test_Y):
            X_valid, X_test = Test_X[train_index], Test_X[test_index]
            Y_valid, Y_test = Test_Y[train_index], Test_Y[test_index]
            return X_valid, Y_valid, X_test, Y_test

    # read all .mat files into a data dict
    data = capture(original_path=d_path)
    # split into training and held-out parts
    train, test = slice_enc(data)
    # label the training set, returning X, Y
    Train_X, Train_Y = add_labels(train)
    # label the held-out set, returning X, Y
    Test_X, Test_Y = add_labels(test)
    # one-hot encode the labels
    Train_Y, Test_Y = one_hot(Train_Y, Test_Y)
    # optionally standardize the data
    if normal:
        Train_X, Test_X = scalar_stand(Train_X, Test_X)
    else:
        # still convert to numpy arrays
        Train_X = np.asarray(Train_X)
        Test_X = np.asarray(Test_X)
    # split the held-out part into validation and test sets
    Valid_X, Valid_Y, Test_X, Test_Y = valid_test_slice(Test_X, Test_Y)
    return Train_X, Train_Y, Valid_X, Valid_Y, Test_X, Test_Y

if __name__ == "__main__":
    path = '0HP/'
    train_X, train_Y, valid_X, valid_Y, test_X, test_Y = prepro(d_path=path,
                                                                length=1024,
                                                                number=200,
                                                                normal=True,
                                                                rate=[0.7, 0.2, 0.1],
                                                                enc=False,
                                                                enc_step=28)
    savemat("data_process.mat", {'train_X': train_X, 'train_Y': train_Y,
                                 'valid_X': valid_X, 'valid_Y': valid_Y,
                                 'test_X': test_X, 'test_Y': test_Y})

2.2 1DCNN Fault Diagnosis Modeling

        This builds the fault diagnosis model with the 1DCNN from Section 1. The hyperparameters are set arbitrarily, giving a test-set accuracy of 80.5%. (Manual tuning could raise this, but I am lazy, and the gap is needed to show the value of optimization.)

# coding: utf-8
# In[1]: imports
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from model import ConvNet, Model_fit
import matplotlib.pyplot as plt
if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True
from scipy.io import loadmat
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# In[2]: load the data
num_classes = 10
# vibration signals -- 1D-CNN input
data = loadmat('data_process.mat')
x_train1 = data['train_X']
x_valid1 = data['valid_X']
y_train = data['train_Y'].argmax(axis=1)
y_valid = data['valid_Y'].argmax(axis=1)
ss1 = StandardScaler().fit(x_train1)  # MinMaxScaler or StandardScaler
x_train1 = ss1.transform(x_train1)
x_valid1 = ss1.transform(x_valid1)
x_train1 = x_train1.reshape(-1, 1, 1024)
x_valid1 = x_valid1.reshape(-1, 1, 1024)
# convert to torch tensors
train_features1 = torch.tensor(x_train1).type(torch.FloatTensor)
valid_features1 = torch.tensor(x_valid1).type(torch.FloatTensor)
train_labels = torch.tensor(y_train).type(torch.LongTensor)
valid_labels = torch.tensor(y_valid).type(torch.LongTensor)
print(train_features1.shape)
print(train_labels.shape)
N = train_features1.size(0)

# In[3]: hyperparameters
learning_rate = 0.005  # learning rate
num_epochs = 10        # number of epochs
batch_size = 64        # batch size

# In[4]: build the model
torch.manual_seed(0)
torch.cuda.manual_seed(0)
model = ConvNet(train_features1.size(-1), 8, 3, 3, 16, 3, 4, 128, 128, 10)
train_again = True  # True retrains the model
if train_again:
    # In[5]: train
    Model = Model_fit(model, batch_size, learning_rate, num_epochs, device, verbose=True)
    Model.train(train_features1, train_labels, valid_features1, valid_labels)
    model = Model.model
    train_loss = Model.train_loss
    valid_loss = Model.valid_loss
    valid_acc = Model.valid_acc
    train_acc = Model.train_acc
    torch.save(model, 'model/W_CNN1.pkl')  # save the whole network
    # loss curves
    plt.figure()
    plt.plot(np.array(train_loss), label='train')
    plt.plot(np.array(valid_loss), label='valid')
    plt.title('loss curve')
    plt.legend()
    plt.savefig('图片保存/loss')
    # accuracy curves
    plt.figure()
    plt.plot(np.array(train_acc), label='train')
    plt.plot(np.array(valid_acc), label='valid')
    plt.title('accuracy curve')
    plt.legend()
    plt.savefig('图片保存/accuracy')
    plt.show()
else:
    model = torch.load('model/W_CNN1.pkl', map_location=torch.device('cpu'))  # load the saved model
    Model = Model_fit(model, batch_size, learning_rate, num_epochs, device, verbose=True)

# In[6]: classify the test set with the trained model
x_test1 = data['test_X']
y_test = data['test_Y'].argmax(axis=1)
x_test1 = ss1.transform(x_test1)
x_test1 = x_test1.reshape(-1, 1, 1024)
test_features1 = torch.tensor(x_test1).type(torch.FloatTensor)
test_labels = torch.tensor(y_test).type(torch.LongTensor)
_, teac = Model.compute_accuracy(test_features1, test_labels)
print('Test-set accuracy of the plain CNN:', teac * 100, '%')
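
A note on the Model_fit helper imported from model above: the post does not list it. The sketch below is a minimal, hypothetical implementation consistent with how it is called (the constructor signature, a train method, compute_accuracy returning a (loss, accuracy) pair, and the recorded loss/accuracy histories); the author's real class may differ:

# Hypothetical minimal sketch of Model_fit; the post imports it from model.py
# without showing it, so this only mirrors how it is called above.
import torch
import torch.nn.functional as F

class Model_fit:
    def __init__(self, model, batch_size, learning_rate, num_epochs, device, verbose=False):
        self.model = model.to(device)
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.device = device
        self.verbose = verbose
        self.train_loss, self.valid_loss = [], []
        self.train_acc, self.valid_acc = [], []

    def compute_accuracy(self, features, labels):
        """Return (loss, accuracy) of the current model on the given tensors."""
        self.model.eval()
        with torch.no_grad():
            logits, _ = self.model(features.to(self.device))
            loss = F.cross_entropy(logits, labels.to(self.device)).item()
            acc = (logits.argmax(dim=1).cpu() == labels).float().mean().item()
        return loss, acc

    def train(self, train_x, train_y, valid_x, valid_y):
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
        dataset = torch.utils.data.TensorDataset(train_x, train_y)
        loader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
        for epoch in range(self.num_epochs):
            self.model.train()
            for xb, yb in loader:
                xb, yb = xb.to(self.device), yb.to(self.device)
                logits, _ = self.model(xb)
                loss = F.cross_entropy(logits, yb)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            trl, tra = self.compute_accuracy(train_x, train_y)
            vl, va = self.compute_accuracy(valid_x, valid_y)
            self.train_loss.append(trl); self.valid_loss.append(vl)
            self.train_acc.append(tra); self.valid_acc.append(va)
            if self.verbose:
                print(f'epoch {epoch + 1}/{self.num_epochs}: '
                      f'train acc {tra:.4f}, valid acc {va:.4f}')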

2.3 Optimizing the 1DCNN Hyperparameters with WOA

        The fitness function to be minimized is the validation-set classification error rate: the goal is to find the set of hyperparameters for which the trained network has the lowest validation error.
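
The WOA routine itself lives in a separate optim module that the post does not list. As a sketch of the fitness it minimizes: each candidate position vector is decoded into the 11 hyperparameters, a ConvNet is trained with them, and the validation error rate is returned. Something along these lines (illustrative only; the real optim.WOA may differ):

# Hypothetical sketch of the fitness function minimized inside optim.WOA;
# the actual module is not shown in the post.
import torch
from model import ConvNet, Model_fit

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def fitness(pop, train_x, train_y, valid_x, valid_y):
    """Decode a candidate vector into the 11 hyperparameters, train a
    network with them, and return the validation error rate."""
    torch.manual_seed(0)  # identical init for every candidate, for a fair comparison
    model = ConvNet(train_x.size(-1),
                    int(pop[3]), int(pop[4]), int(pop[5]),   # nk1, k1, pk1
                    int(pop[6]), int(pop[7]), int(pop[8]),   # nk2, k2, pk2
                    int(pop[9]), int(pop[10]), 10)           # fc1, fc2, classes
    m = Model_fit(model, int(pop[2]), pop[0], int(pop[1]), device, verbose=False)
    m.train(train_x, train_y, valid_x, valid_y)
    _, valid_acc = m.compute_accuracy(valid_x, valid_y)
    return 1.0 - valid_acc  # error rate: WOA minimizes this

The main script then runs WOA, saves the result, and retrains with the best hyperparameters: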

# coding: utf-8
# In[1]: imports
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from model import ConvNet, Model_fit
from optim import WOA, HUATU
import matplotlib.pyplot as plt
if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True
from scipy.io import loadmat
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# torch.manual_seed(0)

# In[2]: load the data
num_classes = 10
# vibration signals -- 1D-CNN input
data = loadmat('data_process.mat')
x_train1 = data['train_X']
x_valid1 = data['valid_X']
y_train = data['train_Y'].argmax(axis=1)
y_valid = data['valid_Y'].argmax(axis=1)
ss1 = StandardScaler().fit(x_train1)  # MinMaxScaler or StandardScaler
x_train1 = ss1.transform(x_train1)
x_valid1 = ss1.transform(x_valid1)
x_train1 = x_train1.reshape(-1, 1, 1024)
x_valid1 = x_valid1.reshape(-1, 1, 1024)
# convert to torch tensors
train_features1 = torch.tensor(x_train1).type(torch.FloatTensor)
valid_features1 = torch.tensor(x_valid1).type(torch.FloatTensor)
train_labels = torch.tensor(y_train).type(torch.LongTensor)
valid_labels = torch.tensor(y_valid).type(torch.LongTensor)

# In[]: optimize the CNN with WOA
optim_again = True  # False loads the previously optimized hyperparameters instead
if optim_again:
    best, trace, process = WOA(train_features1, train_labels, valid_features1, valid_labels)
    trace, process = np.array(trace), np.array(process)
    np.savez('model/woa_result.npz', trace=trace, best=best, process=process)
else:
    para = np.load('model/woa_result.npz')
    trace = para['trace'].reshape(-1,)
    process = para['process']
    best = para['best'].reshape(-1,)
HUATU(trace)

# In[3]: apply the optimized hyperparameters
pop = best
learning_rate = pop[0]    # learning rate
num_epochs = int(pop[1])  # number of epochs
batch_size = int(pop[2])  # batch size
nk1 = int(pop[3])         # number of conv1 kernels
k1 = int(pop[4])          # conv1 kernel size
pk1 = int(pop[5])         # pool1 kernel size
nk2 = int(pop[6])         # number of conv2 kernels
k2 = int(pop[7])          # conv2 kernel size
pk2 = int(pop[8])         # pool2 kernel size
fc1 = int(pop[9])         # fc layer 1 nodes
fc2 = int(pop[10])        # fc layer 2 nodes
torch.manual_seed(0)
torch.cuda.manual_seed(0)
model = ConvNet(train_features1.size(-1), nk1, k1, pk1, nk2, k2, pk2, fc1, fc2, 10)
train_again = True  # True retrains the model

# In[5]: train
if train_again:
    Model = Model_fit(model, batch_size, learning_rate, num_epochs, device, verbose=True)
    Model.train(train_features1, train_labels, valid_features1, valid_labels)
    model = Model.model
    train_loss = Model.train_loss
    valid_loss = Model.valid_loss
    valid_acc = Model.valid_acc
    train_acc = Model.train_acc
    torch.save(model, 'model/W_CNN2.pkl')  # save the whole network
    # loss curves
    plt.figure()
    plt.plot(np.array(train_loss), label='train')
    plt.plot(np.array(valid_loss), label='valid')
    plt.title('loss curve')
    plt.legend()
    plt.savefig('图片保存/loss')
    # accuracy curves
    plt.figure()
    plt.plot(np.array(train_acc), label='train')
    plt.plot(np.array(valid_acc), label='valid')
    plt.title('accuracy curve')
    plt.legend()
    plt.savefig('图片保存/accuracy')
    plt.show()
else:
    model = torch.load('model/W_CNN2.pkl', map_location=torch.device('cpu'))  # load the saved model
    Model = Model_fit(model, batch_size, learning_rate, num_epochs, device, verbose=True)

# In[6]: classify the test set with the trained model
x_test1 = data['test_X']
y_test = data['test_Y'].argmax(axis=1)
x_test1 = ss1.transform(x_test1)
x_test1 = x_test1.reshape(-1, 1, 1024)
test_features1 = torch.tensor(x_test1).type(torch.FloatTensor)
test_labels = torch.tensor(y_test).type(torch.LongTensor)
_, teac = Model.compute_accuracy(test_features1, test_labels)
print('Test-set accuracy of the WOA-CNN:', teac * 100, '%')

Because the fitness being minimized is the validation-set classification error rate, the fitness (convergence) curve descends over the iterations.
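
HUATU, the convergence-plot helper imported from optim, is likewise not shown in the post. A minimal sketch consistent with how it is called (assuming trace holds the best fitness at each WOA iteration):

# Hypothetical sketch of the HUATU plotting helper (not shown in the post).
import numpy as np
import matplotlib.pyplot as plt

def HUATU(trace):
    plt.figure()
    plt.plot(np.arange(1, len(trace) + 1), trace, marker='o')
    plt.xlabel('iteration')
    plt.ylabel('best fitness (validation error rate)')
    plt.title('WOA convergence curve')
    plt.show()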

3. Code

The code link is in my comment in the comments section.
