There are already plenty of explanations of recurrent neural networks (RNNs) online. One of the better ones is the RNN part of Andrew Ng's Deep Learning course, which walks through many RNN variants and the areas they are suited to, and there is a related set of notes on it as well.
This post stays focused on putting an RNN into practice.
We will run sentiment analysis on a movie-review dataset and decide whether each review is positive, i.e. a binary classification problem.
Three things to note:
· Dataset construction: sentiment analysis on the movie-review dataset (a classification task)
· Word-vector model: load pre-trained word vectors or train your own
· Sequence model: train an RNN to do the classification
Step 1: import the libraries
import os
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
import numpy as np
import pprint
import logging
import time
from collections import Counter
from pathlib import Path
from tqdm import tqdm
Step 2: load and inspect the movie-review dataset
# Load the IMDB dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.imdb.load_data()

# Shape of the training data
print(x_train.shape)

# The reviews come in already mapped to word ids; with ordinary raw text you would have to build this id mapping yourself
print(x_train[0])  # partial output: [1, 14, 22, 16, ...]

# The label classes
np.unique(y_train)
Step 3: build the word-to-id mapping
# Load the word-to-index table that ships with the dataset
_word2idx = tf.keras.datasets.imdb.get_word_index()

# Shift every index up by 3 to free indices 0, 1, 2 for the three special tokens below
word2idx = {w: i + 3 for w, i in _word2idx.items()}

# the padding token maps to 0
word2idx['<pad>'] = 0
# the start of every review maps to 1
word2idx['<start>'] = 1
# any token that cannot be found in the loaded vocabulary maps to 2
word2idx['<unk>'] = 2

# Build the reverse id-to-word mapping
idx2word = {i: w for w, i in word2idx.items()}
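As a quick sanity check (a small sketch reusing the variables defined above), the first training review can be decoded back into readable text:

# Decode the first training review; every id in x_train is covered by idx2word, but .get() keeps the sketch safe
sample = ' '.join(idx2word.get(i, '<unk>') for i in x_train[0])
print(sample[:200])          # first 200 characters of the decoded review
print('Label:', y_train[0])  # 1 = positive, 0 = negative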
Step 4: sort the samples by text length
def sort_by_len(x, y):
    # Reorder both arrays so that shorter reviews come first
    x, y = np.asarray(x), np.asarray(y)
    idx = sorted(range(len(x)), key=lambda i: len(x[i]))
    return x[idx], y[idx]
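A tiny illustration of what sort_by_len does (toy inputs made up for this example):

toy_x = np.array([[1, 2, 3], [1], [1, 2]], dtype=object)
toy_y = np.array([0, 1, 0])
sx, sy = sort_by_len(toy_x, toy_y)
print([len(s) for s in sx])  # [1, 2, 3] -- shortest review first
print(sy)                    # [1 0 0]  -- labels follow their reviews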
Step 5: save the intermediate results to disk, so that if the program crashes we do not have to start over; what gets saved is the text, not the ids
# Re-sort the data
x_train, y_train = sort_by_len(x_train, y_train)
x_test, y_test = sort_by_len(x_test, y_test)

def write_file(f_path, xs, ys):
    with open(f_path, 'w', encoding='utf-8') as f:
        for x, y in zip(xs, ys):
            # Slice from index 1 because index 0 is the <start> token
            f.write(str(y) + '\t' + ' '.join([idx2word[i] for i in x][1:]) + '\n')

# Make sure the output directory exists before writing
Path('./datasets/RNN-imdb').mkdir(parents=True, exist_ok=True)

write_file('./datasets/RNN-imdb/train.txt', x_train, y_train)
write_file('./datasets/RNN-imdb/test.txt', x_test, y_test)
Step 6: build the vocabulary, based on word frequency
counter = Counter()
with open('./datasets/RNN-imdb/train.txt', encoding='utf-8') as f:
    for line in f:
        line = line.rstrip()
        label, words = line.split('\t')
        words = words.split(' ')
        counter.update(words)

# Keep only the words that appear at least 10 times, with <pad> at index 0
words = ['<pad>'] + [w for w, freq in counter.most_common() if freq >= 10]
print('Vocab Size:', len(words))

Path('./datasets/vocab').mkdir(exist_ok=True)

with open('./datasets/vocab/word.txt', 'w', encoding='utf-8') as f:
    for w in words:
        f.write(w + '\n')
Step 7: build the new word2idx mapping
word2idx = {}
with open('./datasets/vocab/word.txt', encoding='utf-8') as f:
    for i, line in enumerate(f):
        line = line.rstrip()
        word2idx[line] = i  # one word per line, so the line number is its id
Every word corresponds to a vector that captures some of its features; the matrix formed by stacking these word vectors is called the word-embedding matrix, i.e. the embedding.
The embedding can be trained as part of the network, or you can load one that has already been trained; in practice a pre-trained one is usually loaded. Some commonly used vectors are available here: https://nlp.stanford.edu/projects/glove
Below, the pre-trained word vectors are loaded to create the embedding matrix.
# Build one big lookup table: with 20598 distinct words in the vocabulary the matrix is 20599 x 50
embedding = np.zeros((len(word2idx) + 1, 50))  # the +1 row is for words that are not in the vocabulary (unknown)

with open('./datasets/glove.6B/glove.6B.50d.txt', encoding='utf-8') as f:  # the downloaded GloVe file
    count = 0
    for i, line in enumerate(f):
        if i % 100000 == 0:
            print(f'- At line {i}')  # report progress
        line = line.rstrip()
        sp = line.split(' ')
        word, vec = sp[0], sp[1:]
        if word in word2idx:
            count += 1
            embedding[word2idx[word]] = np.asarray(vec, dtype='float32')  # fill in this word's pre-trained vector
The embedding matrix can then be saved:
- print(f"{count} / {len(word2idx)} words have found pre-trained values")
- np.save("./datasets/vocab/word.npy",embedding)
- print("Save ./datasets/vocab/word.npy")
# Map each text to a sequence of ids
def data_generator(f_path, params):
    with open(f_path, encoding='utf-8') as f:
        print("Reading", f_path)
        for line in f:
            line = line.rstrip()
            label, text = line.split('\t')
            text = text.split(' ')
            x = [params['word2idx'].get(w, len(word2idx)) for w in text]  # map each word to its id; unknown words get the extra index len(word2idx)
            if len(x) >= params['max_len']:  # truncate long reviews
                x = x[:params['max_len']]
            else:
                x += [0] * (params['max_len'] - len(x))  # pad short reviews with 0 (<pad>)
            y = int(label)
            yield x, y

# Build the tf.data dataset
def dataset(is_training, params):
    # is_training indicates whether to build the training or the test pipeline
    _shapes = ([params['max_len']], ())
    _types = (tf.int32, tf.int32)

    if is_training:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['train_path'], params),
            output_shapes=_shapes,
            output_types=_types,
        )
        ds = ds.shuffle(params['num_samples'])
        ds = ds.batch(params['batch_size'])
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)  # prefetch with an auto-tuned buffer so preprocessing overlaps with training, i.e. a speed-up
    else:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['test_path'], params),  # wrapped in a lambda so data_generator is not called immediately
            output_shapes=_shapes,
            output_types=_types,
        )
        ds = ds.batch(params['batch_size'])
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)

    return ds
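A quick way to confirm the pipeline works (a sketch; it assumes the params dictionary from the "Setting the parameters" step further down has already been defined) is to pull a single batch and look at its shape:

for texts, labels in dataset(is_training=False, params=params).take(1):
    print(texts.shape)   # (batch_size, max_len), e.g. (32, 1000)
    print(labels.shape)  # (batch_size,)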
First, it helps to understand what embedding_lookup does: given the embedding matrix and a tensor of word ids, it simply gathers the corresponding rows, turning a batch of id sequences into a batch of word-vector sequences.
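A minimal example with made-up numbers (not the real embedding):

toy_embedding = tf.constant([[0.0, 0.0],   # row 0, e.g. <pad>
                             [1.0, 1.0],   # row 1
                             [2.0, 2.0]])  # row 2
ids = tf.constant([[2, 0, 1]])             # one "sentence" of three ids
print(tf.nn.embedding_lookup(toy_embedding, ids))  # shape (1, 3, 2): each id replaced by its row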
Next, how the classification is done: the recurrent layers turn the embedded review into a fixed-size representation, and a fully connected layer on top outputs two logits, one per class.
Bidirectional RNN: the sequence is read both left-to-right and right-to-left and the two hidden states are concatenated, which is why the feature dimension downstream is 2 * rnn_units.
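A small shape check with toy sizes:

layer = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(8, return_sequences=True))
out = layer(tf.random.normal((4, 20, 50)))  # (batch, steps, features)
print(out.shape)                            # (4, 20, 16): 8 units forward + 8 backward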
Building model one: three stacked bidirectional LSTM layers run over the full padded sequence, followed by a fully connected classifier:
class Model1(tf.keras.Model):
    def __init__(self, params):
        super().__init__()

        # Frozen pre-trained embedding matrix built earlier
        self.embedding = tf.Variable(np.load('./datasets/vocab/word.npy'),
                                     dtype=tf.float32,
                                     name='pretrained_embedding',
                                     trainable=False)

        self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])

        self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=False))

        self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
        self.fc = tf.keras.layers.Dense(2 * params['rnn_units'], tf.nn.elu)

        self.out_linear = tf.keras.layers.Dense(2)

    def call(self, inputs, training=False):
        if inputs.dtype != tf.int32:
            inputs = tf.cast(inputs, tf.int32)

        # inputs has shape (batch, max_len): a batch of reviews, each padded/truncated to the maximum length
        x = tf.nn.embedding_lookup(self.embedding, inputs)
        # after the lookup each id becomes a 50-dim word vector, so x is (batch, max_len, 50)

        x = self.drop1(x, training=training)
        x = self.rnn1(x)

        x = self.drop2(x, training=training)
        x = self.rnn2(x)

        x = self.drop3(x, training=training)
        x = self.rnn3(x)

        x = self.drop_fc(x, training=training)
        x = self.fc(x)

        x = self.out_linear(x)

        return x
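To check that the model wires up, it can be called on a dummy batch (a sketch with small, made-up hyperparameters; it still needs ./datasets/vocab/word.npy from the embedding step):

toy_params = {'rnn_units': 16, 'dropout_rate': 0.2}
m1 = Model1(toy_params)
dummy = tf.zeros((2, 100), tf.int32)    # a batch of 2 "reviews", 100 ids each
print(m1(dummy, training=False).shape)  # (2, 2): one logit per class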
Building model two. Model two splits the length-1000 sequence into a 10 x 10 x 10 hierarchy: each bidirectional LSTM only ever unrolls over 10 steps, and a max-pool over those steps passes one vector up to the next level, which is far cheaper than running a single RNN over all 1000 steps (see the shape trace after the class definition):
class Model2(tf.keras.Model):
    def __init__(self, params):
        super().__init__()

        # Frozen pre-trained embedding matrix built earlier
        self.embedding = tf.Variable(np.load('./datasets/vocab/word.npy'),
                                     dtype=tf.float32,
                                     name='pretrained_embedding',
                                     trainable=False)

        self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
        self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])

        self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
        self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))

        self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
        self.fc = tf.keras.layers.Dense(2 * params['rnn_units'], tf.nn.elu)

        self.out_linear = tf.keras.layers.Dense(2)

        # Remember the doubled width for the reshapes in call()
        self.run_units = 2 * params['rnn_units']

    def call(self, inputs, training=False):
        if inputs.dtype != tf.int32:
            inputs = tf.cast(inputs, tf.int32)
        batch_sz = tf.shape(inputs)[0]
        run_units = self.run_units

        # inputs has shape (batch, max_len): a batch of reviews, each padded/truncated to max_len = 1000
        x = tf.nn.embedding_lookup(self.embedding, inputs)
        # after the lookup each id becomes a 50-dim word vector, so x is (batch, max_len, 50)

        # Level 1: 100 chunks of 10 words per review
        x = tf.reshape(x, (batch_sz * 10 * 10, 10, 50))
        x = self.drop1(x, training=training)
        x = self.rnn1(x)
        x = tf.reduce_max(x, 1)  # keep the strongest feature over each 10-word chunk

        # Level 2: 10 groups of 10 chunk vectors
        x = tf.reshape(x, (batch_sz * 10, 10, run_units))
        x = self.drop2(x, training=training)
        x = self.rnn2(x)
        x = tf.reduce_max(x, 1)

        # Level 3: 10 section vectors per review
        x = tf.reshape(x, (batch_sz, 10, run_units))
        x = self.drop3(x, training=training)
        x = self.rnn3(x)
        x = tf.reduce_max(x, 1)

        x = self.drop_fc(x, training=training)
        x = self.fc(x)

        x = self.out_linear(x)

        return x
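The reshape / reduce_max pyramid is easier to follow on shapes alone. A toy trace of the same pattern (random data and small made-up sizes, standalone from the model class):

rnn_units = 8
run_units = 2 * rnn_units  # Bidirectional doubles the width
rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(rnn_units, return_sequences=True))
rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(rnn_units, return_sequences=True))

batch_sz = 4
x = tf.random.normal((batch_sz, 1000, 50))             # embedded input: (batch, max_len, 50)
x = tf.reshape(x, (batch_sz * 10 * 10, 10, 50))        # 100 chunks of 10 words per review
x = tf.reduce_max(rnn1(x), 1)                          # each LSTM call only unrolls 10 steps
x = tf.reshape(x, (batch_sz * 10, 10, run_units))      # group the 100 chunk vectors 10 at a time
x = tf.reduce_max(rnn2(x), 1)                          # (batch * 10, run_units)
x = tf.reshape(x, (batch_sz, 10, run_units))           # 10 section vectors per review
print(x.shape)                                         # (4, 10, 16): what the final LSTM layer consumes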
Setting the parameters
params = {
    'vocab_path': './datasets/vocab/word.txt',
    'train_path': './datasets/RNN-imdb/train.txt',
    'test_path': './datasets/RNN-imdb/test.txt',
    'num_samples': 25000,
    'num_labels': 2,
    'batch_size': 32,
    'max_len': 1000,
    'rnn_units': 200,
    'dropout_rate': 0.2,
    'clip_norm': 10.,    # gradient-clipping threshold, keeps a single update from exploding
    'num_patience': 3,   # how many evaluations without improvement before stopping
    'lr': 3e-4,          # initial learning rate
}
# Used to decide whether to stop training early
def is_descending(history: list):
    # If the accuracy has done nothing but drop over the last num_patience evaluations,
    # return True and stop the training loop
    history = history[-(params['num_patience'] + 1):]
    for i in range(1, len(history)):
        if history[i - 1] <= history[i]:
            return False
    return True
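For example, with num_patience = 3 the helper only triggers once the accuracy has dropped three evaluations in a row:

print(is_descending([0.80, 0.82, 0.81, 0.83]))  # False: still improving
print(is_descending([0.83, 0.82, 0.81, 0.80]))  # True: three consecutive drops, stop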
# Reload the vocabulary and attach it to params
word2idx = {}
with open(params['vocab_path'], encoding='utf-8') as f:
    for i, line in enumerate(f):
        line = line.rstrip()
        word2idx[line] = i
params['word2idx'] = word2idx
params['vocab_size'] = len(word2idx) + 1

model = Model2(params)
model.build(input_shape=(None, None))  # declare the input shape; it could also be inferred automatically when fitting
# pprint.pprint([(v.name, v.shape) for v in model.trainable_variables])

# See: https://tensorflow.google.cn/api_docs/python/tf/keras/optimizers/schedules/ExponentialDecay?version=stable
# the schedule returns initial_learning_rate * decay_rate ^ (step / decay_steps)
decay_lr = tf.optimizers.schedules.ExponentialDecay(params['lr'], 1000, 0.95)  # exponential learning-rate decay
optim = tf.optimizers.Adam(params['lr'])
global_step = 0

history_acc = []
best_acc = 0

t0 = time.time()
logger = logging.getLogger('tensorflow')
logger.setLevel(logging.INFO)
while True:
    # Train for one epoch
    for texts, labels in dataset(is_training=True, params=params):
        with tf.GradientTape() as tape:  # the tape records the forward pass so .gradient() can compute gradients of any tensor produced inside it
            logits = model(texts, training=True)
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
            loss = tf.reduce_mean(loss)

        optim.lr.assign(decay_lr(global_step))
        grads = tape.gradient(loss, model.trainable_variables)
        grads, _ = tf.clip_by_global_norm(grads, params['clip_norm'])  # clip the global gradient norm so a single step cannot blow up the weights
        optim.apply_gradients(zip(grads, model.trainable_variables))   # apply the update

        if global_step % 50 == 0:
            logger.info(
                f"Step {global_step} | Loss: {loss.numpy().item():.4f} | "
                f"Spent: {time.time() - t0:.1f} secs | "
                f"LR: {optim.lr.numpy().item():.6f}")
            t0 = time.time()
        global_step += 1

    # Evaluate on the test split
    m = tf.keras.metrics.Accuracy()

    for texts, labels in dataset(is_training=False, params=params):
        logits = model(texts, training=False)
        y_pred = tf.argmax(logits, axis=-1)
        m.update_state(y_true=labels, y_pred=y_pred)

    acc = m.result().numpy()
    logger.info(f"Evaluation: Testing Accuracy: {acc:.3f}")
    history_acc.append(acc)

    if acc > best_acc:
        best_acc = acc
    logger.info(f"Best Accuracy: {best_acc:.3f}")

    if len(history_acc) > params['num_patience'] and is_descending(history_acc):
        logger.info(f"Test accuracy has not improved for {params['num_patience']} epochs, early stopping")
        break
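Once training stops, the model can be used on a new review. A minimal inference sketch (the predict_sentiment helper and the example sentence are made up for illustration; unknown words fall back to the extra embedding row, exactly as in data_generator):

def predict_sentiment(text, model, params):
    # Tokenise on spaces, map to ids, then pad/truncate to max_len, mirroring data_generator
    ids = [params['word2idx'].get(w, len(params['word2idx'])) for w in text.lower().split()]
    ids = ids[:params['max_len']] + [0] * max(0, params['max_len'] - len(ids))
    logits = model(tf.constant([ids], tf.int32), training=False)
    return int(tf.argmax(logits, axis=-1)[0])  # 0 = negative, 1 = positive

print(predict_sentiment("this movie was wonderful and moving", model, params))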