赞
踩
本文主要介绍SpringBoot整合阿里云消息队列的使用
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <ons-client.version>1.7.8.Final</ons-client.version> <commons-lang3.version>3.4</commons-lang3.version> <hutool-all.version>3.0.9</hutool-all.version> <fastjson.version>1.2.47</fastjson.version> <java.version>1.8</java.version> </properties> <dependencies> <!-- springBoot 相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!-- ons-client --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>${ons-client.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <!-- hutool 工具 --> <dependency> <groupId>com.xiaoleilu</groupId> <artifactId>hutool-all</artifactId> <version>${hutool-all.version}</version> </dependency> <!-- json 工具 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies>
server.port=9010

# 阿里云 Access Key
aliyun.mq.accesskey=xxxxxx
# 阿里云 Access Key Secret
aliyun.mq.secretkey=xxxxxx

# 消息队列信息
aliyun.mq.normal.topic=My_Mq_Test_Topic_01
aliyun.mq.normal.tag=TagA
aliyun.mq.normal.producerId=PID_My_Mq_test_Producer_01
aliyun.mq.normal.consumerId=CID_My_Mq_Test_Topic_01
aliyun.mq.normal.keyPrefix=Mq_Test_01_

aliyun.mq.broadcast.topic=My_Mq_Test_Topic_02
aliyun.mq.broadcast.tag=TagA
aliyun.mq.broadcast.producerId=PID_My_Mq_test_Producer_02
aliyun.mq.broadcast.consumerId=CID_My_Mq_Test_Topic_02
aliyun.mq.broadcast.keyPrefix=Mq_Test_02_

# 日志
logging.level.root=WARN
logging.level.com.boot.aliware.mq=DEBUG
/**
 * Alibaba Cloud account credentials, bound from configuration properties
 * that start with the {@code aliyun.mq} prefix.
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.mq")
public class AliyunAccountConfig {
    /**
     * Alibaba Cloud Access Key.
     */
    private String accesskey;
    /**
     * Alibaba Cloud Access Key Secret.
     */
    private String secretkey;

    /**
     * @return the Alibaba Cloud Access Key
     */
    public String getAccesskey() {
        return accesskey;
    }

    /**
     * @param accesskey the Alibaba Cloud Access Key (property {@code aliyun.mq.accesskey})
     */
    public void setAccesskey(String accesskey) {
        this.accesskey = accesskey;
    }

    /**
     * @return the Alibaba Cloud Access Key Secret
     */
    public String getSecretkey() {
        return secretkey;
    }

    /**
     * @param secretkey the Alibaba Cloud Access Key Secret (property {@code aliyun.mq.secretkey})
     */
    public void setSecretkey(String secretkey) {
        this.secretkey = secretkey;
    }
}
/**
 * Common set of message-queue parameters shared by the per-queue property classes.
 */
public class MqBaseProperties {
    /**
     * Queue topic.
     */
    private String topic;
    /**
     * Queue tag.
     */
    private String tag;
    /**
     * Producer id.
     */
    private String producerId;
    /**
     * Consumer id.
     */
    private String consumerId;
    /**
     * Prefix for the message's business key; should be as globally unique as possible
     * so that a message that was not received can be looked up in the MQ console and re-sent.
     */
    private String keyPrefix;

    /**
     * @return the queue topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the queue topic
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * @return the queue tag
     */
    public String getTag() {
        return tag;
    }

    /**
     * @param tag the queue tag
     */
    public void setTag(String tag) {
        this.tag = tag;
    }

    /**
     * @return the producer id
     */
    public String getProducerId() {
        return producerId;
    }

    /**
     * @param producerId the producer id
     */
    public void setProducerId(String producerId) {
        this.producerId = producerId;
    }

    /**
     * @return the consumer id
     */
    public String getConsumerId() {
        return consumerId;
    }

    /**
     * @param consumerId the consumer id
     */
    public void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * @return the message key prefix
     */
    public String getKeyPrefix() {
        return keyPrefix;
    }

    /**
     * @param keyPrefix the message key prefix
     */
    public void setKeyPrefix(String keyPrefix) {
        this.keyPrefix = keyPrefix;
    }
}
/**
 * Queue parameters bound from properties with the {@code aliyun.mq.normal} prefix.
 * Declares no members of its own; all fields come from {@link MqBaseProperties}.
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.mq.normal")
public class MqNormalParamConfig extends MqBaseProperties {
}
/**
 * Queue parameters bound from properties with the {@code aliyun.mq.broadcast} prefix.
 * Declares no members of its own; all fields come from {@link MqBaseProperties}.
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.mq.broadcast")
public class MqBroadcastParamConfig extends MqBaseProperties {
}
package com.boot.aliware.mq.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.boot.aliware.mq.config.param.AliyunAccountConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;

import java.util.*;

/**
 * Base class for the message-queue configuration classes. Provides factory helpers
 * that build producer beans, consumer beans and subscription tables from the
 * injected account credentials.
 */
public class MqBaseConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqBaseConfig.class);

    /**
     * Subscription expression meaning "all tags of the topic".
     */
    private static final String ALL_TAGS_EXPRESSION = "*";

    @Autowired
    private AliyunAccountConfig mqParamProperties;

    /**
     * Creates a producer bean.
     *
     * @param producerId producer id
     * @return producer bean configured with the account credentials and the given id
     */
    protected ProducerBean createProducer(String producerId) {
        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(this.createProducerProperties(producerId));
        // The original format string ended with "{}}" and therefore logged a stray '}'.
        LOGGER.info("创建生产者参数 producerId={}", producerId);
        return producerBean;
    }

    /**
     * Creates a consumer bean using the default (cluster) subscription mode.
     *
     * @param consumerId        consumer id
     * @param consumeThreadNum  fixed number of consumer threads
     * @param subscriptionTable map of subscription to listener
     * @return consumer bean
     */
    protected ConsumerBean createConsumer(String consumerId,
                                          String consumeThreadNum,
                                          Map<Subscription, MessageListener> subscriptionTable) {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(this.createConsumerProperties(consumerId, consumeThreadNum));
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("创建消费者参数 consumerId={}", consumerId);
        return consumerBean;
    }

    /**
     * Creates a consumer bean using the broadcast subscription mode.
     *
     * @param consumerId        consumer id
     * @param consumeThreadNum  fixed number of consumer threads
     * @param subscriptionTable map of subscription to listener
     * @return consumer bean
     */
    protected ConsumerBean createBbRoadCastConsumer(String consumerId,
                                                    String consumeThreadNum,
                                                    Map<Subscription, MessageListener> subscriptionTable) {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(this.createConsumerProperties(consumerId, consumeThreadNum, true));
        consumerBean.setSubscriptionTable(subscriptionTable);
        LOGGER.info("创建消费者参数 consumerId={}", consumerId);
        return consumerBean;
    }

    /**
     * Builds consumer properties for the default (cluster) subscription mode.
     *
     * @param consumerId       consumer id
     * @param consumeThreadNum number of consumer threads
     * @return consumer properties
     */
    private Properties createConsumerProperties(String consumerId, String consumeThreadNum) {
        return this.createConsumerProperties(consumerId, consumeThreadNum, false);
    }

    /**
     * Builds consumer properties.
     * <p>
     * Cluster subscription (the default): all consumers sharing a consumer id split the
     * messages between them. For example, with 9 messages on a topic and 3 consumer
     * instances under one consumer id, each instance consumes 3 of the messages.
     * <p>
     * Broadcast subscription: every consumer identified by the same consumer id consumes
     * each message once. With 9 messages and 3 instances, each instance consumes all 9.
     *
     * @param consumerId       consumer id
     * @param consumeThreadNum number of consumer threads
     * @param isBbRoadCast     whether the broadcast subscription mode is used
     * @return consumer properties
     */
    private Properties createConsumerProperties(String consumerId,
                                                String consumeThreadNum,
                                                boolean isBbRoadCast) {
        Properties properties = this.buildBaseProperties();
        properties.setProperty(PropertyKeyConst.ConsumerId, consumerId);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadNum);
        if (isBbRoadCast) {
            LOGGER.info("广播模式消费者 consumerId={}", consumerId);
            properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        }
        return properties;
    }

    /**
     * Builds producer properties.
     *
     * @param producerId producer id
     * @return producer properties
     */
    private Properties createProducerProperties(String producerId) {
        Properties properties = this.buildBaseProperties();
        properties.setProperty(PropertyKeyConst.ProducerId, producerId);
        return properties;
    }

    /**
     * Builds a subscription table that subscribes the listener to every tag of the topic.
     * The expression is a tag filter: either concrete tags such as {@code taga||tagb||tagc}
     * or {@code *}, which only means "all tags" and is not a wildcard pattern.
     *
     * @param topic     message topic
     * @param listeners listener to register
     * @return map of subscription to listener
     */
    protected Map<Subscription, MessageListener> createSubscriptionTable(String topic,
                                                                         MessageListener listeners) {
        return this.createSubscriptionTable(topic, ALL_TAGS_EXPRESSION, listeners);
    }

    /**
     * Builds a subscription table for a topic and tag expression.
     *
     * @param topic     message topic
     * @param tag       message tag expression
     * @param listeners listener to register; when empty, the returned map is empty
     * @return map of subscription to listener
     */
    protected Map<Subscription, MessageListener> createSubscriptionTable(String topic,
                                                                         String tag,
                                                                         MessageListener listeners) {
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);
        LOGGER.info("消费者创建 参数 subscription={}", subscription);

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        if (!ObjectUtils.isEmpty(listeners)) {
            subscriptionTable.put(subscription, listeners);
        }
        LOGGER.info("消费者创建 参数 subscriptionTableSize={}", subscriptionTable.size());
        return subscriptionTable;
    }

    /**
     * Builds the properties common to producers and consumers: the account access key
     * and secret key.
     *
     * @return base properties
     */
    private Properties buildBaseProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, mqParamProperties.getAccesskey());
        properties.setProperty(PropertyKeyConst.SecretKey, mqParamProperties.getSecretkey());
        return properties;
    }
}
4.2 生产者配置
package com.boot.aliware.mq.config; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.boot.aliware.mq.config.param.MqBroadcastParamConfig; import com.boot.aliware.mq.config.param.MqNormalParamConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息队列生产者配置 */ @Configuration public class MqProducerConfig extends MqBaseConfig { private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerConfig.class); @Autowired private MqNormalParamConfig normalParamConfig; @Autowired private MqBroadcastParamConfig broadcastParamConfig; /** * 创建 普通生产者 */ @Bean(name = "normalProducer", initMethod = "start", destroyMethod = "shutdown") public ProducerBean normalProducer() { ProducerBean producerBean = this.createProducer(normalParamConfig.getProducerId()); LOGGER.info("{} 生产者创建完毕", "normalProducer"); return producerBean; } /** * 创建 广播订阅消息 生产者 */ @Bean(name = "broadcastProducer", initMethod = "start", destroyMethod = "shutdown") public ProducerBean broadcastProducer() { ProducerBean producerBean = this.createProducer(broadcastParamConfig.getProducerId()); LOGGER.info("{} 生产者创建完毕", "broadcastProducer"); return producerBean; } }
package com.boot.aliware.mq.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.boot.aliware.mq.config.param.MqBroadcastParamConfig;
import com.boot.aliware.mq.config.param.MqNormalParamConfig;
import com.boot.aliware.mq.listener.BroadcastMessageListener;
import com.boot.aliware.mq.listener.NormalMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * Declares the message-queue consumer beans.
 */
@Configuration
public class MqConsumerConfig extends MqBaseConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqConsumerConfig.class);

    /**
     * Fixed number of consume threads for the normal consumer.
     */
    private static final String NORMAL_CONSUME_THREAD_NUM = "1";

    /**
     * Fixed number of consume threads for the broadcast consumer.
     */
    private static final String BROADCAST_CONSUME_THREAD_NUM = "3";

    @Autowired
    private MqNormalParamConfig normalParamConfig;

    @Autowired
    private MqBroadcastParamConfig broadcastParamConfig;

    /**
     * Creates the normal consumer, which uses the default cluster subscription mode.
     *
     * @return consumer bean subscribed to the normal topic with a {@link NormalMessageListener}
     */
    @Bean(name = "normalConsumer01", initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean normalConsumerBer() {
        LOGGER.info("{} 消费者创建开始", "normalConsumer");
        Map<Subscription, MessageListener> subscriptionTable =
                this.createSubscriptionTable(normalParamConfig.getTopic(), new NormalMessageListener());
        // createConsumer already installs the subscription table on the bean,
        // so it is not set a second time here.
        ConsumerBean consumerBean = this.createConsumer(
                normalParamConfig.getConsumerId(), NORMAL_CONSUME_THREAD_NUM, subscriptionTable);
        LOGGER.info("{} 消费者创建完毕", "normalConsumer");
        return consumerBean;
    }

    /**
     * Creates the broadcast consumer, which uses the broadcast subscription mode.
     *
     * @return consumer bean subscribed to the broadcast topic with a {@link BroadcastMessageListener}
     */
    @Bean(name = "broadcastConsumer", initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean broadcastConsumer() {
        LOGGER.info("{} 广播订阅消费者创建开始", "broadcastConsumer");
        Map<Subscription, MessageListener> subscriptionTable =
                this.createSubscriptionTable(broadcastParamConfig.getTopic(), new BroadcastMessageListener());
        LOGGER.info("广播订阅消费者 size={}", subscriptionTable.size());
        // createBbRoadCastConsumer already installs the subscription table on the bean.
        ConsumerBean consumerBean = this.createBbRoadCastConsumer(
                broadcastParamConfig.getConsumerId(), BROADCAST_CONSUME_THREAD_NUM, subscriptionTable);
        LOGGER.info("{} 广播订阅消费者创建完毕", "broadcastConsumer");
        return consumerBean;
    }
}
5.1 消息发送抽象父类接口
package com.boot.aliware.mq.service.common; import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.ons.api.exception.ONSClientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 发送消息到消息队列 抽象公共方法 * 异步同步发送参考 * https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.568.67c2a3cbssAWx8 * */ public class MqSendAbstractService { private Logger LOGGER = LoggerFactory.getLogger(MqSendAbstractService.class); /** * 同步发送消息 */ public boolean send(Message msg, Producer currentProducer) { try { //执行发送 SendResult sendResult = currentProducer.send(msg); assert sendResult != null; LOGGER.info("列发送消息成功 sendResult={}", sendResult); return true; } catch (ONSClientException e) { System.out.println("发送失败"); LOGGER.info("消息发送失败 ", e); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 return false; } }
5.2 消息发送接口实现
package com.boot.aliware.mq.service.impl;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.boot.aliware.mq.config.param.MqBroadcastParamConfig;
import com.boot.aliware.mq.config.param.MqNormalParamConfig;
import com.boot.aliware.mq.service.common.MqSendAbstractService;
import com.boot.aliware.mq.service.MqSendService;
import com.boot.aliware.mq.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Implementation of the service that sends messages to the message queue.
 */
@Service
public class MqSendServiceImpl extends MqSendAbstractService implements MqSendService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqSendServiceImpl.class);

    @Autowired
    @Qualifier("normalProducer")
    private Producer normalProducer;

    @Autowired
    @Qualifier("broadcastProducer")
    private Producer broadcastProducer;

    @Autowired
    private MqNormalParamConfig normalParamConfig;

    @Autowired
    private MqBroadcastParamConfig broadcastParamConfig;

    /**
     * Sends a normal message synchronously.
     * <p>
     * {@code Message(String topic, String tags, byte[] body)} parameters:
     * <ul>
     * <li>topic: the topic the message belongs to</li>
     * <li>tags: message tag, a second-level type that classifies messages within a topic
     * so that consumers can filter on the MQ server</li>
     * <li>body: arbitrary binary data; producer and consumer must agree on the
     * serialization format</li>
     * <li>key: business key of the message, ideally globally unique, so that a message that
     * was not received can be looked up in the MQ console and re-sent; not setting it
     * does not affect sending or receiving</li>
     * </ul>
     *
     * @param mess message content
     */
    @Override
    public void sendNormalMess(String mess) {
        LOGGER.info("发送普通消息开始");
        Message msg = this.newMessage(normalParamConfig.getTopic(), normalParamConfig.getTag(), mess);
        LOGGER.info("普通消息 msg={}", msg);
        // Message key = configured prefix + content
        msg.setKey(normalParamConfig.getKeyPrefix().concat(mess));
        // send() reports failure through its return value (and logs it) rather than throwing
        this.send(msg, normalProducer);
    }

    /**
     * Sends a broadcast message asynchronously.
     *
     * @param mess message content
     */
    @Override
    public void sendBroadcastMess(String mess) {
        LOGGER.info("发送广播消息开始");
        Message msg = this.newMessage(broadcastParamConfig.getTopic(), broadcastParamConfig.getTag(), mess);
        LOGGER.info("广播消息 msg={}", msg);
        // Message key = configured prefix + content
        msg.setKey(broadcastParamConfig.getKeyPrefix().concat(mess));
        this.sendAsync(msg, broadcastProducer);
    }

    /**
     * Sends a delayed message on the normal topic, scheduled for delivery one minute from now.
     *
     * @param mess message content
     */
    @Override
    public void sendDelayMess(String mess) {
        Message msg = this.newMessage(normalParamConfig.getTopic(), normalParamConfig.getTag(), mess);
        LOGGER.info("发送延时消息开始");
        // Message key = configured prefix + content
        msg.setKey(normalParamConfig.getKeyPrefix().concat(mess));
        // Deliver one minute from now
        long time = DateUtil.offsetMinute(new Date(), 1).getTime();
        msg.setStartDeliverTime(time);
        LOGGER.info("延时消息 msg={}", msg);
        // send() reports failure through its return value (and logs it) rather than throwing
        this.send(msg, normalProducer);
    }

    /**
     * Builds a message whose body is the UTF-8 encoding of the content. The charset is
     * explicit so the bytes do not depend on the platform default and match the UTF-8
     * decoding done on the consumer side.
     */
    private Message newMessage(String topic, String tag, String mess) {
        return new Message(topic, tag, mess.getBytes(StandardCharsets.UTF_8));
    }
}
package com.boot.aliware.mq.listener;

import com.aliyun.openservices.ons.api.Message;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * Common parent class for message-queue listeners.
 * Holds helper methods that are shared between listeners.
 */
public class MqBaseListener {

    public static final Logger LOGGER = LoggerFactory.getLogger(MqBaseListener.class);

    /**
     * Retry limit used by {@link #canRetryFiveTimes(int)}.
     */
    private static final int FIVE_TIMES = 5;

    /**
     * Decodes the message body as a UTF-8 string.
     *
     * @param message message to read
     * @return the body as a string, or {@code null} when the message has no body
     */
    public String getStringMess(Message message) {
        byte[] body = message.getBody();
        if (body == null) {
            // Return null instead of throwing a NullPointerException, so that callers
            // that check for null can handle the message.
            LOGGER.warn("消息监听->[获取消息体异常] 消息体为空 msgId={}", message.getMsgID());
            return null;
        }
        // Using the Charset constant removes the checked UnsupportedEncodingException path.
        return new String(body, StandardCharsets.UTF_8);
    }

    /**
     * Tells whether another retry is allowed under a limit of five.
     *
     * @param runTime number of times the message has been executed so far
     * @return {@code true} when {@code runTime} is below five
     */
    public Boolean canRetryFiveTimes(int runTime) {
        return this.canRetryTimes(runTime, FIVE_TIMES);
    }

    /**
     * Tells whether another retry is allowed.
     *
     * @param runTime    number of times the message has been executed so far
     * @param retryTimes retry limit
     * @return {@code true} when {@code runTime} is below {@code retryTimes}
     */
    public Boolean canRetryTimes(int runTime, int retryTimes) {
        return runTime < retryTimes;
    }
}
package com.boot.aliware.mq.listener; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.boot.aliware.mq.util.DateUtil; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 普通消息队列监听 * * @author mengqiang * @version NormalMessageListener.java, v 2.0 2018-10-13 13:55 */ public class NormalMessageListener extends MqBaseListener implements MessageListener { public static final Logger LOGGER = LoggerFactory.getLogger(NormalMessageListener.class); /** * 监听 */ @Override public Action consume(Message message, ConsumeContext consumeContext) { LOGGER.info("进入普通消息队列监听 "); LOGGER.info("消息 id={},执行Host={}", message.getMsgID(), message.getBornHost()); LOGGER.info("消息 Topic={},Tag={}", message.getTopic(), message.getTag()); LOGGER.info("消息生成时间={}", DateUtil.formatTimeStamp(message.getBornTimestamp())); LOGGER.info("消息执行次数={}", message.getReconsumeTimes()); //获取消息转成字符串 String srtMsg = this.getStringMess(message); if (null == srtMsg) { //消息体获取失败-> 进行重试 return Action.ReconsumeLater; } //是否执行成功 boolean successFlg = true; try { //反序列化为对象 //执行处理消息 LOGGER.info("此处模拟消息处理代码"); } catch (Exception e) { successFlg = false; String stack = ExceptionUtils.getMessage(e); LOGGER.info("用户阅读分享记录消息->[消费处理异常] {}", stack); } //判断当前执行次数是否达到上限 boolean canRetry = this.canRetryFiveTimes(message.getReconsumeTimes()); //执行失败并且重试次数小于5 次 -> 进行消息重试 if (!successFlg && canRetry) { return Action.ReconsumeLater; } LOGGER.debug("消息处理成功"); return Action.CommitMessage; } }
package com.boot.aliware.mq.controller; import com.boot.aliware.mq.service.MqSendService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 普通消息队列生产 controller * */ @RestController public class MessageProductController { private Logger LOGGER = LoggerFactory.getLogger(MessageProductController.class); @Autowired private MqSendService mqSendService; /** * 发送普通消息到队列 */ @RequestMapping("/send/normal/mess") public String sendNormalMess(@RequestParam("mess") String mess) { mqSendService.sendNormalMess(mess); return "success"; } /** * 发送广播消息到队列 */ @RequestMapping("/send/broadcast/mess") public String sendBroadcastMess(@RequestParam("mess") String mess) { mqSendService.sendBroadcastMess(mess); return "success"; } /** * 批量发送普通消息到队列 */ @RequestMapping("/send/many-normal/mess") public String sendManyNormalMess(@RequestParam("mess") String mess) { LOGGER.info("批量发送消息测试开始了 "); for (int i = 0; i < 10; i++) { mqSendService.sendNormalMess(mess.concat(String.valueOf(i))); } LOGGER.info("批量发送消息测试完毕了 "); return "success"; } /** * 发送延时消息到队列 */ @RequestMapping("/send/delay/mess") public String sendDelayMess(@RequestParam("mess") String mess) { mqSendService.sendDelayMess(mess); return "success"; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。