A streaming speech synthesis (TTS) model is one that can return synthesized audio in real time while the audio stream is still being generated. A non-streaming model, by contrast, has to finish processing the whole sentence before returning any result.
Clearly, for a point-and-read scenario, streaming synthesis greatly improves the user experience.
In the earlier electric-meter and point-and-read projects we always used non-streaming models, which left a noticeable gap between those demos and a real deployment. The reason was simply that streaming synthesis had not yet been implemented. Now, using the powerful streaming TTS capability provided by the PaddleSpeech project, the streaming model is deployed with ONNXRuntime.
For the detailed steps, see the example project by the Baidu engineering team:
【PaddleSpeech】语音合成-onnx模型使用
This post mainly walks through that example, explains how to adapt and simplify OpenVINO's official PaddleOCR deployment sample, and finally, combined with the point-and-read API built earlier, assembles a complete system that reads meter values and serial numbers from a video stream.
Without further ado, here are the result image and the demo video.
from IPython.display import Video
Video('2022-07-12 14-17-19.mkv')
Related series: the full electric-meter series (main line, community applications, extras) and the full point-and-read series.
ONNX Deployment
Following the AI Studio steps below, set up the PaddleSpeech environment locally and prepare the ONNX deployment models.
# Install PaddleSpeech
!git clone https://gitee.com/paddlepaddle/PaddleSpeech.git
# Enter the PaddleSpeech directory
%cd PaddleSpeech
# Install dependencies
!pip install pytest-runner
# Needs to be installed locally
# !pip install openvino==2022.1.0
!pip install .
# Download the models and unzip them
%cd /home/aistudio/work
!wget https://paddlespeech.bj.bcebos.com/Parakeet/released_models/fastspeech2/fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0.zip
!wget https://paddlespeech.bj.bcebos.com/Parakeet/released_models/mb_melgan/mb_melgan_csmsc_onnx_0.2.0.zip
!unzip fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0.zip
!unzip mb_melgan_csmsc_onnx_0.2.0.zip
# Download the nltk data package (skip this if it is already in the project)
%cd /home/aistudio
!wget -P data https://paddlespeech.bj.bcebos.com/Parakeet/tools/nltk_data.tar.gz
!tar zxvf data/nltk_data.tar.gz
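As an optional sanity check (my own addition, not part of the original project), the unzipped ONNX models can be loaded with onnxruntime to confirm they are intact and to see the input/output names ('text', 'xs', 'logmel') that the streaming script relies on later. Run this from the directory where the zips were extracted (work/):
import onnxruntime as ort

model_paths = [
    "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_encoder_infer.onnx",
    "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_decoder.onnx",
    "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_postnet.onnx",
    "mb_melgan_csmsc_onnx_0.2.0/mb_melgan_csmsc.onnx",
]
for path in model_paths:
    # CPUExecutionProvider is enough for this check
    sess = ort.InferenceSession(path, providers=['CPUExecutionProvider'])
    print(path)
    print("  inputs :", [(i.name, i.shape) for i in sess.get_inputs()])
    print("  outputs:", [(o.name, o.shape) for o in sess.get_outputs()])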
The speech synthesis pipeline consists of three main modules: a text frontend, an acoustic model, and a vocoder (the original project includes a diagram of this basic flow). This post uses a Chinese streaming synthesis system built from the FastSpeech2 acoustic model and the Multi-band MelGAN (MB-MelGAN) vocoder.
For a detailed walkthrough of how the models run, refer to the project 【PaddleSpeech】语音合成-onnx模型使用; it is not repeated here.
A text frontend module consists of several sub-modules (and before the result is fed to the acoustic model, the phoneme sequence still has to be converted to IDs). The two most important ones are text normalization and grapheme-to-phoneme conversion (usually abbreviated G2P in TTS). PaddleSpeech Text-to-Speech ships its own text frontend solution covering these steps; a small usage sketch follows.
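As a minimal sketch of what the frontend does (based on the streaming_tts.py script shown later; the example sentence is my own), text normalization and G2P are wrapped inside Frontend.get_input_ids, which directly returns phone IDs per sentence:
from paddlespeech.t2s.frontend.zh_frontend import Frontend

# phone_id_map.txt comes from the FastSpeech2 streaming ONNX package downloaded above
phones_dict = "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/phone_id_map.txt"
frontend = Frontend(phone_vocab_path=phones_dict, tone_vocab_path=None)

# Normalization (e.g. digits -> Chinese words) and G2P happen inside get_input_ids;
# merge_sentences=False keeps one tensor of phone IDs per sentence
input_ids = frontend.get_input_ids("今年是2022年。", merge_sentences=False, get_tone_ids=False)
print(input_ids["phone_ids"])  # list of paddle tensors of phone IDs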
The acoustic model converts characters/phonemes into acoustic features such as linear spectrograms, mel spectrograms, or LPC features. Acoustic features are organized in frames; one frame is typically around 10 ms, and one phoneme usually spans roughly 5 to 20 frames. The problem the acoustic model has to solve is a mapping between sequences of unequal length: the same speaker holds different phonemes for different durations, speaks the same sentence at different speeds at different times, and different speakers have different speaking styles, so the duration of each phoneme varies. This makes it a hard one-to-many problem.
Acoustic models fall into autoregressive and non-autoregressive models. An autoregressive model needs its output at time t-1 as input for the prediction at time t, so inference is slow but the audio quality is relatively good; a non-autoregressive model has no such dependency between predictions, so inference is fast but the quality is relatively lower.
Mainstream acoustic models have developed along a clear lineage (illustrated in the original project). In this post we use FastSpeech2 as the acoustic model. The FastSpeech2 implemented in PaddleSpeech TTS differs from the paper in that it uses phone-level pitch and energy (similar to FastPitch), which makes the synthesized results more stable.
The vocoder converts acoustic features into waveforms. What the vocoder has to solve is a problem of filling in missing information: phase information is lost when the waveform is turned into a spectrogram, and frequency-domain compression loses further information when the spectrogram is turned into a mel spectrogram. Suppose the audio sample rate is 16 kHz and one frame covers 10 ms: then 1 s of audio has 16,000 samples and 100 frames, so each frame corresponds to 160 samples. The vocoder's job is to turn one spectral frame into those 160 waveform samples, which is why vocoders usually contain an upsampling module.
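A quick back-of-the-envelope check of the numbers above (a small sketch; the 24 kHz / 12.5 ms figures for the CSMSC models are inferred from voc_upsample = 300 and the 24000 Hz playback rate used in the script below):
def samples_per_frame(sample_rate_hz, frame_shift_s):
    # how many waveform samples one spectral frame is upsampled to
    return int(sample_rate_hz * frame_shift_s)

print(samples_per_frame(16000, 0.010))   # 160, the example in the text
print(samples_per_frame(24000, 0.0125))  # 300, matches voc_upsample in streaming_tts.py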
Like acoustic models, vocoders are also divided into autoregressive and non-autoregressive models, with finer sub-categories within each (detailed in the original project).
When choosing one of the synthesis models provided by PaddleSpeech, pay attention to the training corpus: models trained on the CSMSC dataset are Chinese TTS models, while models trained on LJSpeech are English TTS models.
With the pretrained models organized, run the streaming synthesis script streaming_tts.py provided by the project locally; if your local sound card supports playback, you will hear the speech synthesized by PaddleSpeech.
# This code needs to run in a local environment
import onnxruntime as ort
import math
import time
import numpy as np
import pyaudio
import soundfile as sf

from paddlespeech.server.utils.util import denorm, get_chunks
from paddlespeech.server.utils.audio_process import float2pcm
from paddlespeech.t2s.frontend.zh_frontend import Frontend

voc_block = 36
voc_pad = 14
am_block = 72
am_pad = 12
voc_upsample = 300

# Set up the text frontend
phones_dict = "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/phone_id_map.txt"
frontend = Frontend(phone_vocab_path=phones_dict, tone_vocab_path=None)

am_stat_path = r"fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/speech_stats.npy"
am_mu, am_std = np.load(am_stat_path)

# Model paths
onnx_am_encoder = "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_encoder_infer.onnx"
onnx_am_decoder = "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_decoder.onnx"
onnx_am_postnet = "fastspeech2_cnndecoder_csmsc_streaming_onnx_1.0.0/fastspeech2_csmsc_am_postnet.onnx"
onnx_voc_melgan = "mb_melgan_csmsc_onnx_0.2.0/mb_melgan_csmsc.onnx"

# Run inference on the CPU
providers = ['CPUExecutionProvider']

# Configure the ONNXRuntime session
sess_options = ort.SessionOptions()

# Load the models and create ONNXRuntime sessions
am_encoder_infer_sess = ort.InferenceSession(onnx_am_encoder, providers=providers, sess_options=sess_options)
am_decoder_sess = ort.InferenceSession(onnx_am_decoder, providers=providers, sess_options=sess_options)
am_postnet_sess = ort.InferenceSession(onnx_am_postnet, providers=providers, sess_options=sess_options)
voc_melgan_sess = ort.InferenceSession(onnx_voc_melgan, providers=providers, sess_options=sess_options)


def depadding(data, chunk_num, chunk_id, block, pad, upsample):
    """
    Streaming inference removes the result of pad inference
    """
    front_pad = min(chunk_id * block, pad)
    # first chunk
    if chunk_id == 0:
        data = data[:block * upsample]
    # last chunk
    elif chunk_id == chunk_num - 1:
        data = data[front_pad * upsample:]
    # middle chunk
    else:
        data = data[front_pad * upsample:(front_pad + block) * upsample]
    return data


def inference_stream(text):
    input_ids = frontend.get_input_ids(
        text, merge_sentences=False, get_tone_ids=False)
    phone_ids = input_ids["phone_ids"]
    for i in range(len(phone_ids)):
        part_phone_ids = phone_ids[i].numpy()
        voc_chunk_id = 0

        orig_hs = am_encoder_infer_sess.run(
            None, input_feed={'text': part_phone_ids})
        orig_hs = orig_hs[0]

        # streaming voc chunk info
        mel_len = orig_hs.shape[1]
        voc_chunk_num = math.ceil(mel_len / voc_block)
        start = 0
        end = min(voc_block + voc_pad, mel_len)

        # streaming am
        hss = get_chunks(orig_hs, am_block, am_pad, "am")
        am_chunk_num = len(hss)
        for i, hs in enumerate(hss):
            am_decoder_output = am_decoder_sess.run(
                None, input_feed={'xs': hs})
            am_postnet_output = am_postnet_sess.run(
                None,
                input_feed={
                    'xs': np.transpose(am_decoder_output[0], (0, 2, 1))
                })
            am_output_data = am_decoder_output + np.transpose(
                am_postnet_output[0], (0, 2, 1))
            normalized_mel = am_output_data[0][0]

            sub_mel = denorm(normalized_mel, am_mu, am_std)
            sub_mel = depadding(sub_mel, am_chunk_num, i, am_block, am_pad, 1)

            if i == 0:
                mel_streaming = sub_mel
            else:
                mel_streaming = np.concatenate(
                    (mel_streaming, sub_mel), axis=0)

            # streaming voc
            # Once the streaming AM has produced at least one vocoder chunk worth of mel frames,
            # start streaming vocoder inference
            while (mel_streaming.shape[0] >= end and
                   voc_chunk_id < voc_chunk_num):
                voc_chunk = mel_streaming[start:end, :]

                sub_wav = voc_melgan_sess.run(
                    output_names=None, input_feed={'logmel': voc_chunk})
                sub_wav = depadding(
                    sub_wav[0], voc_chunk_num, voc_chunk_id,
                    voc_block, voc_pad, voc_upsample)

                yield sub_wav

                voc_chunk_id += 1
                start = max(0, voc_chunk_id * voc_block - voc_pad)
                end = min((voc_chunk_id + 1) * voc_block + voc_pad, mel_len)


if __name__ == '__main__':
    text = "欢迎使用飞桨语音合成系统,测试一下合成效果。"

    # warm up
    # the first onnxruntime call takes longer, so warm up first
    for sub_wav in inference_stream(text="哈哈哈哈"):
        continue

    # Play back with pyaudio
    p = pyaudio.PyAudio()
    stream = p.open(format=p.get_format_from_width(2),  # int16
                    channels=1,
                    rate=24000,
                    output=True)

    # Timing
    wavs = []
    t1 = time.time()
    for sub_wav in inference_stream(text):
        print("Response time:", time.time() - t1)
        t1 = time.time()
        wavs.append(sub_wav.flatten())
        wav = float2pcm(sub_wav)  # float32 to int16
        wav_bytes = wav.tobytes()  # to bytes
        stream.write(wav_bytes)

    # Close the pyaudio player
    stream.stop_stream()
    stream.close()
    p.terminate()

    # Export the streamed synthesis result
    wav = np.concatenate(wavs)
    print(wav.shape)
    sf.write("demo_stream.wav", data=wav, samplerate=24000)
This system depends heavily on earlier projects: the meter detection and recognition models (PP-OCRv2) come from the 【PaddlePaddle+OpenVINO】电表检测识别模型的部署 project, whose OCR pipeline is greatly simplified here, while the GUI frontend comes from the 手把手教你快速打造一个AI识物点读机 project, from which the camera-capture and video-loading features are inherited directly and whose non-streaming TTS is replaced with streaming synthesis.
A few pieces of core code are described below.
In OpenVINO's official example tutorials, the PaddleOCR notebook is 405-paddle-ocr-webcam. It takes a video stream as input, but the OCR logic is tightly coupled to the rest of that notebook. Since we want to design our own GUI frontend, the easiest approach is to strip the code down so that only single-image OCR remains.
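Before looking at the stripped-down function, note that it assumes the detection and recognition models have already been read and compiled with OpenVINO, as the 405 notebook does. A minimal sketch of that setup (the .pdmodel paths are placeholders for the meter detection/recognition models exported in the earlier project, not paths given in this post):
from openvino.runtime import Core

core = Core()

# Text detection model (placeholder path)
det_model = core.read_model(model="det_model/inference.pdmodel")
det_compiled_model = core.compile_model(model=det_model, device_name="CPU")
det_input_layer = det_compiled_model.input(0)
det_output_layer = det_compiled_model.output(0)

# Text recognition model (placeholder path)
rec_model = core.read_model(model="rec_model/inference.pdmodel")
rec_compiled_model = core.compile_model(model=rec_model, device_name="CPU")
rec_input_layer = rec_compiled_model.input(0)
rec_output_layer = rec_compiled_model.output(0)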
As for the stripping itself, the approach is to delete unnecessary levels of nesting such as the try: blocks and to replace the incoming video frame frame with a single image. The core implementation is as follows:
def run_paddle_ocr(img_path):
    text_list = ['测试效果']
    processing_times = collections.deque()
    det_request = det_compiled_model.create_infer_request()
    test_pic = cv2.imread(img_path)
    test_image = image_preprocess(test_pic, 1600)

    # measure processing time for text detection
    start_time = time.time()
    # perform the inference step
    det_request.infer(inputs={det_input_layer.any_name: test_image})
    det_results = det_request.get_tensor(det_output_layer).data
    stop_time = time.time()

    # Postprocessing for Paddle Detection
    ori_im = test_pic.copy()
    data = {'image': test_pic}
    data_resize = processing.DetResizeForTest(data)
    data_list = []
    keep_keys = ['image', 'shape']
    for key in keep_keys:
        data_list.append(data_resize[key])
    img, shape_list = data_list
    shape_list = np.expand_dims(shape_list, axis=0)
    pred = det_results[0]
    if isinstance(pred, paddle.Tensor):
        pred = pred.numpy()
    segmentation = pred > 0.3

    boxes_batch = []
    for batch_index in range(pred.shape[0]):
        src_h, src_w, ratio_h, ratio_w = shape_list[batch_index]
        mask = segmentation[batch_index]
        boxes, scores = processing.boxes_from_bitmap(pred[batch_index], mask, src_w, src_h)
        boxes_batch.append({'points': boxes})
    post_result = boxes_batch
    dt_boxes = post_result[0]['points']
    dt_boxes = processing.filter_tag_det_res(dt_boxes, ori_im.shape)

    processing_times.append(stop_time - start_time)
    # use processing times from last 200 frames
    if len(processing_times) > 400:
        processing_times.popleft()
    processing_time_det = np.mean(processing_times) * 1000

    # Preprocess detection results for recognition
    dt_boxes = processing.sorted_boxes(dt_boxes)
    img_crop_list = []
    if dt_boxes != []:
        for bno in range(len(dt_boxes)):
            tmp_box = copy.deepcopy(dt_boxes[bno])
            img_crop = processing.get_rotate_crop_image(ori_im, tmp_box)
            img_crop_list.append(img_crop)

        # Recognition starts from here
        img_num = len(img_crop_list)
        # Calculate the aspect ratio of all text bars
        width_list = []
        for img in img_crop_list:
            width_list.append(img.shape[1] / float(img.shape[0]))
        # Sorting can speed up the recognition process
        indices = np.argsort(np.array(width_list))
        rec_res = [['', 0.0]] * img_num
        batch_num = 4

        # For each detected text box, run inference for text recognition
        for beg_img_no in range(0, img_num, batch_num):
            end_img_no = min(img_num, beg_img_no + batch_num)
            norm_img_batch = []
            max_wh_ratio = 0
            for ino in range(beg_img_no, end_img_no):
                h, w = img_crop_list[indices[ino]].shape[0:2]
                wh_ratio = w * 1.0 / h
                max_wh_ratio = max(max_wh_ratio, wh_ratio)
            for ino in range(beg_img_no, end_img_no):
                norm_img = resize_norm_img(img_crop_list[indices[ino]], max_wh_ratio)
                norm_img = norm_img[np.newaxis, :]
                norm_img_batch.append(norm_img)
            norm_img_batch = np.concatenate(norm_img_batch)
            norm_img_batch = norm_img_batch.copy()

            # Run inference for text recognition
            rec_request = rec_compiled_model.create_infer_request()
            rec_request.infer(inputs={rec_input_layer.any_name: norm_img_batch})
            rec_results = rec_request.get_tensor(rec_output_layer).data

            # Postprocessing recognition results
            postprocess_op = processing.build_post_process(processing.postprocess_params)
            rec_result = postprocess_op(rec_results)
            for rno in range(len(rec_result)):
                rec_res[indices[beg_img_no + rno]] = rec_result[rno]

        # Text recognition results, rec_res, include two parts:
        # txts are the recognized text results, scores are the recognition confidence level
        if rec_res != []:
            image = Image.fromarray(cv2.cvtColor(test_pic, cv2.COLOR_BGR2RGB))
            boxes = dt_boxes
            txts = [rec_res[i][0] for i in range(len(rec_res)) if rec_res[i][1] > 0.3]
            scores = [rec_res[i][1] for i in range(len(rec_res))]

            # draw text recognition results beside the image
            draw_img = processing.draw_ocr_box_txt(image, boxes, txts, scores, drop_score=0.3)

            # Visualize PPOCR results
            _, f_width = draw_img.shape[:2]
            fps = 1000 / processing_time_det
            cv2.putText(img=draw_img,
                        text=f"OpenVINO Inference time: {processing_time_det:.1f}ms ({fps:.1f} FPS)",
                        org=(20, 40), fontFace=cv2.FONT_HERSHEY_COMPLEX,
                        fontScale=f_width / 1000, color=(0, 0, 255),
                        thickness=1, lineType=cv2.LINE_AA)
            draw_img = cv2.cvtColor(draw_img, cv2.COLOR_RGB2BGR)
            cv2.imwrite('test.jpg', draw_img)

            for i in range(len(txts)):
                if len(txts[i]) > 8:
                    txts[i] = '电表编号是' + str(txts[i])
                else:
                    txts[i] = '电表读数是' + str(txts[i])
            print(txts)
            if len(txts) > 0:
                text_list = txts
    return text_list
In the point-and-read system, this core code needs one more change: besides the recognized text, it must also return the rendered result image:
def run_paddle_ocr(frame):
    text_list = ['测试效果']
    draw_img = frame.copy()  # fallback image so the function always has something to return
    processing_times = collections.deque()
    det_request = det_compiled_model.create_infer_request()
    test_image = image_preprocess(frame, 1600)

    # measure processing time for text detection
    start_time = time.time()
    # perform the inference step
    det_request.infer(inputs={det_input_layer.any_name: test_image})
    det_results = det_request.get_tensor(det_output_layer).data
    stop_time = time.time()

    # Postprocessing for Paddle Detection
    ori_im = frame.copy()
    data = {'image': frame}
    data_resize = processing.DetResizeForTest(data)
    data_list = []
    keep_keys = ['image', 'shape']
    for key in keep_keys:
        data_list.append(data_resize[key])
    img, shape_list = data_list
    shape_list = np.expand_dims(shape_list, axis=0)
    pred = det_results[0]
    if isinstance(pred, paddle.Tensor):
        pred = pred.numpy()
    segmentation = pred > 0.3

    boxes_batch = []
    for batch_index in range(pred.shape[0]):
        src_h, src_w, ratio_h, ratio_w = shape_list[batch_index]
        mask = segmentation[batch_index]
        boxes, scores = processing.boxes_from_bitmap(pred[batch_index], mask, src_w, src_h)
        boxes_batch.append({'points': boxes})
    post_result = boxes_batch
    dt_boxes = post_result[0]['points']
    dt_boxes = processing.filter_tag_det_res(dt_boxes, ori_im.shape)

    processing_times.append(stop_time - start_time)
    # use processing times from last 200 frames
    if len(processing_times) > 400:
        processing_times.popleft()
    processing_time_det = np.mean(processing_times) * 1000

    # Preprocess detection results for recognition
    dt_boxes = processing.sorted_boxes(dt_boxes)
    img_crop_list = []
    if dt_boxes != []:
        for bno in range(len(dt_boxes)):
            tmp_box = copy.deepcopy(dt_boxes[bno])
            img_crop = processing.get_rotate_crop_image(ori_im, tmp_box)
            img_crop_list.append(img_crop)

        # Recognition starts from here
        img_num = len(img_crop_list)
        # Calculate the aspect ratio of all text bars
        width_list = []
        for img in img_crop_list:
            width_list.append(img.shape[1] / float(img.shape[0]))
        # Sorting can speed up the recognition process
        indices = np.argsort(np.array(width_list))
        rec_res = [['', 0.0]] * img_num
        batch_num = 4

        # For each detected text box, run inference for text recognition
        for beg_img_no in range(0, img_num, batch_num):
            end_img_no = min(img_num, beg_img_no + batch_num)
            norm_img_batch = []
            max_wh_ratio = 0
            for ino in range(beg_img_no, end_img_no):
                h, w = img_crop_list[indices[ino]].shape[0:2]
                wh_ratio = w * 1.0 / h
                max_wh_ratio = max(max_wh_ratio, wh_ratio)
            for ino in range(beg_img_no, end_img_no):
                norm_img = resize_norm_img(img_crop_list[indices[ino]], max_wh_ratio)
                norm_img = norm_img[np.newaxis, :]
                norm_img_batch.append(norm_img)
            norm_img_batch = np.concatenate(norm_img_batch)
            norm_img_batch = norm_img_batch.copy()

            # Run inference for text recognition
            rec_request = rec_compiled_model.create_infer_request()
            rec_request.infer(inputs={rec_input_layer.any_name: norm_img_batch})
            rec_results = rec_request.get_tensor(rec_output_layer).data

            # Postprocessing recognition results
            postprocess_op = processing.build_post_process(processing.postprocess_params)
            rec_result = postprocess_op(rec_results)
            for rno in range(len(rec_result)):
                rec_res[indices[beg_img_no + rno]] = rec_result[rno]

        # Text recognition results, rec_res, include two parts:
        # txts are the recognized text results, scores are the recognition confidence level
        if rec_res != []:
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            boxes = dt_boxes
            txts = [rec_res[i][0] for i in range(len(rec_res)) if rec_res[i][1] > 0.3]
            scores = [rec_res[i][1] for i in range(len(rec_res))]

            # draw text recognition results beside the image
            draw_img = processing.draw_ocr_box_txt(image, boxes, txts, scores, drop_score=0.3)

            # Visualize PPOCR results
            _, f_width = draw_img.shape[:2]
            fps = 1000 / processing_time_det
            draw_img = cv2.cvtColor(draw_img, cv2.COLOR_RGB2BGR)
            draw_img = cv2.resize(draw_img, (800, 600))
            draw_img = cv2.putText(img=draw_img,
                                   text=f"OpenVINO Infer time: {processing_time_det:.1f}ms ({fps:.1f} FPS)",
                                   org=(10, 100), fontFace=cv2.FONT_HERSHEY_COMPLEX,
                                   fontScale=1.0, color=(0, 0, 255),
                                   thickness=1, lineType=cv2.LINE_AA)
            cv2.imwrite('test.jpg', draw_img)

            for i in range(len(txts)):
                if len(txts[i]) > 8:
                    txts[i] = '电表编号是' + str(txts[i])
                else:
                    txts[i] = '电表读数是' + str(txts[i])
            print(txts)
            if len(txts) > 0:
                text_list = txts
    return text_list, draw_img
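In the GUI loop, the modified function is then used roughly like this (a sketch; frame is a BGR frame grabbed from the camera or a video file):
text_list, draw_img = run_paddle_ocr(frame)
# draw_img (800x600, with boxes and recognized text drawn on it) is shown in the GUI,
# while text_list is handed to the streaming TTS described next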
The streaming synthesis task takes a string of text as input, but note that PaddleOCR returns its results as a list. A simple step handles this: traverse the list and join its items with punctuation into a single string.
text_list = run_paddle_ocr('data/IMG_20210727_091835.jpg')
a = ','
text = a.join(text_list)
The key code is as follows:
# Check whether the point-read event has been triggered
if self.readEvent.is_set():
    save_time = str(int(time.time()))
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    cv2.imwrite('output_%s.jpg' % save_time, frame)
    # text_list = run_paddle_ocr('data/IMG_20210727_091835.jpg')
    a = ','
    text = a.join(text_list)

    # warm up onnxruntime first (the first call is slower)
    for sub_wav in inference_stream(text="哈哈哈哈"):
        continue

    # Play back with pyaudio
    p = pyaudio.PyAudio()
    stream = p.open(format=p.get_format_from_width(2),  # int16
                    channels=1,
                    rate=24000,
                    output=True)

    # Timing
    wavs = []
    t1 = time.time()
    for sub_wav in inference_stream(text):
        print("Response time:", time.time() - t1)
        t1 = time.time()
        wavs.append(sub_wav.flatten())
        wav = float2pcm(sub_wav)  # float32 to int16
        wav_bytes = wav.tobytes()  # to bytes
        stream.write(wav_bytes)

    # Close the pyaudio player
    stream.stop_stream()
    stream.close()
    p.terminate()

    # Export the streamed synthesis result
    wav = np.concatenate(wavs)
    print(wav.shape)
    sf.write("output_%s.wav" % save_time, data=wav, samplerate=24000)
This project has shown a meter detection, recognition, and point-and-read system built on PaddleSpeech's streaming TTS model deployed with ONNX. Readers can run it locally on Windows; the code is in the project's SpotReads-OCR.zip file.
Imagine running the system on an Intel AI Box mounted on an inspection robot: an operator could direct it from the backend, wait until the camera is positioned accurately, trigger the point-read (or save) of the meter reading and serial number to record the target meter's information, and then send the robot on to the next meter. That enables an "unmanned site" meter-reading scenario, where the job gets done from an air-conditioned office. Not bad, right?
With this, the core optimization point of the meter series comes back to model accuracy, so it is time to label another batch of data.
Together with PP-OCRv3, readers can look forward to the next upgrade of the point-and-read system.
This is a repost; the original article is at https://aistudio.baidu.com/aistudio/projectdetail/4324411