当前位置:   article > 正文

SpringCloudAlibaba - 阿里系消息中间件RocketMQ_spring cloud alibaba rocketmq

spring cloud alibaba rocketmq

关于消息中间件及各种MQ对比,见博主之前的博客:ActiveMQ/RabbitMQ;本文讲解阿里开源中间件RocketMQ。

一. 环境搭建

上传最新的RocketMQ安装包 rocketmq-all-4.6.0-bin-release。

1. 解压配置文件

unzip rocketmq-all-4.6.0-bin-release.zip

报错:-bash: unzip: 未找到命令;解决办法:yum install -y unzip zip

2. 修改bin目录下NameServer、Broker服务器内存  默认为4g内存

  1. # vi runserver.sh
  2. JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
  3. # vi runbroker.sh
  4. JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"

3. 启动NameServer

nohup sh bin/mqnamesrv &

4. 启动mqbroker

  虚拟机环境:

nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &

  阿里云环境需要指定外网IP:

  1. echo "brokerIP1=47.104.xx.xxx" > broker.properties
  2. nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c ./broker.properties &

5. Rocketmq-console控制端使用

  默认RocketMQ服务端是没有可视化界面的,不过阿里ROcketMQ已经开放了相应客户端,去官网http://rocketmq.apache.org/下载RocketMQ源码,把rocketmq-console导入IDEA,修改配置文件一行代码即可运行:

rocketmq.config.namesrvAddr=47.104.xx.xxx:9876

二. SpringBoot简单整合RocketMQ

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.3</version>
  5. </dependency>
  1. rocketmq:
  2. # nameServer连接地址
  3. name-server: 47.104.xx.xxx:9876
  4. producer:
  5. group: my_producer #生产者必须要有分组,不然启动会报错,可能是自带的一个bug
  6. server:
  7. port: 8088
  1. @Data
  2. @AllArgsConstructor
  3. @ToString
  4. public class OrderEntity implements Serializable {
  5. private String orderId;
  6. private String orderName;
  7. }
  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. private RocketMQTemplate rocketMQTemplate;
  5. @RequestMapping("/sendMsg")
  6. public String sendMsg() {
  7. OrderEntity orderEntity = new OrderEntity("123456","腾讯视频永久会员");
  8. rocketMQTemplate.convertAndSend("my-topic", orderEntity);
  9. return "success";
  10. }
  11. }
  1. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  2. import org.apache.rocketmq.spring.core.RocketMQListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. @RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-topic-consume-group")
  6. public class OrderConsumer implements RocketMQListener<OrderEntity> {
  7. @Override
  8. public void onMessage(OrderEntity o) {
  9. System.out.println("o:" + o.toString());
  10. }
  11. }

    

可以发现,生产者发送消息后,消费者能够实时监听,注意:生产者和消费者的topic须保持一致。

三. RocketMQ进阶

1. RocketMQ集群架构

  

RocketMQ 四种集群部署方式

① 单个Master节点, 缺点就是如果宕机之后可能整个服务不可用;

②  多个Master节点,分摊存放我们的消息,缺点:没有Slave节点,主的Master节点宕机之后消息数据可能会丢失的;

③ 多个Master和Slave节点 采用异步形式 效率非常高 数据可能短暂产生延迟(毫秒级别的,建议,如上图)

④ 多个Master和Slave节点 采用同步形式, 效率比较低、数据不会产生延迟。

集群方式环境搭建:参考博客:https://www.cnblogs.com/kevingrace/p/9015836.htmlhttps://blog.csdn.net/leexide/article/details/80035470

  1. brokerClusterName = myCluster
  2. brokerName = broker-a
  3. brokerId = 0
  4. deleteWhen = 04
  5. fileReservedTime 48
  6. brokerRole = ASYNC_MASTER
  7. flushDiskType = ASYNC_FLUSH
  8. #namesrvAddr=192.168.0.5:9876;192.168.0.6:9876
  9. brokerClusterName = myCluster
  10. brokerName = broker-b
  11. brokerId = 0
  12. deleteWhen = 04
  13. fileReservedTime 48
  14. brokerRole = ASYNC_MASTER
  15. flushDiskType = ASYNC_FLUSH
  16. #namesrvAddr=192.168.0.5:9876;192.168.0.6:9876
  17. 上面是双主模式(没有从) - BrokerId为0表示Master,非0表示Slave
  18. 保证brokerClusterName,brokerName不同即可
  19. 且nameServer没有做集群,都用的105,所以注释掉namesrvAddr,如果双nameServer,则打开注释
  20. 1,先启动192.168.0.5的nameserver
  21. 2.启动192.168.0.5的broker
  22. 3.启动192.168.0.6的broker
  23. # 如下代码表示一主一备
  24. brokerName = broker-a
  25. brokerId = 0
  26. brokerName = broker-a
  27. brokerId = 1

2. RocketMQ消息顺序性问题(其实这种场景很少,除非要求十分苛刻的业务)

RocketMQ中,topic是队列的集合,队列是先进先出,只要有一个消费者,肯定是能够保证顺序的;不过RocketMQ不同,因为一个Broker分了4个队列,消息是存到不同队列的,相当于分摊存放到不同队列,消费者消费时无法拿到全局数据。

【顺序消息】最大的问题是消费者集群,队列集群(1个Broker分为4个队列,极大提高吞吐量)

解决顺序问题的核心思路:
① 相同的业务逻辑一定要放在同一个队列中(比如,新增,修改,删除等,是有顺序的,把这3个业务放在同一个队列)
② 每个队列必须要对应同一个消费者
③ RocketMQ中队列和消费者比例为1:1

  代码实现:(以下代码场景:单台RocketMQ,即一个Broker,4个队列)

  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. private RocketMQTemplate rocketMQTemplate;
  5. @RequestMapping("/sendMsg")
  6. public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  7. Long orderId = System.currentTimeMillis();
  8. String insertSql = getSqlMsg("insert", orderId);
  9. String updateSql = getSqlMsg("update", orderId);
  10. String deleteSql = getSqlMsg("delete", orderId);
  11. Message insertMsg = new Message("zb-topic", insertSql.getBytes());
  12. Message updateMsg = new Message("zb-topic", updateSql.getBytes());
  13. Message deleteMsg = new Message("zb-topic", deleteSql.getBytes());
  14. DefaultMQProducer producer = rocketMQTemplate.getProducer();
  15. rocketMQTemplate.getProducer().send(insertMsg, new MessageQueueSelector() {
  16. @Override
  17. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  18. // 该消息存放到队列0中
  19. return mqs.get(0);
  20. }
  21. }, orderId);
  22. rocketMQTemplate.getProducer().send(updateMsg, new MessageQueueSelector() {
  23. @Override
  24. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  25. return mqs.get(0);
  26. }
  27. }, orderId);
  28. rocketMQTemplate.getProducer().send(deleteMsg, new MessageQueueSelector() {
  29. @Override
  30. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  31. return mqs.get(0);
  32. }
  33. }, orderId);
  34. return orderId + "";
  35. }
  36. public String getSqlMsg(String type, Long orderId) {
  37. JSONObject dataObject = new JSONObject();
  38. dataObject.put("type", type);
  39. dataObject.put("orderId", orderId);
  40. return dataObject.toJSONString();
  41. }
  42. }
  1. /**
  2. * ConsumeMode.ORDERLY 相当于一个队列对应一个线程
  3. * consumeThreadMax 可以设为4,四个线程去消费broker的信息,也能保证顺序问题,只不过线程不同
  4. */
  5. @Service
  6. @RocketMQMessageListener(topic = "zb-topic", consumerGroup = "zbTopic",
  7. consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
  8. public class OrdeConsumer implements RocketMQListener<MessageExt> {
  9. @Override
  10. public void onMessage(MessageExt message) {
  11. System.out.println(Thread.currentThread().getName() + "," +
  12. "队列" + message.getQueueId() + "," + new String(message.getBody()));
  13. }
  14. }

四. RocketMQ解决分布式事务

分布式事务解决方案有很多很多,博主在之前的博客也讲解了几个,如RabbitMQ最终一致性,LCN等。本次案例为:

如图所示,相信我们都定过外卖,当提交订单后会在数据库生成一条订单,然后等待分配骑手送餐。

该业务在SpringCloud微服务架构拆分为两个服务,订单服务service-order和派单服务service-distribute,订单服务添加订单后,通过feign客户端调用派单服务的接口进行分配骑手,那么分布式事务问题就来了,当订单服务调用完第二行代码,派单接口执行完毕,咔嚓,第三行报了个错,那么订单接口会回滚,而派单则已提交事务,那么就造成数据不一致问题,本文用博主推荐的第三种方式RocketMQ来解决该分布式事务问题。

  首先进行service-order服务编写生产者,即订餐者:

  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. private ProducerService producerService;
  5. @RequestMapping("/sendOrder")
  6. public String sendOrder() {
  7. return producerService.saveOrder();
  8. }
  9. }
  1. @Service
  2. public class ProducerService {
  3. @Autowired
  4. private RocketMQTemplate rocketMQTemplate;
  5. /** 提交订单业务逻辑 */
  6. public String saveOrder() {
  7. // 提前生成我们的订单id
  8. String orderId = System.currentTimeMillis() + "";
  9. /**
  10. * 1.提前生成我们的半消息
  11. * 2.半消息发送成功之后,再执行我们的本地事务
  12. */
  13. OrderEntity orderEntity = this.createOrder(orderId);
  14. String msg = JSONObject.toJSONString(orderEntity); //需转为JSONString类型
  15. MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
  16. stringMessageBuilder.setHeader("msg", msg);
  17. Message message = stringMessageBuilder.build();
  18. // 该api(sendMessageInTransaction)即为事务消息,俗称半消息
  19. rocketMQTemplate.sendMessageInTransaction("orderProducerGroup",
  20. "orderTopic", message, null); //该消息不允许被消费者消费
  21. // 一旦发送成功,直接去执行监听器SyncProducerListener的executeLocalTransaction方法
  22. return orderId;
  23. }
  24. /** 封装订单实体类 */
  25. public OrderEntity createOrder(String orderId) {
  26. OrderEntity orderEntity = new OrderEntity();
  27. orderEntity.setName("阿堡仔炸鸡汉堡-丁豪广场店");
  28. orderEntity.setOrderCreatetime(new Date());
  29. orderEntity.setOrderMoney(15d); // 价格是15元
  30. orderEntity.setOrderState(0); // 状态为未支付
  31. Long commodityId = 30L;
  32. orderEntity.setCommodityId(commodityId); // 模拟商品id为30
  33. orderEntity.setOrderId(orderId);
  34. return orderEntity;
  35. }
  36. }

  RocketMQ事务监听器

  1. @Component
  2. @RocketMQTransactionListener(txProducerGroup = "orderProducerGroup")
  3. public class SyncProducerListener implements RocketMQLocalTransactionListener {
  4. @Autowired
  5. private OrderMapper orderMapper;
  6. @Autowired
  7. private TransactionUtils transactionUtils;
  8. /**
  9. * 执行我们订单的事务(本地事务)
  10. */
  11. @Override
  12. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  13. MessageHeaders headers = msg.getHeaders();
  14. Object object = headers.get("msg");
  15. if (object == null) {
  16. return RocketMQLocalTransactionState.ROLLBACK; //rollback,删除半消息
  17. }
  18. String orderMsg = (String) object;
  19. OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
  20. TransactionStatus transactionStatus = null;
  21. try {
  22. transactionStatus = transactionUtils.begin();
  23. int result = orderMapper.addOrder(orderEntity);
  24. transactionUtils.commit(transactionStatus); // 本地事务提交即可,不用管result
  25. if (result == 0) {
  26. return RocketMQLocalTransactionState.ROLLBACK; //RocketMQ回滚,只要是rollback,则直接删除半消息
  27. }
  28. // 告诉我们的Broker可以消费该消息
  29. // 一旦RocketMQ提交后,直接走到OrderConsumer的监听器,不会走下面checkLocalTransaction方法
  30. return RocketMQLocalTransactionState.COMMIT;
  31. // return null; 如果返回null,或者Rocket..State.UNKNOWN,则等待60s走下面的checkLocalTransaction方法,
  32. } catch (Exception e) {
  33. if (transactionStatus != null) {
  34. transactionUtils.rollback(transactionStatus);
  35. return RocketMQLocalTransactionState.ROLLBACK;
  36. }
  37. }
  38. return null;
  39. }
  40. /**
  41. * 提供给Broker定时检查
  42. */
  43. @Override
  44. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  45. MessageHeaders headers = msg.getHeaders();
  46. Object object = headers.get("msg");
  47. if (object == null) {
  48. return RocketMQLocalTransactionState.ROLLBACK;
  49. }
  50. String orderMsg = (String) object;
  51. OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
  52. String orderId = orderEntity.getOrderId();
  53. // 直接查询我们的数据库(即用业务判断事务结果)
  54. OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
  55. if (orderDbEntity == null) {
  56. return RocketMQLocalTransactionState.UNKNOWN; //不确认,继续重试
  57. }
  58. return RocketMQLocalTransactionState.COMMIT;
  59. }
  60. }

  由于需要手动提交/回滚事务,贴上一段事务工具类:

  1. @Service
  2. public class TransactionUtils {
  3. @Autowired
  4. public DataSourceTransactionManager transactionManager;
  5. public TransactionStatus begin() {
  6. TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
  7. return transaction;
  8. }
  9. public void commit(TransactionStatus transaction) {
  10. transactionManager.commit(transaction);
  11. }
  12. public void rollback(TransactionStatus transaction) {
  13. transactionManager.rollback(transaction);
  14. }
  15. }

  最后开始进行service-distribute服务,编写派单监听器

  1. @Service
  2. @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "myTopicGroup")
  3. public class OrderConsumer implements RocketMQListener<String> {
  4. @Autowired
  5. private DispatchMapper dispatchMapper;
  6. @Override
  7. public void onMessage(String msg) {
  8. OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
  9. String orderId = orderEntity.getOrderId();
  10. // 模拟userId为136
  11. DispatchEntity dispatchEntity = new DispatchEntity(orderId, 136L);
  12. // 添加派单(具体逻辑肯定没这么简单,根据业务编写相应代码即可)
  13. dispatchMapper.insertDistribute(dispatchEntity);
  14. }
  15. }

  【RocketMQ解决分布式事务总结】:

 Rocketmq解决分布式事务的核心思路:

1>  生产者向我们的Broker(MQ服务器端)发送我们派单消息设置为半消息,该消息不可以被消费者消费。

2>  执行我们的本地的事务,将本地执行事务结果提交或者回滚告诉Broker

3>  Broker获取本地事务的结果如果是为提交的话,将该半消息设置为允许被消费者消费,如果本地事务执行失败的情况下,将该半消息直接从Broker中移除

4>  如果我们的本地事务没有将结果及时通知给我们的Broker(网络波动等原因),这时候我们Broker会主动定时(默认60s)查询本地事务结果,最多重试15次,超时则删除半消息。

5>  如何获取本地事务执行结果?直接查询下订单在数据库是否存在即可,本地事务结果实际上就是一个回调方法,根据自己业务场景封装本地事务结果。

至此,SpringCloudAlibaba阿里系消息中间件RocketMQ基本结束,需要源码的小伙伴私聊我即可,免费赠送 ~ 

解决分布式事务的核心:必须先保证订单服务(调用方)事务能够百分百提交成功,如果事务提交成功,就可以发送派单消息;如果该事务回滚的情况下,则不发送派单消息。

简言之:确保调用方事务先执行成功~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/712972
推荐阅读
相关标签
  

闽ICP备14008679号