当前位置:   article > 正文

vocab生成

vocab

nlp或者asr任务中,通常都需要根据corpus创建词典,并将单词在词典中的位置进行映射。词典中词可以是单个字也可以是字符,需要根据具体应用进行处理,一般在端到端asr任务中,vocab中单位都是字。

首先我们拿到的标注文本一般是这样的

MDT_F2F_223_158.wav 反正,呃对还有两台平板,然后高中的时候学校不给带手机我们都借他的手机用,他的手机真的借遍了全班
MDT_F2F_223_159.wav 那会儿我有一次问他,我说我想借你的苹果四,他说哦,那台就
  • 1
  • 2

或者是json文件, 一个json文件是一段长音频的标注文件

    {
        "start_time": {
            "original": "0:00:00.988400"
        },
        "end_time": {
            "original": "0:00:08.122000"
        },
        "words": "呃诶你知道吗?我们家有一片桃园哎,就在河南漯河",
        "speaker": "SPK059",
        "location": "Room-1",
        "topic": "Gardening",
        "room_info": {
            "length(m)": 8.44,
            "width(m)": 3.55,
            "height(m)": 2.22,
            "T60": 0.36
        },
        "devices_type": {
            "Android": "Huawei mate9",
            "IOS": "Iphone X",
            "Recorder": "Newsmy RV51"
        },
        "session_id": "MDT_Conversation_060"
    },
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

这个时候需要先根据长音频标注json处理成每个短音频标注文本,然后再利用短音频标注文本做成vocab

举个例子,下面是已经划分好的训练集、测试集、验证集的数据

(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/
audio  sample_submission.csv  transcription
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/audio/
dev  test  train
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/transcription/
dev  test_no_ref_noise  train
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/audio/train/
MDT_F2F_001.wav  MDT_F2F_044.wav  MDT_F2F_087.wav  MDT_F2F_130.wav  MDT_F2F_173.wav  
(notebook) root@ai-PowerEdge-R740:/opt/asr# ls /data/magic_data/transcription/train/
MDT_F2F_001.json  MDT_F2F_044.json  MDT_F2F_087.json  MDT_F2F_130.json  MDT_F2F_173.json  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

根据json标注文件,将长音频文件处理为一句话音频和标注,并处理成这种格式

# 将音频切分保存到wav文件夹中,并生成wav.scp和transcripts文件
# wav.scp文件格式:
# id1 path1
# id2 path2
# transcripts文件格式:
# id1 你好
# id2 今天天气怎么样
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

参考代码为prepare_data.py

import os
import json

data_rootdir = '/data/magic_data'  # 指定解压后数据的根目录

audiodir = os.path.join(data_rootdir, 'audio')
trans_dir = os.path.join(data_rootdir, 'transcription')

# 音频切分
def segment_wav(src_wav, tgt_wav, start_time, end_time):
    span = end_time - start_time
    cmds = 'sox %s %s trim %f %f' % (src_wav, tgt_wav, start_time, span)
    os.system(cmds)

# 将时间格式转化为秒为单位
def time2sec(t):
    h,m,s = t.strip().split(":")
    return float(h) * 3600 + float(m) * 60 + float(s)

# 读取json文件内容
def load_json(json_file):
    with open(json_file, 'r') as f:
        lines = f.readlines()
        json_str = ''.join(lines).replace('\n', '').replace(' ', '').replace(',}', '}')
        return json.loads(json_str)

# 训练集和开发集数据处理
for name in ['train', 'dev']:   
    save_dir = os.path.join('data', name, 'wav')
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    seg_wav_list = []
    sub_audio_dir = os.path.join(audiodir, name)
    for wav in os.listdir(sub_audio_dir):

        if wav[0] == '.': 
            continue # 跳过隐藏文件   '._MDT_F2F_209.wav'
        #把wav文件转换为json文件
        if name == 'dev':     #  'MDT_Conversation_045_Android.wav'  MDT_Conversation_045.json  #Android.wav
            parts = wav.split('_')
            jf = '_'.join(parts[:-1])+'.json' #
            suffix = parts[-1]  
        else:
            jf = wav[:-4]+'.json'   # "MDT_F2F_100.wav" , 'MDT_F2F_100.json'
    
        utt_list = load_json(os.path.join(trans_dir, name, jf))
        for i in range(len(utt_list)):
            utt_info = utt_list[i] #一段语音
            session_id = utt_info['session_id']
            if name == 'dev':
                tgt_id = session_id + '_' + str(i) + '_' + suffix
            else:
                tgt_id = session_id + '_' + str(i) + '.wav'

            # 句子切分
            start_time = time2sec(utt_info['start_time']['original'])
            end_time = time2sec(utt_info['end_time']['original'])

            src_wav = os.path.join(sub_audio_dir, wav)
            tgt_wav = os.path.join(save_dir, tgt_id)
            segment_wav(src_wav, tgt_wav, start_time, end_time)
            seg_wav_list.append((tgt_id, tgt_wav, utt_info['words'])) #[('MDT_F2F_223_3.wav', './data/train/wav/MDT_F2F_223_3.wav', '诶,那你考研想考哪儿')]

    with open(os.path.join('./data', name, 'wav.scp'), 'w') as ww:
        with open(os.path.join('./data', name, 'transcrpts.txt'), 'w', encoding='utf-8') as tw:
            for uttid, wavdir, text in seg_wav_list:
                ww.write(uttid+' '+wavdir+'\n')
                tw.write(uttid+' '+text+'\n')

    print('prepare %s dataset done!' % name)

# 测试集数据处理
save_dir = os.path.join('./data', 'test', 'wav')
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

seg_wav_list = []
sub_audio_dir = os.path.join(audiodir, 'test')
for wav in os.listdir(sub_audio_dir):

    if wav[0] == '.' or 'IOS' not in wav:
        continue # 跳过隐藏文件和非IOS的音频文件

    jf = '_'.join(wav.split('_')[:-1])+'.json'
    
    utt_list = load_json(os.path.join(trans_dir, 'test_no_ref_noise', jf))
    for i in range(len(utt_list)):
        utt_info = utt_list[i]
        session_id = utt_info['session_id']
        uttid = utt_info['uttid']
        if 'words' in utt_info: continue # 如果句子已经标注,则跳过
        # 句子切分
        start_time = time2sec(utt_info['start_time'])
        end_time = time2sec(utt_info['end_time'])

        tgt_id = uttid + '.wav'
        src_wav = os.path.join(sub_audio_dir, wav)
        tgt_wav = os.path.join(save_dir, tgt_id)
        segment_wav(src_wav, tgt_wav, start_time, end_time)
        seg_wav_list.append((uttid, tgt_wav))
if not os.path.exists(os.path.join('data', 'test')):
    os.makedirs(os.path.join('data', 'test'))
with open(os.path.join('data', 'test', 'wav.scp'), 'w') as ww:
    for uttid, wavdir in seg_wav_list:
        ww.write(uttid+' '+wavdir+'\n')
print('prepare test dataset done!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106

这段代码处理后,得到如下文件。
在这里插入图片描述
wav.scp 参考内容

MDT_Conversation_060_0_Android.wav data/dev/wav/MDT_Conversation_060_0_Android.wav
MDT_Conversation_060_1_Android.wav data/dev/wav/MDT_Conversation_060_1_Android.wav
  • 1
  • 2

transcrpts.txt 参考内容

MDT_Conversation_060_0_Android.wav 呃诶你知道吗?我们家有一片桃园哎,就在河南漯河
MDT_Conversation_060_1_Android.wav 哦桃园什么桃园,是种桃子的园子吗
  • 1
  • 2

然后将transcripts.txt文件中句子切分为字,并去掉特殊符号
参考代码 process_text.py

import os
import string
import hanzi
def text_normlization(seq):
    new_seq = []
    for c in seq:
        if c == '+':
            new_seq.append(c) # 文档中有加号,所以单独处理,避免删除
        elif c in string.punctuation or c in hanzi.punctuation:
            continue # 删除全部的半角标点和全角标点
        else:
            if c.encode('UTF-8').isalpha(): c = c.lower() # 大写字母转小写
            new_seq.append(c)
    return ' '.join(new_seq)


for name in ['train', 'dev']:
    with open(os.path.join('./data', name, 'transcrpts.txt'), 'r') as tr:
        with open(os.path.join('./data', name, 'text'), 'w') as tw:
            for line in tr:
                parts = line.split()
                uttid = parts[0]
                seqs = ''.join(parts[1:])
                if '[' in seqs: continue # 直接跳过包含特殊标记的句子
                seqs = text_normlization(seqs)
                tw.write(uttid +' '+seqs+'\n')
    print('Normlize %s TEXT!' % name)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

现在transcripts.txt变换为text, 测试集不需要做处理
在这里插入图片描述
读取train和test下text创建词典
create_vocab.py

import os

vocab_dict = {} #词频统计

for name in ['train', 'dev']:
    with open(os.path.join('./data', name, 'text'), 'r', encoding='utf-8') as fr:
        for line in fr:
            chars = line.strip().split()[1:]
            for c in chars:
                if c in vocab_dict:
                    vocab_dict[c] += 1
                else:
                    vocab_dict[c] = 1

vocab_list = sorted(vocab_dict.items(), key=lambda x: x[1], reverse=True)  #[('是', 68793),('的', 64165)]
vocab = {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2, '<UNK>': 3} # word_id
for i in range(len(vocab_list)):
    c = vocab_list[i][0]
    vocab[c] = i + 4

print('There are %d units in Vocabulary!' % len(vocab))
with open(os.path.join('./data', 'vocab'), 'w', encoding='utf-8') as fw:
    for c, id in vocab.items():
        fw.write(c+' '+ str(id) +'\n')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

vocab加了四个特殊符号,保存为data目录下
在这里插入图片描述

综上就是词典的制作过程,其中还顺便记录了音频的切分过程,其中process_text.py中汉字切分用到hanzi这个包,这个包直接通过pip安装有问题,我通过下载zhon的github库,然后直接将其中hanzi.py这个文件拿到process_text.py的文件目录下, 这样能直接使用hanzi这个模块。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/378265
推荐阅读
  

闽ICP备14008679号