赞
踩
- import math # 导入math库,用于访问数学函数和常数
- import torchtext # 导入torchtext,用于文本处理
- import torch # 导入PyTorch库,用于深度学习
- import torch.nn as nn # 从PyTorch导入nn模块,包含构建神经网络所需的类
- from torch import Tensor # 从PyTorch导入Tensor类,用于表示多维数组
- from torch.nn.utils.rnn import pad_sequence # 导入pad_sequence函数,用于填充序列
- from torch.utils.data import DataLoader # 导入DataLoader类,用于加载数据集
- from collections import Counter # 从collections模块导入Counter类,用于计数
- from torchtext.vocab import Vocab # 从torchtext导入Vocab类,用于构建词汇表
- from torch.nn import TransformerEncoder, TransformerDecoder, \
- TransformerEncoderLayer, TransformerDecoderLayer # 从torch.nn导入Transformer模型的相关层
- import io # 导入io库,用于处理输入输出流
- import time # 导入time库,用于时间相关操作
- import pandas as pd # 导入pandas库,用于数据分析和操作DataFrame对象
- import numpy as np # 导入numpy库,用于数值计算
- import pickle # 导入pickle库,用于序列化和反序列化Python对象
- import tqdm # 导入tqdm库,用于显示进度条
- import sentencepiece as spm # 导入sentencepiece库,用于文本的分词处理
- torch.manual_seed(0) # 设置PyTorch的随机种子为0,以确保结果的可重复性
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 设置设备为GPU(如果可用)或CPU
在本教程中,我们将使用从 JParaCrawl![http://www.kecl.ntt.co.jp/icl/lirg/jparacrawl] 下载的日英并行数据集,该数据集被描述为 "由 NTT 创建的最大的公开可用英日并行语料库"。它主要是通过抓取网络并自动对齐平行句子而创建的。
- # 读取数据文件,并指定分隔符为制表符
- df = pd.read_csv('./zh-ja/zh-ja.bicleaner05.txt', sep='\\t', engine='python', header=None)
-
- # 提取中文和日文文本列
- trainen = df[2].values.tolist() # 中文文本列表
- trainja = df[3].values.tolist() # 日文文本列表
- # trainen.pop(5972)
与英语或其他字母语言不同,日语句子不包含分隔单词的空格。我们可以使用 JParaCrawl 提供的标记化器,该标记化器是使用 SentencePiece 为日语和英语创建的
- en_tokenizer = spm.SentencePieceProcessor(model_file='enja_spm_models/spm.en.nopretok.model')
- ja_tokenizer = spm.SentencePieceProcessor(model_file='enja_spm_models/spm.ja.nopretok.model')
在加载了分词器之后,你可以通过执行以下代码来测试它们。
- # 导入en_tokenizer模块
- import en_tokenizer
-
- # 使用en_tokenizer对给定的文本进行编码,并将结果转换为字符串类型
- encoded_text = en_tokenizer.encode("All residents aged 20 to 59 years who live in Japan must enroll in public pension system.", out_type='str')
-
- # 打印编码后的文本
- print(encoded_text)
- # 导入ja_tokenizer模块
- import ja_tokenizer
-
- # 使用ja_tokenizer的encode方法对给定的文本进行编码
- # 输入文本为:"年金 日本に住んでいる20歳~60歳の全ての人は、公的年金制度に加入しなければなりません。"
- # 输出类型为字符串
- encoded_text = ja_tokenizer.encode("年金 日本に住んでいる20歳~60歳の全ての人は、公的年金制度に加入しなければなりません。", out_type='str')
-
- # 打印编码后的文本
- print(encoded_text)
定义 build_vocab 的函数,它的作用是根据一组句子和分词器来构建一个词汇表。这个函数首先使用 Counter 类来计数句子中的不同标记,Counter 是一个字典子类,用于计数可哈希对象。函数遍历传入的 sentences 列表,对每一句使用 tokenizer 进行编码,将句子转换成标记序列,并将这些标记更新到 counter 中。
tokenizer.encode 方法将句子转换为一系列标记,并且 out_type=str 参数指定输出类型为字符串。然后,使用 counter.update 方法将这些标记添加到计数器中,从而统计每个标记出现的次数。
接下来,函数使用 counter 来构建词汇表。这里使用了 Vocab 构造函数,它接受一个计数器对象,并可以添加一些特殊标记,如未知标记 <unk>、填充标记 <pad>、开始标记 <bos> 和结束标记 <eos>。这些特殊标记在自然语言处理中有着重要的作用,例如处理未知词汇、填充序列以统一长度、标记句子的开始和结束。
最后,函数返回构建好的词汇表。在代码的后半部分,分别使用 build_vocab 函数和日语、英语的分词器 ja_tokenizer、en_tokenizer 来构建日语和英语的词汇表,分别存储在 ja_vocab 和 en_vocab 变量中。这样,每个语言的文本在进行进一步的自然语言处理任务之前,都会有一个准备好的词汇表,这有助于标准化和优化模型的训练过程。
- # 定义一个函数 build_vocab,用于构建词汇表
- def build_vocab(sentences, tokenizer):
- # 创建一个计数器对象
- counter = Counter()
- # 遍历输入的句子列表
- for sentence in sentences:
- # 使用分词器对句子进行编码,并将结果更新到计数器中
- counter.update(tokenizer.encode(sentence, out_type=str))
- # 返回一个词汇表对象,其中包含计数器的统计信息以及特殊标记(如未知词、填充符、开始符和结束符)
- return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
-
- # 使用训练集和日语分词器构建日语词汇表
- ja_vocab = build_vocab(trainja, ja_tokenizer)
- # 使用训练集和英语分词器构建英语词汇表
- en_vocab = build_vocab(trainen, en_tokenizer)
接下来,函数使用 zip
函数将日语列表 ja
和英语列表 en
配对组合,并遍历这些配对。对于每一对原始日语句子 raw_ja
和原始英语句子 raw_en。
处理后的日语和英语张量被组成一个元组 (ja_tensor_, en_tensor_),然后这个元组被添加到 data列表中。这样,每对原始句子都被转换成了它们的数值表示形式,并且存储在列表中。
最后,函数返回这个包含处理后数据的列表 data。通过调用 data_process 函数并传入训练数据 trainja 和 trainen,得到了处理后的训练数据集 train_data。这个数据集现在可以被用来创建 DataLoader 对象,进而在模型训练过程中进行批量加载和处理。
- # 定义一个名为data_process的函数,接收两个参数:ja和en
- def data_process(ja, en):
- # 初始化一个空列表data
- data = []
- # 使用zip函数将ja和en中的元素一一对应地组合在一起,然后遍历这些组合
- for (raw_ja, raw_en) in zip(ja, en):
- # 对原始的日语文本进行分词,并将分词结果转换为对应的词汇表索引,然后将这些索引转换为LongTensor类型的张量
- ja_tensor_ = torch.tensor([ja_vocab[token] for token in ja_tokenizer.encode(raw_ja.rstrip("
- "), out_type=str)],
- dtype=torch.long)
- # 对原始的英语文本进行分词,并将分词结果转换为对应的词汇表索引,然后将这些索引转换为LongTensor类型的张量
- en_tensor_ = torch.tensor([en_vocab[token] for token in en_tokenizer.encode(raw_en.rstrip("
- "), out_type=str)],
- dtype=torch.long)
- # 将处理好的日语和英语张量作为元组添加到data列表中
- data.append((ja_tensor_, en_tensor_))
- # 返回处理后的数据列表
- return data
-
- # 调用data_process函数处理训练数据,并将结果赋值给train_data变量
- train_data = data_process(trainja, trainen)
- # 设置批量大小为8
- BATCH_SIZE = 8
- # 获取日语词汇表中的填充索引
- PAD_IDX = ja_vocab['<pad>']
- # 获取日语词汇表中的开始索引
- BOS_IDX = ja_vocab['<bos>']
- # 获取日语词汇表中的结束索引
- EOS_IDX = ja_vocab['<eos>']
-
- # 定义一个函数,用于生成批次数据
- def generate_batch(data_batch):
- # 初始化两个空列表,用于存储日语和英语的批次数据
- ja_batch, en_batch = [], []
- # 遍历输入的数据批次
- for (ja_item, en_item) in data_batch:
- # 将开始索引、日语数据和结束索引拼接起来,并添加到日语批次列表中
- ja_batch.append(torch.cat([torch.tensor([BOS_IDX]), ja_item, torch.tensor([EOS_IDX])], dim=0))
- # 将开始索引、英语数据和结束索引拼接起来,并添加到英语批次列表中
- en_batch.append(torch.cat([torch.tensor([BOS_IDX]), en_item, torch.tensor([EOS_IDX])], dim=0))
- # 对日语批次列表进行填充处理,使其具有相同的长度
- ja_batch = pad_sequence(ja_batch, padding_value=PAD_IDX)
- # 对英语批次列表进行填充处理,使其具有相同的长度
- en_batch = pad_sequence(en_batch, padding_value=PAD_IDX)
- # 返回填充后的日语和英语批次数据
- return ja_batch, en_batch
-
- # 使用DataLoader加载训练数据,设置批量大小为BATCH_SIZE,打乱顺序,并使用generate_batch函数作为collate_fn参数
- train_iter = DataLoader(train_data, batch_size=BATCH_SIZE,
- shuffle=True, collate_fn=generate_batch)
下面,我们定义一个基于Transformer的序列到序列(Seq2Seq)模型,用于处理序列数据的翻译或转换任务。模型由编码器(encoder)和解码器(decoder)组成,它们共享相同的维度和一些层的参数。
- from torch.nn import TransformerEncoder, TransformerDecoder, TransformerEncoderLayer, TransformerDecoderLayer
-
- class Seq2SeqTransformer(nn.Module):
- def __init__(self, num_encoder_layers: int, num_decoder_layers: int,
- emb_size: int, src_vocab_size: int, tgt_vocab_size: int,
- dim_feedforward:int = 512, dropout:float = 0.1):
- super(Seq2SeqTransformer, self).__init__()
-
- # Transformer 编码器层
- encoder_layer = TransformerEncoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
-
- # Transformer 解码器层
- decoder_layer = TransformerDecoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
-
- # 线性层作为生成器,将解码器输出映射到目标词汇表的维度
- self.generator = nn.Linear(emb_size, tgt_vocab_size)
-
- # 源语言和目标语言的词嵌入层
- self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
- self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
-
- # 位置编码层
- self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)
-
- def forward(self, src: Tensor, trg: Tensor, src_mask: Tensor,
- tgt_mask: Tensor, src_padding_mask: Tensor,
- tgt_padding_mask: Tensor, memory_key_padding_mask: Tensor):
- # 对源语言和目标语言的输入进行位置编码
- src_emb = self.positional_encoding(self.src_tok_emb(src))
- tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
-
- # Transformer 编码器处理源语言序列
- memory = self.transformer_encoder(src_emb, src_mask, src_padding_mask)
-
- # Transformer 解码器处理目标语言序列
- outs = self.transformer_decoder(tgt_emb, memory, tgt_mask, None,
- tgt_padding_mask, memory_key_padding_mask)
-
- # 通过生成器映射到目标词汇表的维度
- return self.generator(outs)
-
- def encode(self, src: Tensor, src_mask: Tensor):
- # 编码器的编码过程,返回编码器的输出(编码器最后一层的输出)
- return self.transformer_encoder(self.positional_encoding(
- self.src_tok_emb(src)), src_mask)
-
- def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
- # 解码器的解码过程,返回解码器的输出(解码器最后一层的输出)
- return self.transformer_decoder(self.positional_encoding(
- self.tgt_tok_emb(tgt)), memory,
- tgt_mask)
-
- class PositionalEncoding(nn.Module):
- def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
- super(PositionalEncoding, self).__init__()
-
- # 计算位置编码
- den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
- pos = torch.arange(0, maxlen).reshape(maxlen, 1)
- pos_embedding = torch.zeros((maxlen, emb_size))
- pos_embedding[:, 0::2] = torch.sin(pos * den)
- pos_embedding[:, 1::2] = torch.cos(pos * den)
- pos_embedding = pos_embedding.unsqueeze(-2)
-
- self.dropout = nn.Dropout(dropout)
- self.register_buffer('pos_embedding', pos_embedding)
-
- def forward(self, token_embedding: Tensor):
- # 返回加上位置编码后的结果,并应用 dropout
- return self.dropout(token_embedding +
- self.pos_embedding[:token_embedding.size(0), :])
-
- class TokenEmbedding(nn.Module):
- def __init__(self, vocab_size: int, emb_size):
- super(TokenEmbedding, self).__init__()
- self.embedding = nn.Embedding(vocab_size, emb_size)
- self.emb_size = emb_size
-
- def forward(self, tokens: Tensor):
- # 返回词嵌入乘以 sqrt(词嵌入维度) 的结果
- return self.embedding(tokens.long()) * math.sqrt(self.emb_size)
-
- def generate_square_subsequent_mask(sz):
- """
- 生成一个用于 Transformer 解码器的目标序列遮蔽,确保在每个时间步只能看到之前的信息。
- Args:
- - sz (int): 序列的长度
- Returns:
- - mask (Tensor): 形状为 (sz, sz) 的上三角遮蔽矩阵,对角线及以下元素为-∞,其余元素为0
- """
- mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
- mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
- return mask
-
- def create_mask(src, tgt):
- """
- 创建用于 Transformer 模型的遮蔽张量。
- Args:
- - src (Tensor): 源序列张量
- - tgt (Tensor): 目标序列张量
- Returns:
- - src_mask (Tensor): 源序列的填充遮蔽张量,形状为 (src_seq_len, src_seq_len)
- - tgt_mask (Tensor): 目标序列的上三角遮蔽张量,形状为 (tgt_seq_len, tgt_seq_len)
- - src_padding_mask (Tensor): 源序列的填充遮蔽张量,形状为 (src_seq_len, batch_size)
- - tgt_padding_mask (Tensor): 目标序列的填充遮蔽张量,形状为 (tgt_seq_len, batch_size)
- """
- src_seq_len = src.shape[0]
- tgt_seq_len = tgt.shape[0]
-
- # 生成目标序列的遮蔽张量
- tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
-
- # 源序列的遮蔽张量为全零矩阵,不遮蔽任何内容
- src_mask = torch.zeros((src_seq_len, src_seq_len), device=device).type(torch.bool)
-
- # 生成源序列和目标序列的填充遮蔽张量
- src_padding_mask = (src == PAD_IDX).transpose(0, 1)
- tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
-
- return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
-
- SRC_VOCAB_SIZE = len(ja_vocab) # 源语言词汇表大小
- TGT_VOCAB_SIZE = len(en_vocab) # 目标语言词汇表大小
- EMB_SIZE = 512 # 词嵌入维度
- NHEAD = 8 # 注意力头的数量
- FFN_HID_DIM = 512 # FeedForward 层隐藏层维度
- BATCH_SIZE = 16 # 批量大小
- NUM_ENCODER_LAYERS = 3 # 编码器层数
- NUM_DECODER_LAYERS = 3 # 解码器层数
- NUM_EPOCHS = 16 # 训练轮数
-
- transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
- EMB_SIZE, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE,
- FFN_HID_DIM)
-
- # 使用 Xavier 初始化网络参数
- for p in transformer.parameters():
- if p.dim() > 1:
- nn.init.xavier_uniform_(p)
-
- transformer = transformer.to(device) # 将模型移动到GPU上(如果可用)
-
- loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX) # 定义损失函数,忽略填充索引
-
- optimizer = torch.optim.Adam(
- transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9
- )
-
- def train_epoch(model, train_iter, optimizer):
- """
- 训练模型的一个epoch。
- Args:
- - model (Seq2SeqTransformer): Transformer 模型
- - train_iter (DataLoader): 训练数据迭代器
- - optimizer (torch.optim.Adam): 优化器
- Returns:
- - float: 平均损失值
- """
- model.train()
- losses = 0
- for idx, (src, tgt) in enumerate(train_iter):
- src = src.to(device)
- tgt = tgt.to(device)
-
- tgt_input = tgt[:-1, :]
-
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
-
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask)
-
- optimizer.zero_grad()
-
- tgt_out = tgt[1:,:]
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- loss.backward()
-
- optimizer.step()
- losses += loss.item()
- return losses / len(train_iter)
-
-
- def evaluate(model, val_iter):
- """
- 评估模型在验证集上的表现。
- Args:
- - model (Seq2SeqTransformer): Transformer 模型
- - val_iter (DataLoader): 验证数据迭代器
- Returns:
- - float: 平均验证损失值
- """
- model.eval()
- losses = 0
- for idx, (src, tgt) in (enumerate(valid_iter)):
- src = src.to(device)
- tgt = tgt.to(device)
-
- tgt_input = tgt[:-1, :]
-
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
-
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask)
- tgt_out = tgt[1:,:]
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- losses += loss.item()
- return losses / len(val_iter)
- # 导入进度条库
- import tqdm
-
- # 定义训练循环,NUM_EPOCHS为训练的总轮数
- for epoch in tqdm.tqdm(range(1, NUM_EPOCHS+1)):
- # 记录当前轮次的开始时间
- start_time = time.time()
-
- # 调用train_epoch函数进行一轮训练,返回训练损失值
- train_loss = train_epoch(transformer, train_iter, optimizer)
-
- # 记录当前轮次的结束时间
- end_time = time.time()
-
- # 打印当前轮次、训练损失值以及本轮训练所花费的时间
- print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, "
- f"Epoch time = {(end_time - start_time):.3f}s"))
尝试使用上面训练好的模型来实现日语句子的翻译。
定义下面两个函数用于使用训练好的Seq2SeqTransformer模型进行解码和翻译。
·greedy_decode函数实现了一种简单的解码策略,它接收一个预训练的模型、源语言文本的表示、相应的掩码、允许的最大解码长度以及解码开始的起始符号。函数首先将源语言数据准备到合适的设备上,并利用模型的编码器部分来获取编码后的记忆表示。接着,初始化目标序列,仅包含起始符号,并在设备上进行处理。在每次迭代中,函数生成目标掩码以防止解码时信息泄露,然后利用解码器和当前的目标序列生成下一个词的概率分布。之后,选择概率最高的词作为序列的下一个元素,直到生成结束符号或达到最大长度。最后,函数返回解码得到的完整目标序列。
·translate函数则是将greedy_decode函数应用于实际翻译任务中的一个包装器。它首先将模型设置为评估模式,使用分词器将源语言文本转换为词索引,并添加必要的开始和结束符号。然后,构建源语言数据的张量和掩码,调用greedy_decode函数进行解码。解码得到的词索引序列经过转换和格式化,去除特定的标记,并连接成人类可读的目标语言文本。这样,translate函数提供了一个从源语言文本到目标语言文本的直接翻译接口。
- def greedy_decode(model, src, src_mask, max_len, start_symbol):
- # 将源语言数据和掩码移动到指定的设备(CPU或GPU)
- src = src.to(device)
- src_mask = src_mask.to(device)
- # 使用模型的编码器对源数据进行编码,得到源记忆memory
- memory = model.encode(src, src_mask)
-
- # 初始化目标序列ys,填充为起始符号,并移动到设备上
- ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(device)
-
- # 进行循环直到达到最大长度或生成结束符号
- for i in range(max_len-1):
- # 确保memory在正确的设备上
- memory = memory.to(device)
- # 创建memory_mask以屏蔽源记忆与目标序列之间的交互
- memory_mask = torch.zeros(ys.shape[0], memory.shape[0]).to(device).type(torch.bool)
- # 生成tgt_mask以防止解码器在未来步骤中看到未来的信息
- tgt_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(device)
-
- # 使用模型的解码器和目标掩码对目标序列进行解码,得到输出out
- out = model.decode(ys, memory, tgt_mask)
- # 转置输出out以匹配生成器层的输入要求
- out = out.transpose(0, 1)
- # 通过模型的生成器层获取下一个词的概率分布
- prob = model.generator(out[:, -1])
- # 从概率分布中选择概率最高的词作为下一个词
- _, next_word = torch.max(prob, dim=1)
- next_word = next_word.item() # 转换为Python标量
- # 将新词添加到目标序列ys中
- ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
-
- # 如果生成了结束符号,则跳出循环
- if next_word == EOS_IDX:
- break
-
- # 返回生成的目标序列
- return ys
-
- def translate(model, src, src_vocab, tgt_vocab, src_tokenizer):
- # 设置模型为评估模式
- model.eval()
-
- # 使用分词器将源语言文本编码为词索引,并添加开始和结束符号
- tokens = [BOS_IDX] + [src_vocab.stoi[tok] for tok in src_tokenizer.encode(src, out_type=str)] + [EOS_IDX]
-
- # 将词索引转换为张量src,并创建相应的源掩码src_mask
- num_tokens = len(tokens)
- src = torch.LongTensor(tokens).reshape(num_tokens, 1)
- src_mask = torch.zeros(num_tokens, num_tokens).type(torch.bool)
-
- # 调用greedy_decode函数进行解码
- # 设置最大长度为源序列长度加5,起始符号为BOS_IDX
- tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
-
- # 将解码得到的词索引转换为目标语言文本
- # 移除开始和结束符号,并以空格连接各个词
- return " ".join([tgt_vocab.itos[tok] for tok in tgt_tokens]).replace("<bos>", "").replace("<eos>", "")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。