Brief overview: given some input text, the model continues generating the sentences that follow.
The RNN model's interface returns two results: the prediction and the current state.
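As a minimal illustration of this two-value interface (a standalone sketch; the module, tensor names, and dimensions below are toy values unrelated to the model built later), PyTorch's torch.nn.GRU returns exactly these two results:
- import torch
-
- gru = torch.nn.GRU(input_size=8, hidden_size=16)  # toy dimensions, for illustration only
- x = torch.randn(1, 1, 8)                          # [sequence, batch, features]
- output, hidden = gru(x)                           # prediction and current state
- print(output.shape)   # torch.Size([1, 1, 16]) -- per-step prediction
- print(hidden.shape)   # torch.Size([1, 1, 16]) -- final hidden state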
Sample content (the Chinese training text in wordstest.txt):
在尘世的纷扰中,只要心头悬挂着远方的灯光,我们就会坚持不懈地走,理想为我们灌注了精神的蕴藉。所以,生活再平凡、再普通、再琐碎,我们都要坚持一种信念,默守一种精神,为自己积淀站立的信心,前行的气力。
First import the required modules, then define the helper functions: get_ch_label() reads the text from a file, and get_ch_label_v() converts the text into a vector of indices. The code is as follows:
- import numpy as np
- import torch
- import torch.nn.functional as F
- import time
- import random
- from collections import Counter
-
- # 1.1 Define basic utility functions
- RANDOM_SEED = 123
- torch.manual_seed(RANDOM_SEED)
- DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-
- def elapsed(sec):  # format an elapsed time in readable units
-     if sec < 60:
-         return str(sec) + " sec"
-     elif sec < (60 * 60):
-         return str(sec / 60) + " min"
-     else:
-         return str(sec / (60 * 60)) + " hour"
-
- training_file = 'csv_list/wordstest.txt'  # the sample file
-
- # Chinese characters from a single file
- def get_ch_label(txt_file):  # extract the Chinese characters from the sample
-     labels = ""
-     with open(txt_file, 'rb') as f:
-         for label in f:
-             labels = labels + label.decode("gb2312", errors='ignore')
-     return labels
-
- # Chinese characters from multiple files
- def readalltxt(txt_files):  # read several text files
-     labels = []
-     for txt_file in txt_files:
-         target = get_ch_label(txt_file)
-         labels.append(target)
-     return labels
-
- # Convert Chinese characters to index vectors; supports both files and in-memory strings
- def get_ch_label_v(txt_file, word_num_map, txt_label=None):
-     words_size = len(word_num_map)
-     to_num = lambda word: word_num_map.get(word, words_size)
-     if txt_file is not None:
-         txt_label = get_ch_label(txt_file)
-     # to_num() converts a single character to its index; an unknown character maps to words_size (41 for this sample)
-     labels_vector = list(map(to_num, txt_label))  # pass every character through to_num()
-     return labels_vector
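A quick sanity check of get_ch_label_v() on in-memory text (using a hypothetical three-character mapping, not the real word_num_map built below) shows both the normal path and the out-of-vocabulary fallback:
- demo_map = {'心': 0, '灯': 1, '光': 2}  # hypothetical vocabulary, for illustration only
- print(get_ch_label_v(None, demo_map, '灯光心'))  # [1, 2, 0]
- print(get_ch_label_v(None, demo_map, '灯火'))    # [1, 3] -- '火' is unknown, so it maps to len(demo_map)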
Sample preprocessing reads the whole sample into training_data, builds the full character vocabulary words, and generates the sample vector wordlabel together with word_num_map, the mapping between characters and indices. The code is as follows:
- # 1.2 Sample preprocessing
- training_data = get_ch_label(training_file)
- print("Loading training data...")
- print("Sample length:", len(training_data))
- counter = Counter(training_data)
- words = sorted(counter)
- words_size = len(words)
- word_num_map = dict(zip(words, range(words_size)))
- print("Vocabulary size:", words_size)
- wordlabel = get_ch_label_v(training_file, word_num_map)
- # Loading training data...
- # Sample length: 75
- # Vocabulary size: 41 (after deduplication)
The output above shows that the sample file contains 75 characters in total, of which 41 remain after removing duplicates. These 41 characters form the vocabulary, establishing the correspondence between characters and index values.
During training, each character is converted to its numeric index before being fed into the model. The model's output is a probability over these 41 characters, i.e., each character is treated as a class.
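The vocabulary-building pipeline (Counter, sorted, dict(zip(...))) can be traced on a short toy string (not the actual sample) to see how each distinct character receives one class index:
- from collections import Counter
-
- demo_text = "天天向上"
- demo_words = sorted(Counter(demo_text))  # deduplicated, sorted character list: ['上', '向', '天']
- demo_map = dict(zip(demo_words, range(len(demo_words))))
- print(demo_map)  # {'上': 0, '向': 1, '天': 2} -- one class index per distinct character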
The RNN model is built with a GRU: it receives a sequence of input characters and predicts the characters one step ahead (the same sequence shifted by one).
The steps to complete in this model are as follows:
- # 1.3 Build the recurrent neural network (RNN) model
- class GRURNN(torch.nn.Module):
-     def __init__(self, word_size, embed_dim, hidden_dim, output_size, num_layers):
-         super(GRURNN, self).__init__()
-         self.num_layers = num_layers
-         self.hidden_dim = hidden_dim
-
-         self.embed = torch.nn.Embedding(word_size, embed_dim)
-         # Define a multi-layer bidirectional GRU layer:
-         # prediction: shape [sequence, batch, hidden_dim*2] -- the RNN is bidirectional, so the feature dimension is doubled
-         # state: shape [num_layers*2, batch, hidden_dim]
-         self.gru = torch.nn.GRU(input_size=embed_dim,
-                                 hidden_size=hidden_dim,
-                                 num_layers=num_layers, bidirectional=True)
-         # Fully connected layer acting as the output layer: it maps the GRU's prediction to the final class scores
-         self.fc = torch.nn.Linear(hidden_dim * 2, output_size)
-
-     def forward(self, features, hidden):
-         embedded = self.embed(features.view(1, -1))
-         output, hidden = self.gru(embedded.view(1, 1, -1), hidden)
-         output = self.fc(output.view(1, -1))
-         return output, hidden
-
-     def init_zero_state(self):
-         # Initialize the GRU state; it must be zeroed before each training iteration.
-         # The batch size is 1, hence the second argument of torch.zeros is 1.
-         init_hidden = torch.zeros(self.num_layers * 2, 1, self.hidden_dim).to(DEVICE)
-         return init_hidden
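To confirm the shapes described in the comments above, a throwaway instance can be probed with a single character index (a sketch assuming the GRURNN class just defined; the dimensions match those used in section 1.4 below):
- demo_model = GRURNN(41, 10, 20, 41, 1).to(DEVICE)
- demo_hidden = demo_model.init_zero_state()  # shape [num_layers*2, 1, hidden_dim] = [2, 1, 20]
- demo_out, demo_hidden = demo_model(torch.tensor([3]).to(DEVICE), demo_hidden)
- print(demo_out.shape)     # torch.Size([1, 41]) -- one score per character class
- print(demo_hidden.shape)  # torch.Size([2, 1, 20])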
- # 1.4 Instantiate the model class and train the model
- EMBEDDING_DIM = 10  # embedding dimension
- HIDDEN_DIM = 20     # hidden-layer dimension
- NUM_LAYERS = 1      # number of GRU layers
- # Instantiate the model
- model = GRURNN(words_size, EMBEDDING_DIM, HIDDEN_DIM, words_size, NUM_LAYERS)
- model = model.to(DEVICE)
- optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
-
- # Define the test function
- def evaluate(model, prime_str, predict_len, temperature=0.8):
-     hidden = model.init_zero_state().to(DEVICE)
-     predicted = ""
-     # Process the priming input to build up the hidden state
-     for p in range(len(prime_str) - 1):
-         _, hidden = model(prime_str[p], hidden)
-         predicted = predicted + words[prime_str[p]]  # append the priming character itself
-     inp = prime_str[-1]  # the last input character
-     predicted = predicted + words[inp]
-     # Generate the requested number of predicted characters
-     for p in range(predict_len):
-         output, hidden = model(inp, hidden)  # feed the input character and state into the model
-         # Sample from a multinomial distribution:
-         # at test time, the temperature parameter and exp() rescale the model's outputs,
-         # keeping them positive -- torch.multinomial() raises an error on negative weights.
-         output_dist = output.data.view(-1).div(temperature).exp()
-         inp = torch.multinomial(output_dist, 1)[0]  # draw one sample
-         predicted = predicted + words[inp]  # convert the index back to a character and append it
-     return predicted
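The effect of the temperature rescaling inside evaluate() can be seen on a made-up score vector: dividing by a smaller temperature before exp() sharpens the distribution that torch.multinomial() samples from (the numbers below are invented for illustration):
- demo_scores = torch.tensor([2.0, 1.0, 0.1])  # made-up model outputs
- for t in (0.5, 1.0, 2.0):
-     weights = demo_scores.div(t).exp()  # always positive, as multinomial requires
-     print(t, (weights / weights.sum()).tolist())  # smaller t -> more peaked distribution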
-
- # Define the parameters and train
- training_iters = 5000
- display_step = 1000
- n_input = 4
- step = 0
- offset = random.randint(0, n_input + 1)
- end_offset = n_input + 1
-
- while step < training_iters:  # train for the given number of iterations
-     start_time = time.time()  # record the start time
-     # Pick a new random position offset when the end of the text is reached
-     if offset > (len(training_data) - end_offset):
-         offset = random.randint(0, n_input + 1)
-     # Build the input sample
-     inwords = wordlabel[offset:offset + n_input]
-     inwords = np.reshape(np.array(inwords), [n_input, -1, 1])
-     # Build the label sample
-     out_onehot = wordlabel[offset + 1:offset + n_input + 1]
-     hidden = model.init_zero_state()  # reset the RNN state
-     optimizer.zero_grad()
-
-     loss = 0.0
-     inputs = torch.LongTensor(inwords).to(DEVICE)
-     targets = torch.LongTensor(out_onehot).to(DEVICE)
-     for c in range(n_input):  # feed the sample into the model one character at a time
-         outputs, hidden = model(inputs[c], hidden)
-         loss = loss + F.cross_entropy(outputs, targets[c].view(1))
-     loss = loss / n_input
-     loss.backward()
-     optimizer.step()
-     # Logging
-     if (step + 1) % display_step == 0:
-         print(f'Time elapsed: {(time.time() - start_time)/60:.4f} min')
-         print(f'step {step + 1} | Loss {loss.item():.2f}\n\n')
-         with torch.no_grad():
-             print(evaluate(model, inputs, 32), '\n')
-         print(50 * '=')
-     step = step + 1
-     # After each iteration, move the offset forward by n_input+1 positions. This keeps the
-     # samples evenly distributed over the text; otherwise the characters near both ends of
-     # the text would be trained on less often.
-     offset = offset + (n_input + 1)
- print("Finished!")
- # 1.5 Run the model to generate sentences
- while True:
-     prompt = "Enter a few characters: "
-     sentence = input(prompt)
-     inputword = sentence.strip()
-     try:
-         inputword = get_ch_label_v(None, word_num_map, inputword)
-         keys = np.reshape(np.array(inputword), [len(inputword), -1, 1])
-         # In get_ch_label_v(), a character missing from the dictionary is assigned an invalid
-         # index (words_size). When evaluate() then calls the model, there is no embedding row
-         # for that index, so an exception is raised and caught below.
-         model.eval()
-         with torch.no_grad():
-             sentence = evaluate(model, torch.LongTensor(keys).to(DEVICE), 32)
-         print(sentence)
-     except:  # deliberately catch the error raised for characters outside the model's dictionary
-         print("Not learned yet")
Model results: the training is not great, but the model is still usable.