赞
踩
本语音识别程序分为声学模型部分和语言模型部分,框架分别采用keras和tensorflow。
程序中使用的完整开源数据集是thchs30、aishell、prime、stcmd,但在程序的演示中只使用了部分的thchs30的数据集。
下面代码是声学模型和语言模型的训练部分
import os
import tensorflow as tf
from utils import get_data, data_hparams
from keras.callbacks import ModelCheckpoint


# 0. Prepare the training data ---------------------------------------------
data_args = data_hparams()
data_args.data_type = 'train'
data_args.data_path = 'data/'
data_args.thchs30 = True
data_args.aishell = True
data_args.prime = True
data_args.stcmd = True
data_args.batch_size = 4
data_args.data_length = 10   # demo mode: only the first 10 utterances
# data_args.data_length = None  # use the full corpus
data_args.shuffle = True
train_data = get_data(data_args)

# 0. Prepare the validation data -------------------------------------------
data_args = data_hparams()
data_args.data_type = 'dev'
# data_args.data_path = '../dataset/'
data_args.data_path = 'data/'
data_args.thchs30 = True
data_args.aishell = True
data_args.prime = True
data_args.stcmd = True
data_args.batch_size = 4
# data_args.data_length = None
data_args.data_length = 10
data_args.shuffle = True
dev_data = get_data(data_args)

# 1. Acoustic model training -----------------------------------------------
from model_speech.cnn_ctc import Am, am_hparams
am_args = am_hparams()
am_args.vocab_size = len(train_data.am_vocab)
am_args.gpu_nums = 1
am_args.lr = 0.0008
am_args.is_training = True
am = Am(am_args)

if os.path.exists('logs_am/model.h5'):
    print('load acoustic model...')
    am.ctc_model.load_weights('logs_am/model.h5')

epochs = 10
batch_num = len(train_data.wav_lst) // train_data.batch_size

# Checkpoint callback.
# BUG FIX: the filename template used {val_acc}, but the CTC model tracks no
# accuracy metric, so formatting the name would fail; use val_loss instead.
os.makedirs('./checkpoint', exist_ok=True)
ckpt = "model_{epoch:02d}-{val_loss:.2f}.hdf5"
checkpoint = ModelCheckpoint(os.path.join('./checkpoint', ckpt),
                             monitor='val_loss',
                             save_weights_only=False,
                             verbose=1,
                             save_best_only=True)

batch = train_data.get_am_batch()
dev_batch = dev_data.get_am_batch()
dev_batch_num = len(dev_data.wav_lst) // dev_data.batch_size

# BUG FIXES relative to the original:
#   * `epochs` was set to 10 but fit_generator hard-coded epochs=200;
#   * `checkpoint` was constructed but never passed to fit_generator,
#     so no checkpoints were ever written;
#   * `dev_data` was loaded but never used — it now supplies the val_loss
#     that the checkpoint callback monitors.
am.ctc_model.fit_generator(batch,
                           steps_per_epoch=batch_num,
                           epochs=epochs,
                           workers=1,
                           use_multiprocessing=False,
                           callbacks=[checkpoint],
                           validation_data=dev_batch,
                           validation_steps=dev_batch_num)

am.ctc_model.save_weights('logs_am/model.h5')


# 2. Language model training -----------------------------------------------
from model_language.transformer import Lm, lm_hparams
lm_args = lm_hparams()
lm_args.num_heads = 8
lm_args.num_blocks = 6
lm_args.input_vocab_size = len(train_data.pny_vocab)
lm_args.label_vocab_size = len(train_data.han_vocab)
lm_args.max_length = 100
lm_args.hidden_units = 512
lm_args.dropout_rate = 0.2
lm_args.lr = 0.0003
lm_args.is_training = True
lm = Lm(lm_args)

epochs = 100
with lm.graph.as_default():
    saver = tf.train.Saver()
with tf.Session(graph=lm.graph) as sess:
    merged = tf.summary.merge_all()
    sess.run(tf.global_variables_initializer())
    add_num = 0
    if os.path.exists('logs_lm/checkpoint'):
        print('loading language model...')
        latest = tf.train.latest_checkpoint('logs_lm')
        # Checkpoint names end in the epoch count ('model_<n>'); resume from it.
        add_num = int(latest.split('_')[-1])
        saver.restore(sess, latest)
    writer = tf.summary.FileWriter('logs_lm/tensorboard', tf.get_default_graph())
    for k in range(epochs):
        total_loss = 0
        # get_lm_batch is a single-pass generator, so recreate it each epoch.
        batch = train_data.get_lm_batch()
        for i in range(batch_num):
            input_batch, label_batch = next(batch)
            feed = {lm.x: input_batch, lm.y: label_batch}
            cost, _ = sess.run([lm.mean_loss, lm.train_op], feed_dict=feed)
            total_loss += cost
            if (k * batch_num + i) % 10 == 0:
                rs = sess.run(merged, feed_dict=feed)
                writer.add_summary(rs, k * batch_num + i)
        print('epochs', k + 1, ': average loss = ', total_loss / batch_num)
    saver.save(sess, 'logs_lm/model_%d' % (epochs + add_num))
    writer.close()
数据处理
- import difflib
- import numpy as np
- import tensorflow as tf
- import scipy.io.wavfile as wav
- from tqdm import tqdm
- from scipy.fftpack import fft
- from python_speech_features import mfcc
- from random import shuffle
- from keras import backend as K
-
def data_hparams():
    """Default hyper-parameters consumed by get_data.

    data_length truncates the corpus for quick demos; set it to None to
    use the full data set.
    """
    return tf.contrib.training.HParams(
        # vocab / corpus selection
        data_type='train',   # 'train' / 'dev' / 'test'
        data_path='data/',
        thchs30=True,
        aishell=True,
        prime=True,
        stcmd=True,
        batch_size=1,
        data_length=10,
        shuffle=True)
-
-
class get_data():
    """Corpus loader and batch generator for the acoustic and language models.

    Reads tab-separated transcript list files (wav path, pinyin, hanzi),
    builds the three vocabularies and yields padded mini-batches.
    """

    def __init__(self, args):
        self.data_type = args.data_type      # 'train' / 'dev' / 'test'
        self.data_path = args.data_path      # root directory of the wav files
        self.thchs30 = args.thchs30
        self.aishell = args.aishell
        self.prime = args.prime
        self.stcmd = args.stcmd
        self.data_length = args.data_length  # truncate corpus (None = all)
        self.batch_size = args.batch_size
        self.shuffle = args.shuffle
        self.source_init()

    def source_init(self):
        """Read the transcript list files and build the vocabularies."""
        print('get source list...')
        read_files = []
        # `== True` comparisons replaced with plain truthiness checks.
        if self.data_type == 'train':
            if self.thchs30:
                read_files.append('thchs_train.txt')
            if self.aishell:
                read_files.append('aishell_train.txt')
            if self.prime:
                read_files.append('prime.txt')
            if self.stcmd:
                read_files.append('stcmd.txt')
        elif self.data_type == 'dev':
            if self.thchs30:
                read_files.append('thchs_dev.txt')
            if self.aishell:
                read_files.append('aishell_dev.txt')
        elif self.data_type == 'test':
            if self.thchs30:
                read_files.append('thchs_test.txt')
            if self.aishell:
                read_files.append('aishell_test.txt')
        self.wav_lst = []
        self.pny_lst = []
        self.han_lst = []
        for file in read_files:
            print('load ', file, ' data...')
            sub_file = 'data/' + file
            with open(sub_file, 'r', encoding='utf-8-sig') as f:
                data = f.readlines()
            for line in tqdm(data):
                wav_file, pny, han = line.split('\t')
                self.wav_lst.append(wav_file)
                self.pny_lst.append(pny.split(' '))
                self.han_lst.append(han.strip('\n'))
        if self.data_length:
            # Demo mode: keep only a small prefix of the corpus.
            self.wav_lst = self.wav_lst[:self.data_length]
            self.pny_lst = self.pny_lst[:self.data_length]
            self.han_lst = self.han_lst[:self.data_length]
        print('make am vocab...')
        self.am_vocab = self.mk_am_vocab(self.pny_lst)
        print('make lm pinyin vocab...')
        self.pny_vocab = self.mk_lm_pny_vocab(self.pny_lst)
        print('make lm hanzi vocab...')
        self.han_vocab = self.mk_lm_han_vocab(self.han_lst)

    def get_am_batch(self):
        """Endlessly yield (inputs, outputs) dicts for the CTC acoustic model."""
        shuffle_list = [i for i in range(len(self.wav_lst))]
        while 1:
            if self.shuffle:
                shuffle(shuffle_list)
            for i in range(len(self.wav_lst) // self.batch_size):
                wav_data_lst = []
                label_data_lst = []
                begin = i * self.batch_size
                end = begin + self.batch_size
                sub_list = shuffle_list[begin:end]
                for index in sub_list:
                    fbank = compute_fbank(self.data_path + self.wav_lst[index])
                    # Pad the time axis up to the next multiple of 8; the CNN
                    # downsamples time by a factor of 8 overall.
                    pad_fbank = np.zeros((fbank.shape[0] // 8 * 8 + 8, fbank.shape[1]))
                    pad_fbank[:fbank.shape[0], :] = fbank
                    label = self.pny2id(self.pny_lst[index], self.am_vocab)
                    label_ctc_len = self.ctc_len(label)
                    # Drop utterances whose downsampled length cannot hold the
                    # CTC label (repeated labels need a blank between them).
                    if pad_fbank.shape[0] // 8 >= label_ctc_len:
                        wav_data_lst.append(pad_fbank)
                        label_data_lst.append(label)
                if not wav_data_lst:
                    # ROBUSTNESS FIX: if every utterance of the batch was
                    # filtered out, the original crashed in wav_padding on
                    # the empty list; skip the batch instead.
                    continue
                pad_wav_data, input_length = self.wav_padding(wav_data_lst)
                pad_label_data, label_length = self.label_padding(label_data_lst)
                inputs = {'the_inputs': pad_wav_data,
                          'the_labels': pad_label_data,
                          'input_length': input_length,
                          'label_length': label_length,
                          }
                outputs = {'ctc': np.zeros(pad_wav_data.shape[0], )}
                yield inputs, outputs

    def get_lm_batch(self):
        """Yield one epoch of (pinyin ids, hanzi ids) batches, zero-padded.

        NOTE(review): both sides are padded to the longest *pinyin* line of
        the batch; this assumes each hanzi line has the same token count as
        its pinyin line — TODO confirm for all corpora.
        """
        batch_num = len(self.pny_lst) // self.batch_size
        for k in range(batch_num):
            begin = k * self.batch_size
            end = begin + self.batch_size
            input_batch = self.pny_lst[begin:end]
            label_batch = self.han_lst[begin:end]
            max_len = max([len(line) for line in input_batch])
            input_batch = np.array(
                [self.pny2id(line, self.pny_vocab) + [0] * (max_len - len(line)) for line in input_batch])
            label_batch = np.array(
                [self.han2id(line, self.han_vocab) + [0] * (max_len - len(line)) for line in label_batch])
            yield input_batch, label_batch

    def pny2id(self, line, vocab):
        """Map a list of pinyin tokens to their indices in vocab."""
        return [vocab.index(pny) for pny in line]

    def han2id(self, line, vocab):
        """Map a hanzi string (one char per token) to indices in vocab."""
        return [vocab.index(han) for han in line]

    def wav_padding(self, wav_data_lst):
        """Zero-pad fbank matrices to a common length.

        Returns (batch, max_T, 200, 1) data and the per-utterance lengths
        already divided by 8 (the network's time downsampling factor).
        """
        wav_lens = [len(data) for data in wav_data_lst]
        wav_max_len = max(wav_lens)
        wav_lens = np.array([leng // 8 for leng in wav_lens])
        new_wav_data_lst = np.zeros((len(wav_data_lst), wav_max_len, 200, 1))
        for i in range(len(wav_data_lst)):
            new_wav_data_lst[i, :wav_data_lst[i].shape[0], :, 0] = wav_data_lst[i]
        return new_wav_data_lst, wav_lens

    def label_padding(self, label_data_lst):
        """Zero-pad label sequences; returns (padded array, true lengths)."""
        label_lens = np.array([len(label) for label in label_data_lst])
        max_label_len = max(label_lens)
        new_label_data_lst = np.zeros((len(label_data_lst), max_label_len))
        for i in range(len(label_data_lst)):
            new_label_data_lst[i][:len(label_data_lst[i])] = label_data_lst[i]
        return new_label_data_lst, label_lens

    def mk_am_vocab(self, data):
        """Acoustic-model vocabulary: unique pinyin tokens in first-seen
        order, with the CTC blank '_' appended last."""
        vocab = []
        seen = set()  # O(1) membership instead of O(len(vocab)) list scans
        for line in tqdm(data):
            # (a redundant `line = line` no-op was removed here)
            for pny in line:
                if pny not in seen:
                    seen.add(pny)
                    vocab.append(pny)
        vocab.append('_')
        return vocab

    def mk_lm_pny_vocab(self, data):
        """LM input vocabulary: '<PAD>' at index 0, then unique pinyin."""
        vocab = ['<PAD>']
        seen = set(vocab)
        for line in tqdm(data):
            for pny in line:
                if pny not in seen:
                    seen.add(pny)
                    vocab.append(pny)
        return vocab

    def mk_lm_han_vocab(self, data):
        """LM output vocabulary: '<PAD>' at index 0, then unique hanzi."""
        vocab = ['<PAD>']
        seen = set(vocab)
        for line in tqdm(data):
            line = ''.join(line.split(' '))
            for han in line:
                if han not in seen:
                    seen.add(han)
                    vocab.append(han)
        return vocab

    def ctc_len(self, label):
        """Minimum CTC path length: label length plus one blank for every
        adjacent repeated label."""
        add_len = 0
        label_len = len(label)
        for i in range(label_len - 1):
            if label[i] == label[i + 1]:
                add_len += 1
        return label_len + add_len
-
def compute_mfcc(file):
    """26-coefficient MFCC features of a wav file.

    Keeps every third frame and returns the matrix transposed to
    (coefficient, time).
    """
    fs, audio = wav.read(file)
    feats = mfcc(audio, samplerate=fs, numcep=26)
    return np.transpose(feats[::3])
-
def compute_fbank(file):
    """200-dim log magnitude spectrogram ("fbank") of a wav file.

    Framing: 25 ms window (400 samples), 10 ms hop (160 samples), Hamming
    window, magnitude FFT, first 200 bins, then log(1 + x).
    NOTE(review): the frame arithmetic (400/160 samples) assumes
    fs == 16000 — confirm before using other sample rates.
    """
    x = np.linspace(0, 400 - 1, 400, dtype=np.int64)
    w = 0.54 - 0.46 * np.cos(2 * np.pi * x / (400 - 1))  # Hamming window
    fs, wavsignal = wav.read(file)
    time_window = 25  # window length in ms
    wav_arr = np.array(wavsignal)
    range0_end = int(len(wavsignal) / fs * 1000 - time_window) // 10 + 1
    # BUG FIX: `np.float` was removed in NumPy 1.24; use explicit float64.
    # (An unused preallocated `data_line` buffer was also removed.)
    data_input = np.zeros((range0_end, 200), dtype=np.float64)
    for i in range(0, range0_end):
        p_start = i * 160
        p_end = p_start + 400
        frame = wav_arr[p_start:p_end] * w
        data_input[i] = np.abs(fft(frame))[0:200]
    return np.log(data_input + 1)
-
-
# Word error rate helper ----------------------------------------------------
def GetEditDistance(str1, str2):
    """Levenshtein-style edit distance computed from difflib opcodes."""
    leven_cost = 0
    matcher = difflib.SequenceMatcher(None, str1, str2)
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'replace':
            # A replace may span different lengths on each side.
            leven_cost += max(i2 - i1, j2 - j1)
        elif tag == 'insert':
            leven_cost += j2 - j1
        elif tag == 'delete':
            leven_cost += i2 - i1
    return leven_cost
-
# CTC decoder ---------------------------------------------------------------
def decode_ctc(num_result, num2word):
    """Greedy-decode a softmax output matrix and map ids to tokens.

    num_result: (1, T, vocab) network output.
    num2word: index -> token list (the acoustic-model vocabulary).
    Returns (decoded id array, token list).
    """
    in_len = np.zeros((1), dtype=np.int32)
    in_len[0] = num_result.shape[1]
    decoded = K.ctc_decode(num_result, in_len, greedy=True, beam_width=10, top_paths=1)
    best_path = K.get_value(decoded[0][0])[0]
    text = [num2word[i] for i in best_path]
    return best_path, text
声学模型
- from keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D
- from keras.layers import Reshape, Dense, Dropout, Lambda
- from keras.optimizers import Adam
- from keras import backend as K
- from keras.models import Model
- from keras.utils import multi_gpu_model
- import tensorflow as tf
-
def am_hparams():
    """Default hyper-parameters for the acoustic model (Am)."""
    return tf.contrib.training.HParams(
        vocab_size=50,
        lr=0.0008,
        gpu_nums=1,
        is_training=True)
-
-
# ============================= Build the model =============================
class Am():
    """Acoustic model: CNN feature pyramid with a dense softmax head,
    trained with CTC loss (see ctc_model)."""

    def __init__(self, args):
        self.vocab_size = args.vocab_size
        self.gpu_nums = args.gpu_nums
        self.lr = args.lr
        self.is_training = args.is_training
        self._model_init()
        if self.is_training:
            self._ctc_init()
            self.opt_init()

    def _model_init(self):
        """Build the inference network: fbank input -> softmax over vocab."""
        self.inputs = Input(name='the_inputs', shape=(None, 200, 1))
        self.h1 = cnn_cell(32, self.inputs)
        self.h2 = cnn_cell(64, self.h1)
        self.h3 = cnn_cell(128, self.h2)
        # The last two cells keep resolution (no pooling).
        self.h4 = cnn_cell(128, self.h3, pool=False)
        self.h5 = cnn_cell(128, self.h4, pool=False)
        # Three poolings reduce the 200 freq bins to 25; 25 * 128 = 3200.
        self.h6 = Reshape((-1, 3200))(self.h5)
        self.h6 = Dropout(0.2)(self.h6)
        self.h7 = dense(256)(self.h6)
        self.h7 = Dropout(0.2)(self.h7)
        self.outputs = dense(self.vocab_size, activation='softmax')(self.h7)
        self.model = Model(inputs=self.inputs, outputs=self.outputs)
        self.model.summary()

    def _ctc_init(self):
        """Wrap the inference model with a CTC loss output."""
        self.labels = Input(name='the_labels', shape=[None], dtype='float32')
        self.input_length = Input(name='input_length', shape=[1], dtype='int64')
        self.label_length = Input(name='label_length', shape=[1], dtype='int64')
        self.loss_out = Lambda(ctc_lambda, output_shape=(1,), name='ctc')\
            ([self.labels, self.outputs, self.input_length, self.label_length])
        self.ctc_model = Model(inputs=[self.labels, self.inputs,
                                       self.input_length, self.label_length],
                               outputs=self.loss_out)

    def opt_init(self):
        """Compile the CTC model; the 'loss' is the CTC output itself."""
        opt = Adam(lr=self.lr, beta_1=0.9, beta_2=0.999, decay=0.01, epsilon=10e-8)
        if self.gpu_nums > 1:
            self.ctc_model = multi_gpu_model(self.ctc_model, gpus=self.gpu_nums)
        self.ctc_model.compile(loss={'ctc': lambda y_true, output: output}, optimizer=opt)
-
# ============================ Model components =============================
def conv2d(size):
    """3x3 same-padding ReLU convolution with he_normal initialisation."""
    return Conv2D(size, (3, 3),
                  use_bias=True,
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')
-
-
def norm(x):
    """Batch-normalise x over the channel (last) axis."""
    return BatchNormalization(axis=-1)(x)
-
-
def maxpool(x):
    """2x2 max pooling; halves both the time and frequency dimensions."""
    return MaxPooling2D(pool_size=(2, 2), strides=None, padding="valid")(x)
-
def dense(units, activation="relu"):
    """Fully-connected layer with he_normal initialisation."""
    return Dense(units,
                 activation=activation,
                 use_bias=True,
                 kernel_initializer='he_normal')
-
-
# With pool=True each spatial dimension of x is halved.
def cnn_cell(size, x, pool=True):
    """Two conv + batch-norm stages, optionally followed by 2x2 pooling."""
    out = norm(conv2d(size)(x))
    out = norm(conv2d(size)(out))
    return maxpool(out) if pool else out
-
def ctc_lambda(args):
    """Keras Lambda wrapper around the batched CTC loss.

    args: (labels, y_pred, input_length, label_length) tensors as wired up
    in Am._ctc_init.
    """
    labels, y_pred, input_length, label_length = args
    # The original `y_pred = y_pred[:, :, :]` full slice was a no-op and
    # has been removed.
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)
语言模型
- import tensorflow as tf
- import numpy as np
def normalize(inputs,
              epsilon=1e-8,
              scope="ln",
              reuse=None):
    """Layer normalisation over the last axis, with learned gamma/beta.

    epsilon guards against division by zero in the variance term.
    """
    with tf.variable_scope(scope, reuse=reuse):
        params_shape = inputs.get_shape()[-1:]
        mean, variance = tf.nn.moments(inputs, [-1], keep_dims=True)
        beta = tf.Variable(tf.zeros(params_shape))
        gamma = tf.Variable(tf.ones(params_shape))
        normalized = (inputs - mean) / ((variance + epsilon) ** (.5))
        outputs = gamma * normalized + beta
    return outputs
-
def embedding(inputs,
              vocab_size,
              num_units,
              zero_pad=True,
              scale=True,
              scope="embedding",
              reuse=None):
    """Embed integer ids into num_units-dimensional vectors.

    zero_pad: hard-wire the embedding of id 0 (padding) to the zero vector.
    scale: multiply embeddings by sqrt(num_units), transformer-style.
    """
    with tf.variable_scope(scope, reuse=reuse):
        lookup_table = tf.get_variable('lookup_table',
                                       dtype=tf.float32,
                                       shape=[vocab_size, num_units],
                                       initializer=tf.contrib.layers.xavier_initializer())
        if zero_pad:
            # Replace row 0 with zeros so padding ids contribute nothing.
            lookup_table = tf.concat((tf.zeros(shape=[1, num_units]),
                                      lookup_table[1:, :]), 0)
        outputs = tf.nn.embedding_lookup(lookup_table, inputs)
        if scale:
            outputs = outputs * (num_units ** 0.5)
    return outputs
-
-
def multihead_attention(emb,
                        queries,
                        keys,
                        num_units=None,
                        num_heads=8,
                        dropout_rate=0,
                        is_training=True,
                        causality=False,
                        scope="multihead_attention",
                        reuse=None):
    """Multi-head scaled dot-product attention (TF1 graph style).

    emb: raw token embeddings, used only to derive the padding masks.
    queries: (N, T_q, C); keys: (N, T_k, C) — keys double as values.
    causality: mask future positions (decoder-style attention).
    Returns (N, T_q, C) with residual connection and layer norm applied.
    """
    with tf.variable_scope(scope, reuse=reuse):
        # Fall back to the query depth when num_units is not given.
        # BUG FIX: the original referenced `as_list` without calling it
        # (`as_list[-1]`), raising a TypeError at graph-build time.
        if num_units is None:
            num_units = queries.get_shape().as_list()[-1]

        # Linear projections
        Q = tf.layers.dense(queries, num_units, activation=tf.nn.relu)  # (N, T_q, C)
        K = tf.layers.dense(keys, num_units, activation=tf.nn.relu)  # (N, T_k, C)
        V = tf.layers.dense(keys, num_units, activation=tf.nn.relu)  # (N, T_k, C)

        # Split into heads and stack along the batch axis.
        Q_ = tf.concat(tf.split(Q, num_heads, axis=2), axis=0)  # (h*N, T_q, C/h)
        K_ = tf.concat(tf.split(K, num_heads, axis=2), axis=0)  # (h*N, T_k, C/h)
        V_ = tf.concat(tf.split(V, num_heads, axis=2), axis=0)  # (h*N, T_k, C/h)

        # Scaled dot-product scores.
        outputs = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1]))  # (h*N, T_q, T_k)
        outputs = outputs / (K_.get_shape().as_list()[-1] ** 0.5)

        # Key masking: positions whose embedding sums to 0 are padding.
        key_masks = tf.sign(tf.abs(tf.reduce_sum(emb, axis=-1)))  # (N, T_k)
        key_masks = tf.tile(key_masks, [num_heads, 1])  # (h*N, T_k)
        key_masks = tf.tile(tf.expand_dims(key_masks, 1), [1, tf.shape(queries)[1], 1])  # (h*N, T_q, T_k)
        paddings = tf.ones_like(outputs) * (-2 ** 32 + 1)
        outputs = tf.where(tf.equal(key_masks, 0), paddings, outputs)  # (h*N, T_q, T_k)

        # Causality: blind each position to the future.
        if causality:
            diag_vals = tf.ones_like(outputs[0, :, :])  # (T_q, T_k)
            tril = tf.contrib.linalg.LinearOperatorTriL(diag_vals).to_dense()  # (T_q, T_k)
            masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(outputs)[0], 1, 1])  # (h*N, T_q, T_k)
            paddings = tf.ones_like(masks) * (-2 ** 32 + 1)
            outputs = tf.where(tf.equal(masks, 0), paddings, outputs)  # (h*N, T_q, T_k)

        # Attention weights.
        outputs = tf.nn.softmax(outputs)  # (h*N, T_q, T_k)

        # Query masking: zero the rows that correspond to padded queries.
        query_masks = tf.sign(tf.abs(tf.reduce_sum(emb, axis=-1)))  # (N, T_q)
        query_masks = tf.tile(query_masks, [num_heads, 1])  # (h*N, T_q)
        query_masks = tf.tile(tf.expand_dims(query_masks, -1), [1, 1, tf.shape(keys)[1]])  # (h*N, T_q, T_k)
        outputs *= query_masks  # broadcasting. (N, T_q, C)

        # Dropout on the attention weights.
        outputs = tf.layers.dropout(outputs, rate=dropout_rate, training=tf.convert_to_tensor(is_training))

        # Weighted sum of the values.
        outputs = tf.matmul(outputs, V_)  # (h*N, T_q, C/h)

        # Merge the heads back.
        outputs = tf.concat(tf.split(outputs, num_heads, axis=0), axis=2)  # (N, T_q, C)

        # Residual connection and layer normalisation.
        outputs += queries
        outputs = normalize(outputs)  # (N, T_q, C)

    return outputs
-
-
def feedforward(inputs,
                num_units=[2048, 512],
                scope="multihead_attention",
                reuse=None):
    """Position-wise feed-forward block: two 1x1 convolutions with a
    residual connection and layer normalisation.

    NOTE(review): the default scope name looks like a copy-paste from
    multihead_attention, but renaming it would change the variable names
    and invalidate existing checkpoints, so it is kept as-is.
    """
    with tf.variable_scope(scope, reuse=reuse):
        # Inner (expansion) layer.
        outputs = tf.layers.conv1d(inputs=inputs, filters=num_units[0],
                                   kernel_size=1, activation=tf.nn.relu,
                                   use_bias=True)
        # Readout (projection) layer.
        outputs = tf.layers.conv1d(inputs=outputs, filters=num_units[1],
                                   kernel_size=1, activation=None,
                                   use_bias=True)
        # Residual connection, then normalise.
        outputs += inputs
        outputs = normalize(outputs)
    return outputs
-
-
def label_smoothing(inputs, epsilon=0.1):
    """Smooth one-hot labels: (1 - eps) * inputs + eps / num_channels."""
    num_channels = inputs.get_shape().as_list()[-1]
    return (1 - epsilon) * inputs + epsilon / num_channels
-
-
class Lm():
    """Transformer-encoder language model mapping pinyin ids to hanzi ids."""

    def __init__(self, arg):
        self.graph = tf.Graph()
        with self.graph.as_default():
            # Hyper-parameters.
            self.is_training = arg.is_training
            self.hidden_units = arg.hidden_units
            self.input_vocab_size = arg.input_vocab_size
            self.label_vocab_size = arg.label_vocab_size
            self.num_heads = arg.num_heads
            self.num_blocks = arg.num_blocks
            self.max_length = arg.max_length
            self.lr = arg.lr
            self.dropout_rate = arg.dropout_rate

            # Placeholders: batches of token ids, shape (N, T).
            self.x = tf.placeholder(tf.int32, shape=(None, None))
            self.y = tf.placeholder(tf.int32, shape=(None, None))

            # Token embedding plus learned positional embedding.
            self.emb = embedding(self.x,
                                 vocab_size=self.input_vocab_size,
                                 num_units=self.hidden_units,
                                 scale=True,
                                 scope="enc_embed")
            positions = tf.tile(tf.expand_dims(tf.range(tf.shape(self.x)[1]), 0),
                                [tf.shape(self.x)[0], 1])
            self.enc = self.emb + embedding(positions,
                                            vocab_size=self.max_length,
                                            num_units=self.hidden_units,
                                            zero_pad=False,
                                            scale=False,
                                            scope="enc_pe")
            ## Dropout
            self.enc = tf.layers.dropout(self.enc,
                                         rate=self.dropout_rate,
                                         training=tf.convert_to_tensor(self.is_training))

            ## Encoder blocks
            for i in range(self.num_blocks):
                with tf.variable_scope("num_blocks_{}".format(i)):
                    ### Multi-head self-attention
                    self.enc = multihead_attention(emb=self.emb,
                                                   queries=self.enc,
                                                   keys=self.enc,
                                                   num_units=self.hidden_units,
                                                   num_heads=self.num_heads,
                                                   dropout_rate=self.dropout_rate,
                                                   is_training=self.is_training,
                                                   causality=False)
                    ### Feed forward
                    # NOTE(review): as in the original, the feed-forward
                    # output goes to self.outputs and is NOT fed back into
                    # self.enc for the next block — kept for checkpoint
                    # compatibility.
                    self.outputs = feedforward(self.enc,
                                               num_units=[4 * self.hidden_units,
                                                          self.hidden_units])

            # Final linear projection to the hanzi vocabulary.
            self.logits = tf.layers.dense(self.outputs, self.label_vocab_size)
            self.preds = tf.to_int32(tf.argmax(self.logits, axis=-1))
            # Accuracy over non-padding targets (id 0 is padding).
            self.istarget = tf.to_float(tf.not_equal(self.y, 0))
            self.acc = tf.reduce_sum(tf.to_float(tf.equal(self.preds, self.y)) * self.istarget) / (tf.reduce_sum(self.istarget))
            tf.summary.scalar('acc', self.acc)

            if self.is_training:
                # Label-smoothed cross-entropy averaged over non-pad targets.
                self.y_smoothed = label_smoothing(tf.one_hot(self.y, depth=self.label_vocab_size))
                self.loss = tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.logits, labels=self.y_smoothed)
                self.mean_loss = tf.reduce_sum(self.loss * self.istarget) / (tf.reduce_sum(self.istarget))

                # Training scheme: Adam with transformer-style betas.
                self.global_step = tf.Variable(0, name='global_step', trainable=False)
                self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr, beta1=0.9, beta2=0.98, epsilon=1e-8)
                self.train_op = self.optimizer.minimize(self.mean_loss, global_step=self.global_step)

                # Summaries.
                tf.summary.scalar('mean_loss', self.mean_loss)
                self.merged = tf.summary.merge_all()
-
-
def lm_hparams():
    """Default hyper-parameters for the transformer language model (Lm)."""
    return tf.contrib.training.HParams(
        num_heads=8,
        num_blocks=6,
        # vocabulary sizes (overridden by the caller from the corpus)
        input_vocab_size=50,
        label_vocab_size=50,
        # embedding / model size
        max_length=100,
        hidden_units=512,
        dropout_rate=0.2,
        lr=0.0003,
        is_training=True)
模型测试代码
# coding=utf-8
import tensorflow as tf
import numpy as np
from utils import decode_ctc, GetEditDistance


# 0. Build the decoding vocabularies.  The hparams must match training;
#    alternatively the vocabularies could be saved to disk and reloaded.
from utils import get_data, data_hparams
data_args = data_hparams()
train_data = get_data(data_args)


# 1. Acoustic model ---------------------------------------------------------
from model_speech.cnn_ctc import Am, am_hparams

am_args = am_hparams()
am_args.vocab_size = len(train_data.am_vocab)
am = Am(am_args)
print('loading acoustic model...')
am.ctc_model.load_weights('logs_am/model.h5')

# 2. Language model ---------------------------------------------------------
from model_language.transformer import Lm, lm_hparams

lm_args = lm_hparams()
lm_args.input_vocab_size = len(train_data.pny_vocab)
lm_args.label_vocab_size = len(train_data.han_vocab)
lm_args.dropout_rate = 0.
print('loading language model...')
lm = Lm(lm_args)
sess = tf.Session(graph=lm.graph)
with lm.graph.as_default():
    saver = tf.train.Saver()
with sess.as_default():
    latest = tf.train.latest_checkpoint('logs_lm')
    saver.restore(sess, latest)

# 3. Prepare the evaluation data.  It need not match the training split:
#    set data_args.data_type accordingly.  'train' is used here because the
#    demo model is tiny — with 'test' the output is meaningless and contains
#    unseen tokens.
data_args.data_type = 'train'
data_args.shuffle = False
data_args.batch_size = 1
test_data = get_data(data_args)

# 4. Run the evaluation -----------------------------------------------------
am_batch = test_data.get_am_batch()
word_num = 0
word_error_num = 0
for i in range(8):
    print('\n the ', i, 'th example.')

    # Acoustic model: fbank features -> pinyin token sequence.
    inputs, _ = next(am_batch)
    x = inputs['the_inputs']
    y = test_data.pny_lst[i]
    result = am.model.predict(x, steps=1)

    _, text = decode_ctc(result, train_data.am_vocab)
    text = ' '.join(text)
    print('文本结果:', text)
    print('原文结果:', ' '.join(y))
    with sess.as_default():
        # Language model: pinyin ids -> hanzi string.
        text = text.strip('\n').split(' ')
        x = np.array([train_data.pny_vocab.index(pny) for pny in text])
        x = x.reshape(1, -1)
        preds = sess.run(lm.preds, {lm.x: x})
        label = test_data.han_lst[i]
        got = ''.join(train_data.han_vocab[idx] for idx in preds[0])
        print('原文汉字:', label)
        print('识别结果:', got)
        # Character error count, capped at the reference length.
        word_error_num += min(len(label), GetEditDistance(label, got))
        word_num += len(label)
print('词错误率:', word_error_num / word_num)
sess.close()
模型测试结果
- the 0 th example.
- 文本结果: lv4 shi4 yang2 chun1 yan1 jing3 da4 kuai4 wen2 zhang1 de di3 se4 si4 yue4 de lin2 luan2 geng4 shi4 lv4 de2 xian1 huo2 xiu4 mei4 shi1 yi4 ang4 ran2
- 原文结果: lv4 shi4 yang2 chun1 yan1 jing3 da4 kuai4 wen2 zhang1 de di3 se4 si4 yue4 de lin2 luan2 geng4 shi4 lv4 de2 xian1 huo2 xiu4 mei4 shi1 yi4 ang4 ran2
- 原文汉字: 绿是阳春烟景大块文章的底色四月的林峦更是绿得鲜活秀媚诗意盎然
- 识别结果: 绿是阳春烟景大块文章的底色四月的林峦更是绿得鲜活秀媚诗意盎然
-
- the 1 th example.
- 文本结果: ta1 jin3 ping2 yao1 bu4 de li4 liang4 zai4 yong3 dao4 shang4 xia4 fan1 teng2 yong3 dong4 she2 xing2 zhuang4 ru2 hai3 tun2 yi4 zhi2 yi3 yi1 tou2 de you1 shi4 ling3 xian1
- 原文结果: ta1 jin3 ping2 yao1 bu4 de li4 liang4 zai4 yong3 dao4 shang4 xia4 fan1 teng2 yong3 dong4 she2 xing2 zhuang4 ru2 hai3 tun2 yi4 zhi2 yi3 yi1 tou2 de you1 shi4 ling3 xian1
- 原文汉字: 他仅凭腰部的力量在泳道上下翻腾蛹动蛇行状如海豚一直以一头的优势领先
- 识别结果: 他仅凭腰部的力量在蛹道上下翻腾蛹动蛇行状如海豚一直以一头的优势领先
-
- the 2 th example.
- 文本结果: qi3 ye4 yi1 kao4 ji4 shu4 wa1 qian2 zeng1 xiao4 ta1 fu4 ze2 quan2 chang3 chan3 pin3 zhi4 liang4 yu3 ji4 shu4 pei2 xun4 cheng2 le chang3 li3 de da4 mang2 ren2
- 原文结果: qi3 ye4 yi1 kao4 ji4 shu4 wa1 qian2 zeng1 xiao4 ta1 fu4 ze2 quan2 chang3 chan3 pin3 zhi4 liang4 yu3 ji4 shu4 pei2 xun4 cheng2 le chang3 li3 de da4 mang2 ren2
- 原文汉字: 企业依靠技术挖潜增效他负责全厂产品质量与技术培训成了厂里的大忙人
- 识别结果: 企业依靠技术挖潜增效他负责全厂产品质量与技术培训成了厂里的大忙人
-
- the 3 th example.
- 文本结果: cai4 zuo4 hao3 le yi1 wan3 qing1 zheng1 wu3 chang1 yu2 yi1 wan3 fan1 jia1 chao3 ji1 dan4 yi1 wan3 zha4 cai4 gan4 zi chao3 rou4 si1
- 原文结果: cai4 zuo4 hao3 le yi1 wan3 qing1 zheng1 wu3 chang1 yu2 yi1 wan3 fan1 jia1 chao3 ji1 dan4 yi1 wan3 zha4 cai4 gan4 zi chao3 rou4 si1
- 原文汉字: 菜做好了一碗清蒸武昌鱼一碗蕃茄炒鸡蛋一碗榨菜干子炒肉丝
- 识别结果: 菜做好了一碗清蒸武昌鱼一碗蕃茄炒鸡蛋一碗榨菜干子炒肉丝
-
- the 4 th example.
- 文本结果: ta1 kan4 kan4 ye4 ji3 hen3 shen1 bai2 tian1 de yan2 re4 yi3 gei3 ye4 liang2 chui1 san4 fen1 fu4 da4 jia1 ge4 zi4 an1 xi1 ming2 tian1 ji4 xu4 wan2 le4
- 原文结果: ta1 kan4 kan4 ye4 ji3 hen3 shen1 bai2 tian1 de yan2 re4 yi3 gei3 ye4 liang2 chui1 san4 fen1 fu4 da4 jia1 ge4 zi4 an1 xi1 ming2 tian1 ji4 xu4 wan2 le4
- 原文汉字: 她看看夜己很深白天的炎热已给夜凉吹散吩咐大家各自安息明天继续玩乐
- 识别结果: 她看看夜己很深白天的炎热已给夜凉吹散吩咐大家各自安息明天继续玩乐
项目文件的下载地址为:DeepSpeechRecognition.rar-深度学习文档类资源-CSDN下载
数据为 data文件夹,将data文件夹解压后放置在项目文件的根目录下即可以运行项目程序
数据的下载地址为:深度学习语音识别数据集data.rar-深度学习文档类资源-CSDN下载
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。