赞
踩
为了方便了解基于SpringBoot配置,我们首先熟悉在传统的Spring基于XML的配置,后续我们会更方便了解基于SpringBoot的配置
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation=
- "http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
- <context:property-placeholder location="rabbitmq.properties"/>
-
- <!--扫描 rabbit 包 自动声明交换器、队列、绑定关系-->
- <context:component-scan base-package="com.jetsen.config.rabbit"/>
-
- <!--1.声明连接工厂,在此我们设置rabbitmq的连接,用户名、密码、虚拟主机等信息-->
- <rabbit:connection-factory id="connectionFactory"
- addresses="${rabbitmq.addresses}"
- username="${rabbitmq.username}"
- password="${rabbitmq.password}"
- virtual-host="${rabbitmq.virtualhost}"/>
-
- <!--2.创建一个管理器(org.springframework.amqp.rabbit.core.RabbitAdmin),用于管理交换,队列和绑定。
- auto-startup 指定是否自动声明上下文中的队列,交换和绑定, 默认值为 true。-->
- <rabbit:admin connection-factory="connectionFactory" auto-startup="true" />
-
- <!-- 3.消息对象json转换类 -->
-
- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-
- <!--4.声明 template 的时候需要声明 id 不然会抛出异常-->
- <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory message-converter="jsonMessageConverter" exchange="remoting.exchange"/>
-
- <!--5.声明 队列-->
- <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
- exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
- <rabbit:queue name="remoting.queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>
-
- <!--6.声明交换机、队列、绑定管理 -->
-
- <rabbit:direct-exchange name="remoting.exchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="remoting.queue" key="remoting.binding"/>
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
- <!-- 7.定义消费者,需实现implements MessageListener,实现 public void onMessage(Message msg)方法-->
-
- <bean name="queuehandler" class="com.jetsen.config.rabbitmq.RecvHandler" />
-
- <!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。
- 因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
-
- <!-- 8.定义消费者监听队列 -->
- <rabbit:listener-container
- connection-factory="connectionFactory">
- <rabbit:listener ref="queuehandler" queues="remoting.queue" />
- </rabbit:listener-container>
-
- </beans>

- public class RecvHandler implements MessageListener {
-
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- public void onMessage(Message msg) {
- try {
- // msg就是rabbitmq传来的消息,需要的同学自己打印看一眼
- // 使用jackson解析
- JsonNode jsonData = MAPPER.readTree(msg.getBody());
- System.out.println("接收消息:+jsonData.toString());
-
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }

当然消费者可以在JAVA代码中实现
- @Bean
- public Queue remotingQueue() {
- // 创建一个持久化的队列 1
- return new Queue("remoting.queue", true);
- }
-
-
-
- @Bean
- public SimpleMessageListenerContainer simpleMessageQueueLister(ConnectionFactory connectionFactory) {
-
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
- // 设置监听的队列
- container.setQueues(remotingQueue());
- // 指定要创建的并发使用者数。
- container.setConcurrentConsumers(1);
- // 设置消费者数量的上限
- container.setMaxConcurrentConsumers(5);
- // 设置是否自动签收消费 为保证消费被成功消费,建议手工签收
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- // 可以在这个地方得到消息额外属性
- MessageProperties properties = message.getMessageProperties();
- //得到消息体内容
- byte[] body = message.getBody();
- System.out.println(remotingQueue().getName() + "收到消息:" + new String(body));
- /*
- * DeliveryTag 是一个单调递增的整数
- * 第二个参数 代表是否一次签收多条,如果设置为 true,则所有 DeliveryTag 小于该 DeliveryTag 的消息都会被签收
- */
- channel.basicAck(properties.getDeliveryTag(), false);
- }
- });
- return container;
- }

- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
自动配置
- @EnableRabbit //开启基于注解的RabbitMQ模式
- @ComponentScan("com.jetsen.mq")
- @SpringBootApplication
- public class SpringRabbitMqApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(SpringRabbitMqApplication.class, args);
- }
-
- }
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=rabbitadmin
- spring.rabbitmq.password=123456
- spring.rabbitmq.virtual-host=/vhost_rabbitmq
// Message需要自己构造一个;定义消息体内容和消息头
// rabbitTemplate.send(exchange,routeKey,message);
// object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
// rabbitTemplate.convertAndSend(exchange,routeKey,object);默认采用的是JAVA的序列化
- @Override
- public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
- convertAndSend(exchange, routingKey, object, (CorrelationData) null);
- }
rabbitTemplate.receiveAndConvert("default.queue")
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
在RabbitMQ中默认采用的序列化
SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
修改序列化机制
- @Configuration
- public class RabbitMQConfig {
-
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- }
- @Component
- @RabbitListener(queues = "default.queue")
- public class RabbitMqConsumer {
-
- private static final Logger log= LoggerFactory.getLogger(RabbitMqConsumer.class);
-
- @RabbitHandler
- public void process(MessageBean messageBean,Channel channel, Message message) throws IOException {
- log.info("HelloReceiver收到 : " + messageBean +"收到时间"+DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
- try {
-
- //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
- //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
- deliveryTag: 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- log.info("receiver success");
- } catch (IOException e) {
- log.info("接收到消息之后的处理发生异常.", e);
- }
-
-
- }}

同时注解可以申明交换机、队列和Binding
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "default.queue1", durable = "true"),
- exchange = @Exchange(value = "default.exchange1",
- durable = "true",
- type = "topic",
- ignoreDeclarationExceptions = "true"),
- key = "default_routing_key.#")
- )
申明后会自动创建队列,交换机和Binding
收消息方法可以有两种
1、使用实体
- @RabbitListener(queues = "default.queue")
- @RabbitHandler
- public void process(MessageBean messageBean, Channel channel, Message message) throws IOException {
- log.info("HelloReceiver收到【MessageBean】 : " + messageBean + "收到时间"
- + DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
- try {
-
- // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
- // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
- deliveryTag:
- // 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- log.info("receiver success");
- } catch (IOException e) {
- log.info("接收到消息之后的处理发生异常.", e);
- }
-
- }

2、使用Message
- @RabbitListener(queues = "default.queue")
- @RabbitHandler
- public void process(Channel channel, Message message) throws IOException {
- byte[] body = message.getBody();
- String messageStr = new String(body,"UTF-8");
- log.info("HelloReceiver收到【Message】 : " + messageStr + "收到时间"
- + DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
- try {
-
- // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
- // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
- deliveryTag:
- // 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- log.info("receiver success");
- } catch (IOException e) {
- log.info("接收到消息之后的处理发生异常.", e);
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。