当前位置:   article > 正文

python csv、jpg、mp4等文件上传至s3,提交数据集_python aws s3 文件上传

python aws s3 文件上传

1、创建类

  1. import boto3
  2. import uuid
  3. import pandas as pd
  4. import json
  5. import requests
  6. '''
  7. 根据dataset_id -> 查询数据集信息
  8. 根据connect_id -> 查询连接器信息
  9. 根据数据集的信息前缀+s3基本信息 -> s3的数据集地址及连接
  10. s3_client 可以进行临时数据集的读写
  11. 临时结果与最终入库结果需要区分开---有新的入库结果的dataset_id
  12. '''
  class ConnectData:
      """Query the HTTP service at ``models_url`` and build an S3 client from it.

      The instance looks up dataset and connector records over HTTP, creates a
      boto3 S3 client from a connector's parameters, and can post a new record
      that references the input dataset.

      Attributes:
          models_url: Base URL every request path is appended to.
          input_dataset: Default dataset id for ``get_dataset``; also sent as
              ``origindataset`` by ``post_data``.
          output_dataset: Optional value kept for the caller; no method of
              this class reads it.
          tempdata_connector: Connector id used by ``get_connector`` and sent
              as ``connector`` by ``post_data``.
          project: Sent as ``project`` by ``post_data``.
          flow: Kept for the caller; no method of this class reads it.
      """

      # Upper bound, in seconds, on every HTTP call so that a stalled service
      # cannot block the caller forever. Override on an instance or subclass.
      REQUEST_TIMEOUT = 60

      def __init__(self, models_url, input_dataset, tempdata_connector, project, flow, output_dataset=None):
          """Store the service URL and the ids used by the other methods."""
          self.models_url = models_url
          self.input_dataset = input_dataset
          self.output_dataset = output_dataset
          self.tempdata_connector = tempdata_connector
          self.project = project
          self.flow = flow

      def post_data(self, data_type, prefix, category):
          """Create a ``data_type`` record and return the id from the response.

          Args:
              data_type: Used both as the ``type`` field of the payload and as
                  the URL path segment the payload is posted to.
              prefix: Sent as the ``prefix`` attribute.
              category: Sent as the ``category`` attribute.

          Returns:
              The value found at ``["data"]["id"]`` in the JSON response.

          Raises:
              Exception: If the service answers with a non-2xx status code.
          """
          # NOTE(review): the Authorization value is a hard-coded placeholder;
          # presumably a real token must be supplied before this can work.
          headers = {
              'Content-Type': 'application/vnd.api+json',
              'Authorization': 'hahfifpia...'
          }
          attributes = {
              # A fresh UUID keeps every submitted record name distinct.
              "name": "test_" + str(uuid.uuid1()),
              "project": self.project,
              "connector": self.tempdata_connector,
              "tags": [],
              "prefix": prefix,
              "category": category,
              "created": "1970-01-01T00:00:00.000Z",
              "origindataset": self.input_dataset,
          }
          body = {"data": {"type": data_type, "attributes": attributes}}

          response = requests.post(
              f'{self.models_url}/{data_type}',
              headers=headers,
              data=json.dumps(body),
              timeout=self.REQUEST_TIMEOUT,
          )
          # Fail with the status code, like the GET helpers do, instead of
          # letting an error payload surface later as a KeyError.
          if not 200 <= response.status_code < 300:
              raise Exception('status_code:' + str(response.status_code) + " post error")
          return json.loads(response.content)['data']['id']

      def s3_client(self):
          """Build a boto3 S3 client from the connector's parameters.

          Returns:
              A ``(client, bucketname)`` tuple: the boto3 client and the
              ``bucketname`` value taken from the connector parameters.

          Raises:
              Exception: Propagated from ``get_connector`` on a non-200 reply.
          """
          params = self.get_connector()
          client = boto3.client(
              service_name='s3',
              endpoint_url=params["host"],
              aws_access_key_id=params["accesskey"],
              aws_secret_access_key=params["secretkey"],
          )
          return client, params["bucketname"]

      def get_connector(self):
          """Return the ``params`` attribute of the ``tempdata_connector`` record.

          Raises:
              Exception: If the service does not answer with status 200.
          """
          attributes = self._get_attributes(f"connectors/{self.tempdata_connector}", "connect")
          return attributes["params"]

      def get_dataset(self, dataset=None):
          """Return the ``prefix`` attribute of a dataset record.

          Args:
              dataset: Dataset id to look up; any falsy value selects
                  ``self.input_dataset``.

          Raises:
              Exception: If the service does not answer with status 200.
          """
          if not dataset:
              dataset = self.input_dataset
          return self._get_attributes(f"datasets/{dataset}", "dataset")["prefix"]

      def _get_attributes(self, path, error_label):
          """GET ``models_url/path`` and return the ``data.attributes`` mapping."""
          response = requests.get(url=f"{self.models_url}/{path}", timeout=self.REQUEST_TIMEOUT)
          if response.status_code != 200:
              raise Exception('status_code:' + str(response.status_code) + f" {error_label} error")
          return json.loads(response.content)["data"]["attributes"]

2、实例化并调用

  # Build the helper, then fetch the S3 client together with its bucket name.
  connect = ConnectData(
      models_url,
      input_dataset,
      tempdata_connector,
      project,
      flow,
      output_dataset,
  )
  s3_client, bucketname = connect.s3_client()

3、根据数据集id获取信息

# Called without an argument, get_dataset() looks up the instance's
# input_dataset and returns that record's prefix.
res = connect.get_dataset()

4、csv上传、读取

  # smart_open is a third-party package this snippet already relies on.
  import smart_open

  # 1. Read the CSV into a DataFrame through the S3 client.
  #    `path` is presumably an s3:// URI - confirm against the caller.
  ego_path = path
  with smart_open.open(ego_path, 'rb', transport_params={'client': s3_client}) as reader:
      df = pd.read_csv(reader)

  # 2. Write the DataFrame back out as CSV, without the index column.
  with smart_open.open(output_path, 'wb', transport_params={'client': s3_client}) as writer:
      df.to_csv(writer, index=False)

5、mp4、jpg上传

mp4 的上传方式稍有不同:需要额外传入最后一个参数 ExtraArgs={'ContentType': "video/mp4"} 来指定内容类型;上传图片(jpg)时不需要这个参数。output_file_path 是指本地待上传文件(例如视频)的路径。

  import os

  # Upload only when the local file exists. ExtraArgs sets the object's
  # Content-Type so the stored file is labelled as an mp4 video.
  if os.path.exists(output_file_path):
      s3_client.upload_file(
          output_file_path,
          bucketname,
          上传位置 + '/' + 'od.mp4',
          ExtraArgs={'ContentType': "video/mp4"},
      )

6、提交数据集

调用上面的类方法

# Post a "datasets" record with the given prefix and category; post_data
# returns the new record's id, which is discarded here.
connect.post_data("datasets", prefix, '类型')
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/707212
推荐阅读
相关标签
  

闽ICP备14008679号