赞
踩
我们知道传统RNN输入和输出数据是等长的,这显然极大限制了他的应用范围。
前面几节我们讲到的循环神经网络的各种变体基本上都在解决一个序列的问题。还有一大类问题涉及到的是两个序列间转换。它是自然语言处理中的一个重要领域,包括机器翻译、语音识别和文本分类等任务。
当然,前面讲过的RNN、LSTM、GRU、BRNN等模型也都可以应用于这类问题。
但是专门设计的模型显然可以更加有针对性。
下面要介绍的编解码网络 (Encoder-Decoder Network)就是其中非常典型的一种。
编解码器网络是由 Francois Cholet et al. 在 2014 年论文 "Sequence to Sequence Learning with Neural Networks"中提出的。
编解码器网络由两个独立的循环神经网络 (RNN) 组成,分别称为编码器和解码器。
编码器将输入序列进行编码,并将编码结果作为解码器的初始状态。
解码器利用编码结果,逐步生成输出序列。
编码器将输入序列 进行编码,并将编码结果作为解码器的初始状态。解码器利用编码结果,逐步生成输出序列 。
编码器是一个循环神经网络,用于将输入序列 X 进行编码。
把不定长的输入序列变换成定长的上下文变量 c ,并在变量中编码输入序列的信息。
不直接对输入序列进行编码,而是通过对应的隐藏状态进行编码。
在每个时间步 t ,编码器会将输入 x t 和上一时间步的隐藏状态 h t-1 作为输入,通过一个函数 f 来计算当前时间步的隐藏状态 h t 。
在整个输入序列的编码过程结束后,编码器会将最终的隐藏状态 h T 作为解码器的初始状态。
解码器也是一个循环神经网络,用于根据编码器的输出生成输出序列 Y 。
在每个时间步 t,解码器会将输出 y t-1 和上一时间步的隐藏状态 s t-1 作为输入,通过一个函数 f d 来计算当前时间步的隐藏状态 s t :
然后,解码器会将隐藏状态 s t 作为输入,通过一个函数 g 来生成当前时间步的输出 y t:
在整个输出序列的生成过程结束后,解码器会将最终的输出序列 作为编解码器网络的输出。
我们前面学习了编码器解码器模型,它是文本处理领域非常流行的一个架构,它解决了传统方式输入大小固定的问题,能够将深度学习更好的应用于NLP任务中。
而基于这个架构的模型,我们可以构建出seq2seq模型,也就是序列到序列模型。故名思意,模型的输入是一个序列,输出也是一个序列。这样的任务大家随随便便就能说出好几种,比如,机器翻译,文本摘要,语音识别等等。
Seq2seq模型最早在2014年,由Ilya Sutskever等提出。当时主要应用在机器翻译的相关问题中,其可以理解为一个适用于处理由句子(段落)X生成句子(段落)Y的通用模型。
对于给定的输入句子X,我们的目标是通过Seq2seq模型生成目标句子Y。
X与Y并不限制为同一种语言。将X和Y的单词序列表示如下:
首先对输入的句子进行建模,循环神经网络将的输入特征向量 x t 和 h t-1 转换为了 h t:
编码器则通过选定的函数q,通过非线性变换将输入向量转化为了中间向量表示c,c也可以称为上下文向量:
而解码器则根据编码器生成的中间向量表示 c 和之前生成的历史信息,生成 t 时刻的输出 y t :
要注意的是,这里编码器和解码器的时间步都用的符号 t 表示,但实际上这两个 t 并不是一回事。通过上面的公式,可以看到 y t 的值取决于前面 t-1 步的输出,因此这里的解码器也是一个循环神经网络。
解码的过程是先利用上一个时间步的输出 y t-1 和上下文向量c,在当前时间步将上一个隐状态 h t-1 转化为当前步的隐状态 h t 。可以得出如下公式:
可以通过解码器的隐状态 h t ,计算出t时刻的输出 y t 。比较常见的计算方法则是softmax。
seq2seq的模型结构有很多种,最简单的模型结构就是下面这样:
在这种结构中,上下文向量 c 只作用于了第一个解码器隐状态,也就是将c当成了初始隐藏状态。c 也可以作用于所有的隐藏状态,此时网络结构就是如下形式:
此时,这依然是比较简单的情况,不再把上下文向量 c 当成是 RNN 的初始隐藏状态,而是当成 RNN 每一个神经元的输入。可以看到在 Decoder 的每一个神经元都拥有相同的输入c。
第三种则是如下结构:
可以看到它在输入的部分多了上一个神经元的输出 y 。
即每一个神经元的输入包括:上一个神经元的隐藏层向量 h,上一个神经元的输出 y,上下文向量c。这里的解码器就和我们前面讲的公式完全吻合了。
对于第一个神经元的输入 y 0 ,通常是句子起始标志位的 embedding 向量。
通过计算:
最大化输出序列的联合概率,等价于最小化负对数(其实就是交叉熵损失函数)。
模型训练时,根据最大似然估计,
老师在课上是这样引入的:
Sequence to Sequence Learning:两个循环神经网络组成。
红色部分和绿色部分都是RNN。
预测任务就是从一个序列到另一个序列。
第一个序列称之为原序列,第二个序列称为目标序列。两者长度可能不同。
网络编码器接收原序列作为输入序列,最终在 t 时刻生成隐藏状态,我们称之为 z,有时也称之为 c,他将作为序列的编码值,是一个固定长度的向量。
解码器网络的输入为,当前的输入 y t 和 z ,输出为 I ,I 将作为下一时刻的输入。这样就可以计算出最终 y1到yT的条件概率。
对于机器翻译而言,编码器依次处理源语言的每一个词,最终得到一个固定长的语义向量 z ,解码器以标志位bos(句子的开头)加上 z 作为输入,预测词的概率,选择概率最高的词 I ,I 和 z一起被送入下一时刻预测下一个词 am ,直到句子的结尾出现 EOS 标志位结束。
你可能会听到:
当需要进行不定长的序列输入输出处理时,既可以使用编码器-解码器的模型,也可以使用seq to seq的模型,有时会混用。
这两种模型是非常像的,只是RNN Cell是不同的,一个选用的是GRU,一个选用的是LSTM,本质都是两个RNN。
- import torch
- import torch.nn as nn
-
- class Encoder(nn.Module):
- def __init__(self, input_size, hidden_size, num_layers):
- super(Encoder, self).__init__()
- self.lstm = nn.LSTM(input_size, hidden_size, num_layers) # LSTM模型
-
- def forward(self, x, hidden):
- x, hidden = self.lstm(x, hidden)
- return hidden # 只需要输出hidden
- class Decoder(nn.Module):
- def __init__(self, output_size, hidden_size, num_layers):
- super(Decoder, self).__init__()
- self.lstm = nn.LSTM(output_size, hidden_size, num_layers) # LSTM模型
- self.linear = nn.Linear(hidden_size, output_size)
-
- def forward(self, x, hidden):
- x, state = self.lstm(x, hidden)
- x = self.linear(x)
- return x, state
- class Seq2Seq(nn.Module):
-
- def __init__(self, encoder, decoder):
- super().__init__()
- self.encoder = encoder
- self.decoder = decoder
-
- def forward(self, encoder_inputs, decoder_inputs):
- return self.decoder(decoder_inputs, self.encoder(encoder_inputs))
根据读音生成字母序列:
- import random
-
- # 数据集生成
- soundmark = ['ei', 'bi:', 'si:', 'di:', 'i:', 'ef', 'dʒi:', 'eit∫', 'ai', 'dʒei', 'kei', 'el', 'em', 'en', 'əu', 'pi:', 'kju:',
- 'ɑ:', 'es', 'ti:', 'ju:', 'vi:', 'd∧blju:', 'eks', 'wai', 'zi:']
-
- alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q',
- 'r','s','t','u','v','w','x','y','z']
-
- t = 10000 #总条数
- r = 0.9 #扰动项
- seq_len = 6
- src_tokens, tgt_tokens = [],[] #原始序列、目标序列列表
-
- for i in range(t):
- src, tgt = [],[]
- for j in range(seq_len):
- ind = random.randint(0,25)
- src.append(soundmark[ind])
- if random.random() < r:
- tgt.append(alphabet[ind])
- else:
- tgt.append(alphabet[random.randint(0,25)])
- src_tokens.append(src)
- tgt_tokens.append(tgt)
- src_tokens[:2], tgt_tokens[:2]
([['ei', 'si:', 'wai', 'ei', 'el', 'ef'], ['em', 'ti:', 'ai', 'ai', 'ju:', 'ti:']], [['a', 'c', 'y', 'a', 'l', 'f'], ['m', 't', 'v', 'i', 'u', 't']])
- from collections import Counter #计数类
-
- flatten = lambda l: [item for sublist in l for item in sublist] #展平数组
- #无论多少维度,都可以使用该函数展开。它使用了列表推导式和嵌套的 for 循环,可以适用于任意维度的列表,将其展开为一维。
-
- # 构建词表
- class Vocab:
- def __init__(self, tokens):
- self.tokens = tokens # 传入的tokens是二维列表
- self.token2index = {'<bos>': 0, '<eos>': 1} # 先存好特殊词元
- # 将词元按词频排序后生成列表
- self.token2index.update({
- token: index + 2
- for index, (token, freq) in enumerate(
- sorted(Counter(flatten(self.tokens)).items(), key=lambda x: x[1], reverse=True))
- })
- #构建id到词元字典
- self.index2token = {index: token for token, index in self.token2index.items()}
-
- def __getitem__(self, query):
- # 单一索引
- if isinstance(query, (str, int)):
- if isinstance(query, str):
- return self.token2index.get(query, 0)
- elif isinstance(query, (int)):
- return self.index2token.get(query, '<unk>')
- # 数组索引
- elif isinstance(query, (list, tuple)):
- return [self.__getitem__(item) for item in query]
-
- def __len__(self):
- return len(self.index2token)
这段代码定义了一个名为 Vocab
的类,用于构建词汇表。让我一步步解释:
__init__
方法:这是一个类的构造方法,在创建类的实例时被调用。它接受一个参数 tokens
,该参数是一个列表,其中包含了文本数据中的词语。
self.tokens = tokens
:将传入的 tokens
参数保存在类的实例变量 tokens
中,以便后续使用。
self.token2index
字典:这是一个字典,用于将词语映射到索引。初始包含了两个特殊的词 <bos>
和 <eos>
,分别表示句子的开始和结束,它们的索引分别为 0 和 1。
self.token2index.update(...)
:这一行代码更新了 self.token2index
字典,将词语和对应的索引添加进去。这里使用了列表推导式和 Counter
类来统计词频,并根据词频对词语进行排序,最终将词语和对应的索引添加到 self.token2index
中。索引从 2 开始,因为前面已经占用了 0 和 1。
self.index2token
字典:这是一个将索引映射回词语的字典,是 self.token2index
字典的反转。这样可以通过索引查找对应的词语。
总的来说,这段代码实现了一个词汇表类,可以根据输入的文本数据构建词汇表,并提供了词语到索引和索引到词语的映射关系。
这两个方法是 Python 类中的特殊方法,用于实现类的索引访问和获取长度。让我解释一下:
__getitem__(self, query)
方法:这个方法允许使用类似 obj[key]
的方式获取对象的元素。在这个类中,它用于实现从词语到索引和从索引到词语的映射。具体实现如下:
query
是一个字符串或整数,表示单个索引或词语。如果是字符串,则返回该词语在词汇表中的索引,如果词语不在词汇表中,则返回索引 0。如果是整数,则返回该索引对应的词语,如果索引不存在,则返回 <unk>
表示未知词语。query
是一个列表或元组,表示批量索引,返回一个列表,其中每个元素是对应索引或词语的结果。__len__(self)
方法:这个方法返回对象的长度,即词汇表中词语的数量(索引的数量),通过返回 len(self.index2token)
来实现。
这些方法使得 Vocab
类的实例可以像字典一样进行索引访问,并且可以使用 len()
函数获取词汇表的大小。
- from torch.utils.data import DataLoader, TensorDataset
-
- #实例化source和target词表
- src_vocab, tgt_vocab = Vocab(src_tokens), Vocab(tgt_tokens)
-
- #增加结尾标识<eos>
- src_data = torch.tensor([src_vocab[line + ['<eos>']] for line in src_tokens])
- tgt_data = torch.tensor([tgt_vocab[line + ['<eos>']] for line in tgt_tokens])
-
- # 训练集和测试集比例8比2,batch_size = 16
- train_size = int(len(src_data) * 0.8)
- test_size = len(src_data) - train_size
- batch_size = 16
-
- train_loader = DataLoader(TensorDataset(src_data[:train_size], tgt_data[:train_size]), batch_size=batch_size)
- test_loader = DataLoader(TensorDataset(src_data[-test_size:], tgt_data[-test_size:]), batch_size=1)
在这段代码中,DataLoader
是 PyTorch 中用于加载数据的类,用于创建一个可以迭代的数据加载器。让我解释一下传入参数的含义:
TensorDataset(src_data[:train_size], tgt_data[:train_size])
:TensorDataset
是 PyTorch 中的一个类,用于将张量数据组合成数据集。这里传入了两个张量 src_data[:train_size]
和 tgt_data[:train_size]
,它们分别表示训练数据的源数据和目标数据。src_data[:train_size]
表示从源数据中取前 train_size
个样本,tgt_data[:train_size]
表示从目标数据中取前 train_size
个样本。这样就创建了一个包含训练数据的数据集。
DataLoader(dataset, batch_size=batch_size)
:DataLoader
类用于将数据集加载到内存,并生成可迭代的批量数据。这里传入了一个数据集 TensorDataset(src_data[:train_size], tgt_data[:train_size])
,以及一个参数 batch_size
,表示批量大小,即每次迭代时返回的样本数。batch_size
参数可以自定义,通常根据训练模型的需要来设置。
src_data[-test_size:], tgt_data[-test_size:]
:类似地,这里使用了负索引来取测试数据集。src_data[-test_size:]
表示从源数据中取最后 test_size
个样本,tgt_data[-test_size:]
表示从目标数据中取最后 test_size
个样本。这样就创建了一个包含测试数据的数据集。
test_loader = DataLoader(..., batch_size=1)
:与训练数据不同,测试数据集的 batch_size
设置为 1,这表示每次迭代返回一个样本,这样在测试阶段可以逐个样本进行推断。
Embedding
若使用简单的独热编码:
- # 定义编码器
- class Encoder(nn.Module):
-
- def __init__(self, vocab_size, ebd_size, hidden_size, num_layers):
- super().__init__()
- self.embedding = nn.Embedding(vocab_size, ebd_size) # 将token表示为embedding
- self.gru = nn.GRU(ebd_size, hidden_size, num_layers=num_layers)
-
- def forward(self, encoder_inputs):
- # encoder_inputs从(batch_size, seq_len)变成(batch_size, seq_len, emb_size)再调整为(seq_len, batch_size, emb_size)
- encoder_inputs = self.embedding(encoder_inputs).permute(1, 0, 2)
- output, hidden = self.gru(encoder_inputs)
- # hidden 的形状为 (num_layers, batch_size, hidden_size)
- # 最后时刻的最后一个隐层的输出的隐状态即为上下文向量
- return hidden
-
- # 定义解码器
- class Decoder(nn.Module):
-
- def __init__(self, vocab_size, ebd_size, hidden_size, num_layers):
- super().__init__()
- self.embedding = nn.Embedding(vocab_size, ebd_size)
- # 拼接维度ebd_size + hidden_size
- self.gru = nn.GRU(ebd_size + hidden_size, hidden_size, num_layers=num_layers)
- self.linear = nn.Linear(hidden_size, vocab_size)
-
- def forward(self, decoder_inputs, encoder_states):
- '''
- decoder_inputs 为目标序列偏移一位的结果, 由初始形状: (batch_size, seq_len)变为(batch_size, seq_len)
- 再调整为(batch_size, seq_len, emb_size) -> (seq_len, batch_size, emb_size)
- '''
- decoder_inputs = self.embedding(decoder_inputs).permute(1, 0, 2)
- context = encoder_states[-1] # 上下文向量取编码器的最后一个隐层的输出
- # context 初始形状为 (batch_size, hidden_size),为下一步连接,需repeat为(seq_len, batch_size, hidden_size)形式
- context = context.repeat(decoder_inputs.shape[0], 1, 1)
- output, hidden = self.gru(torch.cat((decoder_inputs, context), -1), encoder_states)
- # logits 的形状为 (seq_len, batch_size, vocab_size)
- logits = self.linear(output)
- return logits, hidden
-
- # seq2seq模型
- class Seq2Seq(nn.Module):
-
- def __init__(self, encoder, decoder):
- super().__init__()
- self.encoder = encoder
- self.decoder = decoder
-
- def forward(self, encoder_inputs, decoder_inputs):
- return self.decoder(decoder_inputs, self.encoder(encoder_inputs))
- ebd= nn.Embedding(26, 26)
- ebd(train_loader.dataset[0][0])
tensor([[ 0.4887, -0.6628, -1.8760, -0.1039, -0.3671, -0.0545, 0.8259, 1.7120, 1.0536, 0.1105, -2.2157, 0.1826, -0.9814, 0.6896, 1.9313, -0.4203, 0.4704, -0.3540, -2.5149, 1.6691, 0.7668, -1.2259, -0.0838, -0.8457, -0.7388, 0.7919], [-0.3271, -0.9200, 1.4683, -1.0719, -0.9968, -0.5890, 0.0442, -0.4679, -0.6279, -0.7677, -1.8178, 0.0872, 0.6651, -1.2833, -0.5265, -0.2333, 0.1615, -0.1019, 0.6508, 0.3404, -1.2946, 0.1573, -0.7420, -2.0256, 1.6652, 0.7278], [-0.3090, -0.6615, -0.2852, -0.4307, -1.7267, -1.2491, 1.1952, -1.7489, 0.1471, 1.2763, -0.2151, -0.2278, 0.4850, 0.6540, 1.8243, 1.2390, 0.2089, -1.3072, 0.2947, 0.3472, 1.1848, -1.0279, -0.8024, 0.1165, -0.0939, 0.0841], [ 0.4887, -0.6628, -1.8760, -0.1039, -0.3671, -0.0545, 0.8259, 1.7120, 1.0536, 0.1105, -2.2157, 0.1826, -0.9814, 0.6896, 1.9313, -0.4203, 0.4704, -0.3540, -2.5149, 1.6691, 0.7668, -1.2259, -0.0838, -0.8457, -0.7388, 0.7919], [ 0.2450, 0.2942, -0.6478, -1.1449, 0.8370, 0.2286, 1.2670, -0.2990, -1.2017, 1.9527, 0.2588, -0.7598, 1.4220, -1.9788, -1.0234, 0.5749, -0.3605, -0.5907, 1.6407, 0.2505, -0.4432, -0.4119, 0.1512, -0.5205, -2.3585, 1.8493], [-0.6333, 0.7932, 0.2204, 0.8744, 1.3206, -1.3566, -1.3790, -1.0874, 1.1842, -0.2754, -0.7049, -1.1859, 0.9867, 1.7082, -0.3269, -0.6141, 0.4234, 0.2091, -0.7511, 1.0062, 0.3373, -0.3307, -1.2813, 0.2178, -0.3695, -0.1869], [ 1.6062, -0.9316, 0.7249, 0.1260, 1.2153, 0.7596, -1.4848, 0.4740, -0.1286, 0.7063, 0.9402, -0.0867, -0.2397, -1.2286, 2.3666, -1.9981, 0.4441, -0.3359, -2.6526, -1.9506, -0.4288, 0.7680, 1.0715, 0.0294, -0.0815, -1.4052]], grad_fn=<EmbeddingBackward0>)
- from tqdm import *
- import matplotlib.pyplot as plt
-
- # 设置超参数
- lr = 0.001
- num_epochs = 20
- hidden_size = 128
-
- # 建立模型
- encoder = Encoder(len(src_vocab), len(src_vocab), hidden_size, num_layers=2)
- decoder = Decoder(len(tgt_vocab), len(tgt_vocab), hidden_size, num_layers=2)
- model = Seq2Seq(encoder, decoder)
-
- # 交叉熵损失及adam优化器
- criterion = nn.CrossEntropyLoss(reduction='none')
- optimizer = torch.optim.Adam(model.parameters(), lr=lr)
-
- # 记录损失变化
- loss_history = []
-
- #开始训练
- model.train()
- for epoch in tqdm(range(num_epochs)):
- for encoder_inputs, decoder_targets in train_loader:
- encoder_inputs, decoder_targets = encoder_inputs, decoder_targets
- # 偏移一位作为decoder的输入
- # decoder的输入第一位是<bos>
- bos_column = torch.tensor([tgt_vocab['<bos>']] * decoder_targets.shape[0]).reshape(-1, 1)
- decoder_inputs = torch.cat((bos_column, decoder_targets[:, :-1]), dim=1)
- # pred的形状为 (seq_len, batch_size, vocab_size)
- pred, _ = model(encoder_inputs, decoder_inputs)
- # decoder_targets 的形状为 (batch_size, seq_len),我们需要改变pred的形状以保证它能够正确输入
- # loss 的形状为 (batch_size, seq_len),其中的每个元素都代表了一个词元的损失
- loss = criterion(pred.permute(1, 2, 0), decoder_targets).mean()
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- loss_history.append(loss.item())
-
100%|██████████| 20/20 [03:35<00:00, 10.79s/it]
- plt.plot(loss_history)
- plt.ylabel('train loss')
- plt.show()
- model.eval()
- translation_results = []
-
- correct = 0
- error = 0
- # 因为batch_size是1,所以每次取出来的都是单个句子
- for src_seq, tgt_seq in test_loader:
- encoder_inputs = src_seq
- hidden = model.encoder(encoder_inputs)
- pred_seq = [tgt_vocab['<bos>']]
- for _ in range(8):
- # 一步步输出,decoder的输入的形状为(batch_size, seq_len)=(1,1)
- decoder_inputs = torch.tensor(pred_seq[-1]).reshape(1, 1)
- # pred形状为 (seq_len, batch_size, vocab_size) = (1, 1, vocab_size)
- pred, hidden = model.decoder(decoder_inputs, hidden)
- next_token_index = pred.squeeze().argmax().item()
- if next_token_index == tgt_vocab['<eos>']:
- break
- pred_seq.append(next_token_index)
-
- # 去掉开头的<bos>
- pred_seq = tgt_vocab[pred_seq[1:]]
- # 因为tgt_seq的形状为(1, seq_len),我们需要将其转化成(seq_len, )的形状
- tgt_seq = tgt_seq.squeeze().tolist()
-
- # 需要注意在<eos>之前截断
- if tgt_vocab['<eos>'] in tgt_seq:
- eos_idx = tgt_seq.index(tgt_vocab['<eos>'])
- tgt_seq = tgt_vocab[tgt_seq[:eos_idx]]
- else:
- tgt_seq = tgt_vocab[tgt_seq]
- translation_results.append((' '.join(tgt_seq), ' '.join(pred_seq)))
-
- for i in range(len(tgt_seq)):
- if i >= len(pred_seq) or pred_seq[i] != tgt_seq[i]:
- error += 1
- else:
- correct += 1
-
- print(correct/(correct+error))
0.507
translation_results
束搜索 (Beam Search) 是一种在自然语言处理和计算机视觉中都经常使用的搜索算法。它用于在大量可能的候选解中查找最优解。
束搜索通过保留一个有限数量的候选解来减少搜索空间的大小。
它的基本思想是,在每一步搜索中,保留当前状态的最优的几个候选解(称为束)。每次进行扩展时,都只考虑束中的候选解。这样,在后续的搜索过程中,束中的候选解会越来越接近最优解。
束搜索在 NLP 任务中被广泛使用,比如机器翻译、语音识别、语音合成等。
4.1、解码过程的挑战
解码器输出需要基于历史生成结果的条件概率。
4.2、贪心搜索
从词汇表V中获取条件概率最高的标记。
。
4.3、束搜索
Beam Search:保留多种可能序列。
举例说明:
假定一个Seq2Seq过程如上图所示,束搜索一般应用于Decoder过程的预测阶段。
假设词表大小为3,内容分别为a、b、c,为了便于理解,下面用一个概率图模型来表示它们之间的关系。
以上图为例,这里假设beam size为2,则具体流程为:
生成第1个词,从词表中选择概率最大的2个词。以图中的概率数据,那么当前的2个序列就是a和b。
生成第2个词,我们将当前序列a和b,分别与词表中的所有词进行组合,得到新的6个序列aa ab ac ba bb bc,计算每个序列的得分并选择得分最高2个列,作为新的当前序列,得到aa和ac。
生成第3个词,再次进行组合,得到6个序列aaa aab aac aca acb acc,再次计算序列得分,得到acb和aaa。
4.4、算法流程
适当总结一下示例中的过程,束搜索的算法流程如下:
初始化:设置束的大小 beam size,并将起始状态加入束中。
扩展:对束中每个状态进行扩展,生成所有可能的下一步状态。
评估:对于每个扩展出的状态,计算其可能性评分。
更新:将所有扩展出的状态按照评分从大到小排序,保留评分最大的 beam size 个状态。
重复步骤2-4,直到搜索到终止状态或者达到最大搜索步骤。
输出:束中的最优状态即为最终的最优解。
束搜索在每一步只考虑当前状态最优的几个候选解,这样可以大大减少搜索空间的大小,提高算法的执行效率。并且束搜索会不断更新束中的候选解,使得束中的解越来越接近最优解。
在确定束的大小合适时束搜索是一种很有效的算法,能很好的解决Seq2Seq等自然语言处理问题。束搜索其实就是一种贪心算法的扩展版,理论上设置 beam size=1 就是贪心算法,束搜索通过保留多个候选解来尽可能避免陷入局部最优。
4.4、改进策略
4.5、优缺点
束搜索主要起到以下作用:
防止爆炸式搜索:在使用解码器生成目标序列时,需要对所有可能的输出序列进行打分。如果直接穷举搜索所有可能的序列,将会有很高的时间和空间复杂度。束搜索通过只考虑当前搜索状态的最优几个候选解,来减少搜索空间大小。
提高生成质量:更加注重当前搜索状态的最优候选解,这样更可能生成更加合理的输出序列。
防止生成错误的序列:保留一个有限数量的候选解能更好地避免不符合语法或语义的序列被选中。同时束搜索会在每次扩展时选择具有最高可能性的解,最大限度减少了错误解的生成。
总结起来就是一句话,本质上就是在尽可能平衡搜索质量和效率之间的关系。
- import torch
- import torch.nn as nn
-
- # 直接读取
- with open('data/有英语-中文普通话对应句 - 2023-02-18.tsv', encoding='utf-8') as f:
- lines = f.readlines()
- print(lines[:5])
- # 只读取有效内容
- with open('data/有英语-中文普通话对应句 - 2023-02-18.tsv', encoding='utf-8') as f:
- data = []
- for line in f.readlines():
- data.append(line.strip().split('\t')[1]+'\t'+line.strip().split('\t')[3])
- print(data[:5])
- # 找出特殊字符
- import re
- import string
-
- content = ''.join(data)
- special_char = re.sub(r'[\u4e00-\u9fa5]', ' ', content) # 匹配中文,将中文替换掉
-
- print(set(special_char) - set(string.ascii_letters) - set(string.digits))
- def cleaning(data):
- for i in range(len(data)):
- # 替换特殊字符
- data[i] = data[i].replace('\u200b', '')
- data[i] = data[i].replace('\u200f', '')
- data[i] = data[i].replace('\xad', '')
- data[i] = data[i].replace('\u3000', ' ')
- eng_mark = [',', '.', '!', '?'] # 因为标点前加空格
- for mark in eng_mark:
- data[i] = data[i].replace(mark, ' '+mark)
- data[i] = data[i].lower() # 统一替换为小写
- return data
cleaning(data)
- def tokenize(data):
- # 分别存储源语言和目标语言的词元
- src_tokens, tgt_tokens = [], []
- for line in data:
- pair = line.split('\t')
- src = pair[0].split(' ')
- tgt = list(pair[1])
- src_tokens.append(src)
- tgt_tokens.append(tgt)
- return src_tokens, tgt_tokens
-
- src_tokens, tgt_tokens = tokenize(data)
- print("src:", src_tokens[:6])
- print("tgt:", tgt_tokens[:6])
- import numpy as np
- def statistics(tokens):
- max_len = 80 #只统计长度80以下的
- len_list = range(max_len) # 长度值
- freq_list = np.zeros(max_len) # 频率值
- for token in tokens:
- if len(token) < max_len:
- freq_list[len_list.index(len(token))] += 1
- return len_list, freq_list
-
- s1, s2 = statistics(src_tokens)
- t1, t2 = statistics(tgt_tokens)
- import matplotlib.pyplot as plt
- plt.plot(s1,s2)
- plt.plot(t1,t2)
[<matplotlib.lines.Line2D at 0x7fc43528fd90>]
- from collections import Counter #计数类
-
- flatten = lambda l: [item for sublist in l for item in sublist] #展平数组
-
- # 构建词表
- class Vocab:
- def __init__(self, tokens):
- self.tokens = tokens # 传入的tokens是二维列表
- self.token2index = {'<bos>': 0, '<eos>': 1, '<unk>':2, '<pad>':3} # 先存好特殊词元
- # 将词元按词频排序后生成列表
- self.token2index.update({
- token: index + 4
- for index, (token, freq) in enumerate(
- sorted(Counter(flatten(self.tokens)).items(), key=lambda x: x[1], reverse=True))
- })
- #构建id到词元字典
- self.index2token = {index: token for token, index in self.token2index.items()}
-
- def __getitem__(self, query):
- # 单一索引
- if isinstance(query, (str, int)):
- if isinstance(query, str):
- return self.token2index.get(query, 0)
- elif isinstance(query, (int)):
- return self.index2token.get(query, '<unk>')
- # 数组索引
- elif isinstance(query, (list, tuple)):
- return [self.__getitem__(item) for item in query]
-
- def __len__(self):
- return len(self.index2token)
- from torch.utils.data import DataLoader, TensorDataset
-
- seq_len = 48 # 序列最大长度
-
- # 对数据按照最大长度进行截断和填充
- def padding(tokens, seq_len):
- # 该函数针对单个句子进行处理
- # 传入的句子是词元形式
- return tokens[:seq_len] if len(tokens) > seq_len else tokens + ['<pad>'] * (seq_len - len(tokens))
-
- #实例化source和target词表
- src_vocab, tgt_vocab = Vocab(src_tokens), Vocab(tgt_tokens)
-
- #增加结尾标识<eos>
- src_data = torch.tensor([src_vocab[padding(line + ['<eos>'], seq_len)] for line in src_tokens])
- tgt_data = torch.tensor([tgt_vocab[padding(line + ['<eos>'], seq_len)] for line in tgt_tokens])
-
- # 训练集和测试集比例8比2,batch_size = 16
- train_size = int(len(src_data) * 0.8)
- test_size = len(src_data) - train_size
- batch_size = 256
-
- train_loader = DataLoader(TensorDataset(src_data[:train_size], tgt_data[:train_size]), batch_size=batch_size)
- test_loader = DataLoader(TensorDataset(src_data[-test_size:], tgt_data[-test_size:]), batch_size=1)
- # 定义编码器
- class Encoder(nn.Module):
-
- def __init__(self, vocab_size, ebd_size, hidden_size, num_layers):
- super().__init__()
- self.embedding = nn.Embedding(vocab_size, ebd_size, padding_idx=3) # 将token表示为embedding
- self.gru = nn.GRU(ebd_size, hidden_size, num_layers=num_layers)
-
- def forward(self, encoder_inputs):
- # encoder_inputs从(batch_size, seq_len)变成(batch_size, seq_len, emb_size)再调整为(seq_len, batch_size, emb_size)
- encoder_inputs = self.embedding(encoder_inputs).permute(1, 0, 2)
- output, hidden = self.gru(encoder_inputs)
- # hidden 的形状为 (num_layers, batch_size, hidden_size)
- # 最后时刻的最后一个隐层的输出的隐状态即为上下文向量
- return hidden
-
- # 定义解码器
- class Decoder(nn.Module):
-
- def __init__(self, vocab_size, ebd_size, hidden_size, num_layers):
- super().__init__()
- self.embedding = nn.Embedding(vocab_size, ebd_size, padding_idx=3)
- # 拼接维度ebd_size + hidden_size
- self.gru = nn.GRU(ebd_size + hidden_size, hidden_size, num_layers=num_layers)
- self.linear = nn.Linear(hidden_size, vocab_size)
-
- def forward(self, decoder_inputs, encoder_states):
- '''
- decoder_inputs 为目标序列偏移一位的结果, 由初始形状: (batch_size, seq_len)变为(batch_size, seq_len)
- 再调整为(batch_size, seq_len, emb_size) -> (seq_len, batch_size, emb_size)
- '''
- decoder_inputs = self.embedding(decoder_inputs).permute(1, 0, 2)
- context = encoder_states[-1] # 上下文向量取编码器的最后一个隐层的输出
- # context 初始形状为 (batch_size, hidden_size),为下一步连接,需repeat为(seq_len, batch_size, hidden_size)形式
- context = context.repeat(decoder_inputs.shape[0], 1, 1)
- output, hidden = self.gru(torch.cat((decoder_inputs, context), -1), encoder_states)
- # logits 的形状为 (seq_len, batch_size, vocab_size)
- logits = self.linear(output)
- return logits, hidden
-
- # seq2seq模型
- class Seq2Seq(nn.Module):
-
- def __init__(self, encoder, decoder):
- super().__init__()
- self.encoder = encoder
- self.decoder = decoder
-
- def forward(self, encoder_inputs, decoder_inputs):
- return self.decoder(decoder_inputs, self.encoder(encoder_inputs))
- from tqdm import *
- import matplotlib.pyplot as plt
-
- # 设置是否使用GPU
- device = 'cuda' if torch.cuda.is_available() else 'cpu'
-
- # 设置超参数
- lr = 0.001
- num_epochs = 50
- hidden_size = 256
-
- # 建立模型
- encoder = Encoder(len(src_vocab), len(src_vocab), hidden_size, num_layers=2)
- decoder = Decoder(len(tgt_vocab), len(tgt_vocab), hidden_size, num_layers=2)
- model = Seq2Seq(encoder, decoder)
- model.to(device)
-
- # 交叉熵损失及adam优化器
- criterion = nn.CrossEntropyLoss(reduction='none', ignore_index =3)
- optimizer = torch.optim.Adam(model.parameters(), lr=lr)
-
- # 记录损失变化
- loss_history = []
-
- #开始训练
- model.train()
- for epoch in tqdm(range(num_epochs)):
- for encoder_inputs, decoder_targets in train_loader:
- encoder_inputs, decoder_targets = encoder_inputs.to(device), decoder_targets.to(device)
- # 偏移一位作为decoder的输入
- # decoder的输入第一位是<bos>
- bos_column = torch.tensor([tgt_vocab['<bos>']] * decoder_targets.shape[0]).reshape(-1, 1).to(device)
- decoder_inputs = torch.cat((bos_column, decoder_targets[:, :-1]), dim=1)
- # pred的形状为 (seq_len, batch_size, vocab_size)
- pred, _ = model(encoder_inputs, decoder_inputs)
- # decoder_targets 的形状为 (batch_size, seq_len),我们需要改变pred的形状以保证它能够正确输入
- # loss 的形状为 (batch_size, seq_len),其中的每个元素都代表了一个词元的损失
- loss = criterion(pred.permute(1, 2, 0), decoder_targets).mean()
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- loss_history.append(loss.item())
100%|██████████| 50/50 [1:36:32<00:00, 115.86s/it]
- plt.plot(loss_history)
- plt.ylabel('train loss')
- plt.show()
- # 保存模型
- torch.save(model.state_dict(), 'seq2seq_params.pt')
- import math
- # 计算bleu分数
- def bleu(label, pred, n):
- score = math.exp(min(0, 1 - len(label) / len(pred)))
- for k in range(1, n + 1):
- num_matches = 0
- hashtable = Counter([' '.join(label[i:i + k]) for i in range(len(label) - k + 1)])
- for i in range(len(pred) - k + 1):
- ngram = ' '.join(pred[i:i + k])
- if ngram in hashtable and hashtable[ngram] > 0:
- num_matches += 1
- hashtable[ngram] -= 1
- score *= pow(num_matches / (len(pred) - k + 1), pow(0.5, k))
- return score
- model.eval()
- translation_results = []
- bleu_scores = []
- # 因为batch_size是1,所以每次取出来的都是单个句子
- for src_seq, tgt_seq in test_loader:
- encoder_inputs = src_seq
- hidden = model.encoder(encoder_inputs.to(device))
- pred_seq = [tgt_vocab['<bos>']]
- for _ in range(8):
- # 一步步输出,decoder的输入的形状为(batch_size, seq_len)=(1,1)
- decoder_inputs = torch.tensor(pred_seq[-1]).reshape(1, 1).to(device)
- # pred形状为 (seq_len, batch_size, vocab_size) = (1, 1, vocab_size)
- pred, hidden = model.decoder(decoder_inputs, hidden)
- next_token_index = pred.squeeze().argmax().item()
- if next_token_index == tgt_vocab['<eos>']:
- break
- pred_seq.append(next_token_index)
-
- # 去掉开头的<bos>
- pred_seq = tgt_vocab[pred_seq[1:]]
- # 因为tgt_seq的形状为(1, seq_len),我们需要将其转化成(seq_len, )的形状
- tgt_seq = tgt_seq.squeeze().tolist()
-
- # 需要注意在<eos>之前截断
- if tgt_vocab['<eos>'] in tgt_seq:
- eos_idx = tgt_seq.index(tgt_vocab['<eos>'])
- tgt_seq = tgt_vocab[tgt_seq[:eos_idx]]
- else:
- tgt_seq = tgt_vocab[tgt_seq]
- translation_results.append((' '.join(tgt_seq), ' '.join(pred_seq)))
- bleu_scores.append(bleu(tgt_seq, pred_seq, n=2))
print(sum(bleu_scores) / test_size)
0.16821586571116853
translation_results
参考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。