当前位置:   article > 正文

Spring Boot 整合 RocketMq_springboot rocketmq yml配置

springboot rocketmq yml配置

技术架构:

Spring Boot 2.6.3、RocketMQ 4.9.2、JDK 1.8

导入依赖

  <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.4</version>
  </dependency>
  <!-- rocketmq -->
  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.7.0</version>
  </dependency>

生产者

1.生产者application.yml配置

  # MQ producer settings
  rocketmq:
    producer:
      # address of the MQ name server
      namesrvAddr: 192.168.116.108:9876
      # producer group name
      groupName: producer-service
      # topic the producer publishes to
      topics: mq_topic
      # tag attached to published messages
      tags: mq_tag

2.生产者配置

  1. package com.example.producers.config;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.client.exception.MQClientException;
  5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  6. import org.springframework.boot.context.properties.ConfigurationProperties;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. /**
  10. * @author lanx
  11. * @date 2022/3/5
  12. * @Description: mq生产者配置
  13. */
  14. @Slf4j
  15. @Data
  16. @Configuration
  17. @ConfigurationProperties(prefix = "rocketmq.producer")
  18. public class MQProducerConfigure {
  19. private String groupName;
  20. private String namesrvAddr;
  21. /**
  22. * mq 生成者配置
  23. *
  24. * @return
  25. * @throws MQClientException
  26. */
  27. @Bean
  28. public DefaultMQProducer defaultProducer() throws MQClientException {
  29. log.info("defaultProducer 正在创建---------------------------------------");
  30. DefaultMQProducer producer = new DefaultMQProducer(groupName);
  31. producer.setNamesrvAddr(namesrvAddr);
  32. producer.setVipChannelEnabled(false);
  33. producer.start();
  34. log.info("rocketmq producer server 开启成功----------------------------------");
  35. return producer;
  36. }
  37. }

3.生产者发送消息Controller

  1. package com.example.producers.web;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.collections.MapUtils;
  5. import org.apache.rocketmq.client.exception.MQBrokerException;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  8. import org.apache.rocketmq.client.producer.SendResult;
  9. import org.apache.rocketmq.common.message.Message;
  10. import org.apache.rocketmq.remoting.exception.RemotingException;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.web.bind.annotation.RequestBody;
  14. import org.springframework.web.bind.annotation.RequestMapping;
  15. import org.springframework.web.bind.annotation.RestController;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. /**
  19. * @author lanx
  20. * @date 2022/3/5
  21. */
  22. @Slf4j
  23. @RestController
  24. public class MqController {
  25. @Autowired
  26. private DefaultMQProducer defaultMQProducer;
  27. @Value("${rocketmq.producer.topics}")
  28. private String topics;
  29. @Value("${rocketmq.producer.tags}")
  30. private String tags;
  31. /**
  32. * 发送的MQ消息
  33. * @param map
  34. * @return
  35. */
  36. @RequestMapping("/send")
  37. public Map send(@RequestBody Map<String,String> map) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  38. if(MapUtils.isEmpty(map)){
  39. return new HashMap();
  40. }
  41. log.info("发送MQ消息内容:" + JSON.toJSONString(map));
  42. Message sendMsg = new Message(topics,tags, JSON.toJSONString(map).getBytes());
  43. // 默认3秒超时
  44. SendResult sendResult = defaultMQProducer.send(sendMsg);
  45. log.info("消息发送响应:" + sendResult.toString());
  46. return new HashMap();
  47. }
  48. }

4.发送消息返回状态

  public enum SendStatus {
      /** The message was sent successfully. */
      SEND_OK,

      /**
       * The message was sent, but the server timed out flushing it to disk. It is already
       * in the server queue and is lost only if the server goes down at this moment.
       */
      FLUSH_DISK_TIMEOUT,

      /**
       * The message was sent, but the server timed out syncing it to the slave. It is already
       * in the server queue and is lost only if the server goes down at this moment.
       */
      FLUSH_SLAVE_TIMEOUT,

      /**
       * The message was sent, but no slave was available. It is already in the server queue
       * and is lost only if the server goes down at this moment.
       */
      SLAVE_NOT_AVAILABLE
  }

消费者

1.消费者application.yml配置

  # MQ consumer settings
  rocketmq:
    consumer:
      # address of the MQ name server
      namesrvAddr: 192.168.116.108:9876
      # consumer group name
      groupName: consumer-service
      # topic the consumer subscribes to
      topics: mq_topic
      # tag the consumer subscribes to
      tags: mq_tag

2.消费者配置

  1. package com.example.consumers.config;
  2. import com.example.consumers.task.MQConsumeMsgListenerProcessor;
  3. import lombok.Data;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.context.properties.ConfigurationProperties;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. /**
  13. * @author lanx
  14. * @date 2022/3/5
  15. * @Description: mq消费者配置
  16. */
  17. @Slf4j
  18. @Data
  19. @Configuration
  20. @ConfigurationProperties(prefix = "rocketmq.consumer")
  21. public class MQConsumerConfigure {
  22. private String groupName;
  23. private String namesrvAddr;
  24. private String topics;
  25. private String tags;
  26. @Autowired
  27. private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
  28. /**
  29. * mq 消费者配置
  30. *
  31. * @return
  32. * @throws MQClientException
  33. */
  34. @Bean
  35. public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
  36. log.info("defaultConsumer 正在创建---------------------------------------");
  37. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
  38. consumer.setNamesrvAddr(namesrvAddr);
  39. // 设置监听
  40. consumer.registerMessageListener(consumeMsgListenerProcessor);
  41. /**
  42. * 设置consumer第一次启动是从队列头部开始还是队列尾部开始
  43. * 如果不是第一次启动,那么按照上次消费的位置继续消费
  44. */
  45. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  46. /**
  47. * 设置消费模型,集群还是广播,默认为集群
  48. */
  49. //consumer.setMessageModel(MessageModel.CLUSTERING);
  50. try {
  51. // 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
  52. consumer.subscribe(topics, tags);
  53. consumer.start();
  54. log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, topics, namesrvAddr);
  55. } catch (MQClientException e) {
  56. log.error("consumer 创建失败!");
  57. }
  58. return consumer;
  59. }
  60. }

 3.消费者监听

  1. package com.example.consumers.task;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.util.CollectionUtils;
  9. import java.util.List;
  10. /**
  11. * @author lanx
  12. * @date 2022/3/5
  13. * @Description: 消费者监听
  14. */
  15. @Slf4j
  16. @Component
  17. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
  18. /**
  19. * 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
  20. * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
  21. *
  22. * @param msgList
  23. * @param consumeConcurrentlyContext
  24. * @return
  25. */
  26. @Override
  27. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  28. if (CollectionUtils.isEmpty(msgList)) {
  29. log.info("MQ接收消息为空,直接返回成功");
  30. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31. }
  32. MessageExt messageExt = msgList.get(0);
  33. log.info("MQ接收到的消息为:" + messageExt.toString());
  34. try {
  35. String topic = messageExt.getTopic();
  36. String tags = messageExt.getTags();
  37. String body = new String(messageExt.getBody(), "utf-8");
  38. log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
  39. } catch (Exception e) {
  40. log.error("获取MQ消息内容异常{}", e);
  41. }
  42. // TODO 处理业务逻辑
  43. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  44. }
  45. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/484952
推荐阅读
相关标签
  

闽ICP备14008679号