当前位置:   article > 正文

Java基于rocketmq的订单生产与消费场景示例_java rocketmq的生产与消费

java rocketmq的生产与消费

一、依赖配置

1、maven pom.xml引入依赖

  1. <properties>
  2. <rocketmq-client.version>4.5.2</rocketmq-client.version>
  3. </properties>
  4. <dependency>
  5. <groupId>org.apache.rocketmq</groupId>
  6. <artifactId>rocketmq-client</artifactId>
  7. <version>${rocketmq-client.version}</version>
  8. </dependency>

2、生产者application.yml加上配置

  rocketmq:
    producer:
      isOnOff: on
      groupName: k12OrderNotifyGroup
      namesrvAddr: rocket-01:9876;rocket-02:9876;rocket-03:9876
      maxMessageSize: 4096
      sendMsgTimeout: 3000
      retryTimesWhenSendFailed: 2
      topic: k12_order

 3、消费者application.yml加上配置

  rocketmq:
    consumer:
      #该应用是否启用消费者
      isOnOff: on
      groupName: k12OrderNotifyGroup
      #mq的nameserver地址
      namesrvAddr: rocket-01:9876;rocket-02:9876;rocket-03:9876
      #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
      topics: k12_order~*;
      consumeThreadMin: 20
      consumeThreadMax: 64
      #设置一次消费消息的条数,默认为1条
      consumeMessageBatchMaxSize: 1
      reconsumeTimes: 3
      topic: k12_order

二、Producer服务生产订单

1、生产启动配置类

  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.context.properties.ConfigurationProperties;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.util.ObjectUtils;
  10. import java.util.UUID;
  11. @Slf4j
  12. @Data
  13. @ConfigurationProperties(prefix = "rocketmq-producer")
  14. public class RocketMQProducerConfig {
  15. /**
  16. * 生产/消费组
  17. */
  18. private String groupName;
  19. /**
  20. * 地址
  21. */
  22. private String namesrvAddr;
  23. /**
  24. * 最大生产量
  25. */
  26. private Integer maxMessageSize;
  27. /**
  28. * 往队列中发送消息超时时间
  29. */
  30. private Integer sendMsgTimeout;
  31. /**
  32. * 超时失败重发次数
  33. */
  34. private Integer retryTimesWhenSendFailed;
  35. /**
  36. * topic主题
  37. */
  38. private String topic;
  39. @Bean
  40. public DefaultMQProducer getRocketMQProducer() {
  41. DefaultMQProducer defaultMQProducer = new DefaultMQProducer(this.groupName);
  42. defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
  43. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
  44. //defaultMQProducer.setInstanceName(UUID.randomUUID().toString());
  45. if (!ObjectUtils.isEmpty(this.maxMessageSize)) {
  46. defaultMQProducer.setMaxMessageSize(this.maxMessageSize);
  47. }
  48. if (!ObjectUtils.isEmpty(this.sendMsgTimeout)) {
  49. defaultMQProducer.setSendMsgTimeout(this.sendMsgTimeout);
  50. }
  51. if (!ObjectUtils.isEmpty(this.retryTimesWhenSendFailed)) {
  52. defaultMQProducer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
  53. }
  54. try {
  55. //生产者启动
  56. defaultMQProducer.start();
  57. log.info(String.format(
  58. "producer start, groupName:[%s],namesrvAddr:[%s]"
  59. , this.groupName
  60. , this.namesrvAddr
  61. )
  62. );
  63. } catch (MQClientException e) {
  64. log.error(String.format(
  65. "producer is error, {}"
  66. , e.getMessage()
  67. , e
  68. )
  69. );
  70. }
  71. return defaultMQProducer;
  72. }
  73. }

2、生产者类

接口

  /**
   * Abstraction for publishing messages to a message queue.
   */
  public interface Producer {

      /**
       * Sends a message.
       *
       * @param topic topic the message is published to
       * @param tags  tag expression attached to the message
       * @param keys  business key(s) attached to the message
       * @param body  message payload as text
       */
      void send(String topic, String tags, String keys, String body);

      /**
       * Sends a message that carries a delay level.
       *
       * @param topic          topic the message is published to
       * @param tags           tag expression attached to the message
       * @param keys           business key(s) attached to the message
       * @param body           message payload as text
       * @param delayTimeLevel delay level to set on the message
       */
      void sendDelay(String topic, String tags, String keys, String body, Integer delayTimeLevel);
  }

 实现类

  1. import com.xiyunerp.yzbizcenter.k12.order.dmm.Producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.exception.MQBrokerException;
  4. import org.apache.rocketmq.client.exception.MQClientException;
  5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.remoting.exception.RemotingException;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.Resource;
  11. @Slf4j
  12. @Component
  13. public class RocketMQProducer implements Producer {
  14. @Resource
  15. private DefaultMQProducer defaultMQProducer;
  16. @Override
  17. public void send(String topic, String tags, String keys, String body) {
  18. Message message = new Message(topic, tags, keys, body.getBytes());
  19. log.debug("消息发送信息内容:{}", body);
  20. SendResult sendResult;
  21. try {
  22. sendResult = defaultMQProducer.send(message);
  23. log.debug("消息发送响应信息: {}", sendResult.toString());
  24. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  25. log.debug("发送消息异常信息: {}", e.getMessage());
  26. }
  27. }
  28. @Override
  29. public void sendDelay(String topic, String tags, String keys, String body, Integer delayTimeLevel) {
  30. Message message = new Message(topic, tags, keys, body.getBytes());
  31. message.setDelayTimeLevel(delayTimeLevel);
  32. log.debug("延迟消息发送信息内容:{}", body);
  33. SendResult sendResult;
  34. try {
  35. sendResult = defaultMQProducer.send(message);
  36. log.debug("延迟消息发送响应信息: {}", sendResult.toString());
  37. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  38. log.debug("延迟发送消息异常信息: {}", e.getMessage());
  39. }
  40. }
  41. }

3、业务类(调用生产者)

  1. import com.xiyunerp.yzbizcenter.k12.order.config.RocketMQConfig;
  2. import com.xiyunerp.yzbizcenter.k12.order.zys.dmm.RocketMQProducer;
  3. import com.xiyunerp.yzbizcenter.k12.utility.context.BeanUtils;
  4. import com.xiyunerp.yzbizcenter.k12.utility.utils.JsonUtils;
  5. import com.xiyunerp.yzbizcenter.k12.utility.utils.StringUtils;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.stereotype.Service;
  8. import java.util.List;
  9. @Slf4j
  10. @Service
  11. public class OrderProducerService {
  12. private RocketMQProducerConfig rocketMQProducerConfig =
  13. BeanUtils.getBean("rocketMQProducerConfig ", RocketMQProducerConfig .class);
  14. private RocketMQProducer rocketMQProducer = BeanUtils.getBean("rocketMQProducer", RocketMQProducer.class);
  15. public void sendDelay() {
  16. //处理自己其他的业务逻辑,设置相关参数
  17. //调用生产类进行往队列中发送消息
  18. String topic = "";
  19. String tags = "";
  20. String keys = "";
  21. String body = "";
  22. Integer delayTimeLevel = 4;
  23. try {
  24. //指定延迟时间
  25. rocketMQProducer.sendDelay(topic, tags, keys, body, delayTimeLevel);
  26. //rocketMQProducer.send(topic, tags, keys, body);
  27. } catch (Exception e) {
  28. log.debug("rocketMQ producer error, {}", e.getMessage());
  29. }
  30. }
  31. }

三、Consumer服务消费订单

1、消费启动配置类

  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.exception.MQClientException;
  5. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import javax.annotation.Resource;
  10. import java.util.UUID;
  11. @Slf4j
  12. @Data
  13. @ConfigurationProperties(prefix = "rocketmq-consumer")
  14. public class RocketMQConsumerConfig {
  15. private String namesrvAddr;
  16. private String groupName;
  17. private Integer consumeThreadMin;
  18. private Integer consumeThreadMax;
  19. private String topics;
  20. private Integer consumeMessageBatchMaxSize;
  21. private Integer reconsumeTimes;
  22. private String topic;
  23. @Resource
  24. private RocketListenerProcessor mqMessageListenerProcessor;
  25. @Bean
  26. public DefaultMQPushConsumer getRocketMQConsumer() {
  27. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
  28. consumer.setNamesrvAddr(namesrvAddr);
  29. consumer.setInstanceName(UUID.randomUUID().toString());
  30. consumer.setConsumeThreadMin(consumeThreadMin);
  31. consumer.setConsumeThreadMax(consumeThreadMax);
  32. consumer.registerMessageListener(mqMessageListenerProcessor);
  33. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  34. /**
  35. * 设置消费模型,集群还是广播,默认为集群
  36. */
  37. //consumer.setMessageModel(MessageModel.CLUSTERING);
  38. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
  39. try {
  40. String[] topicTagsArr = topics.split(";");
  41. for (String topicTags : topicTagsArr) {
  42. String[] topicTag = topicTags.split("~");
  43. consumer.subscribe(topicTag[0], topicTag[1]);
  44. }
  45. consumer.start();
  46. log.debug("consumer is start, groupName:{}, topics:{}, namesrvAddr:{}", groupName, topics, namesrvAddr);
  47. } catch (MQClientException e) {
  48. log.error("consumer is start, groupName:{}, topics:{}, namesrvAddr:{}", groupName, topics, namesrvAddr, e);
  49. }
  50. return consumer;
  51. }
  52. }

2、消费者类(调用业务类处理业务)

  1. import com.xiyunerp.yzbizcenter.k12.mini.alipay.service.trade.AliPayOrderSyncService;
  2. import com.xiyunerp.yzbizcenter.k12.utility.utils.CollectionUtils;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import org.springframework.data.redis.core.RedisTemplate;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.Resource;
  11. import java.util.List;
  12. import java.util.concurrent.TimeUnit;
  13. @Slf4j
  14. @Component
  15. public class RocketListenerProcessor implements MessageListenerConcurrently {
  16. @Resource
  17. RocketMQConsumerConfig rocketMQConfig;
  18. @Resource
  19. AliPayOrderSyncService aliPayOrderSyncService;//处理业务类,自定义
  20. @Resource
  21. RedisTemplate redisTemplate;
  22. private final static Long EXPIRE_TIME_MILLI = 1 * 60 * 1000L;
  23. private final static Long INCREMENT_STEP = 1L;
  24. @Override
  25. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  26. if (CollectionUtils.isEmpty(list)) {
  27. log.debug("接受到的消息为空,不处理,直接返回成功");
  28. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  29. }
  30. for (MessageExt messageExt : list) {
  31. if (this.barrier(messageExt.getMsgId(), EXPIRE_TIME_MILLI)) {
  32. continue;
  33. }
  34. if (messageExt.getTopic().equalsIgnoreCase(rocketMQConfig.getTopic())) {
  35. int reconsume = messageExt.getReconsumeTimes();
  36. if (reconsume == rocketMQConfig.getReconsumeTimes()) {
  37. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  38. }
  39. log.debug("接受到的消息为:{}", messageExt.toString());
  40. log.debug("接受到的消息为:{}, {}", messageExt.getTags(), Long.parseLong(new String(messageExt.getBody())));
  41. //调用业务类消费数据,自定义
  42. //aliPayOrderSyncService.sendOrder(messageExt);
  43. }
  44. }
  45. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  46. }
  47. /**
  48. * 做幂等避免重复消费
  49. * @param key
  50. * @param expireMillis
  51. * @return
  52. */
  53. public Boolean barrier(String key, Long expireMillis) {
  54. Long count = this.increment(key, expireMillis);
  55. if (count > 1) {
  56. return Boolean.TRUE;
  57. }
  58. return Boolean.FALSE;
  59. }
  60. public Long increment(String key, Long expireMillis) {
  61. Long count = redisTemplate.opsForValue().increment(key, INCREMENT_STEP);
  62. if (1 == count) {
  63. redisTemplate.expire(key, expireMillis, TimeUnit.MILLISECONDS);
  64. }
  65. return count;
  66. }
  67. }

3、业务类(被消费类调用,处理自己业务)

  1. @Service
  2. public class AliPayOrderSyncService {
  3. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/484963?site
推荐阅读
相关标签
  

闽ICP备14008679号