赞
踩
首先,我们需要定义一个配置文件,定义一系列要使用到的参数
class Config(object): ''' 配置参数 ''' def __init__(self,dataset): self.model_name='LiChenhao Bert Model' # 训练集,测试集,检验集,类别,模型训练结果保存路径 # self.train_path=dataset+'/data/dev.txt' # self.test_path=dataset+'/data/dev.txt' # self.dev_path=dataset+'/data/dev.txt' self.train_path=dataset+'/data/train.txt' self.test_path=dataset+'/data/test.txt' self.dev_path=dataset+'/data/dev.txt' self.class_list=[x.strip() for x in open(dataset+'/data/class.txt').readlines()] self.save_path=dataset+'/saved_dict/'+self.model_name+'.ckpt' # 配置使用检测GPU self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 若超过1000还没有提升就提前结束训练 self.require_improvement=1000 # 类别数 self.num_classes = len(self.class_list) # 整体训练次数 self.num_epoch=3 # batch大小 self.batch_size=128 #每个序列最大token数 self.pad_size=32 #学习率 self.learning_rate = 1e-5 self.bert_path='bert_pretrain' #预训练网络相对路径 self.tokenizer=BertTokenizer.from_pretrained(self.bert_path) #加载预训练Bert网络对输入数据进行embedding self.hidden_size=768 #Bert模型后自定义分类器(单隐层全连接网络)的隐层节点数
上述配置文件中定义了如下内容
我们自定义的网络要继承自 nn.Module
详细代码如下:
class Model(nn.Module): def __init__(self,config): super(Model,self).__init__() self.bert=BertModel.from_pretrained(config.bert_path) #从路径加载预训练模型 for param in self.bert.parameters(): param.requires_grad = True # 使参数可更新 self.fc=nn.Linear(config.hidden_size,config.num_classes) # 自定义全连接层 ,输入数,输出数(多分类数量),bert 模型最后带了一个输出输出是768,这里的输入要和bert最后的输出统一 def forward(self,x): context=x[0] #128*32 batch_size*seq_length mask=x[2] #128*32 batch_size*seq_length # 第一个参数 是所有输入对应的输出 第二个参数 是 cls最后接的分类层的输出 _,pooled = self.bert(context,attention_mask=mask,output_all_encoded_layers=False) # output_all_encoded_layers 是否将bert中每层(12层)的都输出,false只输出最后一层【128*768】 out=self.fc(pooled) # 128*10 return out
forward(self,x)函数是Bert中一个特殊文章函数,forward(self,x)函数详细解析请看此文章
这里输入的数据的结构为 [输入的token序列,序列真实长度,mask序列]
,输入数据的格式和数据预处理部分相关,这部分将在后边详细叙述
预训练的bert模型,需要 输入的token序列和mask序列,因此前两行代码分别用于提取输入数据中的token序列和mask序列,Bert模型中还有一个output_all_encoded_layers
参数需要指定,这个参数为True时,Bert模型会将内部12层结构的输出拼接在一起并返回,如果为False,Bert只返回最后一层的输出;
接下来获取bert模型的返回值,bert会有两个返回值;
第一个参数返回的是每个token对应的输出,当output_all_encoded_layers
参数值不同时,第一个参数返回的结构也不同
output_all_encoded_layers
参数为 True
时,第一个参数是一个数组,数组内包含12个完整的torch.FloatTensor,对应 Bert 的12层 Self-Attention 块 每个torch.FloatTensor的维度是:batch_size * 序列长度 * Bert内部FC网络节点数output_all_encoded_layers
参数为 False
时,第一个参数是一个torch.FloatTensor,为Bert最后一层 Self-Attention 块的输出 维度是:batch_size * 序列长度 * Bert内部FC网络节点数第二个参数返回的是:第一个token对应的输出经一个分类器(全连接网络)处理后得到的值,这个全连接网络的隐层节点数和Bert的隐层节点数相同,因此分类器前后数据结构并未改变;这个参数主要用于解决序列级任务,后边可以连接各种网络结构
在我们的模型中 我们使用Bert做语句分类,属于序列级任务因此要使用Bert的第二个参数;
我们将Bert的第二个参数放入在__ init _函数中定义好的全连接网络中,最后将结果返回(最后两行代码)
注:有人会问,Bert返回的第二个参数不是已经经过分类器处理了吗,为什么还要在接一个分类器?因为Bert
内部的预训练分类器输出一般不等于我们数据总得类别数,因此这个外接的全连接网络除了进一步分类,还有改变分类器输出结构的作用
PAD,CLS='[PAD]','[CLS]' def load_dataset(file_path,config): """ :param file_path: :param config: :return: """ contents=[] with open(file_path,encoding='utf-8') as f: pad_size = config.pad_size for line in tqdm(f): line=line.strip() if not line: continue content,label=line.split('\t') token=config.tokenizer.tokenize(content) token=[CLS]+token #序列级任务,要在当前序列前增加一个[CLS]标志位 seq_len=len(token) mask=[] token_ids=config.tokenizer.convert_tokens_to_ids(token) if len(token_ids) < pad_size: #长度不够 补0 mask = [1] * len(token_ids) + ([0] * (pad_size - len(token_ids))) # mask,token_size 顺序不能变,先弄token_ids 冷()会变,无法设置 mask token_ids = token_ids + ([0]*(pad_size-len(token_ids))) # token_ids += pad_size-len(token)*[0] else: #长度过长 截断 mask=[1]*pad_size token_ids=token_ids[:pad_size] seq_len=pad_size #seq_len长度改变,重新赋值 contents.append((token_ids,int(label),seq_len,mask)) return contents
这部分代码的工作就是按行读取数据文件,并生成Bert能够识别的数据结构和标签列表,这里解释一下几个关键问题:
补充:
当token长度大于510时(预训练模型Bert中token最长512,但首尾需要去添加[CLS]和[SEP]),需要对token进行截断,当前有以下三种截断方式:
迭代器只在调用时生成当前需要的这部分数据,而不是一次性生成所有数据;我们知道,程序在运行时会加载所有需要的数据,而训练Bert模型时每个epoch都需要打乱数据集内部顺序,如果一次性生成所有epoch需要的数据并加载到内存,很容易出现内存不足的情况;而使用迭代器就能够极大的降低内存的占用
将数据集转化成迭代器的代码如下:
class DatasetIterator(object): def __init__(self,dataset,batch_size,device): self.dataset=dataset self.batch_size=batch_size self.index=0 self.device=device self.n_batches = len(dataset)//batch_size self.residue = False #记录batch数量是否为整数 if len(dataset)%batch_size!=0: self.residue = True def __next__(self): if self.residue and self.index==self.n_batches: batches=self.dataset[self.index*self.batch_size:len(self.dataset)] self.index += 1 batches=self._to_tensor(batches) elif self.index>self.n_batches: self.index=0 raise StopIteration else: batches = self.dataset[self.index*self.batch_size:(self.index+1)*self.batch_size] self.index+=1 batches = self._to_tensor(batches) return batches def _to_tensor(self,datas): x=torch.LongTensor([item[0] for item in datas]).to(self.device) #样本 y=torch.LongTensor([item[1] for item in datas]).to(self.device) #标签 seq_len= torch.LongTensor([item[2] for item in datas]).to(self.device) #序列真实长度 mask = torch.LongTensor([item[3] for item in datas]).to(self.device) #序列真实长度 return (x,seq_len,mask),y def __iter__(self): return self def __len__(self): if self.residue: return self.n_batches else: return self.n_batches+1
迭代器的原理是按顺序在数据集中每次返回batch_size个数据,如果最后的数据量不足batch_size,则将剩余的数据全部返回;这里没有添加 shuffle,后期会改进(没有shuffle会使模型记录样本之间的先后关系)
我们也可以使用现成的 DataLoader()
函数构建数据集迭代器,这是DataLoader()函数的举例,需要根据上边的代码进行修改
def data_loader(input_ids,input_masks,input_segments,label_ids):
all_input_ids = torch.tensor(input_ids, dtype=torch.long)
all_input_mask = torch.tensor(input_masks, dtype=torch.long)
all_segment_ids = torch.tensor(input_segments, dtype=torch.long)
all_label = torch.tensor(label_ids, dtype=torch.long)
train_data = TensorDataset(all_input_ids, all_input_mask, all_segment_ids, all_label)
train_dataloader = DataLoader(train_data, batch_size=batch_size,shuffle=True)
return train_dataloader
模型训练主要由以下几步组成:
————————以下对每个样本进行的操作——————————
————————以上对每个样本进行的操作——————————
def train(config,model,train_iter,dev_iter,test_iter): """ :param config: :param model: :param train_iter: :param dev_iter: :param test_iter: :return: """ start_time=time.time() model.train() #设置为训练模式,是参数可反向更新 #启动 batchNormal 和 dropout param_optimizer=list(model.named_parameters()) # 不需要衰减的参数 no_decay=['bias','LayerNorm.bias','Layerweight'] # 指定哪些权重更新,哪些权重不更新 optimizmer_grouped_parameters=[ {'params':[p for n,p in param_optimizer if not any( nd in n for nd in no_decay)],'weight_decay':0.001}, #遍历所有参数,如果参数名字里有no_decay镍的元素则取出元素 {'params':[p for n,p in param_optimizer if any( nd in n for nd in no_decay)],'weight_decay':0.0}#遍历所有参数,如果参数名字里没有no_decay镍的元素则取出元素 ] # 配置优化器,t_total是总的迭代次数=epoch数*每个epoch中遍历完全部数据所需的迭代次数 optimizmer = BertAdam(params=optimizmer_grouped_parameters, lr=config.learning_rate, warmup=0.05, #预热学习率比例 t_total=len(train_iter)*config.num_epoch) total_batch=0 #记录进行多少batch dev_best_loss=0 #记录校验集最后的loss last_improve=0 #记录上次校验集loss下降的batch数 上一次哪个batch更新了loss flag=False #是否很久没有效果提升,停止训练 for epoch in range(config.num_epoch): print('Epoch[{}/{}]'.format(epoch+1,config.num_epoch)) for i,(trains,labels) in enumerate(train_iter): #每次取出一个 batch 的数据 更新一次梯度 outputs = model(trains) model.zero_grad() #梯度清零 loss = F.cross_entropy(outputs,labels) loss.backward() optimizmer.step() #更新参数 if total_batch%100==0: # 100个batch输出一次状态 true = labels.data.cpu() #.cpu() 表示转化为cpu的数据类型,迭代100次 计算一次准确率 predit = torch.max(outputs.data,1)[1].cpu() train_acc = metrics.accuracy_score(true,predit) # 计算准确率 dev_acc,dev_loss = evaluate(config,model,dev_iter) #计算dev的损失和准确率 if dev_loss < dev_best_loss: dev_best_loss=dev_loss torch.save(model.state_dict(),config.save_path) improve='*' last_improve=total_batch else: improve = '' time_dif=utils.get_time_dif(start_time) msg='Iter:{0:6},Train Loss:{1:5.2},Train Acc{2:>6.2},Val Loss:{3:>5.2},Val Acc:{4:>6.2},Time:{5} {6}' print(msg.format(total_batch,loss.item(),train_acc,dev_loss,dev_acc,time_dif,improve)) model.train() #不明白 total_batch+=1 if(total_batch-last_improve>config.require_improvement): # 大于 require_improvement 次没有更新loss则结束 print('已经长时间没有提升,自动退出...') flag=True break if flag: break test(config,model,test_iter)
模型测试主要分为以下几个步骤:
def test(config,model,test_iter): """ 模型测试 :param config: :param model: :param test_iter: :return: """ model.load_state_dict(torch.load(config.save_path)) model.eval() start_time=time.time() test_acc,test_loss,test_report,test_confusion =evaluate(config,model,test_iter,test=True) msg='Test Loss:{0:>5.2}, Test Acc:{1:>6,2%}' print(msg.format(test_loss,test_acc)) print("Precision,Recall and F1-Score") print('Confusion Maxtrix') print(test_confusion) time_def=utils.get_time_dif(start_time) print('使用时间:',time_def)
模型评估主要计算模型在测试集上的准确率、损失值等信息
主要步骤如下:
with torch.no_grad()
语句,使PyTorch不在记录梯度def evaluate(config,model,dev_iter,test=False): """ :param config: :param model: :param dev: :param iter: :return: """ # 在 eval模式下,dropout层会让所有的激活单元都通过,而batchnorm层会停止计算和更新mean和var,直接使用在训练阶段已经学出的mean和var值。 # model.eval() loss_total=0 predict_all= np.array([],dtype=int) labels_all= np.array([],dtype=int) with torch.no_grad(): for texts,labels in dev_iter: outputs=model(texts) loss = F.cross_entropy(outputs,labels) loss_total += loss labels=labels.data.cpu().numpy() # torch.max 返回两个值,一个每个样本最大分类类别的概率,一个是最大值对应的索引,参数1是对每行求最大值 predict = torch.max(outputs.data,1)[1].cpu().numpy() labels_all=np.append(labels_all,labels) predict_all=np.append(predict_all,predict) acc = metrics.accuracy_score(labels_all,predict_all) if test: report=metrics.classification_report(labels_all,predict_all,target_names=config.class_list,digits=4) confusion = metrics.confusion_matrix(labels_all,predict_all) return acc,loss_total / len(dev_iter), report,confusion return acc,loss_total / len(dev_iter)
执行整个过程的步骤如下:
关于bert模块两个返回值的深度解析请参考此文章 ->从源码层面,深入理解 Bert 框架
代码如下:
if __name__ == "__main__": dataset='THUCNews' # 数据地址 model_name=args.model x=import_module('models.'+model_name) #读取指定的 bert 模型 config = x.Config(dataset) #根据数据路径生成配置文件 np.random.seed(1) torch.manual_seed(1) torch.cuda.manual_seed_all(4) torch.backends.cudnn.deterministic=True start_time=time.time() #训练开始时间 print('load dataset...') train_data,dev_data,test_data=utils.build_dataset(config) #根据配置文件,生成测试集 训练集,开发集 dev_iter = utils.build_iterator(dev_data, config) #生成数据迭代器 train_iter=utils.build_iterator(train_data,config) test_iter=utils.build_iterator(test_data,config) # for i,(train,label) in enumerate(dev_iter): # print(i,label) time_dif=utils.get_time_dif(start_time) #数据准备结束 print('准备数据时间为:',time_dif) #模型训练 model=x.Model(config).to(config.device) #实例化model train.train(config,model,train_iter,dev_iter,test_iter) #训练
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。