IEEE Dataset STEW: Simultaneous Task EEG Workload Dataset
This dataset consists of raw EEG data from 48 subjects who took part in a multitasking workload experiment using the SIMKAP multitasking test. Each subject's resting-state brain activity was also recorded before the test and is included. Data were acquired with an Emotiv EPOC headset (14 channels, 128 Hz sampling rate), giving 2.5 minutes of EEG per recording. Subjects were also asked to rate their perceived mental workload on a scale of 1 to 9 after each stage; the ratings are provided in a separate file.
Notes: each subject's data follows the naming convention subno_task.txt. For example, sub01_lo.txt is subject 1's raw EEG at rest, while sub23_hi.txt is subject 23's raw EEG during the multitasking test. Rows of each data file correspond to samples in the recording, and columns correspond to the 14 channels of the EEG device: AF3, F7, F3, FC5, T7, P7, O1, O2, P8, T8, FC6, F4, F8, AF4.
Dataset description and download:
STEW: Simultaneous Task EEG Workload Data Set | IEEE Journals & Magazine | IEEE Xplore
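Given the file format above, a single recording can be loaded directly with NumPy. A minimal sketch, assuming the .txt files are plain whitespace-delimited numeric columns (14 channels, one sample per row, as described above); the path is illustrative:

import numpy as np

# Channel order from the dataset description
CHANNELS = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1',
            'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']

eeg = np.loadtxt(r'C:\STEW\test\sub01_lo.txt')  # illustrative path
print(eeg.shape)  # expected (n_samples, 14); 2.5 min at 128 Hz is about 19200 rows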
Here, ResNet50 is used to classify this workload data. The data loading, model training, and test code are as follows:
import torch
import torchvision.datasets
from torch.utils.data import Dataset  # subclass Dataset
import os
from PIL import Image
import numpy as np
from torchvision import transforms


# Preprocessing
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),                          # resize the image
    transforms.ToTensor(),                                  # convert to Tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # normalize
])
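A quick check of the pipeline (illustrative only): any RGB PIL image passed through data_transform comes out as a normalized 3 x 224 x 224 tensor.

img = Image.new('RGB', (640, 480))  # placeholder image, just for shape checking
x = data_transform(img)
print(x.shape)  # torch.Size([3, 224, 224])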

path = r'C:\STEW\test'

for root, dirs, files in os.walk(path):
    print('root', root)    # directory currently being visited
    print('dirs', dirs)    # subdirectory names in it
    print('files', files)  # file names in it

def read_txt_files(path):
    # Build a list of all .txt file paths under the given directory and its subdirectories
    file_names = []
    for root, dirs, files in os.walk(path):
        for file in files:
            # endswith() returns True if the string ends with the given suffix
            if file.endswith('.txt'):
                file_names.append(os.path.join(root, file))
    return file_names
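A hedged usage sketch (the path is the one used above; the label rule is an assumption based on the STEW naming convention): collect all recordings and derive a binary rest/task label from the _lo/_hi suffix.

files = read_txt_files(r'C:\STEW\test')
# 0 = rest ('_lo'), 1 = multitasking test ('_hi'), per the naming convention above
labels = [0 if '_lo' in os.path.basename(f) else 1 for f in files]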

class DogCat(Dataset):  # example Dataset: dogs-vs-cats images
    def __init__(self, root, transforms=None):  # store the root path and optional preprocessing

        # e.g. ['cat.15454.jpg', 'cat.445.jpg', 'cat.46456.jpg', 'cat.656165.jpg', 'dog.123.jpg', 'dog.15564.jpg', 'dog.4545.jpg', 'dog.456465.jpg']
        imgs = os.listdir(root)

        self.imgs = [os.path.join(root, img) for img in imgs]  # full paths of all files under root
        self.transforms = transforms  # image preprocessing (use the argument rather than the global)

    def __getitem__(self, index):  # load one image
        img_path = self.imgs[index]
        # Label from the file name: take the last path component;
        # 1 if it contains 'dog', 0 otherwise (cat)
        label = 1 if 'dog' in os.path.basename(img_path) else 0

        data = Image.open(img_path)

        if self.transforms:  # image preprocessing
            data = self.transforms(data)

        return data, label

    def __len__(self):
        return len(self.imgs)

dataset = DogCat('./data/', transforms=data_transform)

for img, label in dataset:
    print('img:', img.size(), 'label:', label)
'''
img: torch.Size([3, 224, 224]) label: 0
img: torch.Size([3, 224, 224]) label: 0
img: torch.Size([3, 224, 224]) label: 0
img: torch.Size([3, 224, 224]) label: 0
img: torch.Size([3, 224, 224]) label: 1
img: torch.Size([3, 224, 224]) label: 1
img: torch.Size([3, 224, 224]) label: 1
img: torch.Size([3, 224, 224]) label: 1
'''

import os

# Read the contents and file names of all text files under file_path
def get_text_list(file_path):
    files = os.listdir(file_path)
    text_list = []
    for file in files:
        with open(os.path.join(file_path, file), "r", encoding="UTF-8") as f:
            text_list.append(f.read())
    return text_list, files

import pathlib
from typing import Tuple

class ImageFolderCustom(Dataset):

    # 2. Initialize with a targ_dir and transform (optional) parameter
    def __init__(self, targ_dir: str, transform=None) -> None:

        # 3. Create class attributes
        # Get all image paths
        self.paths = list(pathlib.Path(targ_dir).glob("*/*.jpg"))  # note: you'd have to update this if you've got .png's or .jpeg's
        # Setup transforms
        self.transform = transform
        # Create classes and class_to_idx attributes
        self.classes, self.class_to_idx = find_classes(targ_dir)

    # 4. Make function to load images
    def load_image(self, index: int) -> Image.Image:
        "Opens an image via a path and returns it."
        image_path = self.paths[index]
        return Image.open(image_path)

    # 5. Overwrite the __len__() method (optional but recommended for subclasses of torch.utils.data.Dataset)
    def __len__(self) -> int:
        "Returns the total number of samples."
        return len(self.paths)

    # 6. Overwrite the __getitem__() method (required for subclasses of torch.utils.data.Dataset)
    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        "Returns one sample of data, data and label (X, y)."
        img = self.load_image(index)
        class_name = self.paths[index].parent.name  # expects path in data_folder/class_name/image.jpeg
        class_idx = self.class_to_idx[class_name]

        # Transform if necessary
        if self.transform:
            return self.transform(img), class_idx  # return data, label (X, y)
        else:
            return img, class_idx  # return data, label (X, y)
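The class above calls a find_classes helper that is never defined in this post. Recent torchvision versions provide one as torchvision.datasets.folder.find_classes; a minimal equivalent sketch (assuming one subdirectory per class) is:

def find_classes(directory: str):
    # Class names are the sorted subdirectory names; map each name to an integer index
    classes = sorted(entry.name for entry in os.scandir(directory) if entry.is_dir())
    class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
    return classes, class_to_idx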

import torchvision as tv
import numpy as np
import torch
import time
import os
from torch import nn, optim
from torchvision.models import resnet50
from torchvision.transforms import transforms

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"  # note the trailing S; "CUDA_VISIBLE_DEVICE" is ignored

# sanity test on CIFAR-10

class Cutout(object):
    """Randomly mask out one or more patches from an image.
    Args:
        n_holes (int): Number of patches to cut out of each image.
        length (int): The length (in pixels) of each square patch.
    """
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """
        Args:
            img (Tensor): Tensor image of size (C, H, W).
        Returns:
            Tensor: Image with n_holes of dimension length x length cut out of it.
        """
        h = img.size(1)
        w = img.size(2)

        mask = np.ones((h, w), np.float32)

        for n in range(self.n_holes):
            y = np.random.randint(h)
            x = np.random.randint(w)

            y1 = np.clip(y - self.length // 2, 0, h)
            y2 = np.clip(y + self.length // 2, 0, h)
            x1 = np.clip(x - self.length // 2, 0, w)
            x2 = np.clip(x + self.length // 2, 0, w)

            mask[y1: y2, x1: x2] = 0.

        mask = torch.from_numpy(mask)
        mask = mask.expand_as(img)
        img = img * mask

        return img
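A quick sanity check (illustrative only): applying Cutout to a random tensor should zero out roughly one length x length square across all channels (less near the border, where the square is clipped).

aug = Cutout(n_holes=1, length=16)
x_aug = aug(torch.rand(3, 224, 224))
print((x_aug == 0).sum())  # about 3 * 16 * 16 zeroed entries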

def load_data_cifar10(batch_size=128, num_workers=2):
    # Transform pipelines
    # Data augmentation
    train_transform_1 = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),             # random horizontal flip
        transforms.RandomRotation(degrees=(-80, 80)),  # random rotation within ±80°
        transforms.ToTensor(),
        transforms.Normalize(
            (0.491339968, 0.48215827, 0.44653124), (0.24703233, 0.24348505, 0.26158768)  # (mean, std)
        ),
        Cutout(1, 16),  # must come after ToTensor (Cutout operates on a Tensor)
    ])
    train_transform_2 = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            (0.491339968, 0.48215827, 0.44653124), (0.24703233, 0.24348505, 0.26158768)  # (mean, std)
        )
    ])
    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            (0.491339968, 0.48215827, 0.44653124), (0.24703233, 0.24348505, 0.26158768)  # (mean, std)
        )
    ])
    # Training set 1 (with augmentation)
    trainset1 = tv.datasets.CIFAR10(
        root='data',
        train=True,
        download=False,
        transform=train_transform_1,
    )
    # Training set 2 (no augmentation)
    trainset2 = tv.datasets.CIFAR10(
        root='data',
        train=True,
        download=False,
        transform=train_transform_2,
    )
    # Test set
    testset = tv.datasets.CIFAR10(
        root='data',
        train=False,
        download=False,
        transform=test_transform,
    )
    # Training data loader 1
    trainloader1 = torch.utils.data.DataLoader(
        trainset1,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=(torch.cuda.is_available())
    )
    # Training data loader 2
    trainloader2 = torch.utils.data.DataLoader(
        trainset2,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=(torch.cuda.is_available())
    )
    # Test data loader
    testloader = torch.utils.data.DataLoader(
        testset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=(torch.cuda.is_available())
    )

    return trainloader1, trainloader2, testloader

def main():
    start = time.time()
    batch_size = 128
    cifar_train1, cifar_train2, cifar_test = load_data_cifar10(batch_size=batch_size)
    model = resnet50(num_classes=10).cuda()  # 10 output classes for CIFAR-10
    # model.load_state_dict(torch.load('_ResNet50.pth'))
    # if a saved parameter file already exists
    # model = nn.DataParallel(model, device_ids=[0,])  # wraps the model in one more layer
    model = nn.DataParallel(model, device_ids=[0, 1, 2])
    loss = nn.CrossEntropyLoss().cuda()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    for epoch in range(50):
        model.train()  # required before training
        loss_ = 0.0
        num = 0.0
        # train on trainloader1 (with data augmentation) and trainloader2
        for i, data in enumerate(cifar_train1, 0):
            x, label = data
            x, label = x.cuda(), label.cuda()
            p = model(x)        # output
            l = loss(p, label)  # loss
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            loss_ += float(l.mean().item())
            num += 1
        for i, data in enumerate(cifar_train2, 0):
            x, label = data
            x, label = x.cuda(), label.cuda()
            p = model(x)
            l = loss(p, label)
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            loss_ += float(l.mean().item())
            num += 1
        model.eval()  # required before evaluation
        print("loss:", float(loss_) / num)
        # evaluate on trainloader2 and testloader
        with torch.no_grad():
            total_correct = 0
            total_num = 0
            for x, label in cifar_train2:
                # x: [b, 3, 224, 224], label: [b]
                x, label = x.cuda(), label.cuda()
                # logits: [b, 10]
                logits = model(x)
                # pred: [b]
                pred = logits.argmax(dim=1)
                # [b] vs [b] => scalar tensor
                correct = torch.eq(pred, label).float().sum().item()
                total_correct += correct
                total_num += x.size(0)
                # print(correct)
            acc_1 = total_correct / total_num
        # Test
        with torch.no_grad():
            total_correct = 0
            total_num = 0
            for x, label in cifar_test:
                # x: [b, 3, 224, 224], label: [b]
                x, label = x.cuda(), label.cuda()
                logits = model(x)  # [b, 10]
                pred = logits.argmax(dim=1)  # [b]
                # [b] vs [b] => scalar tensor
                correct = torch.eq(pred, label).float().sum().item()
                total_correct += correct
                total_num += x.size(0)
                # print(correct)
            acc_2 = total_correct / total_num
        print(epoch + 1, 'train acc', acc_1, '|', 'test acc:', acc_2)
        # save only model.module (unwrap DataParallel before saving)
        torch.save(model.module.state_dict(), 'resnet50.pth')
    print("The interval is :", time.time() - start)


if __name__ == '__main__':
    main()
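Since only model.module.state_dict() is saved above, reloading for inference looks like this (a sketch; the architecture and num_classes must match what was trained):

model = resnet50(num_classes=10)                   # same architecture as in main()
model.load_state_dict(torch.load('resnet50.pth'))  # weights saved above
model.eval()                                       # inference mode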