赞
踩
- server:
- port: 8021
- spring:
- #给项目来个名字
- application:
- name: rabbitmq-test
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: need
- password: 123456
- #虚拟host 可以不设置,使用server默认host
- virtual-host: /testhost
-
- listener:
- simple:
- acknowledge-mode: manual #手动ACK
- max-concurrency: 10 #消费之最大数量
- concurrency: 1 #消费者最小数量
- prefetch: 2 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
- direct:
- acknowledge-mode: manual #手动ACK
a:消息序列化转换
- package org.example.service_b.config;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * 此代码添加 在消费者的项目中
- */
- @Configuration
- public class RabbitmqGlobalConfig {
-
- /**
- * 当发送的消息为pojo时,为报转换异常
- * 解决方法:添加这个类进行序列化解析
- * 会自动识别
- * @param objectMapper json序列化实现类
- * @return mq 消息序列化工具
- */
- @Bean
- public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
- return new Jackson2JsonMessageConverter(objectMapper);
- }
-
- }
b:接收消息
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
- @Component
- public class DirectReceiver_1 {
-
- @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
- /**
- * 注意:后4个参数,需要生产者发送消息时加上,否则为报错,注解里添加 required=false 或 删除参数
- */
- public void process(Message message,
- Channel channel,
- @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
- @Header(AmqpHeaders.MESSAGE_ID) String messageId,
- @Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,
- CorrelationData correlationData) throws IOException, InterruptedException {
-
- //long deliveryTag = message.getMessageProperties().getDeliveryTag();
- //或
- //@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
-
- //设置3秒的睡眠好看变化情况
- Thread.sleep(1000);
-
- try {
- String msgbody = new String(message.getBody());
- //1.接收转换消息
- System.out.println("DirectReceiver消费者 1 收到消息 : " + msgbody + " 编号: " + deliveryTag);
-
- //3. 手动签收
- channel.basicAck(deliveryTag, true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。