赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
通用配置,实际根据需要添加修改
spring:
#配置rabbitMq 服务器
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
# 重试次数,默认为3次
listener:
simple:
retry:
enabled: true
max-attempts: 5
# 手动ack
acknowledge-mode: manual
# ack
publisher-confirm-type: correlated
# 发送失败返回
publisher-returns: true
队列配置类,以direct模式为例,其他模式类似。定义queue,定义exchange,绑定queue与exchange。
生产者,交换机,多个消息队列,多个消费者
package cloud.lcx.learn.common.config.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName DirectRabbitConfig
* @Description DirectRabbitConfig
* @Author jocker
* @Date 2020/9/2
*/
@Configuration
public class DirectRabbitConfig {
public static final String MY_DIRECT_QUEUE = "MyDirectQueue";
public static final String MY_DIRECT_EXCHANGE = "MyDirectExchange";
public static final String ROUTINGKEY = "hadoop";
// direct queue
@Bean
public Queue myDirectQueue(){
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue(MY_DIRECT_QUEUE,true,true,false);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange myDirectExchange() {
return new DirectExchange(MY_DIRECT_EXCHANGE,true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(myDirectQueue()).to(myDirectExchange()).with(ROUTINGKEY);
}
}
direct 如果路由键完全匹配的话,消息才会被投放到相应的队列,应用最多。
fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符,应用较多。
header 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器
这里说的是消息怎么从交换机(exchange)到队列(queue)的过程,按以上的规则进行。而消息怎么从队列中被消费是竞争的,比如说在应用程序多节点部署情况下,会存在多个节点监听某个队列,一般情况下消息只会被消费一次,默认情况下是轮询的。
全局ACK配置,配置消息确认消费处理器,配置消息发送失败处理器。(非必须)
package cloud.lcx.learn.common.config.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @ClassName MqAckConfig
* @Description 消息消费确认配置
* @Author jocker
* @Date 2020/9/3
*/
@Configuration
public class MqAckConfig {
@Autowired
RabbitTemplate rabbitTemplate;
//初始化加载方法,对RabbitTemplate进行配置
@PostConstruct
void rabbitTemplate(){
//消息发送确认,发送到交换器Exchange后触发回调
rabbitTemplate.setConfirmCallback(new ReturnCallBackHandler());
//消息发送确认,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
rabbitTemplate.setReturnCallback(new ReturnHandler());
//自定义格式转换
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置为自动提交,即使配置文件添加了配置
factory.setConnectionFactory(connectionFactory);
// 手动设置手动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
定义消息确认消费处理器(非必须)
package cloud.lcx.learn.common.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/* *
* @Description 设置生产者消息confirm回调函数
* @Param
* @Returns
* @Author jocker
* @Date 2020/9/3 9:50
*/
@Slf4j
public class ReturnCallBackHandler implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("rabbitMq Ack : {} recved",correlationData);
}else{
log.info("rabbitMq Ack : {} unrecved,cause {}",correlationData,cause);
}
}
}
这此处处理消费者Ack或Nack的回调逻辑。
定义消息发送失败处理器(非必须)
package cloud.lcx.learn.common.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/* *
* @Description 设置生产者消息returns回调函数
* @Param 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
* @Returns
* @Author jocker
* @Date 2020/9/3 9:44
*/
@Slf4j
public class ReturnHandler implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("rabbitmq : {} returned in {} with {},{}",message,exchange,replyCode,replyText);
}
}
与ConfirmCallback区别在于,此处一般处理的是事消息发送异常事件,消息未曾到达消费者那边。
可以定义一个统一的消息格式(非必须)
package cloud.lcx.learn.common.config.mq.model;
import lombok.Data;
/**
* @ClassName MqMessage
* @Description
* @Author jocker
* @Date 2020/9/3
*/
@Data
public class MqMessage {
String appId;
String msgBody;
}
使用RabbitTemplate发送消息,和redis类似
//使用RabbitTemplate,这提供了接收/发送等等方法
@Autowired
RabbitTemplate rabbitTemplate;
MqMessage message = new MqMessage();
message.setAppId("rabbitMq application");
message.setMsgBody("hello rabbit!");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange,指定消息唯一ID
rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE, DirectRabbitConfig.ROUTINGKEY, message,new CorrelationData(UUID.randomUUID().toString()) );
@Component
@Slf4j
public class RabbitDirectReceiver {
/* *
* 1. direct 下多个消费者只有一个消费者能消费消息,类似于负载,默认是轮询。
* 2. 重复消费,中心思想,在redis或者db中存储消息唯一值,消费前判断下,多线程环境下需要考虑线程安全。
*
*/
@RabbitListener(queues = DirectRabbitConfig.MY_DIRECT_QUEUE, containerFactory="rabbitListenerContainerFactory")
public void process(Message message, Channel channel) throws IOException {
try {
// 业务代码
// .......
// multiple: false 只确认当前消费者已消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// multiple: true 确认改消息已被所有消费者消费
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch (Exception e){
log.error("消息处理异常!",e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
一般情况下,为防止消息人为丢失(代码错误导致消息消费异常而引起消息丢失),会开启手动消费确认,在客户端在取出消息处理完成后,手动回复确认是否消费成功,api如下:
成功消费
// multiple: false 只确认当前消费者已消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
消费失败
// 消费失败,消息回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
// 消费失败,删除消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
multiple批量确认,我用的比较少,一般都是只确认自己节点,可能在一些特殊场景下会用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。