赞
踩
本文尝试基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据。
1、第一次对话:“python3写一段从rest服务器获取数据的样例代码”
同时生成了以下注解
这段代码首先定义了一个函数
get_data_from_rest_server
,该函数使用requests.get
方法向指定的 URL 发送 GET 请求。如果请求成功(即响应的状态码为 200),则将响应的 JSON 数据返回。否则,它将打印出错误消息并返回 None。然后,我们调用这个函数来获取数据,并打印出获取到的数据。注意你需要将url
替换为你的 REST 服务器的实际 URL。
2、第二次对话:“以上请求需带入服务器的认证参数”
同时生成了以下注解
在上述代码中,我们通过在
requests.get
方法中传递auth=(username, password)
来提供服务器的认证参数。这样,当你发送请求时,requests
会自动将认证参数添加到请求中,从而完成身份验证过程。
3、第三次对话:“以上代码需代入请求消息头”
- import requests
-
- # 定义服务器认证参数的用户名和密码
- username = 'your_username'
- password = 'your_password'
-
- def get_data_from_rest_server():
- # 将此 URL 替换为你的 REST 服务器 URL
- url = 'http://example.com/api/data'
-
- # 定义请求头,可以包含自定义的头部字段和值
- headers = {
- 'Content-Type': 'application/json', # 声明请求的内容类型为 JSON
- 'Authorization': f'Basic {username}:{password}', # 添加基本的身份验证头部字段
- }
-
- # 使用 GET 方法请求数据,并携带服务器认证参数和请求头
- response = requests.get(url, headers=headers)
-
- # 检查响应的状态码,如果请求成功(状态码为 200),则处理响应的数据
- if response.status_code == 200:
- data = response.json()
- return data
- else:
- print(f"请求失败,状态码: {response.status_code}")
- return None
-
- # 调用函数获取数据
- data = get_data_from_rest_server()
- if data is not None:
- print(data)
注解如下 :
在上述代码中,我们通过创建一个字典对象来定义请求头,其中包含了一个
Content-Type
字段和一个Authorization
字段。Content-Type
字段声明了请求的内容类型为 JSON,而Authorization
字段包含了基本的身份验证值。然后,我们将这个字典对象传递给requests.get
方法的headers
参数,从而将请求头添加到请求中。
通过和文心一言AI大模型的三次对话,我们大致了解了用python 的 requests
库从 REST 服务器获取数据的基本方法。
1、RESTful API的请求必须包括如下消息头
2、拟测试请求接口官方样例
- #!/usr/bin/python3
- #coding=utf-8
-
- ##__author__='daijianbing'
-
- import requests
- import json
-
- def authenticate(username, password,url_auth):
- # 构建认证数据
- auth_data = {
- 'user_name': username,
- 'password': password
- }
-
- # 将认证数据转换为JSON格式
- auth_json = json.dumps(auth_data)
-
- #禁止未经验证的HTTPS
- requests.packages.urllib3.disable_warnings()
- # 发送POST请求到REST API进行身份验证
- response = requests.post(url_auth,verify = False, data=auth_json, headers={'Content-Type': 'application/json'})
-
- # 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
- if response.status_code == 200:
- #print(response.json())
- return response.json()['data']['x_auth_token']
- else:
- connect = 0
- raise Exception('Authentication failed')
-
- def queryStoragePool(token,url):
- # 构建认证数据
- headers = {
- 'Connection': 'keep-alive',
- 'X-Auth-Token': token
- }
-
- #print(headers)
-
- # 发送请求到REST API
- response = requests.get(url, verify = False, headers=headers)
-
- # 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
- if response.status_code == 200:
- #print(response.json())
- return response.json()['storagePools']
- else:
- raise Exception('Authentication failed')
-
- def queryPoolstatus(token,url,poolId):
- # 构建认证数据
- headers = {
- 'Connection': 'keep-alive',
- 'X-Auth-Token': token
- }
- params = {
- 'poolId':str(poolId),
- 'vbs': 'true'
- }
-
- params_json = json.dumps(params)
- #print("url:",url,"headers:",headers,"params_json:",params_json)
-
- # 发送T请求到REST API
- response = requests.get(url, verify = False, params=params, headers=headers)
-
- # 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
- if response.status_code == 200:
- #print(response.json())
- return response.json()['status']
- else:
- raise Exception('Authentication failed')
-
- def queryAlarms(token,url):
- # 构建认证数据
- headers = {
- 'Connection': 'keep-alive',
- 'X-Auth-Token': token
- }
- params = {
- 'filter':"alarmStatus::1",
- }
- params_json = json.dumps(params)
- # 发送请求到REST API
- response = requests.get(url, verify = False, params=params, headers=headers)
-
- # 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
- if response.status_code == 200:
- #print(response.json())
- return response.json()['data']
- else:
- raise Exception('Authentication failed')
-
- # 使用示例
- username = 'zabbix'
- password = 'passwd'
- hostip = '192.168.0.1'
- port = '8088'
- hostport = "https://"+hostip+":"+port
- url_auth = hostport+'/api/v2/aa/sessions'
- checkstatus = 1
- requests.packages.urllib3.disable_warnings()
- try:
- token = authenticate(username, password, url_auth)
- print('x_auth_token:', token)
-
- url_pool = hostport+'/dsware/service/resource/queryStoragePool'
- pools = queryStoragePool(token,url_pool)
-
- url_poolstatus=hostport+'/dsware/service/cluster/storagepool/queryStatus'
- for pool in pools:
- poolstatus=queryPoolstatus(token,url_poolstatus,pool['poolId'])
-
- url_alarms = hostport+'/api/v2/common/alarm_count'
- Alarms=queryAlarms(token,url_alarms)
- print('poolid:',pool['poolId'],'poolname:',pool['poolName'],'poolstatus:',poolstatus,'alarmcount:',Alarms['count'])
- except:
- checkstatus = 0
- finally:
- print("checkstatus:",checkstatus)
- print("chech over")
'运行
实际运行:
- # python3 --version
- Python 3.9.9
- # python3 healthck.py
- x_auth_token: MzMwMDE0OTkyNkEzMDAxNGUwOTcwMzE2
- poolid: 0 poolname: Pool01 poolstatus: 0 alarmcount: 0
- checkstatus: 1
- chech over
本段代码实现了存储REST接口认证登录,查询全部的pool并列出poolid,poolname,pool状态,输出当前未恢复告警数并返回本次的检查状态(1为检查成功)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。