赞
踩
server:
port: 11111
spring:
rabbitmq:
port: 5672
host: 192.168.201.81
username: admin
password: 123
publisher-confirm-type: correlated
ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);
CorrelationData:
1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate); rabbitTemplate.setMessageConverter(messageConverter()); rabbitTemplate.setConfirmCallback(new RabbitConfirmCallbackImpl()); return rabbitTemplate; } /** * 确保消息是否发送到交换机 */ class RabbitConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.warn("****Exchange callback-检验是否发送成功********"); log.warn("correlationData->相关数据:{}",correlationData); log.warn("ack->Exchange响应:{}",ack); log.warn("cause->错误原因:{}",cause); } }
测试向交换机发送数据,测试交换机是否成功收到。
@Service public class MqServiceImpl implements IMqService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMessage(String msg) { //错误的Exchange名称,实际名称为:ssc_sc_routing_exchange final String EXCHANGE = "ssc_sc_routing_exchangex"; final String ROUTING_KEY = "ssc_sc_routing_key"; rabbitTemplate.convertAndSend( EXCHANGE, ROUTING_KEY, msg ); } }
@Override
public void sendMessage(String msg) {
final String EXCHANGE = "ssc_sc_routing_exchange";
final String ROUTING_KEY = "ssc_sc_routing_key";
rabbitTemplate.convertAndSend(
EXCHANGE,
ROUTING_KEY,
msg
);
}
通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true #检查是否绑定到队列中
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitConfirmReturnCallbackImpl());
class RabbitConfirmReturnCallbackImpl implements RabbitTemplate.ReturnsCallback{
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.warn("message:{}",returnedMessage.getMessage());
log.warn("exchange:{}",returnedMessage.getExchange());
log.warn("replyCode:{}",returnedMessage.getReplyCode());
log.warn("replyText:{}",returnedMessage.getReplyText());
log.warn("routingKey:{}",returnedMessage.getRoutingKey());
}
}
修改routingkey的值,让交换机不能路由到指定Queue。
package com.wnhz.ssc.cloud.mq.service.impl; import com.wnhz.ssc.cloud.mq.service.IMqService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MqServiceImpl implements IMqService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMessage(String msg) { final String EXCHANGE = "ssc_sc_routing_exchange"; //修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_key final String ROUTING_KEY = "ssc_sc_routing_keyx"; rabbitTemplate.convertAndSend( EXCHANGE, ROUTING_KEY, msg ); } }
返回message:
message:(
Body:'"hello confirm call back"'
MessageProperties
[
headers={
__TypeId__=java.lang.String
},
contentType=application/json,
contentEncoding=UTF-8,
contentLength=0,
receivedDeliveryMode=PERSISTENT,
priority=0,
deliveryTag=0
]
)
Simple模式
Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。
Direct模式
压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。
Message对象的结构,
消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
- AcknowledgeMode.AUTO:自动确认。
- AcknowledgeMode.NONE:根据情况确认。
- AcknowledgeMode.MANUAL:手动确认。
direct模式:
simple模式:
@RabbitListener(queues = "data_confirm_queue") @Override public void receiveBookFromMq(Message message, Channel channel, Book book) { log.debug("message:{}", message); log.debug("message.getMessageProperties().getHeaders()===>{}", message.getMessageProperties().getHeaders()); log.debug("[order消费者:]接收到消息: {}", book); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("消息队列确认: {},{}", message.getMessageProperties().getConsumerQueue(), "接收到回调方法"); } catch (IOException e) { e.printStackTrace(); } }
- Basic.Ack 命令:用于确认当前消息。
- Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
- Basic.Reject 命令:用于拒绝当前消息
basicAck 方法用于确认当前消息。
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.delegate.basicAck(deliveryTag, multiple);
}
deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
multiple:为了减少网络流量,手动确认可以被批处理。
false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.delegate.basicNack(deliveryTag, multiple, requeue);
}
basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
this.delegate.basicReject(deliveryTag, requeue);
}
消息遗弃或入队,一般建议消息丢弃重新发。
package com.wnhz.mq.order.service.impl; import com.rabbitmq.client.Channel; import com.wnhz.domain.Book; import com.wnhz.mq.order.service.IOrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; @Slf4j @Service public class OrderServiceImpl implements IOrderService { private void buildException(){ throw new RuntimeException("[消费者:] 消费出现异常......"); } @RabbitListener(queues = "data_confirm_queue") @Override public void receiveBookFromMq(Message message, Channel channel, Book book) { try { //制造异常测试 buildException(); log.debug("message:{}", message); log.debug("message.getMessageProperties().getHeaders()===>{}", message.getMessageProperties().getHeaders()); log.debug("[order消费者:]接收到消息: {}", book); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("消息队列确认: {},{}", message.getMessageProperties().getConsumerQueue(), "接收到回调方法"); } catch (Exception e) { log.debug("消费异常: {}",e.getMessage()); try { log.debug("尝试丢弃:{}消息.....................",book); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ex) { ex.printStackTrace(); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。