赞
踩
前面写过一片文章使用rocketmq-client整合RocketMQ的,这篇文章也不讲这些理论,理论还是前往RocketMQ消息类型或者其他往期文章,本文就如标题,纯粹的操一下rocketmq-spring-boot-starter这个玩意!
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
这里就不能单纯使用rocketmq-client了,有很多API是rocketmq-spring-boot-starter提供的,虽然底层还是调用的rocketmq-client,下文会介绍!
@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest {
private int id;
private String context;
private Date date;
}
同步消息
同步消息也就这些API,简单讲解一下!
//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
/** * 同步消息- */ @Test void syncSendStr() { //syncSend和send是等价的 rocketMQTemplate.syncSend("first-topic-str", "hello world test1"); //send底层还是会调用syncSend的代码 rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build()); SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2"); log.info("syncSend===>{}",res); } /** * 同步消息- */ @Test void syncSendPojo() { MsgTest msg = new MsgTest(1,"hello world test3",new Date()); SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build()); log.info("syncSend===>{}",res); }
这里存在两种消息体,一种是Object的,另一种是Message<?>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用
MessageBuilder.withPayload("hello world test1").build()
是一样的!源码如下
异步消息
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel)
/** * 异步消息-String * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包 * 关键实现异步发送回调接口(SendCallback) * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理 * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞 */ @Test void asyncSendStr() { rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("异步消息发送成功:{}",sendResult); } @Override public void onException(Throwable throwable) { log.info("异步消息发送失败:{}",throwable.getMessage()); } }); }
单向消息
这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*/
@Test
void sendOneWayStr() {
rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
log.info("单向消息已发送");
}
/**
* 批量消息
*/
@Test
void asyncSendBatch() {
Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
log.info("批量消息");
}
同步延迟消息
/** * 同步延迟消息 * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。 * RocketMQ 目前只支持固定精度的定时消息。 * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * 延迟的底层方法是用定时任务实现的。 */ @Test void syncSendDelayedStr() { Message<String> message= MessageBuilder.withPayload("syncSendDelayedStr"+new Date()).build(); /** * @param destination formats: `topicName:tags` * @param message 消息体 * @param timeout 发送超时时间 * @param delayLevel 延迟级别 1到18 * @return {@link SendResult} */ SendResult res=rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3); log.info("res==>{}",res); }
异步延迟消息
/** * 异步延迟消息 */ @Test void asyncSendDelayedStr() { //Callback SendCallback sc=new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("发送异步延时消息成功"); } @Override public void onException(Throwable throwable) { log.info("发送异步延时消息失败:{}",throwable.getMessage()); } }; Message<String> message= MessageBuilder.withPayload("asyncSendDelayedStr").build(); rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3); }
理论铺垫请看RocketMQ顺序消息,这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!
/** * 顺序消息 */ @Test void SendOrderStr() { List<MsgTest> msgList = new ArrayList<>(); for (int i = 0; i < 100; i++) { msgList.add(new MsgTest(100, "我是id为100的第"+(i+1)+"条消息", new Date())); } //msgList.add(new MsgTest(1, "我是id为1的第1条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第1条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第2条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第3条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第2条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第3条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第4条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第5条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第6条消息", new Date())); //msgList.add(new MsgTest(2, "我是id为2的第7条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第4条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第5条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第6条消息", new Date())); //msgList.add(new MsgTest(1, "我是id为1的第7条消息", new Date())); msgList.forEach(t ->{ //rocketMQTemplate.sendOneWayOrderly("first-topic-str:tag1", t,String.valueOf(t.getId())); //rocketMQTemplate.syncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId())); rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("异步消息发送成功:{}", sendResult); } @Override public void onException(Throwable throwable) { log.info("异步消息发送失败:{}", throwable.getMessage()); } }); }); }
使用for循环100条数据,或者使用注释掉的代码其实都是一样的,说明一下使用for循环100是确定id一致的时候,通过hashKey会被分配到同一个队列中,如下
上面代码共测试了三总类型,同步,异步,单向,但是异步,单向好像顺序还是有问题,但是查看了数据,发现数据确实是在分派到一个队列,
至于原因,这个放在RocketMQ顺序消息这篇文章中统一讲!
消费者
/** * 事务消息 注意这里还有一个监听器 TransactionListenerImpl */ @Test void sendTransactionStr() { String[] tags = {"TAGA", "TAGB", "TAGC"}; for (int i = 0; i < 3; i++) { Message<String> message=MessageBuilder.withPayload("事务消息===>"+i).build(); TransactionSendResult res=rocketMQTemplate.sendMessageInTransaction("transaction-str:"+tags[i], message, i+1); if(res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)&&res.getSendStatus().equals(SendStatus.SEND_OK)){ log.info("事物消息发送成功"); } log.info("事物消息发送结果:{}",res); } }
事务消息生产者端的消息监听器
@Slf4j @Component @RocketMQTransactionListener public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务 String tag = String.valueOf(msg.getHeaders().get("rocketmq_TAGS")); if (StringUtils.equals("TAGA", tag)){ //这里只讲TAGA消息提交,状态为可执行 return RocketMQLocalTransactionState.COMMIT; }else if (StringUtils.equals("TAGB", tag)) { return RocketMQLocalTransactionState.ROLLBACK; } else if (StringUtils.equals("TAGC",tag)) { return RocketMQLocalTransactionState.UNKNOWN; } return RocketMQLocalTransactionState.UNKNOWN; } //mq回调检查本地事务执行情况 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("checkLocalTransaction===>{}",msg); return RocketMQLocalTransactionState.COMMIT; } }
消费者
/** * @description: 事务消息消费者 * @author TAO * @date 2021/12/28 12:33 上午 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-str",consumeMode = ConsumeMode.ORDERLY) public class TransactionConsumer implements RocketMQListener<String> { @Override public void onMessage(String str) { log.info("===>"+str); } }
注意
我这里使用的rocketmq-spring-boot-starter版本时2.1.0,和老版本的写法是不同的,关于这部分可以查看rocketmq-spring-boot-starter 2.1.0 事务消息 txProducerGroup 移除解读
上述API中带了超时时间的是总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。