赞
踩
关于消息中间件及各种MQ对比,见博主之前的博客:ActiveMQ/RabbitMQ;本文讲解阿里开源中间件RocketMQ。
上传最新的RocketMQ安装包 rocketmq-all-4.6.0-bin-release。
1. 解压配置文件
unzip rocketmq-all-4.6.0-bin-release.zip
报错:-bash: unzip: 未找到命令;解决办法:yum install -y unzip zip
2. 修改bin目录下NameServer、Broker服务器内存 默认为4g内存
- # vi runserver.sh
- JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
-
- # vi runbroker.sh
- JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
3. 启动NameServer
nohup sh bin/mqnamesrv &
4. 启动mqbroker
虚拟机环境:
nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
阿里云环境需要指定外网IP:
- echo "brokerIP1=47.104.xx.xxx" > broker.properties
- nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c ./broker.properties &
5. Rocketmq-console控制端使用
默认RocketMQ服务端是没有可视化界面的,不过阿里ROcketMQ已经开放了相应客户端,去官网http://rocketmq.apache.org/下载RocketMQ源码,把rocketmq-console导入IDEA,修改配置文件一行代码即可运行:
rocketmq.config.namesrvAddr=47.104.xx.xxx:9876
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.3</version>
- </dependency>
- rocketmq:
- # nameServer连接地址
- name-server: 47.104.xx.xxx:9876
- producer:
- group: my_producer #生产者必须要有分组,不然启动会报错,可能是自带的一个bug
- server:
- port: 8088
- @Data
- @AllArgsConstructor
- @ToString
- public class OrderEntity implements Serializable {
- private String orderId;
- private String orderName;
- }
- @RestController
- public class ProducerController {
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- @RequestMapping("/sendMsg")
- public String sendMsg() {
- OrderEntity orderEntity = new OrderEntity("123456","腾讯视频永久会员");
- rocketMQTemplate.convertAndSend("my-topic", orderEntity);
- return "success";
- }
- }
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Service;
-
- @Service
- @RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-topic-consume-group")
- public class OrderConsumer implements RocketMQListener<OrderEntity> {
- @Override
- public void onMessage(OrderEntity o) {
- System.out.println("o:" + o.toString());
- }
- }
可以发现,生产者发送消息后,消费者能够实时监听,注意:生产者和消费者的topic须保持一致。
1. RocketMQ集群架构
RocketMQ 四种集群部署方式
① 单个Master节点, 缺点就是如果宕机之后可能整个服务不可用;
② 多个Master节点,分摊存放我们的消息,缺点:没有Slave节点,主的Master节点宕机之后消息数据可能会丢失的;
③ 多个Master和Slave节点 采用异步形式 效率非常高 数据可能短暂产生延迟(毫秒级别的,建议,如上图)
④ 多个Master和Slave节点 采用同步形式, 效率比较低、数据不会产生延迟。
集群方式环境搭建:参考博客:https://www.cnblogs.com/kevingrace/p/9015836.html,https://blog.csdn.net/leexide/article/details/80035470
- brokerClusterName = myCluster
- brokerName = broker-a
- brokerId = 0
- deleteWhen = 04
- fileReservedTime 48
- brokerRole = ASYNC_MASTER
- flushDiskType = ASYNC_FLUSH
- #namesrvAddr=192.168.0.5:9876;192.168.0.6:9876
-
- brokerClusterName = myCluster
- brokerName = broker-b
- brokerId = 0
- deleteWhen = 04
- fileReservedTime 48
- brokerRole = ASYNC_MASTER
- flushDiskType = ASYNC_FLUSH
- #namesrvAddr=192.168.0.5:9876;192.168.0.6:9876
-
- 上面是双主模式(没有从) - BrokerId为0表示Master,非0表示Slave
- 保证brokerClusterName,brokerName不同即可
- 且nameServer没有做集群,都用的105,所以注释掉namesrvAddr,如果双nameServer,则打开注释
- 1,先启动192.168.0.5的nameserver
- 2.启动192.168.0.5的broker
- 3.启动192.168.0.6的broker
-
- # 如下代码表示一主一备
- brokerName = broker-a
- brokerId = 0
-
- brokerName = broker-a
- brokerId = 1
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
2. RocketMQ消息顺序性问题(其实这种场景很少,除非要求十分苛刻的业务)
RocketMQ中,topic是队列的集合,队列是先进先出,只要有一个消费者,肯定是能够保证顺序的;不过RocketMQ不同,因为一个Broker分了4个队列,消息是存到不同队列的,相当于分摊存放到不同队列,消费者消费时无法拿到全局数据。
【顺序消息】最大的问题是消费者集群,队列集群(1个Broker分为4个队列,极大提高吞吐量)
解决顺序问题的核心思路:
① 相同的业务逻辑一定要放在同一个队列中(比如,新增,修改,删除等,是有顺序的,把这3个业务放在同一个队列)
② 每个队列必须要对应同一个消费者
③ RocketMQ中队列和消费者比例为1:1
代码实现:(以下代码场景:单台RocketMQ,即一个Broker,4个队列)
- @RestController
- public class ProducerController {
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- @RequestMapping("/sendMsg")
- public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
- Long orderId = System.currentTimeMillis();
- String insertSql = getSqlMsg("insert", orderId);
- String updateSql = getSqlMsg("update", orderId);
- String deleteSql = getSqlMsg("delete", orderId);
- Message insertMsg = new Message("zb-topic", insertSql.getBytes());
- Message updateMsg = new Message("zb-topic", updateSql.getBytes());
- Message deleteMsg = new Message("zb-topic", deleteSql.getBytes());
- DefaultMQProducer producer = rocketMQTemplate.getProducer();
-
- rocketMQTemplate.getProducer().send(insertMsg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- // 该消息存放到队列0中
- return mqs.get(0);
- }
- }, orderId);
- rocketMQTemplate.getProducer().send(updateMsg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- return mqs.get(0);
- }
- }, orderId);
- rocketMQTemplate.getProducer().send(deleteMsg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- return mqs.get(0);
- }
- }, orderId);
-
- return orderId + "";
- }
- public String getSqlMsg(String type, Long orderId) {
- JSONObject dataObject = new JSONObject();
- dataObject.put("type", type);
- dataObject.put("orderId", orderId);
- return dataObject.toJSONString();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- /**
- * ConsumeMode.ORDERLY 相当于一个队列对应一个线程
- * consumeThreadMax 可以设为4,四个线程去消费broker的信息,也能保证顺序问题,只不过线程不同
- */
- @Service
- @RocketMQMessageListener(topic = "zb-topic", consumerGroup = "zbTopic",
- consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
- public class OrdeConsumer implements RocketMQListener<MessageExt> {
-
- @Override
- public void onMessage(MessageExt message) {
- System.out.println(Thread.currentThread().getName() + "," +
- "队列" + message.getQueueId() + "," + new String(message.getBody()));
- }
- }
分布式事务解决方案有很多很多,博主在之前的博客也讲解了几个,如RabbitMQ最终一致性,LCN等。本次案例为:
如图所示,相信我们都定过外卖,当提交订单后会在数据库生成一条订单,然后等待分配骑手送餐。
该业务在SpringCloud微服务架构拆分为两个服务,订单服务service-order和派单服务service-distribute,订单服务添加订单后,通过feign客户端调用派单服务的接口进行分配骑手,那么分布式事务问题就来了,当订单服务调用完第二行代码,派单接口执行完毕,咔嚓,第三行报了个错,那么订单接口会回滚,而派单则已提交事务,那么就造成数据不一致问题,本文用博主推荐的第三种方式RocketMQ来解决该分布式事务问题。
首先进行service-order服务编写生产者,即订餐者:
- @RestController
- public class ProducerController {
- @Autowired
- private ProducerService producerService;
-
- @RequestMapping("/sendOrder")
- public String sendOrder() {
- return producerService.saveOrder();
- }
- }
- @Service
- public class ProducerService {
-
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- /** 提交订单业务逻辑 */
- public String saveOrder() {
- // 提前生成我们的订单id
- String orderId = System.currentTimeMillis() + "";
- /**
- * 1.提前生成我们的半消息
- * 2.半消息发送成功之后,再执行我们的本地事务
- */
- OrderEntity orderEntity = this.createOrder(orderId);
- String msg = JSONObject.toJSONString(orderEntity); //需转为JSONString类型
- MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
- stringMessageBuilder.setHeader("msg", msg);
- Message message = stringMessageBuilder.build();
- // 该api(sendMessageInTransaction)即为事务消息,俗称半消息
- rocketMQTemplate.sendMessageInTransaction("orderProducerGroup",
- "orderTopic", message, null); //该消息不允许被消费者消费
- // 一旦发送成功,直接去执行监听器SyncProducerListener的executeLocalTransaction方法
- return orderId;
-
- }
- /** 封装订单实体类 */
- public OrderEntity createOrder(String orderId) {
- OrderEntity orderEntity = new OrderEntity();
- orderEntity.setName("阿堡仔炸鸡汉堡-丁豪广场店");
- orderEntity.setOrderCreatetime(new Date());
- orderEntity.setOrderMoney(15d); // 价格是15元
- orderEntity.setOrderState(0); // 状态为未支付
- Long commodityId = 30L;
- orderEntity.setCommodityId(commodityId); // 模拟商品id为30
- orderEntity.setOrderId(orderId);
- return orderEntity;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
RocketMQ事务监听器:
- @Component
- @RocketMQTransactionListener(txProducerGroup = "orderProducerGroup")
- public class SyncProducerListener implements RocketMQLocalTransactionListener {
-
- @Autowired
- private OrderMapper orderMapper;
- @Autowired
- private TransactionUtils transactionUtils;
-
- /**
- * 执行我们订单的事务(本地事务)
- */
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
-
- MessageHeaders headers = msg.getHeaders();
- Object object = headers.get("msg");
- if (object == null) {
- return RocketMQLocalTransactionState.ROLLBACK; //rollback,删除半消息
- }
- String orderMsg = (String) object;
- OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
- TransactionStatus transactionStatus = null;
- try {
- transactionStatus = transactionUtils.begin();
- int result = orderMapper.addOrder(orderEntity);
- transactionUtils.commit(transactionStatus); // 本地事务提交即可,不用管result
- if (result == 0) {
- return RocketMQLocalTransactionState.ROLLBACK; //RocketMQ回滚,只要是rollback,则直接删除半消息
- }
- // 告诉我们的Broker可以消费该消息
- // 一旦RocketMQ提交后,直接走到OrderConsumer的监听器,不会走下面checkLocalTransaction方法
- return RocketMQLocalTransactionState.COMMIT;
- // return null; 如果返回null,或者Rocket..State.UNKNOWN,则等待60s走下面的checkLocalTransaction方法,
- } catch (Exception e) {
- if (transactionStatus != null) {
- transactionUtils.rollback(transactionStatus);
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }
- return null;
- }
-
- /**
- * 提供给Broker定时检查
- */
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- MessageHeaders headers = msg.getHeaders();
- Object object = headers.get("msg");
- if (object == null) {
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- String orderMsg = (String) object;
- OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
- String orderId = orderEntity.getOrderId();
- // 直接查询我们的数据库(即用业务判断事务结果)
- OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
- if (orderDbEntity == null) {
- return RocketMQLocalTransactionState.UNKNOWN; //不确认,继续重试
- }
- return RocketMQLocalTransactionState.COMMIT;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
由于需要手动提交/回滚事务,贴上一段事务工具类:
- @Service
- public class TransactionUtils {
-
- @Autowired
- public DataSourceTransactionManager transactionManager;
-
- public TransactionStatus begin() {
- TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
- return transaction;
- }
- public void commit(TransactionStatus transaction) {
- transactionManager.commit(transaction);
- }
- public void rollback(TransactionStatus transaction) {
- transactionManager.rollback(transaction);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
最后开始进行service-distribute服务,编写派单监听器:
- @Service
- @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "myTopicGroup")
- public class OrderConsumer implements RocketMQListener<String> {
-
- @Autowired
- private DispatchMapper dispatchMapper;
-
- @Override
- public void onMessage(String msg) {
- OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
- String orderId = orderEntity.getOrderId();
- // 模拟userId为136
- DispatchEntity dispatchEntity = new DispatchEntity(orderId, 136L);
- // 添加派单(具体逻辑肯定没这么简单,根据业务编写相应代码即可)
- dispatchMapper.insertDistribute(dispatchEntity);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
【RocketMQ解决分布式事务总结】:
Rocketmq解决分布式事务的核心思路:
1> 生产者向我们的Broker(MQ服务器端)发送我们派单消息设置为半消息,该消息不可以被消费者消费。
2> 执行我们的本地的事务,将本地执行事务结果提交或者回滚告诉Broker
3> Broker获取本地事务的结果如果是为提交的话,将该半消息设置为允许被消费者消费,如果本地事务执行失败的情况下,将该半消息直接从Broker中移除
4> 如果我们的本地事务没有将结果及时通知给我们的Broker(网络波动等原因),这时候我们Broker会主动定时(默认60s)查询本地事务结果,最多重试15次,超时则删除半消息。
5> 如何获取本地事务执行结果?直接查询下订单在数据库是否存在即可,本地事务结果实际上就是一个回调方法,根据自己业务场景封装本地事务结果。
至此,SpringCloudAlibaba阿里系消息中间件RocketMQ基本结束,需要源码的小伙伴私聊我即可,免费赠送 ~
解决分布式事务的核心:必须先保证订单服务(调用方)事务能够百分百提交成功,如果事务提交成功,就可以发送派单消息;如果该事务回滚的情况下,则不发送派单消息。
简言之:确保调用方事务先执行成功~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。