This article explains how to perform multi-class classification of structured (tabular) data with SE-ResNet34.
Main idea:
Message me privately if you need the data.
Below is a screenshot of sample data (image lost in reposting); the rightmost column is the label.
SE is short for Squeeze-and-Excitation, i.e. compressing features and then re-exciting (recalibrating) them.
SENet can be viewed as channel-wise attention; the SE module can be embedded into modules with skip-connections, e.g. in ResNet, as well as into backbones such as VGG and Inception.
Adding SE to ResNet:
The figure below (image lost in reposting) shows SE-ResNet; the SE module is applied to the residual branch. The pooled features are first reduced to 1/r of the input channel dimension, passed through a ReLU activation, and then restored to the original dimension by a second Fully Connected layer.
Doing this instead of using a single Fully Connected layer has two advantages:
1) more non-linearity, which better models the complex correlations between channels;
2) far fewer parameters and much less computation.
A Sigmoid gate then yields weights normalized to [0, 1], and a Scale operation multiplies each channel's features by its weight. This recalibration is applied to the features on the Residual branch before the Addition. If the recalibration were instead applied to the trunk features after the Addition, the 0-to-1 scaling on the main path would make gradients prone to vanishing near the input layers during backpropagation in deep networks, leaving the model hard to optimize.
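In symbols (my own notation, matching the SELayer code given later, with C channels and reduction ratio r): the squeeze step is global average pooling, z_c = (1/(H·W)) · Σ_{i,j} x_c(i, j); the excitation step is the FC bottleneck gated by a Sigmoid, s = σ(W_2 · ReLU(W_1 · z)) with W_1 ∈ R^{(C/r)×C} and W_2 ∈ R^{C×(C/r)}; and the scale step reweights each channel, x̃_c = s_c · x_c.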
————————————————
Copyright notice: this is an original article by CSDN blogger "AI剑客", licensed under CC 4.0 BY-SA; please include a link to the original and this notice when reposting.
The full code is as follows:
Main program entry
import matplotlib.pyplot as plt
from PIL import ImageFile
from imblearn.over_sampling import SMOTE
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras.utils.np_utils import to_categorical
from torchvision.models import ResNet
ImageFile.LOAD_TRUNCATED_IMAGES = True
import torch
from torch import nn
import os
import time
import csv
os.environ['CUDA_LAUNCH_BLOCKING'] = "0"
import torch.optim
import torch.utils.data
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
from tqdm import tqdm
import numpy as np
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True
def checkpoint(model, best_loss, best_epoch, LR):
    """
    Saves a checkpoint of the model during training.
    Args:
        model: model to be saved
        best_loss: best val loss achieved so far in training
        best_epoch: epoch at which that best val loss was reached
        LR: current learning rate in training
    Returns:
        None
    """
    os.makedirs('results', exist_ok=True)  # make sure the output dir exists
    state = {
        'model': model,
        'state_dict': model.state_dict(),
        'best_loss': best_loss,
        'best_epoch': best_epoch,
        'LR': LR
    }
    torch.save(state, 'results/checkpoint')
def make_pred_multilabel(model, test_df, device):
    """
    Runs the previously trained model on the test fold and reports the loss,
    accuracy and confusion matrix.
    Args:
        model: SE-ResNet fine-tuned on the training data
        test_df: numpy array of test rows (features in columns 0-5, label in column 6)
    Returns:
        None (prints metrics and shows the confusion-matrix plot)
    """
    size = len(test_df)
    print("Test_df size:", size)
    model = model.to(device)
    inputs = test_df[:, :6]
    labels = test_df[:, 6]
    y_test = [int(i) for i in labels.tolist()]
    # Note: refitting the scaler on the test fold is inconsistent with training;
    # ideally reuse the scaler fitted on the training set (see
    # scale_features_consistently below).
    scaler_x = preprocessing.MinMaxScaler(feature_range=(0, 1))
    inputs = scaler_x.fit_transform(inputs)
    inputs = inputs.astype(float)
    labels = to_categorical(labels)
    # Reshape the 6 features into a 3-channel 1x2 "image" for the 2D CNN.
    inputs = torch.FloatTensor(inputs.reshape((inputs.shape[0], 3, 1, 2))).to(device)
    labels = torch.FloatTensor(labels).to(device)
    criterion = nn.MSELoss()  # reported for reference; training used BCELoss
    batch_size = 64
    length = len(inputs)
    model.eval()
    all_preds = []
    with torch.no_grad():
        for j in range(0, length, batch_size):
            X_test_batch = inputs[j:j + batch_size]
            y_test_batch = labels[j:j + batch_size]
            # The model's final layer already applies Sigmoid (see Se_ResNet34),
            # so no extra torch.sigmoid is needed here.
            outputs = model(X_test_batch)
            loss = criterion(outputs, y_test_batch)
            print("loss:{:.3f}".format(loss.item()))
            all_preds.append(outputs.cpu().numpy())
    # Accumulate predictions across batches so the accuracy covers the whole
    # test set, not just the last batch.
    y_pred = np.argmax(np.concatenate(all_preds), axis=1)
    acc = accuracy_score(y_test, y_pred)
    print('acc', acc)
    C = confusion_matrix(y_test, y_pred)
    plt.matshow(C, cmap=plt.cm.Reds)  # change the colormap to taste
    plt.colorbar()
    for i in range(len(C)):
        for j in range(len(C)):
            plt.annotate(C[j, i], xy=(i, j), horizontalalignment='center', verticalalignment='center')
    plt.tick_params(labelsize=15)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()
class SELayer(nn.Module):
    """Squeeze-and-Excitation layer: channel-wise attention implemented as
    global average pooling -> FC bottleneck -> Sigmoid gate -> scale."""
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)  # squeeze: (b, c, h, w) -> (b, c, 1, 1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),  # reduce to c/r
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),  # restore to c
            nn.Sigmoid()  # gate: weights in (0, 1)
        )
    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)  # scale: reweight each channel
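# Quick shape check (illustrative only; the SE layer leaves shapes unchanged):
#   se = SELayer(channel=64, reduction=16)
#   x = torch.randn(8, 64, 4, 4)   # (batch, channels, H, W)
#   se(x).shape                    # torch.Size([8, 64, 4, 4])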
def conv3x3(in_planes, out_planes, stride=1):
    # Note: despite the name (kept from the torchvision ResNet convention),
    # this uses a 2x2 kernel, presumably to suit the tiny 1x2 spatial input.
    return nn.Conv2d(in_planes, out_planes, kernel_size=2, stride=stride, padding=1, bias=False)
class SEBasicBlock(nn.Module):
expansion = 1
def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
base_width=64, dilation=1, norm_layer=None,
*, reduction=16):
super(SEBasicBlock, self).__init__()
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = nn.BatchNorm2d(planes)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes, 1)
self.bn2 = nn.BatchNorm2d(planes)
self.se = SELayer(planes, reduction)
self.downsample = downsample
self.stride = stride
    def forward(self, x):
        # Note: the skip connection of the original BasicBlock is commented out
        # below, so this is a plain conv block with an SE module rather than a
        # true residual block (see SEResidualBlock after this class).
        # residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)
        # if self.downsample is not None:
        #     residual = self.downsample(x)
        # out += residual
        out = self.relu(out)
        return out
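# Sketch of a genuinely residual variant, restoring the skip connection of
# torchvision's BasicBlock. Caveat: with the 2x2, padding-1 convolutions used
# above, each conv grows H and W by one, so the addition only works if conv3x3
# is changed to preserve spatial size (e.g. kernel_size=3, padding=1).
# SEResidualBlock is my illustrative name, not part of the original code.
class SEResidualBlock(SEBasicBlock):
    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.se(self.bn2(self.conv2(out)))
        if self.downsample is not None:  # match channels/stride on the skip path
            residual = self.downsample(x)
        out += residual  # the residual addition
        return self.relu(out)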
def se_resnet34(num_classes):
    """Constructs a ResNet-34-style model built from SEBasicBlock.
    Args:
        num_classes (int): number of output classes
    """
    model = ResNet(SEBasicBlock, [3, 4, 6, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model
class Se_ResNet34(nn.Module):
    def __init__(self, N_LABELS):
        super(Se_ResNet34, self).__init__()
        self.se_resnet34 = se_resnet34(N_LABELS)
        num_ftrs = self.se_resnet34.fc.in_features
        # Replace the final fc with Linear + Sigmoid. Because the Sigmoid is
        # inside the model, no extra torch.sigmoid should be applied outside.
        self.se_resnet34.fc = nn.Sequential(nn.Linear(num_ftrs, N_LABELS), nn.Sigmoid())
        # print(self.se_resnet34)
    def forward(self, x):
        x = self.se_resnet34(x)
        return x
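# Illustrative helper (not called anywhere): verify that a (N, 3, 1, 2) input
# produces (N, 7) sigmoid outputs, matching the 7 classes used in ModelTrain.
def _sanity_check_shapes():
    net = Se_ResNet34(7)
    dummy = torch.randn(4, 3, 1, 2)  # 4 samples: 6 features reshaped to 3x1x2
    probs = net(dummy)
    print(probs.shape)  # torch.Size([4, 7]), values in (0, 1)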
# Clip the gradients computed during backpropagation to avoid exploding gradients.
def clip_gradient(optimizer, grad_clip):
    for group in optimizer.param_groups:
        for param in group['params']:
            if param.grad is not None:
                param.grad.data.clamp_(-grad_clip, grad_clip)
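# Equivalent sketch using PyTorch's built-in utility: nn.utils.clip_grad_value_
# clamps each gradient element into [-grad_clip, grad_clip], the same as the
# manual loop above. Not used by the training loop below.
def clip_gradient_builtin(optimizer, grad_clip):
    for group in optimizer.param_groups:
        nn.utils.clip_grad_value_(group['params'], grad_clip)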
def BatchIterator(model, phase, Data_loader, criterion, optimizer, device):
    # -------------------- Initial parameters
    grad_clip = 0.5  # clip gradients at this absolute value
    running_loss = 0.0
    X_train = Data_loader[:, :6]
    # Note: the scaler is refit on every call and every split, which puts each
    # split on its own scale; see scale_features_consistently below.
    scaler_x = preprocessing.MinMaxScaler(feature_range=(0, 1))
    X_train = scaler_x.fit_transform(X_train)
    y_train = Data_loader[:, 6]
    y_train = to_categorical(y_train)
    X_train = X_train.astype(float)
    # Reshape the 6 features into a 3-channel 1x2 "image" for the 2D CNN.
    X_train = torch.FloatTensor(X_train.reshape((X_train.shape[0], 3, 1, 2))).to(device)
    y_train = torch.FloatTensor(y_train).to(device)
    batch_size = 64
    length = len(X_train)
    if phase == "train":
        model.train()
    else:
        model.eval()
    for j in range(0, length, batch_size):
        X_train_batch = X_train[j:j + batch_size]
        y_train_batch = y_train[j:j + batch_size]
        if phase == "train":
            optimizer.zero_grad()
            # The model's final Sigmoid is applied internally (see Se_ResNet34).
            outputs = model(X_train_batch)
            loss = criterion(outputs, y_train_batch)
            loss.backward()
            clip_gradient(optimizer, grad_clip)
            optimizer.step()  # update weights
        else:
            # Validation: forward pass only, no gradients or weight updates.
            with torch.no_grad():
                outputs = model(X_train_batch)
                loss = criterion(outputs, y_train_batch)
        # Weight by the actual batch size (the last batch may be smaller).
        running_loss += loss.detach() * X_train_batch.size(0)
    return running_loss
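# Sketch (not wired into the pipeline above): fit MinMaxScaler once on the
# training fold and reuse it on val/test, instead of refitting it inside
# BatchIterator and make_pred_multilabel, which puts each split on its own scale.
def scale_features_consistently(train_df, val_df, test_df):
    scaler_x = preprocessing.MinMaxScaler(feature_range=(0, 1))
    X_train = scaler_x.fit_transform(train_df[:, :6])  # fit on train only
    X_val = scaler_x.transform(val_df[:, :6])  # reuse on val
    X_test = scaler_x.transform(test_df[:, :6])  # reuse on test
    return X_train, X_val, X_test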
def ModelTrain(train_df, val_df, device, LR):
    # Training parameters
    start_epoch = 0
    num_epochs = 88  # number of epochs to train for (if early stopping is not triggered)
    random_seed = 33  # random.randint(0, 100)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    os.makedirs('results', exist_ok=True)  # output dir for logs and checkpoints
    model = Se_ResNet34(7).to(device)  # .to(device) works on both GPU and CPU
    # print(model)
    criterion = nn.BCELoss().to(device)
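    # Note: BCELoss on one-hot targets treats each class as an independent
    # binary problem. For single-label multi-class data like this,
    # nn.CrossEntropyLoss on raw logits (dropping the model's final Sigmoid)
    # would be the more conventional choice.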
epoch_losses_train = []
epoch_losses_val = []
since = time.time()
best_loss = 999999
best_epoch = -1
# --------------------------Start of epoch loop
for epoch in tqdm(range(start_epoch, num_epochs)):
# print('Epoch {}/{}'.format(epoch, num_epochs))
print('-' * 10)
        phase = 'train'
        # Recreated each epoch so that the LR decay below takes effect; only
        # parameters with requires_grad=True are optimized.
        optimizer = torch.optim.Adam(params=filter(lambda p: p.requires_grad, model.parameters()), lr=LR)
        running_loss = BatchIterator(model=model, phase=phase, Data_loader=train_df,
                                     criterion=criterion, optimizer=optimizer, device=device)
        epoch_loss_train = running_loss / len(train_df)
        epoch_losses_train.append(epoch_loss_train.item())
        # print("Train_losses:", epoch_losses_train)
        phase = 'val'
        # Reuse the same optimizer; it is never stepped in the val phase.
        running_loss = BatchIterator(model=model, phase=phase, Data_loader=val_df,
                                     criterion=criterion, optimizer=optimizer, device=device)
epoch_loss_val = running_loss / len(val_df)
epoch_losses_val.append(epoch_loss_val.item())
# print("Validation_losses:", epoch_losses_val)
timestampTime = time.strftime("%H%M%S")
timestampDate = time.strftime("%d%m%Y")
timestampEND = timestampDate + '-' + timestampTime
# checkpoint model if has best val loss yet
if epoch_loss_val < best_loss:
best_loss = epoch_loss_val
best_epoch = epoch
checkpoint(model, best_loss, best_epoch, LR)
print('Epoch [' + str(epoch + 1) + '] [save] [' + timestampEND + '] loss= ' + str(epoch_loss_val))
else:
print('Epoch [' + str(epoch + 1) + '] [----] [' + timestampEND + '] loss= ' + str(epoch_loss_val))
# log training and validation loss over each epoch
with open("results/log_train", 'a') as logfile:
logwriter = csv.writer(logfile, delimiter=',')
            if epoch == 0:  # write the header once, before the first row
                logwriter.writerow(["epoch", "train_loss", "val_loss", "Seed", "LR"])
            logwriter.writerow([epoch, epoch_loss_train.item(), epoch_loss_val.item(), random_seed, LR])
# -------------------------- End of phase
# break if no val loss improvement in 3 epochs
if ((epoch - best_epoch) >= 3):
if epoch_loss_val > best_loss:
print("decay loss from " + str(LR) + " to " + str(LR / 2) + " as not seeing improvement in val loss")
LR = LR / 2
print("created new optimizer with LR " + str(LR))
if ((epoch - best_epoch) >= 10):
print("no improvement in 10 epochs, break")
break
# old_epoch = epoch
# ------------------------- End of epoch loop
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    # Plot the training/validation loss curves
    plt.plot(epoch_losses_train)
    plt.plot(epoch_losses_val)
plt.ylim([0.0, 0.9])
plt.xlabel("epochs")
plt.ylabel("loss")
plt.legend(['epoch_loss_train', 'epoch_losses_val'], loc='best')
plt.show()
checkpoint_best = torch.load('results/checkpoint')
model = checkpoint_best['model']
best_epoch = checkpoint_best['best_epoch']
print(best_epoch)
return model, best_epoch
def change_c2h6(x):
    """Clean one raw gas-concentration cell: cast to float, map '-' to 0."""
    try:
        return float(x)
    except (TypeError, ValueError):
        if x == '-':
            return 0
        print(x)  # surface any other unparseable value
        return 0  # fall back to 0 (consistent with fillna(0) in main)
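# Hypothetical usage (this helper is not called in main() as posted; 'C2H6'
# is an assumed column name, inferred from the function name):
#   data['C2H6'] = data['C2H6'].apply(change_c2h6)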
def main():
    # train_df_path = "data/DGA数据.xlsx"
    train_df_path = "data/数据1(1).xlsx"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    data = pd.read_excel(train_df_path)
    data.fillna(0, inplace=True)
    data = data[~data.isin([np.nan, np.inf, -np.inf]).any(axis=1)]
    data.replace(np.inf, 0, inplace=True)
    data.replace(-np.inf, 0, inplace=True)
    le = LabelEncoder()
    data['故障类型'] = le.fit_transform(data['故障类型'])  # '故障类型' = fault type (the label column)
X_train = data.iloc[:, :6]
y_train = data.iloc[:, 6]
    # SMOTE oversampling to balance the classes. Note: oversampling before the
    # split means synthetic samples can leak into the test fold.
    oversample = SMOTE()
    X_train, y_train = oversample.fit_resample(X_train, y_train)
    data = pd.concat([X_train, y_train], axis=1)
    data = data.values
    test_size = 64 / len(data)  # hold out exactly 64 samples (one full batch)
    train_df, test_df = train_test_split(data, test_size=test_size, random_state=0, shuffle=True)
    val_df = test_df  # note: validating on the test fold biases model selection
train_df_size = len(train_df)
print("Train_df size", train_df_size)
test_df_size = len(test_df)
print("test_df size", test_df_size)
val_df_size = len(val_df)
print("val_df size", val_df_size)
    LR = 0.5e-4
    # earlier runs: acc 0.484375 and acc 0.5625 with LR = 0.1e-4, epochs = 100
model, best_epoch = ModelTrain(train_df, val_df, device, LR)
make_pred_multilabel(model, test_df, device)
if __name__ == "__main__":
main()