当前位置:   article > 正文

通过阿里云向量检索 Milvus 版和通义千问快速构建基于专属知识库的问答系统_如何使用 milvus 构建知识库问答

如何使用 milvus 构建知识库问答


阿里云向量检索 Milvus 版是一款 Serverless 全托管服务,确保了与开源 Milvus 的完全兼容性,并支持无缝迁移。它在开源版本的基础上增强了可扩展性,能提供大规模 AI 向量数据的相似性检索服务。凭借其开箱即用的特性、灵活的扩展能力和全链路监控告警,Milvus 云服务成为多样化 AI 应用场景的理想选择,包括多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等。您还可以利用开源的 Attu 工具进行可视化操作,进一步促进应用的快速开发和部署。

阿里云向量检索 Milvus 版已开启免费公测。您可以在E-MapReduce控制台,选择 EMR Serverless > Milvus,进入 Milvus 页面创建入门版的实例,公测期间您可以免费试用 Milvus 服务。



请确保您的运行环境中已安装 Python 3.8或以上版本,以便顺利安装并使用 DashScope。




pip3 install pymilvus tqdm dashscope


本文示例使用了公开数据集 CEC-Corpus。CEC-Corpus 数据集包含332篇针对各类突发事件的新闻报道,语料和标注数据,这里我们只需要提取原始的新闻稿文本,并将其向量化后入库。

git clone https://github.com/shijiebei2009/CEC-Corpus.git


  1. 创建 embedding.py 文件,内容如下所示。
  1. import os
  2. import time
  3. from tqdm import tqdm
  4. import dashscope
  5. from dashscope import TextEmbedding
  6. from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
  7. def prepareData(path, batch_size=25):
  8. batch_docs = []
  9. for file in os.listdir(path):
  10. with open(path + '/' + file, 'r', encoding='utf-8') as f:
  11. batch_docs.append(f.read())
  12. if len(batch_docs) == batch_size:
  13. yield batch_docs
  14. batch_docs = []
  15. if batch_docs:
  16. yield batch_docs
  17. def getEmbedding(news):
  18. model = TextEmbedding.call(
  19. model=TextEmbedding.Models.text_embedding_v1,
  20. input=news
  21. )
  22. embeddings = [record['embedding'] for record in model.output['embeddings']]
  23. return embeddings if isinstance(news, list) else embeddings[0]
  24. if __name__ == '__main__':
  25. current_path = os.path.abspath(os.path.dirname(__file__)) # 当前目录
  26. root_path = os.path.abspath(os.path.join(current_path, '..')) # 上级目录
  27. data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText' # 数据下载git clone https://github.com/shijiebei2009/CEC-Corpus.git
  28. # 配置Dashscope API KEY
  29. dashscope.api_key = 'sk-630319159edb4e97a614f17f9609****'
  30. # 配置Milvus参数
  31. COLLECTION_NAME = 'CEC_Corpus'
  32. DIMENSION = 1536
  33. MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com'
  34. MILVUS_PORT = '19530'
  35. USER = 'root'
  36. PASSWORD = '<password>'
  37. connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)
  38. # Remove collection if it already exists
  39. if utility.has_collection(COLLECTION_NAME):
  40. utility.drop_collection(COLLECTION_NAME)
  41. # Create collection which includes the id, title, and embedding.
  42. fields = [
  43. FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False),
  44. FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096),
  45. FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION)
  46. ]
  47. schema = CollectionSchema(fields=fields, description='CEC Corpus Collection')
  48. collection = Collection(name=COLLECTION_NAME, schema=schema)
  49. # Create an index for the collection.
  50. index_params = {
  51. 'index_type': 'IVF_FLAT',
  52. 'metric_type': 'L2',
  53. 'params': {'nlist': 1024}
  54. }
  55. collection.create_index(field_name="embedding", index_params=index_params)
  56. id = 0
  57. for news in tqdm(list(prepareData(data_path))):
  58. ids = [id + i for i, _ in enumerate(news)]
  59. id += len(news)
  60. vectors = getEmbedding(news)
  61. # insert Milvus Collection
  62. for id, vector, doc in zip(ids, vectors, news):
  63. insert_doc = (doc[:498] + '..') if len(doc) > 500 else doc
  64. ins = [[id], [insert_doc], [vector]] # Insert the title id, the text, and the text embedding vector
  65. collection.insert(ins)
  66. time.sleep(2)


COLLECTION_NAME设置Miluvs Collection名称,您可以自定义。
MILVUS_PORTMilvus实例的Proxy Port。您可以在Milvus实例的实例详情页面查看。默认为19530。

在 Attu 中您可以看到创建的 Collection,具体操作请参见 Attu操作指南

在本文示例中,我们将 Embedding 向量和新闻报道文稿一起存入 Milvus 中,同时构建索引类型采用了 IVF_FLAT,在向量检索时,同时可以召回原始文稿。


数据写入完成后,即可进行快速的向量检索。在通过提问搜索到相关的知识点后,我们可以按照特定的模板将“提问 + 知识点”作为 prompt 向 LLM 发起提问。在这里我们所使用的 LLM 是通义千问,这是阿里巴巴自主研发的超大规模语言模型,能够在用户自然语言输入的基础上,通过自然语言理解和语义分析,理解用户意图。通过提供尽可能清晰详细的指令(prompt),可以获得更符合预期的结果。这些能力都可以通过通义千问来获得。


创建 answer.py 文件,内容如下所示。

  1. import os
  2. import dashscope
  3. from dashscope import Generation
  4. from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
  5. from embedding import getEmbedding
  6. def getAnswer(query, context):
  7. prompt = f'''请基于```内的报道内容,回答我的问题。
  8. ```
  9. {context}
  10. ```
  11. 我的问题是:{query}。
  12. '''
  13. rsp = Generation.call(model='qwen-turbo', prompt=prompt)
  14. return rsp.output.text
  15. def search(text):
  16. # Search parameters for the index
  17. search_params = {
  18. "metric_type": "L2"
  19. }
  20. results = collection.search(
  21. data=[getEmbedding(text)], # Embeded search value
  22. anns_field="embedding", # Search across embeddings
  23. param=search_params,
  24. limit=1, # Limit to five results per search
  25. output_fields=['text'] # Include title field in result
  26. )
  27. ret = []
  28. for hit in results[0]:
  29. ret.append(hit.entity.get('text'))
  30. return ret
  31. if __name__ == '__main__':
  32. current_path = os.path.abspath(os.path.dirname(__file__)) # 当前目录
  33. root_path = os.path.abspath(os.path.join(current_path, '..')) # 上级目录
  34. data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText'
  35. # 配置Dashscope API KEY
  36. dashscope.api_key = 'sk-630319159edb4e97a614f17f9609****'
  37. # 配置Milvus参数
  38. COLLECTION_NAME = 'CEC_Corpus'
  39. DIMENSION = 1536
  40. MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com'
  41. MILVUS_PORT = '19530'
  42. USER = 'root'
  43. PASSWORD = '<password>'
  44. connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)
  45. fields = [
  46. FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False),
  47. FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096),
  48. FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION)
  49. ]
  50. schema = CollectionSchema(fields=fields, description='CEC Corpus Collection')
  51. collection = Collection(name=COLLECTION_NAME, schema=schema)
  52. # Load the collection into memory for searching
  53. collection.load()
  54. question = '北京中央电视台工地发生大火,发生在哪里?出动了多少辆消防车?人员伤亡情况如何?'
  55. context = search(question)
  56. answer = getAnswer(question, context)
  57. print(answer)





