赞
踩
RAG利用了向量数据库和大型语言模型(LLM)的能力来提升回答质量。这一过程可以被分解为以下几个主要部分:
数据入库:
在数据的准备过程中,通过特定的加载器将各种模态的信息进行导入,由于各种信息的大小参差不齐,故需要对其进行切片处理,在将每个部分进行切片后,embedding到特定维度的向量,将源数据喝向量一起存储到向量数据库中。
常见的向量数据库引擎有:FAISS、Chromadb、ES、Milvus,本文采用Milvus进行实践
检索生成:
在调用的过程中,先将用户的提问Question进行embedding(1.直接进行embedding;2.使用LLM对Question进行关键信息抽取后embedding,类似于Tools调用的方式),将embedding放进向量数据库进行搜索
将Question和检索到的数据使用一定格式进行拼接形成prompt。最后,使用LLM进行生成答案
数据准备阶段:数据提取——>文本分割——>向量化(embedding)——>数据入库
应用阶段:用户提问——>数据检索(召回)——>注入Prompt——>LLM生成答案
一般知识库可通过爬虫的方式进行获取,这里仅爬取:标题、时间、URL
import requests from bs4 import BeautifulSoup import pandas as pd from urllib.parse import urljoin def scrape_page(url, data=[]): # 发送请求 response = requests.get(url) response.encoding = 'utf-8' # 检查请求是否成功 if response.status_code == 200: # 解析网页内容 soup = BeautifulSoup(response.text, 'html.parser') # 找到通知信息所在的标签 notifications = soup.find_all('li') # 遍历每个通知,提取标题、发布日期和超链接URL for notification in notifications: if notification.get('id') and notification.get('id').startswith('line_'): title = notification.find('a').text.strip() date = notification.find('span', class_='listTime').text.strip() link = urljoin(url, notification.find('a')['href']) data.append({'title': title, 'date': date, 'url': link}) print({'title': title, 'date': date, 'url': link}) # 检查是否存在下一页的超链接 next_page_link = soup.find('span', class_='p_next p_fun') if next_page_link: next_page_url = urljoin(url, next_page_link.find('a')['href']) print(f"存在下一页,继续获取: {next_page_url}") scrape_page(next_page_url, data) else: print("已到最后一页") else: print("请求失败") # 起始页面URL start_url = "https://jwc/tzgg.htm" # 爬取通知信息 data = [] scrape_page(start_url, data) # 将通知信息转换为DataFrame df = pd.DataFrame(data) # 将DataFrame保存为CSV文件 df.to_csv('./csv/notifications.csv', index=False)
文本嵌入(text embedding)是将文本数据映射到高维空间中的向量表示的技术。这些向量捕捉了文本的语义和语法信息,使得计算机可以更好地理解和处理文本数据。文本嵌入技术通常使用深度学习模型,如Word2Vec、GloVe、BERT等,这些模型可以根据文本的上下文关系和语言规律,将每个单词或者每个文本片段转换成一个高维向量。这些向量可以用于各种自然语言处理任务,如文本分类、情感分析、命名实体识别等。通过文本嵌入,计算机可以更好地理解和处理自然语言。
下面使用embedding API测试text embedding的效果
import os
from dotenv import load_dotenv
load_dotenv(".env")
from scipy.spatial import distance
text_list = ["我爱编程", "编程使我快乐", "我爱吃蛋炒饭", "今天中午吃什么"]
embd_zhipu = []
def get_embedding(text, platform, model):
text = text.replace("\n", " ")
return platform.embeddings.create(input=text,model=model).data[0].embedding
使用智谱AI
from zhipuai import ZhipuAI
zhipuai = ZhipuAI(api_key=os.environ.get("ZHIPU_API_KEY"))
for text in text_list:
embd = get_embedding(text, zhipuai, "embedding-2")
embd_zhipu.append(embd)
# calculate vector distance
distance_matirx = distance.cdist(embd_zhipu, embd_zhipu, metric="euclidean")
distance_matirx
使用OpenAI接口
from openai import OpenAI
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
embd_openai = []
for text in text_list:
embd = get_embedding(text, client, "text-embedding-ada-002")
embd_openai.append(embd)
distance_matirx = distance.cdist(embd_openai, embd_openai, metric="euclidean")
distance_matirx
两种模型embedding的维度
len(embd_openai[0])
# 1536
len(embd_zhipu[0])
# 1024
这里采用主流的向量数据库:Milvus,测试环节可以直接使用他们的线上服务https://zilliz.com/
注册好在里面创建一个starter版本的cluster即可
封装两个函数分别用于知识库的编码,以及查询语句的编码
def get_dashscope_embeddings(texts): embeddings = DashScopeEmbeddings( model="text-embedding-v2", dashscope_api_key="sk-f9414689d64e4cadb700f84bf7dea6a1" ) return embeddings.embed_documents(texts) def get_dashscope_query_embedding(text): embeddings = DashScopeEmbeddings( model="text-embedding-v2", dashscope_api_key="sk-f9414689d64e4cadb700f84bf7dea6a1" ) return embeddings.embed_query(text) print(len(get_dashscope_embeddings(["hello world", "hello"])[0])) print(len(get_dashscope_query_embedding(["hello world"]))) ''' 1536 1536 '''
连接zilliz
from pymilvus import utility, FieldSchema, DataType, CollectionSchema, Collection, connections
CLUSTER_ENDPOINT = "***"
TOKEN = "***"
connections.connect(
alias='default',
# Public endpoint obtained from Zilliz Cloud
uri=CLUSTER_ENDPOINT,
# API key or a colon-separated cluster username and password
token=TOKEN,
)
新建一个collection,相当于sql里面的表
def get_schema(): field1 = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True) field2 = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512) # field3 = FieldSchema(name="url", dtype=DataType.VARCHAR, max_length=512) field4 = FieldSchema(name="text_vector", dtype=DataType.FLOAT_VECTOR, dim=1536) schema = CollectionSchema(fields=[field1, field2, field4]) return schema def recreate_collection(collection_name): # collection_name = "notifications" utility.drop_collection(collection_name=collection_name) schema = get_schema() collection = Collection(name=collection_name, schema=schema) index_params = { "index_type": "AUTOINDEX", "metric_type": "L2", "params": {} } collection.create_index( field_name="text_vector", index_params=index_params, index_name='vector_idx' ) collection.load() return collection def get_collection(collection_name): return Collection(name=collection_name)
插入数据函数
def insert_data(collection, df):
vectors = df['embedding'].tolist()
data = [
{"text": f"{title} {url}", "text_vector": vector} for title, url, vector in zip(df['title'], df['url'], vectors)
]
collection.insert(data)
# read csv dir
import pandas as pd
import os
collection = recreate_collection("notifications")
for file in os.listdir('csv'):
df = pd.read_csv(f'csv/{file}')
# df['embedding'] = get_openai_embedding(df['title'].tolist())
df['embedding'] = get_dashscope_embeddings(df['title'].tolist())
insert_data(collection, df)
collection.flush()
print(f"inserted {file}, total entities: {collection.num_entities}")
搜索测试
def search(collection, query_embedding, top_k=10): search_params = { "metric_type": "L2", "params": {"level": 2} } results = collection.search( query_embedding, anns_field="text_vector", param=search_params, limit=top_k, output_fields=["text"] ) return results # query_embedding = get_openai_embedding(["单项奖学金"])[0] collection = get_collection("notifications") query_embedding = get_dashscope_query_embedding(["2021级转专业拟录取"]) results = search(collection, [query_embedding]) for i in results[0]: print(f"{i.entity.get('text')}\n")
question = "有关于2023级转专业的信息你知道哪些?" embedding = get_dashscope_query_embedding(question) results = search(collection, [embedding]) context = "" for i in results[0]: context += f"{i.entity.get('text')}\n" prompt = f'''你是回答问题的助理。使用以下检索到的上下文来回答问题,在回复答案时请携带原文链接(如有)。如果你不知道答案,就说你不知道。 上下文: {context} 问题: {question} 回答:''' from langchain_community.llms import Tongyi llm = Tongyi(model="qwen-plus") res = llm.invoke(prompt) print(res) ''' 有关2023级本科生转专业的信息,我找到了一个通知和一个公示名单: 1. **通知**:《关于做好2023级、2022级本科生转专业工作的通知》(原文链接:https://jwc/info/1057/5025.htm) 这个通知详细说明了2023级和2022级本科生转专业的工作安排和要求。 2. **公示名单**:《关于2023年特殊原因转专业拟录取学生名单公示》(原文链接:https://jwc/info/1057/4560.htm) 这个公示列出了因特殊原因申请转专业的2023级学生的拟录取名单。 这些资料应该能提供2023级转专业的一些基本流程和结果。如果有更具体的问题或需要更多信息,请告知。 '''
RAG的原理其实很简单,就是向量检索+Prompt!
langchain是一个开源的LLM工具,用于开发LLM的下游应用
很多LLM、Text embedding、Vector-db的API接口都被集成在了langchain-community中
from dotenv import load_dotenv load_dotenv() import os from langchain_community.embeddings import DashScopeEmbeddings from langchain import hub from langchain_core.runnables import RunnablePassthrough from langchain_community.vectorstores import Milvus from langchain_community.llms import Tongyi llm = Tongyi(model="qwen-plus") vectorstore = Milvus( embedding_function = DashScopeEmbeddings(model="text-embedding-v2"), collection_name = "notifications", connection_args={ "uri": os.environ.get("ZILLIZ_CLOUD_URI"), "token": os.environ.get("ZILLIZ_CLOUD_API_KEY"), # API key, for serverless clusters which can be used as replacements for user and password "secure": True, }, primary_field="id", text_field="text", vector_field="text_vector", ) retriever = vectorstore.as_retriever(search_kwargs=dict(k=5)) from langchain.prompts import PromptTemplate prompt = PromptTemplate( template=f'''你是回答问题的助理,请使用正常的文本格式进行输出,并保证输出格式的良好的可读性,在合适的位置使用换行符。使用以下检索到的上下文来回答问题,在回复答案时请携带原文链接(如有)。如果你不知道答案,就说你不知道。 上下文: {context} 问题: {question} 回答:''', input_variables=["context", "question"], ) def format_docs(docs): return "\n".join(doc.page_content for doc in docs) rag_chain = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm ) ''' 我找到了两份与2023级转专业相关的通知: 1. **关于做好2023级、2022级本科生转专业工作的通知**:该通知详细阐述了2023级和2022级本科生转专业的相关政策和流程。你可以通过以下链接查看详细内容:[https://jwc/info/1057/5025.htm](https://jwc/info/1057/5025.htm) 2. **关于2023年特殊原因转专业拟录取学生名单公示**:这份公示列出了因特殊原因申请转专业并在2023年被拟录取的学生名单。详细名单可见:[https://jwc/info/1057/4560.htm](https://jwc/info/1057/4560.htm) 这些资料应该能为你提供2023级转专业的基本信息。如果你需要更具体的信息,如转专业的要求、截止日期或具体程序,可能需要查看上述链接的详细内容或直接咨询相关部门。 '''
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。