发现新词 | NLP之无监督方式构建词库(四)

ngram 词库


  这篇文章是对发现新词 | NLP之无监督方式构建词库(三)的性能优化。主要改动包括如下两个方面:






#! -*- coding: utf-8 -*-
import re
import struct
import os
import six
import codecs
import math
import logging

min_count = 2
order = 4
Entropy_Threshold = [0, 2, 4, 6]  # 互信息熵阈值
corpus_file = 'data/thucnews.corpus'  # 语料保存的文件名
vocab_file = 'data/thucnews.chars'  # 字符集保存的文件名
ngram_file = 'data/thucnews.ngrams'  # ngram集保存的文件名
output_file = 'data/thucnews_min.vocab'  # 最后导出的词表文件名
memory = 0.5  # memory是占用内存比例,理论上不能超过可用内存比例

logging.basicConfig(level=logging.INFO, format=u'%(asctime)s - %(levelname)s - %(message)s')

class Progress:
    iterator: 可迭代的对象;
    period: 显示进度的周期;
    steps: iterator可迭代的总步数,相当于len(iterator)

    def __init__(self, iterator, period=1, steps=None, desc=None):
        self.iterator = iterator
        self.period = period
        if hasattr(iterator, '__len__'):
            self.steps = len(iterator)
            self.steps = steps
        self.desc = desc
        if self.steps:
            self._format_ = u'%s/%s passed' % ('%s', self.steps)
            self._format_ = u'%s passed'
        if self.desc:
            self._format_ = self.desc + ' - ' + self._format_
        self.logger = logging.getLogger()

    def __iter__(self):
        for i, j in enumerate(self.iterator):
            if (i + 1) % self.period == 0:
                self.logger.info(self._format_ % (i + 1))
            yield j
def write_corpus(texts, filename):
    with codecs.open(filename, 'w', encoding='utf-8') as f:
        for s in Progress(texts, 10000, desc=u'exporting corpus'):
            s = ' '.join(s) + '\n'

# 语料生成器,并且初步预处理语料
def text_generator():
    d = codecs.open("data/file_corpus_min.txt", encoding='utf-8').read()
    # d = d.replace(u'\u3000', ' ').strip()
    yield re.sub(u'[^\u4e00-\u9fa50-9a-zA-Z ]+', '\n', d)

# 1.将语料转存为文本
write_corpus(text_generator(), corpus_file)
  这一步,就是调用kenlmcount_ngrams程序来统计ngram。所以,你需要自行编译好kenlmubuntu20.04 | 安装编译kenlm),并把它的count_ngrams放到跟word_discovery.py同一目录下。在使用前,需要赋予count_ngrams可执行权限。

(base) liujie@liujie-ThinkPad-L490:~/projects/PycharmProjects/word-discovery$ chmod +x count_ngrams 
  • 1


./count_ngrams -S 50% -o 4 --write_vocab_list output/test2.chars <output/test2.corpus >output/test2.ngrams
  • 1
  • -S:[ --memory ] arg (=80%) Sorting memory内存预占用量
  • -O:n:最高采用n-gram语法
  • --write_vocab_listpath1 <path2 >path3,分别是字符集文件、语料文件、ngram文件。
def count_ngrams(corpus_file, order, vocab_file, ngram_file, memory=0.5):
    done = os.system(
        './count_ngrams -o %s --memory=%d%% --write_vocab_list %s <%s >%s'
        % (order, memory * 100, vocab_file, corpus_file, ngram_file)
    if done != 0:
        raise ValueError('Failed to count ngrams by KenLM.')

# 2.用Kenlm统计ngram
count_ngrams(corpus_file, order, vocab_file, ngram_file, memory)
  基于互信息熵过滤ngram,[0, 2, 4, 6]是互信息的阈值,其中第一个0无意义,仅填充用,而2, 4, 6分别是2gram、3gram、4gram的互信息阈值,基本上单调递增比较好。

# TODO:5.过滤ngram
def filter_ngrams(ngrams, total, min_pmi=1):
    :param ngrams: [{one_ngram}, {two_ngram}, {three_ngram}, {four_ngram}]
    :param total: 总数
    :param min_pmi: 不同ngram的互信息熵
    order = len(ngrams)
    if hasattr(min_pmi, '__iter__'):
        min_pmi = list(min_pmi)
        min_pmi = [min_pmi] * order
    output_ngrams = set()
    total = float(total)
    for i in range(order - 1, 0, -1):
        for w, v in ngrams[i].items():
            pmi = min([
                total * v / (ngrams[j].get(w[:j + 1], total) * ngrams[i - j - 1].get(w[j + 1:], total))
                for j in range(i)
            if math.log(pmi) >= min_pmi[i]:
    return output_ngrams

# 4.过滤ngram
ngrams = filter_ngrams(ngrams.ngrams, ngrams.total, Entropy_Threshold)
# TODO:6.构建一个ngram的Trie树,然后用这个Trie树做一个基本的预分词,
#  只要一个片段出现在字典树中,这个片段就不切分
class SimpleTrie:

    def __init__(self):
        self.dic = {}
        self.end = True

    def add_word(self, word):
        _ = self.dic
        # 遍历字符串中的每一个字符
        for c in word:
            # 如果字符不在字典中
            if c not in _:
                _[c] = {}
            # 如果字符在字典中
            _ = _[c]
        _[self.end] = word

    def tokenize(self, sent):
        :param sent: 句子
        result = []
        start, end = 0, 1
        for i, c1 in enumerate(sent):
            _ = self.dic
            if i == end:
                result.append(sent[start: end])
                start, end = i, i + 1
            for j, c2 in enumerate(sent[i:]):
                if c2 in _:
                    _ = _[c2]
                    if self.end in _:
                        if i + j + 1 > end:
                            end = i + j + 1
        result.append(sent[start: end])
        return result

# 构建ngram的Trie树
ngtrie = SimpleTrie()
for w in Progress(ngrams, 10000, desc=u'build ngram trie'):
    _ = ngtrie.add_word(w)

# 得到候选词
candidates = {}
for t in Progress(text_generator()):
    for w in ngtrie.tokenize(t):  # 预分词
        candidates[w] = candidates.get(w, 0) + 1
# TODO:7.候选词过滤
# 如果它是一个小于等于n字的词,那么检测它在不在G中,不在就出局;
# 如果它是一个大于n字的词,那个检测它每个n字片段是不是在G中,只要有一个片段不在,就出局
def filter_vocab(candidates, ngrams, order):
    result = {}
    for i, j in candidates.items():
        if len(i) < 3:
            result[i] = j
        elif len(i) <= order and i in ngrams:
            result[i] = j
        elif len(i) > order:
            flag = True
            for k in range(len(i) + 1 - order):
                if i[k: k + order] not in ngrams:
                    flag = False
            if flag:
                result[i] = j
    return result

# 频数过滤
candidates = {i: j for i, j in candidates.items() if j >= min_count}
# 互信息过滤(回溯)
candidates = filter_vocab(candidates, ngrams, order)
# TODO:8.输出结果文件
with codecs.open(output_file, 'w', encoding='utf-8') as f:
    for i, j in sorted(candidates.items(), key=lambda s: -s[1]):
        if len(i) != 1:
            s = '%s %s\n' % (i, j)
