赞
踩
异步发送能够提升发送效率, 适合高并发场景下使用, 基于RocketMQ集成之普通消息发送做改造:
增加异步发送接口
com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller
/**
* 异步发送消息
* @return
*/
@GetMapping("/asyncSendString")
public String asyncSendString() {
for(int i=0; i<10; i++) {
String msg = "seq number: " + i;
final String seq = String.valueOf(i);
// 异步方式发送
rocketMQTemplate.asyncSend(RabbitMqConfig.TOPIC, msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
// 发送成功回调处理
log.info("seq number: " + seq + ", send result: " + sendResult.getSendStatus());
}
public void onException(Throwable e) {
// 发送异常回调处理
log.error(e.getMessage(), e);
}
});
}
return "async send success";
}
测试验证
调用异步发送接口
异步方式发送十条消息。
监听器日志
十条消息全部接收成功, 注意消息的发送顺序, 与订阅的接收顺序, 没有保持一致;异步方式能够提升发送效率, 但缺点是不能保障消息的有序消费,在实际使用中, 可以结合同步锁来使用, 比如可以根据账户ID加锁, 因每个账户数据具有独立性, 这样可以提升消息的传递发送效率, 又能保障每个账户接收到的数据是有序的。
ACL是Access Control List简称, 意为访问控制列表, 是RocketMQ4.4新加入的功能。加入ACL能够通过权限管理控制消息队列, 针对不同角色用户分配不同的队列操作权限, 便于权限管控, 提升消息队列数据的安全性。
ACL基本处理流程
创建rocketmq-acl工程
工程配置
application.yml
server:
port: 12615
spring:
application:
name: rocketmq-acl
# RocketMQ配置
rocketmq:
name-server: 10.10.20.15:9876
# 生产者配置
producer:
group: basic-group
# 权限信息
access-key: rocketmq2
secret-key: 12345678
# 消费者配置
consumer:
# 权限信息
accessKey: rocketmq2
secret-key: 12345678
注意producer与consumer, 都需配置权限信息, accessKey相当于用户名, secret-key相当于密码。
这些信息不是随便填写, 要与RocketMQ服务配置文件保持一致。
定义消息监听器
com.mirson.spring.boot.mq.rocket.acl.consume.StrSpringMessageConsumer
@Service
@RocketMQMessageListener(
topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE,
consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE
// accessKey = "RocketMQ", // 不需再填写, 会自动从配置文件中读取
// secretKey = "12345678" // 不需再填写, 会自动从配置文件中读取
)
@Log4j2
public class StrSpringMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String str) {
log.info("StrSpringMessageConsumer => receive str: " + str);
}
}
通过RocketMQMessageListener注解, 也可以配置accessKey与secretKey信息, 但在工程配置文件中我们已经填好, 系统启动会自动读取, 可以不用再填写。
定义发送接口
com.mirson.spring.boot.mq.rocket.acl.provider.RocketMqProviderContorller
/**
* 发送RocketMQ Spring Message封装消息
* @return
*/
@GetMapping("/sendSpringMessage")
public String sendSpringMessage() {
String msg = "random number: " + RandomUtils.nextInt(0, 100);
// Send Spring Message With String
SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE , MessageBuilder.withPayload(msg).build());
log.info("send result: " + result.getSendStatus());
return msg;
}
发送一个Spring Message封装的消息, 调用rocketMQTemplate的syncSend方法发送数据, 无需加额外参数。
RabiitMQ服务端用户权限设置
测试之前, 先要确保RocketMQ服务器开启了ACL验证功能。
开启ACL验证
修改%RABBITMQ_HOME%/conf/broker.conf文件, 末尾增加:
#开启ACL权限控制功能
aclEnable=true
分配用户权限
修改%RABBITMQ_HOME%/conf/plain_acl.yml文件:
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
defaultTopicPerm: PUB|SUB
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topic_acl_spring_message=PUB|SUB
- topic_acl_transaction_spring_message=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- group_acl_spring_message=PUB|SUB
- group_acl_transaction_spring_message=PUB|SUB
默认该配置文件下会有两个用户, RocketMQ与rocketmq2, 这里要修改rocketmq2的权限。
rocketmq2用户虽然具有admin权限, 但是Rocketmq的ACL处理源码仍要读取topicPerms属性配置,否则会报错, 这里追加我们用于ACL测试的相关TOPIC与GROUP, 确保rocketmq2用户拥有测试时的所有权限, RocketMQ用户则无权限:
defaultTopicPerm: PUB|SUB
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topic_acl_spring_message=PUB|SUB
- topic_acl_transaction_spring_message=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- group_acl_spring_message=PUB|SUB
- group_acl_transaction_spring_message=PUB|SUB
权限控制参数说明
字段 | 取值 | 含义 |
---|---|---|
globalWhiteRemoteAddresses | ;192.168..*;192.168.0.1 | 全局IP白名单 |
accessKey | 字符串 | Access Key 用户名 |
secretKey | 字符串 | Secret Key 密码 |
whiteRemoteAddress | ;192.168..*;192.168.0.1 | 用户IP白名单 |
admin | true;false | 是否管理员账户 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默认的Topic权限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默认的ConsumerGroup权限 |
topicPerms | topic=权限 | 各个Topic的权限 |
groupPerms | group=权限 | 各个ConsumerGroup的权限 |
测试验证
权限分配好后, 重启RabbitMQ服务, 要确保读取的是我们修改的配置文件。
启动NameServer
nohup bin/mqnamesrv >/dev/null 2>&1 &
启动Broker
sh bin/mqbroker -n 127.0.0.1:9876 -c /usr/local/rocketmq4.4/conf/broker.conf &
关闭Broker
bin/mqshutdown broker
关闭name server
bin/mqshutdown namesrv
使用RocketMQ用户,发送数据, 预期应该是无权限。
修改配置文件, 启动服务:
# RocketMQ配置
rocketmq:
name-server: 10.10.20.15:9876
# 生产者配置
producer:
group: basic-group
# 权限信息
access-key: RocketMQ
secret-key: 12345678
# 消费者配置
consumer:
# 权限信息
accessKey: RocketMQ
secret-key: 12345678
访问发送接口
http://127.0.0.1:12615/sendSpringMessage
出现异常, 查看控制台日志:
没有该主题topic_acl_spring_message的操作权限, ACL正常生效。
使用rocketmq2用户,发送数据,预期是可以正常发送与接收
修改配置文件:
# RocketMQ配置
rocketmq:
name-server: 10.10.20.15:9876
# 生产者配置
producer:
group: basic-group
# 权限信息
access-key: rocketmq2
secret-key: 12345678
# 消费者配置
consumer:
# 权限信息
accessKey: rocketmq2
secret-key: 12345678
调用发送接口
查看控制台监听日志:
通过ACL权限控制, 能够正常发送与接收队列数据。
假设场景,用户进行转账, 先扣除自身的账户金额, 再发送消息通知, 增加对方的账户金额, 在发送消息通知的过程中如果失败该如何处理? 为了解决本地事务执行与消息发送的原子性问题, RocketMQ推出了Transaction事务消息(并非分布式事务解决方案, 但可以基于此功能,与补偿机制实现一套方案)。 具体处理机制:
仍采用ACL机制, 基于rocketmq-acl工程改造实现
增加接收监听器
com.mirson.spring.boot.mq.rocket.acl.consume.StringTransactionConsumer
@Service
@RocketMQMessageListener(
topic = RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE,
consumerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE
)
@Log4j2
public class StringTransactionConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("StringTransactionConsumer => receive transaction str: " + message);
}
}
这里订阅的GROUP为RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE。
自定义事务监听器
com.mirson.spring.boot.mq.rocket.acl.config.TransactionListener
@RocketMQTransactionListener(
txProducerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE
)
@Log4j2
public class TransactionListener implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
log.info(" # ROLLBACK # Simulating %s related local transaction exec failed! {}", new String((byte[])msg.getPayload()));
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.COMMIT;
break;
}
}
return retState;
}
}
定义发送接口
/**
* 发送RocketMQ Transaction 事务消息
* @return
*/
@GetMapping("/sendTransactionMessage")
public String sendTransactionMessage() {
for (int i = 0; i < 10; i++)
{
Message msg = MessageBuilder.withPayload("seq number " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE,
RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE, msg, null);
log.info("seq " + i + " send result: " + sendResult.getSendStatus());
}
return "send transaction message success.";
}
定义sendTransactionMessage接口发送事务消息, 这里连续发送十条事务消息,调用rocketMQTemplate 的sendMessageInTransaction方法, 指定配置的组别与主题信息。
测试验证
发送十条事务消息, 在事务监听器里面, 有部分数据会出现回滚, 下面验证, 监听器是否正常接收确认的消息, 能否接收到回滚的消息。
调用接口
查看接收日志结果
可以看到, 成功发送了十条数据, 有4条数据出现回滚, 监听器打印接收了6条数据, 验证成功。
这里全面的讲解RocketMQ技术点, 相对较多, 也可以看出RocketMQ功能比较丰富, 有较好的扩展性,灵活性,适用各种业务场景, 不仅可以与Spring Boot 集成, 还可以支持Spring Cloud Stream 在微服务中应用。RocketMQ还支持消息轨迹跟踪, 异步顺序发送, 并发消费等, 更多功能大家可以再深入研究, 能够更好的适应生产项目对不同场景的使用要求。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。