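The Transformer is a seq2seq model architecture proposed by Google in the 2017 paper "Attention Is All You Need". Its key innovation is self-attention, which captures how strongly each word in a sequence attends to every other word. The model has been hugely successful in NLP. In recent years it has also made striking progress in computer vision, matching or exceeding CNN models on tasks such as image recognition and object detection. The Transformer is therefore arguably one of the architectures most worth following and learning in AI today. Many articles online already explain the architecture and its details, so I will not repeat them here. Instead I focus on practice: building a Transformer with TensorFlow that translates French into English.
The TensorFlow website has a detailed tutorial on building a Transformer that translates Portuguese into English. I worked through that tutorial and then adapted it to translate French into English.
The code in this article needs the following imports: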
- import re
- import tensorflow as tf
- from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab
- import tensorflow_text as text
- import pandas as pd
- import random
- import numpy as np
- import matplotlib.pyplot as plt
- import time
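The page "Tab-delimited Bilingual Sentence Pairs from the Tatoeba Project (Good for Anki and Similar Flashcard Applications)" offers sentence pairs between English and many other languages. Here we download the French-English data to use as the training and validation sets. After downloading http://www.manythings.org/anki/fra-eng.zip and unzipping it, each line of the file contains an English sentence, a French sentence, and the attribution for the pair, separated by TAB characters.
The following code reads the file and collects the French and English sentences:
- fra = []
- eng = []
- # State the encoding explicitly so that accented characters are read
- # correctly regardless of the platform default.
- with open('fra.txt', 'r', encoding='utf-8') as f:
-     content = f.readlines()
- for line in content:
-     # Each line holds: English sentence, French sentence, attribution.
-     temp = line.split(sep='\t')
-     eng.append(temp[0])
-     fra.append(temp[1])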
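Looking at the sentences, some contain special characters, for example 'Cours\u202f!'. We replace these invisible whitespace characters (\u202f, \xa0, ...) with ordinary spaces, then trim and lowercase each sentence:
- new_fra = []
- new_eng = []
- for item in fra:
-     # r'\s' is a raw string; in Python 3 it also matches Unicode whitespace.
-     new_fra.append(re.sub(r'\s', ' ', item).strip().lower())
- for item in eng:
-     new_eng.append(re.sub(r'\s', ' ', item).strip().lower())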
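To see what this normalization does to a single string, here is a minimal, self-contained sketch. The helper name `normalize` is an illustrative assumption and is not part of the original code.

```python
import re

def normalize(sentence):
    """Replace every Unicode whitespace character with a plain space,
    trim both ends and lowercase, mirroring the loop above."""
    return re.sub(r'\s', ' ', sentence).strip().lower()

# '\u202f' is a narrow no-break space, '\xa0' a no-break space.
print(normalize('Cours\u202f!'))      # cours !
print(normalize('\xa0Bonjour\xa0'))   # bonjour
```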
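Because the model can only process numbers, the French and English words must be converted into tokens. Here we use a BERT tokenizer; see the TensorFlow tutorial "Subword tokenizers | Text | TensorFlow" for details.
First create two datasets, one holding the French sentences and one the English sentences.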
- ds_fra = tf.data.Dataset.from_tensor_slices(new_fra)
- ds_eng = tf.data.Dataset.from_tensor_slices(new_eng)
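Call the bert_vocab module of TensorFlow Text to build the vocabularies. Some reserved tokens are defined for special purposes: for example, [START] marks the beginning of a sentence and [UNK] stands for a word that does not appear in the vocabulary.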
- bert_tokenizer_params=dict(lower_case=True)
- reserved_tokens=["[PAD]", "[UNK]", "[START]", "[END]"]
-
- bert_vocab_args = dict(
- # The target vocabulary size
- vocab_size = 8000,
- # Reserved tokens that must be included in the vocabulary
- reserved_tokens=reserved_tokens,
- # Arguments for `text.BertTokenizer`
- bert_tokenizer_params=bert_tokenizer_params,
- # Arguments for `wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn`
- learn_params={},
- )
-
- fr_vocab = bert_vocab.bert_vocab_from_dataset(
- ds_fra.batch(1000).prefetch(2),
- **bert_vocab_args
- )
-
- en_vocab = bert_vocab.bert_vocab_from_dataset(
- ds_eng.batch(1000).prefetch(2),
- **bert_vocab_args
- )
Once the vocabularies are built, we can look at what they contain:
- print(en_vocab[:10])
- print(en_vocab[100:110])
- print(en_vocab[1000:1010])
- print(en_vocab[-10:])
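The output is shown below. The vocabulary is not split strictly by whole English words: for example, '##ers' means that a word ending in "ers" can be split so that the ending becomes a separate '##ers' token.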
- ['[PAD]', '[UNK]', '[START]', '[END]', '!', '"', '$', '%', '&', "'"]
- ['ll', 'there', 've', 'and', 'him', 'time', 'here', 'about', 'get', 'didn']
- ['##ers', 'chair', 'earth', 'honest', 'succeed', '##ted', 'animals', 'bill', 'drank', 'lend']
- ['##?', '##j', '##q', '##z', '##°', '##–', '##—', '##‘', '##’', '##€']
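The way a word is broken into a stem plus '##' pieces can be illustrated with a greedy longest-match-first lookup, which is the general idea behind WordPiece tokenization. The following is a minimal sketch under stated assumptions: the function name `wordpiece_tokenize` and the toy vocabulary are hypothetical, and the real `text.BertTokenizer` does more (for example, splitting on punctuation and whitespace first).

```python
def wordpiece_tokenize(word, vocab, unk="[UNK]"):
    """Greedily split one word into the longest pieces found in vocab.
    Pieces that do not start the word carry a '##' prefix."""
    tokens, start = [], 0
    while start < len(word):
        end = len(word)
        piece = None
        while start < end:
            candidate = word[start:end]
            if start > 0:
                candidate = "##" + candidate
            if candidate in vocab:
                piece = candidate
                break
            end -= 1          # try a shorter candidate
        if piece is None:
            return [unk]      # no piece matches: the whole word is unknown
        tokens.append(piece)
        start = end
    return tokens

# Toy vocabulary, invented for this illustration.
toy_vocab = {"work", "teach", "##ers", "##er", "##s"}
print(wordpiece_tokenize("workers", toy_vocab))   # ['work', '##ers']
print(wordpiece_tokenize("teachers", toy_vocab))  # ['teach', '##ers']
print(wordpiece_tokenize("xyz", toy_vocab))       # ['[UNK]']
```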
Save the vocabularies to files, then instantiate two tokenizers to tokenize French and English sentences.
- def write_vocab_file(filepath, vocab):
-     # Write one token per line; UTF-8 keeps accented word pieces intact.
-     with open(filepath, 'w', encoding='utf-8') as f:
-         for token in vocab:
-             print(token, file=f)
- write_vocab_file('fr_vocab.txt', fr_vocab)
- write_vocab_file('en_vocab.txt', en_vocab)
-
- fr_tokenizer = text.BertTokenizer('fr_vocab.txt', **bert_tokenizer_params)
- en_tokenizer = text.BertTokenizer('en_vocab.txt', **bert_tokenizer_params)
Now we can test tokenization on a few English sentences. We add the special tokens [START] and [END] to the beginning and end of each sentence, which makes training the model easier later.
- START = tf.argmax(tf.constant(reserved_tokens) == "[START]")
- END = tf.argmax(tf.constant(reserved_tokens) == "[END]")
-
- def add_start_end(ragged):
-     count = ragged.bounding_shape()[0]
-     starts = tf.fill([count,1], START)
-     ends = tf.fill([count,1], END)
-     return tf.concat([starts, ragged, ends], axis=1)
-
- sentences = ["Hello Roy!", "The sky is blue.", "Nice to meet you!"]
-
- add_start_end(en_tokenizer.tokenize(sentences).merge_dims(1,2)).to_tensor()
The output is:
- <tf.Tensor: shape=(3, 7), dtype=int64, numpy=
- array([[ 2, 1830, 45, 3450, 4, 3, 0],
- [ 2, 62, 1132, 64, 996, 13, 3],
- [ 2, 353, 61, 416, 60, 4, 3]])>
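Now we can build the training and validation sets. Each example must contain both the French and the English sentence: the French sentence is the input to the Transformer encoder, and the English sentence serves both as the decoder input and as the target for the model output. Here we build a pandas DataFrame, randomly assign 80% of the rows to the training set and the rest to the validation set, and then convert both to TensorFlow datasets.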
- df = pd.DataFrame(data={'fra':new_fra, 'eng':new_eng})
-
- # Shuffle the Dataframe
- recordnum = df.count()['fra']
- indexlist = list(range(recordnum))  # cover every row, including the last one
- random.shuffle(indexlist)
- df_train = df.loc[indexlist[:int(recordnum*0.8)]]
- df_val = df.loc[indexlist[int(recordnum*0.8):]]
-
- ds_train = tf.data.Dataset.from_tensor_slices((df_train.fra.values, df_train.eng.values))
- ds_val = tf.data.Dataset.from_tensor_slices((df_val.fra.values, df_val.eng.values))
Check the maximum number of tokens in a training sentence:
- lengths = []
-
- for fr_examples, en_examples in ds_train.batch(1024):
-     # tokenize() returns [batch, words, word pieces]; merge the last two
-     # axes so that row_lengths() counts tokens rather than words.
-     fr_tokens = fr_tokenizer.tokenize(fr_examples).merge_dims(1, 2)
-     lengths.append(fr_tokens.row_lengths())
-
-     en_tokens = en_tokenizer.tokenize(en_examples).merge_dims(1, 2)
-     lengths.append(en_tokens.row_lengths())
-     print('.', end='', flush=True)
-
- all_lengths = np.concatenate(lengths)
-
- plt.hist(all_lengths, np.linspace(0, 100, 11))
- plt.ylim(plt.ylim())
- max_length = max(all_lengths)
- plt.plot([max_length, max_length], plt.ylim())
- plt.title(f'Max tokens per example: {max_length}');
The result shows that the longest tokenized sentence in the training set has 67 tokens.
We can then generate batches from the datasets with the following code:
- BUFFER_SIZE = 20000
- BATCH_SIZE = 64
- MAX_TOKENS = 67
-
- def filter_max_tokens(fr, en):
-     num_tokens = tf.maximum(tf.shape(fr)[1],tf.shape(en)[1])
-     return num_tokens < MAX_TOKENS
-
- def tokenize_pairs(fr, en):
-     fr = add_start_end(fr_tokenizer.tokenize(fr).merge_dims(1,2))
-     # Convert from ragged to dense, padding with zeros.
-     fr = fr.to_tensor()
-
-     en = add_start_end(en_tokenizer.tokenize(en).merge_dims(1,2))
-     # Convert from ragged to dense, padding with zeros.
-     en = en.to_tensor()
-     return fr, en
-
- def make_batches(ds):
-     return (
-         ds
-         .cache()
-         .shuffle(BUFFER_SIZE)
-         .batch(BATCH_SIZE)
-         .map(tokenize_pairs, num_parallel_calls=tf.data.AUTOTUNE)
-         .filter(filter_max_tokens)
-         .prefetch(tf.data.AUTOTUNE))
-
- train_batches = make_batches(ds_train)
- val_batches = make_batches(ds_val)
Generate one batch to take a look:
- for a in train_batches.take(1):
-     print(a)
The result is shown below. Each batch contains two tensors, holding the token ids of the French and English sentences respectively. Every sentence starts with token 2 ([START]); token 3 ([END]) closes it, followed by zeros as padding:
- (<tf.Tensor: shape=(64, 24), dtype=int64, numpy=
- array([[ 2, 39, 9, ..., 0, 0, 0],
- [ 2, 62, 43, ..., 0, 0, 0],
- [ 2, 147, 70, ..., 0, 0, 0],
- ...,
- [ 2, 4310, 14, ..., 0, 0, 0],
- [ 2, 39, 9, ..., 0, 0, 0],
- [ 2, 68, 64, ..., 0, 0, 0]])>, <tf.Tensor: shape=(64, 20), dtype=int64, numpy=
- array([[ 2, 36, 76, ..., 0, 0, 0],
- [ 2, 36, 75, ..., 0, 0, 0],
- [ 2, 92, 80, ..., 0, 0, 0],
- ...,
- [ 2, 68, 60, ..., 0, 0, 0],
- [ 2, 36, 75, ..., 0, 0, 0],
- [ 2, 67, 9, ..., 0, 0, 0]])>)
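Feeding these batches into an embedding layer turns each token into a high-dimensional vector, for example a 128-dimensional one. We then need to add positional information to this vector to indicate where the token sits in the sentence. The paper gives a way to encode the position, using the following formulas:
PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
In the formulas, pos is the position of the token: for a sentence of 50 words, pos ranges from 0 to 49. d_model is the embedding dimension: if each word is mapped to a 128-dimensional vector, d_model = 128. The dimension index (2i or 2i+1) ranges over 0-127, so i itself ranges over 0-63.
The formulas therefore mean that, for the N-th word, every dimension of its 128-dimensional embedding vector has the corresponding positional value added to it.
Take the third word as an example (pos = 2). In its 128-dimensional vector, each even dimension 2i (0, 2, 4, ...) has sin(2/10000^(2i/128)) added, and each odd dimension 2i+1 (1, 3, 5, ...) has cos(2/10000^(2i/128)) added, where 2i takes the values 0, 2, 4, ... in both cases.
The following code generates the positional-encoding matrix, which can be added to the token embeddings.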
- def get_angles(pos, i, d_model):
-     angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
-     return pos * angle_rates
-
- def positional_encoding(position, d_model):
-     angle_rads = get_angles(np.arange(position)[:, np.newaxis],
-                             np.arange(d_model)[np.newaxis, :],
-                             d_model)
-
-     # apply sin to even indices in the array; 2i
-     angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
-
-     # apply cos to odd indices in the array; 2i+1
-     angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
-
-     pos_encoding = angle_rads[np.newaxis, ...]
-
-     return tf.cast(pos_encoding, dtype=tf.float32)
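The worked example for pos = 2 can be checked by hand with a few lines of plain Python. This is a minimal sketch that uses only the standard library; the function name `positional_encoding_row` is an assumption made for this illustration and computes the values for a single position rather than the whole matrix.

```python
import math

def positional_encoding_row(pos, d_model):
    """Return the d_model positional-encoding values for one position."""
    row = []
    for k in range(d_model):
        # Dimensions 2i and 2i+1 share the same exponent 2i / d_model.
        angle = pos / math.pow(10000, (2 * (k // 2)) / d_model)
        row.append(math.sin(angle) if k % 2 == 0 else math.cos(angle))
    return row

row = positional_encoding_row(pos=2, d_model=128)
# Dimension 0 uses sin(2 / 10000^0), dimension 1 uses cos(2 / 10000^0).
print(math.isclose(row[0], math.sin(2)))  # True
print(math.isclose(row[1], math.cos(2)))  # True
```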
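The padding mask marks the positions in the input sequence whose token id is 0: wherever the id is 0, the mask is 1. This keeps padding tokens from taking part in the model's computation.
The look-ahead mask hides future tokens during prediction. For example, when translating a French sentence, the corresponding English sentence is the target. During training, when the first English word is predicted, all English words must be hidden (only [START] is visible); when the second English word is predicted, everything after the first English word must be hidden. The purpose is to prevent the model from seeing the words it is about to predict, which would undermine training.
- def create_padding_mask(seq):
-     seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
-
-     # add extra dimensions to add the padding
-     # to the attention logits.
-     return seq[:, tf.newaxis, tf.newaxis, :] # (batch_size, 1, 1, seq_len)
-
- def create_look_ahead_mask(size):
-     mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
-     return mask # (seq_len, seq_len)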
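To make the two masks concrete, the sketch below builds them for one short sequence with plain Python lists and combines them with an element-wise maximum. The function names and the token ids are assumptions chosen for this illustration; there is no batch dimension and no TensorFlow involved.

```python
def padding_mask(token_ids):
    """1 marks a padding position (token id 0), 0 marks a real token."""
    return [1 if t == 0 else 0 for t in token_ids]

def look_ahead_mask(size):
    """mask[i][j] == 1 when position j lies in the future of position i."""
    return [[1 if j > i else 0 for j in range(size)] for i in range(size)]

def combine(pad, ahead):
    """A key position is hidden if it is padding OR lies in the future."""
    n = len(pad)
    return [[max(pad[j], ahead[i][j]) for j in range(n)] for i in range(n)]

# Hypothetical ids: [START], two words, [END], one [PAD].
ids = [2, 45, 17, 3, 0]
for row in combine(padding_mask(ids), look_ahead_mask(len(ids))):
    print(row)
# [0, 1, 1, 1, 1]
# [0, 0, 1, 1, 1]
# [0, 0, 0, 1, 1]
# [0, 0, 0, 0, 1]
# [0, 0, 0, 0, 1]
```

Each row i lists which positions the token at position i may not attend to: everything after it, plus the padding column.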
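We now come to the core concept of the Transformer. The input vectors are passed through three linear transformations to produce three sets of vectors: Q, K and V.
The attention weights are obtained from the similarity between Q and K and are then multiplied with V to produce the output.
The attention formula is: Attention(Q, K, V) = softmax(Q·K^T / sqrt(d_k))·V
To unpack the formula: K and V stand for Key and Value, and Q is the query. Suppose we have the sentence "Tom is a boy", which has 4 words, that is, 4 tokens. After the linear transformations, every token has its own Q, K and V. When the Q of the token "Tom" is used as the query, it is compared with the K of all 4 tokens to see which are most similar, and an attention weight is computed for each. For example, suppose "Tom" is most similar to itself and second most similar to "boy"; after the softmax, the attention weights might be [0.9, 0.005, 0.005, 0.09]. These weights are then multiplied with the V of each token and summed. The resulting attention value is a weighted sum of the tokens' V vectors, and it encodes the relationships between tokens.
An e-commerce site offers an analogy. Every product is described by a Key, for example "PS3 game console", and its Value is the product's price. When we enter the Query "PS games", the site compares it against the keys and shows the most similar products.
Now to the formula in concrete terms. Suppose each token is encoded as a 128-dimensional vector and is transformed by the three linear matrices for Q, K and V, where the Q and K matrices have an output dimension of 64 and the V matrix has an output dimension of 100. Take a batch of 32 sentences in which the longest sentence has 20 tokens, so the input has shape 32×20×128. After the transformations, Q is 32×20×64, K is 32×20×64, and V is 32×20×100. Multiplying Q by the transpose of K, that is matmul(Q, K'), gives a result of shape 32×20×20: the Q of every token in a sentence has been dotted with the K of every token in the same sentence to measure similarity. The formula then scales this result by dividing by the square root of the dimension, here sqrt(64) = 8, which keeps the variance of the result stable no matter how large the dimension of Q and K is. A softmax normalizes the scaled result into the attention weights between each token and all the others. Multiplying these weights by V gives a result of shape 32×20×100: every token in every sentence now has a 100-dimensional representation that encodes some of its relationships to the other tokens.
In the implementation, the mask is multiplied by a very large negative number and added to the scaled Q·K' scores before the softmax is computed. A mask value of 1 means that the token at that position is padding and carries no meaning. Once a very large negative number has been added at that position, its softmax output is close to 0, which removes the influence of the padding token.
The implementation is as follows: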
- def scaled_dot_product_attention(q, k, v, mask):
-     """Calculate the attention weights.
-     q, k, v must have matching leading dimensions.
-     k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
-     The mask has different shapes depending on its type(padding or look ahead)
-     but it must be broadcastable for addition.
-     Args:
-       q: query shape == (..., seq_len_q, depth)
-       k: key shape == (..., seq_len_k, depth)
-       v: value shape == (..., seq_len_v, depth_v)
-       mask: Float tensor with shape broadcastable
-             to (..., seq_len_q, seq_len_k). Defaults to None.
-     Returns:
-       output, attention_weights
-     """
-
-     matmul_qk = tf.matmul(q, k, transpose_b=True) # (..., seq_len_q, seq_len_k)
-
-     # scale matmul_qk
-     dk = tf.cast(tf.shape(k)[-1], tf.float32)
-     scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
-
-     # add the mask to the scaled tensor.
-     if mask is not None:
-         scaled_attention_logits += (mask * -1e9)
-
-     # softmax is normalized on the last axis (seq_len_k) so that the scores
-     # add up to 1.
-     attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) # (..., seq_len_q, seq_len_k)
-
-     output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v)
-
-     return output, attention_weights
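The effect of the mask is easiest to see on a tiny example. The sketch below re-implements scaled dot-product attention for a single unbatched sequence with plain Python lists. It is an illustration under stated assumptions (the names `softmax` and `attention` and all numbers are invented here), not a replacement for the TensorFlow function above.

```python
import math

def softmax(xs):
    m = max(xs)                      # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

def attention(q, k, v, mask=None):
    """q: [len_q][depth], k: [len_k][depth], v: [len_k][depth_v],
    mask: [len_q][len_k] with 1 marking key positions to hide."""
    d_k = len(k[0])
    outputs, weights = [], []
    for i, q_i in enumerate(q):
        # Scaled dot product of one query with every key.
        logits = [sum(a * b for a, b in zip(q_i, k_j)) / math.sqrt(d_k)
                  for k_j in k]
        if mask is not None:
            logits = [l + m * -1e9 for l, m in zip(logits, mask[i])]
        w = softmax(logits)
        weights.append(w)
        # Weighted sum of the value vectors.
        outputs.append([sum(w_j * v_j[c] for w_j, v_j in zip(w, v))
                        for c in range(len(v[0]))])
    return outputs, weights

q = [[1.0, 0.0]]
k = [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]]
v = [[10.0], [20.0], [30.0]]
mask = [[0, 0, 1]]                   # hide the third key, e.g. a padding token
out, w = attention(q, k, v, mask)
print(w[0])    # the third weight is 0.0; the first two sum to 1
print(out[0])  # a blend of 10.0 and 20.0 only
```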
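With the attention mechanism understood, we can build multi-head attention. "Multi-head" means the model can attend to relationships between tokens from several different perspectives. For instance, one can imagine one head attending to the semantic relationships between tokens and another head attending to their syntactic relationships.
Multi-head attention consists of three parts: the linear layers that project the input into Q, K and V, the scaled dot-product attention, and a final linear layer.
In the actual implementation, the per-head layers can be merged into single layers that cover all heads at once, with the result split into heads only afterwards.
For example, with 8 heads where each head's linear layer produces a 32-dimensional output, we can use one large linear layer with a 32*8-dimensional output, reshape the result to [..., 8, 32], run all heads through a single scaled dot-product attention, then concatenate the per-head results and pass them through the final linear layer. The implementation below wraps this in a Keras layer: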
- class MultiHeadAttention(tf.keras.layers.Layer):
-     def __init__(self,*, d_model, num_heads):
-         super(MultiHeadAttention, self).__init__()
-         self.num_heads = num_heads
-         self.d_model = d_model
-
-         assert d_model % self.num_heads == 0
-
-         self.depth = d_model // self.num_heads
-
-         self.wq = tf.keras.layers.Dense(d_model)
-         self.wk = tf.keras.layers.Dense(d_model)
-         self.wv = tf.keras.layers.Dense(d_model)
-
-         self.dense = tf.keras.layers.Dense(d_model)
-
-     def split_heads(self, x, batch_size):
-         """Split the last dimension into (num_heads, depth).
-         Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
-         """
-         x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
-         return tf.transpose(x, perm=[0, 2, 1, 3])
-
-     def call(self, v, k, q, mask):
-         batch_size = tf.shape(q)[0]
-
-         q = self.wq(q) # (batch_size, seq_len, d_model)
-         k = self.wk(k) # (batch_size, seq_len, d_model)
-         v = self.wv(v) # (batch_size, seq_len, d_model)
-
-         q = self.split_heads(q, batch_size) # (batch_size, num_heads, seq_len_q, depth)
-         k = self.split_heads(k, batch_size) # (batch_size, num_heads, seq_len_k, depth)
-         v = self.split_heads(v, batch_size) # (batch_size, num_heads, seq_len_v, depth)
-
-         # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
-         # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
-         scaled_attention, attention_weights = scaled_dot_product_attention(
-             q, k, v, mask)
-
-         scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, num_heads, depth)
-
-         concat_attention = tf.reshape(scaled_attention,
-                                       (batch_size, -1, self.d_model)) # (batch_size, seq_len_q, d_model)
-
-         output = self.dense(concat_attention) # (batch_size, seq_len_q, d_model)
-
-         return output, attention_weights
The output of multi-head attention is then transformed by a point-wise feed-forward network, which consists of two fully connected layers with a ReLU activation between them:
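The reshape-and-transpose step in `split_heads`, and its inverse before the final dense layer, amount to slicing each token vector into contiguous chunks, one per head. Here is a minimal sketch of that bookkeeping for a single unbatched sequence, using nested Python lists; the standalone functions `split_heads` and `merge_heads` and the toy numbers are assumptions for this illustration.

```python
def split_heads(x, num_heads):
    """x: [seq_len][d_model] -> [num_heads][seq_len][depth]."""
    d_model = len(x[0])
    assert d_model % num_heads == 0
    depth = d_model // num_heads
    return [[row[h * depth:(h + 1) * depth] for row in x]
            for h in range(num_heads)]

def merge_heads(heads):
    """[num_heads][seq_len][depth] -> [seq_len][d_model]."""
    seq_len = len(heads[0])
    return [[value for head in heads for value in head[t]]
            for t in range(seq_len)]

# Two tokens with d_model = 8, split into 2 heads of depth 4.
x = [[0, 1, 2, 3, 4, 5, 6, 7],
     [8, 9, 10, 11, 12, 13, 14, 15]]
heads = split_heads(x, num_heads=2)
print(heads[0])                 # [[0, 1, 2, 3], [8, 9, 10, 11]]
print(heads[1])                 # [[4, 5, 6, 7], [12, 13, 14, 15]]
print(merge_heads(heads) == x)  # True
```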
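- def point_wise_feed_forward_network(d_model, dff):
-     return tf.keras.Sequential([
-         tf.keras.layers.Dense(dff, activation='relu'), # (batch_size, seq_len, dff)
-         tf.keras.layers.Dense(d_model) # (batch_size, seq_len, d_model)
-     ])
With these building blocks in place, we can assemble the whole Transformer model. The model has two main parts, an encoder and a decoder.
First, the encoder, which is the left half of the architecture diagram in the paper. It consists of N encoder layers connected in sequence. The first encoder layer receives the input at the bottom; in our example that is the embedded French sentences, for example a tensor of shape [64, 32, 128]: 64 sentences per batch, 32 tokens in the longest sentence of the batch, and each token embedded as a 128-dimensional vector. After the positional encoding is added, this tensor is the input to the first encoder layer.
Every other encoder layer takes the output of the previous encoder layer as its input. The output of the last encoder layer is passed to the decoder, where it serves as V and K.
Now the decoder, the right half of the diagram. It too consists of N decoder layers connected in sequence, and each decoder layer contains two multi-head attention (MHA) blocks. The first decoder layer receives the input at the bottom; in our example that is the embedded English translations of the French sentences, for example a tensor of shape [64, 48, 128]: 64 sentences per batch, 48 tokens in the longest sentence of the batch, and each token embedded as a 128-dimensional vector. After the positional encoding is added, this tensor is the input to the first decoder layer. It passes through the first MHA block of that layer, whose output becomes the Q input of the second MHA block; the V and K inputs of the second MHA block are the encoder output. The output of this decoder layer then becomes the input to the first MHA block of the second decoder layer, whose output is again the Q of that layer's second MHA block, with V and K taken from the encoder output, giving the output of the second decoder layer. This continues through the N-th decoder layer, after which the result goes through a linear layer and a softmax to give the predicted probabilities.
The decoder input must be accompanied by the corresponding look-ahead mask so that the model cannot see the words it is supposed to predict.
For example, suppose we feed in a French sentence whose English translation is "Tom is a boy". Tokenized, that sentence has 6 tokens, including the [START] and [END] tokens, and the corresponding look-ahead mask is a 6×6 matrix.
The encoder can contain several encoder layers. First define a single encoder layer: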
- class EncoderLayer(tf.keras.layers.Layer):
-     def __init__(self,*, d_model, num_heads, dff, rate=0.1):
-         super(EncoderLayer, self).__init__()
-         self.mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
-         self.ffn = point_wise_feed_forward_network(d_model, dff)
-         self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-         self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-         self.dropout1 = tf.keras.layers.Dropout(rate)
-         self.dropout2 = tf.keras.layers.Dropout(rate)
-
-     def call(self, x, training, mask):
-         attn_output, _ = self.mha(x, x, x, mask) # (batch_size, input_seq_len, d_model)
-         attn_output = self.dropout1(attn_output, training=training)
-         out1 = self.layernorm1(x + attn_output) # (batch_size, input_seq_len, d_model)
-
-         ffn_output = self.ffn(out1) # (batch_size, input_seq_len, d_model)
-         ffn_output = self.dropout2(ffn_output, training=training)
-         out2 = self.layernorm2(out1 + ffn_output) # (batch_size, input_seq_len, d_model)
-         return out2
Define the encoder. It has three parts: the input embedding, the positional encoding, and N encoder layers.
Each word of the input sentence is tokenized, the embedding vector for each token id is looked up, and the positional encoding for the token's position is added; the result is the input to the encoder layers. The final output of the encoder becomes an input to the decoder.
- class Encoder(tf.keras.layers.Layer):
-     def __init__(self,*, num_layers, d_model, num_heads, dff, input_vocab_size, rate=0.1):
-         super(Encoder, self).__init__()
-
-         self.d_model = d_model
-         self.num_layers = num_layers
-
-         self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
-         self.pos_encoding = positional_encoding(MAX_TOKENS, self.d_model)
-
-         self.enc_layers = [
-             EncoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, rate=rate)
-             for _ in range(num_layers)]
-
-         self.dropout = tf.keras.layers.Dropout(rate)
-
-     def call(self, x, training, mask):
-
-         seq_len = tf.shape(x)[1]
-
-         # adding embedding and position encoding.
-         x = self.embedding(x) # (batch_size, input_seq_len, d_model)
-         x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
-         x += self.pos_encoding[:, :seq_len, :]
-
-         x = self.dropout(x, training=training)
-
-         for i in range(self.num_layers):
-             x = self.enc_layers[i](x, training, mask)
-
-         return x # (batch_size, input_seq_len, d_model)
The decoder layer is defined as follows:
- class DecoderLayer(tf.keras.layers.Layer):
-     def __init__(self,*, d_model, num_heads, dff, rate=0.1):
-         super(DecoderLayer, self).__init__()
-         self.mha1 = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
-         self.mha2 = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
-
-         self.ffn = point_wise_feed_forward_network(d_model, dff)
-
-         self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-         self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-         self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-
-         self.dropout1 = tf.keras.layers.Dropout(rate)
-         self.dropout2 = tf.keras.layers.Dropout(rate)
-         self.dropout3 = tf.keras.layers.Dropout(rate)
-
-     def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
-         # enc_output.shape == (batch_size, input_seq_len, d_model)
-
-         attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask) # (batch_size, target_seq_len, d_model)
-         attn1 = self.dropout1(attn1, training=training)
-         out1 = self.layernorm1(attn1 + x)
-
-         attn2, attn_weights_block2 = self.mha2(
-             enc_output, enc_output, out1, padding_mask) # (batch_size, target_seq_len, d_model)
-         attn2 = self.dropout2(attn2, training=training)
-         out2 = self.layernorm2(attn2 + out1) # (batch_size, target_seq_len, d_model)
-
-         ffn_output = self.ffn(out2) # (batch_size, target_seq_len, d_model)
-         ffn_output = self.dropout3(ffn_output, training=training)
-         out3 = self.layernorm3(ffn_output + out2) # (batch_size, target_seq_len, d_model)
-
-         return out3, attn_weights_block1, attn_weights_block2
Define the decoder:
- class Decoder(tf.keras.layers.Layer):
-     def __init__(self,*, num_layers, d_model, num_heads, dff, target_vocab_size,
-                  rate=0.1):
-         super(Decoder, self).__init__()
-
-         self.d_model = d_model
-         self.num_layers = num_layers
-
-         self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
-         self.pos_encoding = positional_encoding(MAX_TOKENS, d_model)
-
-         self.dec_layers = [
-             DecoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, rate=rate)
-             for _ in range(num_layers)]
-         self.dropout = tf.keras.layers.Dropout(rate)
-
-     def call(self, x, enc_output, training,
-              look_ahead_mask, padding_mask):
-
-         seq_len = tf.shape(x)[1]
-         attention_weights = {}
-
-         x = self.embedding(x) # (batch_size, target_seq_len, d_model)
-         x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
-         x += self.pos_encoding[:, :seq_len, :]
-
-         x = self.dropout(x, training=training)
-
-         for i in range(self.num_layers):
-             x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
-
-             attention_weights[f'decoder_layer{i+1}_block1'] = block1
-             attention_weights[f'decoder_layer{i+1}_block2'] = block2
-
-         # x.shape == (batch_size, target_seq_len, d_model)
-         return x, attention_weights
With the encoder and decoder defined, we can assemble the complete model.
- class Transformer(tf.keras.Model):
-     def __init__(self,*, num_layers, d_model, num_heads, dff, input_vocab_size,
-                  target_vocab_size, rate=0.1):
-         super().__init__()
-         self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
-                                num_heads=num_heads, dff=dff,
-                                input_vocab_size=input_vocab_size, rate=rate)
-
-         self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
-                                num_heads=num_heads, dff=dff,
-                                target_vocab_size=target_vocab_size, rate=rate)
-
-         self.final_layer = tf.keras.layers.Dense(target_vocab_size)
-
-     def call(self, inputs, training):
-         # Keras models prefer if you pass all your inputs in the first argument
-         inp, tar = inputs
-
-         padding_mask, look_ahead_mask = self.create_masks(inp, tar)
-
-         enc_output = self.encoder(inp, training, padding_mask) # (batch_size, inp_seq_len, d_model)
-
-         # dec_output.shape == (batch_size, tar_seq_len, d_model)
-         dec_output, attention_weights = self.decoder(
-             tar, enc_output, training, look_ahead_mask, padding_mask)
-
-         final_output = self.final_layer(dec_output) # (batch_size, tar_seq_len, target_vocab_size)
-
-         return final_output, attention_weights
-
-     def create_masks(self, inp, tar):
-         # Encoder padding mask (Used in the 2nd attention block in the decoder too.)
-         padding_mask = create_padding_mask(inp)
-
-         # Used in the 1st attention block in the decoder.
-         # It is used to pad and mask future tokens in the input received by
-         # the decoder.
-         look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
-         dec_target_padding_mask = create_padding_mask(tar)
-         look_ahead_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
-
-         return padding_mask, look_ahead_mask
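The listings that follow refer to `num_layers`, `d_model`, `dff`, `num_heads` and `dropout_rate`, none of which is defined in the code shown in this article. The values below are an assumption: they are the small configuration used in the TensorFlow tutorial this article is based on, and the article does not say which values were actually used for the reported results.

```python
# Assumed hyperparameters (small configuration from the TensorFlow tutorial);
# the article itself does not state the values it trained with.
num_layers = 4      # number of encoder layers and of decoder layers
d_model = 128       # embedding / model dimension
dff = 512           # hidden width of the point-wise feed-forward network
num_heads = 8       # attention heads per multi-head attention block
dropout_rate = 0.1

# MultiHeadAttention asserts that d_model is divisible by num_heads.
depth_per_head = d_model // num_heads
print(depth_per_head)  # 16
```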
Following the paper, the optimizer is Adam, with a learning rate given by the formula lrate = d_model^(-0.5) * min(step_num^(-0.5), step_num * warmup_steps^(-1.5)):
- class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
-     def __init__(self, d_model, warmup_steps=4000):
-         super(CustomSchedule, self).__init__()
-
-         self.d_model = d_model
-         self.d_model = tf.cast(self.d_model, tf.float32)
-
-         self.warmup_steps = warmup_steps
-
-     def __call__(self, step):
-         step = tf.cast(step, tf.float32)
-         arg1 = tf.math.rsqrt(step)
-         arg2 = step * (self.warmup_steps ** -1.5)
-
-         return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
-
- learning_rate = CustomSchedule(d_model)
-
- optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
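The shape of this schedule, a linear warm-up followed by an inverse-square-root decay, can be inspected without TensorFlow. This is a minimal sketch; the function name `transformer_lr` and the default `d_model=128` are assumptions made for the illustration.

```python
def transformer_lr(step, d_model=128, warmup_steps=4000):
    """lrate = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)"""
    return d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

# The rate rises until step == warmup_steps and decays afterwards.
for step in (1, 1000, 4000, 16000, 64000):
    print(step, f"{transformer_lr(step):.6f}")
```

The two arguments of `min` are equal exactly at `step == warmup_steps`, which is where the learning rate peaks.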
The model predicts token ids, which can be treated as classes, so the loss is a categorical cross-entropy. The following code defines a loss function and a function that computes the model's accuracy; both ignore padding positions.
- loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
-     from_logits=True, reduction='none')
-
- def loss_function(real, pred):
-     mask = tf.math.logical_not(tf.math.equal(real, 0))
-     loss_ = loss_object(real, pred)
-
-     mask = tf.cast(mask, dtype=loss_.dtype)
-     loss_ *= mask
-
-     return tf.reduce_sum(loss_)/tf.reduce_sum(mask)
-
- def accuracy_function(real, pred):
-     accuracies = tf.equal(real, tf.argmax(pred, axis=2))
-
-     mask = tf.math.logical_not(tf.math.equal(real, 0))
-     accuracies = tf.math.logical_and(mask, accuracies)
-
-     accuracies = tf.cast(accuracies, dtype=tf.float32)
-     mask = tf.cast(mask, dtype=tf.float32)
-     return tf.reduce_sum(accuracies)/tf.reduce_sum(mask)
-
- train_loss = tf.keras.metrics.Mean(name='train_loss')
- train_accuracy = tf.keras.metrics.Mean(name='train_accuracy')
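The masking in the loss can be demonstrated on a toy sequence: whatever the model outputs at a padding position, the loss does not change. The sketch below is a plain-Python illustration for one unbatched sequence; the function name `masked_cross_entropy` and the numbers are assumptions, and it expects at least one non-padding target.

```python
import math

def masked_cross_entropy(real, logits, pad_id=0):
    """real: target token ids; logits: one list of scores per position.
    Averages the cross-entropy over non-padding positions only."""
    total, count = 0.0, 0
    for target, scores in zip(real, logits):
        if target == pad_id:
            continue                       # padding contributes nothing
        m = max(scores)
        log_sum_exp = m + math.log(sum(math.exp(s - m) for s in scores))
        total += log_sum_exp - scores[target]
        count += 1
    return total / count

real = [2, 1, 0]                           # the last position is padding
logits = [[0.1, 0.2, 3.0], [0.0, 2.0, 0.0], [9.0, 9.0, 9.0]]
changed = logits[:2] + [[-5.0, 0.0, 5.0]]  # different scores at the padding
print(masked_cross_entropy(real, logits) == masked_cross_entropy(real, changed))  # True
```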
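Now we can train the model. The input consists of French-English sentence pairs that have been tokenized and converted to token ids. The French data is the encoder input. The English data is split into two parts, tar_inp and tar_real: tar_inp is the decoder input, and tar_real is the training target, which is compared with the model's predictions to compute the loss.
For example, take the English sentence 'SOS A lion in the jungle is sleeping EOS', where SOS and EOS are the special start and end tokens. Then tar_inp is 'SOS A lion in the jungle is sleeping' and tar_real is 'A lion in the jungle is sleeping EOS'. Conceptually, we first feed the corresponding French sentence to the encoder and the first token of tar_inp, 'SOS', to the decoder. We expect the model to produce the first English word, which is compared with the target 'A' from tar_real to compute the loss. We then feed the first two tokens of tar_inp, 'SOS A', to the decoder, expect the model to produce the second English word, and compute its loss against the target 'lion'. This continues up to the last token of tar_inp. In actual training, tar_inp and tar_real are passed to the model all at once, and the look_ahead_mask makes the computation equivalent to the step-by-step process just described.
First we instantiate a Transformer: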
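The one-position shift between the decoder input and the target can be written out directly. The following minimal sketch uses a list of strings in place of token ids; the variable names follow the text, while the token spelling '[START]' and '[END]' is taken from the reserved tokens defined earlier.

```python
tokens = ['[START]', 'a', 'lion', 'in', 'the', 'jungle', 'is', 'sleeping', '[END]']

tar_inp = tokens[:-1]    # decoder input: everything except the last token
tar_real = tokens[1:]    # training target: everything except the first token

# With the look-ahead mask, predicting target number `step` may only use
# the decoder inputs up to and including position `step`.
for step, target in enumerate(tar_real):
    visible = tar_inp[:step + 1]
    print(visible, '->', target)
```

The first printed line is `['[START]'] -> a` and the last one ends in `-> [END]`, matching the description above.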
- input_vocab_size = 0
- target_vocab_size = 0
- # The vocabulary size is the number of lines in each vocabulary file.
- with open('fr_vocab.txt', 'r', encoding='utf-8') as f:
-     input_vocab_size = len(f.readlines())
- with open('en_vocab.txt', 'r', encoding='utf-8') as f:
-     target_vocab_size = len(f.readlines())
-
- transformer = Transformer(
-     num_layers=num_layers,
-     d_model=d_model,
-     num_heads=num_heads,
-     dff=dff,
-     input_vocab_size=input_vocab_size,
-     target_vocab_size=target_vocab_size,
-     rate=dropout_rate)
Define a checkpoint to save the model during training:
- checkpoint_path = './checkpoints/train'
-
- # The two trackable objects that need to be saved
- ckpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)
-
- ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)
-
- # if a checkpoint exists, restore the latest checkpoint.
- if ckpt_manager.latest_checkpoint:
-     ckpt.restore(ckpt_manager.latest_checkpoint)
-     print('Latest checkpoint restored!!')
Define the training step function:
- EPOCHS = 20
-
- # The @tf.function trace-compiles train_step into a TF graph for faster
- # execution. The function specializes to the precise shape of the argument
- # tensors. To avoid re-tracing due to the variable sequence lengths or variable
- # batch sizes (the last batch is smaller), use input_signature to specify
- # more generic shapes.
-
- train_step_signature = [
-     tf.TensorSpec(shape=(None, None), dtype=tf.int64),
-     tf.TensorSpec(shape=(None, None), dtype=tf.int64),
- ]
-
- @tf.function(input_signature=train_step_signature)
- def train_step(inp, tar):
-     # Decoder input drops the last token; the target drops the first one.
-     tar_inp = tar[:, :-1]
-     tar_real = tar[:, 1:]
-     with tf.GradientTape() as tape:
-         predictions, _ = transformer([inp, tar_inp], training = True)
-         loss = loss_function(tar_real, predictions)
-
-     gradients = tape.gradient(loss, transformer.trainable_variables)
-     optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
-
-     train_loss(loss)
-     train_accuracy(accuracy_function(tar_real, predictions))
Now training can start. After 20 epochs, the accuracy reached 86.3%:
- for epoch in range(EPOCHS):
-     start = time.time()
-
-     train_loss.reset_states()
-     train_accuracy.reset_states()
-
-     # inp -> french, tar -> english
-     for (batch, (inp, tar)) in enumerate(train_batches):
-         try:
-             train_step(inp, tar)
-         except ValueError:
-             print(inp)
-             print('-------')
-             print(tar)
-
-         if batch % 50 == 0:
-             print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
-
-     if (epoch + 1) % 5 == 0:
-         ckpt_save_path = ckpt_manager.save()
-         print(f'Saving checkpoint for epoch {epoch+1} at {ckpt_save_path}')
-
-     print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
-
-     print(f'Time taken for 1 epoch: {time.time() - start:.2f} secs\n')
Once the model is trained, the exciting moment arrives: we can check whether this French-to-English translator actually works. For this we build a Translator class. It takes a French sentence, adds the START and END tokens to it before translating, and then lets the model predict English tokens one at a time until the predicted token is END.
- class Translator(tf.Module):
-     START = tf.argmax(tf.constant(reserved_tokens) == "[START]")
-     END = tf.argmax(tf.constant(reserved_tokens) == "[END]")
-
-     def __init__(self, fr_tokenizer, en_tokenizer, transformer):
-         self.fr_tokenizer = fr_tokenizer
-         self.en_tokenizer = en_tokenizer
-         self.transformer = transformer
-
-     def _add_start_end(self, ragged):
-         count = ragged.bounding_shape()[0]
-         starts = tf.fill([count,1], START)
-         ends = tf.fill([count,1], END)
-         return tf.concat([starts, ragged, ends], axis=1)
-
-     def __call__(self, sentence, max_length=MAX_TOKENS):
-         # input sentence is french, hence adding the start and end token
-         assert isinstance(sentence, tf.Tensor)
-         if len(sentence.shape) == 0:
-             sentence = sentence[tf.newaxis]
-         #print(sentence)
-         #print(self.fr_tokenizer.tokenize(sentence))
-         #print(self.fr_tokenizer.tokenize(sentence).merge_dims(1,2))
-         sentence = self._add_start_end(self.fr_tokenizer.tokenize(sentence).merge_dims(1,2)).to_tensor()
-
-         encoder_input = sentence
-
-         # As the output language is english, initialize the output with the
-         # english start token.
-         #start_end = self.en_tokenizer.tokenize([''])[0]
-         start_end = self._add_start_end(en_tokenizer.tokenize(['']).merge_dims(1,2))[0]
-         start = start_end[0][tf.newaxis]
-         end = start_end[1][tf.newaxis]
-
-         # `tf.TensorArray` is required here (instead of a python list) so that the
-         # dynamic-loop can be traced by `tf.function`.
-         output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
-         output_array = output_array.write(0, start)
-
-         for i in tf.range(max_length):
-             output = tf.transpose(output_array.stack())
-             predictions, _ = self.transformer([encoder_input, output], training=False)
-
-             # select the last token from the seq_len dimension
-             predictions = predictions[:, -1:, :] # (batch_size, 1, vocab_size)
-
-             predicted_id = tf.argmax(predictions, axis=-1)
-
-             # concatentate the predicted_id to the output which is given to the decoder
-             # as its input.
-             output_array = output_array.write(i+1, predicted_id[0])
-
-             if predicted_id == end:
-                 break
-
-         output = tf.transpose(output_array.stack())
-         # output.shape (1, tokens)
-         text = en_tokenizer.detokenize(output)[0] # shape: ()
-
-         #tokens = en_tokenizer.lookup(output)[0]
-
-         # `tf.function` prevents us from using the attention_weights that were
-         # calculated on the last iteration of the loop. So recalculate them outside
-         # the loop.
-         _, attention_weights = self.transformer([encoder_input, output[:,:-1]], training=False)
-
-         #return text, tokens, attention_weights
-         return text, attention_weights
-
- translator = Translator(fr_tokenizer, en_tokenizer, transformer)
Define a helper function that prints the French input sentence, the reference English sentence, and the English sentence predicted by the model:
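Stripped of the TensorFlow details, the loop inside `__call__` is greedy decoding: start from [START], repeatedly pick the highest-scoring next token, and stop at [END] or at the length limit. The sketch below shows that control flow with a stand-in for the model. Everything in it is hypothetical: `greedy_decode`, `toy_model`, the vocabulary size of 8 and the canned token ids are invented for this illustration.

```python
def greedy_decode(next_token_scores, start_id, end_id, max_length):
    """next_token_scores(prefix) returns one score per vocabulary entry
    for the token that should follow the given prefix."""
    output = [start_id]
    for _ in range(max_length):
        scores = next_token_scores(output)
        # argmax over the vocabulary
        predicted = max(range(len(scores)), key=scores.__getitem__)
        output.append(predicted)
        if predicted == end_id:
            break
    return output

# Hypothetical stand-in for the trained Transformer: it always "translates"
# to the fixed id sequence 5, 7 and then 3, where 3 plays the role of [END].
CANNED = [5, 7, 3]

def toy_model(prefix):
    nxt = CANNED[min(len(prefix) - 1, len(CANNED) - 1)]
    return [1.0 if i == nxt else 0.0 for i in range(8)]

print(greedy_decode(toy_model, start_id=2, end_id=3, max_length=10))
# [2, 5, 7, 3]
```

Greedy decoding keeps only the single best token at each step; alternatives such as beam search are outside the scope of this article.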
- def print_translation(sentence, tokens, ground_truth):
-     prediction_text = []
-     tokens_numpy = tokens.numpy()
-     # Skip the first and last entries, which are [START] and [END].
-     for i in range(1, tokens_numpy.shape[0]-1):
-         prediction_text.append(tokens_numpy[i].decode("utf-8"))
-     prediction_text = ' '.join(prediction_text)
-     print(f'{"Input:":15s}: {sentence}')
-     print(f'{"Prediction":15s}: {prediction_text}')
-     print(f'{"Ground truth":15s}: {ground_truth}')
Now we can pick a few French sentences from the validation set and test the translator:
- sentence = "c’est une histoire tellement triste."
- ground_truth = "this is such a sad story."
-
- translated_text, attention_weights = translator(
- tf.constant(sentence))
- print_translation(sentence, translated_text, ground_truth)
The output is:
- Input: : c’est une histoire tellement triste.
- Prediction : that ' s such a sad story .
- Ground truth : this is such a sad story.
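Next I try an arbitrary French sentence. Since I do not know French, I first wrote an English sentence and then translated it into French with Google Translate.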
- sentence = "Ces pratiques sont essentiellement inefficaces et peuvent entraîner des risques pour la santé et la pollution de l'environnement."
- ground_truth = "These practices are essentially ineffective, and can cause health hazards and environmental pollution."
-
- translated_text, attention_weights = translator(
- tf.constant(sentence))
- print_translation(sentence, translated_text, ground_truth)
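The result is below. The translation is not very accurate, but the general meaning is close. This suggests that the current training set is not large enough; with more data, the model's performance should improve.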
- Input: : Ces pratiques sont essentiellement inefficaces et peuvent entraîner des risques pour la santé et la pollution de l'environnement.
- Prediction : these practices are essentially invinivities and practicing health and pollution .
- Ground truth : These practices are essentially ineffective, and can cause health hazards and environmental pollution.
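By working through the Transformer tutorial on the TensorFlow website, I built a model that translates French into English. A possible next step is translating Chinese into English. According to the official documentation, tokenizing languages such as Chinese and Japanese works differently from English and French and requires a different tokenization method, which I leave for future study.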