- import torch
- from datasets import load_dataset
- from transformers import BertTokenizer, BertModel
- # 定义全局分词工具
- tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
- # 定义数据集
- class Dataset(torch.utils.data.Dataset):
- def __init__(self, split):
- self.dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split) # 加载数据集
- def __len__(self):
- return len(self.dataset)
- def __getitem__(self, i):
- text = self.dataset[i]['text']
- label = self.dataset[i]['label']
- return text, label
- # 自定义数据的处理(加载)方式
- def my_collate_fn(data): # data 的类型与 dataset 的返回值相同,本例中dataset返回一个列表[text, label]
- # 根据dataset的返回结果,取出对应的text和label
- sents = [i[0] for i in data]
- labels = [i[1] for i in data]
- # 使用全局的分词工具进行编码
- data = tokenizer.batch_encode_plus(batch_text_or_text_pairs = sents,
- truncation = True,
- padding = 'max_length',
- max_length = 500,
- return_tensors = 'pt',
- return_length = True)
- input_ids = data['input_ids']
- attention_mask = data['attention_mask']
- token_type_ids = data['token_type_ids']
- labels = torch.LongTensor(labels)
- return input_ids, attention_mask, token_type_ids, labels
- def main():
- dataset = Dataset('train') # 初始化训练集
- # print(len(dataset), dataset[0])
- # 定义dataloader
- loader = torch.utils.data.DataLoader(dataset = dataset,
- batch_size = 16,
- collate_fn = my_collate_fn,
- shuffle = True,
- drop_last = True)
- # 遍历dataloader加载数据
- for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
- break
- print(len(loader))
- print(input_ids.shape, attention_mask.shape, token_type_ids.shape, labels) # 打印一个样本
- # 加载预训练模型
- model = BertModel.from_pretrained('bert-base-chinese')
- for param in model.parameters(): # 不进行梯度计算和反向传播
- param.requires_grad_(False)
- # 调用预训练模型推理一个样本
- output = model(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
- print(output.last_hidden_state.shape) # 打印最后一个隐层输出特征的维度
- if __name__ == "__main__":
- main()
- print("All done!")
- # dataloader单个样本:
- torch.Size([16, 500])
- torch.Size([16, 500])
- torch.Size([16, 500])
- tensor([1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1])
- # 最后一个隐层的输出特征:
- torch.Size([16, 500, 768])
利用预训练 bert 模型最后一个隐层的[cls] token的特征进行中文分类;
- import torch
- from datasets import load_dataset
- from transformers import BertTokenizer, BertModel, AdamW
- # 定义全局分词工具
- tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
- # 定义数据集
- class Dataset(torch.utils.data.Dataset):
- def __init__(self, split):
- self.dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split) # 加载数据集
- def __len__(self):
- return len(self.dataset)
- def __getitem__(self, i):
- text = self.dataset[i]['text']
- label = self.dataset[i]['label']
- return text, label
- # 自定义数据的处理(加载)方式
- def my_collate_fn(data): # data 的类型与 dataset 的返回值相同,本例中dataset返回一个列表[text, label]
- # 根据dataset的返回结果,取出对应的text和label
- sents = [i[0] for i in data]
- labels = [i[1] for i in data]
- # 使用全局的分词工具进行编码
- data = tokenizer.batch_encode_plus(batch_text_or_text_pairs = sents,
- truncation = True,
- padding = 'max_length',
- max_length = 500,
- return_tensors = 'pt',
- return_length = True)
- input_ids = data['input_ids']
- attention_mask = data['attention_mask']
- token_type_ids = data['token_type_ids']
- labels = torch.LongTensor(labels)
- return input_ids, attention_mask, token_type_ids, labels
- # 定义下游任务模型
- class Model(torch.nn.Module):
- def __init__(self):
- super().__init__()
- self.pretrained_model = BertModel.from_pretrained('bert-base-chinese') # 加载预训练模型
- self.fc = torch.nn.Linear(768, 2)
- # 固定预训练模型
- for param in self.pretrained_model.parameters():
- param.requires_grad = False
- def forward(self, input_ids, attention_mask, token_type_ids):
- with torch.no_grad():
- output = self.pretrained_model(input_ids=input_ids,
- attention_mask=attention_mask,
- token_type_ids=token_type_ids)
- output = self.fc(output.last_hidden_state[:, 0]) # 利用最后一个隐层的[cls]token特征进行分类
- output = output.softmax(dim=1)
- return output
- # 定义测试函数
- def test(model, dataset):
- model.eval()
- correct = 0
- total = 0
- # 定义加载测试集的dataloader
- loader_test = torch.utils.data.DataLoader(dataset = dataset,
- batch_size = 32,
- collate_fn = my_collate_fn,
- shuffle = True,
- drop_last = True)
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
- if idx == 5: # 测试5个batch
- break
- print(idx)
- with torch.no_grad():
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- output = output.argmax(dim=1)
- correct += (output == labels).sum().item()
- total += len(labels)
- print("Acc: ", correct / total) # 打印5个batch的总体准确率
- def main():
- dataset = Dataset('train') # 初始化训练集
- # print(len(dataset), dataset[0])
- # 定义dataloader
- loader = torch.utils.data.DataLoader(dataset = dataset,
- batch_size = 16,
- num_workers = 8,
- collate_fn = my_collate_fn,
- shuffle = True,
- drop_last = True)
- # 初始化模型
- model = Model()
- model = model.cuda() # 使用GPU
- # 初始化优化器和损失函数
- optimizer = AdamW(model.parameters(), lr=5e-4)
- criterion = torch.nn.CrossEntropyLoss().cuda()
- # 训练模型
- model.train()
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader): # 遍历加载数据
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- loss = criterion(output, labels)
- loss.backward()
- optimizer.step()
- optimizer.zero_grad()
- if idx % 5 == 0: # 每5个batch打印当前准确率和损失
- output = output.argmax(dim=1)
- accuracy = (output == labels).sum().item() / len(labels)
- print(idx, loss.item(), accuracy)
- if idx == 300: # 使用300个batch进行训练
- break
- # 测试模型
- test(model, Dataset('validation'))
- if __name__ == "__main__":
- main()
- ...
- 260 0.5995925664901733 0.75
- 265 0.3791050910949707 1.0
- 270 0.42692136764526367 0.9375
- 275 0.4765201210975647 0.875
- 280 0.4071955382823944 0.9375
- 285 0.4194560945034027 0.875
- 290 0.449373722076416 0.9375
- 295 0.38813596963882446 1.0
- 300 0.5164415240287781 0.875
- Acc: 0.89375
对训练数据的第15个词进行 mask 掉,预测第15个词;
利用 bert 模型提取特征,对最后一个隐层的第15个token特征进行分类;
分类用的是一个简单的线性层,其维度为(768, token.vocab_size),其中token.vocab_sized的大小为21128,即预测21128个词的分类分数,再与真实标签进行损失计算;
- import torch
- from datasets import load_dataset, load_from_disk
- from transformers import BertTokenizer, BertModel, AdamW
- # 定义全局分词工具
- token = BertTokenizer.from_pretrained('bert-base-chinese')
- # 定义数据集
- class Dataset(torch.utils.data.Dataset):
- def __init__(self, split):
- dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split)
- # dataset = load_from_disk('./data/ChnSentiCorp')
- # dataset = dataset[split]
- def f(data):
- return len(data['text']) > 30
- self.dataset = dataset.filter(f) # 筛选数据集
- def __len__(self):
- return len(self.dataset)
- def __getitem__(self, i):
- text = self.dataset[i]['text']
- return text
- def collate_fn(data):
- # batch编码
- data = token.batch_encode_plus(batch_text_or_text_pairs = data,
- truncation = True,
- padding = 'max_length',
- max_length = 30, # padding到30个词
- return_tensors = 'pt', # 返回pytorch格式
- return_length = True)
- input_ids = data['input_ids']
- attention_mask = data['attention_mask']
- token_type_ids = data['token_type_ids']
- # 把第15个词固定替换为mask
- labels = input_ids[:, 15].reshape(-1).clone() # 记录真实标签
- input_ids[:, 15] = token.get_vocab()[token.mask_token]
- return input_ids, attention_mask, token_type_ids, labels
- # 定义下游任务模型
- class Model(torch.nn.Module):
- def __init__(self):
- super().__init__()
- self.decoder = torch.nn.Linear(768, token.vocab_size, bias=False) # token.vocab_size为21128,预测21128个词的分类分数
- self.bias = torch.nn.Parameter(torch.zeros(token.vocab_size))
- self.decoder.bias = self.bias
- self.pretrained = BertModel.from_pretrained('bert-base-chinese')
- # 固定预训练模型
- for param in self.pretrained.parameters():
- param.requires_grad = False
- def forward(self, input_ids, attention_mask, token_type_ids):
- # 使用bert模型提取特征
- with torch.no_grad():
- output = self.pretrained(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- output = self.decoder(output.last_hidden_state[:, 15])
- return output
- # 测试
- def test(model):
- model.eval()
- correct = 0
- total = 0
- loader_test = torch.utils.data.DataLoader(dataset = Dataset('test'),
- batch_size = 32,
- collate_fn = collate_fn,
- shuffle = True,
- drop_last = True)
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- if idx == 15: # 测试15个batch
- break
- with torch.no_grad():
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- output = output.argmax(dim=1)
- correct += (output == labels).sum().item()
- total += len(labels)
- print(token.decode(input_ids[0])) # 打印测试数据
- print("真实标签: ", token.decode(labels[0]), "预测标签: ", token.decode(output[0]))
- print("Acc: ", correct / total)
- def main():
- # 初始化训练集
- dataset = Dataset('train')
- # 定义dataloader
- loader = torch.utils.data.DataLoader(dataset = dataset,
- batch_size = 16,
- collate_fn = collate_fn,
- shuffle = True,
- drop_last = True)
- # 初始化模型
- model = Model().cuda()
- # 训练
- optimizer = AdamW(model.parameters(), lr=5e-4)
- criterion = torch.nn.CrossEntropyLoss().cuda()
- model.train()
- for epoch in range(5):
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- loss = criterion(output, labels)
- loss.backward()
- optimizer.step()
- optimizer.zero_grad()
- if idx % 50 == 0:
- output = output.argmax(dim=1)
- accuracy = (output == labels).sum().item() / len(labels)
- print(epoch, idx, loss.item(), accuracy)
- # 测试模型
- test(model)
- if __name__ == "__main__":
- main()
- 4 200 0.7910566329956055 0.75
- 4 250 0.9690109491348267 0.8125
- 4 300 0.4056988060474396 0.9375
- 4 350 0.31916332244873047 1.0
- 4 400 0.8943865895271301 0.6875
- 4 450 0.4540601968765259 0.9375
- 4 500 0.7437821626663208 0.75
- 4 550 0.3669029474258423 0.9375
- [CLS] 之 前 看 到 很 多 评 论 , 说 很 好 兴 冲 [MASK] 买 了 , 看 了 看 感 觉 非 常 失 望 炒 [SEP]
- 真实标签: 冲 预测标签: 冲
- [CLS] 刚 看 了 一 章 就 丢 边 上 了 , 光 盘 也 [MASK] 知 道 放 哪 里 了 , 很 垃 圾 , 很 多 [SEP]
- 真实标签: 不 预测标签: 不
- [CLS] 酒 店 生 意 清 淡 , 大 堂 里 都 没 几 个 [MASK] 人 。 房 间 倒 是 不 小 , 但 觉 得 被 [SEP]
- 真实标签: 客 预测标签: 客
- [CLS] 选 购 的 时 候 , 还 是 有 货 的 。 最 后 [MASK] 通 知 我 说 , 没 货 了 。 当 当 的 服 [SEP]
- 真实标签: 却 预测标签: ,
- [CLS] 简 单 , 大 方 , 在 同 类 尺 寸 的 款 型 [MASK] 笔 记 本 中 不 显 厚 重 , 轻 薄 感 ! [SEP]
- 真实标签: 的 预测标签: 的
- [CLS] 当 时 是 同 事 极 力 推 荐 这 本 书 。 我 [MASK] 到 网 上 的 介 绍 和 那 么 多 的 [UNK] 名 [SEP]
- 真实标签: 看 预测标签: 看
- [CLS] 酒 店 位 置 离 火 车 站 很 近 , 走 路 10 [MASK] 钟 不 到 就 能 到 。 。 。 服 务 不 错 [SEP]
- 真实标签: 分 预测标签: 分
- [CLS] 买 之 前 也 没 见 过 这 本 书, 听 他 们 [MASK] 的 天 花 乱 坠, 翻 了 几 页 就 够 了 [SEP]
- 真实标签: 说 预测标签: 写
- [CLS] 看 了 百 家 讲 坛 , 来 了 兴 趣 。 因 为 [MASK] 作 忙 , 只 看 了 一 点 点 , 因 此 买 [SEP]
- 真实标签: 工 预测标签: 工
- [CLS] 第 一 次 买 的 拉 拉 升 职 记 , 三 天 到 [MASK] ( 我 住 北 京 三 环 边 上 ) , 还 比 [SEP]
- 真实标签: 货 预测标签: 手
- [CLS] 房 间 隔 音 效 果 极 差 , 深 夜 如 果 隔 [MASK] 客 人 大 声 喧 哗 的 话 [UNK] [UNK] 服 务 员 [SEP]
- 真实标签: 壁 预测标签: 音
- [CLS] 读 过 她 的 《 茶 人 三 部 曲 》 , 一 口 [MASK] 读 完 的 。 一 直 在 搜 寻 她 的 文 字 [SEP]
- 真实标签: 气 预测标签: 气
- [CLS] 条 件 、 服 务 、 设 施 都 很 好 , 房 间 [MASK] 干 净 、 很 舒 适 , 尤 其 是 前 台 服 [SEP]
- 真实标签: 很 预测标签: 很
- [CLS] 大 俗 即 大 雅 ! 这 是 看 郑 振 铎 先 生 [MASK] 部 书 后 最 由 衷 的 感 想 , 看 过 中 [SEP]
- 真实标签: 三 预测标签: 这
- [CLS] 觉 得 相 当 没 意 思 的 一 本 书 。 不 伦 [MASK] 类 的 。 看 的 时 候 很 纠 结 , 看 完 [SEP]
- 真实标签: 不 预测标签: 不
- Acc: 0.6979166666666666
- import torch
- import random
- from datasets import load_dataset, load_from_disk
- from transformers import BertTokenizer, BertModel, AdamW
- # 定义全局分词工具
- token = BertTokenizer.from_pretrained('bert-base-chinese')
- # 定义数据集
- class Dataset(torch.utils.data.Dataset):
- def __init__(self, split):
- # dataset = load_dataset(path='lansinuote/ChnSentiCorp', split=split)
- dataset = load_from_disk('./data/ChnSentiCorp')
- dataset = dataset[split]
- def f(data):
- return len(data['text']) > 40
- self.dataset = dataset.filter(f)
- def __len__(self):
- return len(self.dataset)
- def __getitem__(self, i):
- text = self.dataset[i]['text']
- # 切分一句话为前半句和后半句
- sentence1 = text[:20]
- sentence2 = text[20:40]
- label = 0 # label为0表示为同一句
- # 有一半的概率把后半句替换为一句无关的话
- if random.randint(0, 1) == 0:
- j = random.randint(0, len(self.dataset) - 1)
- sentence2 = self.dataset[j]['text'][20:40]
- label = 1
- return sentence1, sentence2, label
- def collate_fn(data):
- sents = [i[:2] for i in data]
- labels = [i[2] for i in data]
- # 编码
- data = token.batch_encode_plus(batch_text_or_text_pairs = sents,
- truncation = True,
- padding = 'max_length',
- max_length = 45,
- return_tensors = 'pt',
- return_length = True,
- add_special_tokens = True)
- input_ids = data['input_ids']
- attention_mask = data['attention_mask']
- token_type_ids = data['token_type_ids']
- labels = torch.LongTensor(labels)
- return input_ids, attention_mask, token_type_ids, labels
- # 定义下游任务模型
- class Model(torch.nn.Module):
- def __init__(self):
- super().__init__()
- self.fc = torch.nn.Linear(768, 2) # 二分类
- self.pretrained = BertModel.from_pretrained('bert-base-chinese')
- # 固定预训练模型
- for param in self.pretrained.parameters():
- param.requires_grad = False
- def forward(self, input_ids, attention_mask, token_type_ids):
- with torch.no_grad():
- output = self.pretrained(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
- output = self.fc(output.last_hidden_state[:, 0])
- output = output.softmax(dim=1)
- return output
- def main():
- model = Model().cuda()
- optimizer = AdamW(model.parameters(), lr=5e-4)
- criterion = torch.nn.CrossEntropyLoss().cuda()
- # dataloader
- loader = torch.utils.data.DataLoader(dataset = Dataset('train'),
- batch_size = 8,
- collate_fn = collate_fn,
- shuffle = True,
- drop_last = True)
- # 训练
- model.train()
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- loss = criterion(output, labels)
- loss.backward()
- optimizer.step()
- optimizer.zero_grad()
- if idx % 5 == 0: # 每5个batch打印
- output = output.argmax(dim=1)
- accuracy = (output == labels).sum().item() / len(labels)
- print(idx, loss.item(), accuracy)
- if idx == 300: # 训练300个batch
- break
- # 测试
- test(model)
- # 定义测试函数
- def test(model):
- model.eval()
- correct = 0
- total = 0
- loader_test = torch.utils.data.DataLoader(dataset = Dataset('test'),
- batch_size = 32,
- collate_fn = collate_fn,
- shuffle = True,
- drop_last = True)
- for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
- input_ids = input_ids.cuda()
- attention_mask = attention_mask.cuda()
- token_type_ids = token_type_ids.cuda()
- labels = labels.cuda()
- if idx == 5: # 测试5个batch
- break
- with torch.no_grad():
- output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
- pred = output.argmax(dim=1)
- correct += (pred == labels).sum().item()
- total += len(labels)
- print('acc:', correct / total)
- if __name__ == "__main__":
- main()
- 240 0.39283961057662964 0.875
- 245 0.7069525122642517 0.5
- 250 0.41953372955322266 0.875
- 255 0.5032698512077332 0.75
- 260 0.6422066688537598 0.75
- 265 0.5467717051506042 0.75
- 270 0.4452913701534271 0.875
- 275 0.5998544096946716 0.625
- 280 0.4301206171512604 0.875
- 285 0.5177156329154968 0.75
- 290 0.3987200856208801 0.875
- 295 0.33609679341316223 1.0
- 300 0.3723036050796509 0.875
- acc: 0.925
