赞
踩
在这篇文章里,我们准备实现一个带有注意力机制的seq2seq模型,seq2seq模型的应用场景广为人知的当属语言翻译了,但是,由于语言翻译训练时间太过漫长,模型结构太过庞大,这里我们使用B站中的一个视频中讲解的案例来对模型进行实现。
视频链接:https://www.bilibili.com/video/BV1LD4y1P7nK?p=90
该视频讲解的案例是对时间的表示进行格式化输出,如2023年1月1日有的地方表示为 2023/1/1,有的地方表示为2023.1.1,格式并不统一,十分不利于时间结构的统一,下面这个例子将对不同的时间表示格式进行训练,统一为 2023-1-1的形式,且因为这个案例的数据短小,训练的耗时也不会太长。
以下代码需要的环境需求为
keras==2.2.5
由于在公开数据集上很难找到我们需要的时间格式的数据集,所以我们可以考虑使用 faker
库伪造数据集。
faker
库能够很轻松的帮助你伪造一些看起来很真实的数据集,这就免去了我们自己制造的数据集单调的困难。
from faker import Faker import random from tqdm import tqdm from babel.dates import format_date fake = Faker() # 设置随机种子 Faker.seed(12345) random.seed(12345) # 定义想要生成的日期的格式 FORMATS = ['short', 'medium', 'long', 'full', 'full', 'full', 'full', 'full', 'full', 'full', 'full', 'full', 'full', 'd MMM YYY', 'd MMMM YYY', 'dd MMM YYY', 'd MMM, YYY', 'd MMMM, YYY', 'dd, MMM YYY', 'd MM YY', 'd MMMM YYY', 'MMMM d YYY', 'MMMM d, YYY', 'dd.MM.YY'] # 定义生成的语言 LOCALES = ['en_US'] def load_date(): """ 生成数据 :returns: 返回生成的数据、对应的标准格式的数据、data object """ dt = fake.date_object() try: human_readable = format_date(dt, format=random.choice(FORMATS), locale='en_US') human_readable = human_readable.lower() human_readable = human_readable.replace(',', '') # 将生成的数据化为标准格式 machine_readable = dt.isoformat() except AttributeError as e: return None, None, None return human_readable, machine_readable, dt def load_dataset(m): """ 生成m个日期数据 :m: 生成的数据的数量 """ human_vocab = set() machine_vocab = set() dataset = [] Tx = 30 for i in tqdm(range(m)): h, m, _ = load_date() if h is not None: dataset.append((h, m)) human_vocab.update(tuple(h)) machine_vocab.update(tuple(m)) # 将'<unk>', '<pad>'两个标签加入,'<unk>'表示未出现在字典中的词,'<pad>'表示占位 human = dict(zip(sorted(human_vocab) + ['<unk>', '<pad>'], list(range(len(human_vocab) + 2)))) # 将dict化为item:num的形式 inv_machine = dict(enumerate(sorted(machine_vocab))) machine = {v: k for k, v in inv_machine.items()} return dataset, human, machine dataset, x_vocab, y_vocab = load_dataset(10000)
dataset
中是生成的数据和标准数据, x_vocab, y_vocab
存储的是生成数据和标准数据每个字符的字典,方便生成Onehot编码。
处理后的部分数据如下:
dataset[0]:
(‘9 may 1998’, ‘1998-05-09’)
x_vocab:
{’ ‘: 0, ‘.’: 1, ‘/’: 2, ‘0’: 3, ‘1’: 4, ‘2’: 5, ‘3’: 6, ‘4’: 7, ‘5’: 8, ‘6’: 9, ‘7’: 10, ‘8’: 11, ‘9’: 12, ‘a’: 13, ‘b’: 14, ‘c’: 15, ‘d’: 16, ‘e’: 17, ‘f’: 18, ‘g’: 19, ‘h’: 20, ‘i’: 21, ‘j’: 22, ‘l’: 23, ‘m’: 24, ‘n’: 25, ‘o’: 26, ‘p’: 27, ‘r’: 28, ‘s’: 29, ‘t’: 30, ‘u’: 31, ‘v’: 32, ‘w’: 33, ‘y’: 34, ‘<unk>’: 35, ‘<pad>’: 36}
y_vocab:
{’-': 0, ‘0’: 1, ‘1’: 2, ‘2’: 3, ‘3’: 4, ‘4’: 5, ‘5’: 6, ‘6’: 7, ‘7’: 8, ‘8’: 9, ‘9’: 10}
每一个元组的前面是生成的数据,后面是标准的数据,形式上还是难辨真假的。
在seq2seq模型的输入上面,我们可以输入Onehot编码,当然这是最简单的处理形式,复杂一点可以输入词嵌入的向量,这里我们采用Onehot编码的形式对数据进行处理,将每一条数据都处理成Onehot编码的格式。
在处理数据前,我们需要先定义Decoder和Encoder的最大长度,因为输入的数据的Onehot编码肯定必须与Encoder的长度一样长,输出的Onehot编码肯定必须与Decoder的Onehot编码一样长,这里,由于日期格式序列都不是很长,我们定义Encoder长度为30,Decoder长度为10即可。
def preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty): ''' 处理数据为Onehot编码格式 :param dataset: 传入的生成数据与标准数据 :param human_vocab: 生成数据的字典 :param machine_vocab: 标准数据的字典 :param Tx: Encoder的最大长度 :param Ty: Decoder的最大长度 :return: 编码后的XY以及Onehot后的XY ''' X, Y = zip(*dataset) X = np.array([string_to_int(i, Tx, human_vocab) for i in X]) Y = [string_to_int(t, Ty, machine_vocab) for t in Y] Xoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(human_vocab)), X))) Yoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(machine_vocab)), Y))) return X, np.array(Y), Xoh, Yoh def string_to_int(string, length, vocab): """ 将每条数据按照字典对应的编码转为编码格式 string -- 输入的字符串 length -- 想要转为的编码的长度 vocab -- 字符与数字对应的字典 """ # make lower to standardize string = string.lower() string = string.replace(',', '') # 如果大于这个长度就截断 if len(string) > length: string = string[:length] # 填充字典里没有的字符为'<unk>'的编码 rep = list(map(lambda x: vocab.get(x, '<unk>'), string)) # 如果小于规定长度则用'<pad>'的编码进行填充 if len(string) < length: rep += [vocab['<pad>']] * (length - len(string)) return rep X, Y, X_onehot, Y_onehot = preprocess_data(dataset, x_vocab, y_vocab, 30, 10)
输出部分数据后,可以看到处理后的数据的形式如下:
X[0]:
[12 0 24 13 34 0 4 12 12 11 36 36 36 36 36 36 36 36 36 36 36 36 36 36
36 36 36 36 36 36]
Y[0]:
[ 2 10 10 9 0 1 6 0 1 10]
X_onehot[0]:
[[0. 0. 0. … 0. 0. 0.]
[1. 0. 0. … 0. 0. 0.]
[0. 0. 0. … 0. 0. 0.]
…
[0. 0. 0. … 0. 0. 1.]
[0. 0. 0. … 0. 0. 1.]
[0. 0. 0. … 0. 0. 1.]]
Y_onehot[0]:
[[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
首先我们来搭建seq2seq模型,seq2seq模型由编码器和解码器构成,编码器和解码器我们都采用LSTM的结构。
LSTM的API为根据参数的不同返回值也有不同,主要影响返回值的参数有 return_sequences, return_state
两个布尔值的参数,两个参数根据布尔值的组合不同返回值也不同。
return_sequences=False,return_state=False
h = LSTM(X)
默认情况就是两个都是False,这种情况下会返回最后一个时间步的隐藏状态。
return_sequences = True, return_state = False
lstm = LSTM(X,return_sequences = True, return_state = False)
lstm
返回的是每个时间步的隐藏状态。
return_sequences = False, return_state = True
lstm, state_h, state_c = LSTM(X,return_sequences = False, return_state = True)
接收三个返回值,lstm
是最后一个时间步的隐藏状态,state_h
与 lstm
完全一样,state_c
是最后一个时间步的cell状态。
return_sequences = True, return_state = True
lstm, state_h, state_c = LSTM(X,return_sequences = True, return_state = True)
lstm
是每个时间步的隐藏状态, state_h
是最后一个时间步的隐藏状态, state_c
是最后一个时间步的cell状态。
此外,由于解码器是对编码器产生的中间向量进行计算,所以我们在编码器上采用提取特征更好的双向LSTM结构,构建的seq2seq模型的层次如下:
from keras.layers import Bidirectional, Concatenate, Dot, Input, LSTM from keras.layers import RepeatVector, Dense, Softmax class Seq2seq(object): """Seq2seq进行日期格式翻译 """ def __init__(self, Tx=30, Ty=10, n_x=32, n_y=64): # 定义网络的相关参数 self.model_param = { "Tx": Tx, # 定义encoder序列最大长度 "Ty": Ty, # decoder序列最大长度 "n_x": n_x, # encoder的隐层输出值大小 "n_y": n_y # decoder的隐层输出值大小和cell输出值大小 } def load_data(self, m): """ 获取m条数据 """ # 获取3个值:数据集,特征词的字典映射,目标词字典映射 dataset, x_vocab, y_vocab = load_dataset(m) # 获取处理好的数据:特征x以及目标y的one_hot编码 X, Y, X_onehot, Y_onehot = preprocess_data(dataset, x_vocab, y_vocab, self.model_param["Tx"], self.model_param["Ty"]) # 添加特征词个不重复个数以及目标词的不重复个数 self.model_param["x_vocab"] = x_vocab self.model_param["y_vocab"] = y_vocab self.model_param["x_vocab_size"] = len(x_vocab) self.model_param["y_vocab_size"] = len(y_vocab) return X_onehot, Y_onehot def get_encoder(self): """ 定义编码器结构 :return: """ # 指定隐层值输出的大小 self.encoder = Bidirectional(LSTM(self.model_param["n_x"], return_sequences=True, name='bidirectional_1'), merge_mode='concat') def get_decoder(self): """ 定义解码器结构 :return: """ # 定义decoder结构,指定隐层值的形状大小,return_state=True self.decoder = LSTM(self.model_param["n_y"], return_state=True)
至此,seq2seq模型就搭建好了,之后,便是实现在seq2seq模型中的注意力机制了。
这里我们实现上一篇文章中介绍的软性注意力机制,其计算的过程如下所示。
设输入序列为
x
=
(
x
1
,
x
2
,
⋯
,
x
n
)
x=(x_1,x_2,\cdots,x_n)
x=(x1,x2,⋯,xn),输出序列为
y
=
(
y
1
,
y
2
,
⋯
,
y
m
)
y=(y_1,y_2,\cdots,y_m)
y=(y1,y2,⋯,ym),首先,将输入序列通过LSTM,得到最后一个状态的隐藏状态和输出状态,即
h
n
,
c
n
=
L
S
T
M
(
x
,
h
n
−
1
,
c
n
−
1
)
h_n,c_n=LSTM(x,h_{n-1},c_{n-1})
hn,cn=LSTM(x,hn−1,cn−1)之后就是计算注意力的问题了。
设
s
t
s_t
st 为解码器
t
t
t 时刻的隐层状态输出,
h
t
h_t
ht 为编码器
t
t
t 时刻的隐层状态输出,故计算解码器
i
i
i 时刻与编码器
j
j
j 时刻的权重计算函数为:
e
i
j
=
v
T
t
a
n
h
(
W
1
s
i
+
W
2
h
j
)
e_{ij}=v^T tanh (W_1s_i+W_2h_j)
eij=vTtanh(W1si+W2hj)权重系数表现如下:
α
i
k
=
e
x
p
(
e
i
k
)
∑
l
=
1
n
e
x
p
(
e
i
l
)
,
k
=
1
,
⋯
,
n
\alpha_{ik}=\frac{exp({e_{ik}})}{\sum_{l=1}^{n}exp({e_{il}})},k=1,\cdots,n
αik=∑l=1nexp(eil)exp(eik),k=1,⋯,n接着计算加上了注意力权重后的隐层状态的加权向量
c
i
c_i
ci
c
i
=
∑
j
=
1
n
α
i
j
h
j
c_i = \sum_{j=1}^{n}\alpha_{ij}h_{j}
ci=j=1∑nαijhj然后将
c
i
c_i
ci 与上一时刻解码器的输出
s
i
−
1
s_{i-1}
si−1 结合并输入到LSTM中,最后将其输出到一个softmax网络结构中,输出概率最大的标签即可。
我们依然在Seq2seq模型的类中定义注意力机制的计算方式,
def get_attention(self): """ 定义Attention的结构 :return: attention结构 """ repeator = RepeatVector(self.model_param["Tx"]) concatenator = Concatenate(axis=-1) densor1 = Dense(10, activation="tanh", name='Dense1') densor2 = Dense(1, activation="relu", name='Dense2') activator = Softmax(axis=1, name='attention_weights') dotor = Dot(axes=1) # 将结构存储在attention当中 self.attention = { "repeator": repeator, "concatenator": concatenator, "densor1": densor1, "densor2": densor2, "activator": activator, "dotor": dotor } def computer_one_attention(self, a, s_prev): """ 利用定义好的attention结构计算中的alpha系数与a对应输出 :param a:隐层状态值 (m, 30, 64) :param s_prev: LSTM的初始隐层状态值, 形状(batch, 64) :return: context """ # 使用repeator扩大数据s_prev的维度为(sample, Tx, n_y),这样可以与a进行合并 s_prev = self.attention["repeator"](s_prev) #[batchsize, 30, 64] # 将a和s_prev 按照最后一个维度进行合并计算 concat = self.attention["concatenator"]([a, s_prev]) #[batchsize, 30, 128] # 使用densor1全连接层网络计算出e e = self.attention["densor1"](concat) #[batchsize, 30, 10] # 使用densor2增加relu激活函数计算 energies = self.attention["densor2"](e) #[batchsize, 30, 1] # 使用"activator"的softmax函数计算权重"alphas" # 这样一个attention的系数计算完成 alphas = self.attention["activator"](energies) #[batchsize, 30, 1] # 使用dotor,矩阵乘法,将 "alphas" and "a" 去计算context/c context = self.attention["dotor"]([alphas, a]) #[batchsize, 1, 64] return context
seq2seq模型以及注意力机制的计算都已经完成后,就可以对模型的输出进行定义,模型的输出实际上就是让Decoder对输出字典中每个字符编码输出概率,然后取概率最大的那个字符编码为最终输出的字符编码,这涉及到概率,我们在最后一层加上一个softmax输出,对概率进行规整化,所以,输出的构造如下
def get_output_layer(self):
"""
定义输出层
:return: output_layer
"""
# 对decoder输出进行softmax,输出向量大小为y_vocab大小
self.output_layer = Dense(self.model_param["y_vocab_size"], activation=Softmax(axis=1))
到此,一个模型所需要的输主体、Attention机制以及输出都已经齐全了,接下来就需要将这些串联起来构成我们需要的模型了,在上节所讲的seq2seq模型的介绍里面,我们知道了解码器的结构有很多种,这里,我们采取第二种解码器的结构进行构建,即初始化Decoder的初始状态,并将中间向量输入到Decoder的每一个时间步中去,我们的模型定义如下:
def model(self): """ 定义模型获取模型实例 :param model_param: 网络的相关参数 :param seq2seq:网络结构 :return: model,Keras model instance """ # 定义模型的输入 (30,) # 定义decoder中隐层初始状态值s0以及cell输出c0 X = Input(shape=(self.model_param["Tx"], self.model_param["x_vocab_size"]), name='X') # 输入Decoder的初始状态 s0 = Input(shape=(self.model_param["n_y"],), name='s0') c0 = Input(shape=(self.model_param["n_y"],), name='c0') s = s0 c = c0 # 定义装有输出值的列表 outputs = [] # 步骤1:定义encoder的双向LSTM结构得输出a a = self.encoder(X) # 步骤3:循环decoder的Ty次序列输入,获取decoder最后输出 # 包括计算Attention输出 for t in range(self.model_param["Ty"]): # 1: 定义decoder第t'时刻的注意力结构并输出context context = self.computer_one_attention(a, s) # 2: 对"context" vector输入到deocder当中 # 获取cell的两个输出隐层状态和,initial_state= [previous hidden state, previous cell state] s, _, c = self.decoder(context, initial_state=[s, c]) # 3: 应用 Dense layere获取deocder的t'时刻的输出 out = self.output_layer(s) # 4: 将decoder中t'时刻的输出装入列表 outputs.append(out) # 步骤 4: 创建model实例,定义输入输出 model = Model(inputs=(X, s0, c0), outputs=outputs)
在这里,我们为什么要循环进行输出呢?因为我们需要预测的是一句话,在Decoder中,一句话最多的长度是10,那么,对于Encoder以及Attention送入的向量,我们都需要将其放入Decoder中进行预测,由于Decoder中每一句话都有10个长度,所以对其中每个位置的字符都要进行预测,也就是输出10个字符的预测,因此会循环10次。
而对于构建的模型,我们需要按照一定的顺序将各函数连接在一起,即定义一个初始化函数如下:
def init_seq2seq(self):
"""
初始化网络结构
:return:
"""
self.get_encoder()
self.get_decoder()
self.get_attention()
self.get_output_layer()
至此,模型的初始化就完成了。
模型初始化后,就需要对模型进行训练了,训练时,我们需要将模型的输入数据进行输入,从上面的模型可知,我们的输入包含训练数据的Onehot编码以及Decoder的初始状态,这里我们就定义初始状态为0即可。
需要注意的是模型的目标值,由于在上面模型定义时,由于Decoder的最长序列是10,所以我们输出是连续输出10个位置中每个位置的预测,故这里的输出相当于将每个位置的输出都进行分开预测,因此,我们将目标值输入时也需要按照这样的格式进行输入,才能够计算损失。由于输出是十个位置,所以我们放入模型的目标值也必须是十个位置的,原来的目标值 Y_onehot
是一个 [10000,10,11]
维度的向量,最后的 11
是指映射到字典里的11个字符中,直接这样输出显然是不行的,因为这样相当于输出的位置有10000个,我们需要将第一维与第二维进行交换,将这个向量变为 [10,10000,11]
维度的向量,才能够对应上模型中定义的10个输出位置。
def train(self, X_onehot, Y_onehot): """ 训练 :param X_onehot: 特征值的one_hot编码 :param Y_onehot: 目标值的one_hot编码 :return: """ # 利用网络结构定义好模型输入输出 model = self.model() opt = Adam(lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001) model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy']) s0 = np.zeros((10000, self.model_param["n_y"])) c0 = np.zeros((10000, self.model_param["n_y"])) outputs = list(Y_onehot.swapaxes(0, 1)) # 输入x,以及decoder中LSTM的两个初始化值 model.fit([X_onehot, s0, c0], outputs, epochs=10, batch_size=100)
最后则是模型预测,在最后进行预测的时候,模型的输出是一个 [10,11]
维度的向量,这显然是无法进行读取的,因此,我们需要借助最开始保存的输出部分的字典来讲这些Onehot向量反向映射为字符串,模型预测如下:
def test(self): """ 模型预测 :return: """ model = self.model() model.load_weights("./models/model.h5") example = '1 March 2001' source = string_to_int(example, self.model_param["Tx"], self.model_param["x_vocab"]) source = np.expand_dims(np.array(list(map(lambda x: to_categorical(x, num_classes=self.model_param["x_vocab_size"]), source))), axis=0) s0 = np.zeros((10000, self.model_param["n_y"])) c0 = np.zeros((10000, self.model_param["n_y"])) prediction = model.predict([source, s0, c0]) prediction = np.argmax(prediction, axis=-1) output = [dict(zip(self.model_param["y_vocab"].values(), self.model_param["y_vocab"].keys()))[int(i)] for i in prediction] print("source:", example) print("output:", ''.join(output)) s2s = Seq2seq() X_onehot, Y_onehot = s2s.load_data(10000) s2s.init_seq2seq() s2s.train(X_onehot, Y_onehot) s2s.test()
以上就是对加入了Attention机制的seq2seq模型的手动实现过程,相信经过了如上的过程后,你应该明白了seq2seq模型的实现与作用机理,并且经过手动实现注意力机制,对注意力机制更加的熟悉,也由此能够去了解更加复杂多样的注意力方式。
[1] : https://blog.csdn.net/weixin_38314865/article/details/107582093
[2] : https://blog.csdn.net/rocking_struggling/article/details/104318023
[3] : https://www.bilibili.com/video/BV1LD4y1P7nK?p=90
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。