赞
踩
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
本人今年研0,刚学到深度学习部分,本来是打算报Datawhale AI夏令营二期CV图像方向的,但是运气不好满了,所以先来NLP试试水。也是按照课程要求报了讯飞的基于术语词典干预的机器翻译挑战赛。更详细的笔记可以参考DataWhale的官方解析Datawhale AI夏令营- 讯飞机器翻译挑战赛baseline解析本文是记录Task1 的baseline学习笔记。
机器翻译(Machine Translation,简称MT)是自然语言处理领域的一个重要分支,其目标是将一种语言的文本自动转换为另一种语言的文本。机器翻译的发展可以追溯到20世纪50年代,经历了从基于规则的方法、统计方法到深度学习方法的演变过程。
近年来,深度学习技术的快速发展推动了神经网络机器翻译(Neural Machine Translation,简称NMT)的兴起。NMT使用深度神经网络模型,如长短期记忆网络(LSTM)和 Transformer,能够自动学习源语言和目标语言之间的复杂映射关系,无需人工设计特征或规则。NMT在翻译质量、速度和适应性方面取得了显著进步,成为当前机器翻译领域的主流方法。
在机器学习和深度学习项目中,数据集通常被划分为三个部分:训练集(Training Set)、开发集(Development Set,也常被称为验证集,Validation Set)和测试集(Test Set)。这种划分的主要目的是为了评估模型的性能并防止过拟合,确保模型具有良好的泛化能力。下面是这三个数据集的具体作用:
基于术语词典干预的机器翻译挑战赛选择以英文为源语言,中文为目标语言的机器翻译。本次大赛除英文到中文的双语数据,还提供英中对照的术语词典。参赛队伍需要基于提供的训练数据样本从多语言机器翻译模型的构建与训练,并基于测试集以及术语词典,提供最终的翻译结果,数据包括:
·训练集:双语数据:中英14万余双语句对
·开发集:英中1000双语句对
·测试集:英中1000双语句对
·术语词典:英中2226条
对于参赛队伍提交的测试集翻译结果文件,采用自动评价指标BLUE-4进行评价,具体工具使用sacrebleu开源版本。
下面是训练集train.txt的截图,需要训练数据也可以去讯飞的比赛官网下载
讲GRU模型前我们先来了解一下RNN(循环神经网络)
RNN模型在每个时间步(小绿框)都会接收一个字的输入x,生成隐藏状态h和输出,再将隐藏状态和下一个字输入到模型中,然后重复该过程
而GRU模型是RNN模型的一种变体,区别在于GRU的隐藏层中引入了门控单元,能够有效捕捉长序列语义关联,缓解梯度消失或爆炸现象,其核心结构由更新门和重置门两部分组成。
(1)首先我们来看第一个公式,该公式表示的是t时刻的更新门,更新门决定使用多少历史信息和当前信息来更新当前隐含状态。
z
t
z_{t}
zt是门控更新信号,
z
t
z_{t}
zt的大小决定了候选隐藏状态的记忆的程度,
h
t
−
1
h_{t-1}
ht−1为历史隐藏状态,
x
t
x_{t}
xt表示t时刻输入的数据,
w
z
w_{z}
wz为权重矩阵,σ是sigmoid函数。
(2)第二个公式表示t时刻的重置门,重置门决定保留多少历史信息。
r
t
r_{t}
rt表示重置信号,值越大说明需要记住的历史信息量越多,
w
r
w_{r}
wr为权重矩阵
(3)在更新门
z
t
z_{t}
zt和重置门
r
t
r_{t}
rt的作用下,当前时刻候选隐含状态为公式3,隐含输出状态
h
t
h_{t}
ht可更新为公式4
上式中,候选隐含状态负责融合输入数据和历史数据的信息特征,该操作与重置门得到的重置信号
r
t
r_{t}
rt有关。而
h
t
h_{t}
ht代表当前时刻最终单元状态,其包括遗忘和记忆两个过程,(1-
z
t
z_{t}
zt)与上时刻隐含状态
h
t
−
1
h_{t-1}
ht−1的乘积表示遗忘过程,
z
t
z_{t}
zt越接近1,则将遗忘上时刻越多信息。
z
t
z_{t}
zt与候选隐含状态的乘积表示记忆过程,
z
t
z_{t}
zt大小决定了候选隐含状态的记忆程度,也就是保留之前多少的隐含状态。以上过程也即加入多少新记忆,就要忘记多少老记忆。
Seq2Seq由两个结构组成,分别是Encoder和Decoder块
Seq2Seq模型由两个主要部分组成:Encoder(编码器)和Decoder(解码器),两者均为GRU网络。以下是该模型的详细介绍:
Encoder
从图中可看出:
Decoder
从图中可看出:
下列代码为具体如何设计Seq2Seq模型,我在官方笔记的基础上加了一些自己理解的注释,可能有不对的地方,欢迎指出。
# 定义GRU模型的Encoder编码器 class Encoder(nn.Module): def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout): # 初始化父类(必须) super().__init__() # 模型接收输入序列,使用Embedding对输入序列进行维度变化 self.embedding = nn.Embedding(input_dim, emb_dim) # 使用GRU对数据进行编码 """ GRU网络:输入序列逐步传递给GRU单元,每个输入 xi生成相应的隐藏状态hi 1、第一个GRU单元接收x1,生成隐藏状态h1 2、第二个GRU单元接收x2和h1,生成隐藏状态h2 以此类推,直到最后输出hn作为上下文向量c,用于解码阶段 """ self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True) # nn.Dropout随机丢弃层,在训练阶段按某种概率随即将输入的张量元素随机归零,用于防止网络过拟合 self.dropout = nn.Dropout(dropout) # 前向传播 def forward(self, src): # src shape: [batch_size, src_len] # nn.Dropout随机丢弃层,在训练阶段按某种概率随即将输入的张量元素随机归零,用于防止网络过拟合 embedded = self.dropout(self.embedding(src)) # embedded shape: [batch_size, src_len, emb_dim] # 隐藏层变换 outputs, hidden = self.rnn(embedded) # outputs shape: [batch_size, src_len, hid_dim] # hidden shape: [n_layers, batch_size, hid_dim] return outputs, hidden # 定义GRU模型的Decoder解码器 class Decoder(nn.Module): def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout): # 初始化父类(必须) super().__init__() # 定义输出数据的维度(全连接层的神经元个数) self.output_dim = output_dim # 对数据进行维度处理 self.embedding = nn.Embedding(output_dim, emb_dim) """ 解码器的第一个GRU单元接收上下文向量c和初始隐藏状态,生成第一个隐藏状态h1'和输出y1 第二个GRU单元接收h1',生成第二个隐藏状态h2'和输出y2 依次类推,直到生成最后一个隐藏状态hn'和输出yn """ self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True) # 定义全连接层(对数据进行线性变换),输入神经元个数为隐藏层维度,输出神经元个数为输出层维度 self.fc_out = nn.Linear(hid_dim, output_dim) # nn.Dropout随机丢弃层,在训练阶段按某种概率随即将输入的张量元素随机归零,用于防止网络过拟合 self.dropout = nn.Dropout(dropout) # 前向传播 def forward(self, input, hidden): # input shape: [batch_size, 1] # hidden shape: [n_layers, batch_size, hid_dim] # nn.Dropout随机丢弃层,在训练阶段按某种概率随即将输入的张量元素随机归零,用于防止网络过拟合 embedded = self.dropout(self.embedding(input)) # embedded shape: [batch_size, 1, emb_dim] # 隐藏层变换 output, hidden = self.rnn(embedded, hidden) # output shape: [batch_size, 1, hid_dim] # hidden shape: [n_layers, batch_size, hid_dim] # 全连接层预测结果 prediction = self.fc_out(output.squeeze(1)) # prediction shape: [batch_size, output_dim] return prediction, hidden class Seq2Seq(nn.Module): def __init__(self, encoder, decoder, device): super().__init__() self.encoder = encoder self.decoder = decoder self.device = device def forward(self, src, trg, teacher_forcing_ratio=0.5): # src shape: [batch_size, src_len] # trg shape: [batch_size, trg_len] batch_size = src.shape[0] trg_len = trg.shape[1] trg_vocab_size = self.decoder.output_dim outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device) _, hidden = self.encoder(src) input = trg[:, 0].unsqueeze(1) # Start token for t in range(1, trg_len): output, hidden = self.decoder(input, hidden) outputs[:, t, :] = output teacher_force = random.random() < teacher_forcing_ratio top1 = output.argmax(1) input = trg[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1) return outputs
代码如下(示例):
# 定义数据集类 # 修改TranslationDataset类以处理术语 class TranslationDataset(Dataset): def __init__(self, filename, terminology): self.data = [] # 读入数据集 with open(filename, 'r', encoding='utf-8') as f: for line in f: # 将中英文文本分别按'\t'分割存储 en, zh = line.strip().split('\t') self.data.append((en, zh)) # 加载术语词典 self.terminology = terminology # 创建词汇表,注意这里需要确保术语词典中的词也被包含在词汇表中 self.en_tokenizer = get_tokenizer('basic_english') self.zh_tokenizer = list # 使用字符级分词 en_vocab = Counter(self.terminology.keys()) # 确保术语在词汇表中 zh_vocab = Counter() for en, zh in self.data: en_vocab.update(self.en_tokenizer(en)) zh_vocab.update(self.zh_tokenizer(zh)) # 添加术语到词汇表 self.en_vocab = ['<pad>', '<sos>', '<eos>'] + list(self.terminology.keys()) + [word for word, _ in en_vocab.most_common(10000)] self.zh_vocab = ['<pad>', '<sos>', '<eos>'] + [word for word, _ in zh_vocab.most_common(10000)] self.en_word2idx = {word: idx for idx, word in enumerate(self.en_vocab)} self.zh_word2idx = {word: idx for idx, word in enumerate(self.zh_vocab)} def __len__(self): return len(self.data) def __getitem__(self, idx): en, zh = self.data[idx] en_tensor = torch.tensor([self.en_word2idx.get(word, self.en_word2idx['<sos>']) for word in self.en_tokenizer(en)] + [self.en_word2idx['<eos>']]) zh_tensor = torch.tensor([self.zh_word2idx.get(word, self.zh_word2idx['<sos>']) for word in self.zh_tokenizer(zh)] + [self.zh_word2idx['<eos>']]) return en_tensor, zh_tensor def collate_fn(batch): en_batch, zh_batch = [], [] for en_item, zh_item in batch: en_batch.append(en_item) zh_batch.append(zh_item) # 对英文和中文序列分别进行填充,统一两个序列的长度 en_batch = nn.utils.rnn.pad_sequence(en_batch, padding_value=0, batch_first=True) zh_batch = nn.utils.rnn.pad_sequence(zh_batch, padding_value=0, batch_first=True) return en_batch, zh_batch # 新增术语词典加载部分 def load_terminology_dictionary(dict_file): terminology = {} with open(dict_file, 'r', encoding='utf-8') as f: for line in f: en_term, ch_term = line.strip().split('\t') terminology[en_term] = ch_term return terminology # 加载文本中的句子 def load_sentences(file_path: str) -> List[str]: with open(file_path, 'r', encoding='utf-8') as f: return [line.strip() for line in f] # 更新translate_sentence函数以考虑术语词典 def translate_sentence(sentence: str, model: Seq2Seq, dataset: TranslationDataset, terminology, device: torch.device, max_length: int = 50): model.eval() tokens = dataset.en_tokenizer(sentence) tensor = torch.LongTensor([dataset.en_word2idx.get(token, dataset.en_word2idx['<sos>']) for token in tokens]).unsqueeze(0).to(device) # [1, seq_len] with torch.no_grad(): _, hidden = model.encoder(tensor) translated_tokens = [] input_token = torch.LongTensor([[dataset.zh_word2idx['<sos>']]]).to(device) # [1, 1] for _ in range(max_length): output, hidden = model.decoder(input_token, hidden) top_token = output.argmax(1) translated_token = dataset.zh_vocab[top_token.item()] if translated_token == '<eos>': break # 如果翻译的词在术语词典中,则使用术语词典中的词 if translated_token in terminology.values(): for en_term, ch_term in terminology.items(): if translated_token == ch_term: translated_token = en_term break translated_tokens.append(translated_token) input_token = top_token.unsqueeze(1) # [1, 1] return ''.join(translated_tokens)
# 训练函数 def train(model, iterator, optimizer, criterion, clip): model.train() epoch_loss = 0 for i, (src, trg) in enumerate(iterator): # to(device)设置再GPU上训练 src, trg = src.to(device), trg.to(device) # 将模型的参数梯度初始化为0,不然每次梯度会累加 optimizer.zero_grad() # 调用模型 output = model(src, trg) # 设置全连接层的神经元个数 output_dim = output.shape[-1] output = output[:, 1:].contiguous().view(-1, output_dim) trg = trg[:, 1:].contiguous().view(-1) # 通过交叉熵计算损失,后面会指定 loss = criterion(output, trg) # 反向传播 loss.backward() # 梯度裁剪 torch.nn.utils.clip_grad_norm_(model.parameters(), clip) # 更新参数 optimizer.step() epoch_loss += loss.item() return epoch_loss / len(iterator) # 主函数 if __name__ == '__main__': start_time = time.time() # 开始计时 # 检查GPU是否可用 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #terminology = load_terminology_dictionary('../dataset/en-zh.dic') terminology = load_terminology_dictionary('../dataset/en-zh.dic') # 加载数据 dataset = TranslationDataset('../dataset/train.txt',terminology = terminology) print(len(dataset)) # 选择数据集的前N个样本进行训练 N = 1000 #int(len(dataset) * 1) # 或者你可以设置为数据集大小的一定比例,如 int(len(dataset) * 0.1) subset_indices = list(range(N)) subset_dataset = Subset(dataset, subset_indices) train_loader = DataLoader(subset_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn) # 定义模型参数 INPUT_DIM = len(dataset.en_vocab) OUTPUT_DIM = len(dataset.zh_vocab) ENC_EMB_DIM = 256 DEC_EMB_DIM = 256 HID_DIM = 512 N_LAYERS = 2 ENC_DROPOUT = 0.5 DEC_DROPOUT = 0.5 # 初始化模型 enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT) dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT) model = Seq2Seq(enc, dec, device).to(device) # 定义优化器和损失函数 optimizer = optim.Adam(model.parameters()) criterion = nn.CrossEntropyLoss(ignore_index=dataset.zh_word2idx['<pad>']) # 训练模型 N_EPOCHS = 10 CLIP = 1 for epoch in range(N_EPOCHS): train_loss = train(model, train_loader, optimizer, criterion, CLIP) print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}') # 在训练循环结束后保存模型 torch.save(model.state_dict(), './translation_model_GRU.pth') end_time = time.time() # 结束计时 # 计算并打印运行时间 elapsed_time_minute = (end_time - start_time)/60 print(f"Total running time: {elapsed_time_minute:.2f} minutes")
我们先用1000个数据作为训练集,训练循环为10次来进行训练(148363是总的数据量)
可以看到数据量小且训练次数也少的时候,损失很大。
我们用BLEU来评估一下
# 评估方法 def evaluate_bleu(model: Seq2Seq, dataset: TranslationDataset, src_file: str, ref_file: str, terminology,device: torch.device): model.eval() src_sentences = load_sentences(src_file) ref_sentences = load_sentences(ref_file) translated_sentences = [] for src in src_sentences: translated = translate_sentence(src, model, dataset, terminology, device) translated_sentences.append(translated) bleu = BLEU() score = bleu.corpus_score(translated_sentences, [ref_sentences]) return score # 主函数 if __name__ == '__main__': device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 加载术语词典 terminology = load_terminology_dictionary('../dataset/en-zh.dic') # 创建数据集实例时传递术语词典 dataset = TranslationDataset('../dataset/train.txt', terminology) # 定义模型参数 INPUT_DIM = len(dataset.en_vocab) OUTPUT_DIM = len(dataset.zh_vocab) ENC_EMB_DIM = 256 DEC_EMB_DIM = 256 HID_DIM = 512 N_LAYERS = 2 ENC_DROPOUT = 0.5 DEC_DROPOUT = 0.5 # 初始化模型 enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT) dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT) model = Seq2Seq(enc, dec, device).to(device) # 加载训练好的模型 model.load_state_dict(torch.load('./translation_model_GRU.pth')) # 评估BLEU分数 bleu_score = evaluate_bleu(model, dataset, '../dataset/dev_en.txt', '../dataset/dev_zh.txt', terminology = terminology,device = device) print(f'BLEU-4 score: {bleu_score.score:.2f}')
可以看到BLEU-4的分数几乎为0
def inference(model: Seq2Seq, dataset: TranslationDataset, src_file: str, save_dir:str, terminology, device: torch.device): model.eval() src_sentences = load_sentences(src_file) translated_sentences = [] for src in src_sentences: translated = translate_sentence(src, model, dataset, terminology, device) #print(translated) translated_sentences.append(translated) #print(translated_sentences) # 将列表元素连接成一个字符串,每个元素后换行 text = '\n'.join(translated_sentences) # 打开一个文件,如果不存在则创建,'w'表示写模式 with open(save_dir, 'w', encoding='utf-8') as f: # 将字符串写入文件 f.write(text) #return translated_sentences # 主函数 if __name__ == '__main__': device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 加载术语词典 terminology = load_terminology_dictionary('../dataset/en-zh.dic') # 加载数据集和模型 dataset = TranslationDataset('../dataset/train.txt',terminology = terminology) # 定义模型参数 INPUT_DIM = len(dataset.en_vocab) OUTPUT_DIM = len(dataset.zh_vocab) ENC_EMB_DIM = 256 DEC_EMB_DIM = 256 HID_DIM = 512 N_LAYERS = 2 ENC_DROPOUT = 0.5 DEC_DROPOUT = 0.5 # 初始化模型 enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT) dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT) model = Seq2Seq(enc, dec, device).to(device) # 加载训练好的模型 model.load_state_dict(torch.load('./translation_model_GRU.pth')) save_dir = '../dataset/submit.txt' inference(model, dataset, src_file="../dataset/test_en.txt", save_dir = save_dir, terminology = terminology, device = device) print(f"翻译完成!文件已保存到{save_dir}")
看一眼结果文件
恭喜,那么我们的第一个人工智障已经训练完毕啦!!
开个小玩笑。。
我们先提交到讯飞上看看评估结果:
分数很低对吧,让我们调高训练集数量和训练次数
把训练集数量改为
训练次数改为100次
可以看到损失下降到了3.681,BLEU-4评分为
最后结果也稍微好了一点点吧,虽然还是一个人工智障,只是智商比之前高了,当然,这是好事情
但是好消息是讯飞的评分提高了很多,有0.9202,这是一个相当不错的成绩了
以上是我目前结果,可以看到增加训练集数据量和训练循环次数能够比较好得提高翻译的质量,当然这肯定是有一个上限的,本来还有一个数据清洗的部分,但是我还没写完,就留到后面task2再发吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。