赞
踩
LangChain 就是一个 LLM 编程框架,你想开发一个基于 LLM 应用,需要什么组件它都有,直接使用就行;甚至针对常规的应用流程,它利用链(LangChain中Chain的由来)这个概念已经内置标准化方案了。下面我们从新兴的大语言模型(LLM)技术栈的角度来看看为何它的理念这么受欢迎。
参考文档:
一文全面搞懂LangChain
Milvus在我之前的文章中,已经有提到,并且讲述了如何使用Milvus Java SDK来构建存储,但在更广泛的LLM应用的场景中,Python是更为流行的开发语言,因此本文章为了能够快速构建一个基于LLM的分析任务,选择使用Python脚本。
用户输入一个问题和相关的统计数据,并且提供了额外的一些文章/文档(PDF, TXT, DOC等),希望LLM能够结合这些信息来回答用户的问题,并给出合理地建议。
对文章切片,并进行向量化存入Milvus。
# 将文件内容,切分成N个块,每个块500个字符,同时连续两个段落之前会重复50字符 def load_and_split_text_file(self, file_path, chunk_size=500, chunk_overlap=50): file_abs_path = os.path.expanduser(file_path) filename = os.path.basename(file_abs_path) loader = TextLoader(file_abs_path) documents = loader.load() text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) docs = text_splitter.split_documents(documents) segments = [] for doc in docs: if len(doc.page_content) <= 0: continue info = { "filename": filename, "text": str(doc.page_content) } segments.append(info) return segments def insert_info_data(self, infos): """ 将数据插入到Milvus中。 """ if len(infos) <= 0: print("empty infos, return") return # 构建列式数据集 entities = [ { "name": "filename", "type": DataType.VARCHAR, "values": [info["filename"] for info in infos] }, { "name": "title", "type": DataType.VARCHAR, "values": [info["title"] for info in infos] }, { "name": "embeddings", "type": DataType.FLOAT_VECTOR, # 使用AzureOpenAI,对每一个文章分段,进行向量化 "values": [self.embed_by_azureopenai(info["text"]) for info in infos] }, { "name": "text", "type": DataType.VARCHAR, "values": [info["text"] for info in infos] } ] # 插入数据到Milvus ids = self.insert_data(entities) # 持久化 self._milvus.flush([self.collection_name]) insert_info_data(load_and_split_text_file('test_document.txt'))
def get_formatted_documents(question: str, document_names: Optional[list[str]]) -> str: """ 格式化根据相似性搜索得到的文章内容 """ content = transform_documents_to_str(search_documents(question, document_names)) tokens_num = _estimate_tokens_num(content) if tokens_num > 3000: # TODO: using map/reduce to generate the summary #raise RuntimeError(f"Too many tokens {tokens_num} encoded from docs by model {models.model_factory.CHAT_MODEL_DEFAULT.model_name}, hence we will try to output the summary based on your question!") agent = initialize_agent( llm=models.model_factory.get_chat_model(temperature=0), tools=tools.copilot_tools.get_tools(), agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True, max_tokens=1000, handle_parsing_errors=True) return agent.invoke({ "input": "Summary the documents according to the question:\n\n# question\n{}\n# content\n{}".format(question, "\n".join(documents)) })['output'] return content def search_documents(question: str, document_names: Optional[list[str]]) -> list[dict]: """ 根据相似性搜索,查找与question相关的文章内容 """ content = [] try: if document_names is not None and len(document_names) > 0: datas = [] for document_name in document_names: results = milvus_utility.search_by_titles(question, [document_name]) datas.extend(results) print("data len:{}".format(len(datas))) return datas except BaseException: print(sys.exc_info()) if len(content) > 0: print("!!!!!!!!!!!search result valid!!!!!!!!!!!!!!") return content
关于Chain,这里一共定义了两大类Chain:
一类是summary_chain
,负责根据问题、数据、相关文章内容,生成一个总结性的报告;一类是output chain
,如publish_chain
,负责对cummary_chain的输出结果再加工,然后转发给客户端。
关于Cache,每一个Chain就是与LLM的一次交互,很容易想到的一个问题是,对于相同的输出内容,实际上它的输出结果应该也是相同的,因此这里需要对每一个Chain进行缓存的判断,但由于是样例代码Cache实现地比较粗糙,仅仅基于threading.Lock
和{}
,故更好的做法是引入第三方的缓存框架。
def summary_by_chain(questionAndData: list, document_names: Optional[list[str]], **kwargs): # 生成用于描述问题、统计数据以及相关的文章内容的Prompt question, prompt = PromptGenerator.generate_prompt_for_chain( questionAndData, document_names, lambda text, docs: documents_helper.get_formatted_documents(text, docs)) # 构建一个回答用户question的工作链 summary_chain = utils.chain_util.debug_wrapper(_construct_chain_with_one_prompt(prompt, question=question, **kwargs), **kwargs) outputs_chain = RunnableParallel( # 这个字段缓存了summary_chain的结果 summaries=lambda x: x['input'], # append a publishing chain to the summary chain publish_output=lambda x: _generate_publish_output_with_cache(x['input'], **kwargs), # other chain need input other_output=lambda x: _generate_output_with_cache(x['input'], **kwargs), ) # outputs chain's input comes from summary chain # outputs_chain的输入变量,来自于summary_chain的输出 outputs = outputs_chain.invoke({'input': _generate_summary_with_cache(questionAndData, documents, summary_chain)}) output = { 'documents': document_names, 'referenced_documents': document_names, 'summary': outputs['summaries'], 'dynamics': { 'publish_content': outputs['publish_output'], 'other_output': outputs['other_output'] } } print("Output from AI: " + str(output)) return output def _construct_chain_with_one_prompt(prompt: str, **kwargs): # 可以提供语言,以使得LLM按指定的语言输出结果 summary_chain = ( {'language': lambda x: kwargs['language']} | ChatPromptTemplate.from_template(prompt) | model_factory.CHAT_MODEL_DEFAULT | StrOutputParser()) final_chain = summary_chain.with_config(tags=['summary_chain']) return final_chain def _generate_summary_with_cache(questionAndData, documents, chain, **kwargs): # 根据问题及数据,找到是否存在缓存,如果有,则直接返回之前已经生成的output, # 否则才需要执行chain cached_summary = GLOBAL_CACHE.load(questionAndData=questionAndData, documents=documents, cache_type="summaries") return cached_summary if cached_summary is not None else GLOBAL_CACHE.save(questionAndData, documents, lambda x: chain.invoke(""), cache_type="summaries") def _generate_publish_output_with_cache(content, **kwargs): # 对content进行加密,以此作为识别不同content的依据,利用Cache逻辑, # 避免不必要的Chat md5 = hashlib.md5(content.encode('utf-8')) cache_key = "PUBLISH:" + md5.hexdigest() return _generate_output_with_cache(cache_key, content, lambda x: PublishTool.get_publish_chain(**kwargs).invoke({'summaries': x}))
由于内容检索的好坏,会由如下几点影响:
针对影响1:可以考虑使用更加格式化的文档格式,如Markdown, XML等,以避免chunk的切分方式,提升切分后的段落内聚性;
针对影响2:考虑利用LLMChainExtractor
工具,检索到相关文章内容后,通过绑定的LLM对原文章内容做精简,达到去冗余的目的;同时考虑结合MMR(Maximal Marginal Relevance)搜索算法,使返回的文章内容尽量相关且多样化,避免文章内容过于聚焦。
Chain很明显的一个问题是,我们不得不事先定义好工作链,并为每一个Agent构造合适的Prompt,而无法充分利用LLM模型对任务的拆解能力,理论上可以通过Agent模式,让LLM分析任务、自动调用function call / tool,最终完成整个任务流,以取代Chain的固定模式。
但Agent也有缺点,就是稳定性和时效性。所谓稳定性,是每一次都需要AI交换确认下一步的工作是什么,一旦Clues信息描述不清楚以及输入数据难以理解,很容易产生不符合预期的结果;对于时效性,下一步应该做什么都需要经过一次LLM交互,因此时间必须是远大于Chain链的固定模式。
当然也可以结合Agent和Chain的优点完成工作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。