赞
踩
前面我们介绍了Seq2Seq的编码器、解码器架构,其中使用的GRU网络,并且网络中加入了注意力机制(Attention Mechanism)。今天我们的任务是了解Transformer模型,并基于Transformer模型实现在机器翻译任务上的应用!本文将带你了解Transformer的关键模块(位置编码、多头注意力机制、残差网络和层标准化等),并通过实现代码详细讲解如何使用Transformer进行机器翻译。
基于循环或卷积神经网络的序列到序列建模方法是现存机器翻译任务中的经典方法。然而,它们在建模文本长程依赖方面都存在一定的局限性。
卷积神经网络:受限的上下文窗口在建模长文本方面天然地存在不足。如果要对长距离依赖进行描述,需要多层卷积操作,而且不同层之间信息传递也可能有损失,这些都限制了模型的能力。
循环神经网络:上下文的语义依赖是通过维护循环单元中的隐状态实现的。在编码过程中,每一个时间步的输入建模都涉及到对隐藏状态的修改。随着序列长度的增加,编码在隐藏状态中的序列早期的上下文信息被逐渐遗忘。尽管注意力机制的引入在一定程度上缓解了这个问题,但循环网络在编码效率方面仍存在很大的不足之处。
为了更好地描述文字序列,谷歌的研究人员在2017年提出了一种新的模型——Transformer。Transformer模型摒弃了循环结构,并完全通过注意力机制完成对源语言序列和目标语言序列全局依赖的建模。这种高度可并行化的编码过程使得模型的运行变得十分高效。当前几乎大部分的大语言模型都是基于Transformer结构,本节将以应用于机器翻译的基于Transformer的编码器和解码器介绍该模型。
Transformer的主要组件包括编码器(Encoder)、解码器(Decoder)和注意力层。其核心是利用多头自注意力机制(Multi-Head Self-Attention),使每个位置的表示不仅依赖于当前位置,还能够直接获取其他位置的表示。自从提出以来,Transformer模型在机器翻译、文本生成等自然语言处理任务中均取得了突破性进展,成为NLP领域新的主流模型。
对于输入文本序列,先通过一个输入嵌入层(Input Embedding)将每个单词转换为其相对应的向量表示。由于Transformer模型不再使用基于循环的方式建模文本输入,序列中不再有任何信息能够提示模型单词之间的相对位置关系。因此,在送入编码器端建模其上下文语义之前,一个非常重要的操作是在词嵌入中加入位置编码(Positional Encoding)这一特征。位置编码通过正弦和余弦函数来表示。
- class PositionalEncoding(nn.Module):
- def __init__(self, d_model, dropout=0.1, max_len=5000):
- super(PositionalEncoding, self).__init__()
- self.dropout = nn.Dropout(p=dropout)
-
- pe = torch.zeros(max_len, d_model)
- position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
- div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
- pe[:, 0::2] = torch.sin(position * div_term)
- pe[:, 1::2] = torch.cos(position * div_term)
- pe = pe.unsqueeze(0).transpose(0, 1)
- self.register_buffer('pe', pe)
-
- def forward(self, x):
- x = x + self.pe[:x.size(0), :]
- return self.dropout(x)

自注意力(Self-Attention)操作是基于Transformer的机器翻译模型的基本操作,在源语言的编码和目标语言的生成中频繁地被使用以建模源语言、目标语言任意两个单词之间的依赖关系。
- def attention(query, key, value, mask=None, dropout=None):
- d_k = query.size(-1)
- scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
- if mask is not None:
- scores = scores.masked_fill(mask == 0, -1e9)
- p_attn = F.softmax(scores, dim=-1)
- if dropout is not None:
- p_attn = dropout(p_attn)
- return torch.matmul(p_attn, value), p_attn
多头注意力机制(Multi-Head Attention)是Transformer模型的核心创新之一,它通过多个并行的注意力头来捕捉不同的注意力模式。每个头都是独立的自注意力机制,最终将这些头的输出进行拼接。
- class MultiHeadedAttention(nn.Module):
- def __init__(self, h, d_model, dropout=0.1):
- super(MultiHeadedAttention, self).__init__()
- assert d_model % h == 0
- self.d_k = d_model // h
- self.h = h
- self.linears = clones(nn.Linear(d_model, d_model), 4)
- self.attn = None
- self.dropout = nn.Dropout(p=dropout)
-
- def forward(self, query, key, value, mask=None):
- if mask is not None:
- mask = mask.unsqueeze(1)
- nbatches = query.size(0)
- query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
- for l, x in zip(self.linears, (query, key, value))]
- x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
- x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
- return self.linears[-1](x)

前馈层(Feed Forward)接受自注意力子层的输出作为输入,并通过一个带有Relu激活函数的两层全连接网络对输入进行更加复杂的非线性变换。
- class PositionwiseFeedForward(nn.Module):
- def __init__(self, d_model, d_ff, dropout=0.1):
- super(PositionwiseFeedForward, self).__init__()
- self.w_1 = nn.Linear(d_model, d_ff)
- self.w_2 = nn.Linear(d_ff, d_model)
- self.dropout = nn.Dropout(dropout)
-
- def forward(self, x):
- return self.w_2(self.dropout(F.relu(self.w_1(x))))
残差连接和层归一化技术被引入到每个Transformer块中,以提升训练的稳定性。
- class SublayerConnection(nn.Module):
- def __init__(self, size, dropout):
- super(SublayerConnection, self).__init__()
- self.norm = nn.LayerNorm(size)
- self.dropout = nn.Dropout(dropout)
-
- def forward(self, x, sublayer):
- return x + self.dropout(sublayer(self.norm(x)))
以下是关于如何基于Transformer实现机器翻译任务的完整教程,包括环境配置、数据预处理、模型构建、训练、评价及使用术语词典进行后处理的详细步骤。
首先,我们需要安装必要的Python包:
- # 安装必要的Python包
- !pip install torchtext jieba sacrebleu
- !pip install -U pip setuptools wheel
- !pip install -U 'spacy[cuda12x]'
- !python -m spacy download en_core_web_sm
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from torchtext.data.utils import get_tokenizer
- import jieba
- import random
- import math
-
- # 定义tokenizer
- en_tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
- zh_tokenizer = lambda x: list(jieba.cut(x)) # 使用jieba分词
- # 读取数据函数
- def read_data(file_path: str) -> List[str]:
- with open(file_path, 'r', encoding='utf-8') as f:
- return [line.strip() for line in f]
-
- # 数据预处理函数
- def preprocess_data(en_data: List[str], zh_data: List[str]) -> List[Tuple[List[str], List[str]]]:
- processed_data = []
- for en, zh in zip(en_data, zh_data):
- en_tokens = en_tokenizer(en.lower())[:100]
- zh_tokens = zh_tokenizer(zh)[:100]
- if en_tokens and zh_tokens: # 确保两个序列都不为空
- processed_data.append((en_tokens, zh_tokens))
- return processed_data
-
- # 构建词汇表
- def build_vocab(data: List[Tuple[List[str], List[str]]]):
- en_vocab = build_vocab_from_iterator(
- (en for en, _ in data),
- specials=['<unk>', '<pad>', '<bos>', '<eos>']
- )
- zh_vocab = build_vocab_from_iterator(
- (zh for _, zh in data),
- specials=['<unk>', '<pad>', '<bos>', '<eos>']
- )
- en_vocab.set_default_index(en_vocab['<unk>'])
- zh_vocab.set_default_index(zh_vocab['<unk>'])
- return en_vocab, zh_vocab

- class TranslationDataset(Dataset):
- def __init__(self, data: List[Tuple[List[str], List[str]]], en_vocab, zh_vocab):
- self.data = data
- self.en_vocab = en_vocab
- self.zh_vocab = zh_vocab
-
- def __len__(self):
- return len(self.data)
-
- def __getitem__(self, idx):
- en, zh = self.data[idx]
- en_indices = [self.en_vocab['<bos>']] + [self.en_vocab[token] for token in en] + [self.en_vocab['<eos>']]
- zh_indices = [self.zh_vocab['<bos>']] + [self.zh_vocab[token] for token in zh] + [self.zh_vocab['<eos>']]
- return en_indices, zh_indices
- def collate_fn(batch):
- en_batch, zh_batch = [], []
- for en_item, zh_item in batch:
- if en_item and zh_item:
- en_batch.append(torch.tensor(en_item))
- zh_batch.append(torch.tensor(zh_item))
- if not en_batch or not zh_batch:
- return torch.tensor([]), torch.tensor([])
-
- en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=en_vocab['<pad>'])
- zh_batch = nn.utils.rnn.pad_sequence(zh_batch, batch_first=True, padding_value=zh_vocab['<pad>'])
-
- return en_batch, zh_batch
- def load_data(train_path: str, dev_en_path: str, dev_zh_path: str, test_en_path: str):
- train_data = read_data(train_path)
- train_en, train_zh = zip(*(line.split('\t') for line in train_data))
- dev_en = read_data(dev_en_path)
- dev_zh = read_data(dev_zh_path)
- test_en = read_data(test_en_path)
-
- train_processed = preprocess_data(train_en, train_zh)
- dev_processed = preprocess_data(dev_en, dev_zh)
- test_processed = [(en_tokenizer(en.lower())[:100], []) for en in test_en if en.strip()]
-
- global en_vocab, zh_vocab
- en_vocab, zh_vocab = build_vocab(train_processed)
-
- train_dataset = TranslationDataset(train_processed, en_vocab, zh_vocab)
- dev_dataset = TranslationDataset(dev_processed, en_vocab, zh_vocab)
- test_dataset = TranslationDataset(test_processed, en_vocab, zh_vocab)
-
- train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn, drop_last=True)
- dev_loader = DataLoader(dev_dataset, batch_size=32, collate_fn=collate_fn, drop_last=True)
- test_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, drop_last=True)
-
- return train_loader, dev_loader, test_loader, en_vocab, zh_vocab

- class PositionalEncoding(nn.Module):
- def __init__(self, d_model, dropout=0.1, max_len=5000):
- super(PositionalEncoding, self).__init__()
- self.dropout = nn.Dropout(p=dropout)
-
- pe = torch.zeros(max_len, d_model)
- position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
- div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
- pe[:, 0::2] = torch.sin(position * div_term)
- pe[:, 1::2] = torch.cos(position * div_term)
- pe = pe.unsqueeze(0).transpose(0, 1)
- self.register_buffer('pe', pe)
-
- def forward(self, x):
- x = x + self.pe[:x.size(0), :]
- return self.dropout(x)
-
- class TransformerModel(nn.Module):
- def __init__(self, src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
- super(TransformerModel, self).__init__()
- self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
- self.src_embedding = nn.Embedding(len(src_vocab), d_model)
- self.tgt_embedding = nn.Embedding(len(tgt_vocab), d_model)
- self.positional_encoding = PositionalEncoding(d_model, dropout)
- self.fc_out = nn.Linear(d_model, len(tgt_vocab))
- self.src_vocab = src_vocab
- self.tgt_vocab = tgt_vocab
- self.d_model = d_model
-
- def forward(self, src, tgt):
- src = src.transpose(0, 1)
- tgt = tgt.transpose(0, 1)
-
- src_mask = self.transformer.generate_square_subsequent_mask(src.size(0)).to(src.device)
- tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(0)).to(tgt.device)
-
- src_padding_mask = (src == self.src_vocab['<pad>']).transpose(0, 1)
- tgt_padding_mask = (tgt == self.tgt_vocab['<pad>']).transpose(0, 1)
-
- src_embedded = self.positional_encoding(self.src_embedding(src) * math.sqrt(self.d_model))
- tgt_embedded = self.positional_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model))
-
- output = self.transformer(src_embedded, tgt_embedded, src_mask, tgt_mask, None, src_padding_mask, tgt_padding_mask, src_padding_mask)
- return self.fc_out(output).transpose(0, 1)
-
- def initialize_model(src_vocab, tgt_vocab, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
- model = TransformerModel(src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
- return model

- def initialize_optimizer(model, learning_rate=0.001):
- return optim.Adam(model.parameters(), lr=learning_rate)
- def epoch_time(start_time, end_time):
- elapsed_time = end_time - start_time
- elapsed_mins = int(elapsed_time / 60)
- elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
- return elapsed_mins, elapsed_secs
- def train(model, iterator, optimizer, criterion, clip):
- model.train()
- epoch_loss = 0
-
- for i, batch in enumerate(iterator):
- src, tgt = batch
- if src.numel() == 0 or tgt.numel() == 0:
- continue
-
- src, tgt = src.to(DEVICE), tgt.to(DEVICE)
-
- optimizer.zero_grad()
- output = model(src, tgt[:, :-1])
-
- output_dim = output.shape[-1]
- output = output.contiguous().view(-1, output_dim)
- tgt = tgt[:, 1:].contiguous().view(-1)
-
- loss = criterion(output, tgt)
- loss.backward()
-
- clip_grad_norm_(model.parameters(), clip)
- optimizer.step()
-
- epoch_loss += loss.item()
-
- return epoch_loss / len(iterator)
-
- def evaluate(model, iterator, criterion):
- model.eval()
- epoch_loss = 0
- with torch.no_grad():
- for i, batch in enumerate(iterator):
- src, tgt = batch
- if src.numel() == 0 or tgt.numel() == 0:
- continue
-
- src, tgt = src.to(DEVICE), tgt.to(DEVICE)
-
- output = model(src, tgt[:, :-1])
-
- output_dim = output.shape[-1]
- output = output.contiguous().view(-1, output_dim)
- tgt = tgt[:, 1:].contiguous().view(-1)
-
- loss = criterion(output, tgt)
- epoch_loss += loss.item()
-
- return epoch_loss / len(iterator)

- def train_model(model, train_iterator, valid_iterator, optimizer, criterion, N_EPOCHS=10, CLIP=1):
- best_valid_loss = float('inf')
-
- for epoch in range(N_EPOCHS):
- start_time = time.time()
-
- train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
- valid_loss = evaluate(model, valid_iterator, criterion)
-
- end_time = time.time()
- epoch_mins, epoch_secs = epoch_time(start_time, end_time)
-
- if valid_loss < best_valid_loss:
- best_valid_loss = valid_loss
- torch.save(model.state_dict(), 'best-model_transformer.pt')
-
- print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
- print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
- print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')

- def translate_sentence(sentence, src_vocab, tgt_vocab, model, device, max_length=50):
- model.eval()
-
- if isinstance(sentence, str):
- tokens = [token for token in en_tokenizer(sentence)]
- else:
- tokens = [str(token) for token in sentence]
-
- tokens = ['<bos>'] + tokens + ['<eos>']
- src_indexes = [src_vocab[token] for token in tokens]
- src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)
-
- with torch.no_grad():
- encoder_outputs = model.transformer.encoder(model.positional_encoding(model.src_embedding(src_tensor) * math.sqrt(model.d_model)))
-
- trg_indexes = [tgt_vocab['<bos>']]
- for i in range(max_length):
- trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)
-
- with torch.no_grad():
- output = model(src_tensor, trg_tensor)
-
- pred_token = output.argmax(2)[:, -1].item()
- trg_indexes.append(pred_token)
-
- if pred_token == tgt_vocab['<eos>']:
- break
-
- trg_tokens = [tgt_vocab.get_itos()[i] for i in trg_indexes]
- return trg_tokens[1:-1]
-
- def calculate_bleu(dev_loader, src_vocab, trg_vocab, model, device):
- translated_sentences = []
- references = []
-
- for src, trg in dev_loader:
- src = src.to(device)
- translation = translate_sentence(src, src_vocab, trg_vocab, model, device)
- translated_sentences.append(' '.join(translation)) # Wrap in a list for BLEU calculation
-
- for t in trg:
- ref_str = ' '.join([trg_vocab.get_itos()[idx] for idx in t.tolist() if idx not in [trg_vocab['<bos>'], trg_vocab['<eos>'], trg_vocab['<pad>']]])
- references.append(ref_str)
-
- bleu = sacrebleu.corpus_bleu(translated_sentences, [references])
- return bleu.score

- if __name__ == '__main__':
- MAX_LENGTH = 100
- BATCH_SIZE = 32
- DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
- N = 148363
-
- train_path = 'train.txt'
- dev_en_path = 'dev_en.txt'
- dev_zh_path = 'dev_zh.txt'
- test_en_path = 'test_en.txt'
-
- train_loader, dev_loader, test_loader, en_vocab, zh_vocab = load_data(train_path, dev_en_path, dev_zh_path, test_en_path)
-
- D_MODEL = 512
- NHEAD = 8
- NUM_ENCODER_LAYERS = 6
- NUM_DECODER_LAYERS = 6
- DIM_FEEDFORWARD = 2048
- DROPOUT = 0.1
-
- N_EPOCHS = 10
- CLIP = 1
-
- model = initialize_model(en_vocab, zh_vocab, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT).to(DEVICE)
- print(f'The model has {sum(p.numel() for p in model.parameters() if p.requires_grad):,} trainable parameters')
-
- criterion = nn.CrossEntropyLoss(ignore_index=zh_vocab['<pad>'])
- optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
-
- train_model(model, train_loader, dev_loader, optimizer, criterion, N_EPOCHS, CLIP)

- def load_dictionary(dict_path):
- term_dict = {}
- with open(dict_path, 'r', encoding='utf-8') as f:
- data = f.read()
- data = data.strip().split('\n')
- source_term = [line.split('\t')[0] for line in data]
- target_term = [line.split('\t')[1] for line in data]
- for i in range(len(source_term)):
- term_dict[source_term[i]] = target_term[i]
- return term_dict
-
- def post_process_translation(translation, term_dict):
- translated_words = [term_dict.get(word, word) for word in translation]
- return "".join(translated_words)
-
- # 加载术语词典
- dict_path = 'en-zh.dic'
- term_dict = load_dictionary(dict_path)

- save_dir = 'submit_add_dict.txt'
- with open(save_dir, 'w') as f:
- translated_sentences = []
- for batch in test_loader:
- src, _ = batch
- src = src.to(DEVICE)
- translated = translate_sentence(src[0], en_vocab, zh_vocab, model, DEVICE)
- results = post_process_translation(translated, term_dict)
- f.write(results + '\n')
- print(f"翻译完成,结果已保存到{save_dir}")
在Transformer模型的基础上,还有多种方法可以进一步提高机器翻译的效果和模型的性能:
超参数调优:
引入术语词典:
数据清洗:
数据扩增:
优化学习率调度:
小规模预训练模型:
微调模型:
语言模型评分:
集成学习:
通过这些改进方法,可以进一步提升Transformer模型在机器翻译任务中的表现,使其在实际应用中更加高效和准确。
结语
在这篇博文中,我们详细介绍了如何基于Transformer模型实现一个完整的机器翻译任务。从环境配置、数据预处理、模型构建、训练到最终的评价和术语词典后处理,我们逐步剖析了每一个关键环节。通过这种方式,大家可以深入理解Transformer的核心原理和实现细节,掌握其在自然语言处理中的强大功能。
Transformer模型自提出以来,因其高效的并行计算和强大的上下文捕获能力,已成为自然语言处理领域的主流模型。希望通过本教程,大家能够更加深入地理解并应用这一模型,进一步提高自己的自然语言处理技能。
机器翻译只是Transformer众多应用中的一个实例。随着技术的发展,Transformer还将继续在更多领域展现其强大的性能和广泛的应用前景。期待大家在实际项目中能够灵活应用所学知识,不断创新,取得更好的成果。
如果你觉得这篇博文对你有帮助,请点赞、收藏、关注我,并且可以打赏支持我!
欢迎关注我的后续博文,我将分享更多关于人工智能、自然语言处理和计算机视觉的精彩内容。
谢谢大家的支持!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。