赞
踩
- import boto3
- import uuid
- import pandas as pd
- import json
- import requests
-
- '''
- 根据 dataset_id -> 查询数据集信息（主要是 prefix）
- 根据 connect_id -> 查询连接器信息（主要是 S3 参数）
- 数据集的 prefix + S3 基本信息 -> 得到数据集在 S3 上的地址及连接
- s3_client 可以用于临时数据集的读写
- 临时结果与最终入库结果需要区分开：最终入库的结果会对应一个新的 dataset_id
- '''
-
-
class ConnectData:
    """Client for the models service and the S3 storage behind a connector.

    The models service is addressed as ``{models_url}/<resource>`` and answers with
    JSON documents of the form ``{"data": {...}}``. This helper:

    * looks up a dataset's storage prefix (:meth:`get_dataset`),
    * looks up a connector's S3 parameters (:meth:`get_connector`) and builds a
      boto3 client from them (:meth:`s3_client`),
    * registers a new record under the temp-data connector (:meth:`post_data`).
    """

    # Authorization header value sent by ``post_data`` when no ``auth_token`` is given.
    # This is the value that used to be hard-coded inside the method; prefer passing a
    # real token to ``__init__`` instead of editing it here.
    DEFAULT_AUTHORIZATION = 'hahfifpia...'

    def __init__(self, models_url, input_dataset, tempdata_connector, project, flow,
                 output_dataset=None, auth_token=None, timeout=60):
        """Store the service URL and the ids this helper works with.

        Args:
            models_url: Base URL of the models service; paths are joined as
                ``f"{models_url}/..."``, so no trailing slash is expected.
            input_dataset: Id of the source dataset. It is the default for
                :meth:`get_dataset` and is sent as ``origindataset`` by :meth:`post_data`.
            tempdata_connector: Id of the connector whose S3 parameters are used and
                under which new records are created.
            project: Project id sent with new records.
            flow: Stored on the instance; not used by this class's methods.
            output_dataset: Optional id of the final output dataset (stored only).
            auth_token: Authorization header value for POST requests; defaults to
                ``DEFAULT_AUTHORIZATION``.
            timeout: Seconds passed to ``requests`` for every HTTP call, so an
                unresponsive service raises instead of blocking forever.
        """
        self.models_url = models_url
        self.input_dataset = input_dataset
        self.output_dataset = output_dataset
        self.tempdata_connector = tempdata_connector
        self.project = project
        self.flow = flow
        self.auth_token = self.DEFAULT_AUTHORIZATION if auth_token is None else auth_token
        self.timeout = timeout

    def _get_json(self, path, what):
        """GET ``{models_url}/{path}`` and return the decoded JSON document.

        Args:
            path: Resource path relative to ``models_url``.
            what: Label used in the error message (e.g. ``"connect"``, ``"dataset"``).

        Raises:
            Exception: if the service does not answer with HTTP 200; the message is
                ``"status_code:<code> <what> error"``.
        """
        response = requests.get(url=f"{self.models_url}/{path}", timeout=self.timeout)
        if response.status_code != 200:
            raise Exception('status_code:' + str(response.status_code) + " " + what + " error")
        return json.loads(response.content)

    def _build_post_body(self, data_type, prefix, category):
        """Return the JSON:API request document that :meth:`post_data` sends."""
        return {
            "data": {
                "type": data_type,
                "attributes": {
                    # Random, unique record name; uuid1 mixes in host id and time.
                    "name": "test_" + str(uuid.uuid1()),
                    "project": self.project,
                    "connector": self.tempdata_connector,
                    "tags": [],
                    "prefix": prefix,
                    "category": category,
                    # NOTE(review): fixed placeholder timestamp, sent verbatim; presumably
                    # the server assigns the real creation time — confirm.
                    "created": "1970-01-01T00:00:00.000Z",
                    "origindataset": self.input_dataset,
                },
            }
        }

    def post_data(self, data_type, prefix, category):
        """Create a new ``data_type`` record (e.g. ``"datasets"``) and return its id.

        Args:
            data_type: Resource collection to POST to; also used as the JSON:API type.
            prefix: Storage prefix of the new record.
            category: Category attribute of the new record.

        Returns:
            The ``data.id`` field of the service's response.

        Raises:
            Exception: if the service answers with a non-2xx status, instead of
                failing later with an opaque ``KeyError`` on the error body.
        """
        headers = {
            'Content-Type': 'application/vnd.api+json',
            'Authorization': self.auth_token,
        }
        body = self._build_post_body(data_type, prefix, category)
        response = requests.post(
            f'{self.models_url}/{data_type}',
            headers=headers,
            data=json.dumps(body),
            timeout=self.timeout,
        )
        # Creation endpoints commonly answer 201 rather than 200, so accept any 2xx.
        if not 200 <= response.status_code < 300:
            raise Exception('status_code:' + str(response.status_code) + " post error")
        return json.loads(response.content)['data']['id']

    def s3_client(self):
        """Build a boto3 S3 client from the temp-data connector's parameters.

        Returns:
            ``(client, bucketname)``. The client uses the connector's ``host`` as the
            endpoint URL and its ``accesskey``/``secretkey`` as credentials.
        """
        params = self.get_connector()
        client = boto3.client(
            service_name='s3',
            endpoint_url=params["host"],
            aws_access_key_id=params["accesskey"],
            aws_secret_access_key=params["secretkey"],
        )
        return client, params["bucketname"]

    def get_connector(self):
        """Return the ``params`` attributes of the temp-data connector (S3 settings).

        Raises:
            Exception: ``"status_code:<code> connect error"`` on a non-200 response.
        """
        res = self._get_json(f"connectors/{self.tempdata_connector}", "connect")
        return res["data"]["attributes"]["params"]

    def get_dataset(self, dataset=None):
        """Return the storage ``prefix`` of ``dataset``.

        Args:
            dataset: Dataset id; any falsy value falls back to ``self.input_dataset``.

        Raises:
            Exception: ``"status_code:<code> dataset error"`` on a non-200 response.
        """
        if not dataset:
            dataset = self.input_dataset
        res = self._get_json(f"datasets/{dataset}", "dataset")
        return res["data"]["attributes"]["prefix"]
# Build the helper, then fetch the S3 client/bucket and the input dataset's prefix.
connect = ConnectData(
    models_url=models_url,
    input_dataset=input_dataset,
    tempdata_connector=tempdata_connector,
    project=project,
    flow=flow,
    output_dataset=output_dataset,
)
s3_client, bucketname = connect.s3_client()
res = connect.get_dataset()  # storage prefix of the input dataset
- 1、从 S3 读取 csv 到 DataFrame
# Open the object through smart_open with the S3 client above and parse it as CSV.
ego_path = path
with smart_open.open(
    ego_path,
    'rb',
    transport_params={'client': s3_client},
) as reader:
    df = pd.read_csv(reader)
-
- 2、将 DataFrame 写成 csv 并上传到 S3
# Write the DataFrame back through smart_open as CSV, without the index column.
with smart_open.open(
    output_path,
    'wb',
    transport_params={'client': s3_client},
) as writer:
    df.to_csv(writer, index=False)
mp4 的上传方式稍有不同：需要额外传入最后一个参数 ExtraArgs（指定 ContentType 为 video/mp4），上传图片时则不需要这个参数。output_file_path 是本地视频文件的路径。
# `os` is used below but was never imported in this file, which raised NameError.
import os

# Upload the local MP4 at output_file_path, if it exists (otherwise nothing happens).
# ExtraArgs sets the object's ContentType to video/mp4; `上传位置` is a placeholder for
# the destination key prefix — replace it with a real value.
if os.path.exists(output_file_path):
    s3_client.upload_file(
        output_file_path,
        bucketname,
        上传位置 + '/' + 'od.mp4',
        ExtraArgs={'ContentType': "video/mp4"},
    )
调用上面的类方法 post_data 提交新的数据集（返回新数据集的 id）
# Register a new "datasets" record; post_data returns the id assigned by the service.
connect.post_data(data_type="datasets", prefix=prefix, category='类型')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。