当前位置:   article > 正文

动手做一个最小RAG——TinyRAG

怎么做一个rag

 Datawhale干货 

作者:宋志学,Datawhale成员

大家好,我是不要葱姜蒜。

接下来我会带领大家一步一步地实现一个简单的RAG模型,这个模型是基于RAG的一个简化版本,我们称之为Tiny-RAG。Tiny-RAG是一个基于RAG的简化版本,它只包含了RAG的核心功能,即Retrieval和Generation。Tiny-RAG的目的是为了帮助大家更好地理解RAG模型的原理和实现。

OK,让我们开始吧!

1. RAG 介绍

LLM会产生误导性的 “幻觉”,依赖的信息可能过时,处理特定知识时效率不高,缺乏专业领域的深度洞察,同时在推理能力上也有所欠缺。

正是在这样的背景下,检索增强生成技术(Retrieval-Augmented Generation,RAG)应时而生,成为 AI 时代的一大趋势。

RAG 通过在语言模型生成答案之前,先从广泛的文档数据库中检索相关信息,然后利用这些信息来引导生成过程,极大地提升了内容的准确性和相关性。RAG 有效地缓解了幻觉问题,提高了知识更新的速度,并增强了内容生成的可追溯性,使得大型语言模型在实际应用中变得更加实用和可信。

RAG的基本结构有哪些呢?

  • 要有一个向量化模块,用来将文档片段向量化。

  • 要有一个文档加载和切分的模块,用来加载文档并切分成文档片段。

  • 要有一个数据库来存放文档片段和对应的向量表示。

  • 要有一个检索模块,用来根据 Query (问题)检索相关的文档片段。

  • 要有一个大模型模块,用来根据检索出来的文档回答用户的问题。

OK,那上述这些也就是 TinyRAG 仓库的所有模块内容。

a3405c63b7919a09fc13eb45eb6a2d12.png

那接下来,让我们梳理一下 RAG 的流程是什么样的呢?

  • 索引:将文档库分割成较短的 Chunk,并通过编码器构建向量索引。

  • 检索:根据问题和 chunks 的相似度检索相关文档片段。

  • 生成:以检索到的上下文为条件,生成问题的回答。

那也就是下图所示的流程,图片出处  Retrieval-Augmented Generation for Large Language Models: A Survey

2db79056383ec70629633a18df5ece49.png

2. 向量化

首先让我们来动手实现一个向量化的类,这是RAG架构的基础。向量化的类主要是用来将文档片段向量化,将一段文本映射为一个向量。

那首先我们要设置一个 Embedding 基类,这样我们再用其他的模型的时候,只需要继承这个基类,然后在此基础上进行修改即可,方便代码扩展。

  1. class BaseEmbeddings:
  2.     """
  3.     Base class for embeddings
  4.     """
  5.     def __init__(self, path: str, is_api: bool) -> None:
  6.         self.path = path
  7.         self.is_api = is_api
  8.     
  9.     def get_embedding(self, text: str, model: str) -> List[float]:
  10.         raise NotImplementedError
  11.     
  12.     @classmethod
  13.     def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
  14.         """
  15.         calculate cosine similarity between two vectors
  16.         """
  17.         dot_product = np.dot(vector1, vector2)
  18.         magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
  19.         if not magnitude:
  20.             return 0
  21.         return dot_product / magnitude

观察一下BaseEmbeddings基类都有什么方法,首先有一个get_embedding方法,这个方法是用来获取文本的向量表示的,然后有一个cosine_similarity方法,这个方法是用来计算两个向量之间的余弦相似度的。其次在初始化类的时候设置了,模型的路径或者是否是API模型。比如使用OpenAI的Embedding API的话就需要设置self.is_api=Ture

继承BaseEmbeddings类的话,就只需要编写get_embedding方法即可,cosine_similarity方法会被继承下来,直接用就行。这就是编写基类的好处。

  1. class OpenAIEmbedding(BaseEmbeddings):
  2.     """
  3.     class for OpenAI embeddings
  4.     """
  5.     def __init__(self, path: str = '', is_api: bool = True) -> None:
  6.         super().__init__(path, is_api)
  7.         if self.is_api:
  8.             from openai import OpenAI
  9.             self.client = OpenAI()
  10.             self.client.api_key = os.getenv("OPENAI_API_KEY")
  11.             self.client.base_url = os.getenv("OPENAI_BASE_URL")
  12.     
  13.     def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
  14.         if self.is_api:
  15.             text = text.replace("\n"" ")
  16.             return self.client.embeddings.create(input=[text], model=model).data[0].embedding
  17.         else:
  18.             raise NotImplementedError

3. 文档加载和切分

接下来我们来实现一个文档加载和切分的类,这个类主要是用来加载文档并切分成文档片段。

那我们都需要切分什么文档呢?这个文档可以是一篇文章,一本书,一段对话,一段代码等等。这个文档的内容可以是任何的,只要是文本就行。比如:pdf文件、md文件、txt文件等等。

这里只展示一部分内容了,完整的代码可以在 RAG/utils.py 文件中找到。在这个代码中可以看到,能加载的文件类型有:pdf、md、txt,只需要编写对应的函数即可。

  1. def read_file_content(cls, file_path: str):
  2.     # 根据文件扩展名选择读取方法
  3.     if file_path.endswith('.pdf'):
  4.         return cls.read_pdf(file_path)
  5.     elif file_path.endswith('.md'):
  6.         return cls.read_markdown(file_path)
  7.     elif file_path.endswith('.txt'):
  8.         return cls.read_text(file_path)
  9.     else:
  10.         raise ValueError("Unsupported file type")

那我们把文件内容都读取之后,还需要切分呀!那怎么切分呢,OK,接下来咱们就按 Token 的长度来切分文档。我们可以设置一个最大的 Token 长度,然后根据这个最大的 Token 长度来切分文档。这样切分出来的文档片段就是一个一个的差不多相同长度的文档片段了。

不过在切分的时候要注意,片段与片段之间最好要有一些重叠的内容,这样才能保证检索的时候能够检索到相关的文档片段。还有就是切分文档的时候最好以句子为单位,也就是按 \n 进行粗切分,这样可以基本保证句子内容是完整的。

  1. def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
  2.     chunk_text = []
  3.     curr_len = 0
  4.     curr_chunk = ''
  5.     lines = text.split('\n')  # 假设以换行符分割文本为行
  6.     for line in lines:
  7.         line = line.replace(' ''')
  8.         line_len = len(enc.encode(line))
  9.         if line_len > max_token_len:
  10.             print('warning line_len = ', line_len)
  11.         if curr_len + line_len <= max_token_len:
  12.             curr_chunk += line
  13.             curr_chunk += '\n'
  14.             curr_len += line_len
  15.             curr_len += 1
  16.         else:
  17.             chunk_text.append(curr_chunk)
  18.             curr_chunk = curr_chunk[-cover_content:]+line
  19.             curr_len = line_len + cover_content
  20.     if curr_chunk:
  21.         chunk_text.append(curr_chunk)
  22.     return chunk_text

4. 数据库 && 向量检索

上面,我们做好了文档切分,也做好了 Embedding 模型的加载。那接下来就得设计一个向量数据库用来存放文档片段和对应的向量表示了。

还有就是也要设计一个检索模块,用来根据 Query (问题)检索相关的文档片段。OK,我们冲冲冲!

一个数据库对于最小RAG架构来说,需要实现几个功能呢?

  • persist:数据库持久化,本地保存

  • load_vector:从本地加载数据库

  • get_vector:获得文档的向量表示

  • query:根据问题检索相关的文档片段

嗯嗯,以上四个模块就是一个最小的RAG结构数据库需要实现的功能,具体代码可以在 RAG/VectorBase.py 文件中找到。

  1. class VectorStore:
  2.     def __init__(self, document: List[str] = ['']) -> None:
  3.         self.document = document
  4.     def get_vector(self, EmbeddingModel: BaseEmbeddings) -> List[List[float]]:
  5.         # 获得文档的向量表示
  6.         pass
  7.     def persist(self, path: str = 'storage'):
  8.         # 数据库持久化,本地保存
  9.         pass
  10.     def load_vector(self, path: str = 'storage'):
  11.         # 从本地加载数据库
  12.         pass
  13.     def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:
  14.         # 根据问题检索相关的文档片段
  15.         pass

那让我们来看一下, query 方法具体是怎么实现的呢?

首先先把用户提出的问题向量化,然后去数据库中检索相关的文档片段,最后返回检索到的文档片段。可以看到咱们在向量检索的时候仅使用 Numpy 进行加速,代码非常容易理解和修改。

主要是方便改写和大家理解,并没有使用成熟的数据库,这样可以更好地理解RAG的原理。

  1. def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:
  2.     query_vector = EmbeddingModel.get_embedding(query)
  3.     result = np.array([self.get_similarity(query_vector, vector)
  4.                         for vector in self.vectors])
  5.     return np.array(self.document)[result.argsort()[-k:][::-1]].tolist()

5. 大模型模块

那就来到了最后一个模块了,大模型模块。这个模块主要是用来根据检索出来的文档回答用户的问题。

一样的,我们还是先实现一个基类,这样我们在遇到其他的自己感兴趣的模型就可以快速的扩展了。

  1. class BaseModel:
  2.     def __init__(self, path: str = '') -> None:
  3.         self.path = path
  4.     def chat(self, prompt: str, history: List[dict], content: str) -> str:
  5.         pass
  6.     def load_model(self):
  7.         pass

BaseModel 包含了两个方法,chatload_model,如果使用API模型,比如OpenAI的话,那就不需要load_model方法,如果你要本地化运行的话,那还是会选择使用开源模型,那就需要load_model方法啦。

这里咱们以 InternLM2-chat-7B 模型为例

  1. class InternLMChat(BaseModel):
  2.     def __init__(self, path: str = '') -> None:
  3.         super().__init__(path)
  4.         self.load_model()
  5.     def chat(self, prompt: str, history: List = [], content: str='') -> str:
  6.         prompt = PROMPT_TEMPLATE['InternLM_PROMPT_TEMPALTE'].format(question=prompt, context=content)
  7.         response, history = self.model.chat(self.tokenizer, prompt, history)
  8.         return response
  9.     def load_model(self):
  10.         import torch
  11.         from transformers import AutoTokenizer, AutoModelForCausalLM
  12.         self.tokenizer = AutoTokenizer.from_pretrained(self.path, trust_remote_code=True)
  13.         self.model = AutoModelForCausalLM.from_pretrained(self.path, torch_dtype=torch.float16, trust_remote_code=True).cuda()

可以用一个字典来保存所有的prompt,这样比较好维护。

  1. PROMPT_TEMPLATE = dict(
  2.     InternLM_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,就说你不知道。总是使用中文回答。
  3.         问题: {question}
  4.         可参考的上下文:
  5.         ···
  6.         {context}
  7.         ···
  8.         如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
  9.         有用的回答:"""
  10. )

那这样的话,我们就可以利用InternLM2模型来做RAG啦!

6.  LLM Tiny-RAG Demo

那接下来,我们就来看一下Tiny-RAG的Demo吧!

  1. from RAG.VectorBase import VectorStore
  2. from RAG.utils import ReadFiles
  3. from RAG.LLM import OpenAIChat, InternLMChat
  4. from RAG.Embeddings import JinaEmbedding, ZhipuEmbedding
  5. 没有保存数据库
  6. docs = ReadFiles('./data').get_content(max_token_len=600, cover_content=150) # 获得data目录下的所有文件内容并分割
  7. vector = VectorStore(docs)
  8. embedding = ZhipuEmbedding() # 创建EmbeddingModel
  9. vector.get_vector(EmbeddingModel=embedding)
  10. vector.persist(path='storage') # 将向量和文档内容保存到storage目录下,下次再用就可以直接加载本地的数据库
  11. question = 'git的原理是什么?'
  12. content = vector.query(question, model='zhipu', k=1)[0]
  13. chat = InternLMChat(path='model_path')
  14. print(chat.chat(question, [], content))

当然我们也可以从本地加载已经处理好的数据库,毕竟我们在上面的数据库环节已经写过这个功能啦。

  1. from RAG.VectorBase import VectorStore
  2. from RAG.utils import ReadFiles
  3. from RAG.LLM import OpenAIChat, InternLMChat
  4. from RAG.Embeddings import JinaEmbedding, ZhipuEmbedding
  5. # 保存数据库之后
  6. vector = VectorStore()
  7. vector.load_vector('./storage') # 加载本地的数据库
  8. question = 'git的原理是什么?'
  9. embedding = ZhipuEmbedding() # 创建EmbeddingModel
  10. content = vector.query(question, EmbeddingModel=embedding, k=1)[0]
  11. chat = InternLMChat(path='model_path')
  12. print(chat.chat(question, [], content))

7. 总结

经过上面的学习,你是否学会了如何搭建一个最小RAG架构呢?相信你一定学会啦,哈哈哈。

那让我们再来复习一下,一个最小RAG应该包含哪些内容叭?(此处默写!)

  • 向量化模块

  • 文档加载和切分模块

  • 数据库

  • 向量检索

  • 大模型模块

okk,你已经学会了,但别忘了给我的项目点个star哦!

项目地址:https://github.com/KMnO4-zx/TinyRAG

efc2eef19a4ee8c4f20d16ebafa35f63.png
一起“赞”三连

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/853537
推荐阅读
相关标签
  

闽ICP备14008679号