当前位置:   article > 正文

Transformer应用——机器翻译(English & Chinese)_transformer 实现 中译英

transformer 实现 中译英

目录

说明

一、机器翻译

二、数据

2.1 数据集划分

2.2 数据处理

2.2.1 读取数据

2.2.2 数据类定义

2.2.3 数据加载器

2.2.4 序列化

2.2.5 测试

三、模型

3.1 模型框架

3.2 其他工具

四、训练

五、评估

​编辑

六、总结


说明

本片博客记录在学习论文《Attention is all you need》和项目《tensor2tensor》的基础上进行的实际应用情况。不做任何商业用途,绝不允许侵权行为。

论文地址:Attention is all you need

项目地址:tensor2tensor

本项目源码:github.com

学习工具:Visual Studio Code、ChatGPT-4o-mini、GitHub

学习内容:根据对论文和项目的理解,建立一个基础的transformer模型,使用中英文平行语料对其进行训练、优化、评估。

一、机器翻译

机器翻译模型是人工智能领域中的一种技术,用于自动将文本从一种语言翻译成另一种语言。

使用Transformer模型架构:

  • 序列到序列(Seq2Seq)模型:这是机器翻译中最常见的架构,由编码器(Encoder)和解码器(Decoder)组成。编码器将输入文本编码成一个固定长度的向量,解码器则根据这个向量生成目标语言的翻译。
  • 注意力机制(Attention):在Seq2Seq模型中加入注意力机制,可以使得解码器在生成每个目标单词时,能够关注到输入序列中的多个单词,从而提高翻译质量。

训练数据流:

  1. 输入模型前:将批量源序列和目标序列对token化并通过索引编码,使用pad_value=0将其补全至max_seq_length长度,矩阵形状均为[batch_size, max_seq_length]
  2. 在模型中:通过embedding、positional encode、encode、decode、fc_out等层,输出概率矩阵[batch_size, max_seq_length, vocab_size]
  3. 序列化:每一个序列可以表示为矩阵[max_seq_length, vocab_size],通过贪婪搜索策略或者最大概率策略获得序列token索引[1,max_seq_length],则批序列索引矩阵[batch_size, max_seq_length]进行解码获得输出字符串序列。

这是我对transformer模型数据流的简单理解,在下文会对相关的内容进行详细解释。

二、数据

 数据来源:5.翻译语料(translation2019zh),520万个中英文句子对

直接下载:https://drive.google.com/file/d/1EX8eE5YWBxCaohBO8Fh4e2j3b9C2bTVQ/view

数据描述:中英文平行语料520万对,包括训练集516万、验证集3.9万数据去重

每一个对,包含一个英文和对应的中文。中文或英文,多数情况是一句带标点符号的完整的话。对于一个平行的中英文对,中文平均有36个字,英文平均有19个单词(单词如“she”)。例:

{"english": "And the light breeze moves me to caress her long ear", "chinese": "微风推着我去爱抚它的长耳朵"}

2.1 数据集划分

在调试过程中,训练320条数据平均需要30秒。而训练一个中英翻译器大约需要数百万条数据,若我使用完整的训练集,那么即使epoch=1,也至少需要130个小时。目前我做这个项目是为了增强对transformer模型的理解,以及提高代码能力,并不是为了得到一个可以实用的翻译器。所以我在项目中建立了small和big文件夹,均包含训练、验证、测试数据。本次均使用small数据集训练,若今后有更好的硬件条件以及对代码优化完成,我会尝试训练完整的数据集。

small:

  • train_txt:从原训练集中提取了前960条数据
  • val_txt:原训练集的[961,1280]条数据
  • test_txt:原训练集的[1281,1600]条数据

big:

  • train.txt(训练):原训练集5161434条数据
  • val.txt(验证):原验证集前3200条数据
  • test.txt(测试):将原验证集的剩余数据(36123条)

注意到,数据集样本数大多都是32的倍数,因为我设置batch_size=32模型采用并行式训练,一次可以将32条数据传入模型,这是Transformer架构的一个重要优点。

2.2 数据处理

数据集划分好后,进行处理工作,全部用函数进行封装,在data_tool.py中

  1. import re
  2. import json
  3. import torch
  4. from torch.utils.data import Dataset, DataLoader
  5. from transformers import BertTokenizer
  6. from model import vocab_size,max_seq_length,batch_size
  7. def read_file(path) # 读取文件
  8. class TranslationDataset(Dataset) # 自定义数据集
  9. tokenizer = BertTokenizer.from_pretrained('bert-base-chinese') # tonken工具
  10. def data_loader(data_path) # 数据加载器
  11. def capitalize_first_letter_of_sentences(text) # 英语句子首字母大写
  12. def tokens_to_sequences(predicted_tokens) # 将token处理成字符串序列
  13. def output_to_seq(output=None, tgt_indices=None, test=False) # 将模型输出处理为字符串序列
  14. def test() # 测试函数

 通过这些工具,我们可以完成模型输入前、输出后的绝大部分数据处理工作,主要包括

  • 读取数据:从文件读取,并存入数据类中,并且做了填充处理;可单独访问中英文数据,用于评估时使用
  • 模型输入:根据数据类创建了加载器,可以加载批量数据用于模型调试、训练和评估
  • 模型输出:对于训练后得到概率矩阵,可以生成字符串序列

2.2.1 读取数据

原数据集为json文件,每一行为一个json对象,读取时会报错,所以直接转为了txt文件按行读取,返回字典列表。

  1. def read_file(path):
  2. """数据文件格式为txt,每一行为一个json文件, 按行读取并添加至data"""
  3. data = []
  4. try:
  5. # 打开文件
  6. with open(path, 'r', encoding='utf-8') as file:
  7. for line in file:
  8. # 去除可能的空行或额外字符
  9. clean_line = line.strip()
  10. if clean_line:
  11. try:
  12. # 将字符串转换为字典
  13. data_dict = json.loads(clean_line)
  14. data.append(data_dict)
  15. except json.JSONDecodeError as e:
  16. print(f"JSON解析错误: {e}")
  17. except FileNotFoundError as e:
  18. print(f"文件未找到: {e}")
  19. except IOError as e:
  20. print(f"文件读取错误: {e}")
  21. return data

函数说明

  • 函数名称read_file
  • 参数
    • path:一个字符串,表示要读取的文件的路径。
  • 返回值
    • data:一个列表,包含从文件中解析出来的字典。

函数功能

  1. 打开文件:使用with open(path, 'r', encoding='utf-8') as file:语句打开文件,并指定以只读模式打开,使用UTF-8编码。
  2. 逐行读取:通过for line in file:循环逐行读取文件内容。
  3. 去除空行:对每一行使用line.strip()去除可能的空白字符。
  4. 解析JSON:如果行内容非空,则尝试使用json.loads(clean_line)将字符串解析为字典。
  5. 错误处理

    1. 如果解析JSON时出错(例如,如果字符串不是有效的JSON格式),则捕获json.JSONDecodeError异常,并打印错误信息。
    2. 如果文件打开或读取时出错,则捕获FileNotFoundErrorIOError异常,并打印错误信息。
  6. 返回数据:函数最后返回包含所有解析字典的列表。

2.2.2 数据类定义

  1. class TranslationDataset(Dataset):
  2. """自定义数据集"""
  3. def __init__(self, data, tokenizer, max_length=max_seq_length):
  4. self.data = data # 数据
  5. self.English = [item['english'].lower() for item in data] # 将英文文本添加到 self.English 列表,编码需要小写化
  6. self.Chinese = [item['chinese'] for item in data] # 将中文文本添加到 self.Chinese 列表
  7. self.tokenizer = tokenizer # token化工具
  8. self.max_length = max_length # 最大序列长度
  9. def __len__(self):
  10. return len(self.data)
  11. def __getitem__(self, idx):
  12. English_encoded = self.tokenizer(self.English[idx], max_length=self.max_length, truncation=True, padding='max_length', return_tensors='pt')
  13. Chinese_encoded = self.tokenizer(self.Chinese[idx], max_length=self.max_length, truncation=True, padding='max_length', return_tensors='pt')
  14. return English_encoded['input_ids'].reshape(-1), Chinese_encoded['input_ids'].reshape(-1)

类定义

  • 类名TranslationDataset
  • 继承Dataset

类属性

  • data:包含所有数据项的列表,每个数据项是一个包含英文和中文文本的字典。
  • English:包含所有英文文本的列表,所有文本被转换为小写。
  • Chinese:包含所有中文文本的列表。
  • tokenizer:用于将文本转换为机器可读的token序列的工具。
  • max_length:序列的最大长度,超过这个长度的序列将被截断。

方法

  1. __init__:构造函数,初始化数据集。
  2. __len__:返回数据集中的数据项数量。
  3. __getitem__:根据索引返回一个数据项。

构造函数 __init__

  • 参数
    • data:数据集,是一个列表。
    • tokenizer:用于文本编码的tokenizer对象。
    • max_length:可选参数,默认为max_seq_length,表示序列的最大长度。

构造函数中,首先将英文文本转换为小写并存储在self.English列表中,中文文本则直接存储在self.Chinese列表中。同时,将tokenizer和最大序列长度保存为类的属性。

__len__ 方法

这个方法返回数据集中数据项的总数。

__getitem__ 方法

  • 参数
    • idx:索引,用于获取数据集中的特定项。

此方法使用提供的索引idx来获取英文和中文文本,然后使用tokenizer将它们编码为token序列。编码时,如果文本长度超过max_length,则会进行截断;如果文本长度不足,则会进行填充以达到max_length。编码后的结果包括input_ids,这是模型需要的输入格式。最后,返回两个编码后的input_ids,并且将它们重塑为二维数组(如果需要的话)。

2.2.3 数据加载器

  1. tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
  2. def data_loader(data_path):
  3. # 从路径读取数据
  4. data = read_file(data_path)
  5. # 创建测试数据集
  6. dataset = TranslationDataset(data, tokenizer)
  7. # 创建数据加载器
  8. loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
  9. return loader, dataset
  • tokenizer使用Hugging Face的transformers库加载预训练的bert-base-chinese模型的tokenizer。这个tokenizer将用于将文本转换为模型能够理解的token序列。

  • data_loader函数:这个函数接收一个数据列表data,并返回一个数据加载器loader和一个数据集对象dataset

    • 参数

      • data:一个列表,包含用于训练的数据项,每个数据项通常是一个包含英文和中文文本的字典。
    • 返回值

      • loader:数据加载器,用于在训练过程中迭代数据。
      • dataset:数据集对象,它是TranslationDataset的一个实例。
    • 函数内部操作

      • 根据传入的路径读取文件获得数据列表data
      • 使用data和tokenizer创建一个TranslationDataset实例。
      • 使用这个数据集实例创建一个DataLoader实例。DataLoaderbatch_size参数定义了每个批次的样本数量,shuffle参数设置为True表示在每个epoch开始时打乱数据。

注意:

"tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')"可能会请求失败,再次运行即可

2.2.4 序列化

  1. def capitalize_first_letter_of_sentences(text):
  2. # 使用split方法将文本分割成句子列表,这里假设句子以点号、问号或感叹号结尾
  3. sentences = text.split('. ')
  4. sentences = [sentence.strip() for sentence in sentences if sentence.strip() != '']
  5. # 对每个句子进行处理,使其第一个单词的首字母大写
  6. capitalized_sentences = [sentence[0].upper() + sentence[1:] if sentence else '' for sentence in sentences]
  7. # 将处理后的句子列表重新组合成一个文本
  8. capitalized_text = '. '.join(capitalized_sentences) + ('.' if text.endswith('.') else '')
  9. return capitalized_text
  10. def tokens_to_sequences(predicted_tokens):
  11. # 移除 [CLS]、[SEP]、[UNK]、[PAD]、[MASK] 标志
  12. processed_sentence = predicted_tokens.replace("[CLS]", "").replace("[SEP]", "").replace("[PAD]", "").replace("[UNK]", "").replace("[MASK]", "")
  13. # 去除多余的空格
  14. processed_sentence = " ".join(processed_sentence.split())
  15. # 检测中文字符的正则表达式
  16. pattern_chinese = re.compile(r'[\\u4e00-\\u9fff]+')
  17. # 检测英文字符的正则表达式
  18. pattern_english = re.compile(r'[a-zA-Z]+')
  19. if pattern_english.findall(processed_sentence):
  20. processed_sentence = capitalize_first_letter_of_sentences(processed_sentence)
  21. if pattern_chinese.findall(processed_sentence):
  22. processed_sentence = processed_sentence.replace(" ", "")
  23. return processed_sentence

capitalize_first_letter_of_sentences 函数用于将输入文本中的每个句子的首字母大写。它首先通过 split('. ') 以 为分隔符将文本拆分成句子列表,然后去除空句子。接着,通过列表推导式将每个句子的首字母大写,并重新组合成一个新的文本。

tokens_to_sequences 函数用于处理预测得到的标记序列 predicted_tokens 。首先移除特定的标记 [CLS] 、 [SEP] 、 [UNK] 、 [PAD] 、 [MASK] ,然后去除多余的空格。之后,通过正则表达式检测文本中是否包含中文字符或英文字符,如果包含英文字符则调用 capitalize_first_letter_of_sentences 函数对文本进行首字母大写处理,如果包含中文字符则去除空格。最后返回处理后的文本。

  1. def output_to_seq(output=None, tgt_indices=None, test=False):
  2. """将输出的概率矩阵解码成序列
  3. 步骤:
  4. 1、选择每个位置的最高概率词
  5. 2、使用tokenizer解码
  6. 3、处理解码结果得到字符串序列
  7. """
  8. if tgt_indices!=None:
  9. predicted_indices = tgt_indices
  10. else:
  11. if output==None:
  12. if test==True:
  13. output = torch.randn(2, 64, vocab_size)
  14. else:
  15. print("Error: The output is None. Please provide a valid output tensor.")
  16. return
  17. print("OutputProbabilities:\n", output.size())
  18. # 选择每个位置的最高概率词
  19. predicted_indices = torch.argmax(output, dim=-1)
  20. print("OutputPredicatedIndices:\n", predicted_indices.size())
  21. print("OutputPredicatedSequences:")
  22. # 将索引转换为词,并转为字符串
  23. predicted_sequences = []
  24. for row in predicted_indices:
  25. predicted_tokens = tokenizer.decode(row)
  26. predicted_sequence = tokens_to_sequences(predicted_tokens)
  27. print(predicted_sequence)
  28. predicted_sequences.append(predicted_sequence)
  29. return predicted_sequences

这段代码定义了一个名为output_to_seq的函数,用于将模型输出的概率矩阵解码成序列。

  1. 输入参数处理

    • output:模型输出的概率矩阵。
    • tgt_indices:目标序列的索引。如果未提供,函数将尝试从output中提取。
    • test:一个布尔值,指示是否处于测试模式。在测试模式下,如果outputNone,函数将生成一个随机输出。
  2. 选择最高概率词

    • 如果tgt_indices未提供,函数将根据output来获取预测的索引。如果outputNone且处于测试模式,函数将生成一个随机的输出矩阵。
    • 使用torch.argmax(output, dim=-1)来获取每个位置的最高概率词的索引。
  3. 解码和序列转换

    • 将获取到的索引使用tokenizer.decode()转换为对应的词。
    • 使用tokens_to_sequences函数进一步处理这些词,得到最终的字符串序列。
  4. 输出和返回

    • 打印解码后的序列,并将这些序列收集到一个列表中。
    • 最后,函数返回包含所有预测序列的列表。

2.2.5 测试

  1. def test():
  2. file_path = r"data\small\train.txt"
  3. loader, dataset = data_loader(file_path)
  4. """tokenizer的编码解码测试"""
  5. print("编码:\n", dataset[0][0])
  6. decoded_text = tokenizer.decode(dataset[0][0])
  7. print("英文源文本:\n", dataset.data[0]['english'])
  8. print("解码:\n", decoded_text)
  9. print("序列化:\n", tokens_to_sequences(decoded_text))
  10. print("编码:\n", dataset[0][1])
  11. decoded_text = tokenizer.decode(dataset[0][1])
  12. print("中文源文本:\n", dataset.data[0]['chinese'])
  13. print("解码:\n",decoded_text)
  14. print("序列化:\n", tokens_to_sequences(decoded_text))
  15. """output_to_seq测试"""
  16. output_to_seq(test=True)

这段代码定义了一个名为test的函数,用于测试数据加载、tokenizer的编码解码功能以及output_to_seq函数。以下是代码的详细解释:

  1. 数据加载

    • file_path变量定义了训练数据的文件路径。
    • data_loader(file_path)返回一个loader对象和dataset对象
    • dataset[0][0]dataset[0][1]假设是从dataset中获取的两个样本。
  2. tokenizer的编码解码测试

    • 打印第一对样本英文的编码结果。
    • 使用tokenizer的decode方法将编码结果解码回文本,并打印英文源文本。
    • 打印解码后的文本,并使用tokens_to_sequences函数将其序列化,然后打印序列化结果。
    • 重复上述步骤,但针对第一个样本对的中文进行输出。
  3. output_to_seq测试

    • 调用output_to_seq(test=True)函数进行测试。这里test=True意味着如果outputNone,函数将生成一个随机的输出矩阵。

测试结果

我们可以看到,中英文序列会被编码为tensor,每一个token对应一个索引,并且补齐至最大长度,解码后的输出需要经过处理才能得到正常的字符串序列。

 在测试下,会随机生成概率矩阵,接着通过最大概率获得索引矩阵,最后根据索引解码并处理得到字符串序列。由于是随机生成的内容,所以最终得到的序列为乱码。

好了,到目前为止,数据处理的工作差不多就完成了,接下来开始建模工作

三、模型

首先查看一下GPU信息,这点非常重要!

  1. def get_gpu_info():
  2. # 查看GPU情况
  3. try:
  4. # 使用subprocess调用nvidia-smi命令
  5. result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'],
  6. capture_output=True, text=True)
  7. output = result.stdout.strip().split('\\n')
  8. # 提取GPU信息
  9. gpu_info = {}
  10. for line in output:
  11. if line:
  12. name, memory = line.split(', ')
  13. gpu_info[name] = memory
  14. print(f"GPU型号: {name}, 内存: {memory}")
  15. print(f"GPU数量: {len(gpu_info)}")
  16. except Exception as e:
  17. print('Error:', str(e))

这个结果意味着,不能进行多GPU分布式计算,不能用Flash Attention进行注意力计算加速(至少RTX 3060),并且也不能用较大的batch_size。过于贫穷......

UserWarning: 1Torch was not compiled with flash attention. (Triggered internally at C:\cb\pytorch_1000000000000\work\aten\src\AT00\work\aten\src\ATen\native\transformers\cuda\sdp_utils.cpp:555.)

 这个是训练过程中的警告,但是硬件都不支持Flash Attention,就没必要去配环境了(然而我搞了好久)

至于怎么用GPU,需要NVIDIA显卡、cuda、cudNN、对应的pytorch版本等等,不是本篇博客的重点,请自行上网搜索,配置环境。

3.1 模型框架

还是这张熟悉的图片,读完一部分核心源码之后,我本打算手搓一个编码器-解码器结构,但是考虑到时间成本和收益,我还是决定使用现有的编码器层、解码器层。

  1. import math
  2. import time
  3. import torch
  4. import torch.nn as nn
  5. import torch.nn.functional as F
  6. from torch.optim.lr_scheduler import ReduceLROnPlateau
  7. import matplotlib.pyplot as plt
  8. from data_tool import *
  9. # 模型超参数
  10. embedding_dim = 512 # 嵌入维度
  11. n_heads = 8 # 多头注意力头数
  12. n_layers = 6 # 编码器和解码器层数
  13. class Transformer(nn.Module):
  14. """一个标准的transformer模型
  15. 包括了嵌入、位置编码、编码器层、编码器、解码器层、解码器等模块
  16. 向前传播步骤:
  17. 1. 对源序列和目标序列进行嵌入操作, 嵌入维度512
  18. 2. 增加位置编码
  19. 3. 丢弃部分数据,防止过拟合
  20. 4. 编码
  21. 5. 解码
  22. 6. 计算输出
  23. """
  24. def __init__(self, vocab_size, embedding_dim, n_heads, n_layers, max_seq_length, dropout=0.1):
  25. super(Transformer, self).__init__() # 继承父类nn.Module
  26. self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 获取设备
  27. self.embedding = nn.Embedding(vocab_size, embedding_dim).to(self.device) # 嵌入矩阵
  28. self.positional_encoding = self.create_positional_encoding(embedding_dim, max_seq_length).to(self.device) # 位置编码
  29. encoder_layer = nn.TransformerEncoderLayer(embedding_dim, n_heads, dim_feedforward=2048, dropout=dropout, batch_first=True).to(self.device) # 编码层
  30. self.encoder = nn.TransformerEncoder(encoder_layer, n_layers).to(self.device) # 编码器
  31. decoder_layer = nn.TransformerDecoderLayer(embedding_dim, n_heads, dim_feedforward=2048, dropout=dropout, batch_first=True).to(self.device) # 解码层
  32. self.decoder = nn.TransformerDecoder(decoder_layer, n_layers).to(self.device) # 解码器
  33. self.fc_out = nn.Linear(embedding_dim, vocab_size).to(self.device) # 输出层
  34. self.dropout = nn.Dropout(dropout).to(self.device) # 丢弃
  35. def create_positional_encoding(self, embedding_dim, max_seq_length):
  36. positional_encoding = torch.zeros(max_seq_length, embedding_dim) # 初始化位置编码矩阵
  37. position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1) # 位置信息 计算公式分子
  38. div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim)) # 计算公式分母
  39. positional_encoding[:, 0::2] = torch.sin(position * div_term)
  40. positional_encoding[:, 1::2] = torch.cos(position * div_term)
  41. return positional_encoding
  42. def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
  43. # 源序列、目标序列嵌入
  44. src_embed = self.embedding(src) # [batch_size, max_seq_length, embedding_dim]
  45. tgt_embed = self.embedding(tgt) # [batch_size, max_seq_length, embedding_dim]
  46. # 生成位置编码
  47. batch_size = src.size(0)
  48. posit = self.positional_encoding.unsqueeze(0).expand(batch_size, -1, -1) # 扩展至批次维度[batch_size, max_seq_length, embedding_dim]
  49. src_temp = src_embed + posit
  50. tgt_temp = tgt_embed + posit
  51. # 丢弃部分数据,避免过拟合
  52. src_temp = self.dropout(src_temp)
  53. tgt_temp = self.dropout(tgt_temp)
  54. memory = self.encoder(src_temp, src_mask) # 编码
  55. output = self.decoder(tgt_temp, memory, tgt_mask, memory_mask) # 解码
  56. output = self.fc_out(output) # 计算输出
  57. return output
  58. def generate(self, src, beam_size=5, early_stopping=True)

类定义:这个类继承自nn.Module,用于创建一个Transformer模型。

class Transformer(nn.Module):

构造函数__init__:构造函数初始化模型的参数。

def __init__(self, vocab_size, embedding_dim, n_heads, n_layers, max_seq_length, dropout=0.1):

参数:

  • vocab_size:词汇表大小。
  • embedding_dim:嵌入维度。
  • n_heads:多头注意力的头数。
  • n_layers:编码器和解码器的层数。
  • max_seq_length:序列的最大长度。
  • dropout:丢弃率,用于防止过拟合,默认为0.1。

设备选择:根据是否有可用的GPU,选择设备(CPU或GPU)。

self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

嵌入层:创建一个嵌入层,用于将输入序列转换为嵌入向量。

self.embedding = nn.Embedding(vocab_size, embedding_dim).to(self.device)

位置编码:创建位置编码矩阵,用于在序列的每个位置上添加额外的信息。

self.positional_encoding = self.create_positional_encoding(embedding_dim, max_seq_length).to(self.device)

编码器和解码器层:使用了PyTorch的TransformerEncoderLayerTransformerDecoderLayer

  1. encoder_layer = nn.TransformerEncoderLayer(embedding_dim, n_heads, dim_feedforward=2048, dropout=dropout, batch_first=True).to(self.device)
  2. self.encoder = nn.TransformerEncoder(encoder_layer, n_layers).to(self.device)
  3. decoder_layer = nn.TransformerDecoderLayer(embedding_dim, n_heads, dim_feedforward=2048, dropout=dropout, batch_first=True).to(self.device)
  4. self.decoder = nn.TransformerDecoder(decoder_layer, n_layers).to(self.device)

输出层:创建一个线性层,用于将解码器输出转换为预测的输出分布。

self.fc_out = nn.Linear(embedding_dim, vocab_size).to(self.device)

丢弃层:创建一个丢弃层,用于在嵌入层和解码器输入层应用丢弃策略。

self.dropout = nn.Dropout(dropout).to(self.device)

前向传播函数forward

def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):

前向传播函数接收源序列src、目标序列tgt以及各种掩码(src_masktgt_maskmemory_mask),并执行以下操作:

  1. 对源序列和目标序列进行嵌入。
  2. 添加位置编码。
  3. 对嵌入后的序列应用丢弃层。
  4. 使用编码器对源序列进行编码。
  5. 使用解码器对目标序列进行解码。
  6. 通过输出层计算最终的输出。

生成函数generate

def generate(self, src, beam_size=5, early_stopping=True)

生成函数用于生成序列输出。接收源序列src和可选参数beam_size(用于贝叶斯搜索的大小)以及early_stopping(是否在生成过程中提前停止)。这个函数目前还没有调试成功。

3.2 其他工具

  1. def shift_right(tensor, pad_value=0):
  2. """Decoder输入tgt右移处理"""
  3. # 获取张量的形状
  4. batch_size, seq_length = tensor.size()
  5. # 创建一个新的张量,填充 pad_value
  6. shifted_tensor = torch.full((batch_size, seq_length), pad_value, dtype=tensor.dtype)
  7. # 将原始张量的内容复制到新的张量中,向右移动一位
  8. shifted_tensor[:, 1:] = tensor[:, :-1]
  9. return shifted_tensor
  10. def get_path(type,test):
  11. if type == 0 and test == True:
  12. path = r"model_save\small\English_to_Chinese_model.pth"
  13. elif type == 0 and test == False:
  14. path = r"model_save\big\English_to_Chinese_model.pth"
  15. elif type == 1 and test == True:
  16. path = r"model_save\small\Chinese_to_English_model.pth"
  17. elif type == 1 and test == False:
  18. path = r"model_save\big\Chinese_to_English_model.pth"
  19. return path
  20. def save_model(model, type=0, test=False):
  21. """保存模型"""
  22. save_path = get_path(type,test)
  23. # 保存模型
  24. torch.save(model.state_dict(), save_path)
  25. print("已保存模型至",save_path)
  26. def load_model(type, test=False):
  27. """加载模型"""
  28. load_path = get_path(type,test)
  29. # 加载模型
  30. model = Transformer(vocab_size, embedding_dim, n_heads, n_layers, max_seq_length)
  31. model = model.to(model.device)
  32. model.load_state_dict(torch.load(load_path))
  33. return model
  34. def get_src_tgt(English, Chinese, model, type=0):
  35. if type==0:
  36. src, tgt = English, Chinese
  37. else:
  38. src, tgt = Chinese, English
  39. shifted_right_tgt = shift_right(tgt) # tgt右移
  40. src = src.to(model.device) # 放入GPU
  41. shifted_right_tgt = shifted_right_tgt.to(model.device) # 放入GPU
  42. return src,shifted_right_tgt
  43. def create_tgt_mask(tgt,model):
  44. length = len(tgt)
  45. tgt_mask = torch.triu(torch.ones(max_seq_length, max_seq_length,dtype=bool), diagonal=1).unsqueeze(0).expand(length*n_heads, -1, -1).to(model.device)
  46. return tgt_mask
  47. def create_src_mask(src,model):
  48. return
  49. def create_memory_mask(src,model):
  50. return
  51. def draw_loss(loss_values):
  52. # 绘制平均损失曲线图
  53. plt.figure(figsize=(10, 5))
  54. plt.plot(loss_values, label='Average Loss Every 10 Batchs')
  55. plt.title('Average Loss Over Batchs Every 10 Batchs')
  56. plt.xlabel('Batch (Every 10 Batch)')
  57. plt.ylabel('Average Loss')
  58. plt.legend()
  59. plt.grid(True)
  60. plt.show()

四、训练

  1. def model_train(train_loader, val_loader, type=0, num_epochs=3, test=False):
  2. """训练模型
  3. Args:
  4. data_loader:训练数据加载器
  5. type: 训练类型, 0——English to Chinese, 1——Chinese to English
  6. epochs: 训练轮数
  7. test: 是否为测试
  8. UserWarning: Torch was not compiled with flash attention.
  9. FlashAttention only supports Ampere GPUs or newer. 至少RTX 3060才能跑得起来。
  10. 本机器暗影精灵7 RTX 3050, 硬件不支持
  11. 1. 训练步骤
  12. 2. 早停机制:连续 patience 个 epoch 验证集损失没有下降就停止训练
  13. 3. 损失曲线绘制
  14. """
  15. total_training_time = 0 # 初始化总训练时间
  16. loss_values = [] # 损失列表
  17. validation_frequency = 1000 # 模型验证周期
  18. patience = 5 # 如果连续5次验证集损失没有改善,则停止
  19. best_val_loss = float('inf') # 初始化最佳验证损失
  20. patience_counter = 0 # 早停计数器
  21. if test==True:
  22. num_epochs = 10
  23. validation_frequency = 30
  24. # 1. 创建一个Transformer模型
  25. model = Transformer(vocab_size, embedding_dim, n_heads, n_layers, max_seq_length)
  26. model = model.to(model.device) # 放入GPU
  27. # 2. 定义损失函数和优化器
  28. criterion = nn.CrossEntropyLoss(ignore_index=0)
  29. optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
  30. # 3. 定义学习率调度器
  31. scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, threshold=0.0001, cooldown=1, min_lr=1e-8)
  32. # 4. 循环训练
  33. for epoch in range(num_epochs):
  34. num_batches = 0 # 记录批次数
  35. start_time = time.time() # 记录训练开始时间
  36. print(f"epoch [{epoch+1}/{num_epochs}]")
  37. batch_loss_10 = 0.0
  38. for English, Chinese in train_loader:
  39. num_batches += 1 # 记录批次
  40. src, tgt = get_src_tgt(English,Chinese,model,type)
  41. tgt_mask = create_tgt_mask(tgt,model)
  42. # 前向传播、计算损失、反向传播
  43. optimizer.zero_grad()
  44. output = model(src, tgt, tgt_mask)
  45. loss = criterion(output.view(-1, vocab_size), tgt.view(-1))
  46. loss.backward()
  47. optimizer.step()
  48. batch_loss_10 += loss.item()
  49. if num_batches % 10 == 0:
  50. # 训练时间、损失记录和日志输出
  51. average_loss = batch_loss_10 / 10
  52. loss_values.append(average_loss)
  53. end_time = time.time()
  54. training_time = end_time - start_time
  55. total_training_time += training_time
  56. print(f"[Batch {num_batches}], Training Time: {training_time:.2f} seconds")
  57. print(f"[Batch {num_batches}], Average Loss : {average_loss:.4f}")
  58. start_time = time.time()
  59. batch_loss_10 = 0.0
  60. if num_batches % validation_frequency == 0:
  61. # 验证、早停
  62. model.eval()
  63. with torch.no_grad():
  64. num_val_batchs = 0
  65. val_ave_loss = 0.0
  66. for English, Chinese in val_loader:
  67. num_val_batchs += 1
  68. val_src, val_tgt = get_src_tgt(English, Chinese, model, type)
  69. tgt_mask = create_tgt_mask(tgt,model)
  70. val_output = model(val_src,val_tgt, tgt_mask)
  71. val_loss = criterion(val_output.view(-1, vocab_size), val_tgt.view(-1))
  72. val_ave_loss += val_loss.item() / len(val_src)
  73. # 保存最佳验证损失和模型
  74. if val_ave_loss < best_val_loss:
  75. best_val_loss = val_ave_loss
  76. patience_counter = 0 # 重置计数器
  77. else:
  78. patience_counter += 1
  79. # 打印当前批次的损失
  80. print("This is a Valuation: ")
  81. print(f"Last Batch Loss: {loss}")
  82. print(f"Validation Loss: {val_loss}")
  83. # 检查是否满足早停条件
  84. if patience_counter >= patience:
  85. print("Early stopping!")
  86. break
  87. else:
  88. print("Continue!")
  89. # 更新学习率
  90. scheduler.step(loss.item())
  91. print(f"The Last Loss: {loss}")
  92. print(f"Total Training Time: {total_training_time:.2f} seconds")
  93. # 损失函数图
  94. draw_loss(loss_values)
  95. # 保存模型
  96. save_model(model,type,test)

这段代码定义了一个函数model_train,用于训练一个Transformer模型。以下是该函数的详细解释:

参数

  • train_loader: 训练数据的加载器。
  • val_loader: 验证数据的加载器。
  • type: 训练类型,0代表英文到中文,1代表中文到英文。
  • num_epochs: 训练的轮数,默认为3。
  • test: 是否为测试模式,默认为False。

变量

  • total_training_time: 总训练时间。
  • loss_values: 存储每个批次的损失值。
  • validation_frequency: 模型验证的频率。
  • patience: 早停机制的耐心值,默认为5。
  • best_val_loss: 最佳验证损失,初始化为无穷大。
  • patience_counter: 早停计数器。

训练流程

  1. 初始化模型、损失函数、优化器和调度器。
  2. 在每个epoch中,遍历训练数据,进行前向传播、损失计算和反向传播。
  3. 每隔一定批次,输出训练时间和平均损失。
  4. 每隔validation_frequency批次,进行模型验证,并根据验证损失更新学习率和早停计数器。
  5. 如果连续patience个epoch验证损失没有下降,则触发早停机制。

使用small数据训练这个模型(训练数据只有960条,batch_size=32,只有30个批次),设置test=True即为测试模型,epoch=10,每训练10个批次打印一次损失和时间,每30个批次进行一次验证,验证时不会对模型参数进行更新。

  1. def train(train_data_path, val_data_path, test=False):
  2. print("----------模型训练测试----------")
  3. train_loader, train_dataset = data_loader(train_data_path)
  4. val_loader, val_dataset = data_loader(val_data_path)
  5. model_train(train_loader,val_loader,test=test)
  6. if __name__=="__main__":
  7. train_data_path = r"data\small\train.txt"
  8. val_data_path = r"data\small\val.txt"
  9. train(train_data_path,val_data_path,test=True)

从结果上看,模型成功跑起来了,loss也有明显下降趋势。

五、评估

BLEU(Bilingual Evaluation Understudy)是一种用于评估机器翻译质量的指标。它是由NIST(美国国家标准与技术研究院)提出的,用于衡量机器翻译的忠实度和流畅度。BLEU得分通常介于0到1之间,得分越高表示翻译质量越好。

  1. import torch
  2. import model
  3. import data_tool
  4. import nltk
  5. from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
  6. # 确保已经下载了nltk的corpus
  7. # nltk.download('punkt')
  8. def calculate_bleu_scores(translated_sentences, reference_sentences,type=0,test=False):
  9. bleu_scores = [] # 用于存储每个句子对的 BLEU 分数
  10. # 中英互译平滑函数选择
  11. if type==0:
  12. smoothing_function=SmoothingFunction().method4
  13. else:
  14. smoothing_function=SmoothingFunction().method2
  15. for i, (trans, ref) in enumerate(zip(translated_sentences, reference_sentences)):
  16. print(trans)
  17. print(ref)
  18. # 对翻译和参考句子进行分词
  19. trans_tokens = nltk.word_tokenize(trans)
  20. ref_tokens = nltk.word_tokenize(ref)
  21. # 计算句子对的 BLEU 分数,这里使用了默认的权重 (0.25, 0.25, 0.25, 0.25)
  22. bleu_score = sentence_bleu([ref_tokens], trans_tokens, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing_function)
  23. bleu_scores.append(bleu_score)
  24. if test==True: print(f"句子 {i+1} BLEU 分数: {bleu_score:.10f}")
  25. return bleu_scores
  26. def bleu_test():
  27. references = ["我是学生", "我喜欢吃苹果"]
  28. hypothesis = ["我是学生", "我不喜欢吃苹果"]
  29. print("英译中:")
  30. scores = calculate_bleu_scores(hypothesis, references,0,True)
  31. # 计算平均BLEU分数
  32. average_score = sum(scores) / len(scores)
  33. print(f"所有生成翻译的平均BLEU分数为: {average_score}")
  34. references = ["This is a test sentence.", "I like apples."]
  35. hypothesis = ["This is a test sentence.", "I love apples."]
  36. print("中译英:")
  37. scores = calculate_bleu_scores(hypothesis, references,1,True)
  38. # 计算平均BLEU分数
  39. average_score = sum(scores) / len(scores)
  40. print(f"所有生成翻译的平均BLEU分数为: {average_score}")

 中英互译测试结果:

接下来我们使用测试数据集

  1. def src_tgt_bleu_score(test_data_path, type=0, test=False):
  2. """在训练结束后,加载模型, 导入验证集, 将src和tgt输入模型,计算输出BLEU"""
  3. test_loader, test_dataset = model.data_loader(test_data_path)
  4. transformer = model.load_model(type,test)
  5. num_batchs = 0
  6. total_average_score = 0
  7. transformer.eval()
  8. with torch.no_grad():
  9. for English, Chinese in test_loader:
  10. num_batchs+=1
  11. test_src, test_tgt = model.get_src_tgt(English, Chinese, transformer, type)
  12. print(f"Batch {num_batchs} has {len(test_src)} samples")
  13. tgt_mask = model.create_tgt_mask(test_tgt, transformer)
  14. test_output = transformer(test_src, test_tgt, tgt_mask)
  15. references = data_tool.output_to_seq(tgt_indices=test_tgt)
  16. hypothesis = data_tool.output_to_seq(test_output)
  17. scores = calculate_bleu_scores(hypothesis, references,type)
  18. average_score = sum(scores) / len(scores)
  19. total_average_score += average_score
  20. print(f"平均BLEU分数为: {average_score}")
  21. print(f"所有生成翻译的平均BLEU分数为: {(total_average_score/num_batchs):.4f}")

训练10轮在测试集第10批次的结果:

训练5轮测试结果:

对比可得,在small\train.txt这个小数据集上,10轮训练的结果远好于5轮训练的结果。并且,仔细观察不难看出,几乎只有完全预测正确的句子会得1分,存在个别错别字则为0分,导致平均得分非常低,我认为是我的SmoothingFunction().method选择问题。

六、总结

到目前为止,整个项目是一个半成品状态:

  1. generate函数没有调试通过,希望此函数可以仅根据src序列生成tgt序列,并且完成测试
  2. 模型没有在大规模的数据集进行训练
  3. 模型的超参数还可以进一步调整
  4. src_mask和memory_mask没有实现(影响不是特别大)

但是我很满意已经得到的成果,通过实际操作,我理解了Transformer模型的完整数据流,以及tensor2tensor模型的过程。代码我放在了GitHub仓库,有兴趣的朋友可以查看,也欢迎大家一起交流经验。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/972693
推荐阅读
相关标签
  

闽ICP备14008679号