赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
springBoot引入rabbitMq需要添加依赖
spring.application.name=cluster-1 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5673 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #必须配置这个才会确认回调 spring.rabbitmq.publisher-confirm-type=correlated #消息发送到交换机确认机制,是否返回回馈 spring.rabbitmq.publisher-returns=true # 对 rabbitmqTemplate 进行监听,当消息由于server的原因无法到达queue时,就会被监听到,以便执行ReturnCallback方法 # 默认为false,Server端会自动删除不可达消息 spring.rabbitmq.template.mandatory=true # 消费端手动确认 spring.rabbitmq.listener.type=simple #manual 手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 并发消费 同一个队列启动几个消费者 spring.rabbitmq.listener.simple.concurrency=3 # 启动消费者最大数量 spring.rabbitmq.listener.simple.max-concurrency=3 #是否支持重试 true 支持 spring.rabbitmq.listener.simple.retry.enabled=true #最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 #重试是无状态的还是有状态的 spring.rabbitmq.listener.simple.retry.stateless=false #时间策略乘数因子 spring.rabbitmq.listener.simple.retry.multiplier = 1.0 #第一次和第二次尝试发布或传递消息之间的间隔 spring.rabbitmq.listener.direct.retry.initial-interval=1000ms #最大重试时间间隔 spring.rabbitmq.listener.direct.retry.max-interval = 10000m #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列) spring.rabbitmq.listener.direct.default-requeue-rejected = true
package com.notification.rabbitcluster.normal; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import java.util.HashMap; import java.util.Map; /** * @author wang */ @Configuration public class RabbitMQConfig { @Autowired private CachingConnectionFactory connectionFactory; /** * 目标转换器,需要哪种类型的转换器就创建哪种类型的转换器 * * @return */ @Bean public DirectExchange exchangeHello() { Map<String, Object> eArguments = new HashMap<>(); //备份交换器参数 eArguments.put("alternate-exchange", "exchange.ae"); return new DirectExchange("exchange.hello", true, false, eArguments); } /** * 备份转换器 * * @return */ @Bean public FanoutExchange exchangeAE() { return new FanoutExchange("exchange.ae", true, false, null); } /** * 死信转换器 * * @return */ @Bean public TopicExchange exchangeDLX() { return new TopicExchange("exchange.dlx", true, false, null); } /** * 目标对列 * * @return 队列 */ @Bean public Queue queueHello() { Map<String, Object> args = new HashMap<>(); //声明死信交换器 args.put("x-dead-letter-exchange", "exchange.dlx"); //声明死信路由键 args.put("x-dead-letter-routing-key", "dlx.test"); //声明队列消息过期时间 5000ms args.put("x-message-ttl", 5000); return new Queue("queue.hello", true, false, false, args); } /** * 备份对列 * * @return 队列 */ @Bean public Queue queueAE() { return new Queue("queue.ae", true, false, false, null); } /** * 死信对列 * * @return 队列 */ @Bean public Queue queueDLX() { return new Queue("queue.dlx", true, false, false, null); } /** * 绑定目标对列 * * @param queueHello * @param exchangeHello * @return */ @Bean public Binding bindingExchangeDirect(@Qualifier("queueHello") Queue queueHello, @Qualifier("exchangeHello") DirectExchange exchangeHello) { return BindingBuilder.bind(queueHello).to(exchangeHello).with("helloKey"); } /** * 绑定备份对列 * * @param queueAE * @param exchangeAE * @return */ @Bean public Binding bindingExchangeAE(@Qualifier("queueAE") Queue queueAE, @Qualifier("exchangeAE") FanoutExchange exchangeAE) { return BindingBuilder.bind(queueAE).to(exchangeAE); } /** * 绑定死信对列 * * @param queueAE * @param exchangeDLX * @return */ @Bean public Binding bindingExchangeDLX(@Qualifier("queueDLX") Queue queueAE, @Qualifier("exchangeDLX") TopicExchange exchangeDLX) { return BindingBuilder.bind(queueAE).to(exchangeDLX).with("dlx.*"); } /** * 如果需要在生产者需要消息发送后的回调, * 需要对rabbitTemplate设置ConfirmCallback对象, * 由于不同的生产者需要对应不同的ConfirmCallback, * 如果rabbitTemplate设置为单例bean, * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。 * * @return */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory); } }
package com.notification.rabbitcluster.queue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.UUID; /** * @author wang */ @RestController public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private static Logger log = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public Sender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //这是是设置回调能收到发送到响应 rabbitTemplate.setConfirmCallback(this); //如果设置备份队列则不起作用 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } @GetMapping("/send") public void send() { String sendMsg = "hello1 " + new Date(); //convertAndSend(exchange:交换机名称,routingKey:路由关键字,object:发送的消息内容,correlationData:消息ID) CorrelationData cd = new CorrelationData(); // 消息唯一标识 String replace = UUID.randomUUID().toString().replace("-", ""); System.out.println("Sender : " + sendMsg+" ID :"+replace); cd.setId(replace); rabbitTemplate.convertAndSend("exchange.hello", "helloKey", sendMsg, cd); } /** * 回调确认 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } else { log.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } } /** * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }
新建一个项目,项目的jar包引入与rabbitMq配置与生产者相同
package com.notification.rabbitcustomer; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import java.io.IOException; /** * @author wang */ @Component public class Consumer { private static Logger log = LoggerFactory.getLogger(Consumer.class); @RabbitHandler @RabbitListener(queues = "queue.hello") public void process(Message message, Channel channel) throws IOException { log.info(String.format("receive:%s线程名:%s线程id:%d", new String(message.getBody()), Thread.currentThread().getName(), Thread.currentThread().getId())); /* * 手工ACK,不批量ack * 取值为 false 时,表示通知 RabbitMQ 当前消息被确认 * 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认 */ long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ Assert.isTrue(false); //发送消费成功消息 channel.basicAck(deliveryTag, false); } catch (Exception e){ //手动发送消费失败消息 channel.basicReject(deliveryTag, true); } } }
以上配置中的配置为消息重试配置
#是否支持重试 true 支持
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#重试是无状态的还是有状态的
spring.rabbitmq.listener.simple.retry.stateless=false
#时间策略乘数因子
spring.rabbitmq.listener.simple.retry.multiplier = 1.0
#第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.listener.direct.retry.initial-interval=1000ms
#最大重试时间间隔
spring.rabbitmq.listener.direct.retry.max-interval = 10000m
#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
spring.rabbitmq.listener.direct.default-requeue-rejected = true
创建队列时绑定死信交换器
/** * 目标对列 * * @return 队列 */ @Bean public Queue queueHello() { Map<String, Object> args = new HashMap<>(); //声明死信交换器 args.put("x-dead-letter-exchange", "exchange.dlx"); //声明死信路由键 args.put("x-dead-letter-routing-key", "dlx.test"); //声明队列消息过期时间 5000ms args.put("x-message-ttl", 5000); return new Queue("queue.hello", true, false, false, args); }
创建新的监听方法,监听死信队列,对死信队列中的数据进行处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。