赞
踩
目录
作者:嘟粥yyds
时间:2023年8月15日
集成开发工具:Jupyter Notebook 6.5.2
集成开发环境:Python 3.10.6
第三方库:tensorflow-gpu 2.10.0、librosa、matplotlib、mpl_toolkits.axes_gridl、numpy、glob、tqdm、IPython、scipy、pickle、random、os
语音识别以语音为研究对象,它是语音信号处理的一个重要研究发现,是模型识别的一个分支,涉及到生理学、心理学、语言学、计算机科学以及信号处理等诸多领域。甚至还涉及到人的体态语言,最终目标是实现人与机器进行自然语言通信。
自动语音识别(Automatic Speech Recognition,ASR)是一项将人类说话的语音转换成文本或命令的技术。它是自然语言处理(NLP)领域的一个重要分支,旨在使计算机能够理解和处理人类语音。
ASR 技术的工作过程可以简要描述如下:
ASR技术在很多领域有广泛的应用,包括但不限于:
然而,尽管ASR技术取得了巨大的进展,但在面对多种语音、口音、背景噪音等复杂情境时,识别的准确性仍然可能受到一定限制。随着深度学习和人工智能的发展,预计ASR技术将会不断进步并应用于更多领域。
ASR的输入是语音片段,输出是对应的文本内容。使用深度神经网络(Deep Neural Networks, DNN)实现ASR的一般流程如下。
一种常用的声学特征是梅尔频率倒谱系数(Mel Frequency Cepstral Coefficents,MFCC)。
将原始语言切分为小的片段后,根据每个片段计算对应的MFCC特征,即可得到一个二维数组。
其中第一个维度为小片段的个数,原始语音越长,第一个维度也越大,第二个维度为MFCC特征的维度。得到原始语音的数值表示后,就可以使用WaveNet实现ASR。
WaveNet模型结构如下所示,主要使用了多层因果空洞卷积(Causal Dilated Convocation)和Skip Connections。
由于MFCC特征为一维序列,所以使用Conv1D进行卷积。而因果是指,卷积的输出只和当前位置之前的输入有关,即不使用未来的特征,可以理解为将卷积的位置向前偏移。
空洞是指,卷积是跳跃进行的,经过多次堆叠后可以有效地扩大感受野,从而学习到长序列之间地依赖。
最后一层卷积的特征图个数和字典大小相同,经过softmax处理后,每个小片段对应的MFCC都能得到在整个字典上的概率分布。
但小片段的个数一般要大于文本内容中字的个数,即使是同一句话,每个字的持续时间和发音轻重,字之间地停顿时间,也都有无数种可能的变化。
本文使用CTC(Connectionist temporal classification)算法来计算损失函数。
本文使用以下数据,THCHS-30,包括13388条中文语音文件以及对应的文本标注。
THCHS-30是一个很经典的中文语音数据集,包含了1万余条语音文件,大约40小时的中文语音数据,内容以文章诗句为主,全部为女声。由清华大学语言与语言技术中心(CSLT)出版的开放式中文语音数据库。原创录音于2002年由朱晓燕教授在清华大学计算机科学系智能与系统重点实验室监督下进行,原名“TCMSD”,代表“清华连续”普通话语音数据库。13年后的出版由王东博士发起,并得到了朱晓燕教授的支持。他们希望为语音识别领域的新入门的研究人员提供玩具级别的数据库。因此,该数据库对学术用户完全免费。
首先安装这个项目所特需的依赖库,若还有其他依赖库未安装,则也按相同方法安装。
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple python_speech_features, librosa
- # 导入其他需要的库
- import numpy as np
- import matplotlib.pyplot as plt
- from mpl_toolkits.axes_grid1 import make_axes_locatable
- %matplotlib inline
- import random
- import pickle
- import glob
- from tqdm import tqdm
- import os
- # 导入语音处理相关的库
- from python_speech_features import mfcc
- import scipy.io.wavfile as wav
- import librosa
- from IPython.display import Audio
- # 导入所需的模块和类
- import tensorflow as tf
- from tensorflow.keras.layers import Input, Conv1D, Activation, Lambda, Add, Multiply, BatchNormalization
- from tensorflow.keras.optimizers import SGD, Adam
- from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
- from tensorflow.keras.models import Model
- from tensorflow.keras import backend as K
- from tensorflow.keras.backend import ctc_batch_cost
- from tensorflow.keras.utils import to_categorical, plot_model
- # 使用glob匹配所有以.trn为扩展名的文件路径
- text_paths = glob.glob('data/*.trn')
-
- # 获取匹配到的文件总数
- total = len(text_paths)
-
- # 打印总文件数
- print(total)
-
-
- # 使用with语句打开第一个匹配到的文件
- with open(text_paths[0], 'r', encoding='utf8') as fr:
- # 读取文件中的所有行并存储在lines列表中
- lines = fr.readlines()
-
- # 打印读取的行
- print(lines)
- 13388
- ['绿 是 阳春 烟 景 大块 文章 的 底色 四月 的 林 峦 更是 绿 得 鲜活 秀媚 诗意 盎然\n', 'lv4 shi4 yang2 chun1 yan1 jing3 da4 kuai4 wen2 zhang1 de5 di3 se4 si4 yue4 de5 lin2 luan2 geng4 shi4 lv4 de5 xian1 huo2 xiu4 mei4 shi1 yi4 ang4 ran2\n', 'l v4 sh ix4 ii iang2 ch un1 ii ian1 j ing3 d a4 k uai4 uu un2 zh ang1 d e5 d i3 s e4 s iy4 vv ve4 d e5 l in2 l uan2 g eng4 sh ix4 l v4 d e5 x ian1 h uo2 x iu4 m ei4 sh ix1 ii i4 aa ang4 r an2\n']
- # 初始化空列表,用于存储处理后的文本和文件路径
- texts = []
- paths = []
-
- # 遍历匹配到的文件路径
- for path in text_paths:
- # 使用with语句打开文件
- with open(path, 'r', encoding='utf8') as fr:
- # 读取文件中的所有行并存储在lines列表中
- lines = fr.readlines()
-
- # 提取第一行文本并进行处理,去除换行符和空格
- line = lines[0].strip('\n').replace(' ', '')
-
- # 将处理后的文本添加到texts列表中
- texts.append(line)
-
- # 将处理后的文件路径添加到paths列表中,去除文件扩展名
- paths.append(path.rstrip('.trn'))
-
- # 打印第一个文件路径和对应的文本内容
- print(paths[0], texts[0])
data\A11_0.wav 绿是阳春烟景大块文章的底色四月的林峦更是绿得鲜活秀媚诗意盎然
- mfcc_dim = 13
-
- def load_and_trim(path):
- audio, sr = librosa.load(path)
- energy = librosa.feature.rms(y=audio)
- frames = np.nonzero(energy >= np.max(energy) / 5)
- indices = librosa.core.frames_to_samples(frames)[1]
- audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]
-
- return audio, sr
-
- def visualize(index):
- path = paths[index]
- text = texts[index]
- print('Audio Text:', text)
-
- audio, sr = load_and_trim(path)
- plt.figure(figsize=(12, 3))
- plt.plot(np.arange(len(audio)), audio)
- plt.title('Raw Audio Signal')
- plt.xlabel('Time')
- plt.ylabel('Audio Amplitude')
- plt.show()
-
- feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
- print('Shape of MFCC:', feature.shape)
-
- # Plot MFCC spectrogram with coordinates
- plt.figure(figsize=(12, 5))
- librosa.display.specshow(feature, sr=sr)
-
- plt.title('Normalized MFCC')
- plt.ylabel('Time')
- plt.xlabel('MFCC Coefficient')
- plt.colorbar(format='%+2.0f dB')
- # Manually set x-axis tick labels for MFCC coefficients
- num_coefficients = feature.shape[0]
- plt.xticks(np.arange(0, 13), np.arange(1, 13 + 1))
-
- # Manually set y-axis tick labels for time
- num_frames = feature.shape[0]
- print(num_frames)
- time_in_seconds = librosa.frames_to_time(np.arange(0, num_frames, 100), sr=sr)
- time_labels = [t for t in time_in_seconds]
- plt.yticks(np.arange(0, num_frames, 100))
-
- plt.tight_layout()
- plt.show()
-
-
- return path
-
- Audio(visualize(0))
提取音频数据的MFCC特征(大概需要5分钟左右的时间)
- features = []
-
- # 使用tqdm来显示循环进度
- for i in tqdm(range(total)):
- # 获取当前索引的音频文件路径
- path = paths[i]
-
- # 加载和修剪音频
- audio, sr = load_and_trim(path)
-
- # 计算音频的MFCC特征并添加到features列表中
- features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551))
-
- # 打印MFCC特征的数量和第一个特征的形状
- print(len(features), features[0].shape)
- # 从特征列表中随机抽取100个样本
- samples = random.sample(features, 100)
-
- # 将样本堆叠成矩阵
- samples = np.vstack(samples)
-
- # 计算抽样样本的MFCC均值和标准差
- mfcc_mean = np.mean(samples, axis=0)
- mfcc_std = np.std(samples, axis=0)
- print(mfcc_mean)
- print(mfcc_std)
-
- # 对所有特征进行标准化
- features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]
- chars = {}
-
- # 统计所有文本中的字符出现频次
- for text in texts:
- for c in text:
- chars[c] = chars.get(c, 0) + 1
-
- # 按字符出现频次排序
- chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
-
- # 仅保留字符列表
- chars = [char[0] for char in chars]
-
- # 打印字符数量和前100个字符
- print(len(chars), chars[:100])
-
- # 创建字符到ID的映射和ID到字符的映射
- char2id = {c: i for i, c in enumerate(chars)}
- id2char = {i: c for i, c in enumerate(chars)}
2883 ['的', '一', '有', '人', '了', '不', '为', '在', '是', '十', '用', '我', '外', '要', '也', '而', '中', '上', '二', '国', '他', '大', '和', '文', '来', '年', '子', '这', '到', '业', '生', '越', '于', '下', '地', '个', '以', '着', '家', '时', '月', '区', '出', '后', '成', '与', '五', '日', '能', '们', '多', '又', '可', '学', '王', '员', '三', '天', '行', '山', '发', '长', '运', '等', '因', '百', '同', '儿', '四', '得', '开', '里', '说', '就', '小', '会', '过', '作', '从', '去', '军', '之', '被', '种', '内', '应', '对', '样', '全', '厂', '民', '往', '然', '育', '所', '高', '方', '将', '明', '新']
- data_index = np.arange(total)
- np.random.shuffle(data_index)
- train_size = int(0.9 * total)
- test_size = total - train_size
- train_index = data_index[:train_size]
- test_index = data_index[train_size:]
-
- X_train = [features[i] for i in train_index]
- Y_train = [texts[i] for i in train_index]
- X_test = [features[i] for i in test_index]
- Y_test = [texts[i] for i in test_index]
本文设置的 batch_size=8 需要4G显存,读者可自行修改,训练50个epoch需要8小时左右。
- batch_size = 8
-
-
- def batch_generator(x, y, batch_size=batch_size):
- offset = 0
- while True:
- offset += batch_size
-
- if offset == batch_size or offset >= len(x):
- data_index = np.arange(len(x))
- np.random.shuffle(data_index)
- x = [x[i] for i in data_index]
- y = [y[i] for i in data_index]
- offset = batch_size
-
- X_data = x[offset - batch_size: offset]
- Y_data = y[offset - batch_size: offset]
-
- X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])
- Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])
-
- X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])
- Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)
- X_length = np.zeros([batch_size, 1], dtype='int32')
- Y_length = np.zeros([batch_size, 1], dtype='int32')
-
- for i in range(batch_size):
- X_length[i, 0] = X_data[i].shape[0]
- X_batch[i, :X_length[i, 0], :] = X_data[i]
-
- Y_length[i, 0] = len(Y_data[i])
- Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]
-
- inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}
- outputs = {'ctc': np.zeros([batch_size])}
-
- yield (inputs, outputs)
- # 定义自定义模块类
- class ResidualBlock(Model):
- def __init__(self, filters, kernel_size, dilation_rate):
- super(ResidualBlock, self).__init__()
- self.conv1 = Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None, dilation_rate=dilation_rate)
- self.batchnorm1 = BatchNormalization()
- self.activation_tanh = Activation('tanh')
- self.activation_sigmoid = Activation('sigmoid')
- self.conv2 = Conv1D(filters=filters, kernel_size=1, strides=1, padding='valid', activation=None)
- self.batchnorm2 = BatchNormalization()
- self.add = Add()
-
- def call(self, inputs):
- hf = self.activation_tanh(self.batchnorm1(self.conv1(inputs)))
- hg = self.activation_sigmoid(self.batchnorm1(self.conv1(inputs)))
- h0 = Multiply()([hf, hg])
-
- ha = self.activation_tanh(self.batchnorm2(self.conv2(h0)))
- hs = self.activation_tanh(self.batchnorm2(self.conv2(h0)))
-
- return self.add([ha, inputs]), hs
-
- # 定义其他函数
- def conv1d(inputs, filters, kernel_size, dilation_rate):
- return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None, dilation_rate=dilation_rate)(inputs)
-
- def batchnorm(inputs):
- return BatchNormalization()(inputs)
-
- def activation(inputs, activation):
- return Activation(activation)(inputs)
-
- # 定义超参数
- epochs = 50
- num_blocks = 3
- filters = 128
-
- # 输入和卷积参数
- X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')
- Y = Input(shape=(None,), dtype='float32', name='Y')
- X_length = Input(shape=(1,), dtype='int32', name='X_length')
- Y_length = Input(shape=(1,), dtype='int32', name='Y_length')
-
- # 构建模型
- h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
- shortcut = []
- for i in range(num_blocks):
- for r in [1, 2, 4, 8, 16]:
- h0, s = ResidualBlock(filters=filters, kernel_size=7, dilation_rate=r)(h0)
- shortcut.append(s)
-
- h1 = activation(Add()(shortcut), 'relu')
- h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')
- Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')
- sub_model = Model(inputs=X, outputs=Y_pred)
-
- # 构建整体模型
- ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, sub_model.output, X_length, Y_length])
- model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)
- optimizer = SGD(learning_rate=0.02, momentum=0.9, nesterov=True, clipnorm=5)
- model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)
-
- # 回调和训练
- checkpointer = ModelCheckpoint(filepath='full_asr.h5', verbose=0)
- lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)
- early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
-
- # 绘制模型结构图
- plot_model(model, to_file='model.png', show_shapes=True, dpi=280)
- history = model.fit(
- x=batch_generator(X_train, Y_train),
- steps_per_epoch=len(X_train) // batch_size,
- epochs=epochs,
- validation_data=batch_generator(X_test, Y_test),
- validation_steps=len(X_test) // batch_size,
- callbacks=[checkpointer, lr_decay, early_stopping])
- train_loss = history.history['loss']
- valid_loss = history.history['val_loss']
- plt.plot(np.linspace(1, epochs, epochs), train_loss, label='train')
- plt.plot(np.linspace(1, epochs, epochs), valid_loss, label='valid')
- plt.legend(loc='upper right')
- plt.xlabel('Epoch')
- plt.ylabel('Loss')
- plt.show()
- sub_model.save('sub_asr.h5')
-
- with open('dictionary.pkl', 'wb') as fw:
- pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)
下面这个代码是在训练集和测试集中随机抽取样本进行测试。
- from tensorflow.keras.models import load_model
- import pickle
-
- with open('dictionary.pkl', 'rb') as fr:
- [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)
-
- sub_model = load_model('sub_asr_1.h5')
-
- def random_predict(x, y):
- index = np.random.randint(len(x))
- feature = x[index]
- text = y[index]
-
- pred = sub_model.predict(np.expand_dims(feature, axis=0))
- pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
- pred_ids = pred_ids.flatten().tolist()
-
- print('True transcription:\n-- ', text, '\n')
- # 防止音频中出现字典中不存在的字,返回空格代替
- print('Predicted transcription:\n-- ' + ''.join([id2char.get(i, ' ') for i in pred_ids]), '\n')
-
-
- random_predict(X_train, Y_train)
- random_predict(X_test, Y_test)
- WARNING:tensorflow:No training configuration found in the save file, so the model was *not* compiled. Compile it manually.
- 1/1 [==============================] - 2s 2s/step
- True transcription:
- -- 当我病愈去医院看望您时您这个年长者却躺在病床上紧紧拉着我的手殷殷嘱我保重
-
- Predicted transcription:
- -- 当我病愈去医院看望您时您这个年长者却躺在病床上紧拉着我的手殷嘱我保重
-
- 1/1 [==============================] - 2s 2s/step
- True transcription:
- -- 目前内蒙古拥有蒙医院二十七所中蒙医院十九所中蒙医研究所七处
-
- Predicted transcription:
- -- 目前内蒙古拥有蒙医院二十七所中蒙医院十九所中蒙医研究所七处
下面这个代码为获取本地电脑麦克风权限,记录5秒并将数据保存到本地,再用模型预测。需要提醒的是,在录音时要确保没有噪音且说话时是标准的普通话。
- import pyaudio
- import wave
-
- def record_audio(output_wav_path, duration=5, sample_rate=44100, chunk_size=1024):
- audio = pyaudio.PyAudio()
-
- # 打开麦克风
- stream = audio.open(format=pyaudio.paInt16,
- channels=1,
- rate=sample_rate,
- input=True,
- frames_per_buffer=chunk_size)
-
- print("Recording...")
-
- frames = []
-
- # 监听并录制音频
- for _ in range(0, int(sample_rate / chunk_size * duration)):
- data = stream.read(chunk_size)
- frames.append(data)
-
- print("Recording finished.")
-
- # 关闭麦克风
- stream.stop_stream()
- stream.close()
- audio.terminate()
-
- # 保存录制的音频为 WAV 文件
- with wave.open(output_wav_path, 'wb') as wf:
- wf.setnchannels(1)
- wf.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
- wf.setframerate(sample_rate)
- wf.writeframes(b''.join(frames))
-
-
- output_wav_path = "recorded_audio.wav"
- record_audio(output_wav_path, duration=5)
- def single_predict(audio_path):
- # 加载和修剪音频
- audio, sr = load_and_trim(audio_path)
-
- # 计算音频的MFCC特征
- feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
-
- feature = (feature - mfcc_mean) / (mfcc_std + 1e-14)
-
- pred = sub_model.predict(np.expand_dims(feature, axis=0))
- pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
- pred_ids = pred_ids.flatten().tolist()
-
- print('Predicted transcription:\n-- ' + ''.join([id2char.get(i, ' ') for i in pred_ids]), '\n')
-
- # Specify the path to your MP3 audio file
- audio_path = "recorded_audio.wav"
-
- single_predict(audio_path)
项目资源地址如下:GitHub - 0911duzhou/Automatic-speech-recognition
若无法访问Github,也可在博主的主页资源里下载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。