赞
踩
一、消息确认模式
#开启confirm和return模式
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
配置多实例RabbitTemplate
自定义RabbitListenerContainerFactory 容器工厂,消费者时来使用。在容器中可以设置消费者签收模式、消费者标签策略、消息转换器、并行消费数量、预抓取数量、异常处理。。。。。。)
package com.host.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.ScopedProxyMode; import javax.annotation.Resource; @Configuration public class RabbitMqConfig { @Resource private ConnectionFactory connectionFactory; @Bean @Scope(value = "prototype") public RabbitTemplate protoRabbitTemplate() { //如果是RabbitTemplate是单列的话setConfirmCallback() 和setReturnCallback()都只能设置一次,否则就会报异常。 //所有为了定制化,各个生产者按需设置setConfirmCallback() 和setReturnCallback() // 因此protoRabbitTemplate是多实例的,在生产者中取出一个新的实例RabbitTemplate,单独设置setConfirmCallback() 和setReturnCallback() RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMandatory(true); //开启强制委托模式 return template; } @Bean public RabbitListenerContainerFactory myListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //手动签收 factory.setConcurrentConsumers(5);//并行消费者数量 factory.setMaxConcurrentConsumers(5);//最大消费者数量 factory.setPrefetchCount(16); //消费者标签策略 factory.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_tag"; } }); //消息转换器 factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
配置ConfirmCallback
package com.host.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component; @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { private static final Logger LOGGER = LoggerFactory.getLogger(MyConfirmCallback.class); @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //只要到达exchange就会回调 LOGGER.info("=====correlationData= " + correlationData); //如果开启了手动确认,但是消费者没有手动确认,这时ack会默认为true if (ack) { LOGGER.info("消息确认成功:"); } else { LOGGER.info("消息确认失败:" + cause); } } }
配置ReturnCallback 。前面配置的ConfirmCallback 并不能保证消费者已经接受到了。但是结合ReturnCallback 可以确定消息没有到达queue。ReturnCallback 只有在出现异常,没有到达queue时才会起作用。
package com.host.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Component public class MyReturnCallBackHandler implements RabbitTemplate.ReturnCallback { private static final Logger LOGGER = LoggerFactory.getLogger(MyReturnCallBackHandler.class); //消息到达exchange,但是没有到queue就会回调 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { LOGGER.info("消息主体 message : "+message); LOGGER.info("消息主体 message : "+replyCode); LOGGER.info("描述:"+replyText); LOGGER.info("消息使用的交换器 exchange : "+exchange); LOGGER.info("消息使用的路由键 routing : "+routingKey); } }
生产者:
可以看到在生产者中我们设置了ReturnCallback 和ConfirmCallback
package com.host.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class ProductMessage { private static final Logger LOGGER = LoggerFactory.getLogger(ProductMessage.class); @Autowired @Qualifier(value = "protoRabbitTemplate") private RabbitTemplate protoRabbitTemplate; @Autowired private MyReturnCallBackHandler myReturnCallBackHandler; @Autowired private MyConfirmCallback myConfirmCallback; private static final String EXCHANG = "test_exchang"; private static final String ROUNTING_KEY = "meng_key"; public void send() { protoRabbitTemplate.setReturnCallback(myReturnCallBackHandler); protoRabbitTemplate.setConfirmCallback(myConfirmCallback); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-", "")); String str = "{\"name\":\"zhangsan\"}"; MessageProperties properties = new MessageProperties(); properties.setContentType(MessageProperties.CONTENT_TYPE_JSON); properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); Message message = new Message(str.getBytes(), properties); protoRabbitTemplate.convertAndSend(EXCHANG, ROUNTING_KEY, message, correlationData); } }
消费者:
使用@RabbitListener注解监听队列消息。我在消费者通过自定义的containerFactory 类"myListenerContainerFactory"。来实现消费者自定义模式。避免干扰其他消费者。
package com.host.mq; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ConsumerMessage { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessage.class); @RabbitListener(bindings = {@QueueBinding(value = @Queue("testqueue"),exchange = @Exchange(name = "test_exchang"), key="meng_key")}, containerFactory = "myListenerContainerFactory") public void process(Message message, Channel channel) { LOGGER.info("========ConsumerMessage=====" + new String(message.getBody())); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException ex) { ex.printStackTrace(); } } } }
所有这些完成后,我们进行发送接收消息就可以了。。。。。
二、消息持久化
rabbitmq分为exchange、queue、message持久化。spring框架默认者三个都持久化。如果message要持久化,queue肯定也有持久化,因为message是存在queue中的。
exchange和queue可以通过durable设置为false,就是非持久化了。
message通过下面设置进行持久化和非持久化设置
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
三、消息的两种消费模式
消费消息主要有两种方式: 1、通过消息监听容器MessageListenerContainer:下面代码中被我注释掉了的SimpleMessageListenerContainer就是一种实现,我们通过消息监听容器可以设置消费者签收模式、消费者标签策略、消息转换器、并行消费数量、预抓取数量、异常处理。。。。。。当然,前面的这些设置我们也可以通过在application中配置,由SpringBoot的自动装配功能帮我们自动创建。还有一点,就是这个容器可以同时监听多个队列,对多个队列进行统一设置处理。 2、另一种就是通过注解@RabbitListener和@RabbitHandler, @RabbitListener可以加在类或者方法上,用于标注当前类或方法是一个消息监听器,一般结合@RabbitHandler进行使用。@RabbitListener(queues = "") 也可以指定多个队列进行消费。 @RabbitListener(containerFactory="")注解中还有一个关键的属性containerFactory,containerFactory是消息监听容器工厂,如果不指定会使用默认的(这个根据配置spring.rabbitmq.listener.simple/direct决定),其生产MessageListenerContainer,设置的属性也差不多(消费者签收模式、消费者标签策略、消息转换器、并行消费数量、预抓取数量、异常处理。。。。。。) 使用这两个注解的消息处理方法是通过MessageConverter转化的,可以通过RabbitListenerContainerFactory 去设置我们自定义的消息转换器,下面代码中有个被我注释掉的方法中就定义了一个SimpleRabbitListenerContainerFactory,然后可以通过@RabbitListener(containerFactory="自定义消息监听容器bean名称")的方式指定使用我们自己的containerFactory。 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常: application/octet-stream:二进制字节数组存储,使用 byte[] application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否则会抛出找不到类异常) text/plain:文本数据类型存储,使用 String application/json:JSON 格式,使用 Object、相应类型
大家也可以参考下面的文章
https://www.cnblogs.com/leslies2/p/11238859.html
https://blog.csdn.net/u012988901/article/details/89673618
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。