赞
踩
将数字转换为相对应的汉字,例如:
1 壹元整
2 贰元整
3 叁元整
4 肆元整
5 伍元整
6 陆元整
7 柒元整
8 捌元整
9 玖元整
10 壹拾元整
11 壹拾壹元整
12 壹拾贰元整
13 壹拾叁元整
14 壹拾肆元整
15 壹拾伍元整
def load_corpus(data_path): '''加载语料文件,返回编码,解码内容列表以及编码,解码字符集''' # 读取文件,按回车分行,将每一行为一个元素,存放到一个列表中 with open(data_path, 'r', encoding='utf-8') as f: lines = f.read().split('\n') # 训练样本大小最多为80000 num_samples = 80000 # 输入数据(用来存放加载数据的全部数字字符) input_texts = [] # 目标数据(用来存放加载数据的全部的中文大写字符) target_texts = [] # 全部输入数字数据token合集,每一个数字都是唯一值,例如:1, 2, 3, 4, 0, 5, 6, 7,不会有10,11等 input_characters = set() # 全部输出状态合集, 每一个汉字都是唯一的,例如:壹,元, 整 target_characters = set() # min(num_samples, len(lines)-1): 只要num_samples条数据,多余的不要 for line in lines[:min(num_samples, len(lines) - 1)]: if not line.strip(): continue # 跳过空行 try: # 将'1\t壹元整'拆开, input_text是1\t,target_text是壹元整 input_text, target_text = line.split('\t') except ValueError: # 如果错误则输出错误的行 print('Error line:', line) input_text='' target_text='' # 计算input_text中的tokens数量 for char in input_text: if char not in input_characters: # 如果数字不存在input_characters中就加入其中 input_characters.add(char) # 计算 target_texts 中的tokens数量 for char in target_text: if char not in target_characters: # 如果汉字不存在targe_characters中就加入其中 target_characters.add(char) # target_text起始位置字符使用'^',结束位置字符使用'$'(正则表达式规范) target_text = '^' + target_text + '$' input_texts.append(input_text) # 列表里包括所有的数字 target_texts.append(target_text) # 列表里包括所有的汉字 # 对数字和汉字进行排序 input_characters = sorted(list(input_characters)) target_characters = sorted(list(target_characters)) # 返回列表(原数据所有数字,原数据所有汉字,唯一值数字, 唯一值汉字) return input_texts, target_texts, input_characters, target_characters
结果:
def build_charset_dict(chs_list, spec_list=[]):
'''
根据字符集和特殊符号创建字典
'_' 映射填充符, 对应索引为0
'''
# '_'用来映射填充符,对应索引为 0
# spec_list 是用来为汉字加上^$符号的,默认为空是因为数字不需要加上^$符号
temp_lst = ['_'] + spec_list + chs_list
# 这两步是将数字和字典进行编号,先进行,创建元组再转换为字典,'_': 0, '0': 1 ....
temp = [(char, i) for i, char in enumerate(temp_lst)]
token_index = dict(temp)
# 返回字典
return token_index
结果:
例如: 数据中全部数字的列表input_texts [‘1’…‘80000’], 那么它的最大长度是5
def get_context_max_len(context):
'''统计并返回文本项集合最大文本长度'''
# 遍历加载数据中全部的文字长度或数字长度到列表,获得最大长度并返回,用来构建token index矩阵
max_seq_length = max([len(txt) for txt in context])
return max_seq_length
结果:
def rewrite(input_texts, input_token_index, enc_max_len, target_texts, target_token_index, dec_max_len): ''' 输入项编码器token序列(文本序列)转换token index矩阵 输入项解码器token序列(文本序列)转换为token index矩阵 ''' # 这个函数用来讲token序列转换为矩阵 def convert_tokens_index_part(context, chs_dict, chs_max_len): # 先创建一个input_texts * 最大长度的 全是0的矩阵 token_index_matrix = np.zeros((len(context), chs_max_len), dtype=np.int32) for i, input_text in enumerate(context): # 遍历input_texts中每一个元素,eg: input_text = '121', 列表里是'1', '2', '1',每一个去对应数字的字典里面找到他们对应的索引值,并放回一个列表 input_indexs = [chs_dict.get(char) for char in input_text] # pad_sequences是用来将不足最大chs_max_len的列表填充,post表示在序列后面填充 token_index_matrix[i] = pad_sequences([input_indexs], maxlen=chs_max_len, padding='post') # 返回矩阵 return token_index_matrix # 第一个是编码的矩阵;第二个是解码输入的矩阵,不要$字符, 所以最大长度减一;第三个是解码的输出矩阵,不要^字符,所以最大长度减一 return convert_tokens_index_part(input_texts, input_token_index, enc_max_len), np.array([row[row != 2] for row in convert_tokens_index_part(target_texts, target_token_index, dec_max_len-1)]), convert_tokens_index_part(target_texts, target_token_index, dec_max_len-1)[:, 1:]
结果:
def build_basic_model(num_encoder_tokens, encoder_embedding_dim, num_decoder_tokens, decoder_enbedding_dim, latent_dim): ''' 创建编码器模型 function api 参数: num_encoder_tokens: 编码字典大小 encoder_embedding_dim: 编码词向量大小 latent_dim: 神经元的数量 ''' ####################Encoder layer###################### # shape (batch, seq_len) shape=(None, ) 指的是一个矩阵 # 例如:数字维度为 80000 * 5 的一个矩阵 encoder_inputs = K.layers.Input(shape=(None,), name='encoder_inputs') # Embedding层矩阵:[11,20] '0~9'加上'_'的11个字符, 特征维度20 # mask_zero=True:如果为True,则在输入序列中的0值将被忽略,不会生成对应的嵌入向量。 encoder_embedding = K.layers.Embedding(num_encoder_tokens, encoder_embedding_dim, mask_zero=True, name='encoder_embedding')(encoder_inputs) # 输入 shape(batch, seq_len, token_len) # 三维 # return_state=True 返回最后一次训练cell_state、hidden_state结果 # 随机dropout的隐藏层参数值, 循环训练中随机dropout的占比 encoder_lstm = K.layers.LSTM(latent_dim, return_state=True, return_sequences=False,dropout=0.2, recurrent_dropout=0.5, name="encoder_lstm") ##### 最后一个时间步的变量,ct, ht # 取lstm返回的hidden_state和cell_state的输出【seq2seq中的C】 _, encoder_state_h, encoder_state_c = encoder_lstm(encoder_embedding) ##### 将ct和ht拼接起来 encoder_states = [encoder_state_h, encoder_state_c] ''' 创建解码器 num_decoder_tokens: 解码字典大小 decoder_enbedding_dim: 解码词向量大小 latent_dim:rnn层神经元数量 initial_states: 解码器参数的初始状态 ''' ######################Decoder########################## # 每一个序列的输出项都需要推理 # eg = 80000 * 20 decoder_inputs = K.layers.Input(shape=(None,), name='decoder_inputs') # Embedding层矩阵:[19,25] 13个大写中文'十百千万',加上'_^$'3个标记符号,共19个 特征维度25 decoder_embedding = K.layers.Embedding(num_decoder_tokens, decoder_enbedding_dim,mask_zero=True, name='decoder_embedding')(decoder_inputs) # return_state=True 返回[?,128]个cell state (最后一次循环的输出) # return_sequences=True 返回lstm训练后完整的一套hidden_state [样本数,循环次数,hidden值] decoder_lstm = K.layers.LSTM(latent_dim, return_state=True, return_sequences=True,dropout=0.2, recurrent_dropout=0.5, name="decoder_lstm") # 取lstm每次训练输出的hidden_state, *代表以list类型接收返回值 # lstm中隐藏层参数的初始值来自于encoder层lstm的hidden_state和cell_state(记录了encoder层训练的状态) # *代表动态list,所有返回值,都装入动态list中 rnn_outputs, *_ = decoder_lstm(decoder_embedding, initial_state=encoder_states) # shape(bath, seq_len, hidden) # 全连接层,把lstm计算得到的[?,?,128]结果,通过全连接转换为[?,?,19]个softmax概率输出 # 结果:[?,?,19] 预测rnn每次time_step训练后,可能是某个字符的概率 decoder_dense = K.layers.Dense(num_decoder_tokens, activation='softmax', name='decoder_dense') decoder_outputs = decoder_dense(rnn_outputs) # encoder和decoder组装 # 输入参数 = [输入编码,目标编码], 输出参数 = [目标编码预测] basic_model = K.models.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[decoder_outputs]) return basic_model
if __name__ == '__main__': # 读入数据 data_path = r'C:\AI\深度学习\算法\NLP\EncoderDecoder\dataset.txt' # 加载语料文件 input_texts, target_texts, input_chs, target_chs = load_corpus(data_path) # 构建token dict input_token_index = build_charset_dict(input_chs) special_characters = ['^', '$'] target_token_index = build_charset_dict(target_chs, special_characters) # 构建模型训练用token索引数据集 enc_max_len = get_context_max_len(input_texts) dec_max_len = get_context_max_len(target_texts) encoder_input_data, decoder_input_data, decoder_output_data = rewrite(input_texts, input_token_index, enc_max_len, target_texts, target_token_index, dec_max_len) # print('encoder_input_data:\n', encoder_input_data) # print('decoder_input_data:\n', decoder_input_data) # print('decoder_output_data:\n', decoder_output_data) # 构建模型 enc_token_len = len(input_token_index) enc_emb_len = 20 dec_token_len = len(target_token_index) dec_emb_len = 25 latent_dim = 128 batch_size = 64 epochs = 5 train_model = build_basic_model(enc_token_len, enc_emb_len, dec_token_len, dec_emb_len, latent_dim) train_model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'] ) # 回调函数 # 保存文件权重 callback_list = [K.callbacks.ModelCheckpoint('basic_model_best.h5', save_best_only=True)] train_model.fit( x = [encoder_input_data, decoder_input_data], y = decoder_output_data, batch_size=batch_size, epochs=epochs, validation_split=0.2, callbacks=callback_list) # 保存编码和解码词典 import json with open('enc_dec_dict.json', 'w', encoding='utf-8') as f: # 唯一汉字,唯一数字索引,最大长度 json.dump([input_token_index, target_token_index, enc_max_len, dec_max_len], f, ensure_ascii=False) # ensure_ascii=False: 可读的 print('模型相关数据保存成功')
这里LSTM有一个问题:
结果:
if __name__ == '__main__': # 神经元数量 latent_dim = 128 # 编码和解码器的权重 encoder, decoder = build_basic_inference_model('basic_model_best.h5', latent_dim) # 保存编码和解码词典 import json # 加载相关模型数据 with open('enc_dec_dict.json', 'r', encoding='utf-8') as f: input_token_index, target_token_index, enc_max_len, dec_max_len = json.load(f) print('模型相关数据加载成功') # 输入测试文本 s = '264' # 目标字符反向字典索引 # eg: 3: '万', 1: '^'..... reverse_target_word_index = dict([(i,c) for c,i in target_token_index.items()]) # 将s转换为索引编码列表 token_index = [input_token_index[c] for c in s] # shape [1, 3] (1, 序列长度) [[3, 7, 5]] input_seq = np.asarray([token_index]) # encoder预测(最后一层输出的hidden_state和cell_state) states_value_h, states_value_c = encoder.predict(input_seq) # 要预测的起始字符'^' target_seq = np.zeros((1,1)) # [[0]] target_seq[0,0] = target_token_index['^'] # 是否发现停止字符(循环停止条件) stop_condition = False # 预测结果 decoded_sentence = '' # stop_condition:停止条件,为True就是停止,False就是不停止 while not stop_condition: # 通过encoder层最后输出的states,加上起始字符,进行预测 # 1个样本(批次)、1次循环、19个结果(19个字符索引的概率) # 传入解码器的有编码器的ht,ct还有预测的起始字符(可能是^, 也可能是上一个的预测) output, decoder_state_h, decoder_states_c = decoder.predict([target_seq, states_value_h, states_value_c]) # 概率判断预测字符的索引 # 返回第一个时间步上最大值的索引 sampled_token_index = np.argmax(output[0,0,:]) # 索引这个单词的汉字 sampled_word = reverse_target_word_index[sampled_token_index] # print(output.shape) # print('预测的目标字符索引:',sampled_token_index) # print('预测的目标字符:', sampled_word) # 如果预测到了结束字符,或预测的字符长度超过了目标最大字符长度,则设置循环终止 if sampled_word == '$' or len(decoded_sentence) > dec_max_len: stop_condition = True continue # 拼接预测字符 decoded_sentence += sampled_word # 更新预测起始字符 target_seq = np.zeros((1,1)) target_seq[0,0] = sampled_token_index # 更新预测输入 states_value_h = decoder_state_h states_value_c = decoder_states_c print('预测字符序列:', decoded_sentence)
结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。