The code comes from this blog post.
class Transformer(object):
    def __init__(self):
    def build_model(self):
    def positional_encoding(self):
    def add_embedding(self):
    def encoder(self):
    def decoder(self):
    def multi_head_attention_layer(self):
    def feed_forward_layer(self):
    def train(self):
    def eval(self):
As its last step, __init__ calls the build_model function to build the model:
def __init__(self, embedding_size, num_layers, keep_prob, learning_rate,
             learning_decay_steps, learning_decay_rate, clip_gradient,
             is_embedding_scale, multihead_num, label_smoothing,
             max_gradient_norm, encoder_vocabs, decoder_vocabs,
             max_encoder_len, max_decoder_len, share_embedding,
             pad_index=None):
    self.embedding_size = embedding_size
    self.num_layers = num_layers
    self.keep_prob = keep_prob
    self.learning_rate = learning_rate
    self.learning_decay_steps = learning_decay_steps
    self.learning_decay_rate = learning_decay_rate
    self.clip_gradient = clip_gradient
    self.encoder_vocabs = encoder_vocabs
    self.decoder_vocabs = decoder_vocabs
    self.max_encoder_len = max_encoder_len
    self.max_decoder_len = max_decoder_len
    self.share_embedding = share_embedding
    self.is_embedding_scale = is_embedding_scale
    self.multihead_num = multihead_num
    self.label_smoothing = label_smoothing
    self.max_gradient_norm = max_gradient_norm
    self.pad_index = pad_index
    self.build_model()
def build_model(self):
    # Encoder inputs
    self.encoder_inputs = tf.placeholder(tf.int32, [None, None], name='encoder_inputs')
    self.encoder_inputs_length = tf.placeholder(tf.int32, [None], name='encoder_inputs_length')
    self.batch_size = tf.placeholder(tf.int32, [], name='batch_size')
    self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')
    # Decoder inputs
    self.decoder_inputs = tf.placeholder(tf.int32, [None, None], name='decoder_inputs')
    # Decoder targets
    self.decoder_targets = tf.placeholder(tf.int32, [None, None], name='decoder_targets')
    self.decoder_inputs_length = tf.shape(self.decoder_inputs)[1]
    self.decoder_targets_length = tf.placeholder(tf.int32, [None], name='decoder_targets_length')
    # Mask matrix over the targets
    self.targets_mask = tf.sequence_mask(self.decoder_targets_length, self.max_decoder_len,
                                         dtype=tf.float32, name='masks')
    self.itf_weight = tf.placeholder(tf.float32, [None, None], name='itf_weight')

    # Embedding layer
    with tf.name_scope("embedding"):
        zero = tf.zeros([1, self.embedding_size], dtype=tf.float32)  # embedding of the padding token
        # Raw embedding matrix; the -1 leaves one row for the padding token
        encoder_embedding = tf.get_variable(
            'embedding_table', [self.encoder_vocabs - 1, self.embedding_size],
            initializer=tf.random_normal_initializer(0., self.embedding_size ** -0.5))
        # Split the embedding into two parts and concat the padding embedding between them
        front, end = tf.split(encoder_embedding,
                              [self.pad_index, self.encoder_vocabs - 1 - self.pad_index])
        encoder_embedding = tf.concat((front, zero, end), axis=0)  # [self.vocab_size, self.embedding_size]
        # Positional encoding, which carries the order information of the sequence
        encoder_position_encoding = self.positional_encoding(self.max_encoder_len)
        # Decoder-side embedding
        if not self.share_embedding:
            decoder_embedding = tf.get_variable(
                'decoder_embedding', [self.decoder_vocabs, self.embedding_size],
                initializer=tf.random_normal_initializer(0., self.embedding_size ** -0.5)
            )
            decoder_position_encoding = self.positional_encoding(self.max_decoder_len)

    with tf.name_scope('encoder'):
        # Mask matrix and embedded inputs for encoder_inputs
        encoder_inputs_embedding, encoder_inputs_mask = self.add_embedding(
            encoder_embedding, encoder_position_encoding,
            self.encoder_inputs, tf.shape(self.encoder_inputs)[1]
        )
        # Encoder outputs
        self.encoder_outputs = self.encoder(encoder_inputs_embedding, encoder_inputs_mask)

    with tf.name_scope('decoder'):
        # When the embeddings are shared
        if self.share_embedding:
            decoder_inputs_embedding, decoder_inputs_mask = self.add_embedding(
                encoder_embedding, encoder_position_encoding,
                self.decoder_inputs, self.decoder_inputs_length
            )
        # When not shared (e.g. translation between different languages),
        # the encoder and decoder use different embedding matrices
        else:
            decoder_inputs_embedding, decoder_inputs_mask = self.add_embedding(
                decoder_embedding, decoder_position_encoding,
                self.decoder_inputs, self.decoder_inputs_length
            )
        self.decoder_outputs, self.predict_ids = self.decoder(
            decoder_inputs_embedding, self.encoder_outputs,
            decoder_inputs_mask, encoder_inputs_mask)

    # loss
    with tf.name_scope('loss'):
        # label smoothing
        self.targets_one_hot = tf.one_hot(
            self.decoder_targets,
            depth=self.decoder_vocabs,
            on_value=(1.0 - self.label_smoothing) + (self.label_smoothing / self.decoder_vocabs),
            off_value=(self.label_smoothing / self.decoder_vocabs),
            dtype=tf.float32
        )
        loss = tf.nn.softmax_cross_entropy_with_logits(
            labels=self.targets_one_hot,
            logits=self.decoder_outputs
        )
        if config.use_itf_loss:
            loss *= self.itf_weight
        else:
            loss *= self.targets_mask
        self.loss = tf.reduce_sum(loss) / tf.reduce_sum(self.targets_mask)

    # Optimizer, with an exponentially decaying learning rate
    self.global_step = tf.train.get_or_create_global_step()
    learning_rate = tf.train.exponential_decay(self.learning_rate, self.global_step,
                                               self.learning_decay_steps,
                                               self.learning_decay_rate, staircase=True)
    optimizer = tf.train.AdamOptimizer(learning_rate)
    trainable_params = tf.trainable_variables()
    gradients = tf.gradients(self.loss, trainable_params)
    clip_gradients, _ = tf.clip_by_global_norm(gradients, self.max_gradient_norm)
    self.train_op = optimizer.apply_gradients(zip(clip_gradients, trainable_params))

    # summary
    tf.summary.scalar('loss', self.loss)
    self.merged = tf.summary.merge_all()
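The on_value/off_value pair in the loss above is the usual label-smoothing distribution: the true class gets (1 - ε) + ε/V and every other class gets ε/V, so each row still sums to 1. A tiny NumPy illustration with toy values (vocab size 4 and ε = 0.1 are chosen only for this example):

import numpy as np

vocab_size, epsilon, target = 4, 0.1, 2   # toy values, not from the post
smoothed = np.full(vocab_size, epsilon / vocab_size)
smoothed[target] = (1.0 - epsilon) + epsilon / vocab_size
print(smoothed)        # [0.025 0.025 0.925 0.025]
print(smoothed.sum())  # 1.0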
def positional_encoding(self, sequence_length):
    """
    positional encoding
    Because attention uses neither CNN nor RNN, positional encodings are used to inject
    the order information of the sequence. The formulas are:
        pos: position in the sequence, i: index along the embedding dimension, d: embedding size
        PE(pos, 2i)   = sin( pos / pow(10000, 2i/d) )
        PE(pos, 2i+1) = cos( pos / pow(10000, 2i/d) )
    """
    position_embedding = np.zeros([sequence_length, self.embedding_size])
    for pos in range(sequence_length):
        for i in range(self.embedding_size // 2):
            position_embedding[pos, 2 * i] = np.sin(pos / np.power(10000, 2 * i / self.embedding_size))
            position_embedding[pos, 2 * i + 1] = np.cos(pos / np.power(10000, 2 * i / self.embedding_size))
    position_embedding = tf.convert_to_tensor(position_embedding, dtype=tf.float32)
    return position_embedding
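The double Python loop above is fine for a one-off table, but the same sin/cos formula can also be computed in a vectorized way. A minimal NumPy sketch (the function name positional_encoding_np is made up for this example; like the original code it assumes an even embedding size):

import numpy as np

def positional_encoding_np(sequence_length, embedding_size):
    pos = np.arange(sequence_length)[:, None]                       # [seq_len, 1]
    dim = np.arange(embedding_size)[None, :]                        # [1, d]
    angle = pos / np.power(10000, 2 * (dim // 2) / embedding_size)  # pos / 10000^(2i/d)
    pe = np.zeros((sequence_length, embedding_size))
    pe[:, 0::2] = np.sin(angle[:, 0::2])   # even dimensions: sin
    pe[:, 1::2] = np.cos(angle[:, 1::2])   # odd dimensions: cos
    return pe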
def add_embedding(self, embedding, position_encoding, inputs_data, data_length):
    # Add the word embeddings and the positional encoding
    inputs_embedded = tf.nn.embedding_lookup(embedding, inputs_data)
    # Scale: the embeddings are initialized with stddev embedding_size ** -0.5,
    # so they are multiplied by embedding_size ** 0.5
    if self.is_embedding_scale is True:
        inputs_embedded *= self.embedding_size ** 0.5
    inputs_embedded += position_encoding[:data_length, :]
    # embedding_mask
    # Positions in inputs_data equal to pad_index get 0 in the mask, all others get 1
    # [batch_size, encoder_inputs_length, 1]
    embedding_mask = tf.expand_dims(
        tf.cast(tf.not_equal(inputs_data, self.pad_index), dtype=tf.float32),
        axis=-1
    )
    inputs_embedded *= embedding_mask
    # embedding dropout
    inputs_embedded = tf.nn.dropout(inputs_embedded, keep_prob=self.keep_prob)
    return inputs_embedded, embedding_mask
def encoder(self, encoder_inputs_embedding, encoder_inputs_mask):
    # multi-head attention mask [batch_size, encoder_inputs_length, 1]
    '''
    Build the mask matrix:
    [batch_size, encoder_inputs_length, 1] =>
    [batch_size, encoder_inputs_length, encoder_inputs_length]:
    multiply the expanded mask by its transpose, so each word gets a mask over the whole
    sequence. If the word itself is a pad, its row is all zeros; otherwise it is the mask
    of the sequence. Then tile the result so every attention head gets a copy:
    [batch_size*multihead_num, encoder_inputs_length, encoder_inputs_length]
    e.g.:
    a = tf.constant(np.random.randint(0, 2, [2, 3]))
    a = tf.expand_dims(a, axis=-1)
    b = tf.transpose(a, [0, 2, 1])
    c = tf.matmul(a, b)
    d = tf.tile(c, [2, 1, 1])
    sess.run(a)
    array([[[0], [1], [1]],
           [[0], [1], [1]]])
    sess.run(b)
    array([[[0, 1, 1]],
           [[0, 1, 1]]])
    sess.run(c)
    array([[[0, 0, 0], [0, 1, 1], [0, 1, 1]],
           [[0, 0, 0], [0, 1, 1], [0, 1, 1]]])
    sess.run(d)
    array([[[0, 0, 0], [0, 1, 1], [0, 1, 1]],
           [[0, 0, 0], [0, 1, 1], [0, 1, 1]],
           [[0, 0, 0], [0, 1, 1], [0, 1, 1]],
           [[0, 0, 0], [0, 1, 1], [0, 1, 1]]])
    '''
    encoder_self_attention_mask = tf.tile(
        tf.matmul(encoder_inputs_mask, tf.transpose(encoder_inputs_mask, [0, 2, 1])),
        [self.multihead_num, 1, 1]
    )
    encoder_outputs = encoder_inputs_embedding
    # Stack num_layers encoder blocks of multi-head attention + feed forward
    for i in range(self.num_layers):
        # multi-head self-attention sub_layer
        multi_head_outputs = self.multi_head_attention_layer(
            query=encoder_outputs,
            key_value=encoder_outputs,
            score_mask=encoder_self_attention_mask,
            output_mask=encoder_inputs_mask,
            activation=None,
            name='encoder_multi_' + str(i)  # parameters are not shared across layers
        )
        # point-wise feed forward sub_layer
        encoder_outputs = self.feed_forward_layer(
            multi_head_outputs,
            output_mask=encoder_inputs_mask,
            activation=tf.nn.relu,
            name='encoder_dense_' + str(i)
        )
    return encoder_outputs
The decoder has two inputs: one that takes part in the decoder computation itself, and the targets, which are used to compute the loss.
def decoder(self, decoder_inputs_embedding, encoder_outputs, decoder_inputs_mask, encoder_inputs_mask):
    # Mask for the encoder-decoder attention layer
    # [batch_size*multihead_num, 1, encoder_inputs_length]
    decoder_encoder_attention_mask = tf.tile(
        tf.transpose(encoder_inputs_mask, [0, 2, 1]),
        [self.multihead_num, 1, 1]
    )
    '''
    Self-attention mask.
    A later word must not be used to predict an earlier one, so the representation of a
    position may not use positions that come after it; this gives a lower-triangular matrix:
    [1, 2, ..., decoder_inputs_length]
    [[1,0,0,...,0],
     [1,1,0,...,0],
     [1,1,1,...,0], ...]
    [decoder_inputs_length, decoder_inputs_length]
    [[[1,0,0,...,0], [1,1,0,...,0], [1,1,1,...,0]],
     [[1,0,0,...,0], [1,1,0,...,0], [1,1,1,...,0]], ...]
    [multihead_num * batch_size, decoder_inputs_length, decoder_inputs_length]
    '''
    decoder_self_attention_mask = tf.tile(tf.expand_dims(tf.sequence_mask(
        tf.range(start=1, limit=self.decoder_inputs_length + 1),
        maxlen=self.decoder_inputs_length,
        dtype=tf.float32), axis=0
    ), [self.multihead_num * tf.shape(decoder_inputs_embedding)[0], 1, 1])

    decoder_outputs = decoder_inputs_embedding
    for i in range(self.num_layers):
        # Masked self-attention sub_layer [batch_size, decoder_inputs_length, embedding_size]
        masked_multi_head_outputs = self.multi_head_attention_layer(
            query=decoder_outputs,
            key_value=decoder_outputs,
            score_mask=decoder_self_attention_mask,
            output_mask=decoder_inputs_mask,
            activation=None,
            name='decoder_first_multi_' + str(i)
        )
        # Encoder-decoder attention sub_layer: key_value is replaced by encoder_outputs
        multi_head_outputs = self.multi_head_attention_layer(
            query=masked_multi_head_outputs,
            key_value=encoder_outputs,
            score_mask=decoder_encoder_attention_mask,
            output_mask=decoder_inputs_mask,
            activation=None,
            name='decoder_second_multi_' + str(i)
        )
        # point-wise feed forward sub_layer
        decoder_outputs = self.feed_forward_layer(
            multi_head_outputs,
            output_mask=decoder_inputs_mask,
            activation=tf.nn.relu,
            name='decoder_dense_' + str(i)
        )
    # output_layer
    decoder_outputs = tf.layers.dense(decoder_outputs, units=self.decoder_vocabs,
                                      activation=None, name='outputs')
    # predictions
    predict_ids = tf.argmax(decoder_outputs, axis=-1, output_type=tf.int32)
    return decoder_outputs, predict_ids
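The tf.sequence_mask(tf.range(1, L + 1), maxlen=L) trick above just builds a lower-triangular matrix of ones. A quick NumPy check of what it produces (toy length 4, for illustration only):

import numpy as np

L = 4  # toy decoder_inputs_length
causal_mask = np.tril(np.ones((L, L), dtype=np.float32))
print(causal_mask)
# [[1. 0. 0. 0.]
#  [1. 1. 0. 0.]
#  [1. 1. 1. 0.]
#  [1. 1. 1. 1.]]
# Row t has ones only at positions <= t, so step t can attend to itself and to earlier steps only.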
def multi_head_attention_layer(self, query, key_value, score_mask=None, output_mask=None,
                               activation=None, name=None):
    """
    multi-head self-attention sub_layer
    :param query:
    :param key_value:
    :param score_mask:
    :param output_mask:
    :param activation:
    :param name:
    :return:
    """
    with tf.variable_scope(name, reuse=tf.AUTO_REUSE):
        # Compute Q, K, V
        V = tf.layers.dense(key_value, units=self.embedding_size, activation=activation,
                            use_bias=False, name='V')
        K = tf.layers.dense(key_value, units=self.embedding_size, activation=activation,
                            use_bias=False, name='K')
        Q = tf.layers.dense(query, units=self.embedding_size, activation=activation,
                            use_bias=False, name='Q')
        # Split Q, K, V into multi-head form
        '''
        E.g. if Q is a [3, 4, 6] matrix, the 6 values are the Q information of each word.
        With multi-head attention this information is split into multihead_num pieces,
        one per head:
        [batch_size, seq_length, embedding_size] ->
        [batch_size*multihead_num, seq_length, embedding_size/multihead_num]
        '''
        V = tf.concat(tf.split(V, self.multihead_num, axis=-1), axis=0)
        K = tf.concat(tf.split(K, self.multihead_num, axis=-1), axis=0)
        Q = tf.concat(tf.split(Q, self.multihead_num, axis=-1), axis=0)
        '''
        Compute the dot product of Q and K and scale it. Because Q and K have been split
        evenly across the heads, the divisor is sqrt(embedding_size / multihead_num).
        [batch_size*multihead_num, seq_length, seq_length]: every word gets a score over the
        whole sequence, and there are multihead_num such matrices.
        E.g. [[1,2,3],[2,3,4]] => looked up as embeddings
        => [[[1,1,1,1],[2,2,2,2],[3,3,3,3]],
            [[2,2,2,2],[3,3,3,3],[4,4,4,4]]] => with 2 heads
        => [[[[1,1],[2,2],[3,3]],
             [[2,2],[3,3],[4,4]]],
            [[[1,1],[2,2],[3,3]],
             [[2,2],[3,3],[4,4]]]] => concat
        => [[[1,1],[2,2],[3,3]],
            [[2,2],[3,3],[4,4]],
            [[1,1],[2,2],[3,3]],
            [[2,2],[3,3],[4,4]]] => matmul
        => [[[12,13,14],[1,3,3],[2,3,5]],
            [[2,4,5],[5,6,9],[2,4,7]],
            [[3,5,6],[2,44,6],[6,7,89]],
            [[2,4,5],[5,6,7],[2,5,8]]]
        (the division by sqrt(d) is omitted here)
        When the softmax is multiplied by V (assume batch_size is 1):
        softmax = [[[0.3,0.3,0.4],[0.2,0.4,0.6],[0.5,0.3,0.2]]]
        V = [[[1,2,3,4,5],[2,3,4,5,6],[3,4,5,6,7]]]
        Each word's output is the probability-weighted sum of the rows of V; for example
        the first vector equals 0.3*V[0] + 0.3*V[1] + 0.4*V[2],
        giving [[[2.1,3.1,4.1,5.1,6.1],[2.8,4,5.2,6.4,7.6],[1.7,2.7,3.7,4.7,5.7]]]
        '''
        # For the decoder the shape is [batch_size*multihead_num, decoder_inputs_length, encoder_inputs_length].
        # Divide by sqrt(embedding_size / multihead_num): since the embeddings are split
        # across heads, the per-head dimension is embedding_size / multihead_num.
        score = tf.matmul(Q, tf.transpose(K, [0, 2, 1])) / tf.sqrt(self.embedding_size / self.multihead_num)
        # mask
        if score_mask is not None:
            score *= score_mask
            score += ((score_mask - 1) * 1e+9)
        # softmax
        softmax = tf.nn.softmax(score, dim=2)
        # dropout
        softmax = tf.nn.dropout(softmax, keep_prob=self.keep_prob)
        # attention
        attention = tf.matmul(softmax, V)
        # Concatenate the multi-head outputs
        concat = tf.concat(tf.split(attention, self.multihead_num, axis=0), axis=-1)
        # Linear
        Multihead = tf.layers.dense(concat, units=self.embedding_size, activation=activation,
                                    use_bias=False, name='linear')
        # output mask: zero out the positions that correspond to pad
        if output_mask is not None:
            Multihead *= output_mask
        # dropout before the residual connection
        Multihead = tf.nn.dropout(Multihead, keep_prob=self.keep_prob)
        # residual connection
        Multihead += query
        # Layer Norm
        Multihead = tf.contrib.layers.layer_norm(Multihead, begin_norm_axis=2)
        return Multihead
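To see the core score/softmax/weighted-sum computation outside of the TensorFlow graph, here is a minimal single-head scaled dot-product attention in plain NumPy (a sketch for illustration only; masking, dropout, the linear projections and the residual/LayerNorm steps are omitted):

import numpy as np

def scaled_dot_product_attention(Q, K, V):
    # Q, K: [seq_len, d_k], V: [seq_len, d_v]
    d_k = Q.shape[-1]
    score = Q @ K.T / np.sqrt(d_k)                    # [seq_len, seq_len]
    weights = np.exp(score - score.max(axis=-1, keepdims=True))
    weights /= weights.sum(axis=-1, keepdims=True)    # row-wise softmax
    return weights @ V                                # each row: weighted sum of the rows of V

# Toy example: 3 positions, d_k = 2, d_v = 4
Q = np.random.randn(3, 2)
K = np.random.randn(3, 2)
V = np.random.randn(3, 4)
print(scaled_dot_product_attention(Q, K, V).shape)  # (3, 4)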
def feed_forward_layer(self, inputs, output_mask=None, activation=None, name=None):
    """
    point-wise feed_forward sub_layer
    :param inputs:
    :param output_mask:
    :param activation:
    :param name:
    :return:
    """
    with tf.variable_scope(name, reuse=tf.AUTO_REUSE):
        # dense layer
        inner_layer = tf.layers.dense(inputs, units=4 * self.embedding_size, activation=activation)
        dense = tf.layers.dense(inner_layer, units=self.embedding_size, activation=None)
        # output mask
        if output_mask is not None:
            dense *= output_mask
        # dropout
        dense = tf.nn.dropout(dense, keep_prob=self.keep_prob)
        # residual connection
        dense += inputs
        # Layer Norm
        dense = tf.contrib.layers.layer_norm(dense, begin_norm_axis=2)
        return dense
def train(self, sess, encoder_inputs, encoder_inputs_length, decoder_inputs,
          decoder_targets, decoder_targets_length, itf_weight,
          keep_prob=transformer_config.keep_prob):
    feed_dict = {self.encoder_inputs: encoder_inputs,
                 self.encoder_inputs_length: encoder_inputs_length,
                 self.decoder_inputs: decoder_inputs,
                 self.decoder_targets: decoder_targets,
                 self.decoder_targets_length: decoder_targets_length,
                 self.keep_prob: keep_prob,
                 self.batch_size: len(encoder_inputs),
                 self.itf_weight: itf_weight}
    _, train_loss = sess.run([self.train_op, self.loss], feed_dict=feed_dict)
    return train_loss

def eval(self, sess, encoder_inputs_val, encoder_inputs_length_val, decoder_inputs_val,
         decoder_targets_val, decoder_targets_length_val, itf_weight_val):
    feed_dict = {self.encoder_inputs: encoder_inputs_val,
                 self.encoder_inputs_length: encoder_inputs_length_val,
                 self.decoder_inputs: decoder_inputs_val,
                 self.decoder_targets: decoder_targets_val,
                 self.decoder_targets_length: decoder_targets_length_val,
                 self.keep_prob: 1.0,
                 self.batch_size: len(encoder_inputs_val),
                 self.itf_weight: itf_weight_val}
    val_loss = sess.run(self.loss, feed_dict=feed_dict)
    summary = sess.run(self.merged, feed_dict=feed_dict)
    return val_loss, summary
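For reference, a rough sketch of how the class above might be driven. Every hyperparameter value and the random batch below are illustrative placeholders, not values from the original post, and it assumes the config / transformer_config objects referenced inside the class are importable:

import numpy as np
import tensorflow as tf

model = Transformer(
    embedding_size=512, num_layers=6, keep_prob=0.9,
    learning_rate=1e-3, learning_decay_steps=1000, learning_decay_rate=0.9,
    clip_gradient=5.0, is_embedding_scale=True, multihead_num=8,
    label_smoothing=0.1, max_gradient_norm=5.0,
    encoder_vocabs=8000, decoder_vocabs=8000,
    max_encoder_len=50, max_decoder_len=50,
    share_embedding=True, pad_index=0)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # One dummy batch: 2 already-padded sequences of length 50 (token ids > 0, so nothing is pad)
    enc_in = np.random.randint(1, 8000, [2, 50])
    dec_in = np.random.randint(1, 8000, [2, 50])
    dec_tgt = np.random.randint(1, 8000, [2, 50])
    itf_w = np.ones([2, 50], dtype=np.float32)
    train_loss = model.train(sess, enc_in, np.full([2], 50), dec_in,
                             dec_tgt, np.full([2], 50), itf_w)
    print(train_loss)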
Encoder: input => embedding plus positional encoding => self-attention => the self-attention output goes through the feed-forward network => encoder output.
Decoder: input => embedding plus positional encoding => self-attention => the self-attention output and the encoder output jointly feed the encoder-decoder attention layer => feed-forward network => decoder output.
The encoder-decoder attention layer is implemented by computing K and V from the encoder output and Q from the output of the decoder's self-attention layer, then running the same attention computation.
References:
BERT大火却不懂Transformer?读这一篇就够了
Transformer: NLP里的变形金刚 — 详述
完全图解自然语言处理中的Transformer——BERT基础(入门长文)
Transformer文本生成与tensorflow实现