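In the previous article, "Elasticsearch: RBAC and RAG - Best Friends (Part 1)", we described in detail how to use RBAC to control access in RAG. In this article, we walk through a Jupyter notebook that implements it.
If you have not yet installed Elasticsearch and Kibana, please refer to the following links to install them:
For the installation we choose Elastic Stack 8.x. Note in particular that ES|QL is only available in Elastic Stack 8.11 and later, so you need to download and install version 8.11 or later.
When Elasticsearch starts for the first time, it prints output like the following:
We need to note down the password of the Elasticsearch superuser elastic.
We can also find the certificates for accessing Elasticsearch in the Elasticsearch installation directory: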
$ pwd
/Users/liuxg/elastic/elasticsearch-8.13.2/config/certs
$ ls
http.p12 http_ca.crt transport.p12
Of the files above, http_ca.crt is the certificate we need in order to access Elasticsearch.
First, we clone the code that has already been written:
git clone https://github.com/liu-xiao-guo/elasticsearch-labs
We then go to the directory of this example within the project:
$ pwd
/Users/liuxg/python/elasticsearch-labs/supporting-blog-content/rbac-and-rag-best-friends
$ cp ~/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt .
$ ls
http_ca.crt rbac-and-rag-best-friends.ipynb
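Above, we copied the Elasticsearch certificate into the current directory. The file rbac-and-rag-best-friends.ipynb is the notebook we walk through below.
Before starting Jupyter Notebook, we set the following environment variables on the command line: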
export ES_USER="elastic"
export ES_PASSWORD="VDMlz5QnM_0g-349fFq7"
export ES_ENDPOINT="localhost"
Change these values to match your own setup. Then, in the same terminal, run the following command:
jupyter notebook
!pip install elasticsearch python-dotenv
from elasticsearch import Elasticsearch
from IPython.display import HTML, display
from pprint import pprint
from dotenv import load_dotenv
import os, json
After running the commands above, we can check the version of the installed elasticsearch package:
$ pip list | grep elasticsearch
elasticsearch 8.13.0
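load_dotenv()

# Read the connection settings exported in the terminal
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")

# Connection URL with the credentials embedded
url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)  # Note: this prints the password in clear text

# Verify the server certificate against the CA file copied earlier
es = Elasticsearch(url, ca_certs="./http_ca.crt", verify_certs=True)
print(es.info())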
For more on connecting to Elasticsearch from Python, see the article "Elasticsearch: Everything you need to know about using Elasticsearch in Python - 8.x".
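The connection cell above assumes that all three environment variables are set and that the password can be embedded in a URL safely. The following is a minimal sketch of a more defensive variant, not the notebook's own code: the helper names `require_env` and `connection_kwargs` are hypothetical, and the credentials are passed through the client's `basic_auth` parameter instead of the URL.

```python
import os


def require_env(names, environ=None):
    """Return {name: value} for the given variables; raise if any is unset or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}


def connection_kwargs(settings, ca_certs="./http_ca.crt", port=9200):
    """Build keyword arguments for Elasticsearch(...) without a password in the URL."""
    return {
        "hosts": f"https://{settings['ES_ENDPOINT']}:{port}",
        "basic_auth": (settings["ES_USER"], settings["ES_PASSWORD"]),
        "ca_certs": ca_certs,
        "verify_certs": True,
    }


# Usage inside the notebook (needs the elasticsearch package and a running cluster):
#   from elasticsearch import Elasticsearch
#   settings = require_env(["ES_USER", "ES_PASSWORD", "ES_ENDPOINT"])
#   es = Elasticsearch(**connection_kwargs(settings))
```

Keeping the password out of the URL also means it is not printed by accident and does not need escaping when it contains characters such as `@` or `:`.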
# Delete indices
def delete_indices():
    try:
        es.indices.delete(index="rbac_rag_demo-data_public")
        print("Deleted index: rbac_rag_demo-data_public")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_public: {str(e)}")

    try:
        es.indices.delete(index="rbac_rag_demo-data_sensitive")
        print("Deleted index: rbac_rag_demo-data_sensitive")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_sensitive: {str(e)}")


delete_indices()
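The two near-identical try/except blocks above can be folded into a loop. The sketch below is one way to do that; `delete_indices_if_present` is a hypothetical name, and it takes the client as an argument so that it can be exercised without a cluster. It relies on the `ignore_unavailable` option of the delete index API so that a missing index is not treated as an error.

```python
def delete_indices_if_present(client, index_names):
    """Delete each named index; return the names for which the request succeeded."""
    done = []
    for name in index_names:
        try:
            # ignore_unavailable=True: do not fail when the index does not exist
            client.indices.delete(index=name, ignore_unavailable=True)
            done.append(name)
        except Exception as e:
            print(f"Error deleting index {name}: {e}")
    return done


# Usage inside the notebook, with the `es` client created earlier:
#   delete_indices_if_present(
#       es, ["rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive"]
#   )
```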
# Create indices
def create_indices():
    # Create data_public index (ignore=400: do not raise if it already exists)
    es.indices.create(
        index="rbac_rag_demo-data_public",
        ignore=400,
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {"properties": {"info": {"type": "text"}}},
        },
    )

    # Create data_sensitive index
    es.indices.create(
        index="rbac_rag_demo-data_sensitive",
        ignore=400,
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    "document": {"type": "text"},
                    "confidentiality_level": {"type": "keyword"},
                }
            },
        },
    )


# Populate sample data
# Note: the documents below use a "title" field, which is not declared in the
# mappings above; it is added by dynamic mapping when the first document is indexed.
def populate_data():
    # Public HR information
    public_docs = [
        {"title": "Annual leave policies updated.", "confidentiality_level": "low"},
        {"title": "Remote work guidelines available.", "confidentiality_level": "low"},
        {
            "title": "Health benefits registration period starts next month.",
            "confidentiality_level": "low",
        },
    ]
    for doc in public_docs:
        es.index(index="rbac_rag_demo-data_public", document=doc)

    # Sensitive HR information
    sensitive_docs = [
        {
            "title": "Executive compensation details Q2 2024.",
            "confidentiality_level": "high",
        },
        {
            "title": "Bonus payout structure for all levels.",
            "confidentiality_level": "high",
        },
        {
            "title": "Employee stock options plan details.",
            "confidentiality_level": "high",
        },
    ]
    for doc in sensitive_docs:
        es.index(index="rbac_rag_demo-data_sensitive", document=doc)


create_indices()
populate_data()
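Indexing one document per request is fine for six documents. For larger data sets, the documents could instead be sent in one batch with the client's bulk helper. The sketch below only builds the actions; `bulk_actions` is a hypothetical helper, not part of the notebook.

```python
def bulk_actions(index_name, docs):
    """Yield one action per document in the dict shape that the bulk helper accepts."""
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


# Usage inside the notebook, with the `es` client and the document lists above:
#   from elasticsearch import helpers
#   helpers.bulk(es, bulk_actions("rbac_rag_demo-data_public", public_docs))
```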
We can check the indices in Kibana with the following command:
# Create roles
def create_roles():
    # Role for the engineer
    es.security.put_role(
        name="engineer_role",
        body={
            "indices": [
                {"names": ["rbac_rag_demo-data_public"], "privileges": ["read"]}
            ]
        },
    )

    # Role for the manager
    es.security.put_role(
        name="manager_role",
        body={
            "indices": [
                {
                    "names": [
                        "rbac_rag_demo-data_public",
                        "rbac_rag_demo-data_sensitive",
                    ],
                    "privileges": ["read"],
                }
            ]
        },
    )


# Create users with respective roles
def create_users():
    # User 'engineer'
    es.security.put_user(
        username="engineer",
        body={
            "password": "password123",
            "roles": ["engineer_role"],
            "full_name": "Engineer User",
        },
    )

    # User 'manager'
    es.security.put_user(
        username="manager",
        body={
            "password": "password123",
            "roles": ["manager_role"],
            "full_name": "Manager User",
        },
    )


create_roles()
create_users()
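Both roles above differ only in the list of indices they can read, so the role definitions could also be driven from a small table. The following is a sketch under that assumption; `ROLE_INDICES`, `role_body` and `create_roles_from_table` are hypothetical names, and the client is passed in so the function can be tried without a cluster.

```python
# Which indices each role may read (same values as in create_roles above)
ROLE_INDICES = {
    "engineer_role": ["rbac_rag_demo-data_public"],
    "manager_role": ["rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive"],
}


def role_body(index_names, privileges=("read",)):
    """Build the index-privileges part of a role definition."""
    return {"indices": [{"names": list(index_names), "privileges": list(privileges)}]}


def create_roles_from_table(client, role_indices=ROLE_INDICES):
    """Create or update one role per table entry."""
    for role_name, index_names in role_indices.items():
        client.security.put_role(name=role_name, **role_body(index_names))


# Usage inside the notebook, with the `es` client created earlier:
#   create_roles_from_table(es)
```

Adding a third role then only means adding one entry to the table.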
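After running the code above, we can check the result in Kibana:
We could also create these users and roles through the Kibana UI. For details, see the article "Elasticsearch: User security settings".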
Helper functions for querying as each user, plus some output formatting
# Elastic Cloud variant (disabled):
# def get_es_connection(cid, username, password):
#     return Elasticsearch(cloud_id=cid, basic_auth=(username, password))


def get_es_connection(username, password):
    url = f"https://{username}:{password}@{ES_ENDPOINT}:9200"
    print(url)  # Note: this prints the password in clear text
    return Elasticsearch(url, ca_certs="./http_ca.crt", verify_certs=True)


def query_index(es, index_name, username):
    try:
        response = es.search(index=index_name, body={"query": {"match_all": {}}})

        # Prepare the message
        results_message = f'Results from querying as <span style="color: orange;">{username}:</span><br>'
        for hit in response["hits"]["hits"]:
            confidentiality_level = hit["_source"].get("confidentiality_level", "N/A")
            # Use a separate name so the index_name argument is not overwritten
            hit_index = hit.get("_index", "N/A")
            title = hit["_source"].get("title", "No title")

            # Set color based on confidentiality level
            if confidentiality_level == "low":
                conf_color = "lightgreen"
            elif confidentiality_level == "high":
                conf_color = "red"
            else:
                conf_color = "black"

            # Set color based on index name
            if hit_index == "rbac_rag_demo-data_public":
                index_color = "lightgreen"
            elif hit_index == "rbac_rag_demo-data_sensitive":
                index_color = "red"
            else:
                index_color = "black"  # Default color

            results_message += (
                f'Index: <span style="color: {index_color};">{hit_index}</span>\t '
                f'confidentiality level: <span style="color: {conf_color};">{confidentiality_level}</span> '
                f'title: <span style="color: lightblue;">{title}</span><br>'
            )

        display(HTML(results_message))

    except Exception as e:
        print(f"Error accessing {index_name}: {str(e)}")
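In a RAG pipeline, the hits returned for a user are what would be handed to the language model as context, so whatever the user's role hides from the search never reaches the prompt. The following is a minimal sketch of that step, assuming a response with the same `hits.hits[]._source` shape that `query_index` reads; `build_context` and the bullet-list format are illustrative assumptions, not part of the notebook.

```python
def build_context(response, max_docs=5):
    """Join the titles of the returned hits into a context string for a prompt."""
    hits = response["hits"]["hits"][:max_docs]
    return "\n".join(f"- {hit['_source'].get('title', 'No title')}" for hit in hits)


# Usage inside the notebook, with a connection opened for a specific user:
#   response = es_conn.search(index="rbac_rag_demo-data*", query={"match_all": {}})
#   context = build_context(response)
```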
index_pattern = "rbac_rag_demo-data*"
print(
    f"Each user will log in with their credentials and query the same index pattern: {index_pattern}\n\n"
)

for user in ["engineer", "manager"]:
    print(f"Logged in as {user}:")

    es_conn = get_es_connection(user, "password123")
    results = query_index(es_conn, index_pattern, user)
    print("\n\n")
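From the output above, we can see that the manager can access the data in both indices, while the engineer can only access the data in the public index (rbac_rag_demo-data_public).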
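The behaviour can be pictured with a toy in-memory model: both users query the same wildcard pattern, and each sees only the indices their role may read. This is purely illustrative and is not how Elasticsearch implements authorization; all names in the sketch are hypothetical, with the data copied from the notebook.

```python
from fnmatch import fnmatchcase

# Index contents (titles only), as populated earlier
INDICES = {
    "rbac_rag_demo-data_public": [
        "Annual leave policies updated.",
        "Remote work guidelines available.",
        "Health benefits registration period starts next month.",
    ],
    "rbac_rag_demo-data_sensitive": [
        "Executive compensation details Q2 2024.",
        "Bonus payout structure for all levels.",
        "Employee stock options plan details.",
    ],
}

# Indices each role may read, and the role assigned to each user
ROLE_READABLE = {
    "engineer_role": {"rbac_rag_demo-data_public"},
    "manager_role": {"rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive"},
}
USER_ROLES = {"engineer": ["engineer_role"], "manager": ["manager_role"]}


def visible_titles(user, pattern):
    """Return (index, title) pairs from indices that match the pattern and are readable."""
    allowed = set()
    for role in USER_ROLES[user]:
        allowed |= ROLE_READABLE[role]
    return [
        (index_name, title)
        for index_name, titles in INDICES.items()
        if fnmatchcase(index_name, pattern) and index_name in allowed
        for title in titles
    ]


for user in ["engineer", "manager"]:
    hits = visible_titles(user, "rbac_rag_demo-data*")
    print(user, "sees", len(hits), "documents")
```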