赞
踩
RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统
Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
RocketMQ集群搭建,参考下面我的一篇博客:
RocketMQ双主双备集群搭建: https://blog.csdn.net/qq_43692950/article/details/111823611
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
server:
port: 80
rocketmq:
# name-server: 192.168.40.128:9876;192.168.40.130:9876; #集群方式
name-server: 192.168.40.128:9876; #单机方式
producer:
group: bxc_producer
@Data
public class JmsMsg {
private int code;
private String msg;
public JmsMsg() { }
public JmsMsg(int code, String msg) {
this.code = code;
this.msg = msg;
}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "simpletopic", consumerGroup = "simpletopic")
public class JmsConsumer implements RocketMQListener<JmsMsg> {
@SneakyThrows
@Override
public void onMessage(JmsMsg jmsMsg) {
log.info("消费者-->"+jmsMsg.toString());
}
}
consumerGroup 为消费者组,同一组的消费者会均摊消息,每个组都会受到消息。
@Component
public class JmsProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public boolean send(JmsMsg msg){
try{
rocketMQTemplate.convertAndSend("simpletopic", msg);
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
}
JmsMsg jmsMsg = new JmsMsg(200,"test");
Message message = new Message("simpletopic","tag", JSONObject.toJSONString(msg).getBytes());
SendResult send = rocketMQTemplate.getProducer().send(message);
if (send.getSendStatus() == SendStatus.SEND_OK){
System.out.println("发送成功!");
}
rocketMQTemplate.getProducer().send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK){
System.out.println("发送成功!");
}
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败!+"+throwable.toString());
}
});
rocketMQTemplate.sendOneWay("simpletopic",jmsMsg);
//或者
rocketMQTemplate.getProducer().sendOneway(message);
Message message = new Message("simpletopic","tag",JSONObject.toJSONString(jmsMsg).getBytes());
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//4 指的是30s
message.setDelayTimeLevel(2);
//延时发送
rocketMQTemplate.getProducer().send(message);
RocketMQ 默认消息同时的,但是在有些场景需要队列中的消息有顺序性,比如:发送了一条添加数据的消息,然后接着发送了一条更改数据的消息,此时肯定需要添加数据的消息先被消费,在rocketmq中可以指定消费者的consumeMode,其有两个属性CONCURRENTLY、ORDERLY,ORDERLY便是让一个队列中的数据有顺序的消费,先进先出。
RocketMQ 一个Topic其实是分成了4个队列,所以在消息投递时,需要顺序的消息,要放在同一个队列中。
@Slf4j @Component @RocketMQMessageListener(topic = "bxctopic", consumerGroup = "bxctopic1",consumeMode = ConsumeMode.ORDERLY) public class JmsOrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt msg) { try { int time = (int)(Math.random()*5+1)*100; Thread.sleep(time); log.info("消费者2-->"+ Thread.currentThread().getName() + "," + "队列" + msg.getQueueId() + msg.getQueueId()+",内容"+ new String(msg.getBody())+" "+time); } catch (Exception e) { e.printStackTrace(); } } }
@RestController public class OrderMsg { @Autowired private RocketMQTemplate rocketMQTemplate; /* 根据code选择队列存放 */ class TopicMessageQueueSelector implements MessageQueueSelector{ @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object o) { int orderId = (int) o; long index = orderId % mqs.size(); return mqs.get((int) index); } } @GetMapping("/OrderQueue") public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { JmsMsg msg1 = new JmsMsg(200, "AAAAAAAA"); Message message1 = new Message("bxctopic", JSONObject.toJSONString(msg1).getBytes()); rocketMQTemplate.getProducer().send(message1,new TopicMessageQueueSelector(),msg1.getCode()); JmsMsg msg2 = new JmsMsg(200, "BBBBB"); Message message2 = new Message("bxctopic", JSONObject.toJSONString(msg2).getBytes()); rocketMQTemplate.getProducer().send(message2,new TopicMessageQueueSelector(),msg2.getCode()); JmsMsg msg3 = new JmsMsg(200, "CCCCC"); Message message3 = new Message("bxctopic", JSONObject.toJSONString(msg3).getBytes()); rocketMQTemplate.getProducer().send(message3,new TopicMessageQueueSelector(),msg3.getCode()); JmsMsg msg4 = new JmsMsg(201, "AAAAAAAA"); Message message4 = new Message("bxctopic", JSONObject.toJSONString(msg4).getBytes()); rocketMQTemplate.getProducer().send(message4,new TopicMessageQueueSelector(),msg4.getCode()); JmsMsg msg5 = new JmsMsg(201, "BBBBB"); Message message5 = new Message("bxctopic", JSONObject.toJSONString(msg5).getBytes()); rocketMQTemplate.getProducer().send(message5,new TopicMessageQueueSelector(),msg5.getCode()); JmsMsg msg6 = new JmsMsg(201, "CCCCC"); Message message6 = new Message("bxctopic", JSONObject.toJSONString(msg6).getBytes()); rocketMQTemplate.getProducer().send(message6,new TopicMessageQueueSelector(),msg6.getCode()); return "success"; } }
如果消费者的模式为:consumeMode = ConsumeMode.CONCURRENTLY
效果如下,缺失了顺序性。
RocketMQ的事物消息,在一定情况下可以解决分布式事物问题,在发起方发出消息后,由于突发状况发生错误引起数据回滚,但消费方不知道发起方状况无法回滚,导致数据不一致情况,正好RocketMQ的事物消息可以解决这一问题,在发起方发送消息时,消息不能立即被消费者消费,等待本地事物提交后,再交给消费者。
<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.6</version> </dependency>
spring:
datasource:
url: jdbc:mysql://192.168.40.130:3306/db1?useUnicode=true&characterEncoding=utf8
username: root
password: root
type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
# mapper-locations: classpath:mapper/xml/*.xml
# type-aliases-package: com.bxc.eurekaprovide.Entity
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: true
@Data
@TableName("user")
public class UserEntity {
@TableId(value = "id",type = IdType.AUTO)
private Integer id;
private String name;
private int age;
public UserEntity(String name, int age) {
this.name = name;
this.age = age;
}
}
@Mapper
@Component
public interface UserMapper extends BaseMapper<UserEntity> {
}
@Component
@RocketMQMessageListener(topic = "TBxcTopic", consumerGroup = "TBxcTopic")
public class TxConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
UserEntity userEntity = JSONObject.parseObject(msg, UserEntity.class);
System.out.println("事物队列--> 消息-->"+userEntity.toString());
}
}
@Service public class TransationalUtils { @Autowired public DataSourceTransactionManager transactionManager; public TransactionStatus begin() { TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute()); return transaction; } public void commit(TransactionStatus transaction) { transactionManager.commit(transaction); } public void rollback(TransactionStatus transaction) { transactionManager.rollback(transaction); } }
@Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "bxcProducer") public class SyncProducerListener implements RocketMQLocalTransactionListener { @Autowired private TransationalUtils transationalUtils; @Autowired private UserMapper userMapper; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { TransactionStatus beginStatus = null; try { beginStatus = transationalUtils.begin(); MessageHeaders headers = message.getHeaders(); String msg = (String) headers.get("msg"); if (StringUtils.isEmpty(msg)) { return RocketMQLocalTransactionState.ROLLBACK; } UserEntity user = JSONObject.parseObject(msg, UserEntity.class); int result = userMapper.insert(user); if (result > 0) { transationalUtils.commit(beginStatus); return RocketMQLocalTransactionState.COMMIT; } } catch (Exception e) { e.printStackTrace(); if (beginStatus != null) { transationalUtils.rollback(beginStatus); } } return RocketMQLocalTransactionState.ROLLBACK; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("检查触发"); MessageHeaders headers = message.getHeaders(); String msg = (String) headers.get("msg"); if (StringUtils.isEmpty(msg)) { return RocketMQLocalTransactionState.ROLLBACK; } UserEntity userEntity = JSONObject.parseObject(msg, UserEntity.class); Integer id = userEntity.getId(); UserEntity user = userMapper.selectById(id); if (user != null && user.getId() != null && user.getId() > 0) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.UNKNOWN; } }
其中executeLocalTransaction方法,会在消息发出时触发,如果返回RocketMQLocalTransactionState.COMMIT 则该消息会被消费者消费,如果返回RocketMQLocalTransactionState.ROLLBACK 则消息不会被消费。
checkLocalTransaction方法,是executeLocalTransaction方法返回的既不提交也不回滚时,或者在executeLocalTransaction方法执行中服务突然停掉了RocketMQ没收到返回,这种情况下RocketMq会定期触发checkLocalTransaction方法询问数据是否已保存,如果返回RocketMQLocalTransactionState.UNKNOWN则表明不清楚,会继续询问。
@RestController public class TController { @Autowired RocketMQTemplate template; @GetMapping("TGetTest") public String TGetTest() throws MQClientException { UserEntity userEntity = new UserEntity("testname",18); String user = JSONObject.toJSONString(userEntity); Message message = MessageBuilder.withPayload(user) .setHeader("msg",user) .build(); template.sendMessageInTransaction("bxcProducer","TBxcTopic",message,null); return "success"; } }
在消费者,消费过程中,如果消息处理不当,需求再次消费该消息时,RocketMQ的消息重试机制便可解决,当消费者消费时,如果抛出了异常,则会重新再次投递给该消费者。
@Slf4j
@Component
@RocketMQMessageListener(topic = "simpletopic", consumerGroup = "simpletopic")
public class JmsConsumer implements RocketMQListener<JmsMsg> {
@SneakyThrows
@Override
public void onMessage(JmsMsg jmsMsg) {
log.info("消费者1-->" + jmsMsg.toString());
#抛出异常、重新获取消息
throw new Exception("abc");
}
}
但消息不是一直就可以重新投递的,当重试次数达到默认的16次后(可以通过配置文件修改)如果对应的消息还没被成功消费的话,该消息就会投递到DLQ死信队列。
可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:%RETRY%消费组名称(重试Topic)%DLQ%消费组名称(死信Topic)。
死信队列也可以被订阅和消费,并且也会过期。有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3天内及时处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。