赞
踩
欢迎来到小李哥全新亚马逊云科技AWS云计算知识学习系列,适用于任何无云计算或者亚马逊云科技技术背景的开发者,通过这篇文章大家零基础5分钟就能完全学会亚马逊云科技一个经典的服务开发架构方案。
我会每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿云开发/架构技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS最佳实践,并应用到自己的日常工作里。本次介绍的是如何利用亚马逊云科技上的MessageQueue为应用程序解耦,实现异步处理,提升应用扩展性、可用性和容错性的全部方案。本方案架构图如下:
亚马逊云科技的简单队列服务(Amazon Simple Queue Service,SQS)是一项全托管的消息队列服务,允许不同的应用组件之间发送、存储和接收消息。SQS 使得微服务、分布式系统和无服务器应用可以在高容量下解耦和扩展,不会丢失消息,也不需要各个服务同时在线。
SQS 使得应用的不同组件能够独立扩展。通过解耦服务,各组件可以根据自身的需求独立扩展,不会相互影响,从而更有效地处理不同的负载。
SQS 确保了服务之间消息传递的可靠性,即使其中一个服务暂时不可用,也不会丢失消息。这种机制使系统能够平稳运行,提高了应用的整体可靠性。
通过解耦应用组件,SQS 减少了管理服务之间依赖关系的复杂性。这使得更新、修改或替换单个组件变得更加容易,不会影响整个系统。
SQS 提供异步通信,允许各个服务以自己的节奏处理消息。这种灵活性特别适合工作负载不均或需要整合运行速度不同的服务的场景。
使用 SQS,可以根据需要扩展特定的组件,而不是为整个系统过度配置资源。这种有针对性的扩展优化了资源利用率,节省了成本。
1. 首先我们登录亚马逊云科技控制台,进入亚马逊云科技SNS消息服务
2. 创建一个SNS消息话题“ImageNotification”,每一个话题就是一个传递消息的通道。点击“Next Step”进入创建页面,点击Create创建。
3. 点击进入该SNS话题,复制SNS的ARN ID。
4. 我们再进入Message Queue:SQS服务,点击"Create Queue"创建一个SQS。为Queue命名为“ImageQueue”
5. 在Access Policy中复制以下内容,将刚刚复制的SNS ARN ID替换到“SNS_TOPIC_ARN”的值,授权SNS发送消息到SQS中。最后点击"Create Queue"创建该队列。
- ,{
- "Effect":"Allow",
- "Principal": {
- "Service": "sns.amazonaws.com"
- },
- "Action":"sqs:SendMessage",
- "Resource":"SQS_QUEUE_ARN",
- "Condition":{
- "ArnEquals":{
- "aws:SourceArn":"SNS_TOPIC_ARN"
- }
- }
- }
6. 接下来我们进入创建的Queue中,复制访问该Queue的URL,该URL可以用过API “sqs.send_message()调用”。
7. 接下来我们将该Queue订阅到我们刚刚创建的SNS消息通道。
8. 接下来我们选择订阅到我们刚刚创建的SNS话题“ImageNotification”,点击Save保存。
9. 接下来我们就可以通过以下代码示例向SNS发送消息,并将SNS消息同时发送给多个订阅者(微服务),相对于点对点的SQS订阅,有更好灵活的分发功能。
- import boto3
- import json
-
- # 配置 AWS 认证
- # 注意:你可以通过AWS CLI配置文件或者环境变量来设置你的凭证
- sns = boto3.client('sns', region_name='us-east-1')
-
- # 替换为你的SNS话题ARN
- topic_arn = 'arn:aws:sns:us-east-1:123456789012:my-topic'
-
- # 发送消息到SNS话题
- def publish_message():
- message = {
- 'order_id': '123456',
- 'status': 'created',
- 'customer_id': '78910'
- }
- response = sns.publish(
- TopicArn=topic_arn,
- Message=json.dumps(message),
- Subject='Order Created',
- MessageAttributes={
- 'AttributeOne': {
- 'StringValue': 'Value1',
- 'DataType': 'String'
- }
- }
- )
- print(f"Message sent with ID: {response['MessageId']}")
-
- if __name__ == '__main__':
- publish_message()
- import boto3
-
- # 配置 AWS 区域
- region_name = 'us-east-1'
-
- # 创建 SNS 客户端
- sns = boto3.client('sns', region_name=region_name)
-
- # 创建 SQS 客户端
- sqs = boto3.client('sqs', region_name=region_name)
-
- # 创建 SNS 主题
- def create_sns_topic(topic_name):
- response = sns.create_topic(Name=topic_name)
- topic_arn = response['TopicArn']
- print(f"Created SNS topic with ARN: {topic_arn}")
- return topic_arn
-
- # 创建 SQS 队列
- def create_sqs_queue(queue_name):
- response = sqs.create_queue(QueueName=queue_name)
- queue_url = response['QueueUrl']
- print(f"Created SQS queue with URL: {queue_url}")
- return queue_url
-
- # 获取 SQS 队列的 ARN
- def get_sqs_queue_arn(queue_url):
- response = sqs.get_queue_attributes(
- QueueUrl=queue_url,
- AttributeNames=['QueueArn']
- )
- return response['Attributes']['QueueArn']
-
- # 订阅 SQS 到 SNS 主题
- def subscribe_sqs_to_sns(topic_arn, queue_arn):
- response = sns.subscribe(
- TopicArn=topic_arn,
- Protocol='sqs',
- Endpoint=queue_arn
- )
- subscription_arn = response['SubscriptionArn']
- print(f"Subscribed SQS to SNS with Subscription ARN: {subscription_arn}")
- return subscription_arn
-
- # 设置 SQS 队列策略,允许SNS发送消息到SQS队列
- def set_sqs_policy(queue_url, queue_arn, topic_arn):
- policy = {
- "Version": "2012-10-17",
- "Statement": [
- {
- "Effect": "Allow",
- "Principal": {"Service": "sns.amazonaws.com"},
- "Action": "SQS:SendMessage",
- "Resource": queue_arn,
- "Condition": {
- "ArnEquals": {"aws:SourceArn": topic_arn}
- }
- }
- ]
- }
-
- sqs.set_queue_attributes(
- QueueUrl=queue_url,
- Attributes={
- 'Policy': json.dumps(policy)
- }
- )
- print("SQS policy set to allow SNS topic to send messages.")
-
- if __name__ == '__main__':
- # 定义 SNS 主题和 SQS 队列的名称
- topic_name = 'my-sns-topic'
- queue_name = 'my-sqs-queue'
-
- # 创建 SNS 主题
- topic_arn = create_sns_topic(topic_name)
-
- # 创建 SQS 队列
- queue_url = create_sqs_queue(queue_name)
-
- # 获取 SQS 队列的 ARN
- queue_arn = get_sqs_queue_arn(queue_url)
-
- # 设置 SQS 队列策略,允许SNS发送消息到SQS
- set_sqs_policy(queue_url, queue_arn, topic_arn)
-
- # 订阅 SQS 队列到 SNS 主题
- subscribe_sqs_to_sns(topic_arn, queue_arn)
-
- print("SNS topic and SQS queue setup completed.")
创建SNS Topic (
create_sns_topic
):使用
create_topic
方法创建一个SNS话题,并返回其ARN(Amazon Resource Name)。创建SQS队列 (
create_sqs_queue
):使用
create_queue
方法创建一个SQS队列,并返回其URL。获取SQS队列的ARN (
get_sqs_queue_arn
):使用
get_queue_attributes
方法获取队列的ARN,这是订阅SNS话题所需要的。设置SQS队列的策略 (
set_sqs_policy
):创建并应用一个队列策略,使SNS话题能够向SQS队列发送消息。这一步是必须的,否则即使SNS订阅成功,消息也无法发送到SQS队列。
订阅SQS到SNS话题 (
subscribe_sqs_to_sns
):使用
subscribe
方法将SQS队列订阅到SNS话题。这里需要提供队列的ARN和SNS话题的ARN。设置SQS队列策略 (
set_sqs_policy
):这里设置SQS队列的访问策略,允许SNS话题向该队列发送消息。
以上就是在亚马逊云科技上利用Message Queue为应用解耦,提升可用性和容错性的全部步骤。欢迎大家关注0基础5分钟上手AWS系列,未来获取更多国际前沿的AWS云开发/云架构方案!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。