当前位置:   article > 正文

基于Milvus向量数据库实现检索增强生成(RAG)_milvus rag

milvus rag

简介

RAG利用了向量数据库和大型语言模型(LLM)的能力来提升回答质量。这一过程可以被分解为以下几个主要部分:
在这里插入图片描述
数据入库:
在数据的准备过程中,通过特定的加载器将各种模态的信息进行导入,由于各种信息的大小参差不齐,故需要对其进行切片处理,在将每个部分进行切片后,embedding到特定维度的向量,将源数据喝向量一起存储到向量数据库中。

常见的向量数据库引擎有:FAISS、Chromadb、ES、Milvus,本文采用Milvus进行实践
在这里插入图片描述
检索生成:
在调用的过程中,先将用户的提问Question进行embedding(1.直接进行embedding;2.使用LLM对Question进行关键信息抽取后embedding,类似于Tools调用的方式),将embedding放进向量数据库进行搜索

将Question和检索到的数据使用一定格式进行拼接形成prompt。最后,使用LLM进行生成答案

数据准备阶段:数据提取——>文本分割——>向量化(embedding)——>数据入库
应用阶段:用户提问——>数据检索(召回)——>注入Prompt——>LLM生成答案

知识库的获取

一般知识库可通过爬虫的方式进行获取,这里仅爬取:标题、时间、URL

import requests
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import urljoin


def scrape_page(url, data=[]):
    # 发送请求
    response = requests.get(url)
    response.encoding = 'utf-8'

    # 检查请求是否成功
    if response.status_code == 200:
        # 解析网页内容
        soup = BeautifulSoup(response.text, 'html.parser')

        # 找到通知信息所在的标签
        notifications = soup.find_all('li')

        # 遍历每个通知,提取标题、发布日期和超链接URL
        for notification in notifications:
            if notification.get('id') and notification.get('id').startswith('line_'):
                title = notification.find('a').text.strip()
                date = notification.find('span', class_='listTime').text.strip()
                link = urljoin(url, notification.find('a')['href'])
                data.append({'title': title, 'date': date, 'url': link})
                print({'title': title, 'date': date, 'url': link})

        # 检查是否存在下一页的超链接
        next_page_link = soup.find('span', class_='p_next p_fun')
        if next_page_link:
            next_page_url = urljoin(url, next_page_link.find('a')['href'])
            print(f"存在下一页,继续获取: {next_page_url}")
            scrape_page(next_page_url, data)
        else:
            print("已到最后一页")
    else:
        print("请求失败")


# 起始页面URL
start_url = "https://jwc/tzgg.htm"

# 爬取通知信息
data = []
scrape_page(start_url, data)

# 将通知信息转换为DataFrame
df = pd.DataFrame(data)

# 将DataFrame保存为CSV文件
df.to_csv('./csv/notifications.csv', index=False)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

知识库的向量化

文本嵌入(text embedding)是将文本数据映射到高维空间中的向量表示的技术。这些向量捕捉了文本的语义和语法信息,使得计算机可以更好地理解和处理文本数据。文本嵌入技术通常使用深度学习模型,如Word2Vec、GloVe、BERT等,这些模型可以根据文本的上下文关系和语言规律,将每个单词或者每个文本片段转换成一个高维向量。这些向量可以用于各种自然语言处理任务,如文本分类、情感分析、命名实体识别等。通过文本嵌入,计算机可以更好地理解和处理自然语言。
下面使用embedding API测试text embedding的效果

import os
from dotenv import load_dotenv
load_dotenv(".env")

from scipy.spatial import distance
text_list = ["我爱编程", "编程使我快乐", "我爱吃蛋炒饭", "今天中午吃什么"]
embd_zhipu = []

def get_embedding(text, platform, model):
    text = text.replace("\n", " ")
    return platform.embeddings.create(input=text,model=model).data[0].embedding
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用智谱AI

from zhipuai import ZhipuAI
zhipuai = ZhipuAI(api_key=os.environ.get("ZHIPU_API_KEY"))

for text in text_list:
    embd = get_embedding(text, zhipuai, "embedding-2")
    embd_zhipu.append(embd)

# calculate vector distance
distance_matirx = distance.cdist(embd_zhipu, embd_zhipu, metric="euclidean")
distance_matirx
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述
使用OpenAI接口

from openai import OpenAI
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

embd_openai = []
for text in text_list:
    embd = get_embedding(text, client, "text-embedding-ada-002")
    embd_openai.append(embd)
    
distance_matirx = distance.cdist(embd_openai, embd_openai, metric="euclidean")
distance_matirx
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述
两种模型embedding的维度

len(embd_openai[0])
# 1536

len(embd_zhipu[0])
# 1024
  • 1
  • 2
  • 3
  • 4
  • 5

向量入库

这里采用主流的向量数据库:Milvus,测试环节可以直接使用他们的线上服务https://zilliz.com/
注册好在里面创建一个starter版本的cluster即可

封装两个函数分别用于知识库的编码,以及查询语句的编码

def get_dashscope_embeddings(texts):
    embeddings = DashScopeEmbeddings(
        model="text-embedding-v2",
        dashscope_api_key="sk-f9414689d64e4cadb700f84bf7dea6a1"
    )
    return embeddings.embed_documents(texts)


def get_dashscope_query_embedding(text):
    embeddings = DashScopeEmbeddings(
        model="text-embedding-v2",
        dashscope_api_key="sk-f9414689d64e4cadb700f84bf7dea6a1"
    )
    return embeddings.embed_query(text)

print(len(get_dashscope_embeddings(["hello world", "hello"])[0]))
print(len(get_dashscope_query_embedding(["hello world"])))

'''
1536
1536
'''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

连接zilliz

from pymilvus import utility, FieldSchema, DataType, CollectionSchema, Collection, connections

CLUSTER_ENDPOINT = "***"
TOKEN = "***"

connections.connect(
  alias='default', 
  #  Public endpoint obtained from Zilliz Cloud
  uri=CLUSTER_ENDPOINT,
  # API key or a colon-separated cluster username and password
  token=TOKEN, 
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

新建一个collection,相当于sql里面的表

def get_schema():
    field1 = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True)
    field2 = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512)
    # field3 = FieldSchema(name="url", dtype=DataType.VARCHAR, max_length=512)
    field4 = FieldSchema(name="text_vector", dtype=DataType.FLOAT_VECTOR, dim=1536)
    schema = CollectionSchema(fields=[field1, field2, field4])
    return schema

def recreate_collection(collection_name):
    # collection_name = "notifications"
    utility.drop_collection(collection_name=collection_name)
    schema = get_schema()
    collection = Collection(name=collection_name, schema=schema)
    index_params = {
        "index_type": "AUTOINDEX",
        "metric_type": "L2",
        "params": {}
    }
    collection.create_index(
        field_name="text_vector",
        index_params=index_params,
        index_name='vector_idx'
    )
    collection.load()
    return collection

def get_collection(collection_name):
    return Collection(name=collection_name)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

插入数据函数

def insert_data(collection, df):
    vectors = df['embedding'].tolist()
    data = [
        {"text": f"{title} {url}", "text_vector": vector} for title, url, vector in zip(df['title'], df['url'], vectors)
    ]
    collection.insert(data)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
# read csv dir
import pandas as pd
import os
collection = recreate_collection("notifications")
for file in os.listdir('csv'):
    df = pd.read_csv(f'csv/{file}')
    # df['embedding'] = get_openai_embedding(df['title'].tolist())
    df['embedding'] = get_dashscope_embeddings(df['title'].tolist())
    insert_data(collection, df)
    collection.flush()
    print(f"inserted {file}, total entities: {collection.num_entities}")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

搜索测试

def search(collection, query_embedding, top_k=10):
    search_params = {
        "metric_type": "L2",
        "params": {"level": 2}
    }
    results = collection.search(
        query_embedding,
        anns_field="text_vector",
        param=search_params,
        limit=top_k,
        output_fields=["text"]
    )
    return results

# query_embedding = get_openai_embedding(["单项奖学金"])[0]
collection = get_collection("notifications")
query_embedding = get_dashscope_query_embedding(["2021级转专业拟录取"])
results = search(collection, [query_embedding])

for i in results[0]:
    print(f"{i.entity.get('text')}\n")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

检索生成

question = "有关于2023级转专业的信息你知道哪些?"
embedding = get_dashscope_query_embedding(question)
results = search(collection, [embedding])
context = ""
for i in results[0]:
    context += f"{i.entity.get('text')}\n"

prompt = f'''你是回答问题的助理。使用以下检索到的上下文来回答问题,在回复答案时请携带原文链接(如有)。如果你不知道答案,就说你不知道。
上下文: {context} 
问题: {question} 
回答:'''

from langchain_community.llms import Tongyi
llm = Tongyi(model="qwen-plus")
res = llm.invoke(prompt)
print(res)

'''
有关2023级本科生转专业的信息,我找到了一个通知和一个公示名单:

1. **通知**:《关于做好2023级、2022级本科生转专业工作的通知》(原文链接:https://jwc/info/1057/5025.htm)
这个通知详细说明了2023级和2022级本科生转专业的工作安排和要求。

2. **公示名单**:《关于2023年特殊原因转专业拟录取学生名单公示》(原文链接:https://jwc/info/1057/4560.htm)
这个公示列出了因特殊原因申请转专业的2023级学生的拟录取名单。

这些资料应该能提供2023级转专业的一些基本流程和结果。如果有更具体的问题或需要更多信息,请告知。
'''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

RAG的原理其实很简单,就是向量检索+Prompt!

使用langchain进行流程优化

langchain是一个开源的LLM工具,用于开发LLM的下游应用
很多LLM、Text embedding、Vector-db的API接口都被集成在了langchain-community中

from dotenv import load_dotenv
load_dotenv()
import os
from langchain_community.embeddings import DashScopeEmbeddings
from langchain import hub
from langchain_core.runnables import RunnablePassthrough
from langchain_community.vectorstores import Milvus
from langchain_community.llms import Tongyi

llm = Tongyi(model="qwen-plus")

vectorstore = Milvus(
    embedding_function = DashScopeEmbeddings(model="text-embedding-v2"),
    collection_name = "notifications",
    connection_args={
        "uri": os.environ.get("ZILLIZ_CLOUD_URI"),
        "token": os.environ.get("ZILLIZ_CLOUD_API_KEY"),  # API key, for serverless clusters which can be used as replacements for user and password
        "secure": True,
    },
    primary_field="id",
    text_field="text",
    vector_field="text_vector",
)

retriever = vectorstore.as_retriever(search_kwargs=dict(k=5))

from langchain.prompts import PromptTemplate
prompt = PromptTemplate(
    template=f'''你是回答问题的助理,请使用正常的文本格式进行输出,并保证输出格式的良好的可读性,在合适的位置使用换行符。使用以下检索到的上下文来回答问题,在回复答案时请携带原文链接(如有)。如果你不知道答案,就说你不知道。
    上下文: {context} 
    问题: {question} 
    回答:''',
    input_variables=["context", "question"],
)

def format_docs(docs):
    return "\n".join(doc.page_content for doc in docs)


rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
)

'''
我找到了两份与2023级转专业相关的通知:

1. **关于做好2023级、2022级本科生转专业工作的通知**:该通知详细阐述了2023级和2022级本科生转专业的相关政策和流程。你可以通过以下链接查看详细内容:[https://jwc/info/1057/5025.htm](https://jwc/info/1057/5025.htm)

2. **关于2023年特殊原因转专业拟录取学生名单公示**:这份公示列出了因特殊原因申请转专业并在2023年被拟录取的学生名单。详细名单可见:[https://jwc/info/1057/4560.htm](https://jwc/info/1057/4560.htm)

这些资料应该能为你提供2023级转专业的基本信息。如果你需要更具体的信息,如转专业的要求、截止日期或具体程序,可能需要查看上述链接的详细内容或直接咨询相关部门。

'''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/785567
推荐阅读
相关标签
  

闽ICP备14008679号