This walkthrough builds a simple intelligent customer service (QA) system in four steps:

1. Initialize the project: Flask, sqlite3, openai, redis, numpy, scikit-learn, python-dotenv, and so on.
2. Configure the database and cache.
3. Define the Flask application.
4. Implement the core features.

Create an `app.py` file and install the required packages:
```bash
pip install Flask openai redis numpy scikit-learn python-dotenv
```

(`sqlite3` ships with the Python standard library, so it does not need to be installed separately.)
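Note that the embedding call used later (`openai.Embedding.create`) belongs to the pre-1.0 `openai` Python client; if you have a newer client installed, pin the version (a suggested constraint, adjust as needed):

```bash
pip install "openai<1.0"
```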
Create a `.env` file in the project root and add your OpenAI API key:

```
OPENAI_API_KEY=your_openai_api_key
```
Create a database initialization script, `init_db.py`:

```python
import sqlite3

conn = sqlite3.connect('qa_corpus.db')
cursor = conn.cursor()

# Create the QA corpus table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS qa_corpus (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        question TEXT NOT NULL,
        answer TEXT NOT NULL
    )
''')

# Create the user questions table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS user_questions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        question TEXT NOT NULL,
        resolved BOOLEAN NOT NULL
    )
''')

conn.commit()
conn.close()
```

Run `init_db.py` to initialize the database:

```bash
python init_db.py
```
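Optionally, seed the corpus with a sample question-answer pair so the `/ask` endpoint has something to match (the pair below is hypothetical example data):

```python
import sqlite3

conn = sqlite3.connect('qa_corpus.db')
# Hypothetical sample entry; replace with your own QA pairs
conn.execute(
    'INSERT INTO qa_corpus (question, answer) VALUES (?, ?)',
    ('What are your business hours?', 'We are open daily from 9 a.m. to 9 p.m.')
)
conn.commit()
conn.close()
```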
Define the Flask application and routes in `app.py`:

```python
from flask import Flask, request, jsonify
import sqlite3
import json
import os
from collections import Counter

import redis
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from dotenv import load_dotenv
import openai

# Load environment variables
load_dotenv()

# Configure the Flask application
app = Flask(__name__)

# Configure the Redis connection
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Configure the OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")

# SQLite database file
DATABASE = 'qa_corpus.db'

# Open a database connection whose rows can be accessed like dictionaries
def get_db_connection():
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn

# Cache the QA corpus in Redis. Redis stores strings/bytes, not Python
# dictionaries, so the corpus is serialized as JSON.
def cache_corpus():
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM qa_corpus')
    corpus = cursor.fetchall()
    conn.close()

    corpus_dict = {row['id']: {'question': row['question'], 'answer': row['answer']}
                   for row in corpus}
    redis_client.set('qa_corpus', json.dumps(corpus_dict))

# Fetch the cached corpus, rebuilding the cache if it is missing
def get_cached_corpus():
    corpus = redis_client.get('qa_corpus')
    if corpus is None:
        cache_corpus()
        corpus = redis_client.get('qa_corpus')
    return json.loads(corpus)

# Encode a question with an OpenAI embedding model. This is an optional
# alternative to the TF-IDF matching below (see the sketch at the end of
# this article); it uses the pre-1.0 openai client API.
def encode_question(question):
    response = openai.Embedding.create(input=[question], model="text-embedding-ada-002")
    embedding = response['data'][0]['embedding']
    return np.array(embedding)

# Find the corpus question most similar to the user's question. The corpus
# questions and the user question are vectorized with the same TF-IDF
# vocabulary so that cosine similarity is well defined.
def find_similar_question(user_question, corpus):
    ids = list(corpus.keys())
    if not ids:  # guard against an empty corpus
        return None, 0.0
    questions = [corpus[qa_id]['question'] for qa_id in ids]

    vectorizer = TfidfVectorizer()
    corpus_vectors = vectorizer.fit_transform(questions)
    question_vector = vectorizer.transform([user_question])

    similarities = cosine_similarity(question_vector, corpus_vectors)
    most_similar_index = int(np.argmax(similarities))

    return ids[most_similar_index], float(similarities[0][most_similar_index])

# Save a user question, recording whether it was resolved
def save_user_question(question, resolved):
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute('INSERT INTO user_questions (question, resolved) VALUES (?, ?)',
                   (question, resolved))
    conn.commit()
    conn.close()

@app.route('/ask', methods=['POST'])
def ask_question():
    user_question = request.json.get('question')

    corpus = get_cached_corpus()
    similar_question_id, similarity = find_similar_question(user_question, corpus)

    # Return the matched answer, or a default reply below the threshold
    if similarity >= 0.8:
        answer = corpus[similar_question_id]['answer']
        response = {'answer': answer, 'similarity': similarity}
    else:
        response = {'answer': "Sorry, I don't understand your question.",
                    'similarity': similarity}

    # Save the user question to the database
    save_user_question(user_question, similarity >= 0.8)

    return jsonify(response)

@app.route('/add_question', methods=['POST'])
def add_question():
    question = request.json.get('question')
    answer = request.json.get('answer')

    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute('INSERT INTO qa_corpus (question, answer) VALUES (?, ?)',
                   (question, answer))
    conn.commit()
    conn.close()

    # Refresh the cache so the new pair is available immediately
    cache_corpus()

    return jsonify({'status': 'success',
                    'message': 'Question added to the corpus successfully.'})

@app.route('/hot_words', methods=['GET'])
def hot_words():
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute('SELECT question FROM user_questions')
    questions = cursor.fetchall()
    conn.close()

    # Naive whitespace tokenization and frequency count. For languages
    # that are not space-delimited (e.g. Chinese), use a segmenter such
    # as jieba instead.
    all_words = ' '.join(q['question'] for q in questions).split()
    word_freq = Counter(all_words)

    return jsonify({'hot_words': word_freq.most_common(10)})
```

Make sure Redis is properly configured and the SQLite database has been initialized before starting the application:

```python
if __name__ == '__main__':
    # Cache the corpus on first startup
    cache_corpus()
    app.run(host='0.0.0.0', port=5000)
```
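If Redis is not already available locally, one convenient way to start it (assuming Docker is installed) is:

```bash
docker run -d --name qa-redis -p 6379:6379 redis
```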
To run the application:

1. Install and start Redis (the Docker sketch above is one option).
2. Initialize the SQLite database: `python init_db.py`
3. Set the OpenAI API key: create a `.env` file and add `OPENAI_API_KEY=your_openai_api_key`.
4. Start the application: `python app.py`

You can then test the API endpoints with `curl` or Postman.
Ask a question:

```bash
curl -X POST http://localhost:5000/ask -H "Content-Type: application/json" -d '{"question": "What are your business hours?"}'
```

Sample response:

```json
{
  "answer": "We are open daily from 9 a.m. to 9 p.m.",
  "similarity": 0.95
}
```
Add a new question-answer pair:

```bash
curl -X POST http://localhost:5000/add_question -H "Content-Type: application/json" -d '{"question": "What are your business hours?", "answer": "We are open daily from 9 a.m. to 9 p.m."}'
```

Sample response:

```json
{
  "status": "success",
  "message": "Question added to the corpus successfully."
}
```
Query the hot words:

```bash
curl http://localhost:5000/hot_words
```

Sample response:

```json
{
  "hot_words": [
    ["business", 5],
    ["hours", 4],
    ["your", 3],
    ["what", 2],
    ["time", 2],
    ["when", 1],
    ["open", 1],
    ["evening", 1],
    ["morning", 1],
    ["we", 1]
  ]
}
```
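For repeatable testing, a short Python script using the `requests` library can exercise the same endpoints (a sketch; it assumes the server is running on localhost:5000):

```python
import requests

BASE_URL = 'http://localhost:5000'  # assumed local server address

# Add a QA pair, then ask a matching question
requests.post(f'{BASE_URL}/add_question', json={
    'question': 'What are your business hours?',
    'answer': 'We are open daily from 9 a.m. to 9 p.m.',
})

reply = requests.post(f'{BASE_URL}/ask',
                      json={'question': 'What are your business hours?'})
print(reply.json())  # expect the stored answer with a high similarity

print(requests.get(f'{BASE_URL}/hot_words').json())
```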
To recap, the system breaks down into a few small modules:

- Database module: SQLite tables for the QA corpus and user questions, plus a shared connection helper.
- Cache module: the corpus serialized to JSON and cached in Redis, refreshed whenever a new pair is added.
- Encoding module: an optional OpenAI embedding encoder for questions (see the sketch below).
- Similarity module: TF-IDF vectorization and cosine similarity to find the closest corpus question.
- API module: Flask routes for asking questions, adding question-answer pairs, and querying hot words.
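As a possible extension, `encode_question` could replace TF-IDF matching entirely: embed every corpus question, then compare the user question's embedding against them. A minimal sketch, assuming the `encode_question` helper defined above (in practice you would cache the corpus embeddings rather than recompute them per request):

```python
# Sketch of embedding-based matching (assumes encode_question from
# app.py above; embeddings are recomputed here for brevity, but a real
# system would cache them, e.g. in Redis)
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def find_similar_question_embeddings(user_question, corpus):
    ids = list(corpus.keys())
    # One embedding per corpus question (cache these in a real system)
    corpus_matrix = np.vstack([encode_question(corpus[qa_id]['question'])
                               for qa_id in ids])
    question_vector = encode_question(user_question).reshape(1, -1)

    similarities = cosine_similarity(question_vector, corpus_matrix)
    best = int(np.argmax(similarities))
    return ids[best], float(similarities[0][best])
```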
Through the steps above, we have built a simple intelligent customer service system that answers user questions, accepts new question-answer pairs, and reports the most frequent words in user questions. Each module is explained in place, which keeps the code easy to understand and maintain. I hope you find this helpful!