赞
踩
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
rocketmq:
  name-server: 192.168.171.128:9876
  producer:
    group: boot-product
rocketmq:
  name-server: 192.168.171.128:9876
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes a single message through
 * {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Builds a text message and hands it to the template's {@code send} method,
     * targeting the {@code helloBoot} topic.
     *
     * @throws Exception if the send fails
     */
    @Test
    public void sendMessage() throws Exception {
        final String destination = "helloBoot";
        final Message<String> outgoing =
                MessageBuilder.withPayload("springboot发送同步消息").build();
        rocketMQTemplate.send(destination, outgoing);
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes a message asynchronously through
 * {@link RocketMQTemplate} and verifies that the broker acknowledged it.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends one message asynchronously to the {@code helloBoot} topic and waits
     * (at most five seconds) for the send callback.
     *
     * <p>The test fails when the callback reports an error or when no callback
     * arrives in time, instead of sleeping for a fixed period and passing
     * unconditionally.
     *
     * @throws Exception if the wait is interrupted
     */
    @Test
    public void sendAsyncMessage() throws Exception {
        // Fully qualified so the block needs no additional file-level imports.
        // The latch is released by whichever callback fires first.
        final java.util.concurrent.CountDownLatch callbackArrived =
                new java.util.concurrent.CountDownLatch(1);
        final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
                new java.util.concurrent.atomic.AtomicReference<>();

        System.out.println("发送前");
        rocketMQTemplate.asyncSend("helloBoot", "springboot发送异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送状态:" + sendResult.getSendStatus());
                callbackArrived.countDown();
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败!");
                // Keep the cause so the test thread can report it.
                failure.set(throwable);
                callbackArrived.countDown();
            }
        });
        System.out.println("发送完毕");

        // Same upper bound as the former fixed sleep, but returns as soon as the callback runs.
        if (!callbackArrived.await(5, TimeUnit.SECONDS)) {
            throw new AssertionError("no send callback received within 5 seconds");
        }
        if (failure.get() != null) {
            throw new AssertionError("asynchronous send failed", failure.get());
        }
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes a one-way message through
 * {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a single one-way message to the {@code helloBoot} topic.
     *
     * @throws Exception if the send fails
     */
    @Test
    public void sendOneWayMessage() throws Exception {
        final String destination = "helloBoot";
        final String text = "springboot发送单向消息";
        rocketMQTemplate.sendOneWay(destination, text);
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.Date;
/**
 * Consumer for the {@code helloBoot} topic (consumer group {@code boot-consumer})
 * that prints the body of every message it receives.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
@RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot")
public class HelloBootListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body as text.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        // NOTE(review): RocketMQTemplate encodes String payloads as UTF-8 by default — confirm
        // if the producer's charset has been customised.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body);
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.Date;
/**
 * Consumer for the {@code helloBoot} topic that runs in clustering mode and prints
 * each received body together with the receive time.
 *
 * <p>Annotation variants noted by the author:
 * <ul>
 *   <li>clustering:
 *       {@code @RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.CLUSTERING)}</li>
 *   <li>broadcasting:
 *       {@code @RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.BROADCASTING)}</li>
 * </ul>
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
// MessageModel.CLUSTERING is the default; MessageModel.BROADCASTING selects broadcast delivery.
@RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.CLUSTERING)
public class HelloBootListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body followed by the current date.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body + " " + new Date());
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.Date;
/**
 * Consumer for the {@code helloBoot} topic that runs in broadcasting mode and prints
 * each received body together with the receive time.
 *
 * <p>Annotation variants noted by the author:
 * <ul>
 *   <li>clustering:
 *       {@code @RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.CLUSTERING)}</li>
 *   <li>broadcasting:
 *       {@code @RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.BROADCASTING)}</li>
 * </ul>
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
// MessageModel.CLUSTERING is the default; MessageModel.BROADCASTING selects broadcast delivery.
@RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot", messageModel = MessageModel.BROADCASTING)
public class HelloBootListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body followed by the current date.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body + " " + new Date());
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes a batch of one-way messages through
 * {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends ten one-way messages, numbered 0 through 9, to the {@code helloBoot} topic.
     *
     * <p>Author's note: consumers use clustering mode by default, so the messages are
     * load-balanced across the consumer instances.
     *
     * @throws Exception if a send fails
     */
    @Test
    public void send10OneWayMessage() throws Exception {
        final String destination = "helloBoot";
        int sequence = 0;
        while (sequence < 10) {
            String text = "springboot发送单向消息" + sequence;
            rocketMQTemplate.sendOneWay(destination, text);
            sequence++;
        }
    }
}
package com.xiaoge.util;
import lombok.Data;
import lombok.ToString;
/**
 * One step in the lifecycle of a simulated order. Accessors, {@code equals},
 * {@code hashCode} and {@code toString} are generated by Lombok.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@ToString
@Data
public class OrderStep {

    /** Identifier of the order this step belongs to. */
    private long orderId;

    /** Label of the step. */
    private String desc;
}
package com.xiaoge.util;
import java.util.ArrayList;
import java.util.List;
public class OrderUtil {

    /**
     * Builds a fixed list of ten simulated order steps covering three order ids,
     * with the steps of the different orders interleaved.
     *
     * <p>The returned list always contains {@link OrderStep} instances. The type
     * parameter exists only for source compatibility with existing callers; requesting
     * any element type other than {@code OrderStep} leads to a
     * {@code ClassCastException} when the elements are read.
     *
     * @param <T> element type expected by the caller; must be {@code OrderStep}
     * @return a new mutable list of the simulated steps, in a fixed order
     */
    public static <T> List<T> buildOrders() {
        List<OrderStep> orderList = new ArrayList<>();
        orderList.add(step(15103111039L, "创建"));
        orderList.add(step(15103111065L, "创建"));
        orderList.add(step(15103111039L, "付款"));
        orderList.add(step(15103117235L, "创建"));
        orderList.add(step(15103111065L, "付款"));
        orderList.add(step(15103117235L, "付款"));
        orderList.add(step(15103111065L, "完成"));
        orderList.add(step(15103111039L, "推送"));
        orderList.add(step(15103117235L, "完成"));
        orderList.add(step(15103111039L, "完成"));

        // Unchecked by design: the generic signature is kept for existing callers.
        @SuppressWarnings("unchecked")
        List<T> result = (List<T>) orderList;
        return result;
    }

    /** Creates a single order step with the given order id and description. */
    private static OrderStep step(long orderId, String desc) {
        OrderStep orderStep = new OrderStep();
        orderStep.setOrderId(orderId);
        orderStep.setDesc(desc);
        return orderStep;
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes ordered messages through
 * {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends the simulated order steps as one-way ordered messages to
     * {@code orderlyTopicBoot}, routing every step of the same order to the same queue.
     *
     * @throws Exception if a send fails
     */
    @Test
    public void sendOrderlyMessage() throws Exception {
        // Install the queue selector used by the orderly send below.
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
                // "o" is the hash key passed to sendOneWayOrderly: the order id as a string.
                long orderId = Long.parseLong((String) o);
                // floorMod keeps the index within [0, size) even for a negative order id,
                // where the plain % operator would yield a negative index.
                int index = (int) Math.floorMod(orderId, (long) list.size());
                return list.get(index);
            }
        });

        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        // String.valueOf(orderStep.getOrderId()) is the value that decides which queue is
        // chosen; it is handed to the selector above as its Object argument.
        for (OrderStep orderStep : orderSteps) {
            rocketMQTemplate.sendOneWayOrderly(
                    "orderlyTopicBoot", orderStep.toString(), String.valueOf(orderStep.getOrderId()));
        }
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
 * Consumer for ordered messages on the {@code orderlyTopicBoot} topic.
 *
 * <p>Author's note: {@code consumeMode = ConsumeMode.ORDERLY} means each queue is
 * handled by a single thread.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
@RocketMQMessageListener(consumerGroup = "orderly-consumer", topic = "orderlyTopicBoot", consumeMode = ConsumeMode.ORDERLY)
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body as text.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body);
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes a delayed message through
 * {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends one delayed message, stamped with the send time, to the {@code helloBoot} topic.
     *
     * @throws Exception if the send fails
     */
    @Test
    public void sendDelayMessage() throws Exception {
        final String text = "springboot发送延时消息" + new Date();
        final Message<String> delayed = MessageBuilder.withPayload(text).build();

        // Third argument: send timeout of 3000 ms (3 seconds).
        final long sendTimeoutMillis = 3000;
        // Fourth argument: RocketMQ delay level 3.
        // NOTE(review): the delay a level maps to is defined by the broker's delay-level
        // table — confirm the actual duration against the broker configuration.
        final int delayLevel = 3;

        rocketMQTemplate.syncSend("helloBoot", delayed, sendTimeoutMillis, delayLevel);
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.Date;
/**
 * Consumer for the {@code helloBoot} topic (consumer group {@code boot-consumer})
 * that prints each received body together with the receive time.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
@RocketMQMessageListener(consumerGroup = "boot-consumer", topic = "helloBoot")
public class HelloBootListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body followed by the current date.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body + " " + new Date());
    }
}
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes tagged messages through
 * {@link RocketMQTemplate} so that consumers can filter by tag.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends three one-way messages to the {@code tagFilterBoot} topic, tagged
     * {@code TagA}, {@code TagB} and {@code TagC} respectively.
     *
     * <p>Author's note: the tag can only be supplied as part of the destination,
     * written as {@code topic:tag}.
     *
     * @throws Exception if a send fails
     */
    @Test
    public void sendTagFilterMessage() throws Exception {
        // The part left of ':' is the topic and the part right of it is the tag,
        // so topic names themselves must not contain ':'.
        final String[] suffixes = {"A", "B", "C"};
        for (String suffix : suffixes) {
            Message<String> tagged = MessageBuilder.withPayload("消息" + suffix).build();
            rocketMQTemplate.sendOneWay("tagFilterBoot:Tag" + suffix, tagged);
        }
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
 * Consumer for the {@code tagFilterBoot} topic that filters by tag; the filter rule
 * is given in {@code selectorExpression} ({@code TagA || TagC} here).
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
@RocketMQMessageListener(consumerGroup = "tag-consumer", topic = "tagFilterBoot", selectorExpression = "TagA || TagC")
public class TagFilterListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body as text.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body);
    }
}
enablePropertyFilter = true
package com.xiaoge;
import com.xiaoge.util.OrderStep;
import com.xiaoge.util.OrderUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import java.sql.SQLOutput;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot integration test that publishes messages carrying filterable
 * properties through {@link RocketMQTemplate}.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQApplication.class)
public class TestApplication {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends three one-way messages to the {@code SQL92FilterBoot} topic, each with
     * {@code age} and {@code weight} headers for SQL92 filtering on the consumer side.
     *
     * <p>Author's note: the filter properties can only be supplied through
     * {@code setHeader}; the message has no corresponding put method.
     *
     * @throws Exception if a send fails
     */
    @Test
    public void sendSql92FilterMessage() throws Exception {
        sendWithProperties("美女A, 年龄22, 体重45", 22, 45);
        sendWithProperties("美女B, 年龄25, 体重60", 25, 60);
        sendWithProperties("美女C, 年龄40, 体重70", 40, 70);
    }

    /** Builds a message with the given payload and age/weight headers and sends it one-way. */
    private void sendWithProperties(String payload, int age, int weight) {
        Message<String> outgoing = MessageBuilder.withPayload(payload)
                .setHeader("age", age)
                .setHeader("weight", weight)
                .build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", outgoing);
    }
}
package com.xiaoge.listener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
 * Consumer for the {@code SQL92FilterBoot} topic that filters with an SQL92 expression:
 * {@code selectorType = SelectorType.SQL92} selects the SQL92 mechanism and
 * {@code selectorExpression} holds the rule ({@code age > 23 and weight > 60} here).
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Component
@RocketMQMessageListener(consumerGroup = "sql92-consumer", topic = "SQL92FilterBoot", selectorType = SelectorType.SQL92, selectorExpression = "age > 23 and weight > 60")
public class Sql92FilterListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body as text.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset,
        // which would garble the Chinese payloads on hosts whose default is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到的消息: " + body);
    }
}
package com.xiaoge.service.impl;
import com.xiaoge.domain.OperateIntergralVo;
import com.xiaoge.domain.OrderInfo;
import com.xiaoge.mapper.OrderInfoMapper;
import com.xiaoge.service.IOrderInfoService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Order service that starts a refund by sending a RocketMQ transactional message
 * and exposes the order lookup and status update used by the transaction listener.
 *
 * @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>
 */
@Service
public class OrderInfoServiceImpl implements IOrderInfoService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private OrderInfoMapper orderInfoMapper;

    /**
     * Sends a transactional message describing the points adjustment for the given
     * order and reports whether the local transaction was committed.
     *
     * @param orderNo number of the order to refund
     * @return {@code "退款成功"} when the local transaction state is COMMIT_MESSAGE,
     *         otherwise {@code "退款失败"} (also when the order cannot be found)
     */
    @Override
    public String refund(String orderNo) {
        System.out.println("发送消息");
        OrderInfo orderInfo = orderInfoMapper.select(orderNo);
        if (orderInfo == null) {
            // NOTE(review): assumes the mapper returns null for an unknown order number;
            // without this guard the accessors below would throw a NullPointerException.
            return "退款失败";
        }

        OperateIntergralVo vo = new OperateIntergralVo();
        vo.setUserId(orderInfo.getUserId());
        vo.setValue(orderInfo.getIntergral());

        // setHeader attaches extra values to the message; the order number is carried here
        // so the transaction check-back can read it from the message headers.
        Message<OperateIntergralVo> message =
                MessageBuilder.withPayload(vo).setHeader("orderNo", orderNo).build();

        // Send the transactional message; orderNo is also passed as the local-transaction argument.
        TransactionSendResult result =
                rocketMQTemplate.sendMessageInTransaction("tx_group", "tx_topic", message, orderNo);
        System.out.println("发送的状态:" + result.getSendStatus());
        System.out.println("本地事务执行结果:" + result.getLocalTransactionState());

        if (LocalTransactionState.COMMIT_MESSAGE.equals(result.getLocalTransactionState())) {
            return "退款成功";
        } else {
            return "退款失败";
        }
    }

    /**
     * Marks the given order as refunded.
     *
     * @param orderNo number of the order to update
     */
    @Override
    @Transactional
    public void updateRefundStatus(String orderNo) {
        orderInfoMapper.changeRefundStatus(orderNo, OrderInfo.STATUS_REFUND);
    }

    /**
     * Looks up an order by its number.
     *
     * @param orderNo number of the order to load
     * @return whatever the mapper returns for that order number
     */
    @Override
    public OrderInfo find(String orderNo) {
        return orderInfoMapper.select(orderNo);
    }
}
发送事务消息
// tx_group 事务生产者
// tx_topic 主题
// message 消息
// orderNo 本地事务方法参数(注意mq调用本地事务方法会带这个参数, 回查方法不会带, 如果回查方法业务要这个参数, 那就放在消息setHeader里面)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_group", "tx_topic", message, orderNo);
消费者按正常方式监听消费即可。只有当事务状态为 commit 时, 半消息才会变成全消息, 此时消费者才能监听到这条消息; 事务状态为 rollback 时, MQ 会把半消息删除。但要注意: 消费者消费完消息、向 MQ 回复"已消费"时, 如果网络中断, MQ 收不到回复就会走消息重试, 消费者会再次消费同一条消息, 造成重复消费, 所以消费逻辑一定要保证幂等性。
package com.xiaoge.mq;
import com.xiaoge.domain.OperateIntergralVo;
import com.xiaoge.service.IIntegralService;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Consumer for the {@code tx_topic} topic (consumer group {@code consumer-order})
 * that forwards each received points adjustment to the integral service.
 */
@RocketMQMessageListener(consumerGroup = "consumer-order",topic = "tx_topic")
@Component
public class OrderInfoListener implements RocketMQListener<OperateIntergralVo> {

    @Autowired
    private IIntegralService iIntegralService;

    /**
     * Logs the consumption step and applies the points adjustment.
     *
     * @param adjustment the points adjustment carried by the message
     */
    @Override
    public void onMessage(OperateIntergralVo adjustment) {
        System.out.println("12.消费消息");
        iIntegralService.incrIntergral(adjustment);
    }
}
本地事务方法和回查方法所在的类必须实现 RocketMQLocalTransactionListener 接口
package com.xiaoge.mq;
import com.xiaoge.domain.OperateIntergralVo;
import com.xiaoge.domain.OrderInfo;
import com.xiaoge.service.IOrderInfoService;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
 * Local-transaction listener for the {@code tx_group} transactional producer group.
 * It runs the local refund update when a transactional message is sent and answers
 * the check-back about that transaction's outcome.
 */
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_group")
public class OrderTransactionMessageListener implements RocketMQLocalTransactionListener {

    @Autowired
    private IOrderInfoService orderInfoService;

    /**
     * Executes the local transaction: marks the order as refunded.
     *
     * @param message the transactional message; its payload is read as JSON bytes
     * @param o       the argument passed when sending, used here as the order number
     * @return COMMIT when the update succeeds, ROLLBACK when it throws
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("执行本地事务方法");
        byte[] contents = (byte[]) message.getPayload();
        // Decode as UTF-8 explicitly instead of relying on the platform default charset.
        // NOTE(review): assumes the producer serialises the payload as UTF-8 JSON — confirm.
        String objStr = new String(contents, java.nio.charset.StandardCharsets.UTF_8);
        OperateIntergralVo vo = JSON.parseObject(objStr, OperateIntergralVo.class);
        System.out.println(vo);
        String orderNo = (String) o;
        try {
            // Local business logic.
            orderInfoService.updateRefundStatus(orderNo);
            System.out.println("执行成功");
            // Returning RocketMQLocalTransactionState.UNKNOWN here would make the MQ call
            // the check-back method instead.
            // COMMIT turns the half message into a full message so consumers can consume it.
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("执行失败");
            // ROLLBACK deletes the half message from the MQ.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Check-back: decides the transaction outcome from the stored order status.
     *
     * @param message the transactional message; the order number is read from its
     *                {@code orderNo} header
     * @return COMMIT when the order exists and is in the refunded status, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("回查方法");
        String orderNo = (String) message.getHeaders().get("orderNo");
        OrderInfo orderInfo = orderInfoService.find(orderNo);
        // A missing order is treated like an order that was not refunded, rather than
        // failing with a NullPointerException.
        if (orderInfo != null && OrderInfo.STATUS_REFUND.equals(orderInfo.getStatus())) {
            // Refund recorded, so the local transaction succeeded: commit the half message.
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // The local transaction did not succeed: delete the half message.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
事务消息demo下载地址: https://download.csdn.net/download/zsx1314lovezyf/88347191
多个消费组监听同一个 topic 时, 每个消费组都会各自读取到该 topic 中的消息; 如果某个消费组内部做了集群, 那么消息会分摊发送给组内的各个消费者。如果之后又有新的消费组监听该 topic, 它自己的 offset 是 0, 即使以前的消费组的 offset 已经到了 3, 新的消费组也会直接拿到 0-3 的消息, 之后它的 offset 会跟老的消费组同步。
广播模式没有什么特别之处: 不管消费组有没有做集群, 所有消费者收到的都是同样的消息。
demo下载地址: https://download.csdn.net/download/zsx1314lovezyf/88282585
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。