当前位置:   article > 正文

从零到一,打造顶尖RAG应用!揭秘RAG技术构建全流程!_搭建 高质量 rag 库

搭建 高质量 rag 库

通过本文你可以了解到:

  • 什么是RAG?
  • 如何搭建一个RAG应用?
  • 目前开源的RAG应用有哪些?

RAG简介

在这里插入图片描述

检索增强生成Retrieval Augmented Generation,RAG)是一种强大的工具,整合了从庞大知识库中检索到的相关信息,并以此为基础,指导大型语言模型生成更为精准的答案,从而显著提升了回答的准确性与深度。

2020 年,Meta AI 研究人员提出了RAG的方法,用于提高 LLM 在特定任务上的性能。LLM 擅长语言理解、推理和生成等任务,但也存在一些问题:

  • 信息滞后:LLM 的知识是静态的,来源于当时训练时的数据,也就是 LLM 无法直接提供最新的信息。
  • 模型幻觉:实践表明,当前的生成式 AI 技术存在一定的幻觉,而在一些常见的业务应用中,我们是希望保证事实性的。
  • 私有数据匮乏:LLM 的训练数据主要来源于互联网公开的数据,而垂类领域、企业内部等有很多专属知识,这部分是 LLM 无法直接提供的。
  • 内容不可追溯: LLM 生成的内容往往缺乏明确的信息来源,影响内容的可信度。RAG 将生成内容与检索到的原始资料建立链接,增强了内容的可追溯性,从而提升了用户对生成内容的信任度。
  • 长文本处理能力较弱: LLM 在理解和生成长篇内容时受限于有限的上下文窗口,且必须按顺序处理内容,输入越长,速度越慢。RAG 通过检索和整合长文本信息,强化了模型对长上下文的理解和生成,有效突破了输入长度的限制,同时降低了调用成本,并提升了整体的处理效率。

RAG 通过将检索到的相关信息提供给 LLM,让 LLM 进行参考生成,可以较好地缓解上述问题。因此,合理使用 RAG 可以拓展 LLM 的知识边界,使其不仅能够访问专属知识库,还能动态地引入最新的数据,从而在生成响应时提供更准确、更新的信息。

RAG组成部分

自定义知识库,用于RAG检索的知识来源:

  • 结构化的数据库形态:比如MySQL
  • 非结构化的文档体系:比如文件、图片、音频、视频

RAG 是一个完整的系统,其工作流程可以简单地分为数据处理、检索、增强和生成四个阶段:

数据处理阶段

对原始数据进行清洗和处理。
将处理后的数据转化为检索模型可以使用的格式。
将处理后的数据存储在对应的数据库中。

检索阶段

将用户的问题输入到检索系统中,从数据库中检索相关信息。

增强阶段

对检索到的信息进行处理和增强,以便生成模型可以更好地理解和使用。

生成阶段

将增强后的信息输入到生成模型中,生成模型根据这些信息生成答案。

搭建RAG应用

数据处理
数据清洗和处理

数据处理阶段,一般需要对知识库中的数据进行数据清洗,比如去掉多余的换行、特殊符号,然后加载处理后的文件和分块:

  • 加载文件:使用langchain下的document_loaders加载pdf、docs、txt、md等格式文件
  • 文本分块:分块的方式有很多,选择不同的分块方法、分块大小、chunk_overlap,对最后的检索结果有影响,这一阶段也有RAG的优化点之一
import os

from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredFileLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

def load_document(file):
    """
    加载PDF、DOC、TXT文档
    :param file:
    :return:
    """
    name, extension = os.path.splitext(file)

    if extension == '.pdf':
        print(f'Loading {file}')
        loader = PyPDFLoader(file)
    elif extension == '.docx':
        print(f'Loading {file}')
        loader = Docx2txtLoader(file)
    elif extension == '.txt':
        loader = UnstructuredFileLoader(file)
    else:
        print('Document format is not supported!')
        return None
    data = loader.load()
    return data

def chunk_data(data, chunk_size=256, chunk_overlap=150):
    """
    将数据分割成块
    :param data:
    :param chunk_size: chunk块大小
    :param chunk_overlap: 重叠部分大小
    :return:
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = text_splitter.split_documents(data)
    return chunks


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
embedding模型存储

将分块后的文本,使用embedding模型持久化存储,目前常用的中文模型bge-large-zh-v1.5。持久化存储后,避免每次都去embedding一次,消耗很长的时间。下次使用时,直接加载模型就可以了。

import os

from langchain_community.embeddings import HuggingFaceBgeEmbeddings, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS

def get_embedding(embedding_name):
    """
    根据embedding名称去加载embedding模型
    :param embedding_name: 路径或者名称
    :return:
    """
    if embedding_name == "bge":
        embedding_path = os.environ[embedding_name]
        model_kwargs = {'device': 'cpu'}
        return HuggingFaceBgeEmbeddings(model_name=embedding_path, model_kwargs=model_kwargs)
    if embedding_name == "bce":
        return None

# create embeddings using OpenAIEmbeddings() and save them in a Chroma vector store
def create_embeddings_chroma(chunks):
    embeddings = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embeddings)

    # if you want to use a specific directory for chromadb
    # vector_store = Chroma.from_documents(chunks, embeddings, persist_directory='./mychroma_db')
    return vector_store

def create_embeddings_faiss(vector_db_path, embedding_name, chunks):
    """
    使用FAISS向量数据库,并保存
    :param vector_db_path: 向量
    :param embedding_name:
    :param chunks:
    :return:
    """
    embeddings = get_embedding(embedding_name)
    db = FAISS.from_documents(chunks, embeddings)

    if not os.path.isdir(vector_db_path):
        os.mkdir(vector_db_path)

    db.save_local(folder_path=vector_db_path)
    return db

def load_embeddings_faiss(vector_db_path, embedding_name):
    """
    加载向量库
    :param vector_db_path:
    :param embedding_name:
    :return:
    """
    embeddings = get_embedding(embedding_name)
    db = FAISS.load_local(vector_db_path, embeddings, allow_dangerous_deserialization=True)
    return db



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
构建模型

采用了函数和类两种方式定义模型:

  • 函数:get_llm_model定义了基本的参数,model、prompt、temperature、max_tokens、n_ctx
  • 自定义类:
import os
import sys
import time
from abc import ABC

from langchain_core.callbacks import CallbackManagerForLLMRun
from llama_cpp import Llama
from langchain.llms.base import LLM
from pydantic import Field
from typing import Dict, Any, Mapping, Optional, List

BASE_DIR = os.path.dirname(__file__)
# PRJ_DIR上层目录
# PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, ".."))
sys.path.append(BASE_DIR)

def get_llm_model(
        prompt: str = None,
        model: str = None,
        temperature: float = 0.0,
        max_token: int = 2048,
        n_ctx: int = 512):
    """
    根据模型名称去加载模型,返回response数据
    :param prompt:
    :param model:
    :param temperature:
    :param max_token:
    :param n_ctx:
    :return:
    """
    if model in ['Qwen_q2']:
        model_path = os.environ[model]
        llm = Llama(model_path=model_path, n_ctx=n_ctx)
        start = time.time()
        response = llm.create_chat_completion(
            messages=[
                {
                    "role": "system",
                    "content": "你是一个智能超级助手,请用专业的词语回答问题,整体上下文带有逻辑性,如果不知道,请不要乱说",
                },
                {
                    "role": "user",
                    "content": "{}".format(prompt)
                },
            ],
            temperature=temperature,
            max_tokens=max_token,
            stream=False
        )
        cost = time.time() - start
        print(f"模型生成时间:{cost}")
        print(f"大模型回复:\n{response}")
        return response['choices'][0]['message']['content']

class QwenLLM(LLM):
    """
    自定义QwenLLM
    """
    model_name: str = "Qwen_q2"
    # 访问时延上限
    request_timeout: float = None
    # 温度系数
    temperature: float = 0.1
    # 窗口大小
    n_ctx = 2048
    # token大小
    max_tokens = 1024
    # 必备的可选参数
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)

    def _call(self, prompt: str, stop: Optional[List[str]] = None,
              run_manager: Optional[CallbackManagerForLLMRun] = None,
              **kwargs: Any):
        qwen_path = os.environ[self.model_name]
        print("qwen_path:", qwen_path)

        llm = Llama(model_path=qwen_path, n_ctx=self.n_ctx)

        response = llm.create_chat_completion(
            messages=[
                {
                    "role": "system",
                    "content": "你是一个智能超级助手,请用[中文]专业的词语回答问题,整体上下文带有逻辑性,并以markdown格式输出",
                },
                {
                    "role": "user",
                    "content": "{}".format(prompt)
                },
            ],
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            stream=False
        )

        # prompt工程提示

        # print(f"Qwen prompt: \n{prompt}")
        # response = lla(
        #     prompt=prompt,
        #     temperature=self.temperature,
        #     max_tokens=self.max_tokens
        # )
        print(f"Qwen response: \n{response}")
        # return response['choices'][0]['text']
        return response['choices'][0]['message']['content']

    @property
    def _llm_type(self) -> str:
        return "Llama3"

        # 定义一个返回默认参数的方法

    @property
    def _default_params(self) -> Dict[str, Any]:
        """获取调用默认参数。"""
        normal_params = {
            "temperature": self.temperature,
            "request_timeout": self.request_timeout,
            "n_ctx": self.n_ctx,
            "max_tokens": self.max_tokens
        }
        # print(type(self.model_kwargs))
        return {**normal_params}

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_name": self.model_name}, **self._default_params}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
构建应用
import sys

import streamlit as st
import os
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
import tiktoken
from dotenv import load_dotenv, find_dotenv
from langchain_core.prompts import PromptTemplate

BASE_DIR = os.path.dirname(__file__)
PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, ".."))
sys.path.append(PRJ_DIR)

from streamlit_demo.custom_llm import QwenLLM
from streamlit_demo.embedding_oper import create_embeddings_faiss, create_embeddings_chroma, load_embeddings_faiss
from streamlit_demo.prepare_data import load_document, chunk_data

_ = load_dotenv(find_dotenv(), override=True)
vector_db_path = os.path.join(BASE_DIR, "vector_db")
print(f"vector_db_path: {vector_db_path}")

DEFAULT_TEMPLATE = """
    你是一个聪明的超级智能助手,请用专业且富有逻辑顺序的句子回复,并以中文形式且markdown形式输出。
    检索到的信息:
    {context}
    问题:
    {question}
"""

def ask_and_get_answer_from_local(model_name, vector_db, prompt, top_k=5):
    """
    从本地加载大模型
    :param model_name: 模型名称
    :param vector_db:
    :param prompt:
    :param top_k:
    :return:
    """
    docs_and_scores = vector_db.similarity_search_with_score(prompt, k=top_k)
    print("docs_and_scores: ", docs_and_scores)
    # knowledge = [doc.page_content for doc in docs_and_scores]
    # print("检索到的知识:", knowledge)
    if model_name == "Qwen_q2":
        llm = QwenLLM(model_name=model_name, temperature=0.4)
    prompt_template = PromptTemplate(input_variables=["context", "question"], template=DEFAULT_TEMPLATE)
    retriever = vector_db.as_retriever(search_type='similarity', search_kwargs={'k': top_k})
    chain = RetrievalQA.from_chain_type(llm=llm,
                                        chain_type="stuff",
                                        retriever=retriever,
                                        chain_type_kwargs={"prompt": prompt_template},
                                        return_source_documents=True)
    answer = chain({"query": prompt, "top_k": top_k})
    print(f"answers: {answer}")
    # answer = chain.run(prompt)
    # answer = answer['choices'][0]['message']['content']
    answer = answer['result']
    return answer

def ask_and_get_answer(vector_store, q, k=3):
    llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1)
    retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k})
    chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)

    answer = chain.run(q)
    return answer

# calculate embedding cost using tiktoken
def calculate_embedding_cost(texts):
    enc = tiktoken.encoding_for_model('text-embedding-ada-002')
    total_tokens = sum([len(enc.encode(page.page_content)) for page in texts])
    # print(f'Total Tokens: {total_tokens}')
    # print(f'Embedding Cost in USD: {total_tokens / 1000 * 0.0004:.6f}')
    return total_tokens, total_tokens / 1000 * 0.0004

# clear the chat history from streamlit session state
def clear_history():
    if 'history' in st.session_state:
        del st.session_state['history']

if __name__ == "__main__":

    # st.image('img.png')
    st.subheader('LLM Question-Answering Application 声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签