当前位置:   article > 正文

RNN-文本分类任务实战_rnn文本分类

rnn文本分类

目录

一、了解

二、实战--映射表

三、实战--embedding词嵌入矩阵

四、实战--构建训练数据

五、实战--自定义网络模型

六、实战--训练


一、了解

网络上关于RNN循环神经网络的讲解,已经够多了。讲的比较好的,当然是吴恩达-深度学习-RNN了。吴师讲解了许多种的RNN模型,并且给出了RNN适用的领域,并且有相关的博客吴恩达-RNN记录。

在这篇博客中,依然关注RNN实战。

将对影评数据集进行情感分析,看看是否是正面的,即二分类问题。

有下面三点需注意:

· 数据集构建:影评数据集进行情感分析(分类任务)

· 词向量模型:加载训练好的词向量或者自己训练都可以

· 序列网络模型:训练RNN模型进行识别

  

二、实战--映射表

第一步,导入库

  1. import os
  2. import warnings
  3. warnings.filterwarnings('ignore')
  4. import tensorflow as tf
  5. import numpy as np
  6. import pprint
  7. import logging
  8. import time
  9. from collections import Counter
  10. from pathlib import Path
  11. from tqdm import tqdm

第二步,加载并观察影评数据集

  1. #加载数据集
  2. (x_train,y_train),(x_test,y_test) = tf.keras.datasets.imdb.load_data()
  3. #数据集形状
  4. print( x_train.shape )
  5. #读进来的数据是已经转换成ID映射的,而一般的数据读进来都是词语,都需要手动转换成ID映射的
  6. print( x_train[0] ) # 部分运行结果: [ 1, 14, 22, 16,,,]
  7. #分类
  8. np.unique( y_train )

第三步,创建词与id 映射表

  1. # 加载单词对照的索引
  2. _word2idx = tf.keras.datasets.imdb.get_word_index()
  3. #将整体单词对应的索引都加 3 ,为下面三个字符腾出三个索引 012
  4. word2idx = {w: i+3 for w,i in _word2idx.items()}
  5. #空格对应 0
  6. word2idx['<pad>'] = 0
  7. #每个影评开始对应 1
  8. word2idx['<start>'] = 1
  9. #在加载的词汇表中找不到影评中的某些字符,都对应 2
  10. word2idx['unk'] = 2
  11. #创建 id与单词映射表
  12. idx2word = {i:w for w,i in word2idx.items()}

第四步,按文本长度大小进行排序

  1. def sort_by_len(x,y):
  2. x, y = np.asarray(x), np.asarray(y)
  3. idx = sorted(range(len(x)),key=lambda i:len(x[i]))
  4. return x[idx],y[idx]

第五步,将中间结果保存到本地,万一程序崩了还得重玩,保存的是文本数据,不是ID

  1. #对数据重新排序
  2. x_train, y_train = sort_by_len(x_train,y_train)
  3. x_test, y_test = sort_by_len(x_test,y_test)
  4. def write_file(f_path,xs,ys):
  5. with open(f_path,'w',encoding='utf-8') as f:
  6. for x, y in zip(xs,ys):
  7. f.write(str(y)+'\t'+' '.join([idx2word[i] for i in x][1:]) +'\n') #从 1:开始,是因为前面有个start
  8. write_file('./datasets/RNN-imdb/train.txt',x_train,y_train)
  9. write_file('./datasets/RNN-imdb/test.txt',x_test,y_test)

第六步、构建语料表,基于词频进行统计

  1. counter = Counter()
  2. with open('./datasets/RNN-imdb/train.txt',encoding='utf-8') as f:
  3. for line in f:
  4. line = line.rstrip()
  5. label, words = line.split('\t')
  6. words = words.split(' ')
  7. counter.update(words)
  8. words = ['<pad>'] + [w for w, freq in counter.most_common() if freq>=10]
  9. print('Vocab Size:',len(words))
  10. Path('./datasets/vocab').mkdir(exist_ok=True)
  11. with open('./datasets/vocab/word.txt','w',encoding='utf-8') as f:
  12. for w in words:
  13. f.write(w+'\n')

第七步,得到新的word2id映射表

  1. word2idx = {}
  2. with open('./datasets/vocab/word.txt',encoding='utf-8') as f:
  3. for i,line in enumerate(f):
  4. line = line.rstrip()
  5. word2idx[line] = i

三、实战--embedding词嵌入矩阵

每一个单词,都对应一个向量,表明这个单词的一些特征,这些单词所对应的向量组成的矩阵也叫做词嵌入矩阵,即embedding。

对于embedding,可以基于网络来训练,也可以直接加载到别人训练好的,一般都是加载预训练模型。这里有一些常用的:https://nlp.stanford.edu/projects/glove

下面对已经训练好的词向量进行导入,创建embedding

  1. #做一个大表,里面有20598个不同的词,【20599*50
  2. embedding = np.zeros((len(word2idx)+1,50)) # +1 表示如果不在语料库中,就都是unknow
  3. with open('./datasets/glove.6B/glove.6B.50d.txt',encoding='utf-8') as f: # 下载好的
  4. count = 0
  5. for i, line in enumerate(f):
  6. if i % 100000 == 0:
  7. print(f'- At line {i}') # 打印处理了多少数据
  8. line = line.rstrip()
  9. sp = line.split(' ')
  10. word, vec = sp[0], sp[1:]
  11. if word in word2idx:
  12. count += 1
  13. embedding[word2idx[word]] = np.asarray(vec,dtype='float32') #将词转换成对应的向量

可以对embedding进行保存

  1. print(f"{count} / {len(word2idx)} words have found pre-trained values")
  2. np.save("./datasets/vocab/word.npy",embedding)
  3. print("Save ./datasets/vocab/word.npy")

四、实战--构建训练数据

  1. # 得到文本对应的id
  2. def data_generator(f_path,params):
  3. with open(f_path,encoding='utf-8') as f:
  4. print("Reading",f_path)
  5. for line in f:
  6. line = line.rstrip()
  7. label, text = line.split('\t')
  8. text = text.split(' ')
  9. x = [params['word2idx'].get(w, len(word2idx)) for w in text] #得到当前词所对应的ID
  10. if len(x) >= params['max_len']: # 截断操作
  11. x = x[:params['max_len']]
  12. else:
  13. x += [0] * (params['max_len'] - len(x)) #补齐操作
  14. y = int(label)
  15. yield x, y
  16. #得到数据集
  17. def dataset(is_training, params):
  18. #is_training代表是训练数据还是测试数据
  19. _shapes = ([params['max_len']],())
  20. _types = (tf.int32, tf.int32)
  21. if is_training:
  22. ds = tf.data.Dataset.from_generator(
  23. lambda: data_generator(params['train_path'],params),
  24. output_shapes = _shapes,
  25. output_types = _types
  26. )
  27. ds = ds.shuffle(params['num_samples'])
  28. ds = ds.batch(params['batch_size'])
  29. ds = ds.prefetch(tf.data.experimental.AUTOTUNE) #设置缓存队列,根据可用的CPU动态设置并行调用的数量,说白了就是加速
  30. else:
  31. ds = tf.data.Dataset.from_generator(
  32. lambda: data_generator(params['test_path'], params), # 加lambda,是为了防止函数data_gen直接执行
  33. output_shapes = _shapes,
  34. output_types = _types,
  35. )
  36. ds = ds.batch(params['batch_size'])
  37. ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
  38. return ds

五、实战--自定义网络模型

首先,了解一下embedding_lookup的作用

 再来看看如何进行分类的:

 双向RNN:

建立模型一:

  1. class Model1(tf.keras.Model):
  2. def __init__(self, params):
  3. super().__init__()
  4. self.embedding = tf.Variable(np.load('./datasets/vocab/word.npy'),
  5. dtype = tf.float32,
  6. name = 'pretrained_embedding',
  7. trainable = False
  8. )
  9. self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
  10. self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
  11. self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])
  12. self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=True))
  13. self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=True))
  14. self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=False))
  15. self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
  16. self.fc = tf.keras.layers.Dense(2*params['rnn_units'],tf.nn.elu)
  17. self.out_linear = tf.keras.layers.Dense(2)
  18. def call(self, inputs, training=False):
  19. if inputs.dtype != tf.int32:
  20. inputs = tf.cast(inputs, tf.int32)
  21. batch_sz = tf.shape(inputs)[0]
  22. run_units = 2*params['rnn_units']
  23. # inputs形状 是 batch * maxlen,即多少个样本为一组,每个样本的长度固定为最大长度
  24. x = tf.nn.embedding_lookup(self.embedding, inputs)
  25. # embedding 之后,多了50维词向量,即x为 batch * maxlen * 50
  26. x = self.drop1(x, training=training)
  27. x = self.rnn1(x)
  28. x = self.drop2(x, training=training)
  29. x = self.rnn2(x)
  30. x = self.drop3(x, training=training)
  31. x = self.rnn3(x)
  32. x = self.drop_fc(x, training=training)
  33. x = self.fc(x)
  34. x = self.out_linear(x)
  35. return x

 建立模型二:

  1. class Model2(tf.keras.Model):
  2. def __init__(self, params):
  3. super().__init__()
  4. self.embedding = tf.Variable(np.load('./datasets/vocab/word.npy'),
  5. dtype = tf.float32,
  6. name = 'pretrained_embedding',
  7. trainable = False
  8. )
  9. self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
  10. self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
  11. self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])
  12. self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=True))
  13. self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=True))
  14. self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'],return_sequences=True))
  15. self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
  16. self.fc = tf.keras.layers.Dense(2*params['rnn_units'],tf.nn.elu)
  17. self.out_linear = tf.keras.layers.Dense(2)
  18. def call(self, inputs, training=False):
  19. if inputs.dtype != tf.int32:
  20. inputs = tf.cast(inputs, tf.int32)
  21. batch_sz = tf.shape(inputs)[0]
  22. run_units = 2*params['rnn_units']
  23. # inputs形状 是 batch * maxlen,即多少个样本为一组,每个样本的长度固定为最大长度
  24. x = tf.nn.embedding_lookup(self.embedding, inputs)
  25. # embedding 之后,多了50维词向量,即x为 batch * maxlen * 50
  26. x = tf.reshape(x, (batch_sz*10*10, 10, 50))
  27. x = self.drop1(x, training=training)
  28. x = self.rnn1(x)
  29. x = tf.reduce_max(x,1)
  30. x = tf.reshape(x, (batch_sz*10, 10, run_units))
  31. x = self.drop2(x, training=training)
  32. x = self.rnn2(x)
  33. x = tf.reduce_max(x,1)
  34. x = tf.reshape(x, (batch_sz, 10, run_units))
  35. x = self.drop3(x, training=training)
  36. x = self.rnn3(x)
  37. x = tf.reduce_max(x,1)
  38. x = self.drop_fc(x, training=training)
  39. x = self.fc(x)
  40. x = self.out_linear(x)
  41. return x

设置参数

  1. params = {
  2. 'vocab_path': './datasets/vocab/word.txt',
  3. 'train_path': './datasets/RNN-imdb/train.txt',
  4. 'test_path': './datasets/RNN-imdb/test.txt',
  5. 'num_samples': 25000,
  6. 'num_labels': 2,
  7. 'batch_size': 32,
  8. 'max_len': 1000,
  9. 'rnn_units': 200,
  10. 'dropout_rate': 0.2,
  11. 'clip_norm': 10., # 防止梯度过大
  12. 'num_patience': 3, # 没有下降的最大次数
  13. 'lr': 3e-4 # 学习率
  14. }

六、实战--训练

  1. #用来判断进行提前停止
  2. def is_descending(history: list):
  3. # 若经过 num_patience 次梯度下降,损失值或者准确率没有优化,反而下降
  4. # 则返回 true ,停止迭代
  5. history = history[-(params['num_patience']+1):]
  6. for i in range(1, len(history)):
  7. if history[i-1] <= history[i]:
  8. return False
  9. return True
  1. word2idx = {}
  2. with open(params['vocab_path'],encoding='utf-8') as f:
  3. for i, line in enumerate(f):
  4. line = line.rstrip()
  5. word2idx[line] = i
  6. params['word2idx'] = word2idx
  7. params['vocab_size'] = len(word2idx) + 1
  8. model = Model2(params)
  9. model.build(input_shape=(None,None))#设置输入的大小,或者fit时候也能自动找到
  10. # pprint.pprint([(v.name,v.shape) for v in model.trainable_variables])
  11. #链接:https://tensorflow.google.cn/api_docs/python/tf/keras/optimizers/schedules/ExponentialDecay?version=stable
  12. # return initial_learning_rate * decay_rate ^ (step / decay_steps)
  13. decay_lr = tf.optimizers.schedules.ExponentialDecay(params['lr'],1000,0.95) # 加一个指数衰减函数
  14. optim = tf.optimizers.Adam(params['lr'])
  15. global_step = 0
  16. history_acc = []
  17. best_acc = 0
  18. t0 = time.time()
  19. logger = logging.getLogger('tensorflow')
  20. logger.setLevel(logging.INFO)
  1. while True:
  2. #训练模型
  3. for texts, labels in dataset(is_training=True,params=params):
  4. with tf.GradientTape() as tape: #梯度带,记录所有在上文中的操作,并且通过调用 .gradient() 获得任何上下文中计算得出的张量的梯度
  5. logits = model(texts, training=True)
  6. loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,logits=logits)
  7. loss = tf.reduce_mean(loss)
  8. optim.lr.assign(decay_lr(global_step))
  9. grads = tape.gradient(loss, model.trainable_variables)
  10. grads, _ = tf.clip_by_global_norm(grads, params['clip_norm']) #将梯度限制一下,有的时候会更新太猛,防止过拟合
  11. optim.apply_gradients(zip(grads, model.trainable_variables)) #更新梯度
  12. if global_step % 50 == 0:
  13. logger.info(f"Step {global_step} | Loss: {loss.numpy().item():.4f} | \
  14. Spent: {time.time() - t0:.1f} secs | \
  15. LR: {optim.lr.numpy().item():.6f} ")
  16. t0 = time.time()
  17. global_step += 1
  18. # 验证集效果
  19. m = tf.keras.metrics.Accuracy()
  20. for texts, labels in dataset(is_training=False,params=params):
  21. logits = model(texts,training=False)
  22. y_pred = tf.argmax(logits, axis=-1)
  23. m.update_state(y_true=labels, y_pred=y_pred)
  24. acc = m.result().numpy()
  25. logger.info(f"Evaluation: Testing Accuracy: {acc:.3f}")
  26. history_acc.append(acc)
  27. if acc > best_acc:
  28. best_acc = acc
  29. logger.info(f"Best Accuracy: {best_acc:.3f}")
  30. if len(history_acc) > params['num_patience'] and is_descending(history_acc):
  31. logger.info(f"Test Accuracy not improved over {params['num_patience']} epochs ,Early stop")
  32. break
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号