赞
踩
NLP 的序列属性和 RL 天然适配,所以 NLP 里的一些模型也可以用到 RL 里面,如 Transformer。去年发表的 MATransformer 在一些多智能体任务上超过了 MAPPO,可见 Transformer 在 RL 上有巨大的发展潜力。这篇文章用来回顾 NLP 基础模型。
NLP 会收集大量的文本,然后把文本转化为词表(字典数据,key 是 token,value 是 id,所以会经常见到两个字典 id2word 和 word2id),中文还会涉及到分词。
#@save d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt', '090b5e7e70c295757f55df93cb0a180b9691891a') def read_time_machine(): #@save """将时间机器数据集加载到文本行的列表中""" with open(d2l.download('time_machine'), 'r') as f: lines = f.readlines() return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines] def tokenize(lines, token='word'): #@save """将文本行拆分为单词或字符词元""" if token == 'word': return [line.split() for line in lines] elif token == 'char': return [list(line) for line in lines] else: print('错误:未知词元类型:' + token) def count_corpus(tokens): #@save """统计词元的频率""" # 这里的tokens是1D列表或2D列表 if len(tokens) == 0 or isinstance(tokens[0], list): # 将词元列表展平成一个列表 tokens = [token for line in tokens for token in line] return collections.Counter(tokens) class Vocab: #@save """文本词表""" def __init__(self, tokens=None, min_freq=0, reserved_tokens=None): if tokens is None: tokens = [] if reserved_tokens is None: reserved_tokens = [] # 按出现频率排序(从大到小) counter = count_corpus(tokens) self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True) # 未知词元的索引为0 self.idx_to_token = ['<unk>'] + reserved_tokens self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)} for token, freq in self._token_freqs: # 词频小于 min_freq 的被定义为 <unk> if freq < min_freq: break if token not in self.token_to_idx: self.idx_to_token.append(token) self.token_to_idx[token] = len(self.idx_to_token) - 1 def __len__(self): return len(self.idx_to_token) def __getitem__(self, tokens): if not isinstance(tokens, (list, tuple)): return self.token_to_idx.get(tokens, self.unk) return [self.__getitem__(token) for token in tokens] def to_tokens(self, indices): if not isinstance(indices, (list, tuple)): return self.idx_to_token[indices] return [self.idx_to_token[index] for index in indices] @property def unk(self): # 未知词元的索引为0 return 0 @property def token_freqs(self): return self._token_freqs def load_corpus_time_machine(max_tokens=-1): #@save """返回时光机器数据集的词元索引列表和词表""" lines = read_time_machine() tokens = tokenize(lines, 'char') vocab = Vocab(tokens) # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落, # 所以将所有文本行展平到一个列表中 corpus = [vocab[token] for line in tokens for token in line] if max_tokens > 0: corpus = corpus[:max_tokens] return corpus, vocab corpus, vocab = load_corpus_time_machine() len(corpus), len(vocab)
H t = ϕ ( X t W x h + H t − 1 W h h + b h ) \mathbf{H}_t=\phi\left(\mathbf{X}_t \mathbf{W}_{x h}+\mathbf{H}_{t-1} \mathbf{W}_{h h}+\mathbf{b}_h\right) Ht=ϕ(XtWxh+Ht−1Whh+bh)
O t = H t W h o + b o \mathbf{O}_t=\mathbf{H}_t \mathbf{W}_{h o}+\mathbf{b}_o Ot=HtWho+bo
exp
(
−
1
n
∑
t
=
1
n
log
P
(
x
t
∣
x
t
−
1
,
…
,
x
1
)
)
\exp \left(-\frac{1}{n} \sum_{t=1}^n \log P\left(x_t \mid x_{t-1}, \ldots, x_1\right)\right)
exp(−n1t=1∑nlogP(xt∣xt−1,…,x1))
就是 exp(平均交叉熵)
在最好的情况下,模型总是完美地估计标签词元的概率为1。 在这种情况下,模型的困惑度为1。在最坏的情况下,模型总是预测标签词元的概率为0。 在这种情况下,困惑度是正无穷大。
# 代码里 hq 就是图里的 ho import math import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2l batch_size, num_steps = 32, 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps) def get_params(vocab_size, num_hiddens, device): num_inputs = num_outputs = vocab_size def normal(shape): return torch.randn(size=shape, device=device) * 0.01 # 隐藏层参数 W_xh = normal((num_inputs, num_hiddens)) W_hh = normal((num_hiddens, num_hiddens)) b_h = torch.zeros(num_hiddens, device=device) # 输出层参数 W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) # 附加梯度 params = [W_xh, W_hh, b_h, W_hq, b_q] for param in params: param.requires_grad_(True) return params # 0 时刻的隐藏状态 def init_rnn_state(batch_size, num_hiddens, device): return (torch.zeros((batch_size, num_hiddens), device=device), ) # forward 函数(整个 rnn,不光是一个时间步) def rnn(inputs, state, params): # inputs的形状:(时间步数量,批量大小,词表大小) W_xh, W_hh, b_h, W_hq, b_q = params H, = state outputs = [] # X的形状:(批量大小,词表大小) for X in inputs: H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h) Y = torch.mm(H, W_hq) + b_q outputs.append(Y) return torch.cat(outputs, dim=0), (H,) class RNNModelScratch: #@save """从零开始实现的循环神经网络模型""" def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn): self.vocab_size, self.num_hiddens = vocab_size, num_hiddens self.params = get_params(vocab_size, num_hiddens, device) self.init_state, self.forward_fn = init_state, forward_fn def __call__(self, X, state): X = F.one_hot(X.T, self.vocab_size).type(torch.float32) return self.forward_fn(X, state, self.params) def begin_state(self, batch_size, device): return self.init_state(batch_size, self.num_hiddens, device) def predict_ch8(prefix, num_preds, net, vocab, device): #@save """在prefix后面生成新字符""" state = net.begin_state(batch_size=1, device=device) outputs = [vocab[prefix[0]]] get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1)) for y in prefix[1:]: # prefix 里是已经知道的句子 _, state = net(get_input(), state) outputs.append(vocab[y]) for _ in range(num_preds): # 从 prefix[-1] 预测 num_preds 步得到新的句子 y, state = net(get_input(), state) outputs.append(int(y.argmax(dim=1).reshape(1))) return ''.join([vocab.idx_to_token[i] for i in outputs]) def grad_clipping(net, theta): #@save """裁剪梯度""" # 这里是所有参数,不是单独某一层 if isinstance(net, nn.Module): params = [p for p in net.parameters() if p.requires_grad] else: params = net.params norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params)) if norm > theta: for param in params: param.grad[:] *= theta / norm #@save def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter): """ 训练网络一个迭代周期(定义见第8章) use_random_iter:每个 batch 是否时序上相关,True 代表不相关 """ state, timer = None, d2l.Timer() metric = d2l.Accumulator(2) # 训练损失之和,词元数量 for X, Y in train_iter: if state is None or use_random_iter: # 在第一次迭代或使用随机抽样时初始化state state = net.begin_state(batch_size=X.shape[0], device=device) else: # detach 的目的是为了获取时序上相关的 state,而不应该包含梯度 if isinstance(net, nn.Module) and not isinstance(state, tuple): # state 对于 nn.GRU 是个张量 state.detach_() else: # state 对于 nn.LSTM 或对于我们从零开始实现的模型是个张量 for s in state: s.detach_() y = Y.T.reshape(-1) X, y = X.to(device), y.to(device) y_hat, state = net(X, state) # 就是一个多分类问题,所以就在前面就 concat 在一起 l = loss(y_hat, y.long()).mean() if isinstance(updater, torch.optim.Optimizer): updater.zero_grad() l.backward() grad_clipping(net, 1) updater.step() else: l.backward() grad_clipping(net, 1) # 因为已经调用了mean函数 updater(batch_size=1) metric.add(l * y.numel(), y.numel()) # 返回困惑度,训练速度 return math.exp(metric[0] / metric[1]), metric[1] / timer.stop() #@save def train_ch8(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False): """训练模型(定义见第8章)""" loss = nn.CrossEntropyLoss() animator = d2l.Animator(xlabel='epoch', ylabel='perplexity', legend=['train'], xlim=[10, num_epochs]) # 初始化 if isinstance(net, nn.Module): updater = torch.optim.SGD(net.parameters(), lr) else: updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size) predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device) # 训练和预测 for epoch in range(num_epochs): ppl, speed = train_epoch_ch8( net, train_iter, loss, updater, device, use_random_iter) if (epoch + 1) % 10 == 0: print(predict('time traveller')) animator.add(epoch + 1, [ppl]) print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}') print(predict('time traveller')) print(predict('traveller')) num_hiddens = 512 net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params, init_rnn_state, rnn) state = net.begin_state(X.shape[0], d2l.try_gpu()) num_epochs, lr = 500, 1 train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu()) # 简洁实现 num_hiddens = 256 rnn_layer = nn.RNN(len(vocab), num_hiddens) #@save class RNNModel(nn.Module): """循环神经网络模型""" def __init__(self, rnn_layer, vocab_size, **kwargs): super(RNNModel, self).__init__(**kwargs) self.rnn = rnn_layer self.vocab_size = vocab_size self.num_hiddens = self.rnn.hidden_size # 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1 # 使用 pytorch 需要构造自己的输出层 if not self.rnn.bidirectional: self.num_directions = 1 self.linear = nn.Linear(self.num_hiddens, self.vocab_size) else: self.num_directions = 2 self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size) def forward(self, inputs, state): X = F.one_hot(inputs.T.long(), self.vocab_size) X = X.to(torch.float32) Y, state = self.rnn(X, state) # 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数) # 它的输出形状是(时间步数*批量大小,词表大小)。 output = self.linear(Y.reshape((-1, Y.shape[-1]))) return output, state def begin_state(self, device, batch_size=1): if not isinstance(self.rnn, nn.LSTM): # nn.GRU以张量作为隐状态(可以多层和双向) return torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device) else: # nn.LSTM以元组作为隐状态 return (torch.zeros(( self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device), torch.zeros(( self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device)) num_epochs, lr = 500, 1 device = d2l.try_gpu() state = torch.zeros((1, batch_size, num_hiddens)) net = RNNModel(rnn_layer, vocab_size=len(vocab)) net = net.to(device) d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device) d2l.predict_ch8('time traveller', 10, net, vocab, device)
R
t
=
σ
(
X
t
W
x
r
+
H
t
−
1
W
h
r
+
b
r
)
Z
t
=
σ
(
X
t
W
x
z
+
H
t
−
1
W
h
z
+
b
z
)
H
~
t
=
tanh
(
X
t
W
x
h
+
(
R
t
⊙
H
t
−
1
)
W
h
h
+
b
h
)
H
t
=
Z
t
⊙
H
t
−
1
+
(
1
−
Z
t
)
⊙
H
~
t
重置门有助于捕获序列中的短期依赖关系更新门有助于捕获序列中的长期依赖关系
import torch from torch import nn from d2l import torch as d2l batch_size, num_steps = 32, 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps) def get_params(vocab_size, num_hiddens, device): num_inputs = num_outputs = vocab_size def normal(shape): return torch.randn(size=shape, device=device)*0.01 def three(): return (normal((num_inputs, num_hiddens)), normal((num_hiddens, num_hiddens)), torch.zeros(num_hiddens, device=device)) W_xz, W_hz, b_z = three() # 更新门参数 W_xr, W_hr, b_r = three() # 重置门参数 W_xh, W_hh, b_h = three() # 候选隐状态参数 # 输出层参数 W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) # 附加梯度 params = [W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q] for param in params: param.requires_grad_(True) return params def init_gru_state(batch_size, num_hiddens, device): return (torch.zeros((batch_size, num_hiddens), device=device), ) def gru(inputs, state, params): W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params H, = state outputs = [] for X in inputs: Z = torch.sigmoid((X @ W_xz) + (H @ W_hz) + b_z) R = torch.sigmoid((X @ W_xr) + (H @ W_hr) + b_r) H_tilda = torch.tanh((X @ W_xh) + ((R * H) @ W_hh) + b_h) H = Z * H + (1 - Z) * H_tilda Y = H @ W_hq + b_q outputs.append(Y) return torch.cat(outputs, dim=0), (H,) vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu() num_epochs, lr = 500, 1 model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_params, init_gru_state, gru) d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device) # 简洁实现 num_inputs = vocab_size gru_layer = nn.GRU(num_inputs, num_hiddens) model = d2l.RNNModel(gru_layer, len(vocab)) model = model.to(device) d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
I
t
=
σ
(
X
t
W
x
i
+
H
t
−
1
W
h
i
+
b
i
)
F
t
=
σ
(
X
t
W
x
f
+
H
t
−
1
W
h
f
+
b
f
)
O
t
=
σ
(
X
t
W
x
o
+
H
t
−
1
W
h
o
+
b
o
)
C
~
t
=
tanh
(
X
t
W
x
c
+
H
t
−
1
W
h
c
+
b
c
)
C
t
=
F
t
⊙
C
t
−
1
+
I
t
⊙
C
~
t
H
t
=
O
t
⊙
tanh
(
C
t
)
直觉上和 GRU 差不多,所以 GRU 实际上简化了 LSTM。
from mxnet import np, npx from mxnet.gluon import rnn from d2l import mxnet as d2l npx.set_np() batch_size, num_steps = 32, 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps) def get_lstm_params(vocab_size, num_hiddens, device): num_inputs = num_outputs = vocab_size def normal(shape): return torch.randn(size=shape, device=device)*0.01 def three(): return (normal((num_inputs, num_hiddens)), normal((num_hiddens, num_hiddens)), torch.zeros(num_hiddens, device=device)) W_xi, W_hi, b_i = three() # 输入门参数 W_xf, W_hf, b_f = three() # 遗忘门参数 W_xo, W_ho, b_o = three() # 输出门参数 W_xc, W_hc, b_c = three() # 候选记忆元参数 # 输出层参数 W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) # 附加梯度 params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q] for param in params: param.requires_grad_(True) return params def init_lstm_state(batch_size, num_hiddens, device): return (torch.zeros((batch_size, num_hiddens), device=device), torch.zeros((batch_size, num_hiddens), device=device)) def lstm(inputs, state, params): [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q] = params (H, C) = state outputs = [] for X in inputs: I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i) F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f) O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o) C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c) C = F * C + I * C_tilda H = O * torch.tanh(C) Y = (H @ W_hq) + b_q outputs.append(Y) return torch.cat(outputs, dim=0), (H, C) vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu() num_epochs, lr = 500, 1 model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_lstm_params, init_lstm_state, lstm) d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device) # 简洁实现 num_inputs = vocab_size lstm_layer = nn.LSTM(num_inputs, num_hiddens) model = d2l.RNNModel(lstm_layer, len(vocab)) model = model.to(device) d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
vocab_size, num_hiddens, num_layers = len(vocab), 256, 2
num_inputs = vocab_size
device = d2l.try_gpu()
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
H
→
t
=
ϕ
(
X
t
W
x
h
(
f
)
+
H
→
t
−
1
W
h
h
(
f
)
+
b
h
(
f
)
)
H
←
t
=
ϕ
(
X
t
W
x
h
(
b
)
+
H
←
t
+
1
W
h
h
(
b
)
+
b
h
(
b
)
)
O
t
=
[
H
→
t
,
H
←
t
]
W
h
q
+
b
q
import torch from torch import nn from d2l import torch as d2l # 加载数据 batch_size, num_steps, device = 32, 35, d2l.try_gpu() train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps) # 通过设置“bidirective=True”来定义双向LSTM模型 vocab_size, num_hiddens, num_layers = len(vocab), 256, 2 num_inputs = vocab_size lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers, bidirectional=True) model = d2l.RNNModel(lstm_layer, len(vocab)) model = model.to(device) # 训练模型 num_epochs, lr = 500, 1 d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
训练
预测(预测的时候不知道每个时间步的输入,所以和训练不一样)
多层结构
exp ( min ( 0 , 1 − len label len pred ) ) ∏ n = 1 k p n 1 / 2 n \exp \left(\min \left(0,1-\frac{\text { len }_{\text {label }}}{\text { len }_{\text {pred }}}\right)\right) \prod_{n=1}^k p_n^{1 / 2^n} exp(min(0,1− len pred len label ))n=1∏kpn1/2n
p
n
p_n
pn
是预测中所有 n-gram 的精度标签序列 A B C D E F 和预测序列 A B B C D, 有
p
1
=
4
/
5
,
p
2
=
3
/
4
,
p
3
=
1
/
3
,
p
4
=
0
p_1=4 / 5, p_2=3 / 4, p_3=1 / 3, p_4=0
p1=4/5,p2=3/4,p3=1/3,p4=0
min(…) 可以惩罚过短的预测,长匹配有高权重
import collections import math import torch from torch import nn from d2l import torch as d2l #@save class Seq2SeqEncoder(d2l.Encoder): """用于序列到序列学习的循环神经网络编码器""" def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs): super(Seq2SeqEncoder, self).__init__(**kwargs) # 嵌入层 self.embedding = nn.Embedding(vocab_size, embed_size) # 这里可以使用双向 self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout) def forward(self, X, *args): # 输出'X'的形状:(batch_size,num_steps,embed_size) X = self.embedding(X) # 在循环神经网络模型中,第一个轴对应于时间步 X = X.permute(1, 0, 2) # 如果未提及状态,则默认为0 output, state = self.rnn(X) # output的形状:(num_steps,batch_size,num_hiddens) # state的形状:(num_layers,batch_size,num_hiddens) return output, state class Seq2SeqDecoder(d2l.Decoder): """用于序列到序列学习的循环神经网络解码器""" def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs): super(Seq2SeqDecoder, self).__init__(**kwargs) self.embedding = nn.Embedding(vocab_size, embed_size) self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout) self.dense = nn.Linear(num_hiddens, vocab_size) def init_state(self, enc_outputs, *args): # 0:output, 1:state decoder 只使用 state return enc_outputs[1] def forward(self, X, state): # 输出'X'的形状:(batch_size,num_steps,embed_size) X = self.embedding(X).permute(1, 0, 2) # 广播context,使其具有与X相同的num_steps context = state[-1].repeat(X.shape[0], 1, 1) X_and_context = torch.cat((X, context), 2) output, state = self.rnn(X_and_context, state) output = self.dense(output).permute(1, 0, 2) # output的形状:(batch_size,num_steps,vocab_size) # state的形状:(num_layers,batch_size,num_hiddens) return output, state #@save class EncoderDecoder(nn.Block): """编码器-解码器架构的基类""" def __init__(self, encoder, decoder, **kwargs): super(EncoderDecoder, self).__init__(**kwargs) self.encoder = encoder self.decoder = decoder def forward(self, enc_X, dec_X, *args): enc_outputs = self.encoder(enc_X, *args) dec_state = self.decoder.init_state(enc_outputs, *args) return self.decoder(dec_X, dec_state) #@save def sequence_mask(X, valid_len, value=0): """ 在序列中屏蔽不相关的项 X = torch.tensor([[1, 2, 3], [4, 5, 6]]) valid_len = torch.tensor([1, 2]) mask = [[ True, False, False], [ True, True, False]] """ maxlen = X.size(1) mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None] X[~mask] = value return X #@save class MaskedSoftmaxCELoss(nn.CrossEntropyLoss): """ 带遮蔽的softmax交叉熵损失函数 为了不同长度的解码器数据可以批量训练 """ # pred的形状:(batch_size,num_steps,vocab_size) # label的形状:(batch_size,num_steps) # valid_len的形状:(batch_size,) def forward(self, pred, label, valid_len): weights = torch.ones_like(label) weights = sequence_mask(weights, valid_len) self.reduction='none' # 不计算 mean unweighted_loss = super(MaskedSoftmaxCELoss, self).forward( pred.permute(0, 2, 1), label) # 计算平均交叉熵 weighted_loss = (unweighted_loss * weights).mean(dim=1) return weighted_loss #@save def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device): """训练序列到序列模型""" def xavier_init_weights(m): if type(m) == nn.Linear: nn.init.xavier_uniform_(m.weight) if type(m) == nn.GRU: for param in m._flat_weights_names: if "weight" in param: nn.init.xavier_uniform_(m._parameters[param]) net.apply(xavier_init_weights) net.to(device) optimizer = torch.optim.Adam(net.parameters(), lr=lr) loss = MaskedSoftmaxCELoss() net.train() animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs]) for epoch in range(num_epochs): timer = d2l.Timer() metric = d2l.Accumulator(2) # 训练损失总和,词元数量 for batch in data_iter: optimizer.zero_grad() X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch] bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1, 1) dec_input = torch.cat([bos, Y[:, :-1]], 1) # 强制教学 Y_hat, _ = net(X, dec_input, X_valid_len) l = loss(Y_hat, Y, Y_valid_len) l.sum().backward() # 损失函数的标量进行“反向传播” d2l.grad_clipping(net, 1) num_tokens = Y_valid_len.sum() optimizer.step() with torch.no_grad(): metric.add(l.sum(), num_tokens) if (epoch + 1) % 10 == 0: animator.add(epoch + 1, (metric[0] / metric[1],)) print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} ' f'tokens/sec on {str(device)}') def bleu(pred_seq, label_seq, k): #@save """计算BLEU""" pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ') len_pred, len_label = len(pred_tokens), len(label_tokens) score = math.exp(min(0, 1 - len_label / len_pred)) for n in range(1, k + 1): num_matches, label_subs = 0, collections.defaultdict(int) for i in range(len_label - n + 1): label_subs[' '.join(label_tokens[i: i + n])] += 1 for i in range(len_pred - n + 1): if label_subs[' '.join(pred_tokens[i: i + n])] > 0: num_matches += 1 label_subs[' '.join(pred_tokens[i: i + n])] -= 1 score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n)) return score #@save def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device, save_attention_weights=False): """序列到序列模型的预测""" # 在预测时将net设置为评估模式 net.eval() src_tokens = src_vocab[src_sentence.lower().split(' ')] + [ src_vocab['<eos>']] enc_valid_len = torch.tensor([len(src_tokens)], device=device) src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>']) # 添加批量轴 enc_X = torch.unsqueeze( torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0) enc_outputs = net.encoder(enc_X, enc_valid_len) dec_state = net.decoder.init_state(enc_outputs, enc_valid_len) # 添加批量轴 dec_X = torch.unsqueeze(torch.tensor( [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0) output_seq, attention_weight_seq = [], [] for _ in range(num_steps): Y, dec_state = net.decoder(dec_X, dec_state) # 我们使用具有预测最高可能性的词元,作为解码器在下一时间步的输入 dec_X = Y.argmax(dim=2) pred = dec_X.squeeze(dim=0).type(torch.int32).item() # 保存注意力权重(稍后讨论) if save_attention_weights: attention_weight_seq.append(net.decoder.attention_weights) # 一旦序列结束词元被预测,输出序列的生成就完成了 if pred == tgt_vocab['<eos>']: break output_seq.append(pred) return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq # 训练 embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1 batch_size, num_steps = 64, 10 lr, num_epochs, device = 0.005, 300, d2l.try_gpu() train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps) encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout) decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout) net = d2l.EncoderDecoder(encoder, decoder) train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device) # 预测 engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .'] fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .'] for eng, fra in zip(engs, fras): translation, attention_weight_seq = predict_seq2seq( net, eng, src_vocab, tgt_vocab, num_steps, device) print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')
注意力评分函数:
加性注意力
a ( q , k ) = w v ⊤ tanh ( W q q + W k k ) ∈ R a(\mathbf{q}, \mathbf{k})=\mathbf{w}_v^{\top} \tanh \left(\mathbf{W}_q \mathbf{q}+\mathbf{W}_k \mathbf{k}\right) \in \mathbb{R} a(q,k)=wv⊤tanh(Wqq+Wkk)∈R
缩放点积注意力
a ( q , k ) = q ⊤ k / d a(\mathbf{q}, \mathbf{k})=\mathbf{q}^{\top} \mathbf{k} / \sqrt{d} a(q,k)=q⊤k/d
矩阵形式:
softmax
(
Q
K
⊤
d
)
V
∈
R
n
×
v
\operatorname{softmax}\left(\frac{\mathbf{Q} \mathbf{K}^{\top}}{\sqrt{d}}\right) \mathbf{V} \in \mathbb{R}^{n \times v}
softmax(d
QK⊤)V∈Rn×v
缩放的目的:
假设查询和键的所有元素都是独立的随机变量, 并且都满足零均值和单位方差, 那么两个向量的点积的均值为0
,方差为d。 为确保无论向量长度如何, 点积的方差在不考虑向量长度的情况下仍然是 1。
2. seq2seq 里的注意力
import math import torch from torch import nn from d2l import torch as d2l #@save def masked_softmax(X, valid_lens): """ 通过在最后一个轴上掩蔽元素来执行softmax操作 masked_softmax(torch.rand(2, 2, 4), torch.tensor([2, 3])) output: tensor([[[0.3313, 0.6687, 0.0000, 0.0000], [0.4467, 0.5533, 0.0000, 0.0000]], [[0.1959, 0.4221, 0.3820, 0.0000], [0.3976, 0.2466, 0.3558, 0.0000]]]) masked_softmax(torch.rand(2, 2, 4), torch.tensor([[1, 3], [2, 4]])) output: tensor([[[1.0000, 0.0000, 0.0000, 0.0000], [0.3195, 0.2861, 0.3944, 0.0000]], [[0.4664, 0.5336, 0.0000, 0.0000], [0.2542, 0.2298, 0.1925, 0.3234]]]) """ # X:3D张量,valid_lens:1D或2D张量 if valid_lens is None: return nn.functional.softmax(X, dim=-1) else: shape = X.shape if valid_lens.dim() == 1: valid_lens = torch.repeat_interleave(valid_lens, shape[1]) else: valid_lens = valid_lens.reshape(-1) # 最后一轴上被掩蔽的元素使用一个非常大的负值替换,从而其softmax输出为0 X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6) return nn.functional.softmax(X.reshape(shape), dim=-1) #@save class AdditiveAttention(nn.Module): """加性注意力""" def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs): super(AdditiveAttention, self).__init__(**kwargs) self.W_k = nn.Linear(key_size, num_hiddens, bias=False) self.W_q = nn.Linear(query_size, num_hiddens, bias=False) self.w_v = nn.Linear(num_hiddens, 1, bias=False) self.dropout = nn.Dropout(dropout) def forward(self, queries, keys, values, valid_lens): queries, keys = self.W_q(queries), self.W_k(keys) # 在维度扩展后, # queries的形状:(batch_size,查询的个数,1,num_hidden) # key的形状:(batch_size,1,“键-值”对的个数,num_hiddens) # 使用广播方式进行求和 每个 q 都要和 k 做操作 features = queries.unsqueeze(2) + keys.unsqueeze(1) features = torch.tanh(features) # self.w_v仅有一个输出,因此从形状中移除最后那个维度。 # scores的形状:(batch_size,查询的个数,“键-值”对的个数) scores = self.w_v(features).squeeze(-1) self.attention_weights = masked_softmax(scores, valid_lens) # values的形状:(batch_size,“键-值”对的个数,值的维度) return torch.bmm(self.dropout(self.attention_weights), values) #@save class DotProductAttention(nn.Module): """缩放点积注意力""" def __init__(self, dropout, **kwargs): super(DotProductAttention, self).__init__(**kwargs) self.dropout = nn.Dropout(dropout) # queries的形状:(batch_size,查询的个数,d) # keys的形状:(batch_size,“键-值”对的个数,d) # values的形状:(batch_size,“键-值”对的个数,值的维度) # valid_lens的形状:(batch_size,)或者(batch_size,查询的个数) def forward(self, queries, keys, values, valid_lens=None): d = queries.shape[-1] # 设置transpose_b=True为了交换keys的最后两个维度 scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d) self.attention_weights = masked_softmax(scores, valid_lens) return torch.bmm(self.dropout(self.attention_weights), values) #@save class AttentionDecoder(d2l.Decoder): """带有注意力机制解码器的基本接口""" def __init__(self, **kwargs): super(AttentionDecoder, self).__init__(**kwargs) @property def attention_weights(self): raise NotImplementedError #@save class Seq2SeqAttentionDecoder(AttentionDecoder): def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs): super(Seq2SeqAttentionDecoder, self).__init__(**kwargs) self.attention = d2l.AdditiveAttention( num_hiddens, num_hiddens, num_hiddens, dropout) self.embedding = nn.Embedding(vocab_size, embed_size) self.rnn = nn.GRU( embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout) self.dense = nn.Linear(num_hiddens, vocab_size) def init_state(self, enc_outputs, enc_valid_lens, *args): # outputs的形状为(batch_size,num_steps,num_hiddens). # hidden_state的形状为(num_layers,batch_size,num_hiddens) outputs, hidden_state = enc_outputs return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens) def forward(self, X, state): # enc_outputs的形状为(batch_size,num_steps,num_hiddens). # hidden_state的形状为(num_layers,batch_size, # num_hiddens) enc_outputs, hidden_state, enc_valid_lens = state # 输出X的形状为(num_steps,batch_size,embed_size) X = self.embedding(X).permute(1, 0, 2) outputs, self._attention_weights = [], [] for x in X: # query的形状为(batch_size,1,num_hiddens) query = torch.unsqueeze(hidden_state[-1], dim=1) # context的形状为(batch_size,1,num_hiddens) context = self.attention( query, enc_outputs, enc_outputs, enc_valid_lens) # 在特征维度上连结 x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1) # 将x变形为(1,batch_size,embed_size+num_hiddens) out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state) outputs.append(out) self._attention_weights.append(self.attention.attention_weights) # 全连接层变换后,outputs的形状为 # (num_steps,batch_size,vocab_size) outputs = self.dense(torch.cat(outputs, dim=0)) return outputs.permute(1, 0, 2), [enc_outputs, hidden_state, enc_valid_lens] @property def attention_weights(self): return self._attention_weights embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1 batch_size, num_steps = 64, 10 lr, num_epochs, device = 0.005, 250, d2l.try_gpu() train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps) encoder = d2l.Seq2SeqEncoder( len(src_vocab), embed_size, num_hiddens, num_layers, dropout) decoder = Seq2SeqAttentionDecoder( len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout) net = d2l.EncoderDecoder(encoder, decoder) d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
下篇文章回顾 transformer 以及一些常用预训练模型,如 bert、gpt 等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。