Neural machine translation (NMT) has made remarkable progress in both translation quality and speed. In specific domains or industries, however, NMT still faces challenges, especially with terminology consistency. For technical terms, person names, place names, and other domain-specific vocabulary, machine translation often produces inaccurate results, which leads to confusing or ambiguous translations. Introducing a terminology dictionary makes it possible to correct these errors and improve translation quality.
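To make the idea concrete, here is a minimal, hypothetical sketch of dictionary-based post-editing (the helper enforce_terminology and its string-replacement logic are illustrative assumptions, not the competition baseline): if an English term from the dictionary is left untranslated in the output, swap in the dictionary's Chinese term.

def enforce_terminology(src: str, hyp: str, terminology: dict) -> str:
    # terminology maps English terms to their required Chinese translations (illustrative helper)
    for en_term, zh_term in terminology.items():
        # intervene only when the term occurs in the source but its Chinese form is missing
        if en_term in src and zh_term not in hyp and en_term in hyp:
            # the model copied the English term verbatim; replace it with the dictionary term
            hyp = hyp.replace(en_term, zh_term)
    return hyp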
The task of this competition is English-to-Chinese machine translation with terminology-dictionary intervention. The organizers provide the following data: a training set, a development set, a test set, and a terminology dictionary.
Participating teams must use the provided training data to build and train a multilingual machine translation model, and then submit final translations of the test set produced with the help of the terminology dictionary.
All files are UTF-8 encoded. The training set, development set, test set, and terminology dictionary are formatted as follows:
Training set: one sentence pair per line, with the English sentence and its Chinese translation separated by a tab (Figure 1: training set format).
Terminology dictionary: one entry per line, with the English term and its Chinese translation separated by a tab (Figure 2: terminology dictionary format).
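As a small sketch of how these tab-separated files can be read (the paths match those used in the baseline code below; the layout itself is an assumption based on that code):

# Read the tab-separated files described above (assumed layout):
#   train.txt : English sentence <TAB> Chinese sentence, one pair per line
#   en-zh.dic : English term <TAB> Chinese term, one entry per line
with open('../dataset/train.txt', encoding='utf-8') as f:
    pairs = [line.rstrip('\n').split('\t') for line in f if line.strip()]

with open('../dataset/en-zh.dic', encoding='utf-8') as f:
    terminology = dict(line.rstrip('\n').split('\t') for line in f if line.strip())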
Submitted translations of the test set are evaluated automatically with BLEU-4, computed with the open-source sacrebleu toolkit.
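As a minimal sketch of how such a score can be computed offline (assuming sacrebleu is installed; the file names and the tokenize='zh' setting for Chinese text are assumptions, not something specified by the organizers):

from sacrebleu.metrics import BLEU

# hypothetical files: one sentence per line, system output vs. reference translations
with open('hypotheses.txt', encoding='utf-8') as f:
    hyps = [line.strip() for line in f]
with open('references.txt', encoding='utf-8') as f:
    refs = [line.strip() for line in f]

# corpus-level BLEU-4 with sacrebleu's Chinese tokenizer
bleu = BLEU(tokenize='zh')
print(bleu.corpus_score(hyps, [refs]).score)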
# Install the dependencies: torchtext for tokenization, sacrebleu for BLEU evaluation
!pip install torchtext sacrebleu

import time
import random
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchtext.data.utils import get_tokenizer
from sacrebleu.metrics import BLEU

# Dataset class: reads the parallel data and builds vocabularies that include the terminology
class TranslationDataset(Dataset):
    def __init__(self, filename, terminology):
        self.data = []
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                en, zh = line.strip().split('\t')
                self.data.append((en, zh))

        self.terminology = terminology

        # Build the vocabularies, making sure the dictionary terms are included
        self.en_tokenizer = get_tokenizer('basic_english')
        self.zh_tokenizer = list  # character-level tokenization for Chinese

        en_vocab = Counter(self.terminology.keys())  # seed the counter with the terms
        zh_vocab = Counter()

        for en, zh in self.data:
            en_vocab.update(self.en_tokenizer(en))
            zh_vocab.update(self.zh_tokenizer(zh))

        # Special tokens first, then the terminology, then the most frequent words
        self.en_vocab = ['<pad>', '<sos>', '<eos>'] + list(self.terminology.keys()) + [word for word, _ in en_vocab.most_common(10000)]
        self.zh_vocab = ['<pad>', '<sos>', '<eos>'] + [word for word, _ in zh_vocab.most_common(10000)]

        self.en_word2idx = {word: idx for idx, word in enumerate(self.en_vocab)}
        self.zh_word2idx = {word: idx for idx, word in enumerate(self.zh_vocab)}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        en, zh = self.data[idx]
        # Out-of-vocabulary tokens fall back to the '<sos>' index (this baseline has no '<unk>' token)
        en_tensor = torch.tensor([self.en_word2idx.get(word, self.en_word2idx['<sos>']) for word in self.en_tokenizer(en)] + [self.en_word2idx['<eos>']])
        # Prepend '<sos>' to the target so that training matches decoding, which starts from '<sos>'
        zh_tensor = torch.tensor([self.zh_word2idx['<sos>']] + [self.zh_word2idx.get(word, self.zh_word2idx['<sos>']) for word in self.zh_tokenizer(zh)] + [self.zh_word2idx['<eos>']])
        return en_tensor, zh_tensor

def collate_fn(batch):
    # Pad source and target sequences in a batch to a common length (index 0 = '<pad>')
    en_batch, zh_batch = [], []
    for en_item, zh_item in batch:
        en_batch.append(en_item)
        zh_batch.append(zh_item)

    en_batch = nn.utils.rnn.pad_sequence(en_batch, padding_value=0, batch_first=True)
    zh_batch = nn.utils.rnn.pad_sequence(zh_batch, padding_value=0, batch_first=True)

    return en_batch, zh_batch

# GRU encoder: embeds the source tokens and returns the final hidden state
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden

# GRU decoder: predicts the next target token from the previous token and the hidden state
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden):
        embedded = self.dropout(self.embedding(input))
        output, hidden = self.rnn(embedded, hidden)
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden

# Sequence-to-sequence wrapper with teacher forcing during training
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        _, hidden = self.encoder(src)

        # The first decoder input is the '<sos>' token of each target sequence
        input = trg[:, 0].unsqueeze(1)

        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t, :] = output
            # With probability teacher_forcing_ratio feed the gold token, otherwise the model's prediction
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1)

        return outputs

# Load the terminology dictionary: one "English term<TAB>Chinese term" entry per line
def load_terminology_dictionary(dict_file):
    terminology = {}
    with open(dict_file, 'r', encoding='utf-8') as f:
        for line in f:
            en_term, ch_term = line.strip().split('\t')
            terminology[en_term] = ch_term
    return terminology

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, (src, trg) in enumerate(iterator):
        src, trg = src.to(device), trg.to(device)  # `device` is defined at module level in the main block
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        # Skip position 0 (it is never predicted) and flatten for the cross-entropy loss
        output = output[:, 1:].contiguous().view(-1, output_dim)
        trg = trg[:, 1:].contiguous().view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# Translate the development set and score it against the references with sacrebleu
def evaluate_bleu(model, dataset, src_file, ref_file, terminology, device):
    model.eval()
    src_sentences = load_sentences(src_file)
    ref_sentences = load_sentences(ref_file)

    translated_sentences = []
    for src in src_sentences:
        translated = translate_sentence(src, model, dataset, terminology, device)
        translated_sentences.append(translated)

    bleu = BLEU()
    score = bleu.corpus_score(translated_sentences, [ref_sentences])

    return score

def translate_sentence(sentence, model, dataset, terminology, device, max_length=50):
    model.eval()
    tokens = dataset.en_tokenizer(sentence)
    tensor = torch.LongTensor([dataset.en_word2idx.get(token, dataset.en_word2idx['<sos>']) for token in tokens]).unsqueeze(0).to(device)

    with torch.no_grad():
        _, hidden = model.encoder(tensor)

    # Greedy decoding, starting from the '<sos>' token
    translated_tokens = []
    input_token = torch.LongTensor([[dataset.zh_word2idx['<sos>']]]).to(device)

    for _ in range(max_length):
        output, hidden = model.decoder(input_token, hidden)
        top_token = output.argmax(1)
        translated_token = dataset.zh_vocab[top_token.item()]

        if translated_token == '<eos>':
            break

        # Terminology intervention: if the decoded token is an English term from the
        # dictionary (e.g. copied from the source), replace it with its Chinese translation
        if translated_token in terminology:
            translated_token = terminology[translated_token]

        translated_tokens.append(translated_token)
        input_token = top_token.unsqueeze(1)

    return ''.join(translated_tokens)

# Translate the test set and write one translation per line to the submission file
def inference(model, dataset, src_file, save_dir, terminology, device):
    model.eval()
    src_sentences = load_sentences(src_file)

    translated_sentences = []
    for src in src_sentences:
        translated = translate_sentence(src, model, dataset, terminology, device)
        translated_sentences.append(translated)

    text = '\n'.join(translated_sentences)
    with open(save_dir, 'w', encoding='utf-8') as f:
        f.write(text)

def load_sentences(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]

# Main script
if __name__ == '__main__':
    start_time = time.time()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the terminology dictionary
    terminology = load_terminology_dictionary('../dataset/en-zh.dic')

    # Load the data and train on only the first N samples to keep the baseline fast
    dataset = TranslationDataset('../dataset/train.txt', terminology)
    N = 1000
    subset_indices = list(range(N))
    subset_dataset = Subset(dataset, subset_indices)
    train_loader = DataLoader(subset_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

    # Model hyperparameters
    INPUT_DIM = len(dataset.en_vocab)
    OUTPUT_DIM = len(dataset.zh_vocab)
    ENC_EMB_DIM = 256
    DEC_EMB_DIM = 256
    HID_DIM = 512
    N_LAYERS = 2
    ENC_DROPOUT = 0.5
    DEC_DROPOUT = 0.5

    enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
    dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)
    model = Seq2Seq(enc, dec, device).to(device)

    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss(ignore_index=dataset.zh_word2idx['<pad>'])

    N_EPOCHS = 10
    CLIP = 1

    for epoch in range(N_EPOCHS):
        train_loss = train(model, train_loader, optimizer, criterion, CLIP)
        print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')

    torch.save(model.state_dict(), './translation_model_GRU.pth')

    end_time = time.time()
    elapsed_time_minute = (end_time - start_time) / 60
    print(f"Total running time: {elapsed_time_minute:.2f} minutes")

    # Score the development set with BLEU-4
    bleu_score = evaluate_bleu(model, dataset, '../dataset/dev_en.txt', '../dataset/dev_zh.txt', terminology, device)
    print(f'BLEU-4 score: {bleu_score.score:.2f}')

    # Translate the test set and write the submission file
    save_dir = '../dataset/submit.txt'
    inference(model, dataset, "../dataset/test_en.txt", save_dir, terminology, device)
    print(f"Translation finished! Results saved to {save_dir}")

    # For a fuller run, train on the first N = 80% of the samples with more epochs, e.g.:
    # N = int(len(dataset) * 0.8)
    # N_EPOCHS = 100
If you found this post helpful, please like, bookmark, and follow me; tips are also welcome!
Stay tuned for my future posts, where I will share more on artificial intelligence, natural language processing, and computer vision.
Thank you all for your support!