In NLP and ASR tasks, you usually need to build a vocabulary from the corpus and map each token to its index in that vocabulary. The vocabulary units can be whole words or single characters, depending on the application; in end-to-end ASR (for Chinese), the vocab units are usually single characters.
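To make the mapping concrete, here is a minimal sketch (the example sentence and special tokens are illustrative assumptions, not taken from the dataset) of building a character-level vocabulary and mapping a sentence to ids:

# Minimal illustrative sketch: build a character-level vocab from one sentence
# and map the sentence to ids (sentence and special tokens are assumptions).
sentence = '今天天气怎么样'

vocab = {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2, '<UNK>': 3}
for ch in sentence:
    if ch not in vocab:
        vocab[ch] = len(vocab)  # assign the next free id

ids = [vocab.get(ch, vocab['<UNK>']) for ch in sentence]
print(ids)  # [4, 5, 5, 6, 7, 8, 9]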
The annotation text we start from usually looks like this:
MDT_F2F_223_158.wav 反正,呃对还有两台平板,然后高中的时候学校不给带手机我们都借他的手机用,他的手机真的借遍了全班
MDT_F2F_223_159.wav 那会儿我有一次问他,我说我想借你的苹果四,他说哦,那台就
or it is a JSON file, where each JSON file is the annotation for one long audio recording:
{ "start_time": { "original": "0:00:00.988400" }, "end_time": { "original": "0:00:08.122000" }, "words": "呃诶你知道吗?我们家有一片桃园哎,就在河南漯河", "speaker": "SPK059", "location": "Room-1", "topic": "Gardening", "room_info": { "length(m)": 8.44, "width(m)": 3.55, "height(m)": 2.22, "T60": 0.36 }, "devices_type": { "Android": "Huawei mate9", "IOS": "Iphone X", "Recorder": "Newsmy RV51" }, "session_id": "MDT_Conversation_060" },
In that case, we first use the long-audio JSON annotation to produce a per-utterance (short-audio) annotation text, and then build the vocab from that short-audio annotation text.
For example, here is a dataset already split into training, test, and dev sets:
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/
audio sample_submission.csv transcription
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/audio/
dev test train
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/transcription/
dev test_no_ref_noise train
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/audio/train/
MDT_F2F_001.wav MDT_F2F_044.wav MDT_F2F_087.wav MDT_F2F_130.wav MDT_F2F_173.wav
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/transcription/train/
MDT_F2F_001.json MDT_F2F_044.json MDT_F2F_087.json MDT_F2F_130.json MDT_F2F_173.json
Using the JSON annotation files, we cut each long audio file into per-utterance audio plus its transcript, and produce files in the following format:
# Cut the audio into segments, save them under the wav folder, and generate the wav.scp and transcripts files
# wav.scp format:
# id1 path1
# id2 path2
# transcripts format:
# id1 你好
# id2 今天天气怎么样
The reference code is prepare_data.py:
import os
import json

data_rootdir = '/data/magic_data'  # root directory of the extracted data
audiodir = os.path.join(data_rootdir, 'audio')
trans_dir = os.path.join(data_rootdir, 'transcription')


# cut a segment out of a wav file with sox
def segment_wav(src_wav, tgt_wav, start_time, end_time):
    span = end_time - start_time
    cmds = 'sox %s %s trim %f %f' % (src_wav, tgt_wav, start_time, span)
    os.system(cmds)


# convert a time string such as '0:00:00.988400' into seconds
def time2sec(t):
    h, m, s = t.strip().split(":")
    return float(h) * 3600 + float(m) * 60 + float(s)


# read the content of a json annotation file (tolerating trailing commas)
def load_json(json_file):
    with open(json_file, 'r') as f:
        lines = f.readlines()
        json_str = ''.join(lines).replace('\n', '').replace(' ', '').replace(',}', '}')
    return json.loads(json_str)


# process the training and dev sets
for name in ['train', 'dev']:
    save_dir = os.path.join('data', name, 'wav')
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    seg_wav_list = []
    sub_audio_dir = os.path.join(audiodir, name)
    for wav in os.listdir(sub_audio_dir):
        if wav[0] == '.':
            continue  # skip hidden files such as '._MDT_F2F_209.wav'
        # map the wav file name to its json annotation file name
        if name == 'dev':
            # e.g. 'MDT_Conversation_045_Android.wav' -> 'MDT_Conversation_045.json', suffix 'Android.wav'
            parts = wav.split('_')
            jf = '_'.join(parts[:-1]) + '.json'
            suffix = parts[-1]
        else:
            jf = wav[:-4] + '.json'  # e.g. 'MDT_F2F_100.wav' -> 'MDT_F2F_100.json'
        utt_list = load_json(os.path.join(trans_dir, name, jf))
        for i in range(len(utt_list)):
            utt_info = utt_list[i]  # one utterance
            session_id = utt_info['session_id']
            if name == 'dev':
                tgt_id = session_id + '_' + str(i) + '_' + suffix
            else:
                tgt_id = session_id + '_' + str(i) + '.wav'
            # cut out this utterance
            start_time = time2sec(utt_info['start_time']['original'])
            end_time = time2sec(utt_info['end_time']['original'])
            src_wav = os.path.join(sub_audio_dir, wav)
            tgt_wav = os.path.join(save_dir, tgt_id)
            segment_wav(src_wav, tgt_wav, start_time, end_time)
            seg_wav_list.append((tgt_id, tgt_wav, utt_info['words']))
            # e.g. ('MDT_F2F_223_3.wav', './data/train/wav/MDT_F2F_223_3.wav', '诶,那你考研想考哪儿')
    with open(os.path.join('./data', name, 'wav.scp'), 'w') as ww:
        with open(os.path.join('./data', name, 'transcrpts.txt'), 'w', encoding='utf-8') as tw:
            for uttid, wavdir, text in seg_wav_list:
                ww.write(uttid + ' ' + wavdir + '\n')
                tw.write(uttid + ' ' + text + '\n')
    print('prepare %s dataset done!' % name)

# process the test set
save_dir = os.path.join('./data', 'test', 'wav')
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
seg_wav_list = []
sub_audio_dir = os.path.join(audiodir, 'test')
for wav in os.listdir(sub_audio_dir):
    if wav[0] == '.' or 'IOS' not in wav:
        continue  # skip hidden files and non-IOS audio files
    jf = '_'.join(wav.split('_')[:-1]) + '.json'
    utt_list = load_json(os.path.join(trans_dir, 'test_no_ref_noise', jf))
    for i in range(len(utt_list)):
        utt_info = utt_list[i]
        session_id = utt_info['session_id']
        uttid = utt_info['uttid']
        if 'words' in utt_info:
            continue  # skip utterances that already have a transcript
        # cut out this utterance
        start_time = time2sec(utt_info['start_time'])
        end_time = time2sec(utt_info['end_time'])
        tgt_id = uttid + '.wav'
        src_wav = os.path.join(sub_audio_dir, wav)
        tgt_wav = os.path.join(save_dir, tgt_id)
        segment_wav(src_wav, tgt_wav, start_time, end_time)
        seg_wav_list.append((uttid, tgt_wav))

if not os.path.exists(os.path.join('data', 'test')):
    os.makedirs(os.path.join('data', 'test'))
with open(os.path.join('data', 'test', 'wav.scp'), 'w') as ww:
    for uttid, wavdir in seg_wav_list:
        ww.write(uttid + ' ' + wavdir + '\n')
print('prepare test dataset done!')
After running this code, we get the following files.
Sample wav.scp content:
MDT_Conversation_060_0_Android.wav data/dev/wav/MDT_Conversation_060_0_Android.wav
MDT_Conversation_060_1_Android.wav data/dev/wav/MDT_Conversation_060_1_Android.wav
Sample transcrpts.txt content:
MDT_Conversation_060_0_Android.wav 呃诶你知道吗?我们家有一片桃园哎,就在河南漯河
MDT_Conversation_060_1_Android.wav 哦桃园什么桃园,是种桃子的园子吗
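Since segment_wav shells out to sox, sox must be installed for the segmentation to actually produce audio. As an optional sanity check (just a sketch, using the output layout above), you can verify that wav.scp and transcrpts.txt stay in sync:

# Optional sanity check (sketch): make sure wav.scp and transcrpts.txt
# generated above contain the same utterance ids in the same order.
import os

for name in ['train', 'dev']:
    with open(os.path.join('./data', name, 'wav.scp'), 'r', encoding='utf-8') as f:
        wav_ids = [line.split()[0] for line in f if line.strip()]
    with open(os.path.join('./data', name, 'transcrpts.txt'), 'r', encoding='utf-8') as f:
        text_ids = [line.split()[0] for line in f if line.strip()]
    assert wav_ids == text_ids, '%s: wav.scp and transcrpts.txt are out of sync' % name
    print('%s: %d segmented utterances' % (name, len(wav_ids)))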
Next, split the sentences in the transcrpts.txt files into individual characters and remove special symbols.
Reference code: process_text.py
import os
import string
import hanzi  # hanzi.py taken from the zhon package, see the note at the end


def text_normlization(seq):
    new_seq = []
    for c in seq:
        if c == '+':
            # the transcripts contain '+' signs, handle them separately so they are kept
            new_seq.append(c)
        elif c in string.punctuation or c in hanzi.punctuation:
            # drop all half-width and full-width punctuation
            continue
        else:
            if c.encode('UTF-8').isalpha():
                c = c.lower()  # lower-case English letters
            new_seq.append(c)
    return ' '.join(new_seq)


for name in ['train', 'dev']:
    with open(os.path.join('./data', name, 'transcrpts.txt'), 'r', encoding='utf-8') as tr:
        with open(os.path.join('./data', name, 'text'), 'w', encoding='utf-8') as tw:
            for line in tr:
                parts = line.split()
                uttid = parts[0]
                seqs = ''.join(parts[1:])
                if '[' in seqs:
                    continue  # skip sentences that contain special markers
                seqs = text_normlization(seqs)
                tw.write(uttid + ' ' + seqs + '\n')
    print('Normlize %s TEXT!' % name)
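For intuition, here is what text_normlization does to one of the sentences above (output shown as a comment; it assumes hanzi.punctuation covers the full-width '?' and ','):

print(text_normlization('呃诶你知道吗?我们家有一片桃园哎,就在河南漯河'))
# 呃 诶 你 知 道 吗 我 们 家 有 一 片 桃 园 哎 就 在 河 南 漯 河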
Now transcrpts.txt has been transformed into text; the test set does not need this step.
Read the text files under train and dev to build the vocabulary.
create_vocab.py
import os

vocab_dict = {}  # character frequency counts

for name in ['train', 'dev']:
    with open(os.path.join('./data', name, 'text'), 'r', encoding='utf-8') as fr:
        for line in fr:
            chars = line.strip().split()[1:]
            for c in chars:
                if c in vocab_dict:
                    vocab_dict[c] += 1
                else:
                    vocab_dict[c] = 1

# sort by frequency, e.g. [('是', 68793), ('的', 64165), ...]
vocab_list = sorted(vocab_dict.items(), key=lambda x: x[1], reverse=True)

vocab = {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2, '<UNK>': 3}  # token -> id, special symbols first
for i in range(len(vocab_list)):
    c = vocab_list[i][0]
    vocab[c] = i + 4

print('There are %d units in Vocabulary!' % len(vocab))
with open(os.path.join('./data', 'vocab'), 'w', encoding='utf-8') as fw:
    for c, id in vocab.items():
        fw.write(c + ' ' + str(id) + '\n')
The vocab has four special symbols added and is saved under the data directory.
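To close the loop with the word-to-index mapping mentioned at the beginning, here is a minimal sketch (the example sentence is an assumption) of loading the saved vocab file and turning a normalized sentence into an id sequence:

# Minimal sketch: load ./data/vocab and map an already normalized,
# space-separated sentence to ids (the example sentence is illustrative).
import os

vocab = {}
with open(os.path.join('./data', 'vocab'), 'r', encoding='utf-8') as fr:
    for line in fr:
        token, idx = line.strip().split()
        vocab[token] = int(idx)

sentence = '今 天 天 气 怎 么 样'
ids = [vocab['<BOS>']] \
    + [vocab.get(c, vocab['<UNK>']) for c in sentence.split()] \
    + [vocab['<EOS>']]
print(ids)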
That is the whole vocabulary-building process, and along the way the audio segmentation steps were recorded as well. Note that the character handling in process_text.py uses the hanzi package; installing it directly with pip is problematic, so I downloaded the zhon repository from GitHub and copied its hanzi.py file into the same directory as process_text.py, which makes the hanzi module importable directly.
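If pip can install zhon in your environment (an assumption about your setup, not what was done here), an alternative to copying hanzi.py is to import the punctuation table from the package directly:

# Alternative (assumes `pip install zhon` works in your environment):
# use zhon's built-in table of full-width punctuation instead of a local hanzi.py.
import string
from zhon import hanzi

punct = set(string.punctuation) | set(hanzi.punctuation)
print(len(punct))  # total number of punctuation characters that would be stripped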