AWS是亚马逊的云服务,其提供了非常丰富的套件,以及支持多种语言的SDK/API。本文针对其S3云储存服务的Python SDK(boto3)的使用进行介绍。
本文将针对如何调用boto3和endpoint来实现aws S3服务的功能进行介绍。
从AWS到其中的S3服务的关系链可以简单地描述为:AWS -> VPC -> S3 -> Endppoint -> EC2
“Amazon Virtual Private Cloud (Amazon VPC),简单理解,就是在云上建个大楼,大楼里面的网络、门禁,安检等都一应俱全,我们根据需要在大楼里选择房间(创建ec2)办公,这个房间自己也有相应的门禁系统”
安装aws cli:How to Install AWS CLI on Ubuntu 20.04
- curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
- unzip awscliv2.zip
- sudo ./aws/install
- aws configure
- # 输入access key和security key:后两项可以忽略(假如只需要使用S3的话)
- # view folder
- aws [option] --endpoint-url [endpoint_url] s3 [action] s3://[bucket]
- # download single file
- aws [option] --endpoint-url [endpoint_url] s3 cp s3://[bucket]/[file_path] [local_path]
- # download folder
- aws [option] --endpoint-url [endpoint_url] s3 sync s3://[bucket]/[folder_path] [local_path]
Amazon Simple Queue Service (Amazon SQS) 是一种完全托管的消息队列服务,可以轻松解耦和扩展微服务、分布式系统和无服务器应用程序。 Amazon SQS 在分布式应用程序组件之间移动数据并帮助您解耦这些组件。
- import boto3
- '''send messages'''
- # Get the service resource
- sqs = boto3.resource('sqs')
- # Get the queue
- queue = sqs.get_queue_by_name(QueueName='test')
- # Create a new message
- response = queue.send_message(MessageBody='world')
- # The response is NOT a resource, but gives you a message ID and MD5
- print(response.get('MessageId'))
- print(response.get('MD5OfMessageBody'))
- '''process messages'''
- # Process messages by printing out body and optional author name
- for message in queue.receive_messages(MessageAttributeNames=['Author']):
- # Get the custom author message attribute if it was set
- author_text = ''
- if message.message_attributes is not None:
- author_name = message.message_attributes.get('Author').get('StringValue')
- if author_name:
- author_text = ' ({0})'.format(author_name)
- # Print out the body and author (if set)
- print('Hello, {0}!{1}'.format(message.body, author_text))
- # Let the queue know that the message is processed
- message.delete()

资源表示 Amazon Web Services (AWS) 的面向对象的接口。 它们提供了比服务客户端进行的原始低级调用更高级别的抽象。
每个资源实例都有许多属性和方法。 这些在概念上可以分为标识符、属性、操作、引用、子资源和集合。资源本身也可以在概念上分为服务资源(如 sqs、s3、ec2 等)和单个资源(如 sqs.Queue 或 s3.Bucket)。 服务资源没有标识符或属性。 否则,两者共享相同的组件。
- # Get resources from the default session
- sqs = boto3.resource('sqs')
- s3 = boto3.resource('s3')
- '''example'''
- # S3 Object (bucket_name and key are identifiers)
- obj = s3.Object(bucket_name='boto3', key='test.py')
- print(obj.bucket_name)print(obj.key)
- # S3 Object attributes
- obj.last_modifie
- dobj.e_tag
- # S3 Object actions
- obj = s3.Object(bucket_name='boto3', key='test.py')
- response = obj.get()
- data = response['Body'].read()
- # S3 sub-resources
- obj = bucket.Object(key='new_file.txt')
- print(obj.bucket_name)
- print(obj.key)
- # S3: Wait for a bucket to exist.
- bucket.wait_until_exists()

资源实例不是线程安全的,不应跨线程或进程共享。 这些特殊类包含无法共享的附加元数据。 建议在多线程或多处理中为每个线程或进程创建一个新资源。
会话管理有关特定配置的状态。 会话通常存储以下内容:
- '''Using the default session'''
- sqs = boto3.client('sqs')
- s3 = boto3.resource('s3')
- '''Create your own session'''
- my_session = boto3.session.Session()
- # Now we can create low-level clients or resource clients from our custom session
- sqs = my_session.client('sqs')
- s3 = my_session.resource('s3')
与 Resource 对象类似,Session 对象不是线程安全的,不应在线程和进程之间共享。 建议在多线程或多处理中为每个线程或进程创建一个新的 Session 对象。
客户端为 AWS 提供了一个低级接口,其方法与服务 API 的映射接近 1:1。 所有服务操作均由客户端支持。
- import boto3
- sqs = boto3.client('sqs')
- # It is also possible to access the low-level client from an existing resource:
- # Create the resource
- sqs_resource = boto3.resource('sqs')
- # Get the client from the resource
- sqs = sqs_resource.meta.client
- # send messages
- response = sqs.send_message(QueueUrl='...', MessageBody='...')
- # handling messages
- response = sqs.list_queues()
- for url in response.get('QueueUrls', []):
- print(url)
- # waiters
- sqs.waiter_names

多处理:虽然客户端是线程安全的,但由于它们的网络实现,它们不能跨进程共享。 这样做可能会导致调用服务时响应顺序不正确。
共享元数据:客户端通过一些属性(即元、异常和服务员名称)向最终用户公开元数据。 这些是可以安全阅读的,但任何突变都不应该被认为是线程安全的。
自定义 Botocore 事件:Botocore(构建 Boto3 库)允许高级用户提供他们自己的自定义事件挂钩,这些挂钩可以与 boto3 的客户端交互。 大多数用户不需要使用这些接口,但是那些不需要仔细审查的用户不应再考虑他们的客户端线程安全。
参考:Botocore Events - botocore 1.27.25 documentation
- import boto3.session
- from concurrent.futures import ThreadPoolExecutor
- def do_s3_task(client, task_definition):
- # Put your thread-safe code here
- def my_workflow():
- # Create a session and use it to make our client
- session = boto3.session.Session()
- s3_client = session.client('s3')
- # Define some work to be done, this can be anything
- my_tasks = [ ... ]
- # Dispatch work tasks with our s3_client
- with ThreadPoolExecutor(max_workers=8) as executor:
- futures = [executor.submit(do_s3_task, s3_client, task) for task in my_tasks]

Endpoint (AWS PrivateLink for S3)
在将 S3 客户端配置为使用接口 VPC 终端节点时,请务必注意,只有终端节点中指定的资源类型才能使用该客户端进行寻址( only the resource type specified in the endpoint can be addressed)。 访问存储桶和访问点需要实例化两个客户端,每个资源类型一个。
- import boto3
- s3_client = boto3.client(
- service_name='s3',
- endpoint_url='https://bucket.vpce-abc123-abcdefgh.s3.us-east-1.vpce.amazonaws.com')
一些 AWS 操作返回的结果不完整,需要后续请求才能获得整个结果集。 发送后续请求以在前一个请求中断的地方继续的过程称为分页。
Boto3 提供了许多功能来帮助导航您在与 AWS 服务交互时可能遇到的错误和异常。
1. 确定要捕获的异常
2. 使用低级客户端时捕获异常
3. 解析错误响应并从 AWS 服务中捕获异常
4. 从错误响应中辨别有用信息
- try:
- client.some_api_call(SomeParam='some_param')
- except botocore.exceptions.ClientError as error:
- # Put your error handling logic here
- raise error
- except botocore.exceptions.ParamValidationError as error:
- raise ValueError('The parameters you provided are incorrect: {}'.format(error))
- '''Error message structure
- {
- 'Error': {
- 'Code': 'SomeServiceException',
- 'Message': 'Details/context around the exception or error'
- },
- 'ResponseMetadata': {
- 'RequestId': '1234567890ABCDEF',
- 'HostId': 'host ID data will appear here as a hash',
- 'HTTPStatusCode': 400,
- 'HTTPHeaders': {'header metadata key/values will appear here'},
- 'RetryAttempts': 0
- }
- }
- except botocore.exceptions.ClientError as err:
- if err.response['Error']['Code'] == 'InternalError': # Generic error
- print('Error Message: {}'.format(err.response['Error']['Message']))
- '''

1. 你有什么访问权限? 在使用low-level client处理存储桶时,必须使用具有访问 ID/密钥的端点(Endpoint with access id / key)。 如果您想在 AWS 中尝试其他高级服务,请检查您的授权。 所拥有的 AWS 权限决定使用什么链接方式
2. 您处理的文件的大小。 请参考file-size limitation和File transfer configuration。 操作文件的大小
3.错误处理策略。 异常处理
4. 快速api参考。 API查询
- aws configure
- # -> input aws_access_key_id & aws_secret_access_key
然后你会在 ~/.aws/credentials 找到你的配置文件。也可以忽略这一步,在python程序中设置。
2. Create an s3 client & explore buckects(查询)
- s3_client = boto3.client(service_name='s3', endpoint_url=aws_s3_endpoint_url)
- response = s3_client.list_buckets()
- buckets = [bucket['Name'] for bucket in response['Buckets']]
- print(buckets)
3. Upload files(上传,覆盖写)
response = s3_client.upload_file(local_file_path, bucket_name, target_path_in_bucket)
4. Upload large files(上传大文件)
在上传、下载或复制文件或 S3 对象时,适用于 Python 的 AWS 开发工具包会自动管理重试以及multipart 和非multipart 传输。 通过使用非常适合大多数场景的合理默认设置来执行管理操作。 为了处理特殊情况,可以配置默认设置以满足要求。
- # using simple upload
- self.client.upload_file(local_file_path, bucket_name, target_path_in_bucket)
- # using multi-part upload to extend size limitation
- GB = 1024 ** 3
- config = TransferConfig(multipart_threshold=5*GB)
- self.client.upload_file(local_file_path, bucket_name, target_path_in_bucket, Config=config)
Test file | Default upload | Specified multi-part upload |
2GB | 221MB/s | 150~212MB/s |
13GB | 232MB/s | 224~240MB/s |
5. Download files(下载)
- # download single file
- self.client.download_file(bucket_name, target_path_in_bucket, local_file_path)
- # download single file as object
- with open('FILE_NAME', 'wb') as f: # binary mode only
- s3.download_fileobj('BUCKET_NAME', 'OBJECT_NAME', f)
- # download folder
- list_objects_v2() -> download_file()
6. Delete files in buckets(删除)
self.client.delete_object(Bucket=bucket_name, Key=target_path_in_bucket)
7. Using Calback as ProgressBar(监控进度条)
示例来自 Uploading files ‒ Boto3 Docs 1.24.25 documentation 和 How can I increase my AWS s3 upload speed when using boto3?
- import threading
- class ProgressPercentage(object):
- def __init__(self, filename):
- self._filename = filename
- self._size = float(os.path.getsize(filename))
- self._seen_so_far = 0
- self._lock = threading.Lock()
- def __call__(self, bytes_amount):
- # To simplify, assume this is hooked up to a single filename
- with self._lock:
- self._seen_so_far += bytes_amount
- percentage = (self._seen_so_far / self._size) * 100
- sys.stdout.write(
- "\r%s %s / %s (%.2f%%)" % (
- self._filename, self._seen_so_far, self._size,
- percentage))
- sys.stdout.flush()
- s3.upload_file(
- Callback=ProgressPercentage('FILE_NAME'))

- from tqdm import tqdm
- import boto3.s3.transfer as s3transfer
- class Tool():
- def __init__():
- pass
- def client_upload_files(self, bucket, local_path, aws_path, progress_func):
- transfer_config = s3transfer.TransferConfig(
- use_threads=True,
- max_concurrency=10,
- )
- s3t = s3transfer.create_transfer_manager(
- self.client, transfer_config)
- s3t.upload(
- local_path, bucket, aws_path,
- subscribers=[
- s3transfer.ProgressCallbackInvoker(progress_func),
- ]
- )
- s3t.shutdown()
- with tqdm(desc='upload', ncols=60,
- total=totalsize, unit='B', unit_scale=1) as pbar:
- tool.client_upload_files(
- bucket_name, large_file, large_target_file, pbar.update)

