- !pip install efficientnet_pytorch
- import warnings
- warnings.filterwarnings('ignore')
- import pandas as pd
- import matplotlib.pyplot as plt
- import seaborn as sns
- import random
- import torch
- import torch.nn as nn
- import torch.optim as optim
- !pip install vision-transformer-pytorch
- from torchvision import transforms, models
- from sklearn.metrics import classification_report
- from sklearn.utils import shuffle
- from sklearn.metrics import confusion_matrix
- from torch.utils.tensorboard import SummaryWriter
- from torch.optim.lr_scheduler import ReduceLROnPlateau
- import numpy as np
- from tabulate import tabulate
- import os
- import glob
- import json
- import shutil
- from PIL import Image, ImageDraw
- import torchvision.transforms as transforms
- from torchvision.datasets import ImageFolder
- from torch.utils.data import DataLoader
- from efficientnet_pytorch import EfficientNet
- from torch import nn
- from vision_transformer_pytorch import VisionTransformer
- import torch.nn.functional as F
- import torchvision.models as models

# Experiment configuration
class Config:
    """Container for the hyper-parameters and paths used by this notebook."""

    def __init__(self):
        # Nominal input image size in pixels (width, height)
        self.image_width, self.image_height = 128, 128
        # Number of training epochs and the random seed
        self.epoch = 1
        self.seed = 42
        # The batch size sets the leading dimension of the model output,
        # e.g. batch_size=32 yields a 32x1 output.
        self.batch_size = 16
        # Root directory of the dataset
        self.dataset_path = '/kaggle/input/.../'
        # self.checkpoint_filepath = 'model_checkpoint.h5'
        # self.logs_path = '/kaggle/working/logs'


# Instantiate the configuration and echo the epoch count as a sanity check
config = Config()
print("Checking Epoch Configuration:", config.epoch)

# Index every *.jpg found under <dataset_path>/<where>/<status>/ into a table
# with one row per image: its path, its status folder and its top-level folder.
dataset = {"image_path": [], "img_status": [], "where": []}
for where in os.listdir(config.dataset_path):
    where_dir = os.path.join(config.dataset_path, where)
    # Skip stray files at the dataset root; os.listdir() would raise on them.
    if not os.path.isdir(where_dir):
        continue
    for status in os.listdir(where_dir):
        for image in glob.glob(os.path.join(where_dir, status, "*.jpg")):
            dataset["image_path"].append(image)
            dataset["img_status"].append(status)
            dataset["where"].append(where)
dataset = pd.DataFrame(dataset)
# Shuffle the rows (seeded from the config so the order is reproducible),
# then renumber the index from 0.
dataset = shuffle(dataset, random_state=config.seed)
dataset = dataset.reset_index(drop=True)
# Per-channel statistics shared by the training and evaluation pipelines.
_channel_mean = [0.485, 0.456, 0.406]
_channel_std = [0.229, 0.224, 0.225]

# Augmentation pipeline for the training split (added 12/05).
# NOTE(review): no Resize is applied in either pipeline, so
# config.image_width / config.image_height are not enforced here - confirm
# that the images on disk already share one size.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),       # random horizontal flip
    transforms.RandomRotation(degrees=15),   # random rotation
    transforms.ColorJitter(                  # colour distortion
        brightness=0.2,
        contrast=0.2,
        saturation=0.2,
        hue=0.1,
    ),
    # Random crop-and-rescale is currently disabled:
    # transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
    transforms.ToTensor(),                   # convert to tensor
    transforms.Normalize(mean=_channel_mean, std=_channel_std),  # normalise
])

# Deterministic pipeline for the validation and test splits.
val_test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=_channel_mean, std=_channel_std),
])

# Datasets and loaders: only the training loader shuffles its samples.
train_dataset = ImageFolder(
    os.path.join(config.dataset_path, 'train'),
    transform=train_transform,
)
train_loader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=True,
)

valid_dataset = ImageFolder(
    os.path.join(config.dataset_path, 'valid'),
    transform=val_test_transform,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=config.batch_size,
    shuffle=False,
)

test_dataset = ImageFolder(
    os.path.join(config.dataset_path, 'test'),
    transform=val_test_transform,
)
# The test loader uses a fixed batch size of 5 rather than config.batch_size.
test_loader = DataLoader(test_dataset, batch_size=5, shuffle=False)

class SelfAttention(nn.Module):
    """Self-attention over the spatial positions of a 4-D feature map.

    Three 1x1 convolutions project the input into ``theta`` and ``phi``
    (used to score every pair of spatial positions) and ``g`` (the values
    that are mixed according to those scores). The mixed values are projected
    back to ``in_channels`` by ``concat`` and added to the input, so the
    output has the same shape as the input.

    Args:
        in_channels: Number of channels of the input feature map.
        key_channels: Output channels of the ``theta`` and ``phi``
            projections.
        value_channels: Output channels of the ``g`` projection.
    """

    def __init__(self, in_channels, key_channels=112, value_channels=64):
        super().__init__()
        self.theta = nn.Conv2d(in_channels, key_channels, kernel_size=1, stride=1)
        self.phi = nn.Conv2d(in_channels, key_channels, kernel_size=1, stride=1)
        self.g = nn.Conv2d(in_channels, value_channels, kernel_size=1, stride=1)
        self.concat = nn.Conv2d(value_channels, in_channels, kernel_size=1, stride=1)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Apply attention to ``x`` of shape (B, C, H, W); returns the same shape."""
        batch, _, height, width = x.shape
        positions = height * width

        # Flatten the spatial grid so every position becomes one token.
        theta = self.theta(x).view(batch, -1, positions).permute(0, 2, 1)  # (B, HW, Ck)
        phi = self.phi(x).view(batch, -1, positions)                       # (B, Ck, HW)
        g = self.g(x).view(batch, -1, positions).permute(0, 2, 1)          # (B, HW, Cv)

        # Pairwise position scores, normalised over the last axis.
        attn = self.softmax(torch.matmul(theta, phi))                      # (B, HW, HW)
        attn_g = torch.matmul(attn, g)                                     # (B, HW, Cv)

        # Restore the (B, Cv, H, W) layout. The channel count is inferred
        # from the tensor itself: the previous code read it from the already
        # permuted ``g``, which yields H*W and only worked when H*W == Cv.
        attn_g = attn_g.permute(0, 2, 1).reshape(batch, -1, height, width)

        # Project back to the input width and add the residual connection.
        return self.concat(attn_g) + x

class EfficientNetWithAttention(nn.Module):
    """EfficientNet-B4 features + self-attention + pooled linear head.

    Besides the network itself, the class keeps per-epoch metric histories
    (``train_*``, ``val_*``, ``test_*`` lists) and bundles the training loop
    and the plotting helpers that read those histories.
    """

    def __init__(self, num_classes, pretrained=True, attention_channels=1792):
        """Build the backbone, the attention block and the classifier head.

        Args:
            num_classes: Output width of the final linear layer.
                ``train_model`` trains with ``BCEWithLogitsLoss`` against a
                single label column, so it expects ``num_classes == 1``.
            pretrained: Forwarded to ``models.efficientnet_b4``.
            attention_channels: Channel count of the feature map fed to the
                attention block and the linear head; it must equal the number
                of channels produced by ``self.features``.
        """
        super().__init__()
        # Only the convolutional feature extractor of EfficientNet-B4 is kept.
        efficientnet = models.efficientnet_b4(pretrained=pretrained)
        self.features = efficientnet.features
        # Custom head
        self.attention = SelfAttention(attention_channels)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(attention_channels, num_classes)
        # Per-epoch metric histories. ``train_model`` fills the train/val
        # lists; nothing in this class writes the test lists.
        self.val_loss = []
        self.val_accuracy = []
        self.test_loss = []
        self.test_accuracy = []
        self.train_loss = []
        self.train_accuracy = []

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for an image batch."""
        x = self.features(x)        # backbone feature map
        x = self.attention(x)       # self-attention with residual
        x = self.avg_pool(x)        # global average pooling to 1x1
        x = x.view(x.size(0), -1)   # flatten to (batch, channels)
        return self.fc(x)

    def print_model_summary(self):
        """Print the module tree followed by total and trainable parameter counts."""
        # The module itself is the model; there is no ``self.model`` attribute.
        print(self)
        print("Model Summary:")
        total_params = sum(p.numel() for p in self.parameters())
        print(f"Total Parameters: {total_params}")
        trainable_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
        print(f"Trainable Parameters: {trainable_params}")

    def plot_metrics_graph(self):
        """Plot the recorded loss and accuracy histories against the epoch number."""
        epochs = range(1, len(self.train_loss) + 1)
        plt.figure(figsize=(12, 8))

        plt.subplot(2, 1, 1)
        plt.plot(epochs, self.train_loss, label='Train Loss', linewidth=2, color='blue')
        plt.plot(epochs, self.val_loss, label='Validation Loss', linewidth=2, color='orange')
        # The test history is optional: draw it only when it holds one value
        # per epoch, otherwise matplotlib rejects the mismatched lengths.
        if self.test_loss and len(self.test_loss) == len(epochs):
            plt.plot(epochs, self.test_loss, label='Test Loss', linewidth=2, color='green')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title('Training ,Test and Validation Loss')
        plt.legend()

        plt.subplot(2, 1, 2)
        plt.plot(epochs, self.train_accuracy, label='Train Accuracy', linewidth=2, color='green')
        plt.plot(epochs, self.val_accuracy, label='Validation Accuracy', linewidth=2, color='red')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.legend()

        plt.tight_layout()
        plt.show()

    def plot_confusion_matrix(self, y_true, y_pred):
        """Draw the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap."""
        cm = confusion_matrix(y_true, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
        plt.xlabel("Predicted Labels")
        plt.ylabel("True Labels")
        plt.title("Confusion Matrix")
        plt.show()

    def train_model(self, train_loader, valid_loader, num_epochs, device):
        """Train with Adam + BCE-with-logits and validate after every epoch.

        Appends one entry per epoch to the train/val loss and accuracy
        histories, then plots the histories and the confusion matrix of the
        last validation pass.

        Args:
            train_loader: Iterable of ``(inputs, labels)`` training batches;
                must expose ``.dataset`` and ``len()``.
            valid_loader: Iterable of ``(inputs, labels)`` validation batches;
                must expose ``.dataset``.
            num_epochs: Number of passes over ``train_loader``.
            device: Device the batches are moved to. The model itself is not
                moved here; the caller is responsible for that.
        """
        criterion = nn.BCEWithLogitsLoss()  # binary cross-entropy on raw logits
        optimizer = optim.Adam(self.parameters(), lr=0.001)
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=3, verbose=True, min_lr=1e-6)

        # Defined up front so the final plotting step is safe even when the
        # epoch loop never runs.
        y_true, y_pred = [], []

        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            total_loss = 0.0
            correct_train = 0
            total_train = 0
            print(f"Epoch [{epoch+1}/{num_epochs}] - Training...")
            for batch_idx, (inputs, labels) in enumerate(train_loader):
                inputs, labels = inputs.to(device), labels.to(device)
                targets = labels.float().unsqueeze(1)  # (batch, 1) to match the logits

                optimizer.zero_grad()
                outputs = self(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

                # Weight by batch size so the epoch average is per sample.
                total_loss += loss.item() * inputs.size(0)
                # A logit >= 0 corresponds to a sigmoid probability >= 0.5.
                predicted_labels = (outputs >= 0.0).float()
                correct_train += (predicted_labels == targets).sum().item()
                total_train += labels.size(0)
                print(f"Epoch [{epoch+1}/{num_epochs}] - Batch [{batch_idx+1}/{len(train_loader)}] - "
                      f"Loss: {loss.item():.4f} - Train Accuracy: {correct_train / total_train:.4f}")

            average_loss = total_loss / len(train_loader.dataset)
            train_accuracy = correct_train / total_train
            self.train_loss.append(average_loss)
            self.train_accuracy.append(train_accuracy)

            # ---- validation pass ----
            self.eval()
            total_val_loss = 0.0
            correct_val = 0
            total_val = 0
            # Restart every epoch so the confusion matrix reflects the last one.
            y_true, y_pred = [], []
            with torch.no_grad():
                for inputs, labels in valid_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    targets = labels.float().unsqueeze(1)
                    outputs = self(inputs)
                    val_loss = criterion(outputs, targets)
                    total_val_loss += val_loss.item() * inputs.size(0)
                    predicted_labels = (outputs >= 0.0).float()
                    correct_val += (predicted_labels == targets).sum().item()
                    total_val += labels.size(0)
                    # Store flat integer class ids for the confusion matrix.
                    y_true.extend(labels.cpu().tolist())
                    y_pred.extend(predicted_labels.flatten().long().cpu().tolist())

            average_val_loss = total_val_loss / len(valid_loader.dataset)
            val_accuracy = correct_val / total_val
            self.val_loss.append(average_val_loss)
            self.val_accuracy.append(val_accuracy)
            print(f"Epoch [{epoch+1}/{num_epochs}] - "
                  f"Train Loss: {average_loss:.4f} - Train Accuracy: {train_accuracy:.4f} - "
                  f"Val Loss: {average_val_loss:.4f} - Val Accuracy: {val_accuracy:.4f} - "
                  f"LR: {optimizer.param_groups[0]['lr']:.6f}")
            # Lower the learning rate when the validation loss stops improving.
            scheduler.step(average_val_loss)

        self.plot_metrics_graph()
        if y_true:
            self.plot_confusion_matrix(y_true, y_pred)

# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model; num_classes is the width of the classification head.
num_classes = 1
model = EfficientNetWithAttention(num_classes=num_classes, pretrained=True)
# Uncomment to print the model structure:
# print(model)
model.to(device)

# Run the integrated training loop for the configured number of epochs.
num_epochs = config.epoch  # revisit this value before the final run
model.train_model(
    train_loader,
    valid_loader,
    num_epochs,
    device,
)
# torch.save(model.state_dict(), 'model_efficient_b4.pth')
