赞
踩
Spring Boot 2.6.3 、 RocketMq V4_9_2、 JDK1.8
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.4</version>
- </dependency>
-
- <!-- rocketmq -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.7.0</version>
- </dependency>
1.生产者application.yml配置
- #mq生产者配置信息
- rocketmq:
- producer:
- # mq的nameserver地址
- namesrvAddr: 192.168.116.108:9876
- # mq的分组名称
- groupName: producer-service
- # mq的topic主题
- topics: mq_topic
- # mq的tag标签
- tags: mq_tag
2.生产者配置
- package com.example.producers.config;
-
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author lanx
- * @date 2022/3/5
- * @Description: mq生产者配置
- */
- @Slf4j
- @Data
- @Configuration
- @ConfigurationProperties(prefix = "rocketmq.producer")
- public class MQProducerConfigure {
-
- private String groupName;
- private String namesrvAddr;
-
- /**
- * mq 生成者配置
- *
- * @return
- * @throws MQClientException
- */
- @Bean
- public DefaultMQProducer defaultProducer() throws MQClientException {
- log.info("defaultProducer 正在创建---------------------------------------");
- DefaultMQProducer producer = new DefaultMQProducer(groupName);
- producer.setNamesrvAddr(namesrvAddr);
- producer.setVipChannelEnabled(false);
- producer.start();
- log.info("rocketmq producer server 开启成功----------------------------------");
- return producer;
- }
- }
3.生产者发送消息Controller
- package com.example.producers.web;
-
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.MapUtils;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author lanx
- * @date 2022/3/5
- */
- @Slf4j
- @RestController
- public class MqController {
-
-
- @Autowired
- private DefaultMQProducer defaultMQProducer;
-
- @Value("${rocketmq.producer.topics}")
- private String topics;
- @Value("${rocketmq.producer.tags}")
- private String tags;
-
- /**
- * 发送的MQ消息
- * @param map
- * @return
- */
- @RequestMapping("/send")
- public Map send(@RequestBody Map<String,String> map) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
- if(MapUtils.isEmpty(map)){
- return new HashMap();
- }
- log.info("发送MQ消息内容:" + JSON.toJSONString(map));
- Message sendMsg = new Message(topics,tags, JSON.toJSONString(map).getBytes());
- // 默认3秒超时
- SendResult sendResult = defaultMQProducer.send(sendMsg);
- log.info("消息发送响应:" + sendResult.toString());
- return new HashMap();
- }
-
- }
4.发送消息返回状态
- public enum SendStatus {
-
- // 消息发送成功
- SEND_OK,
-
- // 消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- FLUSH_DISK_TIMEOUT,
-
- // 消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- FLUSH_SLAVE_TIMEOUT,
-
- // 消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
- SLAVE_NOT_AVAILABLE,
- }
1.消费者application.yml配置
- #mq消费者配置信息
- rocketmq:
- consumer:
- # mq的nameserver地址
- namesrvAddr: 192.168.116.108:9876
- # mq的分组名称
- groupName: consumer-service
- # mq的topic订阅主题
- topics: mq_topic
- # mq的tag订阅标签
- tags: mq_tag
2.消费者配置
- package com.example.consumers.config;
-
-
- import com.example.consumers.task.MQConsumeMsgListenerProcessor;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author lanx
- * @date 2022/3/5
- * @Description: mq消费者配置
- */
- @Slf4j
- @Data
- @Configuration
- @ConfigurationProperties(prefix = "rocketmq.consumer")
- public class MQConsumerConfigure {
-
- private String groupName;
- private String namesrvAddr;
- private String topics;
- private String tags;
-
-
- @Autowired
- private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
-
- /**
- * mq 消费者配置
- *
- * @return
- * @throws MQClientException
- */
- @Bean
- public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
- log.info("defaultConsumer 正在创建---------------------------------------");
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
- consumer.setNamesrvAddr(namesrvAddr);
- // 设置监听
- consumer.registerMessageListener(consumeMsgListenerProcessor);
-
- /**
- * 设置consumer第一次启动是从队列头部开始还是队列尾部开始
- * 如果不是第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- /**
- * 设置消费模型,集群还是广播,默认为集群
- */
- //consumer.setMessageModel(MessageModel.CLUSTERING);
- try {
- // 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
- consumer.subscribe(topics, tags);
- consumer.start();
- log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, topics, namesrvAddr);
- } catch (MQClientException e) {
- log.error("consumer 创建失败!");
- }
- return consumer;
- }
- }
3.消费者监听
- package com.example.consumers.task;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
-
- import java.util.List;
-
- /**
- * @author lanx
- * @date 2022/3/5
- * @Description: 消费者监听
- */
- @Slf4j
- @Component
- public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
-
-
- /**
- * 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
- * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
- *
- * @param msgList
- * @param consumeConcurrentlyContext
- * @return
- */
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- if (CollectionUtils.isEmpty(msgList)) {
- log.info("MQ接收消息为空,直接返回成功");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- MessageExt messageExt = msgList.get(0);
- log.info("MQ接收到的消息为:" + messageExt.toString());
- try {
- String topic = messageExt.getTopic();
- String tags = messageExt.getTags();
- String body = new String(messageExt.getBody(), "utf-8");
-
- log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
- } catch (Exception e) {
- log.error("获取MQ消息内容异常{}", e);
- }
- // TODO 处理业务逻辑
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。