赞
踩
- import numpy as np
- from hmmlearn import hmm
- import math
- from hmmlearn.hmm import GaussianHMM, GMMHMM
-
- def getModel():
- # 任务1:创建 GaussianHMM 模型,用变量名model1标记
- # 创建 GMMHMM 模型,用变量名model2标记
- model1 = GaussianHMM()
- model2 = GMMHMM()
-
- # 任务2:根据提示设置 model1 的参数
- # 设置最大迭代次数为10
- model1.n_iter = 10
- # 设置隐藏状态数目为20
- model1.n_components = 20
-
- # 任务3:根据提示设置 model2 的参数
- # 设置最小方差为 0.5
- model2.min_covar = 0.5
- # 设置随机数种子为 8
- model2.random_state = 8
-
- return model1, model2
本关任务:编写一个能够基于 Python 创建出隐马尔可夫模型的小程序。
为了完成本关任务,你需要掌握:
1.HMM 模型的思想与原理;
2.如何构建 HMM 模型。
隐马尔可夫模型(Hidden Markov Model,HMM)是一个统计模型,它用来描述一个含有隐含未知参数的马尔可夫过程。其难点是从可观察的参数中确定该过程的隐含参数。隐马尔可夫模型(HMM)可以用五个元素来描述,包括2个状态集合和3个概率矩阵:隐含状态 S
、可观测状态O
、初始状态概率矩阵π
、隐含状态转移概率矩阵 A
、观测状态转移概率矩阵 B
。
图 1 HMM 模型
如图1所示,有两个行数据,这两列数据分布的特点,第一行是X
行,表示状态序列,第二行是O
行,表示观测序列,X
行的某一个状态依赖于前一个状态,X
行的每一个都指向O
行其中的一个。因此,HMM 模型中涉及到以下定义:
设Q是所有可能的状态的集合,V是所有可能的观测的集合。Q=q1,q2,…,qN,V=v1,v2,…,vM,其中,N是可能的状态数,M是可能的观测数。
I是长度为T的状态序列,O是对应的观测序列。 I=(i1,i2,…,iT),O=(o1,o2,…,oT)
A是状态转移矩阵:A=[aij]N×N i=1,2,…,N;j=1,2,…,N
其中,在时刻t,处于qi 状态的条件下在时刻t+1转移到状态qj 的概率: aij=P(it+1=qj∣it=qi)
B是观测概率矩阵:B=[bj(k)]N×M k=1,2,…,M;j=1,2,…,N
其中,在时刻t处于状态qj 的条件下生成观测vk 的概率: bj(k)=P(ot=vk∣it=qj)
π是初始状态概率向量:π=(πi) 其中,πi=P(i1=qi)
隐马尔科夫模型由初始状态概率向量π、状态转移概率矩阵A和观测概率矩阵B决定。π和A决定状态序列,B决定观测序列。因此,隐马尔科夫模型λ可以由三元符号表示,即:λ=(A,B,π)。A,B,π称为隐马尔科夫模型的三要素。
在Python
中,我们可以直接调用第三方库hmmlearn
来创建 HMM 模型,其中提供了三种 HMM 的实现方式:GaussianHMM(观测状态连续型且符合高斯分布)、GMMHMM(观测状态连续型且符合混合高斯分布) 、MultinomialHMM(观测状态离散型)。
使用示例:
model = hmm.MultinomialHMM(n_components=n_states, n_iter=1000, tol=0.01)
模型参数:
n_components
:隐藏状态数目covariance_type
:协方差矩阵的类型min_covar
:最小方差,防止过拟合startprob_prior
:初始概率向量transmat_prior
:转移状态矩阵means_prior, means_weight
:均值covars_prior, covars_weight
:协方差algorithm
:所用算法random_state
:随机数种子n_iter
:最大迭代次数tol
:停机阈值verbose
:是否打印日志以观察是否已收敛params
:决定哪些参数在迭代中更新init_params
:决定哪些参数在迭代前先初始化根据提示,在右侧编辑器补充代码,根据提示完成对隐马尔可夫模型的创建,并指定相关的参数。
平台会对你编写的代码进行测试:
测试输入:无
;
预期输出:
您创建的 GaussianHMM 模型中,最大迭代次数为10,隐藏状态数目为20
您创建的 GMMHMM 模型中,最小方差为0.5,随机数种子为8
模型按要求构建成功,任务完成!
开始你的任务吧,祝你成功!
- import numpy as np
-
- def Forward(transition_probability, emission_probability, pi, obs_seq):
- """
- :param transition_probability: 状态转移矩阵
- :param emission_probability: 发射矩阵
- :param pi: 初始状态概率
- :param obs_seq: 观测状态序列
- :return: 返回结果
- """
- transition_probability = np.array(transition_probability)
- emission_probability = np.array(emission_probability)
- pi = np.array(pi)
- Row = transition_probability.shape[0]
- Col = len(obs_seq)
-
- F = np.zeros((Row, Col))
- F[:, 0] = pi * np.transpose(emission_probability[:, obs_seq[0]])
- for t in range(1, Col):
- for n in range(Row):
- F[n, t] = np.dot(F[:, t - 1], transition_probability[:, n]) * emission_probability[n, obs_seq[t]]
-
- return F
-
- def Backward(transition_probability, emission_probability, pi, obs_seq):
- """
- :param transition_probability: 状态转移矩阵
- :param emission_probability: 发射矩阵
- :param pi: 初始状态概率
- :param obs_seq: 观测状态序列
- :return: 返回结果
- """
- transition_probability = np.array(transition_probability)
- emission_probability = np.array(emission_probability)
- pi = np.array(pi)
-
- Row = transition_probability.shape[0]
- Col = len(obs_seq)
- F = np.zeros((Row, Col))
- F[:, (Col - 1):] = 1
- for t in reversed(range(Col - 1)):
- for n in range(Row):
- F[n, t] = np.sum(F[:, t + 1] * transition_probability[n, :] * emission_probability[:, obs_seq[t + 1]])
-
- return F
本关任务:编写一个隐马尔可夫模型概率计算的前向算法和后向算法的小程序。
为了完成本关任务,你需要掌握:
1.前向算法的原理和计算方法;
2.后向算法的原理和计算方法。
隐马尔可夫模型(HMM)在实际应用中,一般会遇上三种问题:
1、概率计算问题 给定模型λ=(A,B,π)和观测序列O=o1,o2,…,oT,计算在模型λ下观测序列O出现的概率P(O∣λ)。
2、学习问题
已知观测序列O=o1,o2,…,oT,估计模型λ=(A,B,π),使P(O∣λ)最大。即用极大似然法的方法估计参数。
3、预测问题(也称为解码问题):
已知观测序列O=o1,o2,…,oT 和模型λ=(A,B,π),求给定观测序列条件概率P(I∣O)最大的状态序列I=(i1,i2,…,iT),即给定观测序列,求最有可能的对应的状态序列。
概率计算问题计算的是在模型λ下观测序列O出现的概率P(O∣λ)。前向概率指的是:给定模型λ,定义到时刻t部分观测序列为o1,o2,…,ot 且状态为qi 的概率为前向概率。记作:αt(i)=P(o1,o2,…,ot,it=qi∣λ)
图 1 前向算法
如图1所示,前向概率的计算方式为:
初值:(t=1),α1(i)=P(o1,i1=q1∣λ)=πibi(o1),i=1,2,…,N
进行递推计算:
终结:P(O∣λ)=∑i=1NαT(i)
前向算法使用前向概率的概念,记录每个时间下的前向概率,使得在递推计算下一个前向概率时,只需要上一个时间点的所有前向概率即可。原理上也是用空间换时间。这样的时间复杂度是O(N2T)。
代码示例:
F = np.zeros((Row, Col)) # 最后要返回的就是F,就是我们公式中的alpha
F[:, 0] = pi * np.transpose(emission_probability[:, obs_seq[0]]) # 这是初始化求第一列,就是初始的概率*各自的发射概率
print(F[:, 0])
for t in range(1, len(obs_seq)): # 这里相当于填矩阵的元素值
for n in range(Row): # n是代表隐藏状态的
F[n, t] = np.dot(F[:, t - 1], trainsition_probability[:, n]) * emission_probability[
n, obs_seq[t]] # 对应于公式,前面是对应相乘
后向概率与前向概率非常类似,也是基于动态规划的思想。
图 2 后向算法
如图2所示,其计算流程为:
初值:βT(i)=1,1≤i≤N
进行递推计算:
代码示例:
F = np.zeros((Row, Col))
F[:, (Col - 1):] = 1 # 最后的每一个元素赋值为1
for t in reversed(range(Col - 1)):
for n in range(Row):
F[n, t] = np.sum(F[:, t + 1] * trainsition_probability[n, :] * emission_probability[:, obs_seq[t + 1]])
根据提示,在右侧编辑器补充代码,完成隐马尔可夫模型中前向算法和后向算法的代码编写。
平台会对你编写的代码进行测试:
测试输入:无
;
预期输出:
开始使用前向算法进行计算。
开始使用后向算法进行计算。
算法补全正确,任务完成!
开始你的任务吧,祝你成功!
- def Viterbi(obs, states, start_p, trans_p, emit_p):
- V = [{}]
- path = {}
-
- # Initialize base cases (t == 0)
- for y in states:
- V[0][y] = start_p[y] * emit_p[y][obs[0]]
- path[y] = [y]
-
- # Run Viterbi for t > 0
- for t in range(1, len(obs)):
- V.append({})
- newpath = {}
-
- for curr_state in states:
- # Check all possible states and select the one with the max probability.
- (prob, state) = max(
- (V[t-1][prev_state] * trans_p[prev_state][curr_state] * emit_p[curr_state][obs[t]], prev_state)
- for prev_state in states
- )
- V[t][curr_state] = prob
- newpath[curr_state] = path[state] + [curr_state]
-
- # Don't need to remember the old paths
- path = newpath
-
- (prob, state) = max((V[t][y], y) for y in states)
- return path[state]
-
- # Main section of the code
- states = ('Healthy', 'Fever')
- observations = ('normal', 'cold', 'dizzy')
- start_probability = {'Healthy': 0.6, 'Fever': 0.4}
- transition_probability = {
- 'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
- 'Fever': {'Healthy': 0.4, 'Fever': 0.6},
- }
- emission_probability = {
- 'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
- 'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
- }
-
- if __name__ == '__main__':
- obs = ['normal', 'cold', 'dizzy']
- print("开始基于 Viterbi 算法进行学习。")
- print("预测得出的诊断结果为:")
- print(Viterbi(obs, states, start_probability, transition_probability, emission_probability))
本关任务:编写一个能基于 Viterbi 学习算法的 HMM 小程序用以解决诊断问题。
为了完成本关任务,你需要掌握:
1.Viterbi 算法思想;
2.Viterbi 算法原理。
维特比算法(Viterbi 算法)是一个特殊但应用最广的动态规划算法。利用动态规划,可以解决任何一个图中的最短路径问题。而维特比算法是针对一个特殊的图-篱笆网了(Lattice)的有向图最短路径问题而提出来的。它之所以重要,是因为凡是使用隐马尔科夫模型描述的问题都可以用它解码,包括当前的数字通信、语音识别、机器翻译、拼音转汉字、分词等。
对于一个特殊的隐马尔科夫模型(HMM)及一个相应的观察序列,我们常常希望能找到生成此序列最可能的隐藏状态序列。假设观测序列的长度为 m,隐含状态个数为 n。则有图1的隐含状态转移图(n=3)。
图 1 隐含状态转移图
假如采用穷举法,穷举出所有可能的状态序列再比较他们的概率值,则时间复杂度是O(nm),显然这样的时间复杂度是无法接受的,而通过维特比算法能把时间复杂度降到O(m∗n2)。
根据图1,记last_state
为上一个观测现象对应的各个隐含状态的概率,curr_state
为现在的观测现象对应的各个隐含状态的概率。则求解curr_state
实际上只依赖于last_state
。而他们的依赖关系可由代码表示为:
for cs in states:
curr_state[cs] = max(last_state[ls] *
transition_probability[ls][cs] *
emission_probability[cs][observation]
for ls in states)
计算过程利用了转移概率transition_probability
和发射概率emission_probability
,选出那个最有可能产生当前状态cs
的上一状态ls
。同时要为每个隐含状态维护一个路径 path
,path[s]
表示到达状态s
前的最优状态序列。通过前面的计算选出那个最有可能产生当前状态cs
的上一状态ls
后,往path[cs]
中插入 ls 。则依照这种方法遍历完所有的观测序列后,只需要选择curr_state
中概率值最大的那个state
作为最终的隐含状态,同时从path
中取出 path[state]
作为该最终隐含状态前面的状态序列。
因此,观测序列只需要遍历一遍,时间复杂度为O(m),而每次要计算当前各个状态最可能的前一状态,时间复杂度为O(n2),因此总体的时间复杂度为 O(m∗n2)。
问题描述:想象一个乡村诊所。村民有着非常理想化的特性,要么健康要么发烧。他们只有问诊所的医生的才能知道是否发烧。聪明的医生通过询问病人的感觉诊断他们是否发烧。村民只回答他们感觉正常、头晕或冷。 假设一个病人每天来到诊所并告诉医生他的感觉。医生相信病人的健康状况如同一个离散马尔可夫链。
病人的状态有两种“健康”和“发烧”,但医生不能直接观察到,这意味着状态对他是“隐含”的。每天病人会告诉医生自己有以下几种由他的健康状态决定的感觉的一种:正常、冷或头晕。这些是观察结果。整个系统为一个隐马尔可夫模型(HMM)。医生知道村民的总体健康状况,还知道发烧和没发烧的病人通常会抱怨什么症状。换句话说,医生知道隐马尔可夫模型的参数。
根据提示,在右侧编辑器补充代码,根据已知的 HMM 的各大参数,补充完全 Viterbi 学习算法的代码内容。
平台会对你编写的代码进行测试:
测试输入:无
;
预期输出:
开始基于 Viterbi 算法进行学习。
预测得出的诊断结果为:
['Healthy', 'Healthy', 'Fever']
开始你的任务吧,祝你成功!
- import sys
- import math
-
- def loadData():
- wordDict = {}
- tagDict = {}
-
- with open("/data/workspace/myshixun/src/step4/wiki-en-train.norm_pos", 'r') as f:
- for line in f:
- for wordtag in line.strip().split(' '):
- temp = wordtag.split('_')
- if len(temp) != 2:
- continue
-
- # 任务:补全代码,完成对指定数据集的读取
- word, tag = temp
- if word not in wordDict:
- wordDict[word] = 1
- else:
- wordDict[word] += 1
-
- if tag not in tagDict:
- tagDict[tag] = 1
- else:
- tagDict[tag] += 1
-
- return wordDict, tagDict
-
- # 调用 loadData 函数并打印结果
- wordDict, tagDict = loadData()
- print("经过统计,数据集中的词汇量共有{}个词。".format(len(wordDict)))
- print("经过统计,数据集中共有{}个词性标注。".format(len(tagDict)))
本关任务:编写一个能加载词性标注数据集的小程序。
为了完成本关任务,你需要掌握:如何加载词性标注的数据集。
在语言学上,词性(Par-Of-Speech, Pos)指的是单词的语法分类,也称为词类。同一个类别的词语具有相似的语法性质,所有词性的集合称为词性标注集。不同的语料库采用了不同的词性标注集,一般都含有形容词、动词、名词等常见词性。如图1所示为常见的几种词性标志及其对应含义。
图 1 词性标注表
在接下来的关卡中,我们将基于隐马尔可夫模型(HMM)用于词性标注任务中。
在本实训中,我们所使用的数据集来自与维基百科中关于英文的词性标注数据,共计1301行数据,每行数据包含若干个词性标注,每一个数据都以词_词性
的形式进行表示。如:other_JJ
、time_NN
。
首先我们加载数据,数据的路径为/data/workspace/myshixun/src/step4/wiki-en-train.norm_pos
。代码示例:
with open("/data/workspace/myshixun/src/step4/wiki-en-train.norm_pos", 'r') as f:
for line in f:
for wordtag in line.strip().split(' '):
temp = wordtag.split('_')
if len(temp) != 2:
continue
word, tag = wordtag.split('_')
print(word+" "+tag)
实际输出:
Natural JJ
language NN
processing NN
-LRB- -LRB-
NLP NN
-RRB- -RRB-
..........
对数据集中的词汇量和词性标注个数进行统计,可以得知,书记中共有5231个单词,42个词性标注。代码示例:
wordDict = {}
tagDict = {}
with open("/data/workspace/myshixun/src/step4/wiki-en-train.norm_pos", 'r') as f:
for line in f:
for wordtag in line.strip().split(' '):
temp = wordtag.split('_')
if len(temp) != 2:
continue
word, tag = wordtag.split('_')
if wordDict.get(word) == None:
wordDict[word] = 1
else:
wordDict[word] = wordDict[word] + 1
if tagDict.get(tag) == None:
tagDict[tag] = 1
else:
tagDict[tag] = tagDict[tag] + 1
print("经过统计,数据集中的词汇量共有{}个词。".format(len(wordDict)))
print("经过统计,数据集中共有{}个词性标注。".format(len(tagDict)))
实际输出:
经过统计,数据集中的词汇量共有5231个词。
经过统计,数据集中共有42个词性标注。
根据提示,在右侧编辑器补充代码,加载给定的词性标注数据集。
平台会对你编写的代码进行测试:
测试输入:无
; 预期输出:
经过统计,数据集中的词汇量共有5233个词。
经过统计,数据集中共有42个词性标注。
数据加载任务完成!
开始你的任务吧,祝你成功!
- import io
- from collections import defaultdict
-
- # 分隔符
- SOS = '<s>'
- EOS = '</s>'
-
- def train_hmm(training_file, model_file):
- emit = defaultdict(int)
- transition = defaultdict(int)
- context = defaultdict(int)
- with open(training_file, 'r') as f:
- for line in f:
- previous = SOS # Make the sentence start.
- context[previous] += 1
- for wordtag in line.strip().split(' '):
- temp = wordtag.split('_')
- if len(temp) != 2:
- continue
- word, tag = wordtag.split('_')
- # 计算转移矩阵
- transition['{} {}'.format(previous, tag)] += 1
- context[tag] += 1 # Count the context.
- # 计算发射矩阵
- emit['{} {}'.format(tag, word)] += 1
- previous = tag
- # Make the sentence end.
- transition['{} {}'.format(previous, EOS)] += 1
-
- # 输出流
- out = io.StringIO()
-
- # 计算转移概率
- for key, value in sorted(transition.items(), key=lambda x: x[1], reverse=True):
- previous, tag = key.split(' ')
- out.write('T {} {}\n'.format(key, value / context[previous]))
-
- # 计算发射概率
- for key, value in sorted(emit.items(), key=lambda x: x[1], reverse=True):
- tag, word = key.split(' ')
- out.write('E {} {}\n'.format(key, value / context[tag]))
-
- with open(model_file, 'w') as f:
- f.write(out.getvalue().strip())
本关任务:编写一个能计算给定数据集词性标注的发射概率与转移概率的小程序。
为了完成本关任务,你需要掌握:
1.发射概率与转移概率的计算原理;
2.发射概率与转移概率的计算方法。
在词性标注中,我们定义 HMM 模型中类似(P(你|代词)、P(很|副词)、P(美|形容词)
叫做发射概率(emission probability),字面意思是从一个词性中发射或生成出某一个单词的概率。比如(P(你|代词)
表示从这么多代词中选一个对应的单词,这个单词为“你”的可能性是多少。
如图1所示,发射概率的计算,如果单词是n
(名词) ,那该单词是 Mary 或 Jane 的概率。为此,可先创建一个这样的表格,如下图所示。N、M、V 分别表示词性。其中 Mary 在语料句子中作为 N 出现了 4次。之后将每列除以条目之和,获得这些数字。如此就计算出了相应的发射概率。注意单词可以重复出现,例如 will 即是名词又是情态动词。
图 1 发射概率的计算
同时,我们定义类似(P(代词|<s>)、P(副词|代词)、P(形容词|副词)、P(</s>|形容词)
叫做转移概率(transition probability),表示从一个词性转移到下一个词性的概率。比如P(副词|代词)
表示上一个词的词性是代词,那么下一个词的词性是副词的概率是多少。发射概率和转移概率都可以从标注文档中通过统计得到。
如图2所示,首先需要给每个句子添加一个开始和结束的标签。当然这些标签也可以作为词性。现在创建计数表,计算每个词性出现的次数。图中,表示名词 N 后面出现的情态动词 M 的次数为 3 次。为了计算概率,需要将每行中的条目除以行中条目的和。如下图情态动词动词 M 之后是名词 N 的概率为 1/4; M 情态动词之后是动词V的概率为 3/4 。这个就是转移概率。
图 2 转移概率的计算
在进行词性标注的应用前,我们首先需要根据数据集计算出词与词性的发射概率和转移概率。即:
1、统计转移概率(transition probability),即连续的两个 tag 出现的频次/频率,对应 HMM 模型中的p(yi∣yi−1)。
2、统计发射概率(emission probability),即某一个 tag 生成某一个 word 的频次/频率,对应 HMM 模型中的p(xi∣yi)。
代码示例:
out = io.StringIO()
# 计算转移概率
for key, value in sorted(transition.items(),key=lambda x: x[1], reverse=True):
previous, word = key.split(' ')
out.write('T {} {}\n'.format(key, value / context[previous]))
# 计算发射概率
for key, value in sorted(emit.items(), key=lambda x: x[1], reverse=True):
previous, tag = key.split(' ')
out.write('E {} {}\n'.format(key, value / context[previous]))
根据提示,在右侧编辑器补充代码,基于给定的数据集计算出其词性标注所需的发射概率与转移概率,并输出到 IO 流中。
平台会对你编写的代码进行测试:
测试输入:无
; 预期输出:
开始计算转移概率。
开始计算发射概率。
任务完成!
开始你的任务吧,祝你成功!
- import io
- import argparse
- import math
- from collections import defaultdict
- from fun import train_hmm
- SOS = '<s>'
- EOS = '</s>'
- N = 1e6
- LAMBDA = 0.95
-
- # 加载计算好的发射概率和转移概率
- def load_model(model_file):
- transition = defaultdict(float)
- emission = defaultdict(float)
- possible_tags = defaultdict(float)
- print("开始加载模型。")
- with open(model_file, 'r') as f:
- for line in f:
- type, context, word, prob = line.strip().split(' ')
- possible_tags[context] = 1
- if type == 'T':
- transition[' '.join([context, word])] = float(prob)
- else:
- emission[' '.join([context, word])] = float(prob)
- return transition, emission, possible_tags
-
- # 从隐马尔可夫模型中获取转移概率
- def prob_trans(key, model):
- return model[key]
-
- # 从隐马尔可夫模型中获取发射概率
- def prob_emiss(key, model):
- return LAMBDA * model[key] + (1 - LAMBDA) * 1 / N
-
- # 定义 Viterbi 学习算法
- def forward_neubig(transition, emission, possible_tags, line):
- words = line.strip().split(' ')
- l = len(words)
- best_score = {}
- best_edge = {}
- best_score['{} {}'.format(0, SOS)] = 0
- best_edge['{} {}'.format(0, SOS)] = None
-
- for i in range(0, l):
- for prev in possible_tags.keys():
- for next in possible_tags.keys():
- prev_key = '{} {}'.format(i, prev)
- next_key = '{} {}'.format(i + 1, next)
- trans_key = '{} {}'.format(prev, next)
- emiss_key = '{} {}'.format(next, words[i])
- if prev_key in best_score and trans_key in transition:
- score = best_score[prev_key] + \
- -math.log2(prob_trans(trans_key, transition)) + \
- -math.log2(prob_emiss(emiss_key, emission))
- if next_key not in best_score or best_score[next_key] > score:
- best_score[next_key] = score
- best_edge[next_key] = prev_key
-
- for prev in possible_tags.keys():
- for next in [EOS]:
- prev_key = '{} {}'.format(l, prev)
- next_key = '{} {}'.format(l + 1, next)
- trans_key = '{} {}'.format(prev, next)
- emiss_key = '{} {}'.format(next, EOS)
- if prev_key in best_score and trans_key in transition:
- score = best_score[prev_key] + \
- -math.log2(prob_trans(trans_key, transition))
- if next_key not in best_score or best_score[next_key] > score:
- best_score[next_key] = score
- best_edge[next_key] = prev_key
-
- return best_edge
-
- # 定义 Viterbi 学习算法
- def forward(transition, emission, possible_tags, line):
- if SOS in possible_tags:
- possible_tags.pop(SOS)
- words = line.strip().split(' ')
- l = len(words)
- best_score = {}
- best_edge = {}
- best_score['{} {}'.format(0, SOS)] = 0
- best_edge['{} {}'.format(0, SOS)] = None
-
- for next in possible_tags.keys():
- for prev in [SOS]:
- prev_key = '{} {}'.format(0, prev)
- next_key = '{} {}'.format(1, next)
- trans_key = '{} {}'.format(prev, next)
- emiss_key = '{} {}'.format(next, words[0])
- if prev_key in best_score and trans_key in transition:
- score = best_score[prev_key] + \
- -math.log2(prob_trans(trans_key, transition)) + \
- -math.log2(prob_emiss(emiss_key, emission))
- if next_key not in best_score or best_score[next_key] > score:
- best_score[next_key] = score
- best_edge[next_key] = prev_key
-
- for i in range(1, l):
- for next in possible_tags.keys():
- for prev in possible_tags.keys():
- prev_key = '{} {}'.format(i, prev)
- next_key = '{} {}'.format(i + 1, next)
- trans_key = '{} {}'.format(prev, next)
- emiss_key = '{} {}'.format(next, words[i])
- if prev_key in best_score and trans_key in transition:
- score = best_score[prev_key] + \
- -math.log2(prob_trans(trans_key, transition)) + \
- -math.log2(prob_emiss(emiss_key, emission))
- if next_key not in best_score or best_score[next_key] > score:
- best_score[next_key] = score
- best_edge[next_key] = prev_key
-
- for next in [EOS]:
- for prev in possible_tags.keys():
- prev_key = '{} {}'.format(l, prev)
- next_key = '{} {}'.format(l + 1, next)
- trans_key = '{} {}'.format(prev, next)
- emiss_key = '{} {}'.format(next, EOS)
- if prev_key in best_score and trans_key in transition:
- score = best_score[prev_key] + \
- -math.log2(prob_trans(trans_key, transition))
- if next_key not in best_score or best_score[next_key] > score:
- best_score[next_key] = score
- best_edge[next_key] = prev_key
-
- return best_edge
-
- # 维特比算法的后向部分。
- def backward(best_edge, line):
- words = line.strip().split(' ')
- l = len(words)
- tags = []
- next_edge = best_edge['{} {}'.format(l+1, EOS)]
- while next_edge != '{} {}'.format(0, SOS):
- position, tag = next_edge.split(' ')
- tags.append(tag)
- next_edge = best_edge[next_edge]
- tags.reverse()
- return tags
-
- def test_hmm(model_file, test_file, output_file):
- transition, emission, possible_tags = load_model(model_file)
-
- out = io.StringIO()
-
- with open(test_file, 'r') as f:
- for line in f:
- best_edge = forward(transition, emission, possible_tags, line)
- tags = backward(best_edge, line)
- out.write(' '.join(tags) + '\n')
-
- if output_file == 'stdout':
- print(out.getvalue().strip())
- else:
- with open(output_file, 'w') as f:
- f.write(out.getvalue().strip())
-
- if __name__ == '__main__':
- training_file = "/data/workspace/myshixun/src/step6/wiki-en-train.norm_pos"
- model_file = "/data/workspace/myshixun/src/step6/my_model"
- test_file ="/data/workspace/myshixun/src/step6/wiki-en-test.norm"
- output_file = "/data/workspace/myshixun/src/step6/my_answer.pos"
- train_hmm(training_file, model_file)
- test_hmm(model_file, test_file, output_file)
- print("模型计算结束,任务完成!")
本关任务:编写一个能计算对指定文件进行词性标注的小程序。
为了完成本关任务,你需要掌握:HMM 测试的方法与步骤。
维特比算法是一个特殊但应用最广的动态规划算法,专门针对一个特殊的图——篱笆网络(Lattice)有向图。凡使用 HMM 描述的问题都可以用它来解码。本实训以维特比算法为基础进行 HMM测试。
图 1 词性标注
如图1所示,每一个单词下方都有一串候选词性(句子开始和结尾处的候选词性为<s>
和</s>
)。维特比算法中,
0:<s>
)”到每一步中每一个候选隐状态节点(图中的大量灰盒子,如2:NN
)的最短路径和对应的概率值。图 2 计算最优路线的值
如图2所示,best_score['1 NN']
表示从初始节点(0 <s>)
到该节点(1 NN)
的最优路线的值(负对数)。best_edge[1 NN] = 0 <s>
表示从初始节点(0 <s>)
到该节点(1 NN)
的最优路线中前一个节点是0 <s>
。代码示例:
for next in possible_tags.keys():
for prev in [SOS]:
prev_key = '{} {}'.format(0, prev)
next_key = '{} {}'.format(1, next)
trans_key = '{} {}'.format(prev, next)
emiss_key = '{} {}'.format(next, words[0])
if prev_key in best_score and trans_key in transition:
score = best_score[prev_key] + \
-math.log2(prob_trans(trans_key, transition)) + \
-math.log2(prob_emiss(emiss_key, emission))
if next_key not in best_score or best_score[next_key] > score:
best_score[next_key] = score
best_edge[next_key] = prev_key
图 3 计算最优路线的值
如图3所示,best_score[2 NN]
表示从初始节点0 <s>
到该节点2 NN
的最优路线的值(负对数)。因为要取最优,显然本层每个单元要对前一层的所有单元进行一次计算,然后取最小值。所以代码中有两个 for 循环。所以算法复杂度为O(nm2)。best_edge[2 NN] = 1 JJ
表示从初始节点0 <s>
到该节点2 NN
的最优路线中前一个节点是1 JJ
。代码示例:
for i in range(1, l):
for next in possible_tags.keys():
for prev in possible_tags.keys():
prev_key = '{} {}'.format(i, prev)
next_key = '{} {}'.format(i + 1, next)
trans_key = '{} {}'.format(prev, next)
emiss_key = '{} {}'.format(next, words[i])
if prev_key in best_score and trans_key in transition:
score = best_score[prev_key] + \
-math.log2(prob_trans(trans_key, transition)) + \
-math.log2(prob_emiss(emiss_key, emission))
if next_key not in best_score or best_score[next_key] > score:
best_score[next_key] = score
best_edge[next_key] = prev_key
根据提示,在右侧编辑器补充代码,完成能够进行词性标注测试的小程序。
平台会对你编写的代码进行测试:
测试输入:无
; 预期输出:
开始计算转移概率和发射概率。
开始加载模型。
模型计算结束,任务完成!
开始你的任务吧,祝你成功!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。