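In the previous article, "Elasticsearch: Smart RAG, fetching surrounding chunks (Part 1)", we saw how to implement smart RAG by fetching surrounding chunks. That article includes a notebook. This article describes in detail how to configure a local deployment so that developers running Elasticsearch on their own machine can run that notebook without trouble.
If you have not yet installed Elasticsearch and Kibana, please refer to the following links to install them:
We install Elastic Stack 8.x. Note in particular that ES|QL is only available in Elastic Stack 8.11 and later, so you need to download and install version 8.11 or later.
When Elasticsearch starts for the first time, we see the following output:
We need to note down the password of the Elasticsearch superuser elastic.
We can also find the certificates used to access Elasticsearch in the Elasticsearch installation directory: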
- $ pwd
- /Users/liuxg/elastic/elasticsearch-8.14.0/config/certs
- $ ls
- http.p12 http_ca.crt transport.p12
Above, http_ca.crt is the certificate we need in order to access Elasticsearch.
First, we clone the prepared code:
git clone https://github.com/liu-xiao-guo/elasticsearch-labs
Then we change into the directory of this project:
- $ pwd
- /Users/liuxg/python/elasticsearch-labs/supporting-blog-content/fetch-surrounding-chunks
- $ cp ~/elastic/elasticsearch-8.14.0/config/certs/http_ca.crt .
- $ ls
- README.md fetch-surrounding-chunks.ipynb
- http_ca.crt
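Above, we copied the Elasticsearch certificate into the current directory. fetch-surrounding-chunks.ipynb is the notebook we will walk through below.
Below we need ELSER, which is a Platinum trial feature. Follow these steps to start the Platinum trial:
This completes activation of the Platinum trial.
In Kibana, perform the following steps:
Click the copy button shown above to obtain the Elastic API key we need.
So that the application below runs smoothly, run the following commands in the project's current directory: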
- export ES_ENDPOINT="localhost"
- export ES_USER="elastic"
- export ES_PASSWORD="Xw4_Nohry-LgaOum6oh-"
- export ELASTIC_API_KEY="WXhDakhwQUJFQklhemFRdVRQTkw6V3A0TFFieFZTRjJDdzFZbkF5dGVyUQ=="
Above, set the values according to your own Elasticsearch configuration.
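Before opening the notebook, it can help to confirm that all four variables are actually visible to Python. The following is a minimal sketch, not part of the original notebook; the helper name missing_env_vars is hypothetical, and only the variable names come from the export commands above.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = ("ES_ENDPOINT", "ES_USER", "ES_PASSWORD", "ELASTIC_API_KEY")


def missing_env_vars(env, required=REQUIRED_VARS):
    """Return the names in `required` that are absent or empty in `env`."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(os.environ)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Passing the mapping in as an argument (rather than reading os.environ inside the function) keeps the check easy to try out with a plain dictionary.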
Our example uses the text of Harry Potter for practice. The text is available at the URL used in the command below, and we can download it as follows:
curl -o harry_potter.txt https://raw.githubusercontent.com/amephraim/nlp/master/texts/J.%20K.%20Rowling%20-%20Harry%20Potter%201%20-%20Sorcerer\'s%20Stone.txt
- $ pwd
- /Users/liuxg/python/elasticsearch-labs/supporting-blog-content/fetch-surrounding-chunks
- $ ls
- README.md fetch-surrounding-chunks.ipynb http_ca.crt
- $ curl -o harry_potter.txt https://raw.githubusercontent.com/amephraim/nlp/master/texts/J.%20K.%20Rowling%20-%20Harry%20Potter%201%20-%20Sorcerer\'s%20Stone.txt
- % Total % Received % Xferd Average Speed Time Time Time Current
- Dload Upload Total Spent Left Speed
- 100 429k 100 429k 0 0 274k 0 0:00:01 0:00:01 --:--:-- 274k
We can now see a file named harry_potter.txt in the current directory:
- $ ls
- README.md harry_potter.txt
- fetch-surrounding-chunks.ipynb http_ca.crt
pip3 install python-dotenv elasticsearch==8.14.0 pandas eland
That completes all of the preparation. We can now open the notebook and work through it.
We can start the notebook with the following command:
jupyter notebook fetch-surrounding-chunks.ipynb
- $ pwd
- /Users/liuxg/python/elasticsearch-labs/supporting-blog-content/fetch-surrounding-chunks
- $ jupyter notebook fetch-surrounding-chunks.ipynb
- !pip install elasticsearch==8.14.0
- !pip install pandas
- !python -m pip install eland
-
- import json
- import time
- import urllib.request
- import re
- import pandas as pd
- from transformers import AutoTokenizer, BertTokenizer
- from elasticsearch import Elasticsearch, helpers, exceptions
- import textwrap
If you already installed the required packages earlier, you can skip the install commands above.
- from elasticsearch import Elasticsearch
- from dotenv import load_dotenv
- import os
- from transformers import BertTokenizer, BertForMaskedLM
-
- load_dotenv()
-
- raw_source_index = "harry_potter_dataset-raw"
- index_name = "harry_potter_dataset_enriched"
-
- dense_embedding_model_id = "sentence-transformers__all-minilm-l6-v2"
- dense_huggingface_model_id = "sentence-transformers/all-MiniLM-L6-v2"
- dense_model_number_of_allocators = 2
-
- elser_model_id = ".elser_model_2"
- elser_model_number_of_allocators = 2
-
- bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
-
-
- SEMANTIC_SEARCH_TOKEN_LIMIT = 500
- ELSER_TOKEN_OVERLAP = 0.0
-
-
- # Create the client instance
- load_dotenv()
-
- ES_USER = os.getenv("ES_USER")
- ES_PASSWORD = os.getenv("ES_PASSWORD")
- ES_ENDPOINT = os.getenv("ES_ENDPOINT")
- ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY")
-
- url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
- print(url)
-
- esclient = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
- print(esclient.info())
If everything runs smoothly, you will see the following output:
It shows that our Elasticsearch client connected successfully.
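The notebook builds the connection URL with an f-string, which works for the password shown here but would produce a malformed URL if the password contained reserved characters such as "@", "/" or ":". The sketch below shows one way to guard against that by percent-encoding the credentials. The function name build_es_url is hypothetical, and it assumes that whatever consumes the URL percent-decodes the user-info part.

```python
from urllib.parse import quote


def build_es_url(user, password, endpoint, port=9200):
    """Build an https URL with percent-encoded credentials.

    quote(..., safe="") also encodes "/", "@" and ":" so that they cannot
    be mistaken for URL delimiters.
    """
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return f"https://{encoded_user}:{encoded_password}@{endpoint}:{port}"


# A password without reserved characters is left unchanged.
print(build_es_url("elastic", "Xw4_Nohry-LgaOum6oh-", "localhost"))
```

Another option, if the URL is only needed for the Python client, is to pass the credentials separately through the client's basic_auth argument instead of embedding them in the URL.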
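Here we use a script to upload the model we need. Use the eland_import_hub_model script to download and install the all-MiniLM-L6-v2 transformer model, setting the NLP --task-type to text_embedding.
To authenticate your request, use an Elastic API key.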
- CA_CERT = "./http_ca.crt"
- print(url)
- !eland_import_hub_model --url $url --es-model-id {dense_embedding_model_id} --hub-model-id {dense_huggingface_model_id} --task-type text_embedding --es-api-key $ELASTIC_API_KEY --ca-cert $CA_CERT --start --clear-previous
- resp = esclient.ml.update_trained_model_deployment(
- model_id=dense_embedding_model_id,
- body={"number_of_allocations": dense_model_number_of_allocators},
- )
- print(resp)
- https://elastic:Xw4_Nohry-LgaOum6oh-@localhost:9200
- 2024-06-17 07:36:04,762 INFO : Establishing connection to Elasticsearch
- 2024-06-17 07:36:04,781 INFO : Connected to cluster named 'elasticsearch' (version: 8.14.0)
- 2024-06-17 07:36:04,781 INFO : Loading HuggingFace transformer tokenizer and model 'sentence-transformers/all-MiniLM-L6-v2'
- STAGE:2024-06-17 07:36:09 54226:14164655 ActivityProfilerController.cpp:314] Completed Stage: Warm Up
- STAGE:2024-06-17 07:36:09 54226:14164655 ActivityProfilerController.cpp:320] Completed Stage: Collection
- STAGE:2024-06-17 07:36:09 54226:14164655 ActivityProfilerController.cpp:324] Completed Stage: Post Processing
- 2024-06-17 07:36:09,768 WARNING : `SentenceTransformer._target_device` has been removed, please use `SentenceTransformer.device` instead.
- 2024-06-17 07:36:09,768 WARNING : `SentenceTransformer._target_device` has been removed, please use `SentenceTransformer.device` instead.
- 2024-06-17 07:36:09,996 WARNING : `SentenceTransformer._target_device` has been removed, please use `SentenceTransformer.device` instead.
- 2024-06-17 07:36:09,996 WARNING : `SentenceTransformer._target_device` has been removed, please use `SentenceTransformer.device` instead.
- 2024-06-17 07:36:10,705 INFO : Stopping deployment for model with id 'sentence-transformers__all-minilm-l6-v2'
- 2024-06-17 07:36:10,806 INFO : Deleting model with id 'sentence-transformers__all-minilm-l6-v2'
- 2024-06-17 07:36:10,962 INFO : Creating model with id 'sentence-transformers__all-minilm-l6-v2'
- 2024-06-17 07:36:11,120 INFO : Uploading model definition
- 100%|███████████████████████████████████████| 87/87 [00:03<00:00, 25.11 parts/s]
- 2024-06-17 07:36:14,584 INFO : Uploading model vocabulary
- 2024-06-17 07:36:14,622 INFO : Starting model deployment
- 2024-06-17 07:36:16,031 INFO : Model successfully imported with id 'sentence-transformers__all-minilm-l6-v2'
- {'assignment': {'task_parameters': {'model_id': 'sentence-transformers__all-minilm-l6-v2', 'deployment_id': 'sentence-transformers__all-minilm-l6-v2', 'model_bytes': 90303522, 'threads_per_allocation': 1, 'number_of_allocations': 2, 'queue_capacity': 1024, 'cache_size': '90303522b', 'priority': 'normal', 'per_deployment_memory_bytes': 90269696, 'per_allocation_memory_bytes': 291876956}, 'routing_table': {'PEyvsErNSXu8NbrlO_HPxA': {'current_allocations': 1, 'target_allocations': 2, 'routing_state': 'started', 'reason': ''}}, 'assignment_state': 'started', 'start_time': '2024-06-16T23:36:14.652847Z', 'max_assigned_allocations': 1}}
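For this step, you can also refer to the earlier article "Elasticsearch: How to deploy NLP: text embeddings and vector search" to deploy the model from the command line. After running the commands above, you need to make a selection in the Kibana UI:
Developers who are not yet familiar with ELSER may want to read my earlier article "Elasticsearch: Deploying ELSER - Elastic Learned Sparse EncodeR".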
- # delete model if already downloaded and deployed
- try:
- esclient.ml.delete_trained_model(model_id=elser_model_id, force=True)
- print("Model deleted successfully, We will proceed with creating one")
- except exceptions.NotFoundError:
- print("Model doesn't exist, but We will proceed with creating one")
-
- # Creates the ELSER model configuration. Automatically downloads the model if it doesn't exist.
- esclient.ml.put_trained_model(
- model_id=elser_model_id, input={"field_names": ["text_field"]}
- )
The code above deletes the ELSER model if it already exists and then creates its configuration again.
- Model deleted successfully, We will proceed with creating one
- ObjectApiResponse({'model_id': '.elser_model_2', 'model_type': 'pytorch', 'model_package': {'packaged_model_id': 'elser_model_2', 'model_repository': 'https://ml-models.elastic.co', 'minimum_version': '11.0.0', 'size': 438123914, 'sha256': '2e0450a1c598221a919917cbb05d8672aed6c613c028008fedcd696462c81af0', 'metadata': {}, 'tags': [], 'vocabulary_file': 'elser_model_2.vocab.json'}, 'created_by': 'api_user', 'version': '12.0.0', 'create_time': 1718580981790, 'model_size_bytes': 0, 'estimated_operations': 0, 'license_level': 'platinum', 'description': 'Elastic Learned Sparse EncodeR v2', 'tags': ['elastic'], 'metadata': {}, 'input': {'field_names': ['text_field']}, 'inference_config': {'text_expansion': {'vocabulary': {'index': '.ml-inference-native-000002'}, 'tokenization': {'bert': {'do_lower_case': True, 'with_special_tokens': True, 'max_sequence_length': 512, 'truncate': 'first', 'span': -1}}}}, 'location': {'index': {'name': '.ml-inference-native-000002'}}})
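The command above downloads the ELSER model, which takes a few minutes to complete. Use the following code to check the status of the model download.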
- while True:
- status = esclient.ml.get_trained_models(
- model_id=elser_model_id, include="definition_status"
- )
-
- if status["trained_model_configs"][0]["fully_defined"]:
- print("ELSER Model is downloaded and ready to be deployed.")
- break
- else:
- print("ELSER Model is downloaded but not ready to be deployed.")
- time.sleep(5)
- ELSER Model is downloaded but not ready to be deployed.
- ELSER Model is downloaded but not ready to be deployed.
- ELSER Model is downloaded but not ready to be deployed.
- ELSER Model is downloaded but not ready to be deployed.
- ELSER Model is downloaded and ready to be deployed.
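The polling loop above runs until the model is ready and never gives up. As a hedged alternative, the sketch below wraps the same idea in a small helper with a timeout. The name wait_until and its parameters are hypothetical; the sleep and clock arguments exist only so the helper can be exercised without real waiting.

```python
import time


def wait_until(is_ready, interval=5.0, timeout=600.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call `is_ready` repeatedly until it returns a truthy value.

    Returns True when the condition was met and False when `timeout`
    seconds elapsed first.
    """
    deadline = clock() + timeout
    while True:
        if is_ready():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Possible use with the client from this article (not executed here):
# ready = wait_until(
#     lambda: esclient.ml.get_trained_models(
#         model_id=elser_model_id, include="definition_status"
#     )["trained_model_configs"][0]["fully_defined"]
# )
```

The same helper could be reused for the deployment check further down, since only the condition passed in changes.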
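Once the model has been downloaded, we can deploy it on the ML nodes. Use the following code to deploy the model. This also takes a few minutes to complete.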
- # Start ELSER model deployment if not already deployed
- esclient.ml.start_trained_model_deployment(
- model_id=elser_model_id,
- number_of_allocations=elser_model_number_of_allocators,
- wait_for="starting",
- )
-
- while True:
- status = esclient.ml.get_trained_models_stats(
- model_id=elser_model_id,
- )
- if status["trained_model_stats"][0]["deployment_stats"]["state"] == "started":
- print("ELSER Model has been successfully deployed.")
- break
- else:
- print("ELSER Model is currently being deployed.")
- time.sleep(5)
- ELSER Model is currently being deployed.
- ELSER Model has been successfully deployed.
Once the deployment has finished, we can check it in Kibana:
- import codecs
- f = codecs.open("harry_potter.txt", "r", "utf-8")
- harry_potter_book_text = f.read()
-
- chapter_pattern = re.compile(r"CHAPTER [A-Z]+", re.IGNORECASE)
- chapters = chapter_pattern.split(harry_potter_book_text)[1:]
- chapter_titles = re.findall(chapter_pattern, harry_potter_book_text)
- chapters_with_titles = list(zip(chapter_titles, chapters))
-
- print("Total chapters found:", len(chapters))
- if chapters_with_titles:
- print("First chapter title:", chapters_with_titles[0][0])
- print("Text sample from first chapter:", chapters_with_titles[0][1][:500])
-
-
- # Structuring chapters into a DataFrame
- df = pd.DataFrame(chapters_with_titles, columns=["chapter_title", "chapter_full_text"])
- df["chapter"] = df.index + 1
- df["book_title"] = "Harry Potter and the Sorcerer’s Stone"
- df["passages"] = df["chapter_full_text"].apply(lambda text: chunk(text))
- Total chapters found: 17
- First chapter title: CHAPTER ONE
- Text sample from first chapter:
-
- THE BOY WHO LIVED
-
- Mr. and Mrs. Dursley, of number four, Privet Drive, were proud to say
- that they were perfectly normal, thank you very much. They were the last
- people you'd expect to be involved in anything strange or mysterious,
- because they just didn't hold with such nonsense.
-
- Mr. Dursley was the director of a firm called Grunnings, which made
- drills. He was a big, beefy man with hardly any neck, although he did
- have a very large mustache. Mrs. Dursley was thin and blonde and had
- nearly t
The code above splits the book text into chapters.
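The chunk function that fills the passages column is defined in the notebook and is not reproduced in this article. To give a feel for what such a function does, here is a minimal sketch under stated assumptions: it counts whitespace-separated words instead of BERT tokens, the name chunk_by_words is hypothetical, and the starting chunk number of 0 is a guess. Only the text and chunk_number field names are taken from the queries used later in this article.

```python
def chunk_by_words(text, token_limit=500, overlap=0.0, first_chunk_number=0):
    """Split `text` into passages of at most `token_limit` words.

    `overlap` is the fraction of each passage repeated at the start of the
    next one (0.0 means no overlap). Words stand in for model tokens here.
    """
    if token_limit <= 0:
        raise ValueError("token_limit must be positive")
    if not 0.0 <= overlap < 1.0:
        raise ValueError("overlap must be in [0.0, 1.0)")

    words = text.split()
    step = max(1, int(token_limit * (1.0 - overlap)))
    passages = []
    for start in range(0, len(words), step):
        window = words[start:start + token_limit]
        passages.append(
            {
                "text": " ".join(window),
                "chunk_number": first_chunk_number + len(passages),
            }
        )
        # Stop once this window has reached the end of the text.
        if start + token_limit >= len(words):
            break
    return passages


for passage in chunk_by_words("a b c d e", token_limit=2):
    print(passage)
```

Storing a sequential chunk_number with every passage is what later makes it possible to look up the chunks immediately before and after a hit.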
Then, with the following code, we write each chapter to Elasticsearch:
index_dataframe(esclient, raw_source_index, df)
- Indexing documents to harry_potter_dataset-raw...
- Successfully indexed 17 documents.
- Failed to index 0 documents.
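The index_dataframe helper is defined in the notebook and is not shown here. Its output suggests a bulk request that reports successes and failures. The sketch below illustrates only the part that turns rows into bulk actions; build_bulk_actions is a hypothetical name, and the commented call to helpers.bulk is one possible way to send the actions, not necessarily what the notebook does.

```python
def build_bulk_actions(index_name, rows):
    """Yield one bulk action per row, targeting `index_name`.

    `rows` is any iterable of dictionaries, for example the result of
    df.to_dict("records") for a pandas DataFrame.
    """
    for row in rows:
        yield {"_index": index_name, "_source": dict(row)}


# Possible use with the objects from this article (not executed here):
# from elasticsearch import helpers
# success, errors = helpers.bulk(
#     esclient,
#     build_bulk_actions(raw_source_index, df.to_dict("records")),
#     raise_on_error=False,
# )
# print(f"Successfully indexed {success} documents.")
# print(f"Failed to index {len(errors)} documents.")

sample = [{"chapter": 1, "chapter_title": "CHAPTER ONE"}]
print(list(build_bulk_actions("harry_potter_dataset-raw", sample)))
```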
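We can check the result in Kibana:
This section starts an asynchronous reindex operation that transfers the data from the raw source index to the enriched index in Elasticsearch. The process runs in the background, so other operations can continue without waiting for it to finish.
Key steps: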
- # Start the reindex operation asynchronously
- response = esclient.reindex(
- body={"source": {"index": raw_source_index}, "dest": {"index": index_name}},
- wait_for_completion=False,
- )
- task_id = response["task"]
- print("Task ID:", task_id)
- check_task_status(esclient, task_id)
During the reindex above, the default_pipeline configured for index_name is invoked automatically. It is set in the index settings defined earlier in the notebook:
- index_settings = {
- "settings": {
- "number_of_shards": 2,
- "number_of_replicas": 0,
- "default_pipeline": "books_dataset_chunker",
- },
- "mappings": {
- "dynamic": "false",
- "properties": {
- "book_title": {"type": "keyword"},
- "chapter": {"type": "keyword"},
- "chapter_full_text": {"type": "text", "index": False},
- "passages": {
- "type": "nested",
- "properties": {
- "content_embedding": {
- "properties": {
- "is_truncated": {"type": "boolean"},
- "model_id": {
- "type": "text",
- "fields": {
- "keyword": {"type": "keyword", "ignore_above": 256}
- },
- },
- "predicted_value": {"type": "sparse_vector"},
- }
- },
Note the default_pipeline setting above. It is invoked automatically during the reindex. The pipeline itself is defined in pipeline_body:
- # Define the ingest pipeline configuration
- pipeline_body = {
- "description": "Pipeline for processing book passages",
- "processors": [
- {
- "foreach": {
- "field": "passages",
- "processor": {
- "inference": {
- "field_map": {"_ingest._value.text": "text_field"},
- "model_id": dense_embedding_model_id,
- "target_field": "_ingest._value.vector",
- "on_failure": [
- {
- "append": {
- "field": "_source._ingest.inference_errors",
- "value": [
- {
- "message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'",
- "pipeline": "ml-inference-title-vector",
- "timestamp": "{{{ _ingest.timestamp }}}",
- }
- ],
- }
- }
- ],
- }
- },
- }
- },
- {
- "foreach": {
- "field": "passages",
- "processor": {
- "inference": {
- "field_map": {"_ingest._value.text": "text_field"},
- "model_id": elser_model_id,
- "target_field": "_ingest._value.content_embedding",
- "on_failure": [
- {
- "append": {
- "field": "_source._ingest.inference_errors",
- "value": [
- {
- "message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'",
- "pipeline": "ml-inference-title-vector",
- "timestamp": "{{{ _ingest.timestamp }}}",
- }
- ],
- }
- }
- ],
- }
- },
- }
- },
- ],
- }
For each passage in passages, it creates a dense vector (using the sentence-transformers__all-minilm-l6-v2 model) and also a sparse vector (using the ELSER model).
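The two processors in pipeline_body differ only in the model and the target field, so the same structure could be produced by a small factory function. This is a sketch rather than the notebook's code: make_inference_processor is a hypothetical helper, and using the pipeline name books_dataset_chunker in the error message is an assumption based on the default_pipeline setting.

```python
def make_inference_processor(model_id, target_field,
                             pipeline_name="books_dataset_chunker"):
    """Build a foreach processor that runs `model_id` on every passage."""
    return {
        "foreach": {
            "field": "passages",
            "processor": {
                "inference": {
                    # Feed each passage's text to the model's input field.
                    "field_map": {"_ingest._value.text": "text_field"},
                    "model_id": model_id,
                    "target_field": target_field,
                    "on_failure": [
                        {
                            "append": {
                                "field": "_source._ingest.inference_errors",
                                "value": [
                                    {
                                        "message": "Processor 'inference' in pipeline '"
                                        + pipeline_name
                                        + "' failed with message '{{ _ingest.on_failure_message }}'",
                                        "pipeline": pipeline_name,
                                        "timestamp": "{{{ _ingest.timestamp }}}",
                                    }
                                ],
                            }
                        }
                    ],
                }
            },
        }
    }


pipeline_body = {
    "description": "Pipeline for processing book passages",
    "processors": [
        make_inference_processor(
            "sentence-transformers__all-minilm-l6-v2", "_ingest._value.vector"
        ),
        make_inference_processor(
            ".elser_model_2", "_ingest._value.content_embedding"
        ),
    ],
}
```

Generating both processors from one function keeps the dense and sparse branches in step if the error handling ever changes.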
The whole reindex takes some time to complete:
- Task ID: PEyvsErNSXu8NbrlO_HPxA:122681
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Indexing...
- Reindexing complete.
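The check_task_status helper that prints these lines is defined in the notebook and not shown in this article. A minimal sketch of such a poller, assuming it relies on the task management API and its completed flag, could look like this. The name wait_for_task and the sleep parameter are hypothetical.

```python
import time


def wait_for_task(client, task_id, interval=5.0, sleep=time.sleep):
    """Poll the tasks API until the task with `task_id` has completed."""
    while True:
        status = client.tasks.get(task_id=task_id)
        if status["completed"]:
            print("Reindexing complete.")
            return status
        print("Indexing...")
        sleep(interval)


# Possible use with the objects from this article (not executed here):
# wait_for_task(esclient, task_id)
```

Because the reindex was started with wait_for_completion=False, polling the returned task ID is the way to find out when the enriched index is ready to query.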
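Once the reindex has finished, we can check the result in Kibana:
In other words, the same passage is vectorized both densely and sparsely, and every chunk has the same structure. We can find the highest chunk number within a chapter as follows: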
- GET harry_potter_dataset_enriched/_search
- {
- "size": 0,
- "query": {
- "match": {
- "chapter": "1"
- }
- },
- "aggs": {
- "max_passage_number": {
- "nested": {
- "path": "passages"
- },
- "aggs": {
- "max_number": {
- "max": {
- "field": "passages.chunk_number"
- }
- }
- }
- }
- }
- }
Alternatively, the following request returns the highest passage (chunk) number for each chapter:
- GET harry_potter_dataset_enriched/_search
- {
- "size": 0,
- "aggs": {
- "chapter_chunks": {
- "terms": {
- "field": "chapter"
- },
- "aggs": {
- "max_passage_number": {
- "nested": {
- "path": "passages"
- },
- "aggs": {
- "max_number": {
- "max": {
- "field": "passages.chunk_number"
- }
- }
- }
- }
- }
- }
- }
- }
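The notebook wraps the first of these requests in a helper called get_max_chunk_number_query, whose body is not shown in this article. The sketch below builds an equivalent request in Python and reads the result back. The function names are hypothetical; the aggregation names max_chunk_number and max_chunk are chosen to match the response path used by the search code later in this article.

```python
def sketch_max_chunk_number_query(chapter_number):
    """Build a request that returns the highest chunk number in a chapter."""
    return {
        "size": 0,
        "query": {"match": {"chapter": chapter_number}},
        "aggs": {
            "max_chunk_number": {
                "nested": {"path": "passages"},
                "aggs": {
                    "max_chunk": {"max": {"field": "passages.chunk_number"}}
                },
            }
        },
    }


def extract_max_chunk_number(response):
    """Read the max aggregation value from a search response as an int."""
    value = response["aggregations"]["max_chunk_number"]["max_chunk"]["value"]
    return None if value is None else int(value)


print(sketch_max_chunk_number_query("1"))
```

The max aggregation reports its value as a floating-point number, which is why the sketch converts it to an integer before it is used as a chunk boundary.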
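This section builds and runs a custom search query in Elasticsearch, using a hybrid approach that combines vector search and text-based search to improve accuracy and relevance. The example used is a user query about the "Nimbus 2000".
Key steps: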
- # Custom Search Query Construction
- user_query = "what is a nimbus 2000"
-
-
- knn_boost_factor = 20
- text_expansion_boost = 1
- query = build_custom_query(
- build_vector(user_query),
- user_query,
- knn_boost_factor,
- text_expansion_boost,
- debug=False,
- )
-
- # Searching and identifying relevant passages
- results = esclient.search(index=index_name, body=query, _source=False)
-
- hit_id = None
- chunk_number = None
- chapter_number = None
- max_chunk_number = None
- max_chapter_chunk_result = None
- max_chunk_query = None
-
-
- if results and results.get("hits") and results["hits"].get("hits"):
- highest_score = -1
- best_hit = None
- hit_id = results["hits"]["hits"][0]["_id"]
- chapter_number = results["hits"]["hits"][0]["fields"]["chapter"][0]
- if "inner_hits" in results["hits"]["hits"][0]:
- for hit_type in ["text_hits", "dense_hit", "sparse_hits"]:
- if hit_type in results["hits"]["hits"][0]["inner_hits"]:
- inner_hit = results["hits"]["hits"][0]["inner_hits"][hit_type]["hits"]
- if inner_hit["hits"]:
- max_score = inner_hit["max_score"]
- if max_score and max_score > highest_score:
- highest_score = max_score
- best_hit = inner_hit["hits"][0]
-
- if best_hit:
- first_passage_text = best_hit["_source"]["text"]
- chunk_number = best_hit["_source"]["chunk_number"]
- # print(f"Matched Chunk ID: {hit_id}, Chunk Number: {chunk_number}, Text: {first_passage_text}")
- print(
- f"Matched Chunk ID: {hit_id}, Chunk Number: {chunk_number}, Text:\n{textwrap.fill(first_passage_text, width=200)}"
- )
- print(f"\n")
- else:
- print(f"ID: {hit_id}, No relevant passages found.")
- else:
- print("No results found.")
-
- # Fetch Surrounding Chunks if chapter_number is not None
- if chapter_number is not None:
- print(f"Fetch Surrounding Chunks")
- print(f"------------------------")
-
- # max_chunk_query = get_max_chunk_number_query(chapter_number, debug=False)
- # max_chapter_chunk_result = esclient.search(index=index_name, body=max_chunk_query, _source=False)
- max_chapter_chunk_result = esclient.search(
- index=index_name,
- body=get_max_chunk_number_query(chapter_number, debug=False),
- _source=False,
- )
- max_chunk_number = max_chapter_chunk_result["aggregations"]["max_chunk_number"][
- "max_chunk"
- ]["value"]
-
- adjacent_chunks_query = get_adjacent_chunks_query(
- hit_id, chunk_number, max_chunk_number, debug=False
- )
- results = esclient.search(
- index=index_name, body=adjacent_chunks_query, _source=False
- )
- print_text_from_results(results)
- else:
- print("Skipping fetch of surrounding chunks due to no initial results.")
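The helper get_adjacent_chunks_query is defined in the notebook and not reproduced here. The core idea is simple: given the chunk number of the best hit and the highest chunk number in the chapter, work out which neighbouring chunks exist and fetch them from the same document. The sketch below illustrates that idea under assumptions: both function names are hypothetical, chunk numbering is assumed to start at 0, and the query shape is one plausible way to express the lookup, not necessarily the notebook's.

```python
def adjacent_chunk_numbers(chunk_number, max_chunk_number,
                           min_chunk_number=0, window=1):
    """Return the chunk numbers within `window` of `chunk_number`.

    The result is clamped to [min_chunk_number, max_chunk_number] and
    excludes `chunk_number` itself.
    """
    low = max(int(min_chunk_number), int(chunk_number) - window)
    high = min(int(max_chunk_number), int(chunk_number) + window)
    return [n for n in range(low, high + 1) if n != int(chunk_number)]


def sketch_adjacent_chunks_query(doc_id, chunk_numbers):
    """Build a request that fetches the given chunks of one document."""
    return {
        "query": {
            "bool": {
                # Restrict the search to the chapter document that matched.
                "filter": [{"ids": {"values": [doc_id]}}],
                "must": [
                    {
                        "nested": {
                            "path": "passages",
                            "query": {
                                "terms": {"passages.chunk_number": chunk_numbers}
                            },
                            "inner_hits": {"size": max(1, len(chunk_numbers))},
                        }
                    }
                ],
            }
        }
    }


neighbours = adjacent_chunk_numbers(chunk_number=5, max_chunk_number=9.0)
print(neighbours)
print(sketch_adjacent_chunks_query("some-doc-id", neighbours))
```

Clamping against the chapter's highest chunk number is the reason the search code above first runs the max aggregation: without it, a hit on the last chunk would ask for a neighbour that does not exist.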
The complete code can be downloaded from the supporting-blog-content/fetch-surrounding-chunks directory of the liu-xiao-guo/elasticsearch-labs repository on GitHub.