赞
踩
这篇文章是对发现新词 | NLP之无监督方式构建词库(三)的性能优化。主要改动包括如下两个方面:
1、使用了语言模型工具kenlm的count_ngrams程序来统计ngram。由于kenlm是用C++写的,速度有保证,并且它还做了优化,所以对内存很友好。
2、在第二次遍历词库以得到候选词的时候,使用了Trie树结构来加速搜索字符串是否出现过某个ngram。Trie树或者其变种基本上是所有基于词典的分词工具的标配,就是因为它可以加快搜索字符串中是否出现过词典中的词。
本文针对一万多条商品名称语料来进行实验,数据格式如下:
代码部分参见:更好更快的新词发现。
首先,定义了一些变量与一个显示进度的类。
#! -*- coding: utf-8 -*- import re import struct import os import six import codecs import math import logging min_count = 2 order = 4 Entropy_Threshold = [0, 2, 4, 6] # 互信息熵阈值 corpus_file = 'data/thucnews.corpus' # 语料保存的文件名 vocab_file = 'data/thucnews.chars' # 字符集保存的文件名 ngram_file = 'data/thucnews.ngrams' # ngram集保存的文件名 output_file = 'data/thucnews_min.vocab' # 最后导出的词表文件名 memory = 0.5 # memory是占用内存比例,理论上不能超过可用内存比例 logging.basicConfig(level=logging.INFO, format=u'%(asctime)s - %(levelname)s - %(message)s') class Progress: """显示进度,自己简单封装,比tqdm更可控一些 iterator: 可迭代的对象; period: 显示进度的周期; steps: iterator可迭代的总步数,相当于len(iterator) """ def __init__(self, iterator, period=1, steps=None, desc=None): self.iterator = iterator self.period = period if hasattr(iterator, '__len__'): self.steps = len(iterator) else: self.steps = steps self.desc = desc if self.steps: self._format_ = u'%s/%s passed' % ('%s', self.steps) else: self._format_ = u'%s passed' if self.desc: self._format_ = self.desc + ' - ' + self._format_ self.logger = logging.getLogger() def __iter__(self): for i, j in enumerate(self.iterator): if (i + 1) % self.period == 0: self.logger.info(self._format_ % (i + 1)) yield j
这一步的主要操作是将语料间的词与词之间用空格隔开。这样做的原因是kenlm
需要一个以空格分词的、纯文本格式的语料作为输入。
def write_corpus(texts, filename): """将语料写到文件中,词与词(字与字)之间用空格隔开 """ with codecs.open(filename, 'w', encoding='utf-8') as f: for s in Progress(texts, 10000, desc=u'exporting corpus'): s = ' '.join(s) + '\n' f.write(s) # 语料生成器,并且初步预处理语料 def text_generator(): d = codecs.open("data/file_corpus_min.txt", encoding='utf-8').read() # d = d.replace(u'\u3000', ' ').strip() yield re.sub(u'[^\u4e00-\u9fa50-9a-zA-Z ]+', '\n', d) # 1.将语料转存为文本 write_corpus(text_generator(), corpus_file)
生成的thucnews.corpus
文件格式如下:
这一步,就是调用kenlm
的count_ngrams
程序来统计ngram
。所以,你需要自行编译好kenlm
(ubuntu20.04 | 安装编译kenlm),并把它的count_ngrams
放到跟word_discovery.py
同一目录下。在使用前,需要赋予count_ngrams
可执行权限。
(base) liujie@liujie-ThinkPad-L490:~/projects/PycharmProjects/word-discovery$ chmod +x count_ngrams
执行示例如下:
./count_ngrams -S 50% -o 4 --write_vocab_list output/test2.chars <output/test2.corpus >output/test2.ngrams
-S
:[ --memory ] arg (=80%) Sorting memory内存预占用量-O
:n:最高采用n-gram语法--write_vocab_list
:path1 <path2 >path3
,分别是字符集文件、语料文件、ngram文件。def count_ngrams(corpus_file, order, vocab_file, ngram_file, memory=0.5):
"""通过os.system调用Kenlm的count_ngrams来统计频数
其中,memory是占用内存比例,理论上不能超过可用内存比例。
"""
done = os.system(
'./count_ngrams -o %s --memory=%d%% --write_vocab_list %s <%s >%s'
% (order, memory * 100, vocab_file, corpus_file, ngram_file)
)
if done != 0:
raise ValueError('Failed to count ngrams by KenLM.')
# 2.用Kenlm统计ngram
count_ngrams(corpus_file, order, vocab_file, ngram_file, memory)
生成的文件都是二进制文件,后续需要进行转换。
这一步就是转换二进制文件格式并加载到内存中。
class KenlmNgrams: """加载Kenlm的ngram统计结果 vocab_file: Kenlm统计出来的词(字)表; ngram_file: Kenlm统计出来的ngram表; order: 统计ngram时设置的n,必须跟ngram_file对应; min_count: 自行设置的截断频数。 """ def __init__(self, vocab_file, ngram_file, order, min_count): self.vocab_file = vocab_file self.ngram_file = ngram_file self.order = order self.min_count = min_count self.read_chars() # 读取词(字)表 self.read_ngrams() # 读取ngram表 def read_chars(self): f = open(self.vocab_file) chars = f.read() f.close() chars = chars.split('\x00') # six.PY2 返回一个表示当前运行环境是否为python2的boolean值 self.chars = [i.decode('utf-8') if six.PY2 else i for i in chars] def read_ngrams(self): """读取思路参考https://github.com/kpu/kenlm/issues/201 """ self.ngrams = [{} for _ in range(self.order)] self.total = 0 size_per_item = self.order * 4 + 8 def ngrams(): with open(self.ngram_file, 'rb') as f: while True: s = f.read(size_per_item) if len(s) == size_per_item: n = self.unpack('l', s[-8:]) yield s, n else: break for s, n in Progress(ngrams(), 100000, desc=u'loading ngrams'): if n >= self.min_count: self.total += n c = [self.unpack('i', s[j * 4: (j + 1) * 4]) for j in range(self.order)] c = ''.join([self.chars[j] for j in c if j > 2]) for j in range(len(c)): self.ngrams[j][c[:j + 1]] = self.ngrams[j].get(c[:j + 1], 0) + n def unpack(self, t, s): # struct.unpack:把bytes变成相应的Python数据类型 # t=l:表示long转为integer # t=i:表示int转为integer return struct.unpack(t, s)[0] # 3.加载ngram ngrams = KenlmNgrams(vocab_file, ngram_file, order, min_count)
基于互信息熵过滤ngram,[0, 2, 4, 6]
是互信息的阈值,其中第一个0无意义,仅填充用,而2, 4, 6分别是2gram、3gram、4gram的互信息阈值,基本上单调递增比较好。
# TODO:5.过滤ngram def filter_ngrams(ngrams, total, min_pmi=1): """ 通过互信息过滤ngrams,只保留“结实”的ngram。 :param ngrams: [{one_ngram}, {two_ngram}, {three_ngram}, {four_ngram}] :param total: 总数 :param min_pmi: 不同ngram的互信息熵 :return: """ order = len(ngrams) if hasattr(min_pmi, '__iter__'): min_pmi = list(min_pmi) else: min_pmi = [min_pmi] * order output_ngrams = set() total = float(total) for i in range(order - 1, 0, -1): for w, v in ngrams[i].items(): pmi = min([ total * v / (ngrams[j].get(w[:j + 1], total) * ngrams[i - j - 1].get(w[j + 1:], total)) for j in range(i) ]) if math.log(pmi) >= min_pmi[i]: output_ngrams.add(w) return output_ngrams # 4.过滤ngram ngrams = filter_ngrams(ngrams.ngrams, ngrams.total, Entropy_Threshold)
首先构建一个ngram
的Trie
树,然后用这个Trie
树就可以做一个基本的“预分词”。在第二次遍历词库以得到候选词的时候,使用了Trie树结构来加速搜索字符串是否出现过某个ngram。Trie树或者其变种基本上是所有基于词典的分词工具的标配,就是因为它可以加快搜索字符串中是否出现过词典中的词。
# TODO:6.构建一个ngram的Trie树,然后用这个Trie树做一个基本的预分词, # 只要一个片段出现在字典树中,这个片段就不切分 class SimpleTrie: """ 通过Trie树结构,来搜索ngrams组成的连续片段 """ def __init__(self): self.dic = {} self.end = True def add_word(self, word): _ = self.dic # 遍历字符串中的每一个字符 for c in word: # 如果字符不在字典中 if c not in _: _[c] = {} # 如果字符在字典中 _ = _[c] _[self.end] = word def tokenize(self, sent): """ 通过最长联接的方式来对句子进行分词 只要一个片段出现在字典树中,这个片段就不切分 :param sent: 句子 :return: """ result = [] start, end = 0, 1 for i, c1 in enumerate(sent): _ = self.dic if i == end: result.append(sent[start: end]) start, end = i, i + 1 for j, c2 in enumerate(sent[i:]): if c2 in _: _ = _[c2] if self.end in _: if i + j + 1 > end: end = i + j + 1 else: break result.append(sent[start: end]) return result # 构建ngram的Trie树 ngtrie = SimpleTrie() for w in Progress(ngrams, 10000, desc=u'build ngram trie'): _ = ngtrie.add_word(w) # 得到候选词 candidates = {} for t in Progress(text_generator()): for w in ngtrie.tokenize(t): # 预分词 candidates[w] = candidates.get(w, 0) + 1
# TODO:7.候选词过滤 # 如果它是一个小于等于n字的词,那么检测它在不在G中,不在就出局; # 如果它是一个大于n字的词,那个检测它每个n字片段是不是在G中,只要有一个片段不在,就出局 def filter_vocab(candidates, ngrams, order): """通过与ngrams对比,排除可能出来的不牢固的词汇(回溯) """ result = {} for i, j in candidates.items(): if len(i) < 3: result[i] = j elif len(i) <= order and i in ngrams: result[i] = j elif len(i) > order: flag = True for k in range(len(i) + 1 - order): if i[k: k + order] not in ngrams: flag = False if flag: result[i] = j return result # 频数过滤 candidates = {i: j for i, j in candidates.items() if j >= min_count} # 互信息过滤(回溯) candidates = filter_vocab(candidates, ngrams, order)
# TODO:8.输出结果文件
with codecs.open(output_file, 'w', encoding='utf-8') as f:
for i, j in sorted(candidates.items(), key=lambda s: -s[1]):
if len(i) != 1:
s = '%s %s\n' % (i, j)
f.write(s)
部分效果展示:
参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。