赞
踩
RocketMQ 是一款高性能、高吞吐量、低延迟的消息中间件。由阿里出品,后来捐赠给开源组织Apache。一般用于流量削峰填谷、异步通信、系统之间异步解耦、顺序消息、定时消息、事务消息等场景。RocketMQ5.0还支持类使用kafka的流处理。
已有springboot项目且已经能启动;本文基于RocketMQ2.0.4;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
rocketmq:
name-server: 192.168.1.224:9876 # 自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口
producer:
group: test_procuct_group # 生产者组group,事务消息重试需要
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
@Component public class MQProducerService { // 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分 private static final String TOPIC = "TEST_TOPIC"; private static final String TAG = "TEST_TAG"; //超时时间,重试需要 @Value("${rocketmq.producer.send-message-timeout}") private Integer messageTimeOut; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送单向消息 * 只负责发送消息,不等待应答,不关心发送结果,如日志 */ public void sendOneWayMsg(String message) { rocketMQTemplate.sendOneWay(TOPIC, MessageBuilder.withPayload(message).build()); } /** * 普通发送 message可以是对象 */ public void send(String message) { // 包含tag,=,包含tag的方式都是 TOPIC+":"+TAG 格式 rocketMQTemplate.send(TOPIC + ":" + TAG, MessageBuilder.withPayload(message).build()); //不包含tag rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload(message).build()); } /** * 发送同步消息 * message也可以是对象,sendResult为返回的发送结果 */ public SendResult sendMsg(String message) { // 等待结果返回,此处会阻塞直到结果返回 SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, MessageBuilder.withPayload(message).build()); // 真实业务,请不要使用此种日志打印方式,会影响线上的性能 System.out.println(sendResult); return sendResult; } /** * 发送异步消息 * message也可以是对象,结果回调 */ public void sendAsyncMsg(String message) { rocketMQTemplate.asyncSend(TOPIC, MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { /* * 处理消息发送成功逻辑, * SendResult: 结果有两种状态,如果业务需要区分请判断 * / } @Override public void onException(Throwable throwable) { // 处理消息发送异常逻辑 } }); } /** * 发送延时消息 * delayLevel=0表示不延迟; * 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendDelayMsg(String message, int delayLevel) { rocketMQTemplate.syncSend(TOPIC, MessageBuilder.withPayload(message).build(), messageTimeOut, delayLevel); } }
/** * topic = 生产者的topic * selectorExpression 生产者的tag * consumerGroup 消费者组 */ @Slf4j @Service @RocketMQMessageListener(topic = "TEST_TOPIC", selectorExpression = "TAG", consumerGroup = "test_consumer_group") // 泛型为发送的消息类型 public class MQConsumerService implements RocketMQListener<String> { // 监听到消息就会执行此方法 @Override public void onMessage(String message) { log.info("监听到消息:message={}", message); } }
说明: RocketMQ批量发送实现方式主要通过自身提供的 批量接口。有两种方式, 一般使用DefaultMQProducer 发送。通过看源码得知,最终将List 包装为继承了Message的MessageBatch,由此推出使用RocketMQTemplate 发送参数传递为在限制内的MessageBatch也是可以实现批量发送的。下文采用DefaultMQProducer演示。
package com.yzd.rpa.server.web; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; //导入 RocketMqListSplitter import java.nio.charset.StandardCharsets; import java.util.List; import java.util.stream.Collectors; @Component public class MQBatchProducerService { // 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分 private static final String TOPIC = "TEST_TOPIC"; private static final String TAG = "TEST_TAG"; public void sendBatch(List<String> messages) { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); //自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口 defaultMQProducer.setNamesrvAddr("192.168.1.224:9876"); // 生产者组group defaultMQProducer.setProducerGroup("test_procuct_group"); // 消息超时时间 defaultMQProducer.setSendMsgTimeout(3000); //同步发送重试次数 defaultMQProducer.setRetryTimesWhenSendFailed(1); //异步发送重试次数 defaultMQProducer.setRetryTimesWhenSendAsyncFailed(1); //最大消息大小 默认4M defaultMQProducer.setMaxMessageSize(1024 * 1024 * 4); //消息压缩阈值 默认4k defaultMQProducer.setCompressMsgBodyOverHowmuch(1024 * 4); try { defaultMQProducer.start(); } catch (Exception e) { throw new RuntimeException("启动生产者失败!"); } //消息封装为 Message List<Message> messageList = messages.stream().map(item -> { Message message = new Message(); message.setTopic(TOPIC); message.setTags(TAG); message.setBody(item.getBytes(StandardCharsets.UTF_8)); return message; }).collect(Collectors.toList()); //发送消息 RocketMQ 最大默认大小4M RocketMqListSplitter rocketMqMapSplitter = new RocketMqListSplitter(messages, 1024 * 1024 * 4); while (rocketMqMapSplitter.hasNext()) { List<Message> next = rocketMqMapSplitter.next(); try { defaultMQProducer.send(next, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //发送成功回调 } @Override public void onException(Throwable e) { //发送失败回调 } }); } catch (Exception e) { //异常回调 } } //关闭资源 defaultMQProducer.shutdown(); } }
//包名 import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; import java.util.Iterator; import java.util.List; import java.util.Map; public class RocketMqListSplitter implements Iterator<List<Message>> { private final List<Message> messages; private final int maxLimit; // 要进行批量发送消息的小集合起始索引 private int currIndex; public RocketMqListSplitter(List<Message> messages, Integer maxLimit) { this.messages = messages; if (maxLimit == null) { // 默认每次最大发送4M this.maxLimit = 1024 * 1024 * 4; } else { this.maxLimit = maxLimit; } } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; // 记录当前要发送的这一小批次消息列表的大小 int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { // 获取当前遍历的消息 Message message = messages.get(nextIndex); //设置唯一ID,需要先设置再计算大小 MessageClientIDSetter.setUniqID(message); // 统计当前遍历的message的大小, 使用mq自带的计算方式,不然大小计算可能错误 int tmpSize = MessageDecoder.encodeMessage(message).length; // 判断当前消息本身是否大于限制 if (tmpSize > maxLimit) { if (nextIndex - currIndex == 0) { nextIndex++; } break; } if (tmpSize + totalSize > maxLimit) { break; } else { totalSize += tmpSize; } } // end-for // 获取当前messages列表的子集合[currIndex, nextIndex) List<Message> subList = messages.subList(currIndex, nextIndex); // 下次遍历的开始索引 currIndex = nextIndex; return subList; } }
说明: 1. 手动拉取消费,此种方式不方便;2. 借助spring使用监听
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; public class MQBatchConsumerService { // 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分 private static final String TOPIC = "TEST_TOPIC"; private static final String TAG = "TEST_TAG"; public static void main(String[] args) throws Exception { //消费者组 DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test_procuct_group"); //自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口 litePullConsumer.setNamesrvAddr("192.168.1.224:9876"); //设置订阅的topic litePullConsumer.subscribe(TOPIC, TAG); //每次拉取的数量 litePullConsumer.setPullBatchSize(20); //启动监听 litePullConsumer.start(); try { //监听 while (true) { List<MessageExt> messageExts = litePullConsumer.poll(); //睡眠 TimeUnit.SECONDS.sleep(5); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } } }
// 包名 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class MQBatchConsumer { // 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分 private static final String TOPIC = "TEST_TOPIC"; private static final String TAG = "TEST_TAG"; public static void main(String[] args) throws Exception { MQBatchConsumer batchConsumer = new MQBatchConsumer(); batchConsumer.listenerDefaultMQPushConsumer(); } public void listenerDefaultMQPushConsumer() { DefaultMQPushConsumer mqPullConsumer = new DefaultMQPushConsumer(consumerGroup); //自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口 mqPullConsumer.setNamesrvAddr("192.168.1.224:9876"); mqPullConsumer.setConsumerGroup("test_procuct_group"); mqPullConsumer.setPullBatchSize(10); try { mqPullConsumer.subscribe(TOPIC, TAG); mqPullConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { List<String> contents = msgs.stream().map(item -> new String(item.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList()); //业务处理 System.out.println(contents); } catch (Exception e) { } }); mqPullConsumer.start(); /* * 监听的topic需要已经存在, * 如果不存在不会报错 * 可以使用 mqPullConsumer.fetchSubscribeMessageQueues(topic); 获取topic信息,如果topic不存在会报错 * / //睡眠 TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); } catch (Exception e) { throw new RuntimeException("服务连接失败"); mqPullConsumer.shutdown(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。