当前位置:   article > 正文

RAG进阶(二): RAG 融合(rag fusion)_rag如何融合多源数据

rag如何融合多源数据

在上一篇博客中,我们学习了多重查询(Multi Query)技术,Multi Query的基本思想是当用户输入查询语句(自然语言)时,我们让大模型(LLM)基于用户的问题再生成多个查询语句,这些生成的查询语句是对用户查询语句的补充,它们是从不同的视角来补充用户的查询语句,然后每条查询语句都会从向量数据库中检索到一批相关文档,最后所有的相关文档都会被喂给LLM,这样LLM就会生成比较完整和全面的答案。这样就可以避免因为查询语句的差异而导致结果不正确。如下图所示:

今天我们来介绍RAG 融合(rag fusion),它的主要思想是在Multi Query的基础上,对其检索结果进行重新排序(即reranking)后输出Top K个最相关文档,最后将这top k个文档喂给LLM并生成最终的答案(answer)。如下图所示:

 一、环境配置

我们需要安装如下python包:

pip install langchain langchain_openai langchain_pinecone langchainhub

接下来我们需要导入所需要设置本次实验所需要用的几个api key:OPENAI_API_KEY,PINECONE_API_KEY,LANGCHAIN_API_KEY,这里需要说明的是本次实验会使用到openai的gpt-3.5-turbo模型,Pinecone向量数据库(PINECONE_API_KEY), LangSmith(LANGCHAIN_API_KEY).

pinecone云向量数据库是一个在线的云端向量数据库,我们需要去其官网申请api key,   LangSmith 是用来跟踪和分析langchain组件在执行过程中产生的中间结果,这对我们理解langchain组件的功能和作用有很大的帮助,因此我们也需要去langchain官网申请一个api key。

  1. import os
  2. from dotenv import load_dotenv, find_dotenv
  3. from langchain_openai import OpenAIEmbeddings
  4. from langchain_pinecone import PineconeVectorStore
  5. #导入项目中需要用到的各种的api_key
  6. _ = load_dotenv(find_dotenv()) # read local .env file
  7. os.environ['OPENAI_API_KEY']=os.environ['OPENAI_API_KEY']
  8. os.environ['PINECONE_API_KEY']=os.environ['PINECONE_API_KEY']
  9. # 导入langsmith所需要的api key,用于跟踪中间结果
  10. os.environ['LANGCHAIN_TRACING_V2'] = 'true'
  11. os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
  12. os.environ['LANGCHAIN_API_KEY'] = os.environ['LANGCHAIN_API_KEY']

接下来我们还需要去pinecone官网创建一个向量数据库,这里我们创建了一个名为:rag-fusion的空向量数据库,后面需要被检索的文档向量会被自动上传到该向量数据库中:

下面我们来创建一组本次实验所需的测试文档集合,一共10个文档,每个文档为一句中文的句子,每个文档的内容在语义上基本都和气候变化相关:

  1. all_documents={
  2. "doc1": "气候变化和经济影响。",
  3. "doc2": "气候变化引起的公共卫生问题。",
  4. "doc3": "气候变化:社会视角。",
  5. "doc4": "气候变化的技术解决方案。",
  6. "doc5": "应对气候变化需要改变政策。",
  7. "doc6": "气候变化及其对生物多样性的影响。",
  8. "doc7": "气候变化:科学和模型。",
  9. "doc8": "全球变暖:气候变化的一个子集。",
  10. "doc9": "气候变化如何影响日常天气。",
  11. "doc10": "气候变化行动主义的历史。",
  12. }

接下来我们需要创建pinecone的向量数据库,在创建向量数据库时,我们指定使用openai的embedding模型,以及向量数据库名(rag-fusion), 需要说明的是这里我们使用的是from_texts的方法来创建向量数据库,它的作用是往云端的向量数据库"rag-fusion"中上传文档向量,这样云端的"rag-fusion"向量库就不再是一个空的数据库了:

  1. vectorstore = PineconeVectorStore.from_texts(
  2. list(all_documents.values()), OpenAIEmbeddings(), index_name="rag-fusion"
  3. )

二、定义查询生成器(Multi Query)

我们现在将定义一个chain来生成多重查询语句,如果对多重查询还不熟悉的朋友,可以查看我之前写的这篇博客。这里我们会首先创建生成多重查询的prompt, 我们可以从langchain官网拉取预先定义好的prompt, 也可以手动定义prompt:

  1. from langchain_core.output_parsers import StrOutputParser
  2. from langchain_openai import ChatOpenAI
  3. from langchain import hub
  4. prompt = hub.pull("langchain-ai/rag-fusion-query-generation")
  5. prompt

 同样我们也可以手动创建多重查询的prompt:

  1. # prompt = ChatPromptTemplate.from_messages([
  2. # ("system", "You are a helpful assistant that generates multiple search queries based on a single input query."),
  3. # ("user", "Generate multiple search queries related to: {original_query}"),
  4. # ("user", "OUTPUT (4 queries):")
  5. # ])

接下来我们来创建一个生成多重查询的chain, 该chain会根据用户的query生成4个多角度的query, 这些多角度的query是对用户原始query的补充。

  1. generate_queries = (
  2. prompt | ChatOpenAI(temperature=0) | StrOutputParser() | (lambda x: x.split("\n"))
  3. )
  4. original_query = "气候变化的影响"
  5. queries = generate_queries.invoke({"original_query": original_query})
  6. queries

 

这里我们看到用户的原始问题是: 气候变化的影响,generate_queries根据用户的问题生成了4个多角度的问题来对用户问题进行补充。

三、定义完整链

我们现在可以将它们放在一起并定义完整的用于检索的chain。该chain的作用是:

 1. 生成一组查询(queries)
 2. 在检索器中对每个query进行检索
 3. 使用倒排序排名融合方法将所有结果连接在一起

请注意,该chain不执行最后的生成步骤(不会将top k的检索结果喂给LLM)

original_query = "气候变化的影响"

接下来我们来创建一个向量库的实例并通过该向量库实例来创建一个检索器。

  1. vectorstore = PineconeVectorStore.from_existing_index("rag-fusion", OpenAIEmbeddings())
  2. retriever = vectorstore.as_retriever()

 下面我们需要定义倒排序排名算法(Reciprocal Rank Fusion (RRF)),该算法来源于这篇论文:Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods,下面是该算法在论文中的定义:

 RRF 是与滑铁卢大学 (CAN) 和 Google 合作开发的,用其作者的话说,“比任何单独的系统产生更好的结果,比标准的”重新排名方法更好。我们简单解释一下该算法的原理,在RRF算法中,D表示相关文档的全集,k是固定常数60,r(d)表示当前文档d在其子集中的位置。该算法会对文档全集D进行二重遍历,外层遍历文档全集D, 内层遍历文档子集,在做内层变量的时候我们会累计当前文档在其所在子集中的位置并取倒数作为其权重(分数)。

下面是RRF算法的python实现: 

  1. from langchain.load import dumps, loads
  2. def reciprocal_rank_fusion(results: list[list], k=60):
  3. """ Reciprocal_rank_fusion that takes multiple lists of ranked documents
  4. and an optional parameter k used in the RRF formula """
  5. # Initialize a dictionary to hold fused scores for each unique document
  6. fused_scores = {}
  7. # Iterate through each list of ranked documents
  8. for docs in results:
  9. # Iterate through each document in the list, with its rank (position in the list)
  10. for rank, doc in enumerate(docs):
  11. # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
  12. doc_str = dumps(doc)
  13. # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
  14. if doc_str not in fused_scores:
  15. fused_scores[doc_str] = 0
  16. # Retrieve the current score of the document, if any
  17. previous_score = fused_scores[doc_str]
  18. # Update the score of the document using the RRF formula: 1 / (rank + k)
  19. fused_scores[doc_str] += 1 / (rank + k)
  20. # Sort the documents based on their fused scores in descending order to get the final reranked results
  21. reranked_results = [
  22. (loads(doc), score)
  23. for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
  24. ]
  25. # Return the reranked results as a list of tuples, each containing the document and its fused score
  26. return reranked_results

 下面我们来创建一个完整的chain,它由generate_queries ,retriever.map() ,reciprocal_rank_fusion三部分组成,其中generate_queries会生成4个多角度的query, retriever.map()的作用是根据generate_queries的结果映射出4个retriever(可以理解为同时复制出4个retriever)与中generate_queries会生成4个query对应,并为每个query检索出来的一组相关文档集(默认为4个相关文档),那么4个query总共可以生成16个相关文档。这16个相关文档集最后会经过RRF算法从新排序后输出最终的4个相关度最高的文档:

  1. original_query = "气候变化的影响"
  2. chain = generate_queries | retriever.map() | reciprocal_rank_fusion
  3. chain.invoke({"original_query": original_query})

 这里我们看到了经过最终的RRF算法进行重拍以后的4个最相关的文档,并且从高倒低罗列出了每个相关文档的得分,下面我们来分析一下这些分数是如何统计出来的,为此我们需要提取那些在执行RRF算法之前的结果:

  1. chain1 = generate_queries | retriever.map()
  2. chain1_result = chain1.invoke({"original_query": original_query})
  3. chain1_result

 

 下面我们可以根据RRF算法在论文中的定义,手动来计算上面这些相关文档的分数:

  1. #气候变化和经济影响。0.16344044051606188
  2. score0 = 1/60+1/61+1/62+1/60+1/61+1/62+1/63+1/60+1/61+1/62
  3. #气候变化引起的公共卫生问题。0.049189141547682
  4. score1 = 1/60+1/61+1/62
  5. #气候变化及其对生物多样性的影响。0.16344044051606188
  6. score2 = 1/63+1/63
  7. # 气候变化如何影响日常天气。0.015873015873015872
  8. score3 = 1/63
  9. print(score0)
  10. print(score1)
  11. print(score2)
  12. print(score3)

 这里我们看到我们手动计算的分数与RRF的python算法计算的分数是一致的。

下面我们可以在LangSmith平台中查看最终chain的执行过程中的中间结果,如下图的左侧为最终chain的所有组件如:ChatPromptTemplate,ChatOpenAI,Retriever(4个),reciprocal_rank_fusion,下图的右侧为每个组件所对应的输入和输出的内容:

下图为ChatPromptTemplate组件对应的输入和输出结果:

 下图为ChatOpenAI(LLM)组件对应的输入和输出结果:

  下图为第一个Retriever的输入和输出结果:

下图为最后一个Retriever的输入和输出结果: 

下图为RRF算法的输入和输出结果: 

 

 四、完整程序

最后我们来编写一个完整的RAG-FUSION的程序,在这个程序中我们会检索百度百科中的一篇关于恐龙的文章,同时我们使用Chroma做为本地存储的向量数据库,bge的中文embedding模型。

  1. from langchain.text_splitter import RecursiveCharacterTextSplitter
  2. from langchain_community.vectorstores import Chroma
  3. from langchain_core.output_parsers import StrOutputParser
  4. from langchain_core.runnables import RunnablePassthrough
  5. from langchain.prompts import ChatPromptTemplate
  6. from operator import itemgetter
  7. from langchain.embeddings import HuggingFaceBgeEmbeddings
  8. from langchain_openai import ChatOpenAI
  9. from langchain_community.document_loaders import WebBaseLoader
  10. from langchain.load import dumps, loads
  11. #定义RRF算法函数
  12. def reciprocal_rank_fusion(results: list[list], k=60):
  13. """ Reciprocal_rank_fusion that takes multiple lists of ranked documents
  14. and an optional parameter k used in the RRF formula """
  15. # Initialize a dictionary to hold fused scores for each unique document
  16. fused_scores = {}
  17. # Iterate through each list of ranked documents
  18. for docs in results:
  19. # Iterate through each document in the list, with its rank (position in the list)
  20. for rank, doc in enumerate(docs):
  21. # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
  22. doc_str = dumps(doc)
  23. # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
  24. if doc_str not in fused_scores:
  25. fused_scores[doc_str] = 0
  26. # Retrieve the current score of the document, if any
  27. previous_score = fused_scores[doc_str]
  28. # Update the score of the document using the RRF formula: 1 / (rank + k)
  29. fused_scores[doc_str] += 1 / (rank + k)
  30. # Sort the documents based on their fused scores in descending order to get the final reranked results
  31. reranked_results = [
  32. (loads(doc), score)
  33. for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
  34. ]
  35. # Return the reranked results as a list of tuples, each containing the document and its fused score
  36. return reranked_results
  37. # 0.加载bge embedding模型
  38. bge_embeddings = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
  39. # 1.加载文档
  40. loader = WebBaseLoader("https://baike.baidu.com/item/恐龙/139019")
  41. docs = loader.load()
  42. # 2.创建文档分割器,并分割文档
  43. text_splitter = RecursiveCharacterTextSplitter(chunk_size=512,chunk_overlap=0)
  44. splits = text_splitter.split_documents(docs)
  45. # 3.创建向量数据库
  46. vectorstore = Chroma.from_documents(documents=splits,embedding=bge_embeddings)
  47. # 4.创建检索器
  48. retriever = vectorstore.as_retriever()
  49. template1 = """You are a helpful assistant that generates multiple search queries based on a single input query. \n
  50. Generate multiple search queries related to: {question} \n
  51. Output (4 queries):"""
  52. prompt_rag_fusion = ChatPromptTemplate.from_template(template1)
  53. generate_queries = (
  54. prompt_rag_fusion
  55. | ChatOpenAI(temperature=0)
  56. | StrOutputParser()
  57. | (lambda x: x.split("\n"))
  58. )
  59. retrieval_chain_rag_fusion = generate_queries | retriever.map() | reciprocal_rank_fusion
  60. # RAG
  61. template2 = """Answer the following question based on this context:
  62. {context}
  63. Question: {question}
  64. """
  65. prompt = ChatPromptTemplate.from_template(template2)
  66. final_rag_chain = (
  67. {"context": retrieval_chain_rag_fusion,
  68. "question": itemgetter("question")}
  69. | prompt
  70. | ChatOpenAI(temperature=0)
  71. | StrOutputParser()
  72. )
  73. question='恐龙是怎么灭绝的?'
  74. final_rag_chain.invoke({"question":question})

最后我们可以在LangSmith平台中查看langchain的各个组件的输入,输出结果。

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/869733
推荐阅读
相关标签
  

闽ICP备14008679号