A recursive neural network (Recursive Neural Network) is a neural network architecture for processing tree-structured, or more generally recursive, data. Unlike a conventional feedforward neural network (Feedforward Neural Network), which maps a flat, fixed-size input directly to an output, a recursive network applies the same set of weights repeatedly over a nested structure, composing the representations of child nodes into the representation of their parent and passing information up the tree. Because it can combine information at different levels of a hierarchy in this way, it is well suited to data with a recursive nature, such as natural-language parse trees, molecular structures, and computer programs.
The main characteristics of a recursive neural network are weight sharing across every node of the structure, bottom-up composition of child representations into parent representations, and the ability to handle inputs whose size and nesting depth vary from example to example.
In natural language processing, recursive neural networks are used for tasks such as syntactic parsing, text classification, and sentiment analysis; they are also widely applied in bioinformatics, program analysis, and other fields that deal with recursively structured data. Note that they share some of the limitations of other deep sequence models, such as the vanishing-gradient problem, so in some situations architectures such as the long short-term memory network (LSTM) or the gated recurrent unit (GRU) may be a better fit.
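As a concrete illustration of this bottom-up composition, the following minimal sketch applies one shared PyTorch linear cell recursively over a toy binary parse tree; the TreeNode and RecursiveCell names are purely illustrative and are not taken from any library or from the example project below.

import torch
import torch.nn as nn

class TreeNode:
    # A leaf carries a word vector; an internal node carries two children.
    def __init__(self, left=None, right=None, word_vec=None):
        self.left, self.right, self.word_vec = left, right, word_vec

class RecursiveCell(nn.Module):
    def __init__(self, dim):
        super().__init__()
        # One composition function shared by every internal node of the tree.
        self.compose = nn.Linear(2 * dim, dim)

    def forward(self, node):
        if node.word_vec is not None:          # leaf: return its embedding directly
            return node.word_vec
        left = self.forward(node.left)         # recursively encode the subtrees
        right = self.forward(node.right)
        return torch.tanh(self.compose(torch.cat([left, right], dim=-1)))

dim = 8
cell = RecursiveCell(dim)
# Tree shaped like ("not", ("very", "good")) with random leaf embeddings.
root = TreeNode(left=TreeNode(word_vec=torch.randn(dim)),
                right=TreeNode(left=TreeNode(word_vec=torch.randn(dim)),
                               right=TreeNode(word_vec=torch.randn(dim))))
print(cell(root).shape)   # torch.Size([8]): one vector summarizing the whole tree

The same compose weights are reused at every node, which is exactly the weight sharing over structure described above.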
RvNN is the abbreviation most often used for the recursive neural network, the lowercase "v" serving to distinguish it from RNN, which in most of the literature denotes the recurrent neural network. Its defining trait is that a single composition function is applied recursively over a structured input, so that sentences, parse trees, and other nested data can be encoded into fixed-size vectors.
RvNN is a relatively involved architecture and is used mainly by deep learning researchers and natural language processing specialists for particular tasks. Its applications include natural language processing, syntactic analysis, text generation, machine translation, and other tasks over structured sequence data; depending on the application and line of research, RvNNs come in a variety of variants and structures. The example below uses one such variant, the continuous RvNN.
Example 4-7: Hands-on project: building and training an RvNN model (source code path: daima\4\Continuous-RvNN-main)
(1) Write the file Continuous-RvNN-main/inference/preprocess/process_MNLI.py, which preprocesses the natural language inference data sets so that they can later be used by the deep learning model. Its implementation is shown below.
import math
import pickle
import random
from os import fspath
from pathlib import Path

import jsonlines
import nltk
import numpy as np

from preprocess_tools.process_utils import load_glove, jsonl_save

SEED = 101
MAX_VOCAB = 50000
MIN_FREQ = 1
WORDVECDIM = 300
dev_keys = ["matched"]
test_keys = ["matched", "mismatched"]
predi_keys = ["matched", "mismatched"]
np.random.seed(SEED)
random.seed(SEED)

train_path1 = Path('../data/NLI_data/MNLI/multinli_1.0_train.jsonl')
train_path2 = Path('../data/NLI_data/SNLI/snli_1.0_train.jsonl')
dev_path = {}
dev_path["matched"] = Path('../data/NLI_data/MNLI/multinli_1.0_dev_matched.jsonl')
dev_path["mismatched"] = Path('../data/NLI_data/MNLI/multinli_1.0_dev_mismatched.jsonl')
test_path = {}
test_path["matched"] = Path('../data/NLI_data/MNLI/multinli_1.0_dev_matched.jsonl')
test_path["mismatched"] = Path('../data/NLI_data/MNLI/multinli_1.0_dev_mismatched.jsonl')
predi_path = {}
predi_path["matched"] = Path('../data/NLI_data/MNLI/multinli_0.9_test_matched_unlabeled.jsonl')
predi_path["mismatched"] = Path('../data/NLI_data/MNLI/multinli_0.9_test_mismatched_unlabeled.jsonl')
predi2_path = {}
predi2_path["matched"] = Path(
    '../data/NLI_data/MNLI/multinli_1.0_dev_matched.jsonl')  # Path('../../data/NLI_data/MNLI/multinli_0.9_test_matched_unlabeled.jsonl')
predi2_path["mismatched"] = Path(
    '../data/NLI_data/MNLI/multinli_1.0_dev_mismatched.jsonl')  # Path('../../data/NLI_data/MNLI/multinli_0.9_test_mismatched_unlabeled.jsonl')

embedding_path = Path("../embeddings/glove/glove.840B.300d.txt")

Path('processed_data/').mkdir(parents=True, exist_ok=True)

train_save_path = Path('processed_data/MNLI_train.jsonl')
dev_save_path = {}
for key in dev_keys:
    dev_save_path[key] = Path('processed_data/MNLI_dev_{}.jsonl'.format(key))
test_save_path = {}
for key in test_keys:
    test_save_path[key] = Path('processed_data/MNLI_test_{}.jsonl'.format(key))
predi_save_path = {}
predi2_save_path = {}
for key in predi_keys:
    predi_save_path[key] = Path('processed_data/MNLI_predi_{}.jsonl'.format(key))
    predi2_save_path[key] = Path('processed_data/MNLI_predi2_{}.jsonl'.format(key))
metadata_save_path = fspath(Path("processed_data/MNLI_metadata.pkl"))

labels2idx = {}
vocab2count = {}


def tokenize(sentence):
    return nltk.word_tokenize(sentence)


def updateVocab(word):
    global vocab2count
    vocab2count[word] = vocab2count.get(word, 0) + 1


# Read one jsonl split: tokenize both sentences, map gold labels to integer ids,
# optionally drop over-long pairs, and update the global vocabulary counts.
def process_data(filename, update_vocab=True, filter=False, predi=False):
    global labels2idx

    print("\n\nOpening directory: {}\n\n".format(filename))

    sequences1 = []
    sequences2 = []
    pairIDs = []
    labels = []
    count = 0
    max_seq_len = 150

    with jsonlines.open(filename) as reader:
        for sample in reader:
            if sample['gold_label'] != '-':

                sequence1 = tokenize(sample['sentence1'].lower())
                sequence2 = tokenize(sample['sentence2'].lower())
                pairID = sample["pairID"]
                if predi:
                    label = None
                    label_id = None
                else:
                    label = sample['gold_label']
                    if label not in labels2idx:
                        labels2idx[label] = len(labels2idx)
                    label_id = labels2idx[label]

                if filter:
                    if (len(sequence1) < max_seq_len) and (len(sequence2) < max_seq_len):
                        sequences1.append(sequence1)
                        sequences2.append(sequence2)
                        labels.append(label_id)
                        pairIDs.append(pairID)
                else:
                    sequences1.append(sequence1)
                    sequences2.append(sequence2)
                    labels.append(label_id)
                    pairIDs.append(pairID)

                if update_vocab:
                    for word in sequence1:
                        updateVocab(word)

                    for word in sequence2:
                        updateVocab(word)

                count += 1

                if count % 1000 == 0:
                    print("Processing Data # {}...".format(count))

    return sequences1, sequences2, labels, pairIDs


train_sequences1, \
train_sequences2, \
train_labels, _ = process_data(train_path1, filter=True)

train_sequences1_, \
train_sequences2_, \
train_labels_, _ = process_data(train_path2, filter=True)

train_sequences1 += train_sequences1_
train_sequences2 += train_sequences2_
train_labels += train_labels_

dev_sequences1 = {}
dev_sequences2 = {}
dev_labels = {}

for key in dev_keys:
    dev_sequences1[key], \
    dev_sequences2[key], \
    dev_labels[key], _ = process_data(dev_path[key], update_vocab=True)

test_sequences1 = {}
test_sequences2 = {}
test_labels = {}

for key in test_keys:
    test_sequences1[key], \
    test_sequences2[key], \
    test_labels[key], _ = process_data(test_path[key], update_vocab=True)

predi_sequences1 = {}
predi_sequences2 = {}
predi_labels = {}
predi_pairIDs = {}

for key in predi_keys:
    predi_sequences1[key], \
    predi_sequences2[key], \
    predi_labels[key], predi_pairIDs[key] = process_data(predi_path[key], update_vocab=True)

predi2_sequences1 = {}
predi2_sequences2 = {}
predi2_labels = {}
predi2_pairIDs = {}

for key in predi_keys:
    predi2_sequences1[key], \
    predi2_sequences2[key], \
    predi2_labels[key], predi2_pairIDs[key] = process_data(predi2_path[key], update_vocab=False)

# Keep words above the frequency threshold, sort them by frequency, intersect
# with the GloVe vocabulary, and cap the result at MAX_VOCAB entries.
counts = []
vocab = []
for word, count in vocab2count.items():
    if count > MIN_FREQ:
        vocab.append(word)
        counts.append(count)

vocab2embed = load_glove(embedding_path, vocab=vocab2count, dim=WORDVECDIM)

sorted_idx = np.flip(np.argsort(counts), axis=0)
vocab = [vocab[id] for id in sorted_idx if vocab[id] in vocab2embed]
if len(vocab) > MAX_VOCAB:
    vocab = vocab[0:MAX_VOCAB]

vocab += ["<PAD>", "<UNK>", "<SEP>"]

print(vocab)

vocab2idx = {word: id for id, word in enumerate(vocab)}

vocab2embed["<PAD>"] = np.zeros((WORDVECDIM), np.float32)
b = math.sqrt(3 / WORDVECDIM)
vocab2embed["<UNK>"] = np.random.uniform(-b, +b, WORDVECDIM)
vocab2embed["<SEP>"] = np.random.uniform(-b, +b, WORDVECDIM)

embeddings = []
for id, word in enumerate(vocab):
    embeddings.append(vocab2embed[word])


def text_vectorize(text):
    return [vocab2idx.get(word, vocab2idx['<UNK>']) for word in text]


# Convert token lists to id sequences and join each premise/hypothesis pair with <SEP>.
def vectorize_data(sequences1, sequences2, labels, pairIDs=None):
    data_dict = {}
    sequences1_vec = [text_vectorize(sequence) for sequence in sequences1]
    sequences2_vec = [text_vectorize(sequence) for sequence in sequences2]
    data_dict["sequence1"] = sequences1
    data_dict["sequence2"] = sequences2
    sequences_vec = [sequence1 + [vocab2idx["<SEP>"]] + sequence2 for sequence1, sequence2 in
                     zip(sequences1_vec, sequences2_vec)]
    data_dict["sequence1_vec"] = sequences1_vec
    data_dict["sequence2_vec"] = sequences2_vec
    data_dict["sequence_vec"] = sequences_vec
    data_dict["label"] = labels
    if pairIDs is not None:
        data_dict["pairID"] = pairIDs
        print(data_dict["pairID"])
    return data_dict


train_data = vectorize_data(train_sequences1, train_sequences2, train_labels)
"""
for item in train_data["sequence1"]:
    print(item)
print("\n\n")
"""
dev_data = {}
for key in dev_keys:
    dev_data[key] = vectorize_data(dev_sequences1[key], dev_sequences2[key], dev_labels[key])
test_data = {}
for key in test_keys:
    test_data[key] = vectorize_data(test_sequences1[key], test_sequences2[key], test_labels[key])

predi_data = {}
for key in predi_keys:
    predi_data[key] = vectorize_data(predi_sequences1[key], predi_sequences2[key], predi_labels[key],
                                     predi_pairIDs[key])

predi2_data = {}
for key in predi_keys:
    predi2_data[key] = vectorize_data(predi2_sequences1[key], predi2_sequences2[key], predi2_labels[key],
                                      predi2_pairIDs[key])

# Save the vectorized splits as jsonl files and the vocabulary/label/embedding
# metadata as a pickle for the training code to load later.
jsonl_save(filepath=train_save_path,
           data_dict=train_data)

for key in dev_keys:
    jsonl_save(filepath=dev_save_path[key],
               data_dict=dev_data[key])

for key in test_keys:
    jsonl_save(filepath=test_save_path[key],
               data_dict=test_data[key])

for key in predi_keys:
    jsonl_save(filepath=predi_save_path[key],
               data_dict=predi_data[key])
    jsonl_save(filepath=predi2_save_path[key],
               data_dict=predi2_data[key])


metadata = {"labels2idx": labels2idx,
            "vocab2idx": vocab2idx,
            "embeddings": np.asarray(embeddings, np.float32),
            "dev_keys": dev_keys,
            "test_keys": test_keys}

with open(metadata_save_path, 'wb') as outfile:
    pickle.dump(metadata, outfile)
The above code performs the preprocessing of the natural language inference (NLI) data sets. Its overall flow is: read the MNLI and SNLI jsonl files and tokenize each sentence pair with NLTK; map the gold labels to integer ids; build a frequency-sorted vocabulary, keep only words that also appear in the GloVe embeddings, cap it at MAX_VOCAB entries, and append the special tokens <PAD>, <UNK>, and <SEP>; convert every premise/hypothesis pair into id sequences joined by <SEP>; and finally save the vectorized train/dev/test/prediction splits as jsonl files together with a metadata pickle holding labels2idx, vocab2idx, and the embedding matrix.
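After the script has run, the saved artifacts can be checked quickly. The snippet below is a minimal sketch; it assumes the files were written to processed_data/ as in the code above, and the exact per-line layout of the jsonl records is an assumption based on what vectorize_data passes to the repo's jsonl_save helper.

import pickle
import jsonlines

# Inspect the metadata pickle produced at the end of process_MNLI.py.
with open("processed_data/MNLI_metadata.pkl", "rb") as f:
    metadata = pickle.load(f)

print(len(metadata["vocab2idx"]))       # vocabulary size, including <PAD>/<UNK>/<SEP>
print(metadata["labels2idx"])           # gold-label -> integer id mapping
print(metadata["embeddings"].shape)     # (vocab size, 300) GloVe-initialized matrix

# Peek at one processed dev example (field names follow vectorize_data above).
with jsonlines.open("processed_data/MNLI_dev_matched.jsonl") as reader:
    first = next(iter(reader))
print(sorted(first.keys()))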
(2) Write the file Continuous-RvNN-main/classifier/models/Classifier_model.py, which defines a PyTorch model for text classification. The model consists of an embedding layer, an encoder, a feature-extraction step, and a classifier head. Its configuration and hyperparameters, including the input and output dimensions, the embedding dimension, and the hidden size, are supplied through the config dictionary. The implementation of Classifier_model.py is shown below.
import torch as T
import torch.nn as nn
import torch.nn.functional as F

from controllers.encoder_controller import encoder
from models.layers import Linear
from models.utils import gelu
from models.utils import glorot_uniform_init


class Classifier_model(nn.Module):
    def __init__(self, attributes, config):

        super(Classifier_model, self).__init__()

        self.config = config
        self.out_dropout = config["out_dropout"]
        self.classes_num = attributes["classes_num"]
        self.in_dropout = config["in_dropout"]
        embedding_data = attributes["embedding_data"]
        pad_id = attributes["PAD_id"]

        ATT_PAD = -999999
        self.ATT_PAD = T.tensor(ATT_PAD).float()
        self.zeros = T.tensor(0.0)

        if embedding_data is not None:
            embedding_data = T.tensor(embedding_data)
            self.word_embedding = nn.Embedding.from_pretrained(embedding_data,
                                                               freeze=config["word_embd_freeze"],
                                                               padding_idx=pad_id)
        else:
            vocab_len = attributes["vocab_len"]
            self.word_embedding = nn.Embedding(vocab_len, config["embd_dim"],
                                               padding_idx=pad_id)

        self.embd_dim = self.word_embedding.weight.size(-1)
        self.transform_word_dim = Linear(self.embd_dim, config["hidden_size"])

        if not config["global_state_return"]:
            self.attn_linear1 = Linear(config["hidden_size"], config["hidden_size"])
            self.attn_linear2 = Linear(config["hidden_size"], config["hidden_size"])

        self.encoder = encoder(config)

        if config["classifier_layer_num"] == 2:
            self.prediction1 = Linear(config["hidden_size"], config["hidden_size"])
            self.prediction2 = Linear(config["hidden_size"], self.classes_num)
        else:
            self.prediction2 = Linear(config["hidden_size"], self.classes_num)

    # %%

    def embed(self, sequence, input_mask):

        N, S = sequence.size()

        sequence = self.word_embedding(sequence)
        sequence = self.transform_word_dim(sequence)

        sequence = sequence * input_mask.view(N, S, 1)

        return sequence, input_mask

    def extract_features(self, sequence, mask):
        N, S, D = sequence.size()

        mask = mask.view(N, S, 1)

        attention_mask = T.where(mask == 0,
                                 self.ATT_PAD.to(mask.device),
                                 self.zeros.to(mask.device))

        assert attention_mask.size() == (N, S, 1)

        energy = self.attn_linear2(gelu(self.attn_linear1(sequence)))

        assert energy.size() == (N, S, D)

        attention = F.softmax(energy + attention_mask, dim=1)

        assert attention.size() == (N, S, D)

        z = T.sum(attention * sequence, dim=1)

        assert z.size() == (N, D)

        return z

    # %%
    def forward(self, batch):

        sequence = batch["sequences_vec"]
        input_mask = batch["input_masks"]

        N = sequence.size(0)

        # EMBEDDING BLOCK
        sequence, input_mask = self.embed(sequence, input_mask)
        sequence = F.dropout(sequence, p=self.in_dropout, training=self.training)

        # ENCODER BLOCK
        sequence_dict = self.encoder(sequence, input_mask)
        sequence = sequence_dict["sequence"]

        penalty = None
        if "penalty" in sequence_dict:
            penalty = sequence_dict["penalty"]

        if self.config["global_state_return"]:
            feats = sequence_dict["global_state"]
        else:
            feats = self.extract_features(sequence, input_mask)

        if self.config["classifier_layer_num"] == 2:
            feats = F.dropout(feats, p=self.out_dropout, training=self.training)
            feats = gelu(self.prediction1(feats))

        feats = F.dropout(feats, p=self.out_dropout, training=self.training)
        logits = self.prediction2(feats)

        assert logits.size() == (N, self.classes_num)

        return {"logits": logits, "penalty": penalty}
The above code works as follows: embed() looks up the (optionally pre-trained) word embeddings, projects them to hidden_size with transform_word_dim, and zeroes out padded positions using the input mask; the encoder built by encoder_controller then processes the masked sequence; extract_features() pools the encoder outputs with a small two-layer attention mechanism, unless global_state_return is set, in which case the encoder's own global state is used as the sentence representation; finally, one or two Linear layers with dropout and GELU activations produce the class logits, which forward() returns together with any auxiliary penalty term supplied by the encoder.
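As a rough usage sketch, the snippet below builds a toy padded batch in the format that forward() expects. The model instantiation at the end is left commented out because it requires the repo's encoder modules to be importable, and the config and attributes values shown there are assumptions for illustration, not the project's official settings.

import torch as T

# Build a toy batch in the format Classifier_model.forward expects:
# integer id sequences padded to a common length plus a 0/1 input mask.
pad_id = 0
id_seqs = [[5, 8, 2, 9], [7, 3, 1]]
max_len = max(len(s) for s in id_seqs)
sequences_vec = T.tensor([s + [pad_id] * (max_len - len(s)) for s in id_seqs])
input_masks = (sequences_vec != pad_id).float()
batch = {"sequences_vec": sequences_vec, "input_masks": input_masks}

# Hypothetical configuration (keys taken from the code above, values are examples only):
# config = {"hidden_size": 128, "embd_dim": 300, "in_dropout": 0.1, "out_dropout": 0.2,
#           "classifier_layer_num": 2, "global_state_return": False,
#           "word_embd_freeze": True}          # plus whatever keys encoder_controller needs
# attributes = {"classes_num": 3, "embedding_data": None,
#               "vocab_len": 50003, "PAD_id": pad_id}
# model = Classifier_model(attributes, config)
# out = model(batch)                            # {"logits": (2, 3) tensor, "penalty": ...}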