赞
踩
RocketMQ的消息类型有以下三种:
普通消息
顺序消息
事务消息
目录
一、普通消息
二、顺序消息
三、事务消息
3.1 发送半事务消息
3.2 执行本地事务
3.3 消息回查
下边分别介绍三种类型消息的使用场景,以及使用示例。
一、普通消息
对于普通消息,RocketMQ提供了三种发送方式:可靠同步发送、可靠异步发送和单向发送。
可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发。即只发送请求而不管响应。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
下面通过springboot单元测试进行普通消息的使用示例:
第一步:添加依赖
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
第二步:编写测试方法
package cn.jack.mq;
import cn.jack.OrderApplication;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
public class MessageTypeTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步消息
@Test
public void testSyncSend() {
// 参数一:topic,如果想添加tag,可以使用"topic:tag"的写法
// 参数二:消息内容
SendResult result = this.rocketMQTemplate.syncSend("test-topic-1:test-tag", "同步消息测试");
System.out.println(result);
}
// 异步发送
@Test
public void testAsyncSend() {
this.rocketMQTemplate.asyncSend("test-topic-1", "异步消息测试", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送异常");
throwable.printStackTrace();
}
});
System.out.println("=============================");
}
// 单向消息
@Test
public void testOneWay() {
this.rocketMQTemplate.sendOneWay("test-topic-1", "单向消息测试");
}
}
二、顺序消息
RocketMQ采用的消息队列,本身就有先进先出的特性。为什么还要有顺序消息呢?
这是因为一般情况下,每个主题(topic)都会有多个消息队列(message queue),假设投递了同一个主题的十条消息,那么这十条消息会分散在不同的队列中。对于消费者而言,每个消息队列是等价的,就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。
顺序消息的发送方式,和普通消息差不多。同样是三种:同步发送、异步发送、单向发送。
// 单向顺序消息
@Test
public void testOneWayOrderly() {
for (int i=0; i<10; i++) {
// 通过第三个参数的hash值,决定消息投递到的队列
this.rocketMQTemplate.sendOneWayOrderly("test-topic-1", "单向消息测试", "hashKey");
}
}
三、事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务消息交互流程:
两个概念:
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或者 Rollback),该询问过程即消息回查。
下面通过商品下单来演示事务消息:只有商品成功下单(订单成功保存),才“发送”消息通知用户微服务发送短信。
3.1 发送半事务消息
调用createOrderBefore()方法发送半事务消息,此步骤对应交互流程图中的 1.发送事务消息
package cn.jack.service.impl;
import cn.jack.dao.OrderDao;
import cn.jack.dao.TxLogDao;
import cn.jack.domain.Order;
import cn.jack.domain.TxLog;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
@Service
public class OrderServiceImpl4 {
@Autowired
private OrderDao orderDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private TxLogDao txLogDao;
public void createOrderBefore(Order order) {
// 消息事务的id,放入消息头中
String txId = UUID.randomUUID().toString();
// 发送半事务消息
this.rocketMQTemplate.sendMessageInTransaction(
"jack_ts_topic",
MessageBuilder.withPayload(order).setHeader("txId", txId).build(),
order);
}
@Transactional
public void createOrder(Order order, String txId) {
// 保存订单
this.orderDao.save(order);
// 保存消息事务id。后续消息回查时,根据事务id是否存在,判断订单是否保存成功
TxLog txLog = new TxLog();
txLog.setCreateDate(new Date());
txLog.setTxId(txId);
this.txLogDao.save(txLog);
}
}
3.2 执行本地事务
executeLocalTransaction()方法,对应交互流程图中的 3.执行本地事务 以及 4.Commit或者Rollback
package cn.jack.service.impl;
import cn.jack.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@RocketMQTransactionListener
@Slf4j
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
@Autowired
private OrderServiceImpl4 orderServiceImpl4;
/**
* 执行本地事务,根据返回状态进行消息的二次确认(消息是否投递)
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String txId = message.getHeaders().get("txId", java.lang.String.class);
try {
// 本地事务:保存订单
Order order = (Order) o;
this.orderServiceImpl4.createOrder(order, txId);
// 订单保存成功,确认消息进行投递
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("订单保存异常", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
return null;
}
}
3.3 消息回查
在checkLocalTransaction()方法中完成消息回查逻辑。此步骤对应交互流程图中的:
5.未收到4的确认时,回查事务状态
6.检查本地事务的状态
7.根据事务的状态Commit/Rollback
package cn.jack.service.impl;
import cn.jack.dao.TxLogDao;
import cn.jack.domain.Order;
import cn.jack.domain.TxLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
@RocketMQTransactionListener
@Slf4j
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
@Autowired
private OrderServiceImpl4 orderServiceImpl4;
@Autowired
private TxLogDao txLogDao;
/**
* 执行本地事务,根据返回状态进行消息的二次确认(消息是否投递)
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String txId = message.getHeaders().get("txId", java.lang.String.class);
try {
// 本地事务:保存订单
Order order = (Order) o;
this.orderServiceImpl4.createOrder(order, txId);
// 订单保存成功,确认消息进行投递
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("订单保存异常", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 消息回查。从消息的头信息中获取事务id,如果根据id查询到事务数据,证明订单保存成功,二次确认投递消息。
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String txId = message.getHeaders().get("txId", java.lang.String.class);
Optional<TxLog> op = this.txLogDao.findById(txId);
if (op.equals(Optional.empty())) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.COMMIT;
}
}
}
总结:RocketMQ的事务消息可以使分布式事务达到一致性。本地事务成功提交后,RocketMQ才进行消息的投递。而消息的消费由RocketMQ进行保证。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。