赞
踩
Transformer是一种基于注意力机制的革命性深度学习模型架构,用于处理序列到序列的任务,如机器翻译和文本生成。相比于传统的循环神经网络(RNN)和长短期记忆网络(LSTM),Transformer利用自注意力机制(self-attention)来捕捉序列中不同位置之间的依赖关系,从而实现并行计算,显著提升了模型的训练速度和性能。Transformer由编码器和解码器组成,编码器用于将输入序列编码为上下文表示,解码器则利用编码器输出的信息生成目标序列。其中,多头注意力机制允许模型同时关注输入序列中不同的表示子空间,位置编码则保留了序列中词语的顺序信息。Transformer的出现标志着在自然语言处理领域中从传统的循环结构向更有效的注意力机制和全局性的表示学习的转变。
Transformer的基本结构由编码器-解码器架构组成,每个部分都有其独特的组件和功能:
(一)编码器:
1.输入嵌入层:将输入序列中的词语或符号转换为向量表示。
2.位置编码:将序列中每个词语的位置信息加入到嵌入向量中,以保留词语在序列中的顺序信息。
3.多头自注意力机制:通过多个并行的注意力头,允许模型在不同表示子空间内对输入序列进行关注,从而捕捉长距离依赖关系。
4.前馈神经网络:每个注意力子层后接一个全连接前馈神经网络,以对序列中的每个位置进行独立的转换和映射。
5.残差连接 和 层归一化:在每个子层内部,通过残差连接和层归一化来增强模型训练过程中的稳定性和效率。
(二)解码器:
1.输入嵌入层:将目标序列中的词语或符号转换为向量表示。
2.位置编码:与编码器相同,将位置信息嵌入到输出嵌入向量中。
3.多头自注意力机制:解码器中的自注意力机制允许模型关注已生成的部分序列,以便更好地生成下一个词语。
4.编码器-解码器注意力机制:解码器通过注意编码器的输出,以获得输入序列的全局信息。
5.前馈神经网络:与编码器相似,用于对解码器的每个位置进行转换和映射。
6.残差连接 和 层归一化:同样用于增强解码器内部每个子层的稳定性和效率。
- import math
- import torchtext
- import torch
- import torch.nn as nn
- from torch import Tensor
- from torch.nn.utils.rnn import pad_sequence
- from torch.utils.data import DataLoader
- from collections import Counter
- from torchtext.vocab import Vocab
- from torch.nn import TransformerEncoder, TransformerDecoder, TransformerEncoderLayer, TransformerDecoderLayer
- import io
- import time
- import pandas as pd
- import numpy as np
- import pickle
- import tqdm
- import sentencepiece as spm
- torch.manual_seed(0)
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
我们使用从 JParaCrawl 下载的日英平行数据集,这个数据集被描述为由NTT创建的“最大的公开可用的英日平行语料库”。它主要通过网络抓取并自动对齐平行句子而创建。
- df = pd.read_csv('zh-ja.bicleaner05.txt', sep='\\t', engine='python', header=None)#使用 pandas 库中的 read_csv 函数来读取名为 'zh-ja.bicleaner05.txt' 的文件,并将其存储在变量 df 中
- trainen = df[2].values.tolist()#[:10000]
- trainja = df[3].values.tolist()#[:10000]
- # trainen.pop(5972)
- # trainja.pop(5972)
- print(trainen[500])
- print(trainja[500])
打印结果如下所示:
与英语或其他字母语言不同,日语句子中不包含空格来分隔单词。我们可以使用 JParaCrawl 提供的分词器,它们使用了 SentencePiece 分别针对日语和英语进行了创建。
- en_tokenizer = spm.SentencePieceProcessor(model_file='spm.en.nopretok.model')
- ja_tokenizer = spm.SentencePieceProcessor(model_file='spm.ja.nopretok.model')
我们进行一个简单的测试,结果如下:
en_tokenizer.encode("All residents aged 20 to 59 years who live in Japan must enroll in public pension system.", out_type='str')
ja_tokenizer.encode("年金 日本に住んでいる20歳~60歳の全ての人は、公的年金制度に加入しなければなりません。", out_type='str')
我们接着从 TorchText 导入并构建 Vocab 对象,在获取了词汇表对象之后,我们可以利用词汇表和分词器对象来构建训练数据的张量。
- def build_vocab(sentences, tokenizer):
- counter = Counter()# 创建一个空的计数器对象
- for sentence in sentences:
- counter.update(tokenizer.encode(sentence, out_type=str))# 对每个句子进行分词并更新计数器
- return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
- ja_vocab = build_vocab(trainja, ja_tokenizer)# 用日语句子列表和日语分词器构建日语词汇表
- en_vocab = build_vocab(trainen, en_tokenizer)# 用英语句子列表和英语分词器构建英语词汇表
- def data_process(ja, en):
- data = []# 创建一个空列表用于存储处理后的数据
- for (raw_ja, raw_en) in zip(ja, en): # 使用 zip 函数同时迭代日语列表和英语列表中的句子对
- # 处理日语句子
- ja_tensor_ = torch.tensor([ja_vocab[token] for token in ja_tokenizer.encode(raw_ja.rstrip("\n"), out_type=str)],
- dtype=torch.long)
- # 处理英语句子
- en_tensor_ = torch.tensor([en_vocab[token] for token in en_tokenizer.encode(raw_en.rstrip("\n"), out_type=str)],
- dtype=torch.long)
- # 将处理后的日语和英语句子对组成元组,添加到数据列表中
- data.append((ja_tensor_, en_tensor_))
- return data# 返回处理后的数据列表
- train_data = data_process(trainja, trainen)# 对训练集的日语和英语数据进行处理,生成训练数据集
- BATCH_SIZE = 8# 批处理大小
- PAD_IDX = ja_vocab['<pad>']# 获取日语词汇表中 '<pad>' 标记的索引
- BOS_IDX = ja_vocab['<bos>']# 获取日语词汇表中 '<bos>' 标记的索引
- EOS_IDX = ja_vocab['<eos>']# 获取日语词汇表中 '<eos>' 标记的索引
- def generate_batch(data_batch):
- ja_batch, en_batch = [], []
- for (ja_item, en_item) in data_batch:
- # 为每个句子添加 '<bos>'(句子开头)和 '<eos>'(句子结尾)标记
- ja_batch.append(torch.cat([torch.tensor([BOS_IDX]), ja_item, torch.tensor([EOS_IDX])], dim=0))
- en_batch.append(torch.cat([torch.tensor([BOS_IDX]), en_item, torch.tensor([EOS_IDX])], dim=0))
- # 对添加了 '<bos>' 和 '<eos>' 标记的句子进行填充操作
- ja_batch = pad_sequence(ja_batch, padding_value=PAD_IDX)
- en_batch = pad_sequence(en_batch, padding_value=PAD_IDX)
- return ja_batch, en_batch
- # 创建训练数据的 DataLoader,每次迭代会生成一个批次数据
- train_iter = DataLoader(train_data, batch_size=BATCH_SIZE,
- shuffle=True, collate_fn=generate_batch)
Transformer模型包括一个编码器和一个解码器块,每个块都包含固定数量的层。编码器通过一系列的多头注意力和前馈网络层处理输入序列。编码器的输出称为记忆,将其与目标张量一起馈送给解码器。编码器和解码器使用强制教学技术进行端到端的训练。
- from torch.nn import (TransformerEncoder, TransformerDecoder,
- TransformerEncoderLayer, TransformerDecoderLayer)
-
-
- class Seq2SeqTransformer(nn.Module):
- def __init__(self, num_encoder_layers: int, num_decoder_layers: int,
- emb_size: int, src_vocab_size: int, tgt_vocab_size: int,
- dim_feedforward:int = 512, dropout:float = 0.1):
- super(Seq2SeqTransformer, self).__init__()
-
- # 定义编码器层
- encoder_layer = TransformerEncoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
- # 定义解码器层
- decoder_layer = TransformerDecoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
-
- # 线性层生成器,用于最终输出目标词汇
- self.generator = nn.Linear(emb_size, tgt_vocab_size)
- # 源语言和目标语言的词嵌入层
- self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
- self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
- # 位置编码器,用于处理序列位置信息
- self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)
-
- def forward(self, src: Tensor, trg: Tensor, src_mask: Tensor,
- tgt_mask: Tensor, src_padding_mask: Tensor,
- tgt_padding_mask: Tensor, memory_key_padding_mask: Tensor):
- # 编码器处理源语言序列
- src_emb = self.positional_encoding(self.src_tok_emb(src))
- tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
- memory = self.transformer_encoder(src_emb, src_mask, src_padding_mask)
- outs = self.transformer_decoder(tgt_emb, memory, tgt_mask, None,
- tgt_padding_mask, memory_key_padding_mask)
- # 最终输出通过线性层生成目标词汇概率分布
- return self.generator(outs)
-
- def encode(self, src: Tensor, src_mask: Tensor):
- # 仅编码阶段,不需要解码器
- return self.transformer_encoder(self.positional_encoding(
- self.src_tok_emb(src)), src_mask)
-
- def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
- # 仅解码阶段,不需要编码器
- return self.transformer_decoder(self.positional_encoding(
- self.tgt_tok_emb(tgt)), memory,
- tgt_mask)
文本标记使用标记嵌入(token embeddings)表示。为了引入单词顺序的概念,还会添加位置编码(positional encoding)到标记嵌入中。
- class PositionalEncoding(nn.Module):
- def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
- super(PositionalEncoding, self).__init__()
- # 计算位置编码
- den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
- pos = torch.arange(0, maxlen).reshape(maxlen, 1)
- pos_embedding = torch.zeros((maxlen, emb_size))
- pos_embedding[:, 0::2] = torch.sin(pos * den)# 偶数维度使用sin函数
- pos_embedding[:, 1::2] = torch.cos(pos * den)# 奇数维度使用cos函数
- pos_embedding = pos_embedding.unsqueeze(-2)# 添加一个维度
-
- # 设置dropout
- self.dropout = nn.Dropout(dropout)
- # 将位置编码作为缓冲区注册,这样在模型保存和加载时它们不会被认为是模型参数
- self.register_buffer('pos_embedding', pos_embedding)
-
- def forward(self, token_embedding: Tensor):
- # 在输入的词嵌入上加上位置编码,并应用dropout
- return self.dropout(token_embedding +
- self.pos_embedding[:token_embedding.size(0),:])
-
- class TokenEmbedding(nn.Module):
- def __init__(self, vocab_size: int, emb_size):
- super(TokenEmbedding, self).__init__()
- # 定义词嵌入层
- self.embedding = nn.Embedding(vocab_size, emb_size)
- self.emb_size = emb_size
- def forward(self, tokens: Tensor):
- # 返回词嵌入乘以一个数值开方
- return self.embedding(tokens.long()) * math.sqrt(self.emb_size)
创建了一个后续词掩码,以防止目标词关注其后续词,创建了用于掩盖源语言和目标语言填充标记的掩码。
- def generate_square_subsequent_mask(sz):
- # 创建一个上三角矩阵,对角线以下的值为0,其余为1
- mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
- # 将mask转换为float类型,并用-infinity填充0位置,用0.0填充1位置
- mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
- return mask
-
- def create_mask(src, tgt):
- # 计算源序列和目标序列的长度
- src_seq_len = src.shape[0]
- tgt_seq_len = tgt.shape[0]
-
- # 生成目标序列的mask
- tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
- # 创建源序列的mask,全为False
- src_mask = torch.zeros((src_seq_len, src_seq_len), device=device).type(torch.bool)
-
- # 创建源序列的填充mask,将PAD_IDX所在位置置为True
- src_padding_mask = (src == PAD_IDX).transpose(0, 1)
- # 创建目标序列的填充mask,将PAD_IDX所在位置置为True
- tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
- return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
使用自己的GPU,NUM_ENCODER_LAYERS 和 NUM_DECODER_LAYERS 设置为3或者更高,NHEAD设置8,EMB_SIZE设置为512。
- SRC_VOCAB_SIZE = len(ja_vocab)# 源语言词汇表大小
- TGT_VOCAB_SIZE = len(en_vocab)# 目标语言词汇表大小
- EMB_SIZE = 512# 词嵌入维度
- NHEAD = 8# 注意力头数
- FFN_HID_DIM = 512# Feedforward层隐藏单元数
- BATCH_SIZE = 16# 批量大小
- NUM_ENCODER_LAYERS = 3# 编码器层数
- NUM_DECODER_LAYERS = 3# 解码器层数
- NUM_EPOCHS = 16# 训练轮数
- # 创建Seq2SeqTransformer模型实例
- transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
- EMB_SIZE, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE,
- FFN_HID_DIM)
-
- # 初始化模型参数,使用Xavier初始化
- for p in transformer.parameters():
- if p.dim() > 1:
- nn.init.xavier_uniform_(p)
-
- # 将模型移动到设备(如GPU)上
- transformer = transformer.to(device)
- # 定义交叉熵损失函数,忽略填充索引PAD_IDX
- loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
- # 定义优化器,使用Adam优化器
- optimizer = torch.optim.Adam(
- transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9
- )
- def train_epoch(model, train_iter, optimizer):
- model.train()
- losses = 0
- for idx, (src, tgt) in enumerate(train_iter):
- src = src.to(device)
- tgt = tgt.to(device)
-
- tgt_input = tgt[:-1, :]# 目标序列输入,不包括最后一个标记
- # 创建用于Transformer的mask
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
- # 前向传播
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask)
-
- optimizer.zero_grad()
-
- tgt_out = tgt[1:,:]# 目标序列输出,不包括第一个标记
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- loss.backward()
-
- optimizer.step()
- losses += loss.item()
- return losses / len(train_iter)
-
-
- def evaluate(model, val_iter):
- model.eval()
- losses = 0
- for idx, (src, tgt) in (enumerate(valid_iter)):
- src = src.to(device)
- tgt = tgt.to(device)
-
- tgt_input = tgt[:-1, :]# 目标序列输入,不包括最后一个标记
-
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
- # 前向传播
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask)
- tgt_out = tgt[1:,:]# 目标序列输出,不包括第一个标记
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- losses += loss.item()
- return losses / len(val_iter)
在准备好必要的类和函数之后,准备开始训练我们的模型。由于训练任务量较大,训练时间人与人之间会有较大差异,受算力、参数等各方面的影响。
- for epoch in tqdm.tqdm(range(1, NUM_EPOCHS+1)):
- start_time = time.time()# 记录当前 epoch 的开始时间
- # 训练一个 epoch 并返回训练损失
- train_loss = train_epoch(transformer, train_iter, optimizer)
- end_time = time.time()# 记录当前 epoch 的结束时间
- # 打印当前 epoch 的训练损失和所用时间
- print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, "
- f"Epoch time = {(end_time - start_time):.3f}s"))
我们尝试使用训练好的模型去翻译日语,首先,我们需要创建函数来翻译新的句子,包括获取日语句子、分词、转换为张量、推断以及将结果解码回英语句子的步骤。
- def greedy_decode(model, src, src_mask, max_len, start_symbol):
- src = src.to(device)# 将源语言序列移动到设备上(如GPU)
- src_mask = src_mask.to(device)# 将源语言mask移动到设备上(如GPU)
- memory = model.encode(src, src_mask)
- # 准备用于解码的初始目标语言序列(包含起始符号
- ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(device)
- # 进行贪婪解码
- for i in range(max_len-1):
- memory = memory.to(device)
- memory_mask = torch.zeros(ys.shape[0], memory.shape[0]).to(device).type(torch.bool)
- tgt_mask = (generate_square_subsequent_mask(ys.size(0))
- .type(torch.bool)).to(device)
- # 解码器前向传播
- out = model.decode(ys, memory, tgt_mask)
- out = out.transpose(0, 1)
- # 生成下一个词的概率分布并选择概率最高的词作为下一个词
- prob = model.generator(out[:, -1])
- _, next_word = torch.max(prob, dim = 1)
- next_word = next_word.item()
- # 将预测的下一个词添加到目标语言序列中
- ys = torch.cat([ys,
- torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
- # 如果预测到了结束符号,则停止解码
- if next_word == EOS_IDX:
- break
- return ys
- def translate(model, src, src_vocab, tgt_vocab, src_tokenizer):
- model.eval()
- tokens = [BOS_IDX] + [src_vocab.stoi[tok] for tok in src_tokenizer.encode(src, out_type=str)]+ [EOS_IDX]
- num_tokens = len(tokens)
- src = (torch.LongTensor(tokens).reshape(num_tokens, 1) )
- src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
- # 使用贪婪解码生成目标语言序列
- tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
- # 将目标语言序列中的特殊标记替换为可读的文本
- return " ".join([tgt_vocab.itos[tok] for tok in tgt_tokens]).replace("<bos>", "").replace("<eos>", "")
然后我们调用translate函数:
translate(transformer, "HSコード 8515 はんだ付け用、ろう付け用又は溶接用の機器(電気式(電気加熱ガス式を含む。)", ja_vocab, en_vocab, ja_tokenizer)
训练完成后,我们将首先使用 Pickle 保存词汇对象(en_vocab 和 ja_vocab)
- import pickle
- # open a file, where you want to store the data
- file = open('en_vocab.pkl', 'wb')
- # dump information to that file
- pickle.dump(en_vocab, file)
- file.close()
- file = open('ja_vocab.pkl', 'wb')
- # 将日语词汇表(ja_vocab)的信息存储到文件中
- pickle.dump(ja_vocab, file)
- file.close()
最后,我们还可以使用 PyTorch 的保存和加载函数来保存模型以供以后使用。一般来说,根据后续使用的目的,有两种保存模型的方式。第一种方式是仅用于推理,我们可以稍后加载模型,并用它来从日语翻译成英语。
- # save model for inference
- torch.save(transformer.state_dict(), 'inference_model')
第二种方式也是用于推理,但同时也适用于稍后加载模型并希望恢复训练的情况。
- # save model + checkpoint to resume training later
- torch.save({
- 'epoch': NUM_EPOCHS,# 当前轮次
- 'model_state_dict': transformer.state_dict(),
- 'optimizer_state_dict': optimizer.state_dict(),
- 'loss': train_loss,# 训练损失
- }, 'model_checkpoint.tar')# 将信息保存到名为'model_checkpoint.tar'的文件中
在本次实验中,我们实现了基于transformer实现机器翻译,基于Transformer模型的机器翻译实验通常包括几个关键步骤:首先,通过预处理数据并进行适当的分词处理,确保输入和输出序列的准备工作。其次,构建并训练Transformer模型,包括编码器和解码器,以及注意力机制的应用,用于捕捉长距离依赖关系。然后,使用合适的优化器和损失函数对模型进行优化,以最小化翻译过程中的错误。最后,通过保存模型以便后续推理或继续训练,确保模型在实际应用中的可持续性和灵活性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。