赞
踩
最近由于公司技术框架更新,原先的RabbitMQ需要换成RocketMQ,两者原理和使用都大同小异,业务简单的话,切换起来成本也还好
当前版本为:
<aliyun-ons.version>1.8.4.Final</aliyun-ons.version>
<!--阿里云RocketMQ-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>${aliyun-ons.version}</version>
</dependency>
实际使用中需要替换你的阿里云配置,包括API鉴权和实例TCP协议接入地址
server: port: 8082 #配置队列 aliyun: mq: # API鉴权 accessKeyId: xxx accessKeySecret: xxx # 实例TCP协议接入地址(内网) nameSrvAddr: http://MQ_INST_xxx_BXQwkNTl.cn-shanghai.mq-internal.aliyuncs.com:8080 # 普通消息-短信 sms: topic: sms tag: '*' groupId: GID_message # 普通消息-邮件 email: topic: email tag: '*' groupId: GID_message # 定时/延时消息 time: topic: time tag: '*' groupId: GID_message
yml配置映射java类
import com.aliyun.openservices.ons.api.PropertyKeyConst; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Properties; /** * 阿里云MQ配置 * * @author jason */ @Data @Configuration @ConfigurationProperties(prefix = "aliyun.mq") public class AliyunMQConfig { /** * 阿里云 oss 公钥 */ private String accessKeyId; /** * 阿里云 oss 私钥 */ private String accessKeySecret; /** * 实例TCP协议接入地址(内网) */ private String nameSrvAddr; public Properties getMqProperties() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId); properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); // 设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); return properties; } /** * 获取消息队列配置 */ @Data public static class TopicProperties { private String topic; private String groupId; private String tag; } /** * 短信 */ @Bean @ConfigurationProperties(prefix = "aliyun.mq.sms") public TopicProperties smsTopicProperties() { return new TopicProperties(); } /** * 邮件 */ @Bean @ConfigurationProperties(prefix = "aliyun.mq.email") public TopicProperties emailTopicProperties() { return new TopicProperties(); } /** * 定时/延时消息 */ @Bean @ConfigurationProperties(prefix = "aliyun.mq.time") public TopicProperties timeTopicProperties() { return new TopicProperties(); } }
通过buildProducer注册成为生产者
import com.aliyun.openservices.ons.api.bean.ProducerBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 阿里云MQ-生产者 * * @author jason */ @Configuration public class ProducerClient { @Autowired private AliyunMQConfig aliyunMQConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(aliyunMQConfig.getMqProperties()); return producer; } }
封装生产者发送消息工具类
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.bean.ProducerBean; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 生产者发送消息的工具类 * * @author jason */ @Slf4j @Component public class ProducerUtil { @Autowired private ProducerBean producer; /** * 同步发送消息 * * @param topic topic名 * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 * @return success:SendResult or error:null */ public SendResult sendMsg(String topic, String msgTag, byte[] messageBody, String msgKey) { Message msg = new Message(topic, msgTag, msgKey, messageBody); return this.send(msg, Boolean.FALSE); } /** * 同步发送单向消息 * * @param topic topic名 * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 */ public void sendOneWayMsg(String topic, String msgTag, byte[] messageBody, String msgKey) { Message msg = new Message(topic, msgTag, msgKey, messageBody); this.send(msg, Boolean.TRUE); } /** * 同步发送定时/延时消息 * * @param topic * @param msgTag 标签,可用于消息小分类标注,对消息进行再归类 * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据 * @param msgKey 消息key值,建议设置全局唯一值,可不设置,不影响消息收发 * @param delayTime 服务端发送消息时间,立即发送输入0或比更早的时间 * @return success:SendResult or error:null */ public SendResult sendTimeMsg(String topic, String msgTag, byte[] messageBody, String msgKey, long delayTime) { Message msg = new Message(topic, msgTag, msgKey, messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg, Boolean.FALSE); } /** * 发送普通消息 * * @param msg 消息 * @param isOneWay 是否单向发送 */ private SendResult send(Message msg, Boolean isOneWay) { try { if (isOneWay) { //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。 //若数据不可丢,建议选用同步或异步发送方式。 producer.sendOneway(msg); success(msg, "单向消息MsgId不返回"); return null; } else { //可靠同步发送 SendResult sendResult = producer.send(msg); //获取发送结果,不抛异常即发送成功 assert sendResult != null; success(msg, sendResult.getMessageId()); return sendResult; } } catch (Exception e) { error(msg, e); return null; } } /** * 成功日志打印 * * @param msg * @param messageId */ private void success(Message msg, String messageId) { log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody())); } /** * 异常日志打印 * * @param msg * @param e */ private void error(Message msg, Exception e) { log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}" , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody())); log.error("errorMsg", e); } }
这里订阅一个消费者监听3条队列
import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.example.aliyunmqdemo.normal.EmailMqMessageListener; import com.example.aliyunmqdemo.normal.SmsMqMessageListener; import com.example.aliyunmqdemo.time.TimeMqMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 阿里云MQ-订阅消费者监听 * * @author jason */ @Configuration public class ConsumerClient { @Autowired private AliyunMQConfig aliyunMQConfig; @Autowired @Qualifier("smsTopicProperties") private AliyunMQConfig.TopicProperties smsTopicProperties; @Autowired @Qualifier("emailTopicProperties") private AliyunMQConfig.TopicProperties emailTopicProperties; @Autowired @Qualifier("timeTopicProperties") private AliyunMQConfig.TopicProperties timeTopicProperties; @Autowired private EmailMqMessageListener emailMqMessageListener; @Autowired private SmsMqMessageListener smsMqMessageListener; @Autowired private TimeMqMessageListener timeMqMessageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean messageBuildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = aliyunMQConfig.getMqProperties(); //消费者 properties.setProperty(PropertyKeyConst.GROUP_ID, smsTopicProperties.getGroupId()); //设置消费者线程数为20个(默认20) properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅消息 Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); //订阅短信消息 Subscription smsSubscription = new Subscription(); smsSubscription.setTopic(smsTopicProperties.getTopic()); smsSubscription.setExpression(smsTopicProperties.getTag()); subscriptionTable.put(smsSubscription, smsMqMessageListener); //订阅邮件消息 Subscription emailSubscription = new Subscription(); emailSubscription.setTopic(emailTopicProperties.getTopic()); emailSubscription.setExpression(emailTopicProperties.getTag()); subscriptionTable.put(emailSubscription, emailMqMessageListener); //订阅定时/延时消息 Subscription timeSubscription = new Subscription(); timeSubscription.setTopic(timeTopicProperties.getTopic()); timeSubscription.setExpression(timeTopicProperties.getTag()); subscriptionTable.put(timeSubscription, timeMqMessageListener); consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }
定义接收MQ消息监听器
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * 普通(默认同步)MQ消息监听消费 * 【邮件】 * * @author jason */ @Slf4j @Component public class EmailMqMessageListener implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { log.info("【邮件】接收到MQ详细信息:{}", message); log.info("解析MQ-Body自定义内容:{}", new String(message.getBody())); try { //do something.. return Action.CommitMessage; } catch (Exception e) { log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage()); return Action.ReconsumeLater; } } }
SmsMqMessageListener.java
TimeMqMessageListener.java
替换类名即可,代码相同
添加三条Topic
添加一个GroupID
启动spring-boot服务后,我们可以在阿里云控制台看到消费者的状态:在线 则代表服务启动成功了
点击详细信息,可以看到订阅关系,这里表示该GroupID订阅了三条Topic
import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject; import com.example.aliyunmqdemo.mq.AliyunMQConfig; import com.example.aliyunmqdemo.utils.ProducerUtil; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import java.util.UUID; @SpringBootTest class AliyunMqDemoApplicationTests { @Autowired @Qualifier("smsTopicProperties") private AliyunMQConfig.TopicProperties smsTopicProperties; @Autowired @Qualifier("emailTopicProperties") private AliyunMQConfig.TopicProperties emailTopicProperties; @Autowired @Qualifier("timeTopicProperties") private AliyunMQConfig.TopicProperties timeTopicProperties; @Autowired private ProducerUtil producerUtil; /** * 测试MQ * * 使用封装的ProducerUtil,传入对应的参数即可发送消息 * msgTag 标签,可用于消息小分类标注 * messageBody 消息body内容,生产者自定义内容,任何二进制数据,生产者和消费者协定数据的序列化和反序列化 * msgKey 消息key值,建议设置全局唯一,比如订单号,用户id这种,可不传,不影响消息投递 */ @Test public void mqTest() { // 自定义一条body内容 JSONObject body = new JSONObject(); body.put("id", UUID.randomUUID()); body.put("notice", "这是一条通知类信息"); //同步发送消息-不带返回值的(一般使用该方法) producerUtil.sendOneWayMsg(smsTopicProperties.getTopic(), smsTopicProperties.getTag(), body.toJSONString().getBytes(), null); //同步发送消息-带返回值的 SendResult sendResult = producerUtil.sendMsg(emailTopicProperties.getTopic(), emailTopicProperties.getTag(), body.toJSONString().getBytes(), null); //定时/延时消息,当前时间的10秒后推送。时间可自定义 SendResult timeSendResult = producerUtil.sendTimeMsg(timeTopicProperties.getTopic(), timeTopicProperties.getTopic(), "延时消息".getBytes(), null, System.currentTimeMillis() + (10 * 1000)); //顺序消息(全局顺序 / 分区顺序)、分布式事务消息,同理 } }
运行测试方法,模拟生产者发送消息,可以看到消费者已接收到消息,延时的消息也收到了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。