赞
踩
问答机器人思路
1. 问题的处理,包括语料和用户输入的问题
2. 找回:海选,选用机器学习等训练速度较快的方法,大致返回相似的前K个问题
3. 排序:精选,使用深度学习,返回相似度的值
这里的问答机器人是我们在分类之后,对特定问题进行回答的一种机器人。至于回答的问题的类型,取决于我们的语料。
当前我们需要实现的问答机器人是一个回答编程语言(比如python是什么
,python难么
等)相关问题的机器人
主要实现逻辑:从现有的问答对中,选择出和问题最相似的问题,并且获取其相似度(一个数值),如果相似度大于阈值,则返回这个最相似的问题对应的答案
问答机器人的实现可以大致分为三步步骤:
对问题的处理过程中,我们可以考虑以下问题:
python
等,提取出来之后,方便后续对问题进行过滤。
召回:可以理解为是一个海选的操作,就是从现有的问答对中选择可能相似的前K个问题。
为什么要进行召回?
主要目的是为了后续进行排序的时候,减少需要计算的数据量,比如有10万个问答对,直接通过深度学习肯定是可以获取所有的相似度,但是速度慢。
所以考虑使用机器学习的方法进行一次海选
那么,如何实现召回呢?
前面我们介绍,召回就是选择前K个最相似的问题,所以召回的实现就是想办法通过机器学习的手段计算器相似度。
可以思考的方法:
上述的方法理论上都可行,知识当候选计算的词语数量太多的时候,需要挨个计算相似度,非常耗时。
所以可以考虑以下两点:
但是还有一个问题,供大家慢慢思考:
不管是词频,还是tdidf,获取的结果肯定是没有考虑文字顺序的,效果不一定是最好的,那么此时,应该如何让最后召回的效果更好呢?
排序过程,使用了召回的结果作为输入,同时输出的是最相似的那一个。
整个过程使用深度学习实现。深度学习虽然训练的速度慢,但是整体效果肯定比机器学习好(机器学习受限于特征工程,数据量等因素,没有办法深入的学会不同问题之间的内在相似度),所以通过自建的模型,获取最后的相似度。
使用深度学习的模型这样一个黑匣子,在训练数据足够多的时候,能够学习到用户的各种不同输入的问题,当我们把目标值(相似的问题)给定的情况下,让模型自己去找到这些训练数据目标值和特征值之间相似的表示方法。
那么此时,有以下两个问题:
使用什么数据,来训练模型,最后返回模型的相似度
训练的数据的来源:可以考虑根据现有的问答对去手动构造,但是构造的数据不一定能够覆盖后续用户提问的全部问题。所以可以考虑通过程序去采集网站上相似的问题,比如百度知道的搜索结果。
模型该如何构建
模型可以有两个输入,输出为一个数值,两个输入的处理方法肯定是一样的。这种网络结构我们经常把它称作孪生神经网络。
很明显,我们队输入的数据需要进行编码的操作,比如word embedding + LSTM/GRU/BIGRU等
两个编码之后的结果,我们可以进行组合,然后通过一个多层的神经网络,输出一个数字,把这个数值定义为我们的相似度。
当然我们的深层的神经网络在最开始的时候也并不是计算的相似度,但是我们的训练数据的目标值是相似度,在N多次的训练之后,确定了输入和输出的表示方法之后,那么最后的模型输出就是相似度了。
流程如下:
这里说的问答对,是带有标准答案的问题,后续命中问答对中的问题后,会返回该问题对应的答案
为了后续使用方便,我们可以把现有问答对的处理成如下的格式,可以考虑存入数据库或者本地文件:
{
"问题1":{
"主体":["主体1","主体3","主体3"..],
"问题1分词后的句子":["word1","word2","word3"...],
"答案":"答案"
},
"问题2":{
...
}
}
代码如下:
# lib/get_qa_dcit.py def get_qa_dict(): chuanzhi_q_path = "./问答对/Q.txt" chuanzhi_a_path = "./问答对/A.txt" QA_dict = {} for q,a in zip(open(chuanzhi_q_path).readlines(),open(chuanzhi_a_path).readlines()): QA_dict[q.strip()] = {} QA_dict[q.strip()]["ans"] = a.strip() QA_dict[q.strip()]["entity"] = sentence_entity(q.strip())[-1] #准备短问答的问题 python_duan_path = "./data/Python短问答-11月汇总.xlsx" ret = pd.read_excel(python_duan_path) column_list = ret.columns assert '问题' in column_list and "答案" in column_list, "excel 中必须包含问题和答案" for q, a in zip(ret["问题"], ret["答案"]): q = re.sub("\s+", " ", q) QA_dict[q.strip()] = {} QA_dict[q.strip()]["ans"] = a cuted,entiry = sentence_entity(q.strip())[-1] QA_dict[q.strip()]["entity"] = entiry QA_dict[q.strip()]["q_cuted"] = cuted return QA_dict QA_dict = get_qa_dict()
把问答对中的问题,和用户输出的问题,转化为向量,为后续计算相似度做准备。
这里,我们使用tfidf对问答对中的问题进行处理,转化为向量矩阵。
TODO,使用单字,使用n-garm,使用BM25,使用word2vec等,让其结果更加准确
from sklearn.feature_extraction.text import TfidfVectorizer
from lib import QA_dict
def build_q_vectors():
"""对问题建立索引"""
lines_cuted= [q["q_cuted"] for q in QA_dict]
tfidf_vectorizer = TfidfVectorizer()
features_vec = tfidf_vectorizer.fit_transform(lines_cuted)
#返回tfidf_vectorizer,后续还需要对用户输入的问题进行同样的处理
return tfidf_vectorizer,features_vec,lines_cuted
思路很简单。对用户输入的问题使用tfidf_vectorizer
进行处理,然后和features_vec
中的每一个结果进行计算,获取相似度。
但是由于耗时可能会很久,所以考虑使用其他方法来实现
pysparnn
的介绍官方地址:https://github.com/facebookresearch/pysparnn
pysparnn
是一个对sparse数据进行相似邻近搜索的python库,这个库是用来实现 高维空间中寻找最相似的数据的。
pysparnn
的使用方法pysparnn的使用非常简单,仅仅需要以下步骤,就能够完成从高维空间中寻找相似数据的结果
import pysparnn.cluster_index as ci from sklearn.feature_extraction.text import TfidfVectorizer #1. 原始数据 data = [ 'hello world', 'oh hello there', 'Play it', 'Play it again Sam', ] #2. 原始数据向量化 tv = TfidfVectorizer() tv.fit(data) features_vec = tv.transform(data) # 原始数据构造索引 cp = ci.MultiClusterIndex(features_vec, data) # 待搜索的数据向量化 search_data = [ 'oh there', 'Play it again Frank' ] search_features_vec = tv.transform(search_data) #3. 索引中传入带搜索数据,返回结果 cp.search(search_features_vec, k=1, k_clusters=2, return_distance=False) >> [['oh hello there'], ['Play it again Sam']]
使用注意点:
search_features_vec
:搜索的句子的向量k
:最大的几个结果,k=1,返回最大的一个k_clusters
:对数据分为多少类进行搜索return_distance
:是否返回距离#构造索引 cp = ci.MultiClusterIndex(features_vec, lines_cuted) #对用户输入的句子进行向量化 search_vec = tfidf_vec.transform(ret) #搜索获取结果,返回最大的8个数据,之后根据`main_entiry`进行过滤结果 cp_search_list = cp.search(search_vec, k=8, k_clusters=10, return_distance=True) exist_same_entiry = False search_lsit = [] for _temp_call_line in cp_search_list[0]: cur_entity = QA_dict[_temp_call_line[1]]["main_entity"] if len(set(main_entity) & set(cur_entity))>0: #命名体的集合存在交集的时候返回 exist_same_entiry = True search_lsit.append(_temp_call_line[1]) if exist_same_entiry: #存在相同的主体的时候 return search_lsit else: # print(cp_search_list) return [i[1] for i in cp_search_list[0]]
在这个过程中,需要注意,提前把cp,tfidf_vec
等内容提前准备好,而不应该在每次接收到用户的问题之后重新生成一遍,否则效率会很低
pysparnn
的原理介绍1. 聚类:选择部分leader,把数据进行聚类(找到最相似的leader)
2. 搜索:待搜索的数据找到最相似的leader,计算leader所在类的数据的相似度,返回最相似的前K个
参考地址:https://nlp.stanford.edu/IR-book/html/htmledition/cluster-pruning-1.html
前面我们使用的pysparnn
使用的是一种cluster pruning(簇修剪)
的技术,即,开始的时候对数据进行聚类,后续再有限个类别中进行数据的搜索,根据计算的余弦相似度返回结果。
数据预处理过程如下:
当获取到一个问题q的时候,查询过程:
在上述的过程中,可以设置两个大于0的数字b1和b2
数据预处理
阶段,每个follower选择b1个最相似的leader,而不是选择单独一个lader,这样不同的簇是有数据交叉的;前面的描述就是b1=b2=1的情况,通过增加b1和b2
的值,我们能够有更大的机会找到更好的结果,但是这样会需要更加大量的计算。
在pysparnn中实例化索引的过程中
即:ci.MultiClusterIndex(features, records_data, num_indexes)
中,num_indexes
能够设置b1的值,默认为2。
在搜索的过程中,cp.search(search_vec, k=8, k_clusters=10, return_distance=True,num_indexes)
,num_Indexes
可以设置b2的值,默认等于b1的值。
前面的学习,我们能够返回相似的召回结果,但是,如何让这些结果更加准确呢?
我们可以从下面的角度出发:
python 好学 么 -->填充后是 :python 好学 么 简单 难 嘛
,这里假设word2vector同学会了好学,简单,难
他们之间是相似的BM25(BM=best matching)是TDIDF的优化版本,首先我们来看看TFIDF是怎么计算的
t
f
i
d
f
i
=
t
f
∗
i
d
f
=
词
i
的
数
量
词
语
总
数
∗
l
o
g
总
文
档
数
包
含
词
i
的
文
档
数
tfidf_i = tf*idf = \frac{词i的数量}{词语总数}*log\frac{总文档数}{包含词i的文档数}
tfidfi=tf∗idf=词语总数词i的数量∗log包含词i的文档数总文档数
其中tf称为词频,idf为逆文档频率
那么BM25是如何计算的呢?
B
M
25
(
i
)
=
词
i
的
数
量
总
词
数
∗
(
k
+
1
)
C
C
+
k
(
1
−
b
+
b
∣
d
∣
a
v
d
l
)
∗
l
o
g
(
总
文
档
数
包
含
i
的
文
档
数
)
C
=
t
f
=
词
i
的
数
量
总
词
数
,
k
>
0
,
b
∈
[
0
,
1
]
,
d
为
文
档
i
的
长
度
,
a
v
d
l
是
文
档
平
均
长
度
BM25(i) = \frac{词i的数量}{总词数}*\frac{(k+1)C}{C+k(1-b+b\frac{|d|}{avdl})}*log(\frac{总文档数}{包含i的文档数}) \\ C = tf=\frac{词i的数量}{总词数},k>0,b\in [0,1],d为文档i的长度,avdl是文档平均长度
BM25(i)=总词数词i的数量∗C+k(1−b+bavdl∣d∣)(k+1)C∗log(包含i的文档数总文档数)C=tf=总词数词i的数量,k>0,b∈[0,1],d为文档i的长度,avdl是文档平均长度
大家可以看到,BM25和tfidf的计算结果很相似,唯一的区别在于中多了一项,这一项是用来对tf的结果进行的一种变换。
把 1 − b + b d a v d l 1-b+b\frac{d}{avdl} 1−b+bavdld中的b看成0,那么此时中间项的结果为 ( k + 1 ) t f k + t f \frac{(k+1)tf}{k+tf} k+tf(k+1)tf,通过设置一个k,就能够保证其最大值为 1 1 1,达到限制tf过大的目的。
即:
k不变的情况下,上式随着tf的增大而增大,上限为k+1,但是增加的程度会变小,如下图所示。
在一个句子中,某个词重要程度应该是随着词语的数量逐渐衰减的,所以中间项对词频进行了惩罚,随着次数的增加,影响程度的增加会越来越小。通过设置k值,能够保证其最大值为k+1,k往往取值1.2
。
其变化如下图(无论k为多少,中间项的变化程度会随着次数的增加,越来越小):
同时
1
−
b
+
b
d
a
v
d
l
1-b+b\frac{d}{avdl}
1−b+bavdld的作用是用来对文本的长度进行归一化
。
例如在考虑整个句子的tdidf的时候,如果句子的长度太短,那么计算的总的tdidf的值是要比长句子的tdidf的值要低的。所以可以考虑对句子的长度进行归一化处理。
可以看到,当句子的长度越短,
1
−
b
+
b
∣
d
∣
a
v
d
l
1-b+b\frac{|d|}{avdl}
1−b+bavdl∣d∣的值是越小,作为分母的位置,会让整个第二项越大,从而达到提高短文本句子的BM25的值的效果。当b的值为0,可以禁用归一化,b往往取值0.75
其变化效果如下:
通过前面的学习,我们知道其实BM25和Tfidf的区别不大,所以我们可以在之前sciket-learn的TfidfVectorizer基础上进行修改,获取我们的BM25的计算结果,主要也是修改其中的fit
方法和transform
方法
在sklearn的TfidfVectorizer中
,首先接受参数,其次会调用TfidfTransformer
来完成其他方法的调用
继承TfidfVectorizer完成 参数的接受
from sklearn.feature_extraction.text import TfidfVectorizer,TfidfTransformer,_document_frequency from sklearn.base import BaseEstimator,TransformerMixin from sklearn.preprocessing import normalize from sklearn.utils.validation import check_is_fitted import numpy as np import scipy.sparse as sp class Bm25Vectorizer(CountVectorizer): def __init__(self,k=1.2,b=0.75, norm="l2", use_idf=True, smooth_idf=True,sublinear_tf=False,*args,**kwargs): super(Bm25Vectorizer,self).__init__(*args,**kwargs) self._tfidf = Bm25Transformer(k=k,b=b,norm=norm, use_idf=use_idf, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf) @property def k(self): return self._tfidf.k @k.setter def k(self, value): self._tfidf.k = value @property def b(self): return self._tfidf.b @b.setter def b(self, value): self._tfidf.b = value def fit(self, raw_documents, y=None): """Learn vocabulary and idf from training set. """ X = super(Bm25Vectorizer, self).fit_transform(raw_documents) self._tfidf.fit(X) return self def fit_transform(self, raw_documents, y=None): """Learn vocabulary and idf, return term-document matrix. """ X = super(Bm25Vectorizer, self).fit_transform(raw_documents) self._tfidf.fit(X) return self._tfidf.transform(X, copy=False) def transform(self, raw_documents, copy=True): """Transform documents to document-term matrix. """ check_is_fitted(self, '_tfidf', 'The tfidf vector is not fitted') X = super(Bm25Vectorizer, self).transform(raw_documents) return self._tfidf.transform(X, copy=False)
完成自己的Bm25transformer
,只需要再原来基础的代码上进心修改部分即可。sklearn中的转换器类的实现要求,不能直接继承已有的转换器类
class Bm25Transformer(BaseEstimator, TransformerMixin): def __init__(self,k=1.2,b=0.75, norm='l2', use_idf=True, smooth_idf=True, sublinear_tf=False): self.k = k self.b = b ##################以下是TFIDFtransform代码########################## self.norm = norm self.use_idf = use_idf self.smooth_idf = smooth_idf self.sublinear_tf = sublinear_tf def fit(self, X, y=None): """Learn the idf vector (global term weights) Parameters ---------- X : sparse matrix, [n_samples, n_features] a matrix of term/token counts """ _X = X.toarray() self.avdl = _X.sum()/_X.shape[0] #句子的平均长度 # print("原来的fit的数据:\n",X) #计算每个词语的tf的值 self.tf = _X.sum(0)/_X.sum() #[M] #M表示总词语的数量 self.tf = self.tf.reshape([1,self.tf.shape[0]]) #[1,M] # print("tf\n",self.tf) ##################以下是TFIDFtransform代码########################## if not sp.issparse(X): X = sp.csc_matrix(X) if self.use_idf: n_samples, n_features = X.shape df = _document_frequency(X) # perform idf smoothing if required df += int(self.smooth_idf) n_samples += int(self.smooth_idf) # log+1 instead of log makes sure terms with zero idf don't get # suppressed entirely. idf = np.log(float(n_samples) / df) + 1.0 self._idf_diag = sp.spdiags(idf, diags=0, m=n_features, n=n_features, format='csr') return self def transform(self, X, copy=True): """Transform a count matrix to a tf or tf-idf representation Parameters ---------- X : sparse matrix, [n_samples, n_features] a matrix of term/token counts copy : boolean, default True Whether to copy X and operate on the copy or perform in-place operations. Returns ------- vectors : sparse matrix, [n_samples, n_features] """ ########### 计算中间项 ############### cur_tf = np.multiply(self.tf, X.toarray()) #[N,M] #N表示数据的条数,M表示总词语的数量 norm_lenght = 1 - self.b + self.b*(X.toarray().sum(-1)/self.avdl) #[N] #N表示数据的条数 norm_lenght = norm_lenght.reshape([norm_lenght.shape[0],1]) #[N,1] middle_part = (self.k+1)*cur_tf /(cur_tf +self.k*norm_lenght) ############# 结算结束 ################ if hasattr(X, 'dtype') and np.issubdtype(X.dtype, np.floating): # preserve float family dtype X = sp.csr_matrix(X, copy=copy) else: # convert counts or binary occurrences to floats X = sp.csr_matrix(X, dtype=np.float64, copy=copy) n_samples, n_features = X.shape if self.sublinear_tf: np.log(X.data, X.data) X.data += 1 if self.use_idf: check_is_fitted(self, '_idf_diag', 'idf vector is not fitted') expected_n_features = self._idf_diag.shape[0] if n_features != expected_n_features: raise ValueError("Input has n_features=%d while the model" " has been trained with n_features=%d" % ( n_features, expected_n_features)) # *= doesn't work X = X * self._idf_diag ############# 中间项和结果相乘 ############ X = X.toarray()*middle_part if not sp.issparse(X): X = sp.csr_matrix(X, dtype=np.float64) ############# ######### if self.norm: X = normalize(X, norm=self.norm, copy=False) return X @property def idf_(self): ##################以下是TFIDFtransform代码########################## # if _idf_diag is not set, this will raise an attribute error, # which means hasattr(self, "idf_") is False return np.ravel(self._idf_diag.sum(axis=0))
完整代码参考:https://github.com/SpringMagnolia/Bm25Vectorzier/blob/master/BM25Vectorizer.py
测试简单使用,观察和tdidf的区别:
from BM25Vectorizer import Bm25Vectorizer from sklearn.feature_extraction.text import TfidfVectorizer if __name__ == '__main__': # format_weibo(word=False) # format_xiaohuangji_corpus(word=True) bm_vec = Bm25Vectorizer() tf_vec = TfidfVectorizer() # 1. 原始数据 data = [ 'hello world', 'oh hello there', 'Play it', 'Play it again Sam,24343,123', ] # 2. 原始数据向量化 bm_vec.fit(data) tf_vec.fit(data) features_vec_bm = bm_vec.transform(data) features_vec_tf = tf_vec.transform(data) print("Bm25 result:",features_vec_bm.toarray()) print("*"*100) print("Tfidf result:",features_vec_tf.toarray())
输出如下:
Bm25 result: [[0. 0. 0. 0.47878333 0. 0. 0. 0. 0. 0.8779331 ] [0. 0. 0. 0.35073401 0. 0.66218791 0. 0. 0.66218791 0. ] [0. 0. 0. 0. 0.70710678 0. 0.70710678 0. 0. 0. ] [0.47038081 0.47038081 0.47038081 0. 0.23975776 0. 0.23975776 0.47038081 0. 0. ]] ********************************************************************************** Tfidf result: [[0. 0. 0. 0.6191303 0. 0. 0. 0. 0. 0.78528828] [0. 0. 0. 0.48693426 0. 0.61761437 0. 0. 0.61761437 0. ] [0. 0. 0. 0. 0.70710678 0. 0.70710678 0. 0. 0. ] [0.43671931 0.43671931 0.43671931 0. 0.34431452 0. 0.34431452 0.43671931 0. 0. ]]
修改之前召回的代码只需要把调用tfidfvectorizer改成调用Bm25vectorizer
这里我们可以使用fasttext,word2vector等方式实现获取词向量,然后对一个句子中的所有词语的词向量进行平均,获取整个句子的向量表示,即sentence Vector
,该实现方法在fasttext和Word2vector中均有实现,而且通过参数的控制,实现N-garm的效果
假设我们有文本a.txt
如下:
我 很 喜欢 她
今天 天气 不错
我 爱 深度学习
那么我们可以实现获取句子向量的方法如下
from fastText import FastText
#训练模型,设置n-garm=2
model = FastText.train_unsupervised(input="./a.txt",minCount=1,wordNgrams=2)
#获取句子向量,是对词向量的平均
model.get_sentence_vector("我 是 谁")
这里我们使用之前采集的相似文本数据作为训练样本
步骤如下:
这里我们使用单个字作为特征,只需要注意,英文使用单个词作为特征
""" 使用单个字作为特征,进行fasttext训练,最后封装代码获取召回结果 """ import string def word_split(line): #对中文按照字进行处理,对英文不分为字母 #即 I爱python --> i 爱 python letters = string.ascii_lowercase+"+"+"/" #c++,ui/ue result = [] temp = "" for word in line: if word.lower() in letters: temp+=word.lower() else: if temp !="": result.append(temp) temp = "" result.append(word) if temp!="": result.append(temp) return result def process_data(): path1 = r"corpus\final_data\merged_q.txt" path2 = r"corpus\final_data\merged_sim_q.txt" save_path = r"corpus\recall_fasttext_data\data.txt" filter = set() with open(path1) as f,open(save_path,"a") as save_f: for line in f: line = line.strip() if line not in filter: filter.add(line) _temp = " ".join(word_split(line)) save_f.write(_temp+"\n") with open(path2) as f,open(save_path,"a") as save_f: for line in f: line = line.strip() if line not in filter: filter.add(line) _temp = " ".join(word_split(line)) save_f.write(_temp+"\n")
训练fasttext的model,用来生成词向量
def train_model(fasttext_model_path):
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
save_path = r"corpus\recall_fasttext_data\data.txt"
model = FastText.train_unsupervised(save_path,epoch=20,minCount=3,wordNgrams=2)
model.save_model(fasttext_model_path)
对现有的QA问答对,生成向量,传入pysparnn中构建索引
def get_base_text_vectors(cp_dump_path,model):
#保存到本地pkl文件,防止每次都生成一次
if os.path.exists(cp_dump_path):
cp = pickle.load(open(cp_dump_path,"rb"))
else:
print(QA_dict)
q_lines = [q for q in QA_dict]
q_cuted_list = [" ".join(word_split(i)) for i in q_lines]
lines_vectors = []
for q_cuted in q_cuted_list:
lines_vectors.append(model.get_sentence_vector(q_cuted))
cp = ci.MultiClusterIndex(lines_vectors,q_lines)
pickle.dump(cp,open(cp_dump_path,"wb"))
return cp
传入用户的问题,进行分词和句子向量的获取,获取搜索的结果
def get_search_vectors(cp,model,search_line):
line_cuted = " ".join(word_split(search_line))
line_vec = model.get_sentence_vector(line_cuted)
#这里的line_vec中可以有多个句子的向量表示,能够返回每个句子的搜索结果
cp_search_list = cp.search(line_vec,k=10,k_clusters=10,return_distance=True)
#TODO 对搜索的结果进行关键字的过滤
return cp_search_list
测试模型的效果
from fastext_vectors import get_search_vectors,train_model,get_base_text_vectors
import fastText
if __name__ == '__main__':
fasttext_model_path = "corpus/build_questions/fasttext_recall.model"
cp_dump_path = "corpus/build_questions/cp_recall.pkl"
# train_model(fasttext_model_path)
model = fastText.load_model(fasttext_model_path)
cp = get_base_text_vectors(cp_dump_path,model)
ret = get_search_vectors(cp,model,"女孩学python容易么?")
print(ret)
输出如下:
[[('0.0890376', '学习Python需要什么基础,学起来更容易?'),
('0.090688944', '学习PHP的女生多吗?女生可以学吗?'),
('0.092773676', 'Python适合什么人学习?'),
('0.09416294', 'Python语言适合什么样的人学?'),
('0.102790296', 'python语言容易学习吗?'),
('0.1050359', '学习测试的女生多吗?女生可以学吗?'),
('0.10546541', 'Python好学吗?'),
('0.11058545', '学习Python怎样?'),
('0.11080605', '怎样学好Python?'),
('0.11124289', '学生怎么上课的?')]]
#lib/SentenceVectorizer """ 使用fasttext 实现sentence to vector """ import fastText from fastText import FastText import config from lib import cut import logging import os class SentenceVectorizer: def __init__(self): if os.path.exists(config.recall_fasttext_model_path): self.model = fastText.load_model(config.recall_fasttext_model_path) else: # self.process_data() self.model = self.build_model() self.fited = False def fit_transform(self,sentences): """处理全部问题数据""" lines_vectors = self.fit(sentences) return lines_vectors def fit(self,lines): lines_vectors = [] for q_cuted in lines: lines_vectors.append(self.model.get_sentence_vector(q_cuted)) self.fited = True return lines_vectors def transform(self,sentence): """处理用户输入的数据""" assert self.fited = True line_vec = self.model.get_sentence_vector(" ".join(sentence)) return line_vec def build_model(self): logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO) model = FastText.train_unsupervised(config.recall_fasttext_data_path, epoch=20, minCount=3, wordNgrams=2) model.save_model(config.recall_fasttext_model_path) return model def process_data(self): path1 = r"corpus\final_data\merged_q.txt" path2 = r"corpus\final_data\merged_sim_q.txt" save_path = r"corpus\recall_fasttext_data\data.txt" filter = set() with open(path1) as f, open(save_path, "a") as save_f: for line in f: line = line.strip() if line not in filter: filter.add(line) _temp = " ".join(cut(line,by_word=True)) save_f.write(_temp + "\n") with open(path2) as f, open(save_path, "a") as save_f: for line in f: line = line.strip() if line not in filter: filter.add(line) _temp = " ".join(cut(line,by_word=True)) save_f.write(_temp + "\n")
前面的课程中为了完成一个问答机器人,我们先进行了召回,相当于是通过海选的方法找到呢大致相似的问题。
通过现在的排序模型,我们需要精选出最相似的哪一个问题,返回对应的答案
我们需要实现的排序模型是两个输入,即两个问题,输出的是一个相似度。所以和之前的深度学习模型一样,我们需要实现的步骤如下:
这里的数据,我们使用之前采集的百度问答的相似问题和手动构造的数据。那么,我们需要把他格式化为最终模型需要的格式,即两个输入和输出的相似度。
这里的输入,我们可以使用单个字作为特征,也可以使用一个分词之后的词语作为特征。所以在实现准备输入数据方法的过程中,可以提前准备。
这里我们使用每个问题搜索结果的前两页认为他们是相似的,相似度为1,最后两页的结果是不相似的,相似度为0。
介绍模型的构建之前,我们先介绍下孪生神经网络(Siamese Network)
和其名字的由来。
1. 两个输入,一个输出
2. 两个输入在编码部分共享权重
3. 输出
a. 直接对两个编码后的结果进行相似度计算
b. 两个编码后的结果交给DNN,输出一个值
Siamese和Chinese有点像。Siamese是古时候泰国的称呼,中文译作暹罗。Siamese在英语中是“孪生”、“连体”的意思。为什么孪生和泰国有关系呢?
十九世纪泰国出生了一对连体婴儿,当时的医学技术无法使两人分离出来,于是两人顽强地生活了一生,1829年被英国商人发现,进入马戏团,在全世界各地表演,1839年他们访问美国北卡罗莱那州后来成为马戏团的台柱,最后成为美国公民。1843年4月13日跟英国一对姐妹结婚,恩生了10个小孩,昌生了12个,姐妹吵架时,兄弟就要轮流到每个老婆家住三天。1874年恩因肺病去世,另一位不久也去世,两人均于63岁离开人间。两人的肝至今仍保存在费城的马特博物馆内。从此之后“暹罗双胞胎”(Siamese twins)就成了连体人的代名词,也因为这对双胞胎让全世界都重视到这项特殊疾病。
所以孪生神经网络就是有两个共享权值的网络的组成,或者只用实现一个,另一个直接调用,有两个输入,一个输出。1993年就已经被用来进行支票签名的验证。
孪生神经网络通过两个输入,被DNN进行编码,得到向量的表示之后,根据实际的用途来制定损失函数。比如我们需要计算相似度的时候,可以使用余弦相似度,或者使用 e x p − ∣ ∣ h l e f t − h r i g h t ∣ ∣ exp^{-||h^{left}-h^{right}||} exp−∣∣hleft−hright∣∣来确定向量的距离。
孪生神经网络被用于有多个输入和一个输出的场景,比如手写字体识别、文本相似度检验、人脸识别等
在计算相似度之前,我们可以考虑在传统的孪生神经网络的基础上,在计算相似度之前,把我们的编码之后的向量通过多层神经网络进行非线性的变化,结果往往会更加好,那么此时其网络结构大致如下:
其中Network1和network2为权重参数共享的两个形状相同的网络,用来对输入的数据进行编码,包括(word-embedding,GRU,biGRU等
),Network3部分是一个深层的神经网络,包含(batchnorm、dropout、relu、Linear
等层)
编写预测和评估的代码,预测的过程只需要修改获得结果,不需要上图中的损失计算的过程
这里的分词可以对之前的分词方法进行修改
def cut_sentence_by_word(sentence): # 对中文按照字进行处理,对英文不分为字母 letters = string.ascii_lowercase + "+" + "/" # c++,ui/ue result = [] temp = "" for word in line: if word.lower() in letters: temp += word.lower() else: if temp != "": result.append(temp) temp = "" result.append(word) if temp != "": result.append(temp) return result def jieba_cut(sentence,by_word=False,with_sg=False,use_stopwords=False): if by_word: return cut_sentence_by_word(sentence) ret = psg.lcut(sentence) if use_stopwords: ret = [(i.word, i.flag) for i in ret if i.word not in stopwords_list] if not with_sg: ret = [i[0] for i in ret] return ret
word Sequence
代码该处的代码和seq2seq中的代码相同,直接使用
Dataset
和DataLoader
和seq2seq中的代码大致相同
前面做好了准备工作之后,就需要开始进行模型的搭建。
虽然我们知道了整个结构的大致情况,但是我们还是不知道其中具体的细节。
2016年AAAI
会议上,有一篇Siamese Recurrent Architectures for Learning Sentence Similarity
的论文(地址:https://www.aaai.org/ocs/index.php/AAAI/AAAI16/paper/download/12195/12023)
整个结构如下图:
可以看到word 经过embedding之后进行LSTM的处理,然后经过exp来确定相似度,可以看到整个模型是非常简单的,之后很多人在这个结构上增加了更多的层,比如加入attention、dropout、pooling等层。
那么这个时候,请思考下面几个问题:
attention在这个网络结构中该如何实现
之前我们的attention是用在decoder中,让decoder的hidden和encoder的output进行运算,得到attention的weight,再和decoder的output进行计算,作为下一次decoder的输入
那么在当前我们可以把句子A的output理解为句子B的encoder的output
,那么我们就可以进行attention的计算了
和这个非常相似的有一个attention的变种,叫做
self attention
。前面所讲的Attention是基于source端和target端的隐变量(hidden state)计算Attention的,得到的结果是源端的每个词与目标端每个词之间的依赖关系。Self Attention
不同,它分别在source端和target端进行,仅与source input或者target input自身相关的Self Attention,捕捉source端或target端自身的词与词之间的依赖关系。
self.attention
1. attention:在decoder的阶段寻找encoder中需要重点关注的内容
2. self-attention:当前时间步的输出需要更大程度的受到当前句子的哪些值影响
a. 计算attention weight
b. self.attention 的结果 = attention_weight * 当前句子的输出
dropout用在什么地方
pooling是什么如何使用
max pooling
或者是average pooling
def forward(self, *input): sent1, sent2 = input[0], input[1] #这里使用mask,在后面计算attention的时候,让其忽略pad的位置 mask1, mask2 = sent1.eq(0), sent2.eq(0) # embeds: batch_size * seq_len => batch_size * seq_len * batch_size x1 = self.embeds(sent1) x2 = self.embeds(sent2) # batch_size * seq_len * dim => batch_size * seq_len * hidden_size output1, _ = self.lstm1(x1) output2, _ = self.lstm1(x2) # 进行Attention的操作,同时进行形状的对齐 # batch_size * seq_len * hidden_size q1_align, q2_align = self.soft_attention_align(output1, output2, mask1, mask2) # 拼接之后再传入LSTM中进行处理 # batch_size * seq_len * (8 * hidden_size) q1_combined = torch.cat([output1, q1_align, self.submul(output1, q1_align)], -1) q2_combined = torch.cat([output2, q2_align, self.submul(output2, q2_align)], -1) # batch_size * seq_len * (2 * hidden_size) q1_compose, _ = self.lstm2(q1_combined) q2_compose, _ = self.lstm2(q2_combined) # 进行Aggregate操作,也就是进行pooling # input: batch_size * seq_len * (2 * hidden_size) # output: batch_size * (4 * hidden_size) q1_rep = self.apply_pooling(q1_compose) q2_rep = self.apply_pooling(q2_compose) # Concate合并到一起,用来进行计算相似度 x = torch.cat([q1_rep, q2_rep], -1) def submul(self,x1,x2): mul = x1 * x2 sub = x1 - x2 return torch.cat([sub,mul],dim=-1)
实现思路:
def soft_attention_align(self, x1, x2, mask1, mask2): ''' x1: batch_size * seq_len_1 * hidden_size x2: batch_size * seq_len_2 * hidden_size mask1:x1中pad的位置为1,其他为0 mask2:x2中pad 的位置为1,其他为0 ''' # attention: batch_size * seq_len_1 * seq_len_2 attention_weight = torch.matmul(x1, x2.transpose(1, 2)) #mask1 : batch_size,seq_len1 mask1 = mask1.float().masked_fill_(mask1, float('-inf')) #mask2 : batch_size,seq_len2 mask2 = mask2.float().masked_fill_(mask2, float('-inf')) # weight: batch_size * seq_len_1 * seq_len_2 weight1 = F.softmax(attention_weight + mask2.unsqueeze(1), dim=-1) #batch_size*seq_len_1*hidden_size x1_align = torch.matmul(weight1, x2) #同理,需要对attention_weight进行permute操作 weight2 = F.softmax(attention_weight.transpose(1, 2) + mask1.unsqueeze(1), dim=-1) x2_align = torch.matmul(weight2, x1)
池化的过程有一个窗口
的概念在其中,所以max 或者是average指的是窗口中的值取最大值还是取平均估值。整个过程可以理解为拿着窗口在源数据上取值
窗口有窗口大小(kernel_size,窗口多大)和步长(stride,每次移动多少)两个概念
>>> input = torch.tensor([[[1,2,3,4,5,6,7]]])
>>> F.avg_pool1d(input, kernel_size=3, stride=2)
tensor([[[ 2., 4., 6.]]]) #[1,2,3] [3,4,5] [5,6,7]的平均估值
def apply_pooling(self, x):
# input: batch_size * seq_len * (2 * hidden_size)
#进行平均池化
p1 = F.avg_pool1d(x.transpose(1, 2), x.size(1)).squeeze(-1)
#进行最大池化
p2 = F.max_pool1d(x.transpose(1, 2), x.size(1)).squeeze(-1)
# output: batch_size * (4 * hidden_size)
return torch.cat([p1, p2], 1)
相似度的计算我们可以使用一个传统的距离计算公式,或者是exp的方法来实现,但是其效果不一定好,所以这里我们使用一个深层的神经网络来实现,使用pytorch中的Sequential对象来实现非常简单
self.fc = nn.Sequential( nn.BatchNorm1d(self.hidden_size * 8), nn.Linear(self.hidden_size * 8, self.linear_size), nn.ELU(inplace=True), nn.BatchNorm1d(self.linear_size), nn.Dropout(self.dropout), nn.Linear(self.linear_size, self.linear_size), nn.ELU(inplace=True), nn.BatchNorm1d(self.linear_size), nn.Dropout(self.dropout), nn.Linear(self.linear_size, 2), nn.Softmax(dim=-1) )
在上述过程中,我们使用了激活函数ELU,而没有使用RELU,因为在有噪声的数据中ELU的效果往往会更好。
E L U ( ∗ x ∗ ) = m a x ( 0 , x ) + m i n ( 0 , α ∗ ( e x p ( x ) − 1 ) ) ELU(*x*)=max(0,x)+min(0,α∗(exp(x)−1)) ELU(∗x∗)=max(0,x)+min(0,α∗(exp(x)−1)),其中 α \alpha α在torch中默认值为1。
通过下图可以看出他和RELU的区别,RELU在小于0的位置全部为0,但是ELU在小于零的位置是从0到-1的。可以理解为正常的数据汇总难免出现噪声,小于0的值,而RELU会直接把他处理为0,认为其实正常值,但是ELU却会保留他,所以ELU比RELU更有鲁棒性
在孪生神经网络中我们经常会使用对比损失(Contrastive Loss),作为损失函数,对比损失是Yann LeCun
提出的用来判断数据降维之后和源数据是否相似的问题。在这里我们用它来判断两个句子的表示是否相似。
对比损失的计算公式如下:
L
=
1
2
N
∑
n
=
1
N
(
y
d
2
+
(
1
−
y
)
m
a
x
(
m
a
r
g
i
n
−
d
,
0
)
2
)
L = \frac{1}{2N}\sum^N_{n=1}(yd^2 + (1-y)max(margin-d,0)^2)
L=2N1n=1∑N(yd2+(1−y)max(margin−d,0)2)
其中
d
=
∣
∣
a
n
−
b
n
∣
∣
2
d = ||a_n-b_n||_2
d=∣∣an−bn∣∣2,代表两个两本特征的欧氏距离,y表示是否匹配,y=1表示匹配,y=0表示不匹配,margin是一个阈值,比如margin=1。
上式可分为两个部分,即:
下图红色是相似样本的损失,蓝色是不相似样本的损失
但是前面我们已经计算出了相似度,所以在这里我们有两个操作
def train(model,optimizer,loss_func,epoch): model.tarin() for batch_idx, (q,simq,q_len,simq_len,sim) in enumerate(train_loader): optimizer.zero_grad() output = model(q.to(config.device),simq.to(config.device)) loss = loss_func(output,sim.to(config.deivce)) loss.backward() optimizer.step() if batch_idx%100==0: print("...") torch.save(model.state_dict(), './DNN/data/model_paramters.pkl') torch.save(optimizer.state_dict(),"./DNN/data/optimizer_paramters.pkl") model = SiameseNetwork().cuda() loss = torch.nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=0.001) for epoch in range(1,config.epoch+1): train(model,optimizer,loss,epoch)
#contrastive_loss.py import torch import torch.nn class ContrastiveLoss(torch.nn.Module): """ Contrastive loss function. """ def __init__(self, margin=1.0): super(ContrastiveLoss, self).__init__() self.margin = margin def forward(self, x0, x1, y): # 欧式距离 diff = x0 - x1 dist_sq = torch.sum(torch.pow(diff, 2), 1) dist = torch.sqrt(dist_sq) mdist = self.margin - dist #clamp(input,min,max),和numpy中裁剪的效果相同 dist = torch.clamp(mdist, min=0.0) loss = y * dist_sq + (1 - y) * torch.pow(dist, 2) loss = torch.sum(loss) / 2.0 / x0.size()[0] return loss
之后只需要把原来的损失函数改为当前的损失函数即可
代码封装过程中,需要注意,在整个结构中,我们有很多的结算结果是dump到本地的,为了防止后续每次的重复计算。所以laod的结果,应该提前加载到内容,而不是每次调用load义词
完成判断用户意图的代码,即在使用fasttext的模型,判断用户输入句子的分类
import fastText import re from lib import jieba_cut fc_word_mode = fastText.load_model("./classify/data/ft_classify.model") fc_word_mode = fastText.load_model("./classify/data/ft_classify_words.model") def is_QA(sentence_info): python_qs_list = [" ".join(sentence_info["cuted_sentence"])] result = fc_word_mode.predict(python_qs_list) python_qs_list = [" ".join(sentence_info["cuted_word_sentence"])] words_result = fc_word_mode.predict(python_qs_list) for index, (label,acc,word_label,word_acc) in enumerate(zip(*result,*words_result)): label = label[0] acc = acc[0] word_label = word_label[0] word_acc = word_acc[0] #以label_qa为准,如果预测结果是label_chat,则label_qa的概率=1-labele_chat if label == "__label__chat": label = "__label__QA" acc = 1-acc if word_label == "__label__chat": word_label = "__label__QA" word_acc = 1 - word_acc if acc>0.95 or word_acc>0.95: #是QA return True else: return False
提供predict的接口
""" 准备闲聊的模型 """ import pickle from lib import jieba_cut import numpy as np from chatbot import Sequence2Sequence class Chatbot: def __init__(self,ws_path="./chatbot/data/ws.pkl",save_path="./chatbot/model/seq2seq_chatbot.ckpt"): self.ws_chatbot = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TODO ..... def predict(self,s): """ :param s:没有分词的 :param ws: :param ws_words: :return: """ #TODO ... return ans
""" 进行召回的方法 """ import os import pickle class Recall: def __init__(self,topk=20): # 准备问答的mode等模块 self.topk = topk def predict(self,sentence): """ :param sentence: :param debug: :return: [recall list],[entity] """ #TODO recall return recall_list def get_answer(self,s): return self.QA_dict[s]
""" 深度学习排序 """ import tensorflow as tf import pickle from DNN2 import SiamsesNetwork from lib import jieba_cut class DNNSort(): def __init__(self): #使用词语和单字两个模型的均值作为最后的结果 self.dnn_sort_words = DNNSortWords() self.dnn_sort_single_word = DNNSortSingleWord() def predict(self,s,c_list): sort1 = self.dnn_sort_words.predict(s,c_list) sort2 = self.dnn_sort_single_word.predict(s,c_list) for i in sort1: sort1[i] = (sort1[i]+ sort2[i])/2 sorts = sorted(sort1.items(),key=lambda x:x[-1],reverse=True) return sorts[0][0],sorts[0][1] class DNNSortWords: def __init__(self,ws_path="./DNN2/data/ws_80000.pkl",save_path="./DNN2/model_keras/esim_model_softmax.ckpt"): self.ws = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TOOD ... def predict(self,s,c_list): """ :param s:没有分词的 :param c_list: 带比较的列表 :param ws: :param ws_words: :return: """ #TOOD ... return sim_dict class DNNSortSingleWord: def __init__(self,ws_path="./DNN2/data/ws_word.pkl",save_path="./DNN2/data/esim_word_model_softmax.ckpt"): self.ws = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TOOD ... def predict(self,s,c_list): """ :param s:没有分词的 :param c_list: 带比较的列表 :param ws: :param ws_words: :return: """ #TOOD ... return sim_dict
不同的用户,连续10分钟内的对话认为是一轮对话,如果10分还没有下一次对话,认为该轮对话结束,如果10分钟后开始对话,认为是下一轮对话。是要是为了保存不同轮中的聊天主题,后续可以实现基本的对话管理。比如用户刚问了python相关的问题,后续如果问题中不带主体,那么就把redis中的python作为其主体
主要实现逻辑为:
具体思路如下:
""" 获取,更新用户的信息 """ from pymongo import MongoClient import redis from uuid import uuid1 import time import json """ ### redis { user_id:"id", user_background:{} last_entity:[] last_conversation_time:int(time): } userid_conversation_id:"" ### monodb 存储对话记录 {user_id:,conversion_id:,from:user/bot,message:"",create_time,entity:[],attention:[]} """ HOST = "localhost" CNVERSION_EXPERID_TIME = 60 * 10 # 10分钟,连续10分钟没有通信,意味着会话结束 class MessageManager: def __init__(self): self.client = MongoClient(host=HOST) self.m = self.client["toutiao"]["dialogue"] self.r = redis.Redis(host=HOST, port=6379, db=10) def last_entity(self, user_id): """最近一次的entity""" return json.loads(self.r.hget(user_id, "entity")) def gen_conversation_id(self): return uuid1().hex def bot_message_pipeline(self, user_id, message): """保存机器人的回复记录""" conversation_id_key = "{}_conversion_id".format(user_id) conversation_id = self.user_exist(conversation_id_key) if conversation_id: # 更新conversation_id的过期时间 self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME) data = {"user_id": user_id, "conversation_id": conversation_id, "from": "bot", "message": message, "create_time": int(time.time()), } self.m.save(data) else: raise ValueError("没有会话id,但是机器人尝试回复....") def user_message_pipeline(self, user_id, message, create_time, attention, entity=[]): # 确定用户相关的信息 # 1. 用户是否存在 # 2.1 用户存在,返回用户的最近的entity,存入最近的对话 # 3.1 判断是否为新的对话,如果是新对话,开启新的回话,update用户的对话信息 # 3.2 如果不是新的对话,update用户的对话信息 # 3. 更新用户的基本信息 # 4 返回用户相关信息 # 5. 调用预测接口,发来对话的结构 # 要保存的data数据,缺少conversation_id data = { "user_id": user_id, "from": "user", "message": message, "create_time": create_time, "entity": json.dumps(entity), "attention": attention, } conversation_id_key = "{}_conversion_id".format(user_id) conversation_id = self.user_exist(conversation_id_key) print("conversation_id",conversation_id) if conversation_id: if entity: # 更新当前用户的 last_entity self.r.hset(user_id, "last_entity", json.dumps(entity)) # 更新最后的对话时间 self.r.hset(user_id, "last_conversion_time", create_time) # 设置conversation id的过期时间 self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME) # 保存聊天记录到mongodb中 data["conversation_id"] = conversation_id self.m.save(data) print("mongodb 保存数据成功") else: # 不存在 user_basic_info = { "user_id": user_id, "last_conversion_time": create_time, "last_entity": json.dumps(entity) } self.r.hmset(user_id, user_basic_info) print("redis存入 user_basic_info success") conversation_id = self.gen_conversation_id() print("生成conversation_id",conversation_id) # 设置会话的id self.r.set(conversation_id_key, conversation_id, ex=CNVERSION_EXPERID_TIME) # 保存聊天记录到mongodb中 data["conversation_id"] = conversation_id self.m.save(data) print("mongodb 保存数据成功") def user_exist(self, conversation_id_key): """ 判断用户是否存在 :param user_id:用户id :return: """ conversation_id = self.r.get(conversation_id_key) if conversation_id: conversation_id = conversation_id.decode() print("load conversation_id",conversation_id) return conversation_id
gRPC 的安装:`pip install grpcio`
安装 ProtoBuf 相关的 python 依赖库:`pip install protobuf`
安装 python grpc 的 protobuf 编译工具:`pip install grpcio-tools`
//chatbot.proto 文件 syntax = "proto3"; message ReceivedMessage { string user_id = 1; //用户id string user_message = 2; //当前用户传递的消息 int32 create_time = 3; //当前消息发送的时间 } message ResponsedMessage { string user_response = 1; //返回给用户的消息 int32 create_time = 2; //返回给用户的时间 } service ChatBotService { rpc Chatbot (ReceivedMessage) returns (ResponsedMessage); }
使用下面的命令编译,得到chatbot_pb2.py
和chatbot_pb2_grpc.py
文件
python -m grpc_tools.protoc -I. –python_out=. –grpc_python_out=. ./chatbot.proto
import dialogue from classify import is_QA from dialogue.process_sentence import process_user_sentence from chatbot_grpc import chatbot_pb2_grpc from chatbot_grpc import chatbot_pb2 import time class chatServicer(chatbot_pb2_grpc.ChatBotServiceServicer): def __init__(self): #提前加载各种模型 self.recall = dialogue.Recall(topk=20) self.dnnsort = dialogue.DNNSort() self.chatbot = dialogue.Chatbot() self.message_manager = dialogue.MessageManager() def Chatbot(self, request, context): user_id = request.user_id message = request.user_message create_time = request.create_time #对用户的输出进行基础的处理,如分词 message_info = process_user_sentence(message) if is_QA(message_info): attention = "QA" #实现对对话数据的保存 self.message_manager.user_message_pipeline(user_id, message, create_time, attention, entity=message_info["entity"]) recall_list,entity = self.recall.predict(message_info) line, score = self.dnnsort.predict(message,recall_list) if score > 0.7: ans = self.recall.get_answer(line) user_response = ans["ans"] else: user_response = "不好意思,这个问题我还没学习到..." else: attention = "chat" # 实现对对话数据的保存 self.message_manager.user_message_pipeline(user_id,message,create_time,attention,entity=message_info["entity"]) user_response = self.chatbot.predict(message) self.message_manager.bot_message_pipeline(user_id,user_response) user_response = user_response create_time = int(time.time()) return chatbot_pb2.ResponsedMessage(user_response=user_response,create_time=create_time) def serve(): import grpc from concurrent import futures # 多线程服务器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 注册本地服务 chatbot_pb2_grpc.add_ChatBotServiceServicer_to_server(chatServicer(), server) # 监听端口 server.add_insecure_port("[::]:9999") # 开始接收请求进行服务 server.start() # 使用 ctrl+c 可以退出服务 try: time.sleep(1000) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
#!/bin/bash
cd `$dirname`|exit 0
#source activate ds
python grpc_predict.py
添加可执行权限:chmod +x 文件名
supervisor现在的官方版本还是python2的,但是可以使用下面的命令安装python3版本
pip3 install git+https://github.com/Supervisor/supervisor
完成supervisor的配置文件的编写,conf中使用分号作为注释符号
;conf.d [program:chat_service] command=/root/chat_service/run.sh ;执行的命令 stdout_logfile=/root/chat_service/log/out.log ;log的位置 stderr_logfile=/root/chat_service/log/error.log ;错误log的位置 directory=/root/chat_service ;路径 autostart=true ;是否自动启动 autorestart=true ;是否自动重启 startretries=10 ;失败的最大尝试次数
在supervisor的基础配置中添加上述配置文件
;/etc/supervisord/supervisor.conf
[include]
files=/root/chat_service/conf.d
运行supervisord
supervisord -c /etc/supervisord/supervisor.conf
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。