This blog post records my study and hands-on practice of the DFCNN_Transformer tutorial on Huawei Cloud. It walks through DFCNN-Transformer, a model that combines a deep fully convolutional neural network (DFCNN) with a Transformer, with the goal of improving the accuracy and efficiency of Chinese speech recognition.
Note:
The main change in this version of the code is porting the original TensorFlow 1.13.1 implementation to TensorFlow 2.0+, so that it is easier to run and experiment with.
The required data is attached to the blog and can be downloaded from there.
First, load the required Python libraries.
import numpy as np
import scipy.io.wavfile as wav
import matplotlib.pyplot as plt
import keras
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
from keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D
from keras.layers import Reshape, Dense, Dropout, Lambda
from keras.optimizers import Adam
from keras import backend as K
from keras.models import Model
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
warnings.filterwarnings("ignore")
Define the acoustic model
Layer helper functions: conv2d, norm, maxpool, dense and cnn_cell wrap the corresponding Keras layers so the network definition stays compact; cnn_cell stacks two batch-normalized convolutions followed by an optional 2x2 max pooling.
CTC loss function: ctc_lambda unpacks (labels, y_pred, input_length, label_length) and calls K.ctc_batch_cost, so the alignment-free CTC loss can be attached to the model through a Lambda layer.
The acoustic_model class:
In the _model_init method: three cnn_cell blocks with pooling (32, 64 and 128 filters) are followed by two without pooling; the feature map is reshaped to (time, 3200), passed through a dense layer and dropout, and projected onto a softmax over the pinyin vocabulary.
The _ctc_init method:
Three extra input layers are defined: labels, input_length and label_length, which the CTC loss requires. A Lambda layer then applies the ctc_lambda function defined above to compute the CTC loss. Finally a new model, self.ctc_model, is built that takes these four inputs (labels, the original inputs, input lengths and label lengths) and outputs the CTC loss.
The opt_init method:
An Adam optimizer instance is created and used to compile self.ctc_model. Note that the loss passed to compile is a lambda that simply returns the model output: the Lambda layer has already computed the CTC loss inside the model, so the compile-time loss only needs to pass it through.
# Convolution layer
def conv2d(size):
    return Conv2D(size, (3, 3), use_bias=True, activation='relu',
                  padding='same', kernel_initializer='he_normal')

# Batch normalization layer
def norm(x):
    return BatchNormalization(axis=-1)(x)

# Max pooling layer
def maxpool(x):
    return MaxPooling2D(pool_size=(2, 2), strides=None, padding="valid")(x)

# Dense layer
def dense(units, activation="relu"):
    return Dense(units, activation=activation, use_bias=True, kernel_initializer='he_normal')

# Two convolution layers followed by an optional max pooling layer
def cnn_cell(size, x, pool=True):
    x = norm(conv2d(size)(x))
    x = norm(conv2d(size)(x))
    if pool:
        x = maxpool(x)
    return x

# CTC loss function
def ctc_lambda(args):
    labels, y_pred, input_length, label_length = args
    y_pred = y_pred[:, :, :]
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)

# Acoustic model
class acoustic_model():
    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.learning_rate = 0.0008
        self.is_training = True
        self._model_init()
        if self.is_training:
            self._ctc_init()
            self.opt_init()

    def _model_init(self):
        self.inputs = Input(name='the_inputs', shape=(None, 200, 1))
        self.h1 = cnn_cell(32, self.inputs)
        self.h2 = cnn_cell(64, self.h1)
        self.h3 = cnn_cell(128, self.h2)
        self.h4 = cnn_cell(128, self.h3, pool=False)
        self.h5 = cnn_cell(128, self.h4, pool=False)
        # 200 / 8 * 128 = 3200
        self.h6 = Reshape((-1, 3200))(self.h5)
        self.h6 = Dropout(0.2)(self.h6)
        self.h7 = dense(256)(self.h6)
        self.h7 = Dropout(0.2)(self.h7)
        self.outputs = dense(self.vocab_size, activation='softmax')(self.h7)
        self.model = Model(inputs=self.inputs, outputs=self.outputs)

    def _ctc_init(self):
        self.labels = Input(name='the_labels', shape=[None], dtype='float32')
        self.input_length = Input(name='input_length', shape=[1], dtype='int64')
        self.label_length = Input(name='label_length', shape=[1], dtype='int64')
        self.loss_out = Lambda(ctc_lambda, output_shape=(1,), name='ctc')(
            [self.labels, self.outputs, self.input_length, self.label_length])
        self.ctc_model = Model(inputs=[self.labels, self.inputs, self.input_length, self.label_length],
                               outputs=self.loss_out)

    def opt_init(self):
        opt = tf.keras.optimizers.legacy.Adam(learning_rate=self.learning_rate, beta_1=0.9, beta_2=0.999,
                                              decay=0.01, epsilon=10e-8)
        self.ctc_model.compile(loss={'ctc': lambda y_true, output: output}, optimizer=opt)

acoustic = acoustic_model(vocab_size=50)
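One small check worth keeping in mind (not part of the original tutorial): the three pooled cnn_cell blocks each halve the time axis, so the model downsamples input frames by a factor of 8. This is why the data generator below divides frame counts by 8 when it builds input_length for the CTC loss.

frames = 160                         # hypothetical padded frame count
downsampled = frames // 2 // 2 // 2  # one halving per pooled cnn_cell
print(downsampled, frames // 8)      # 20 20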
Prepare the data
The compute_fbank function extracts time-frequency features from a WAV file: the signal is cut into 400-sample (25 ms) frames every 160 samples, each frame is multiplied by a Hamming window and Fourier-transformed, and the log magnitudes of the first 200 FFT bins form one feature row (a small frame-arithmetic check follows the next note).
The get_data class manages the dataset: it reads the WAV file list and the matching pinyin and Chinese-character transcripts from data.txt, builds the vocabularies for the acoustic and language models, and provides batch generators for both.
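As a quick aside, the fixed constants 400, 160 and 200 used in compute_fbank below follow from assuming 16 kHz audio; this check is only illustrative and not part of the original tutorial:

# Frame arithmetic behind compute_fbank, assuming 16 kHz WAV input.
fs = 16000
window_samples = fs // 1000 * 25   # 25 ms frame  -> 400 samples
hop_samples = fs // 1000 * 10      # 10 ms hop    -> 160 samples (the hard-coded stride p_start = i * 160)
kept_bins = window_samples // 2    # the FFT magnitude is symmetric -> keep the first 200 bins
print(window_samples, hop_samples, kept_bins)   # 400 160 200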
from scipy.fftpack import fft

# Compute the time-frequency representation (log spectrogram) of a WAV file
def compute_fbank(file):
    x = np.linspace(0, 400 - 1, 400, dtype=np.int64)
    w = 0.54 - 0.46 * np.cos(2 * np.pi * (x) / (400 - 1))  # Hamming window
    fs, wavsignal = wav.read(file)
    time_window = 25
    window_length = fs / 1000 * time_window
    wav_arr = np.array(wavsignal)
    wav_length = len(wavsignal)
    range0_end = int(len(wavsignal) / fs * 1000 - time_window) // 10
    data_input = np.zeros((range0_end, 200), dtype=float)
    data_line = np.zeros((1, 400), dtype=float)
    for i in range(0, range0_end):
        p_start = i * 160
        p_end = p_start + 400
        data_line = wav_arr[p_start:p_end]
        data_line = data_line * w
        data_line = np.abs(fft(data_line))
        data_input[i] = data_line[0:200]
    data_input = np.log(data_input + 1)
    return data_input

class get_data():
    def __init__(self):
        self.data_path = './speech_recognition/data/'
        self.data_length = 20
        self.batch_size = 1
        self.source_init()

    def source_init(self):
        self.wav_lst = []
        self.pin_lst = []
        self.han_lst = []
        with open('speech_recognition/data.txt', 'r', encoding='utf8') as f:
            data = f.readlines()
        for line in data:
            wav_file, pin, han = line.split('\t')
            self.wav_lst.append(wav_file)
            self.pin_lst.append(pin.split(' '))
            self.han_lst.append(han.strip('\n'))
        if self.data_length:
            self.wav_lst = self.wav_lst[:self.data_length]
            self.pin_lst = self.pin_lst[:self.data_length]
            self.han_lst = self.han_lst[:self.data_length]
        self.acoustic_vocab = self.acoustic_model_vocab(self.pin_lst)
        self.pin_vocab = self.language_model_pin_vocab(self.pin_lst)
        self.han_vocab = self.language_model_han_vocab(self.han_lst)

    def get_acoustic_model_batch(self):
        _list = [i for i in range(len(self.wav_lst))]
        while 1:
            for i in range(len(self.wav_lst) // self.batch_size):
                wav_data_lst = []
                label_data_lst = []
                begin = i * self.batch_size
                end = begin + self.batch_size
                sub_list = _list[begin:end]
                for index in sub_list:
                    fbank = compute_fbank(self.data_path + self.wav_lst[index])
                    pad_fbank = np.zeros((fbank.shape[0] // 8 * 8 + 8, fbank.shape[1]))
                    pad_fbank[:fbank.shape[0], :] = fbank
                    label = self.pin2id(self.pin_lst[index], self.acoustic_vocab)
                    label_ctc_len = self.ctc_len(label)
                    if pad_fbank.shape[0] // 8 >= label_ctc_len:
                        wav_data_lst.append(pad_fbank)
                        label_data_lst.append(label)
                pad_wav_data, input_length = self.wav_padding(wav_data_lst)
                pad_label_data, label_length = self.label_padding(label_data_lst)
                inputs = {'the_inputs': pad_wav_data,
                          'the_labels': pad_label_data,
                          'input_length': input_length,
                          'label_length': label_length,
                          }
                outputs = {'ctc': np.zeros(pad_wav_data.shape[0], )}
                yield inputs, outputs

    def get_language_model_batch(self):
        batch_num = len(self.pin_lst) // self.batch_size
        for k in range(batch_num):
            begin = k * self.batch_size
            end = begin + self.batch_size
            input_batch = self.pin_lst[begin:end]
            label_batch = self.han_lst[begin:end]
            max_len = max([len(line) for line in input_batch])
            input_batch = np.array(
                [self.pin2id(line, self.pin_vocab) + [0] * (max_len - len(line)) for line in input_batch])
            label_batch = np.array(
                [self.han2id(line, self.han_vocab) + [0] * (max_len - len(line)) for line in label_batch])
            yield input_batch, label_batch

    def pin2id(self, line, vocab):
        return [vocab.index(pin) for pin in line]

    def han2id(self, line, vocab):
        return [vocab.index(han) for han in line]

    def wav_padding(self, wav_data_lst):
        wav_lens = [len(data) for data in wav_data_lst]
        wav_max_len = max(wav_lens)
        wav_lens = np.array([leng // 8 for leng in wav_lens])
        new_wav_data_lst = np.zeros((len(wav_data_lst), wav_max_len, 200, 1))
        for i in range(len(wav_data_lst)):
            new_wav_data_lst[i, :wav_data_lst[i].shape[0], :, 0] = wav_data_lst[i]
        return new_wav_data_lst, wav_lens

    def label_padding(self, label_data_lst):
        label_lens = np.array([len(label) for label in label_data_lst])
        max_label_len = max(label_lens)
        new_label_data_lst = np.zeros((len(label_data_lst), max_label_len))
        for i in range(len(label_data_lst)):
            new_label_data_lst[i][:len(label_data_lst[i])] = label_data_lst[i]
        return new_label_data_lst, label_lens

    def acoustic_model_vocab(self, data):
        vocab = []
        for line in data:
            for pin in line:
                if pin not in vocab:
                    vocab.append(pin)
        vocab.append('_')
        return vocab

    def language_model_pin_vocab(self, data):
        vocab = ['<PAD>']
        for line in data:
            for pin in line:
                if pin not in vocab:
                    vocab.append(pin)
        return vocab

    def language_model_han_vocab(self, data):
        vocab = ['<PAD>']
        for line in data:
            line = ''.join(line.split(' '))
            for han in line:
                if han not in vocab:
                    vocab.append(han)
        return vocab

    def ctc_len(self, label):
        add_len = 0
        label_len = len(label)
        for i in range(label_len - 1):
            if label[i] == label[i + 1]:
                add_len += 1
        return label_len + add_len
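An optional sanity check of the generator (it assumes the dataset from the blog has already been unpacked under ./speech_recognition/, so it only runs once the data is in place):

data_gen = get_data()
inputs, outputs = next(data_gen.get_acoustic_model_batch())
print(inputs['the_inputs'].shape)   # (batch, padded_frames, 200, 1)
print(inputs['the_labels'].shape)   # (batch, padded_label_length)
print(inputs['input_length'], inputs['label_length'])   # frame counts // 8 and label lengths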
Prepare training parameters and data
For the purposes of this demo, batch_size is set to 1 and data_length to 20.
For a full training run, the self.data_length = 20 assignment in get_data.__init__ should be disabled (for example set to None) and batch_size should be increased; a small illustration follows.
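A hypothetical full-training setup could look like this (illustrative values, not from the original tutorial; data_length is applied inside source_init, so source_init has to be re-run after changing it):

full_train_data = get_data()
full_train_data.batch_size = 4      # illustrative value; raise as memory allows
full_train_data.data_length = None  # keep the whole file list instead of the first 20 clips
full_train_data.source_init()       # rebuild the lists and vocabularies without the cut-off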
train_data = get_data()
vocab_size = len(train_data.acoustic_vocab)
acoustic = acoustic_model(vocab_size)

if os.path.exists('./speech_recognition/acoustic_model/model.h5'):
    print('Loading acoustic model')
    acoustic.ctc_model.load_weights('./speech_recognition/acoustic_model/model.h5')

epochs = 20
batch_num = len(train_data.wav_lst) // train_data.batch_size

print("Start training!")
for k in range(epochs):
    print('Epoch', k + 1)
    batch = train_data.get_acoustic_model_batch()
    acoustic.ctc_model.fit_generator(batch, steps_per_epoch=batch_num, epochs=1)

print("\nTraining finished, saving the model")
acoustic.ctc_model.save_weights('./speech_recognition/acoustic_model/model.h5')
Build the language model with a Transformer architecture.
The normalize function implements layer normalization: each position's feature vector (the last dimension of inputs) is standardized to zero mean and unit variance and then scaled and shifted by the learned gamma and beta parameters. It takes the input tensor inputs and returns the normalized output.
The embedding function defines an embedding layer that maps integer IDs (typically words or symbols) to dense vectors of a fixed size; with zero_pad=True the row for ID 0 (the <PAD> token) is forced to zeros, and with scale=True the embeddings are multiplied by sqrt(num_units).
def normalize(inputs,
              epsilon=1e-8,
              scope="ln",
              reuse=None):
    with tf.compat.v1.variable_scope(scope, reuse=reuse):
        inputs_shape = inputs.get_shape()
        params_shape = inputs_shape[-1:]
        mean, variance = tf.compat.v1.nn.moments(inputs, [-1], keep_dims=True)
        beta = tf.Variable(tf.zeros(params_shape))
        gamma = tf.Variable(tf.ones(params_shape))
        normalized = (inputs - mean) / ((variance + epsilon) ** (.5))
        outputs = gamma * normalized + beta
    return outputs
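To make the effect of normalize concrete, here is a small NumPy-only sketch of the same computation, ignoring the learned gamma and beta (which start at 1 and 0); it is illustrative and independent of the model code:

x = np.random.randn(2, 5, 8)                        # (batch, time, features)
mean = x.mean(axis=-1, keepdims=True)
var = x.var(axis=-1, keepdims=True)
y = (x - mean) / np.sqrt(var + 1e-8)
print(np.allclose(y.mean(axis=-1), 0, atol=1e-6))   # True: zero mean per position
print(np.allclose(y.var(axis=-1), 1, atol=1e-4))    # True: unit variance per position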
def embedding(inputs, vocab_size, num_units, zero_pad=True, scale=True, scope="embedding", reuse=None):
    with tf.compat.v1.variable_scope(scope, reuse=reuse):
        lookup_table = tf.compat.v1.get_variable('lookup_table',
                                                 dtype=tf.float32,
                                                 shape=[vocab_size, num_units],
                                                 initializer=tf.compat.v1.keras.initializers.glorot_normal)
        if zero_pad:
            lookup_table = tf.concat((tf.zeros(shape=[1, num_units]),
                                      lookup_table[1:, :]), 0)
        outputs = tf.nn.embedding_lookup(lookup_table, inputs)
        if scale:
            outputs = outputs * (num_units ** 0.5)
    return outputs
def multihead_attention(emb, queries, keys, num_units=None, num_heads=8, dropout_rate=0,
                        is_training=True, causality=False, scope="multihead_attention", reuse=None):
    with tf.compat.v1.variable_scope(scope, reuse=reuse):
        if num_units is None:
            num_units = queries.get_shape().as_list()[-1]
        Q = tf.compat.v1.layers.dense(queries, num_units, activation=tf.nn.relu)  # (N, T_q, C)
        K = tf.compat.v1.layers.dense(keys, num_units, activation=tf.nn.relu)     # (N, T_k, C)
        V = tf.compat.v1.layers.dense(keys, num_units, activation=tf.nn.relu)     # (N, T_k, C)
        Q_ = tf.concat(tf.split(Q, num_heads, axis=2), axis=0)  # (h*N, T_q, C/h)
        K_ = tf.concat(tf.split(K, num_heads, axis=2), axis=0)  # (h*N, T_k, C/h)
        V_ = tf.concat(tf.split(V, num_heads, axis=2), axis=0)  # (h*N, T_k, C/h)
        outputs = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1]))    # (h*N, T_q, T_k)
        outputs = outputs / (K_.get_shape().as_list()[-1] ** 0.5)
        # Key masking: key positions whose embedding sums to zero (padding) are masked out
        key_masks = tf.sign(tf.abs(tf.reduce_sum(emb, axis=-1)))  # (N, T_k)
        key_masks = tf.tile(key_masks, [num_heads, 1])            # (h*N, T_k)
        key_masks = tf.tile(tf.expand_dims(key_masks, 1), [1, tf.shape(queries)[1], 1])  # (h*N, T_q, T_k)
        paddings = tf.ones_like(outputs) * (-2 ** 32 + 1)
        outputs = tf.where(tf.equal(key_masks, 0), paddings, outputs)  # (h*N, T_q, T_k)
        if causality:
            diag_vals = tf.ones_like(outputs[0, :, :])  # (T_q, T_k)
            tril = tf.linalg.LinearOperatorLowerTriangular(diag_vals).to_dense()  # (T_q, T_k)
            masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(outputs)[0], 1, 1])  # (h*N, T_q, T_k)
            paddings = tf.ones_like(masks) * (-2 ** 32 + 1)
            outputs = tf.where(tf.equal(masks, 0), paddings, outputs)  # (h*N, T_q, T_k)
        outputs = tf.nn.softmax(outputs)  # (h*N, T_q, T_k)
        # Query masking: zero out attention rows that correspond to padded query positions
        query_masks = tf.sign(tf.abs(tf.reduce_sum(emb, axis=-1)))  # (N, T_q)
        query_masks = tf.tile(query_masks, [num_heads, 1])          # (h*N, T_q)
        query_masks = tf.tile(tf.expand_dims(query_masks, -1), [1, 1, tf.shape(keys)[1]])  # (h*N, T_q, T_k)
        outputs *= query_masks  # broadcasting. (N, T_q, C)
        outputs = tf.compat.v1.layers.dropout(outputs, rate=dropout_rate,
                                              training=tf.convert_to_tensor(is_training))
        outputs = tf.matmul(outputs, V_)  # (h*N, T_q, C/h)
        outputs = tf.concat(tf.split(outputs, num_heads, axis=0), axis=2)  # (N, T_q, C)
        outputs += queries  # residual connection
        outputs = normalize(outputs)  # (N, T_q, C)
    return outputs
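One detail worth pausing on in the code above: tf.split along the channel axis followed by tf.concat along the batch axis turns (N, T, C) into (h*N, T, C/h), so all heads are computed as one big batch. An equivalent NumPy sketch, purely illustrative:

N, T, C, h = 2, 5, 16, 8
Q = np.arange(N * T * C, dtype=np.float32).reshape(N, T, C)
Q_heads = np.concatenate(np.split(Q, h, axis=2), axis=0)
print(Q_heads.shape)   # (16, 5, 2) == (h*N, T, C/h)
# After attention, the inverse (split on the batch axis, concat on the channel axis) restores (N, T, C).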
def feedforward(inputs, num_units=[2048, 512], scope="multihead_attention", reuse=None):
    with tf.compat.v1.variable_scope(scope, reuse=reuse):
        # Inner layer: position-wise 1x1 convolution with ReLU
        params = {"inputs": inputs, "filters": num_units[0], "kernel_size": 1,
                  "activation": tf.nn.relu, "use_bias": True}
        outputs = tf.compat.v1.layers.conv1d(**params)
        # Readout layer: project back to the model dimension
        params = {"inputs": outputs, "filters": num_units[1], "kernel_size": 1,
                  "activation": None, "use_bias": True}
        outputs = tf.compat.v1.layers.conv1d(**params)
        outputs += inputs  # residual connection
        outputs = normalize(outputs)
    return outputs

# Label smoothing layer
def label_smoothing(inputs, epsilon=0.1):
    K = inputs.get_shape().as_list()[-1]  # number of channels
    return ((1 - epsilon) * inputs) + (epsilon / K)
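A quick numeric illustration of label_smoothing (not from the tutorial): with epsilon = 0.1 and 4 classes, a one-hot target is softened away from exact 0s and 1s, which discourages over-confident predictions:

one_hot = np.array([0., 1., 0., 0.])
epsilon, num_classes = 0.1, 4
smoothed = (1 - epsilon) * one_hot + epsilon / num_classes
print(smoothed)   # [0.025 0.925 0.025 0.025], still sums to 1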
# The language model
class language_model():
    def __init__(self, input_vocab_size, label_vocab_size):
        self.graph = tf.Graph()
        with self.graph.as_default():
            self.is_training = True
            self.hidden_units = 512
            self.input_vocab_size = input_vocab_size
            self.label_vocab_size = label_vocab_size
            self.num_heads = 8
            self.num_blocks = 6
            self.max_length = 100
            self.learning_rate = 0.0003
            self.dropout_rate = 0.2

            self.x = tf.compat.v1.placeholder(tf.int32, shape=(None, None))
            self.y = tf.compat.v1.placeholder(tf.int32, shape=(None, None))
            # Token embedding
            self.emb = embedding(self.x, vocab_size=self.input_vocab_size, num_units=self.hidden_units,
                                 scale=True, scope="enc_embed")
            # Add learned positional embeddings (position ids are 0..T-1 for every batch row)
            self.enc = self.emb + embedding(
                tf.tile(tf.expand_dims(tf.range(tf.shape(self.x)[1]), 0), [tf.shape(self.x)[0], 1]),
                vocab_size=self.max_length, num_units=self.hidden_units, zero_pad=False, scale=False,
                scope="enc_pe")
            self.enc = tf.compat.v1.layers.dropout(self.enc, rate=self.dropout_rate,
                                                   training=tf.convert_to_tensor(self.is_training))
            # Stack of self-attention blocks
            for i in range(self.num_blocks):
                with tf.compat.v1.variable_scope("num_blocks_{}".format(i)):
                    self.enc = multihead_attention(emb=self.emb,
                                                   queries=self.enc,
                                                   keys=self.enc,
                                                   num_units=self.hidden_units,
                                                   num_heads=self.num_heads,
                                                   dropout_rate=self.dropout_rate,
                                                   is_training=self.is_training,
                                                   causality=False)
            self.outputs = feedforward(self.enc, num_units=[4 * self.hidden_units, self.hidden_units])
            self.logits = tf.compat.v1.layers.dense(self.outputs, self.label_vocab_size)
            self.preds = tf.compat.v1.to_int32(tf.argmax(self.logits, axis=-1))
            self.istarget = tf.compat.v1.to_float(tf.not_equal(self.y, 0))
            self.acc = tf.reduce_sum(tf.compat.v1.to_float(tf.equal(self.preds, self.y)) * self.istarget) / (
                tf.reduce_sum(self.istarget))
            tf.compat.v1.summary.scalar('acc', self.acc)
            if self.is_training:
                self.y_smoothed = label_smoothing(tf.one_hot(self.y, depth=self.label_vocab_size))
                self.loss = tf.compat.v1.nn.softmax_cross_entropy_with_logits_v2(logits=self.logits,
                                                                                 labels=self.y_smoothed)
                self.mean_loss = tf.reduce_sum(self.loss * self.istarget) / (tf.reduce_sum(self.istarget))
                self.global_step = tf.Variable(0, name='global_step', trainable=False)
                self.optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=self.learning_rate,
                                                                  beta1=0.9, beta2=0.98, epsilon=1e-8)
                self.train_op = self.optimizer.minimize(self.mean_loss, global_step=self.global_step)
                tf.compat.v1.summary.scalar('mean_loss', self.mean_loss)
                self.merged = tf.compat.v1.summary.merge_all()
        print('Language model built!')
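The positional-embedding input in the class above can look cryptic; the tf.range / tf.expand_dims / tf.tile combination simply builds a (batch, time) matrix of position indices 0..T-1, which is then looked up in the second embedding table (scope "enc_pe") and added to the token embeddings. An equivalent NumPy sketch, for illustration only:

N, T = 2, 4                                      # batch size and sequence length
pos_ids = np.tile(np.arange(T)[None, :], [N, 1])
print(pos_ids)
# [[0 1 2 3]
#  [0 1 2 3]]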
input_vocab_size = len(train_data.pin_vocab)
label_vocab_size = len(train_data.han_vocab)
is_training = True
language = language_model(input_vocab_size, label_vocab_size)

epochs = 20
with language.graph.as_default():
    saver = tf.compat.v1.train.Saver()
with tf.compat.v1.Session(graph=language.graph) as sess:
    merged = tf.compat.v1.summary.merge_all()
    sess.run(tf.compat.v1.global_variables_initializer())
    if os.path.exists('./speech_recognition/language_model/model.meta'):
        print('Loading language model')
        saver.restore(sess, './speech_recognition/language_model/model')
    for k in range(epochs):
        total_loss = 0
        batch = train_data.get_language_model_batch()
        for i in range(batch_num):
            input_batch, label_batch = next(batch)
            feed = {language.x: input_batch, language.y: label_batch}
            cost, _ = sess.run([language.mean_loss, language.train_op], feed_dict=feed)
            total_loss += cost
        print('Epoch', k + 1, ': average loss =', total_loss / batch_num)
    # Save the weights so the decoding section below can restore them from the same path
    saver.save(sess, './speech_recognition/language_model/model')
print("\nTraining finished")
Prepare the dictionaries needed for decoding. They must be identical to the ones used during training; alternatively, the dictionaries could be saved to disk and simply loaded here.
train_data = get_data()
test_data = get_data()
acoustic_model_batch = test_data.get_acoustic_model_batch()
language_model_batch = test_data.get_language_model_batch()

vocab_size = len(train_data.acoustic_vocab)
acoustic = acoustic_model(vocab_size)
acoustic.ctc_model.load_weights('./speech_recognition/acoustic_model/model.h5')
print('\nAcoustic model loaded!')

tf.compat.v1.disable_v2_behavior()
input_vocab_size = len(train_data.pin_vocab)
label_vocab_size = len(train_data.han_vocab)
language = language_model(input_vocab_size, label_vocab_size)

sess = tf.compat.v1.Session(graph=language.graph)
with language.graph.as_default():
    saver = tf.compat.v1.train.Saver()
with sess.as_default():
    saver.restore(sess, './speech_recognition/language_model/model')
print('\nLanguage model loaded!')
def decode_ctc(num_result, num2word):
    result = num_result[:, :, :]
    in_len = np.zeros((1), dtype=np.int32)
    in_len[0] = result.shape[1]
    # Greedy CTC decoding of the acoustic model's frame-wise softmax output
    t = K.ctc_decode(result, in_len, greedy=True, beam_width=10, top_paths=1)
    v = K.get_value(t[0][0])
    v = v[0]
    text = []
    for i in v:
        text.append(num2word[i])
    return v, text
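For intuition, greedy CTC decoding (what K.ctc_decode does with greedy=True) amounts to taking the argmax per frame, collapsing consecutive repeats and dropping the blank symbol; in this code base the blank is the '_' appended as the last entry of acoustic_vocab. A toy illustration, not part of the tutorial:

blank = 7                                    # illustrative: last id of a toy 8-symbol vocabulary
frame_ids = [3, 3, blank, 5, 5, blank, 3, 3]
decoded, prev = [], None
for t in frame_ids:
    if t != prev and t != blank:
        decoded.append(t)
    prev = t
print(decoded)   # [3, 5, 3]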
for i in range(10):
    print('\nExample', i + 1)
    # Run the trained acoustic model on one batch
    inputs, outputs = next(acoustic_model_batch)
    x = inputs['the_inputs']
    y = inputs['the_labels'][0]
    result = acoustic.model.predict(x, steps=1)
    # Convert the numeric result into pinyin text
    _, text = decode_ctc(result, train_data.acoustic_vocab)
    text = ' '.join(text)
    text = text.replace(" _", "")
    print('Reference pinyin:', ' '.join([train_data.acoustic_vocab[int(i)] for i in y]))
    print('Recognized pinyin:', text)
    with sess.as_default():
        try:
            _, y = next(language_model_batch)
            text = text.strip('\n').split(' ')
            x = np.array([train_data.pin_vocab.index(pin) for pin in text])
            x = x.reshape(1, -1)
            preds = sess.run(language.preds, {language.x: x})
            got = ''.join(train_data.han_vocab[idx] for idx in preds[0])
            print('Reference characters:', ''.join(train_data.han_vocab[idx] for idx in y[0]))
            print('Recognized characters:', got)
        except StopIteration:
            break
sess.close()
Original article: https://bbs.huaweicloud.com/blogs/386935
See the original article for further details.