赞
踩
最近在系统学习RabbitMQ的实战课程,感觉很受用,总结一下以供参考及分享
搭建Spring Boot项目,引入RabbitMQ依赖,这里不再赘述。
一、application.properties配置
server.port=8730
server.servlet.context-path=/mq

#rabbitmq配置
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5

mq.env=local
simple.mq.queue.name=${mq.env}.simple.mq.queue
simple.mq.exchange.name=${mq.env}.simple.mq.exchange
simple.mq.routing.key.name=${mq.env}.simple.mq.routing.key
二、创建RabbitMqConfig配置类
package com.jvli.project.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import com.jvli.project.rabbitmqlistener.SimpleListener; @Configuration public class RabbitMqConfig { private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class); @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; @Autowired private SimpleListener simpleListener; @Bean public RabbitTemplate rabbitTemplate() { connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; } // TODO: 并发配置-消息手动确认机制 // 1. 创建消息模型 @Bean(name = "simpleQueue") public Queue simpleQueue() { return new Queue(env.getProperty("simple.mq.queue.name"), true); // true表示持久化 } @Bean public TopicExchange simpleExchange() { return new TopicExchange(env.getProperty("simple.mq.exchange.name"), true, false); // true表示持久化,false表示不自动删除 } @Bean public Binding simpleBinding() { return BindingBuilder.bind(simpleQueue()).to(simpleExchange()) .with(env.getProperty("simple.mq.routing.key.name")); } // 2.创建并发、手动确认监听容器 @Bean(name = "simpleContainer") public SimpleMessageListenerContainer simpleContainer(@Qualifier("simpleQueue") Queue simpleQueue) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //container.setMessageConverter(new Jackson2JsonMessageConverter()); // TODO:并发配置 container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class)); container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class)); container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class)); // TODO:消息确认-确认机制种类 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setQueues(simpleQueue); container.setMessageListener(simpleListener); return 
container; } }
三、创建消费监听
package com.jvli.project.rabbitmqlistener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; @Component("simpleListener") public class SimpleListener implements ChannelAwareMessageListener, MessageListener{ private static final Logger logger = LoggerFactory.getLogger(SimpleListener.class); @Autowired private ObjectMapper objectMapper; @Override public void onMessage(Message message, Channel channel) throws Exception { long tag = message.getMessageProperties().getDeliveryTag(); try { byte[] body = message.getBody(); //User user=objectMapper.readValue(body,User.class); String msg = new String(body,"UTF-8"); logger.info("当前线程:{},简单消息监听确认机制监听到消息:{}",Thread.currentThread().getName(),msg); //int i=1/0; 测试抛出异常 //手动确认 channel.basicAck(tag, true); } catch (Exception e) { logger.error("简单消息监听确认机制发生异常:{}",e.fillInStackTrace()); channel.basicReject(tag, false); } } }
四、创建Controller,触发事件
package com.jvli.project.controller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import com.fasterxml.jackson.databind.ObjectMapper; import com.jvli.project.dto.User; import com.jvli.project.response.BaseResponse; import com.jvli.project.response.StatusCode; @RestController @RequestMapping("/ack") public class AcknowledgeController { private static final Logger logger = LoggerFactory.getLogger(AcknowledgeController.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @Autowired private Environment env; @RequestMapping(value = "/user/info",method = RequestMethod.GET) public BaseResponse ackUser() { try { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("simple.mq.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("simple.mq.routing.key.name")); for (int i = 1; i <= 50; i++) { User user = new User(i, "army", "acknowledge"); Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); rabbitTemplate.convertAndSend(message); } } catch (Exception e) { e.printStackTrace(); } return new BaseResponse<>(StatusCode.SUCCESS); } }
测试效果:
setPrefetchCount()
属性)Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。