当前位置:   article > 正文

SpringBoot+消息队列RocketMQ(基于阿里云)_aliyun.openservices onsmessagetrace

aliyun.openservices onsmessagetrace

最近由于公司技术框架更新,原先的RabbitMQ需要换成RocketMQ,两者原理和使用都大同小异,业务简单的话,切换起来成本也还好

安装

当前版本为:
<aliyun-ons.version>1.8.4.Final</aliyun-ons.version>

<!--阿里云RocketMQ-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>${aliyun-ons.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

配置

实际使用中需要替换你的阿里云配置,包括API鉴权和实例TCP协议接入地址

server:
  port: 8082

#配置队列
aliyun:
  mq:
    # API鉴权
    accessKeyId: xxx
    accessKeySecret: xxx
    # 实例TCP协议接入地址(内网)
    nameSrvAddr: http://MQ_INST_xxx_BXQwkNTl.cn-shanghai.mq-internal.aliyuncs.com:8080
    # 普通消息-短信
    sms:
      topic: sms
      tag: '*'
      groupId: GID_message
    # 普通消息-邮件
    email:
      topic: email
      tag: '*'
      groupId: GID_message
    # 定时/延时消息
    time:
      topic: time
      tag: '*'
      groupId: GID_message
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

yml配置映射java类

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * 阿里云MQ配置
 *
 * @author jason
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "aliyun.mq")
public class AliyunMQConfig {

	/**
	 * 阿里云 oss 公钥
	 */
	private String accessKeyId;

	/**
	 * 阿里云 oss 私钥
	 */
	private String accessKeySecret;

	/**
	 * 实例TCP协议接入地址(内网)
	 */
	private String nameSrvAddr;

	public Properties getMqProperties() {
		Properties properties = new Properties();
		properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
		properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
		properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
		// 设置发送超时时间,单位毫秒
		properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
		return properties;
	}

	/**
	 * 获取消息队列配置
	 */
	@Data
	public static class TopicProperties {
		private String topic;
		private String groupId;
		private String tag;
	}

	/**
	 * 短信
	 */
	@Bean
	@ConfigurationProperties(prefix = "aliyun.mq.sms")
	public TopicProperties smsTopicProperties() {
		return new TopicProperties();
	}

	/**
	 * 邮件
	 */
	@Bean
	@ConfigurationProperties(prefix = "aliyun.mq.email")
	public TopicProperties emailTopicProperties() {
		return new TopicProperties();
	}

	/**
	 * 定时/延时消息
	 */
	@Bean
	@ConfigurationProperties(prefix = "aliyun.mq.time")
	public TopicProperties timeTopicProperties() {
		return new TopicProperties();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

生产者

通过buildProducer注册成为生产者

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 阿里云MQ-生产者
 *
 * @author jason
 */
@Configuration
public class ProducerClient {

	@Autowired
	private AliyunMQConfig aliyunMQConfig;

	@Bean(initMethod = "start", destroyMethod = "shutdown")
	public ProducerBean buildProducer() {
		ProducerBean producer = new ProducerBean();
		producer.setProperties(aliyunMQConfig.getMqProperties());
		return producer;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

封装生产者发送消息工具类

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 生产者发送消息的工具类
 *
 * @author jason
 */
@Slf4j
@Component
public class ProducerUtil {

	@Autowired
	private ProducerBean producer;

	/**
	 * 同步发送消息
	 *
	 * @param topic       topic名
	 * @param msgTag      标签,可用于消息小分类标注
	 * @param messageBody 消息body内容,生产者自定义内容
	 * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
	 * @return success:SendResult or error:null
	 */
	public SendResult sendMsg(String topic, String msgTag, byte[] messageBody, String msgKey) {
		Message msg = new Message(topic, msgTag, msgKey, messageBody);
		return this.send(msg, Boolean.FALSE);
	}

	/**
	 * 同步发送单向消息
	 *
	 * @param topic       topic名
	 * @param msgTag      标签,可用于消息小分类标注
	 * @param messageBody 消息body内容,生产者自定义内容
	 * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
	 */
	public void sendOneWayMsg(String topic, String msgTag, byte[] messageBody, String msgKey) {
		Message msg = new Message(topic, msgTag, msgKey, messageBody);
		this.send(msg, Boolean.TRUE);
	}

	/**
	 * 同步发送定时/延时消息
	 *
	 * @param topic
	 * @param msgTag      标签,可用于消息小分类标注,对消息进行再归类
	 * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
	 * @param msgKey      消息key值,建议设置全局唯一值,可不设置,不影响消息收发
	 * @param delayTime   服务端发送消息时间,立即发送输入0或比更早的时间
	 * @return success:SendResult or error:null
	 */
	public SendResult sendTimeMsg(String topic, String msgTag, byte[] messageBody, String msgKey, long delayTime) {
		Message msg = new Message(topic, msgTag, msgKey, messageBody);
		msg.setStartDeliverTime(delayTime);
		return this.send(msg, Boolean.FALSE);
	}

	/**
	 * 发送普通消息
	 *
	 * @param msg      消息
	 * @param isOneWay 是否单向发送
	 */
	private SendResult send(Message msg, Boolean isOneWay) {
		try {
			if (isOneWay) {
				//由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
				//若数据不可丢,建议选用同步或异步发送方式。
				producer.sendOneway(msg);
				success(msg, "单向消息MsgId不返回");
				return null;
			} else {
				//可靠同步发送
				SendResult sendResult = producer.send(msg);
				//获取发送结果,不抛异常即发送成功
				assert sendResult != null;
				success(msg, sendResult.getMessageId());
				return sendResult;
			}
		} catch (Exception e) {
			error(msg, e);
			return null;
		}
	}

	/**
	 * 成功日志打印
	 *
	 * @param msg
	 * @param messageId
	 */
	private void success(Message msg, String messageId) {
		log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
				, msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
	}

	/**
	 * 异常日志打印
	 *
	 * @param msg
	 * @param e
	 */
	private void error(Message msg, Exception e) {
		log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
				, msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
		log.error("errorMsg", e);
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

消费者

这里订阅一个消费者监听3条队列

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.example.aliyunmqdemo.normal.EmailMqMessageListener;
import com.example.aliyunmqdemo.normal.SmsMqMessageListener;
import com.example.aliyunmqdemo.time.TimeMqMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 阿里云MQ-订阅消费者监听
 *
 * @author jason
 */
@Configuration
public class ConsumerClient {

	@Autowired
	private AliyunMQConfig aliyunMQConfig;

	@Autowired
	@Qualifier("smsTopicProperties")
	private AliyunMQConfig.TopicProperties smsTopicProperties;

	@Autowired
	@Qualifier("emailTopicProperties")
	private AliyunMQConfig.TopicProperties emailTopicProperties;

	@Autowired
	@Qualifier("timeTopicProperties")
	private AliyunMQConfig.TopicProperties timeTopicProperties;

	@Autowired
	private EmailMqMessageListener emailMqMessageListener;

	@Autowired
	private SmsMqMessageListener smsMqMessageListener;

	@Autowired
	private TimeMqMessageListener timeMqMessageListener;

	@Bean(initMethod = "start", destroyMethod = "shutdown")
	public ConsumerBean messageBuildConsumer() {
		ConsumerBean consumerBean = new ConsumerBean();
		//配置文件
		Properties properties = aliyunMQConfig.getMqProperties();
		//消费者
		properties.setProperty(PropertyKeyConst.GROUP_ID, smsTopicProperties.getGroupId());
		//设置消费者线程数为20个(默认20)
		properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
		consumerBean.setProperties(properties);
		//订阅消息
		Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
		//订阅短信消息
		Subscription smsSubscription = new Subscription();
		smsSubscription.setTopic(smsTopicProperties.getTopic());
		smsSubscription.setExpression(smsTopicProperties.getTag());
		subscriptionTable.put(smsSubscription, smsMqMessageListener);
		//订阅邮件消息
		Subscription emailSubscription = new Subscription();
		emailSubscription.setTopic(emailTopicProperties.getTopic());
		emailSubscription.setExpression(emailTopicProperties.getTag());
		subscriptionTable.put(emailSubscription, emailMqMessageListener);
		//订阅定时/延时消息
		Subscription timeSubscription = new Subscription();
		timeSubscription.setTopic(timeTopicProperties.getTopic());
		timeSubscription.setExpression(timeTopicProperties.getTag());
		subscriptionTable.put(timeSubscription, timeMqMessageListener);

		consumerBean.setSubscriptionTable(subscriptionTable);
		return consumerBean;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

定义接收MQ消息监听器

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 普通(默认同步)MQ消息监听消费
 * 【邮件】
 * 
 * @author jason
 */
@Slf4j
@Component
public class EmailMqMessageListener implements MessageListener {

	@Override
	public Action consume(Message message, ConsumeContext context) {
		log.info("【邮件】接收到MQ详细信息:{}", message);
		log.info("解析MQ-Body自定义内容:{}", new String(message.getBody()));
		try {
			//do something..
			return Action.CommitMessage;
		} catch (Exception e) {
			log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage());
			return Action.ReconsumeLater;
		}
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

SmsMqMessageListener.java
TimeMqMessageListener.java
替换类名即可,代码相同

阿里云控制台

添加三条Topic
在这里插入图片描述
添加一个GroupID
在这里插入图片描述

测试

启动spring-boot服务后,我们可以在阿里云控制台看到消费者的状态:在线 则代表服务启动成功了
在这里插入图片描述
点击详细信息,可以看到订阅关系,这里表示该GroupID订阅了三条Topic
在这里插入图片描述

测试

import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject;
import com.example.aliyunmqdemo.mq.AliyunMQConfig;
import com.example.aliyunmqdemo.utils.ProducerUtil;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class AliyunMqDemoApplicationTests {

	@Autowired
	@Qualifier("smsTopicProperties")
	private AliyunMQConfig.TopicProperties smsTopicProperties;

	@Autowired
	@Qualifier("emailTopicProperties")
	private AliyunMQConfig.TopicProperties emailTopicProperties;

	@Autowired
	@Qualifier("timeTopicProperties")
	private AliyunMQConfig.TopicProperties timeTopicProperties;

	@Autowired
	private ProducerUtil producerUtil;

	/**
	 * 测试MQ
	 *
	 * 使用封装的ProducerUtil,传入对应的参数即可发送消息
	 * msgTag 标签,可用于消息小分类标注
	 * messageBody 消息body内容,生产者自定义内容,任何二进制数据,生产者和消费者协定数据的序列化和反序列化
	 * msgKey 消息key值,建议设置全局唯一,比如订单号,用户id这种,可不传,不影响消息投递
	 */
	@Test
	public void mqTest() {
		// 自定义一条body内容
		JSONObject body = new JSONObject();
		body.put("id", UUID.randomUUID());
		body.put("notice", "这是一条通知类信息");

		//同步发送消息-不带返回值的(一般使用该方法)
		producerUtil.sendOneWayMsg(smsTopicProperties.getTopic(), smsTopicProperties.getTag(), body.toJSONString().getBytes(), null);
		//同步发送消息-带返回值的
		SendResult sendResult = producerUtil.sendMsg(emailTopicProperties.getTopic(), emailTopicProperties.getTag(), body.toJSONString().getBytes(), null);
		//定时/延时消息,当前时间的10秒后推送。时间可自定义
		SendResult timeSendResult = producerUtil.sendTimeMsg(timeTopicProperties.getTopic(), timeTopicProperties.getTopic(), "延时消息".getBytes(), null, System.currentTimeMillis() + (10 * 1000));
		//顺序消息(全局顺序 / 分区顺序)、分布式事务消息,同理
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

运行测试方法,模拟生产者发送消息,可以看到消费者已接收到消息,延时的消息也收到了
在这里插入图片描述

源码

点击下载

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/386559
推荐阅读
相关标签
  

闽ICP备14008679号