- import os
- import numpy as np
- import random
- import math
- def normalization(data): # 归一化:均值0,方差1
- data = (data - data.mean()) / data.std()
- return data
- def random_rowl(data): # 把行的顺序随机排列
- idx = np.arange(data.shape[0])
- np.random.shuffle(idx)
- data = data[idx[0:]]
- return data
- def getfilelist(path): # 读取文件夹下所有txt文件
- filelist = []
- for filename in os.listdir(path):
- if os.path.splitext(filename)[1] == '.txt':
- filelist.append(filename)
- random.shuffle(filelist) # 随机打乱文件列表里的顺序
- return filelist
- path = '第一类数据的文件夹子路径','第二类数据的文件夹子路径','第三类数据的文件夹子路径'
- sort_num = 3 # 一共多少类?
- for i in range(sort_num): # 个数修改为路径的数目,与文件夹个数相同
- fileName = "{}/".format(path[i])
- files = getfilelist(fileName)
- # files=getfilelist(path[i]+'/')
- a = len(files)
- print(path[i], a)
- data = np.loadtxt(fileName + files[0])[:, 1]
- data = normalization(data)
- for j in range(1, a):
- data1 = np.loadtxt(fileName + '/' + files[j])[:, 1]
- data1 = normalization(data1)
- data = np.c_[data, data1]
- data = data.T
- # 加标签
- label = np.zeros((len(data), sort_num)) # 标签也改为分类的数目,与文件夹个数相同
- for m in range(len(label)):
- label[m, i] = 1
- data = np.c_[data, label]
- data = random_rowl(data)
- t = math.floor(len(data) * 0.7)
- v = math.floor(len(data) * 0.2)
- train = data[:t, :]
- val = data[t:(t + v), :]
- test = data[(t + v):, :]
- np.savetxt(path[i] + '_train.txt', train, fmt='%.6f')
- np.savetxt(path[i] + '_val.txt', val, fmt='%.6f')
- np.savetxt(path[i] + '_test.txt', test, fmt='%.6f')
- train = np.loadtxt(path[0] + '_train.txt')
- val = np.loadtxt(path[0] + '_val.txt')
- test = np.loadtxt(path[0] + '_test.txt')
- for i in range(1, sort_num): # 需要修改为与分类个数相同,最后一个数与文件夹
- train1 = np.loadtxt(path[i] + '_train.txt')
- val1 = np.loadtxt(path[i] + '_val.txt')
- test1 = np.loadtxt(path[i] + '_test.txt')
- train = random_rowl(np.r_[train, train1])
- val = random_rowl(np.r_[val, val1])
- test = random_rowl(np.r_[test, test1])
- np.savetxt('train.txt', train, fmt='%.6f') # 划分训练集
- np.savetxt('val.txt', val, fmt='%.6f') # 划分验证集
- np.savetxt('test.txt', test, fmt='%.6f') # 划分测试集
- # 从train.txt、val.txt、test.txt中分别获取数据和标签
- class_num = sort_num # 分类的类数
- train_labels = np.loadtxt('./train.txt')[:, -class_num:]
- test_labels = np.loadtxt('./test.txt')[:, -class_num:]
- val_labels = np.loadtxt('./val.txt')[:, -class_num:]
- train_data = np.loadtxt('./train.txt')[:, :-class_num]
- test_data = np.loadtxt('./test.txt')[:, :-class_num]
- val_data = np.loadtxt('./val.txt')[:, :-class_num]
- np.savetxt('train_data.txt', train_data, fmt='%.6f')
- np.savetxt('val_data.txt', val_data, fmt='%.6f')
- np.savetxt('test_data.txt', test_data, fmt='%.6f')
- np.savetxt('train_labels.txt', train_labels, fmt='%d')
- np.savetxt('val_labels.txt', val_labels, fmt='%d')
- np.savetxt('test_labels.txt', test_labels, fmt='%d')
- import numpy as np
- import torch.optim as optim
- import torch.nn as nn
- import torch
- from torch.utils.data import DataLoader, Dataset
- class Bottlrneck(torch.nn.Module):
- def __init__(self, In_channel, Med_channel, Out_channel, downsample=False):
- super(Bottlrneck, self).__init__()
- self.stride = 1
- if downsample == True:
- self.stride = 2
- self.layer = torch.nn.Sequential(
- torch.nn.Conv1d(In_channel, Med_channel, 1, self.stride),
- torch.nn.BatchNorm1d(Med_channel),
- torch.nn.ReLU(),
- torch.nn.Conv1d(Med_channel, Med_channel, 3, padding=1),
- torch.nn.BatchNorm1d(Med_channel),
- torch.nn.ReLU(),
- torch.nn.Conv1d(Med_channel, Out_channel, 1),
- torch.nn.BatchNorm1d(Out_channel),
- torch.nn.ReLU(),
- )
- if In_channel != Out_channel:
- self.res_layer = torch.nn.Conv1d(In_channel, Out_channel, 1, self.stride)
- else:
- self.res_layer = None
- def forward(self, x):
- if self.res_layer is not None:
- residual = self.res_layer(x)
- else:
- residual = x
- return self.layer(x) + residual
- class ResNet(torch.nn.Module):
- def __init__(self, in_channels=2, classes=6):
- super(ResNet, self).__init__()
- self.features = torch.nn.Sequential(
- torch.nn.Conv1d(in_channels, 64, kernel_size=7, stride=2, padding=3),
- torch.nn.MaxPool1d(3, 2, 1),
- Bottlrneck(64, 64, 256, False),
- Bottlrneck(256, 64, 256, False),
- Bottlrneck(256, 64, 256, False),
- #
- Bottlrneck(256, 128, 512, True),
- Bottlrneck(512, 128, 512, False),
- Bottlrneck(512, 128, 512, False),
- Bottlrneck(512, 128, 512, False),
- #
- Bottlrneck(512, 256, 1024, True),
- Bottlrneck(1024, 256, 1024, False),
- Bottlrneck(1024, 256, 1024, False),
- Bottlrneck(1024, 256, 1024, False),
- Bottlrneck(1024, 256, 1024, False),
- Bottlrneck(1024, 256, 1024, False),
- #
- Bottlrneck(1024, 512, 2048, True),
- Bottlrneck(2048, 512, 2048, False),
- Bottlrneck(2048, 512, 2048, False),
- torch.nn.AdaptiveAvgPool1d(1)
- )
- self.classifer = torch.nn.Sequential(
- torch.nn.Linear(2048, classes)
- )
- def forward(self, x):
- x = torch.Tensor.view(x, (-1, 2, 511))
- x = self.features(x)
- x = x.view(-1, 2048)
- x = self.classifer(x)
- return x
- class MyDataset(Dataset):
- def __init__(self, raman_dir, label_file):
- self.raman_dir = np.loadtxt(raman_dir)
- self.label_file = np.loadtxt(label_file)
- self.raman_data = []
- self.label_list = []
- for index in self.raman_dir:
- self.raman_data.append(index)
- for index in self.label_file:
- self.label_list.append(index)
- def __getitem__(self, idx):
- raman = torch.Tensor(self.raman_data[idx])
- label = torch.Tensor(self.label_list[idx])
- label = np.argmax(label)
- return raman, label
- def __len__(self):
- # 获取数据集的长度
- return len(self.label_list)
- if __name__ == '__main__':
- # 准备数据集
- train_data = './train_data.txt'
- val_data = './val_data.txt'
- # 标签
- train_label = './train_labels.txt'
- val_label = './val_labels.txt'
- # 创建数据加载器
- batch_size = 128
- # 训练集输入
- train_dataset = MyDataset(train_data, train_label)
- train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
- # 测试集输入
- test_dataset = MyDataset(test_data, test_label)
- test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
- # 验证集输入
- val_dataset = MyDataset(val_data, val_label)
- val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
- train_data_size = len(train_dataset)
- val_data_size = len(val_dataset)
- # 创建网络模型
- class_num = 6
- test_data_length = 600
- ResNetOutput = ResNet(in_channels=2, classes=class_num).cuda()
- # 定义损失函数
- loss_fn = nn.CrossEntropyLoss().cuda()
- # 定义优化器
- learning_rate = 0.00001
- optimizer = optim.Adam(ResNetOutput.parameters(), lr=learning_rate)
- # 记录验证的次数
- total_train_step = 0
- total_val_step = 0
- # 训练
- epoch = 100
- acc_list = np.zeros(epoch)
- print("{0:-^27}".format('Train_Model'))
- for i in range(epoch):
- print("----------epoch={}----------".format(i + 1))
- ResNetOutput.train()
- for data in train_dataloader: # data 是batch大小
- raman_train_data, t_labels = data
- raman_train_data = raman_train_data.cuda()
- t_labels = t_labels.cuda()
- output = ResNetOutput(raman_train_data)
- loss = loss_fn(output, t_labels)
- # 优化器优化模型
- optimizer.zero_grad() # 梯度清零
- loss.backward() # 反向传播
- optimizer.step() # 优化更新参数
- total_train_step = total_train_step + 1
- print("train_times:{},Loss:{}".format(total_train_step, loss.item()))
- # 测试步骤开始
- ResNetOutput.eval()
- total_val_loss = 0
- total_accuracy = 0
- with torch.no_grad(): # 测试的时候不需要对梯度进行调整,所以梯度设置不调整
- for data in val_dataloader:
- raman_val_data, v_labels = data
- raman_val_data = raman_val_data.cuda()
- v_labels = v_labels.cuda()
- outputs = ResNetOutput(raman_val_data)
- loss = loss_fn(outputs, v_labels)
- total_val_loss = total_val_loss + loss.item() # 计算损失值的和
- accuracy = 0
- for j in v_labels:
- if outputs.argmax(1)[j] == v_labels[j]:
- accuracy = accuracy + 1
- # accuracy = (outputs.argmax(1) == v_labels).sum() # 计算一个数据的精确度
- total_accuracy = total_accuracy + accuracy
- val_acc = float(total_accuracy / 1440) * 100
- acc_list[i] = val_acc
- print('the_classification_is_correct :', total_accuracy) # 正确分类的个数
- print("val_Loss:{}".format(total_val_loss))
- print("val_acc:{}".format(float(total_accuracy / 1440) * 100), '%')
- total_val_step += 1
- torch.save(ResNetOutput, "ResNet_{}.pth".format(i))
- # torch.save(ResNetOutput.state_dict(), "ResNet_{}.pth".format(i))
- print("{0:-^24}".format('Model_Save'), '\n')
- print('val_max=', max(acc_list), '%')
x = torch.Tensor.view(x, (-1, 2, 511)) # 511的位置要根据自己的数据集形状调整
- import numpy as np
- import torch.optim as optim
- import torch.nn as nn
- import torch
- from torch.utils.data import DataLoader, Dataset
- class DenseLayer(torch.nn.Module):
- def __init__(self, in_channels, middle_channels=128, out_channels=32):
- super(DenseLayer, self).__init__()
- self.layer = torch.nn.Sequential(
- torch.nn.BatchNorm1d(in_channels),
- torch.nn.ReLU(inplace=True),
- torch.nn.Conv1d(in_channels, middle_channels, 1),
- torch.nn.BatchNorm1d(middle_channels),
- torch.nn.ReLU(inplace=True),
- torch.nn.Conv1d(middle_channels, out_channels, 3, padding=1)
- )
- def forward(self, x):
- return torch.cat([x, self.layer(x)], dim=1)
- class DenseBlock(torch.nn.Sequential):
- def __init__(self, layer_num, growth_rate, in_channels, middele_channels=128):
- super(DenseBlock, self).__init__()
- for i in range(layer_num):
- layer = DenseLayer(in_channels + i * growth_rate, middele_channels, growth_rate)
- self.add_module('denselayer%d' % (i), layer)
- class Transition(torch.nn.Sequential):
- def __init__(self, channels):
- super(Transition, self).__init__()
- self.add_module('norm', torch.nn.BatchNorm1d(channels))
- self.add_module('relu', torch.nn.ReLU(inplace=True))
- self.add_module('conv', torch.nn.Conv1d(channels, channels // 2, 3, padding=1))
- self.add_module('Avgpool', torch.nn.AvgPool1d(2))
- class DenseNet(torch.nn.Module):
- def __init__(self, layer_num=(6, 12, 24, 16), growth_rate=32, init_features=64, in_channels=1, middele_channels=128,
- classes=6):
- super(DenseNet, self).__init__()
- self.feature_channel_num = init_features
- self.conv = torch.nn.Conv1d(in_channels, self.feature_channel_num, 7, 2, 3)
- self.norm = torch.nn.BatchNorm1d(self.feature_channel_num)
- self.relu = torch.nn.ReLU()
- self.maxpool = torch.nn.MaxPool1d(3, 2, 1)
- # self.attention = SpatialAttention(64) # 空间注意力机制
- self.DenseBlock1 = DenseBlock(layer_num[0], growth_rate, self.feature_channel_num, middele_channels)
- self.feature_channel_num = self.feature_channel_num + layer_num[0] * growth_rate
- self.Transition1 = Transition(self.feature_channel_num)
- self.DenseBlock2 = DenseBlock(layer_num[1], growth_rate, self.feature_channel_num // 2, middele_channels)
- self.feature_channel_num = self.feature_channel_num // 2 + layer_num[1] * growth_rate
- self.Transition2 = Transition(self.feature_channel_num)
- self.DenseBlock3 = DenseBlock(layer_num[2], growth_rate, self.feature_channel_num // 2, middele_channels)
- self.feature_channel_num = self.feature_channel_num // 2 + layer_num[2] * growth_rate
- self.Transition3 = Transition(self.feature_channel_num)
- self.DenseBlock4 = DenseBlock(layer_num[3], growth_rate, self.feature_channel_num // 2, middele_channels)
- self.feature_channel_num = self.feature_channel_num // 2 + layer_num[3] * growth_rate
- self.avgpool = torch.nn.AdaptiveAvgPool1d(1)
- self.classifer = torch.nn.Sequential(
- torch.nn.Linear(self.feature_channel_num, self.feature_channel_num // 2),
- torch.nn.ReLU(),
- torch.nn.Dropout(0.5),
- torch.nn.Linear(self.feature_channel_num // 2, classes),
- )
- def forward(self, x):
- x = torch.Tensor.view(x, (-1, 1, 1022)) # 第一个参数是batch_size
- x = x.resize_as_(x.to(torch.float32))
- x = self.conv(x)
- x = self.norm(x) # 正则化
- x = self.relu(x)
- x = self.maxpool(x)
- x = self.DenseBlock1(x)
- x = self.Transition1(x)
- x = self.DenseBlock2(x)
- x = self.Transition2(x)
- x = self.DenseBlock3(x)
- x = self.Transition3(x)
- x = self.DenseBlock4(x)
- x = self.avgpool(x)
- x = x.view(-1, self.feature_channel_num)
- x = self.classifer(x)
- return x
- class MyDataset(Dataset): # 利用Pytorch的内置函数加载数据
- def __init__(self, raman_dir, label_file):
- self.raman_dir = np.loadtxt(raman_dir)
- self.label_file = np.loadtxt(label_file)
- self.raman_data = []
- self.label_list = []
- for index in self.raman_dir:
- self.raman_data.append(index)
- for index in self.label_file:
- self.label_list.append(index)
- def __getitem__(self, idx):
- raman = torch.Tensor(self.raman_data[idx])
- label = torch.Tensor(self.label_list[idx])
- label = np.argmax(label)
- return raman, label
- def __len__(self):
- # 获取数据集的长度
- return len(self.label_list)
- if __name__ == '__main__':
- # 准备数据集
- train_data = './train_data.txt'
- val_data = './val_data.txt'
- # 标签
- train_label = './train_labels.txt'
- val_label = './val_labels.txt'
- # 数据长度
- test_data_length = 600
- val_data_length = 1440
- # 分类的类别
- class_num = 6 # 注意调整58行的函数中的class(分类类别)
- # 学习率
- learning_rate = 0.00001
- # 批处理大小
- batch_size = 128
- # 数据加载器
- train_dataset = MyDataset(train_data, train_label)
- train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
- test_dataset = MyDataset(test_data, test_label)
- test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
- val_dataset = MyDataset(val_data, val_label)
- val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
- train_data_size = len(train_dataset)
- val_data_size = len(val_dataset)
- # 创建网络模型
- DenseNetOutput = DenseNet().cuda() # .Cuda()数据是指放到GPU上
- # 定义损失函数
- loss_fn = nn.CrossEntropyLoss().cuda() # 交叉熵函数
- # 定义优化器
- optimizer = optim.Adam(DenseNetOutput.parameters(), lr=learning_rate)
- # 记录验证的次数
- total_train_step = 0
- total_val_step = 0
- writer = SummaryWriter("logs_train") # 记录训练的权重
- # 训练
- epoch = 30 #
- acc_list = np.zeros(epoch)
- print("{0:-^27}".format('Train_Model'))
- for i in range(epoch):
- print("----------epoch={}----------".format(i + 1))
- DenseNetOutput.train()
- for data in train_dataloader: # data 是batch大小
- raman_train_data, t_labels = data
- raman_train_data = raman_train_data.cuda()
- t_labels = t_labels.cuda()
- output = DenseNetOutput(raman_train_data)
- loss = loss_fn(output, t_labels)
- # 优化器优化模型
- optimizer.zero_grad() # 梯度清零
- loss.backward() # 反向传播
- optimizer.step() # 优化更新参数
- total_train_step = total_train_step + 1
- print("train_times:{},Loss:{}".format(total_train_step, loss.item()))
- # 测试步骤开始
- DenseNetOutput.eval()
- total_val_loss = 0
- total_accuracy = 0
- with torch.no_grad(): # 测试的时候不需要对梯度进行调整,所以梯度设置不调整
- for data in val_dataloader:
- raman_val_data, v_labels = data
- raman_val_data = raman_val_data.cuda()
- v_labels = v_labels.cuda()
- outputs = DenseNetOutput(raman_val_data)
- loss = loss_fn(outputs, v_labels)
- total_val_loss = total_val_loss + loss.item() # 计算损失值的和
- accuracy = 0
- for j in v_labels: # 计算精确度的和
- if outputs.argmax(1)[j] == v_labels[j]:
- accuracy = accuracy + 1
- # accuracy = (outputs.argmax(1) == v_labels).sum() # 计算一个数据的精确度
- total_accuracy = total_accuracy + accuracy
- val_acc = float(total_accuracy / val_data_size) * 100
- acc_list[i] = val_acc # 记录验证集的正确率
- print('the_classification_is_correct :', total_accuracy, val_data_length)
- print("val_Loss:{}".format(total_val_loss))
- print("val_acc:{}".format(val_acc), '%')
- total_val_step += 1
- torch.save(DenseNetOutput, "DenseNet_{}.pth".format(i + 1))
- print("{0:-^24}".format('Model_Saved'), '\n')
- print('val_max=', max(acc_list), '%') # 验证集的最高正确率
x = torch.Tensor.view(x, (-1, 1, 1022)) # 1022的位置要根据自己的数据集大小取调整
- import torch
- import numpy as np
- from ResNet import Bottlrneck, ResNet
- # from DenseNet_SpatialAttention import DenseNet, DenseLayer, DenseBlock
- device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') # 判断是否有GPU
- test_data = np.loadtxt('./test_data.txt') # 测试集的路径
- model = ResNet().to(device)
- model.load_state_dict(torch.load('ResNet_100.pth')) # 加载模型
- # model = DenseNet().to(device)
- # model.load_state_dict(torch.load('DenseNet_100.pth')) # 加载模型
- test_length = test_data.shape[0]
- for i in range(test_length):
- td = torch.from_numpy(test_data[i]).float().to(device)
- Predict_output = model(td).to(device)
- _, predicted = Predict_output.max(1)
- pred_type = predicted.item()
- print('pred_type:', pred_type)
