当前位置:   article > 正文

SpringBoot整合阿里云RocketMQ_consumethreadnums

consumethreadnums

本文主要介绍SpringBoot整合阿里云消息队列的使用

1.Maven依赖

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <ons-client.version>1.7.8.Final</ons-client.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <hutool-all.version>3.0.9</hutool-all.version>
        <fastjson.version>1.2.47</fastjson.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- springBoot 相关  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!-- ons-client -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>${ons-client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <!-- hutool 工具 -->
        <dependency>
            <groupId>com.xiaoleilu</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool-all.version}</version>
        </dependency>
        <!-- json 工具 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

2.配置文件

	server.port=9010
	#阿里云 Access Key 
	aliyun.mq.accesskey=xxxxxx
	##阿里云 Access Key Secret
	aliyun.mq.secretkey=xxxxxx

	#消息队列信息
	aliyun.mq.normal.topic=My_Mq_Test_Topic_01
	aliyun.mq.normal.tag=TagA
	aliyun.mq.normal.producerId=PID_My_Mq_test_Producer_01
	aliyun.mq.normal.consumerId=CID_My_Mq_Test_Topic_01
	aliyun.mq.normal.keyPrefix=Mq_Test_01_

	aliyun.mq.broadcast.topic=My_Mq_Test_Topic_02
	aliyun.mq.broadcast.tag=TagA
	aliyun.mq.broadcast.producerId=PID_My_Mq_test_Producer_02
	aliyun.mq.broadcast.consumerId=CID_My_Mq_Test_Topic_02
	aliyun.mq.broadcast.keyPrefix=Mq_Test_02_

	#日志 
	logging.level.root=WARN
	logging.level.com.boot.aliware.mq=DEBUG
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3.参数配置类

3.1阿里云账户信息

@Configuration
	@ConfigurationProperties(prefix = "aliyun.mq")
	public class AliyunAccountConfig {

		/**
		 * 阿里云 Access Key
		 */
		private String accesskey;

		/**
		 * 阿里云 Access Key Secret
		 */
		private String secretkey;
		//省略get/set方法
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3.2 消息队列属性基础 properties

public class MqBaseProperties {

    /**
     * 队列主题
     */
    private String topic;

    /**
     * 队列标签
     */
    private String tag;

    /**
     * 生产者id
     */
    private String producerId;

    /**
     * 消费者id
     */
    private String consumerId;

    /**
     * 设置代表消息的业务关键属性前缀,尽可能全局唯一
     * 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
     */
    private String keyPrefix;
	//省略get/set 方法
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

3.3普通消息队列属性配置类

@Configuration
@ConfigurationProperties(prefix = "aliyun.mq.normal")
public class MqNormalParamConfig extends MqBaseProperties {
}
  • 1
  • 2
  • 3
  • 4

3.4广播消息队列属性配置类

@Configuration
@ConfigurationProperties(prefix = "aliyun.mq.broadcast")
public class MqBroadcastParamConfig extends MqBaseProperties {
}
  • 1
  • 2
  • 3
  • 4

4.Mq配置

4.1 公共基础抽象配置

package com.boot.aliware.mq.config;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.boot.aliware.mq.config.param.AliyunAccountConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;

import java.util.*;

/**
 * 消息队列基础配置 父类
 */
public class MqBaseConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqBaseConfig.class);
    private final String STAR_FLOWER = "*";
    @Autowired
    private AliyunAccountConfig mqParamProperties;

    /**
     * 创建生产者
     *
     * @param producerId 生产者id
     * @return 生产者bean
     */
    protected ProducerBean createProducer(String producerId) {
        ProducerBean producerBean = new ProducerBean();
        Properties properties = this.createProducerProperties(producerId);
        producerBean.setProperties(properties);
        LOGGER.info("创建生产者参数 producerId={}}", producerId);
        return producerBean;
    }

    /**
     * 创建集群订阅消费者
     *
     * @param consumerId 消费者id
     * @return 消费者bean
     */
    protected ConsumerBean createConsumer(String consumerId, String consumeThreadNum,
                                          Map<Subscription, MessageListener> subscriptionTable) {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = this.createConsumerProperties(consumerId, consumeThreadNum);
        consumerBean.setProperties(properties);
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("创建消费者参数 consumerId={}", consumerId);
        return consumerBean;
    }

    /**
     * 创建广播模式消费者
     *
     * @param consumerId        消费者id
     * @param consumeThreadNum  固定消费者线程数为xx个
     * @param subscriptionTable 消费监听Map
     * @return 消费者bean
     */
    protected ConsumerBean createBbRoadCastConsumer(String consumerId, String consumeThreadNum,
                                                    Map<Subscription, MessageListener> subscriptionTable) {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = this.createConsumerProperties(consumerId, consumeThreadNum, true);
        consumerBean.setProperties(properties);
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("创建消费者参数 consumerId={}", consumerId);
        return consumerBean;
    }


    /**
     * 创建消费者属性参数
     * 默认 集群订阅
     *
     * @param consumerId       消费者id
     * @param consumeThreadNum 最大线程数
     * @return 消费者属性参数
     */
    private Properties createConsumerProperties(String consumerId, String consumeThreadNum) {
        return this.createConsumerProperties(consumerId, consumeThreadNum, false);
    }

    /**
     * 创建消费者属性参数
     *
     * @param consumerId       消费者id
     * @param consumeThreadNum 最大线程数
     * @param isBbRoadCast     是否是广播订阅队列
     * @return 消费者属性参数
     * 集群订阅:默认模式 ,相同消费者id所有消费者平均分摊消费消息。
     * 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,
     * 那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
     * <p>
     * 广播订阅:相同消费者id 所标识的所有消费者都会各自消费某条消息一次。
     * 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,
     * 那么在广播消费模式下每个实例都会各自消费 9 条消息。
     */
    private Properties createConsumerProperties(String consumerId, String consumeThreadNum, boolean isBbRoadCast) {
        Properties properties = this.buildBaseProperties();
        //消费者 id
        properties.setProperty(PropertyKeyConst.ConsumerId, consumerId);
        //固定消费者线程数为xx个
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadNum);
        if (isBbRoadCast) {
            LOGGER.info("广播模式消费者 consumerId={}", consumerId);
            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        }
        return properties;
    }


    /**
     * 创建生产者属性参数
     *
     * @param producerId 生产者id
     * @return 生产者属性参数
     */
    private Properties createProducerProperties(String producerId) {
        Properties properties = this.buildBaseProperties();
        //生产者id
        properties.setProperty(PropertyKeyConst.ProducerId, producerId);
        return properties;
    }

    /**
     * 创建消费者监听Map
     * key:订阅相关类 (Subscription )
     * value: 消费者监听(MessageListener)
     *
     * @param topic     消息主题
     * @param listeners 监听队列,可设置多个
     * @return 消费者监听Map
     */
    protected Map<Subscription, MessageListener> createSubscriptionTable(String topic, MessageListener listeners) {
        //expression即Tag,可以设置成具体的Tag,如 taga||tagb||tagc,也可设置成*。
        // *仅代表订阅所有Tag,不支持通配
        return this.createSubscriptionTable(topic, STAR_FLOWER, listeners);
    }


    /**
     * 创建消费者监听Map
     * key:订阅相关类 (Subscription )
     * value: 消费者监听(MessageListener)
     *
     * @param topic     消息主题
     * @param tag       消息标签
     * @param listeners 监听队列,可设置多个
     * @return 消费者监听Map
     */
    protected Map<Subscription, MessageListener> createSubscriptionTable(String topic,
                                                                         String tag, MessageListener listeners) {
        //创建监听
        Subscription subscription = new Subscription();
        //主题
        subscription.setTopic(topic);
        //标签
        subscription.setExpression(tag);
        LOGGER.info("消费者创建 参数 subscription={}", subscription);
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        if (!ObjectUtils.isEmpty(listeners)) {
            subscriptionTable.put(subscription, listeners);
        }
        LOGGER.info("消费者创建 参数 subscriptionTableSize={}", subscriptionTable.size());
        return subscriptionTable;
    }

    /**
     * 公共属性参数
     */
    private Properties buildBaseProperties() {
        Properties properties = new Properties();
        //阿里云 Access Key
        properties.setProperty(PropertyKeyConst.AccessKey, mqParamProperties.getAccesskey());
        //阿里云 Access secret key
        properties.setProperty(PropertyKeyConst.SecretKey, mqParamProperties.getSecretkey());
        return properties;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182

4.2 生产者配置

package com.boot.aliware.mq.config;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.boot.aliware.mq.config.param.MqBroadcastParamConfig;
import com.boot.aliware.mq.config.param.MqNormalParamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息队列生产者配置
 */
@Configuration
public class MqProducerConfig extends MqBaseConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerConfig.class);
    @Autowired
    private MqNormalParamConfig normalParamConfig;
    @Autowired
    private MqBroadcastParamConfig broadcastParamConfig;

    /**
     * 创建 普通生产者
     */
    @Bean(name = "normalProducer", initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean normalProducer() {
        ProducerBean producerBean = this.createProducer(normalParamConfig.getProducerId());
        LOGGER.info("{} 生产者创建完毕", "normalProducer");
        return producerBean;
    }

    /**
     * 创建 广播订阅消息 生产者
     */
    @Bean(name = "broadcastProducer", initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean broadcastProducer() {
        ProducerBean producerBean = this.createProducer(broadcastParamConfig.getProducerId());
        LOGGER.info("{} 生产者创建完毕", "broadcastProducer");
        return producerBean;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

4.3 消费者配置

package com.boot.aliware.mq.config;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.boot.aliware.mq.config.param.MqBroadcastParamConfig;
import com.boot.aliware.mq.config.param.MqNormalParamConfig;
import com.boot.aliware.mq.listener.BroadcastMessageListener;
import com.boot.aliware.mq.listener.NormalMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;

/**
 * 消息队列消费者配置
 */
@Configuration
public class MqConsumerConfig extends MqBaseConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqConsumerConfig.class);
    @Autowired
    private MqNormalParamConfig normalParamConfig;
    @Autowired
    private MqBroadcastParamConfig broadcastParamConfig;

    /**
     * 创建 普通消费者
     * 默认: 集群订阅(多实例可防重)
     */
    @Bean(name = "normalConsumer01", initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean normalConsumerBer() {
        LOGGER.info("{} 消费者创建开始", "normalConsumer");
        //消费固定线程数
        String consumeThreadNum = "1";
        Map<Subscription, MessageListener> subscriptionTable =
                this.createSubscriptionTable(normalParamConfig.getTopic(), new NormalMessageListener());
        //集群订阅消费者
        ConsumerBean consumerBean =
                this.createConsumer(normalParamConfig.getConsumerId(), consumeThreadNum, subscriptionTable);
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("{} 消费者创建完毕", "normalConsumer");
        return consumerBean;
    }

    /**
     * 创建 广播消费者
     * 默认: 广播订阅
     */
    @Bean(name = "broadcastConsumer", initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean broadcastConsumer() {
        LOGGER.info("{} 广播订阅消费者创建开始", "broadcastConsumer");
        //消费固定线程数
        String consumeThreadNum = "3";
        Map<Subscription, MessageListener> subscriptionTable =
                this.createSubscriptionTable(broadcastParamConfig.getTopic(),
                        new BroadcastMessageListener());
        LOGGER.info("广播订阅消费者 size={}", subscriptionTable.size());
        //广播订阅消费者
        ConsumerBean consumerBean =
                this.createBbRoadCastConsumer(broadcastParamConfig.getConsumerId(), consumeThreadNum, subscriptionTable);
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("{} 广播订阅消费者创建完毕", "broadcastConsumer");
        return consumerBean;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

5.消息发送

5.1 消息发送抽象父类接口

package com.boot.aliware.mq.service.common;
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 发送消息到消息队列 抽象公共方法
 * 异步同步发送参考
 * https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.568.67c2a3cbssAWx8
 *
 */
public class MqSendAbstractService {
    private Logger LOGGER = LoggerFactory.getLogger(MqSendAbstractService.class);

    /**
     * 同步发送消息
     */
    public boolean send(Message msg, Producer currentProducer) {
        try {
            //执行发送
            SendResult sendResult = currentProducer.send(msg);
            assert sendResult != null;
            LOGGER.info("列发送消息成功 sendResult={}", sendResult);
            return true;
        } catch (ONSClientException e) {
            System.out.println("发送失败");
            LOGGER.info("消息发送失败 ", e);
            //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
            return false;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

5.2消息发送接口实现

package com.boot.aliware.mq.service.impl;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.boot.aliware.mq.config.param.MqBroadcastParamConfig;
import com.boot.aliware.mq.config.param.MqNormalParamConfig;
import com.boot.aliware.mq.service.common.MqSendAbstractService;
import com.boot.aliware.mq.service.MqSendService;
import com.boot.aliware.mq.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.Date;

/**
 * 发送消息到消息队列接口实现
 *
 */
@Service
public class MqSendServiceImpl extends MqSendAbstractService implements MqSendService {

    private Logger LOGGER = LoggerFactory.getLogger(MqSendServiceImpl.class);

    @Autowired
    @Qualifier("normalProducer")
    private Producer normalProducer;
    @Autowired
    @Qualifier("broadcastProducer")
    private Producer broadcastProducer;

    @Autowired
    private MqNormalParamConfig normalParamConfig;

    @Autowired
    private MqBroadcastParamConfig broadcastParamConfig;
    /**
     * 普通消息
     *
     * @param mess 消息内容
     *             Message(String topic, String tags, byte[] body)  参数介绍
     *             topic :消息所属Topic (主题)
     *             tags :消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。
     *             对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
     *             body:消息体 可以是任何二进制形式的数据, MQ不做任何干预
     *             注意需要Producer与Consumer协商好一致的序列化和反序列化方式
     *             key:设置代表消息的业务关键属性,请尽可能全局唯一
     *             方便在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
     *             注意:不设置也不会影响消息正常收发
     */
    @Override
    public void sendNormalMess(String mess) {
        LOGGER.info("发送普通消息开始");
        Message msg = new Message(normalParamConfig.getTopic(),
                normalParamConfig.getTag(),
                mess.getBytes());
        LOGGER.info("普通消息 msg={}", msg);
        //消息表示前缀
        msg.setKey(normalParamConfig.getKeyPrefix().concat(mess));
        // 发送消息,只要不抛异常就是成功
        this.send(msg, normalProducer);
    }


    /**
     * 广播消息
     *
     * @param mess 消息内容
     */
    @Override
    public void sendBroadcastMess(String mess) {
        LOGGER.info("发送广播消息开始");
        Message msg = new Message(broadcastParamConfig.getTopic(),
                broadcastParamConfig.getTag(),
                mess.getBytes());
        LOGGER.info("广播消息 msg={}", msg);
        //消息表示前缀
        msg.setKey(broadcastParamConfig.getKeyPrefix().concat(mess));
        // 发送消息,只要不抛异常就是成功
        this.sendAsync(msg, broadcastProducer);
    }

    /**
     * 延时消息
     *
     * @param mess 消息内容
     */
    @Override
    public void sendDelayMess(String mess) {
        Message msg = new Message(normalParamConfig.getTopic(),
                normalParamConfig.getTag(),
                mess.getBytes());
        LOGGER.info("发送延时消息开始");
        //消息表示前缀
        msg.setKey(normalParamConfig.getKeyPrefix().concat(mess));
        //时间偏移1 分钟
        long time = DateUtil.offsetMinute(new Date(), 1).getTime();
        msg.setStartDeliverTime(time);

        LOGGER.info("延时消息 msg={}", msg);
        // 发送消息,只要不抛异常就是成功
        this.send(msg, normalProducer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

6.消息监听

6.1消息公共抽象父类监听

package com.boot.aliware.mq.listener;

import com.aliyun.openservices.ons.api.Message;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;

/**
 * 消息队列基础抽象监听公共父类
 * 注:抽离了一些公共的方法,以做复用
 *
 */
public class MqBaseListener {

    public static final Logger LOGGER = LoggerFactory.getLogger(MqBaseListener.class);

    private Integer FIVE_TIMES = 5;


    /**
     * 获取字符串类型消息体
     *
     * @param message 消息信息
     */
    public String getStringMess(Message message) {
        //获取消息转成字符串
        String msg = null;
        try {
            msg = new String(message.getBody(), "utf-8");
        } catch (UnsupportedEncodingException e) {
            String stack = ExceptionUtils.getMessage(e);
            LOGGER.info("消息监听->[获取消息体异常] stack={}", stack);
        }
        return msg;
    }


    /**
     * 是否可以重试 5 次
     *
     * @param runTime 当前执行次数
     * @return
     */
    public Boolean canRetryFiveTimes(int runTime) {
        return this.canRetryTimes(runTime, FIVE_TIMES);
    }

    /**
     * 是否可以重试;
     *
     * @param runTime    当前执行次数
     * @param retryTimes 重试次数
     */
    public Boolean canRetryTimes(int runTime, int retryTimes) {
        if (runTime < retryTimes) {
            return true;
        }
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

6.2 普通消息队列监听示例

package com.boot.aliware.mq.listener;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.boot.aliware.mq.util.DateUtil;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 普通消息队列监听
 *
 * @author mengqiang
 * @version NormalMessageListener.java, v 2.0 2018-10-13 13:55
 */
public class NormalMessageListener extends MqBaseListener implements MessageListener {
    public static final Logger LOGGER = LoggerFactory.getLogger(NormalMessageListener.class);

    /**
     * 监听
     */
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        LOGGER.info("进入普通消息队列监听 ");
        LOGGER.info("消息 id={},执行Host={}", message.getMsgID(), message.getBornHost());
        LOGGER.info("消息 Topic={},Tag={}", message.getTopic(), message.getTag());
        LOGGER.info("消息生成时间={}", DateUtil.formatTimeStamp(message.getBornTimestamp()));
        LOGGER.info("消息执行次数={}", message.getReconsumeTimes());

        //获取消息转成字符串
        String srtMsg = this.getStringMess(message);
        if (null == srtMsg) {
            //消息体获取失败-> 进行重试
            return Action.ReconsumeLater;
        }
        //是否执行成功
        boolean successFlg = true;
        try {
            //反序列化为对象

            //执行处理消息
            LOGGER.info("此处模拟消息处理代码");

        } catch (Exception e) {
            successFlg = false;
            String stack = ExceptionUtils.getMessage(e);
            LOGGER.info("用户阅读分享记录消息->[消费处理异常] {}", stack);

        }

        //判断当前执行次数是否达到上限
        boolean canRetry = this.canRetryFiveTimes(message.getReconsumeTimes());
        //执行失败并且重试次数小于5 次 -> 进行消息重试
        if (!successFlg && canRetry) {
            return Action.ReconsumeLater;
        }
        LOGGER.debug("消息处理成功");
        return Action.CommitMessage;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

7. 消息生产测试用Controller

package com.boot.aliware.mq.controller;

import com.boot.aliware.mq.service.MqSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 普通消息队列生产 controller
 *
 */
@RestController
public class MessageProductController {

    private Logger LOGGER = LoggerFactory.getLogger(MessageProductController.class);

    @Autowired
    private MqSendService mqSendService;

    /**
     * 发送普通消息到队列
     */
    @RequestMapping("/send/normal/mess")
    public String sendNormalMess(@RequestParam("mess") String mess) {
        mqSendService.sendNormalMess(mess);
        return "success";
    }

    /**
     * 发送广播消息到队列
     */
    @RequestMapping("/send/broadcast/mess")
    public String sendBroadcastMess(@RequestParam("mess") String mess) {
        mqSendService.sendBroadcastMess(mess);
        return "success";
    }

    /**
     * 批量发送普通消息到队列
     */
    @RequestMapping("/send/many-normal/mess")
    public String sendManyNormalMess(@RequestParam("mess") String mess) {
        LOGGER.info("批量发送消息测试开始了 ");
        for (int i = 0; i < 10; i++) {
            mqSendService.sendNormalMess(mess.concat(String.valueOf(i)));
        }
        LOGGER.info("批量发送消息测试完毕了 ");
        return "success";
    }

    /**
     * 发送延时消息到队列
     */
    @RequestMapping("/send/delay/mess")
    public String sendDelayMess(@RequestParam("mess") String mess) {
        mqSendService.sendDelayMess(mess);
        return "success";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/484960
推荐阅读
相关标签
  

闽ICP备14008679号