赞
踩
本文主要是企业级的SpringBoot整合RabbitMq,妥妥的符合任务使用MQ的业务配置。
下面实际讲的就是高级部分,rabbitmq的初级部分没说,也就分为以下几个点,你理解即可:
1、消息应答,就是下面的ACK机制,分为自动应答和手动应答。
2、发布确认,就是下面ConfirmCallback,ReturnCallback这两个回调函数的执行场景。
3、交换机:分为3中Fanout,Direct ,Topic 。随便百度理解下概念,一般最常用的就是Topic ,因为Topic 扩展性强,下面案例中也就是使用的Topic 。
如果下面案例你真正理解了,那么你的MQ可以适用于任何复杂业务的封装场景了。
说明:下面这个例子,是把邮件和短信放到MQ里,然后消费端去消费。
列子中列举了MQ的所有的高级特性,具体看下代码注释,很详细。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--整合rabbitMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root
url: jdbc:mysql://127.0.0.1:3306/rightcloud?useUnicode=true&autoReconnect=true&failOverReadOnly=false&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=CONVERT_TO_NULL
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
requested-heartbeat: 60s
publisher-confirm-type: correlated
publisher-returns: true
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class MessageQueueConfiguration {
@Value("${spring.rabbitmq.template.reply-timeout:1800000}")
private Integer replyTimeout;
/**
* 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),
* 如果想自定义来区分开 需要改变bean 的名称
* 配置的其他的bean也都遵循这个规则配置
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
/**
* 单位:毫秒
* 同步消息方法convertSendAndReceive(),发送端等待接收消费端给出return msg的时间
*/
template.setReplyTimeout(replyTimeout);
template.setMessageConverter(new Jackson2JsonMessageConverter());
initMessageSendConfirm(template);
return template;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置手动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean(name = "connectionFactory")
@Primary
public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setRequestedHeartBeat(60);
/**
* CORRELATED:异步回调,消息发送到交换机时会回调这个ConfirmCallback
* SIMPLE:则不会出发ConfirmCallback
*/
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
return connectionFactory;
}
private void initMessageSendConfirm(RabbitTemplate rabbitTemplate) {
/**
* ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送到exchange成功");
} else {
log.error("消息发送到exchange失败,原因: {}, CorrelationData: {}", cause,
correlationData);
}
});
/**
* Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发ReturnCallback
* 为false时,匹配不到会直接被丢弃
*/
/**
* Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发ReturnCallback
* 为false时,匹配不到会直接被丢弃
*
* spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级
* 一般不设置publisher-returns
* spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true.
* spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值
* spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定
*/
rabbitTemplate.setMandatory(true);
/**
* ReturnCallback为路由不到队列时触发,成功则不触发;
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", message,
replyCode, replyText,
exchange, routingKey);
});
}
}
单位:毫秒,同步消息,方法convertSendAndReceive(),发送端等待接收消费端给出return msg的时间。
convertSendAndReceive方法是mq的同步方法,调用该方法会阻塞主方法,直到消费端消费完才继续往下走。replyTimeout设置的是消费端执行的最大时间,如果超过设置的时间还没执行完,则会报错。
这两个配置代码注释很详细,可以看注释理解,直接拿来用即可。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;
@Slf4j
@Component
public class MessageQueueHelper {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private RabbitAdmin rabbitAdmin;
@PostConstruct
public void init() {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
/**
* 发送异步消息,根据参数动态创建交换机、队列,和业务更解耦
*
* @param exchangeName
* @param queueName
* @param sendMessage
*/
public void sendMessage(String exchangeName, String queueName, String routingKey, Object sendMessage) {
try {
TopicExchange exchange = new TopicExchange(exchangeName);
rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
String simpleName = sendMessage.getClass().getSimpleName();
/**
* *(星号)可以代替一个单词
* #(井号)可以替代零个或多个单词
*/
rabbitAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(exchange)
.with(simpleName.toLowerCase() + ".#"));
rabbitTemplate.convertAndSend(exchangeName, routingKey, sendMessage, message -> {
/**
* 指定消费结果返回的队列
*/
message.getMessageProperties().setReplyTo("result-stu");
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送同步消息
*
* @param exchangeName
* @param queueName
* @param sendMessage
*/
public void sendMessageAndReceive(String exchangeName, String queueName, Object sendMessage) {
try {
TopicExchange exchange = new TopicExchange(exchangeName);
rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
/**
* *(星号)可以代替一个单词
* #(井号)可以替代零个或多个单词
*/
String routingKey = "vm.#";
rabbitAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey));
Object o = rabbitTemplate.convertSendAndReceive(exchangeName, "vm.fff", sendMessage);
System.out.println(o);
} catch (Exception e) {
e.printStackTrace();
}
}
}
import cn.yx.zg.pojo.Mail;
import cn.yx.zg.pojo.Sms;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TestConsumer {
@RabbitListener(queues = "mail.send")
@RabbitHandler
public String testConsumer(Mail mail, Channel channel, Message message) throws Exception {
log.info("消费消息:{}", mail.toString());
/**
* ACK,用的最多的一种
* deliveryTag:该消息的index
* false:表示不是批量
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
/**
* Nack:手动拒绝
* deliveryTag:该消息的index
* false:表示不是批量
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
/**
* Reject:手动拒绝,和Nack相比少一个参数
* deliveryTag:该消息的index
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return "消费-result";
}
@RabbitListener(queues = "sms.send")
@RabbitHandler
public String testConsumer2(Sms sms, Channel channel, Message message) throws Exception {
log.info("消费消息:{}", sms.toString());
/**
* ACK,用的最多的一种
* deliveryTag:该消息的index
* false:表示不是批量
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
/**
* Nack:手动拒绝
* deliveryTag:该消息的index
* false:表示不是批量
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
/**
* Reject:手动拒绝,和Nack相比少一个参数
* deliveryTag:该消息的index
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return "消费-result";
}
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* 邮件
*/
@Data
@ToString
public class Mail implements Serializable {
private String mailId;
private String content;
public Mail() {
}
public Mail(String mailId, String content) {
this.mailId = mailId;
this.content = content;
}
}
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
@Data
@ToString
public class Sms implements Serializable {
private String smsId;
private String content;
public Sms() {
}
public Sms(String smsId, String content) {
this.smsId = smsId;
this.content = content;
}
}
@Resource
private MessageQueueHelper messageQueueHelper;
@RequestMapping("send")
public void sendMsage() {
Mail mail = new Mail("1","我是邮件");
messageQueueHelper.sendMessage("message_ex", "mail.send", "mail", mail);
Sms sms = new Sms("1","我是短信");
messageQueueHelper.sendMessage("message_ex", "sms.send", "sms", sms);
}
访问地址:http://localhost:8080/send 测试消息
下面封装的这个属于比较复杂的业务,很多公司也是用不到的,有兴趣的可以了解一下。
先说下上面的缺点,其实也不算缺点,比如想让同一个消费者消费多个队列的数据,这样我们就得写多个@RabbitListener(queues = “mail.send”), 这里有个比较高级的写法,就是通过SimpleMessageListenerContainer动态的设置队列的消费类。
还是以上面代码为基础, 先把上面的消费者给注释,我们重新写个消费者。
新建类:TestConsumerListener.java
import cn.yx.zg.pojo.Mail;
import cn.yx.zg.pojo.Sms;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TestConsumerListener {
/**
* 注意方法名称,一定要是handleMessage
* @param mail
* @return
* @throws Exception
*/
public String handleMessage(Mail mail) throws Exception {
log.info("消费消息listener:{}", mail.toString());
/**
* ACK,用的最多的一种
* deliveryTag:该消息的index
* false:表示不是批量
*/
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
/**
* Nack:手动拒绝
* deliveryTag:该消息的index
* false:表示不是批量
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
/**
* Reject:手动拒绝,和Nack相比少一个参数
* deliveryTag:该消息的index
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return "消费-result";
}
public String handleMessage(Sms sms) throws Exception {
log.info("消费消息listener:{}", sms.toString());
/**
* ACK,用的最多的一种
* deliveryTag:该消息的index
* false:表示不是批量
*/
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
/**
* Nack:手动拒绝
* deliveryTag:该消息的index
* false:表示不是批量
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
/**
* Reject:手动拒绝,和Nack相比少一个参数
* deliveryTag:该消息的index
* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return "消费-result";
}
}
修改类:MessageQueueHelper.java
import cn.yx.zg.consumer.TestConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;
@Slf4j
@Component
public class MessageQueueHelper {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private RabbitAdmin rabbitAdmin;
@Resource
private CachingConnectionFactory cachingConnectionFactory;
@Resource
private TestConsumerListener testConsumerListener;
@PostConstruct
public void init() {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
startListenerForConsumer(testConsumerListener);
}
/**
* 发送异步消息,根据参数动态创建交换机、队列,和业务更解耦
*
* @param exchangeName
* @param queueName
* @param sendMessage
*/
public void sendMessage(String exchangeName, String queueName, String routingKey, Object sendMessage) {
try {
TopicExchange exchange = new TopicExchange(exchangeName);
rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
String simpleName = sendMessage.getClass().getSimpleName();
/**
* *(星号)可以代替一个单词
* #(井号)可以替代零个或多个单词
*/
rabbitAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(exchange)
.with(simpleName.toLowerCase() + ".#"));
rabbitTemplate.convertAndSend(exchangeName, routingKey, sendMessage, message -> {
/**
* 指定消费结果返回的队列
*/
message.getMessageProperties().setReplyTo("result-stu");
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送同步消息
*
* @param exchangeName
* @param queueName
* @param sendMessage
*/
public void sendMessageAndReceive(String exchangeName, String queueName, Object sendMessage) {
try {
TopicExchange exchange = new TopicExchange(exchangeName);
rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
/**
* *(星号)可以代替一个单词
* #(井号)可以替代零个或多个单词
*/
String routingKey = "vm.#";
rabbitAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey));
Object o = rabbitTemplate.convertSendAndReceive(exchangeName, "vm.fff", sendMessage);
System.out.println(o);
} catch (Exception e) {
e.printStackTrace();
}
}
public void startListenerForConsumer(Object listener) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(cachingConnectionFactory);
MessageListenerAdapter adapter = new MessageListenerAdapter(listener,
new Jackson2JsonMessageConverter());
simpleMessageListenerContainer.setMessageListener(adapter);
//针对哪些队列(参数为可变参数)
simpleMessageListenerContainer.setQueueNames("mail.send","sms.send");
//同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。
simpleMessageListenerContainer.setConcurrentConsumers(6);
//最大的消费者线程数
simpleMessageListenerContainer.setMaxConcurrentConsumers(6);
/**
* 这种设置监听对象的方式,需要重新设置ACK方式,
* 不过这里我们设置了手动ACK和MessageListener,并不会触发消费者,就先不设置了,很多业务也不用手动ACK。
* 队列消费的结果,还是回放到我们发送消息时设置的返回队列
*/
// //手动确认(单条确认)
// simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
// log.info("消费端接收到的消息:[{}]", message);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// });
//消费端限流
simpleMessageListenerContainer.setPrefetchCount(1);
simpleMessageListenerContainer.start();
}
}
访问地址:http://localhost:8080/send 测试消息
很简单,这里不举列子了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。