赞
踩
1、maven pom.xml引入依赖
- <properties>
- <rocketmq-client.version>4.5.2</rocketmq-client.version>
- </properties>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq-client.version}</version>
- </dependency>
2、生产者application.yml加上配置
- rocketmq:
- producer:
- isOnOff: on
- groupName: k12OrderNotifyGroup
- namesrvAddr: rocket-01:9876;rocket-02:9876;rocket-03:9876
- maxMessageSize: 4096
- sendMsgTimeout: 3000
- retryTimesWhenSendFailed: 2
- topic: k12_order
3、消费者application.yml加上配置
- rocketmq:
- consumer:
- #该应用是否启用消费者
- isOnOff: on
- groupName: k12OrderNotifyGroup
- #mq的nameserver地址
- namesrvAddr: rocket-01:9876;rocket-02:9876;rocket-03:9876
- #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
- topics: k12_order~*;
- consumeThreadMin: 20
- consumeThreadMax: 64
- #设置一次消费消息的条数,默认为1条
- consumeMessageBatchMaxSize: 1
- reconsumeTimes: 3
- topic: k12_order
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.util.ObjectUtils;
-
- import java.util.UUID;
-
- @Slf4j
- @Data
- @ConfigurationProperties(prefix = "rocketmq-producer")
- public class RocketMQProducerConfig {
-
- /**
- * 生产/消费组
- */
- private String groupName;
-
- /**
- * 地址
- */
- private String namesrvAddr;
-
- /**
- * 最大生产量
- */
- private Integer maxMessageSize;
-
- /**
- * 往队列中发送消息超时时间
- */
- private Integer sendMsgTimeout;
-
- /**
- * 超时失败重发次数
- */
- private Integer retryTimesWhenSendFailed;
-
- /**
- * topic主题
- */
- private String topic;
-
- @Bean
- public DefaultMQProducer getRocketMQProducer() {
- DefaultMQProducer defaultMQProducer = new DefaultMQProducer(this.groupName);
- defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
-
- //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
- //defaultMQProducer.setInstanceName(UUID.randomUUID().toString());
-
- if (!ObjectUtils.isEmpty(this.maxMessageSize)) {
- defaultMQProducer.setMaxMessageSize(this.maxMessageSize);
- }
- if (!ObjectUtils.isEmpty(this.sendMsgTimeout)) {
- defaultMQProducer.setSendMsgTimeout(this.sendMsgTimeout);
- }
- if (!ObjectUtils.isEmpty(this.retryTimesWhenSendFailed)) {
- defaultMQProducer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
- }
-
- try {
- //生产者启动
- defaultMQProducer.start();
- log.info(String.format(
- "producer start, groupName:[%s],namesrvAddr:[%s]"
- , this.groupName
- , this.namesrvAddr
- )
- );
- } catch (MQClientException e) {
- log.error(String.format(
- "producer is error, {}"
- , e.getMessage()
- , e
- )
- );
- }
- return defaultMQProducer;
- }
- }

接口
- public interface Producer {
-
- void send(String topic, String tags, String keys, String body);
-
- void sendDelay(String topic, String tags, String keys, String body, Integer delayTimeLevel);
- }
实现类
- import com.xiyunerp.yzbizcenter.k12.order.dmm.Producer;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- @Slf4j
- @Component
- public class RocketMQProducer implements Producer {
-
- @Resource
- private DefaultMQProducer defaultMQProducer;
-
- @Override
- public void send(String topic, String tags, String keys, String body) {
- Message message = new Message(topic, tags, keys, body.getBytes());
- log.debug("消息发送信息内容:{}", body);
- SendResult sendResult;
- try {
- sendResult = defaultMQProducer.send(message);
- log.debug("消息发送响应信息: {}", sendResult.toString());
- } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
- log.debug("发送消息异常信息: {}", e.getMessage());
- }
- }
-
- @Override
- public void sendDelay(String topic, String tags, String keys, String body, Integer delayTimeLevel) {
- Message message = new Message(topic, tags, keys, body.getBytes());
- message.setDelayTimeLevel(delayTimeLevel);
- log.debug("延迟消息发送信息内容:{}", body);
- SendResult sendResult;
- try {
- sendResult = defaultMQProducer.send(message);
- log.debug("延迟消息发送响应信息: {}", sendResult.toString());
- } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
- log.debug("延迟发送消息异常信息: {}", e.getMessage());
- }
- }
- }

- import com.xiyunerp.yzbizcenter.k12.order.config.RocketMQConfig;
- import com.xiyunerp.yzbizcenter.k12.order.zys.dmm.RocketMQProducer;
- import com.xiyunerp.yzbizcenter.k12.utility.context.BeanUtils;
- import com.xiyunerp.yzbizcenter.k12.utility.utils.JsonUtils;
- import com.xiyunerp.yzbizcenter.k12.utility.utils.StringUtils;
- import lombok.extern.slf4j.Slf4j;
-
- import org.springframework.stereotype.Service;
-
- import java.util.List;
-
- @Slf4j
- @Service
- public class OrderProducerService {
-
- private RocketMQProducerConfig rocketMQProducerConfig =
- BeanUtils.getBean("rocketMQProducerConfig ", RocketMQProducerConfig .class);
-
- private RocketMQProducer rocketMQProducer = BeanUtils.getBean("rocketMQProducer", RocketMQProducer.class);
-
- public void sendDelay() {
- //处理自己其他的业务逻辑,设置相关参数
-
- //调用生产类进行往队列中发送消息
- String topic = "";
- String tags = "";
- String keys = "";
- String body = "";
- Integer delayTimeLevel = 4;
- try {
- //指定延迟时间
- rocketMQProducer.sendDelay(topic, tags, keys, body, delayTimeLevel);
- //rocketMQProducer.send(topic, tags, keys, body);
- } catch (Exception e) {
- log.debug("rocketMQ producer error, {}", e.getMessage());
- }
- }
- }

- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.Resource;
- import java.util.UUID;
-
- @Slf4j
- @Data
- @ConfigurationProperties(prefix = "rocketmq-consumer")
- public class RocketMQConsumerConfig {
-
- private String namesrvAddr;
-
- private String groupName;
-
- private Integer consumeThreadMin;
-
- private Integer consumeThreadMax;
-
- private String topics;
-
- private Integer consumeMessageBatchMaxSize;
-
- private Integer reconsumeTimes;
-
- private String topic;
-
- @Resource
- private RocketListenerProcessor mqMessageListenerProcessor;
-
- @Bean
- public DefaultMQPushConsumer getRocketMQConsumer() {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
- consumer.setNamesrvAddr(namesrvAddr);
- consumer.setInstanceName(UUID.randomUUID().toString());
- consumer.setConsumeThreadMin(consumeThreadMin);
- consumer.setConsumeThreadMax(consumeThreadMax);
- consumer.registerMessageListener(mqMessageListenerProcessor);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- /**
- * 设置消费模型,集群还是广播,默认为集群
- */
- //consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
- try {
- String[] topicTagsArr = topics.split(";");
- for (String topicTags : topicTagsArr) {
- String[] topicTag = topicTags.split("~");
- consumer.subscribe(topicTag[0], topicTag[1]);
- }
- consumer.start();
- log.debug("consumer is start, groupName:{}, topics:{}, namesrvAddr:{}", groupName, topics, namesrvAddr);
- } catch (MQClientException e) {
- log.error("consumer is start, groupName:{}, topics:{}, namesrvAddr:{}", groupName, topics, namesrvAddr, e);
- }
- return consumer;
- }
- }

- import com.xiyunerp.yzbizcenter.k12.mini.alipay.service.trade.AliPayOrderSyncService;
- import com.xiyunerp.yzbizcenter.k12.utility.utils.CollectionUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
-
- @Slf4j
- @Component
- public class RocketListenerProcessor implements MessageListenerConcurrently {
-
- @Resource
- RocketMQConsumerConfig rocketMQConfig;
-
- @Resource
- AliPayOrderSyncService aliPayOrderSyncService;//处理业务类,自定义
-
- @Resource
- RedisTemplate redisTemplate;
-
- private final static Long EXPIRE_TIME_MILLI = 1 * 60 * 1000L;
-
- private final static Long INCREMENT_STEP = 1L;
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- if (CollectionUtils.isEmpty(list)) {
- log.debug("接受到的消息为空,不处理,直接返回成功");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- for (MessageExt messageExt : list) {
- if (this.barrier(messageExt.getMsgId(), EXPIRE_TIME_MILLI)) {
- continue;
- }
- if (messageExt.getTopic().equalsIgnoreCase(rocketMQConfig.getTopic())) {
- int reconsume = messageExt.getReconsumeTimes();
- if (reconsume == rocketMQConfig.getReconsumeTimes()) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- log.debug("接受到的消息为:{}", messageExt.toString());
- log.debug("接受到的消息为:{}, {}", messageExt.getTags(), Long.parseLong(new String(messageExt.getBody())));
- //调用业务类消费数据,自定义
- //aliPayOrderSyncService.sendOrder(messageExt);
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- /**
- * 做幂等避免重复消费
- * @param key
- * @param expireMillis
- * @return
- */
- public Boolean barrier(String key, Long expireMillis) {
- Long count = this.increment(key, expireMillis);
- if (count > 1) {
- return Boolean.TRUE;
- }
- return Boolean.FALSE;
- }
-
- public Long increment(String key, Long expireMillis) {
- Long count = redisTemplate.opsForValue().increment(key, INCREMENT_STEP);
- if (1 == count) {
- redisTemplate.expire(key, expireMillis, TimeUnit.MILLISECONDS);
- }
- return count;
- }
- }

- @Service
- public class AliPayOrderSyncService {
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。