当前位置:   article > 正文

SpringBoot整合 RabbitMQ_rabbittemplate 导入包

rabbittemplate 导入包

SpringBoot整合 RabbitMQ


建springboot项目时导入springboot整合rabbitmq依赖包
在这里插入图片描述

1,生产者

  • application.yml配置文件
spring:
  rabbitmq:
    port: 5672
    password: root
    username: root
    virtual-host: /pms
    host: localhost
    template:
      mandatory: true   #没有投递到队列,强制将消息退回给生产者
    publisher-confirm-type: correlated
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • RabbitConfig.java
@Configuration
public class RabbitConfg {

    //开启确认,返回
    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            System.out.println("消息的Id:" + correlationData.getId());
            if(!b){
                System.out.println("拒收消息的原因:" + s);
            }
        }
    };
    RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println("-------------------------------------");
            System.out.println("消息的主体:" + new String(message.getBody()));
            System.out.println("错误码:" + i);
            System.out.println("错误的消息:" +s );
            System.out.println("交换机:" +s1);
            System.out.println("路由Key: " +s2);
            System.out.println("-------------------------------------");
        }
    };

    //配置交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic_exchange");
    }
    @Bean
    public TopicExchange orderExchange(){
        return new TopicExchange("order_exchange");
    }

    //对象要用消息转换器(自动注入,需要导入jackson-databind包)
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        //设置连接工厂
        rabbitTemplate.setConnectionFactory(factory);
        //设置消息转换器
        rabbitTemplate.setMessageConverter(messageConverter());
        //找不到投递的队列,将消息强制退回给生产者
        rabbitTemplate.setMandatory(true);
        //设置确认机制
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //设置返回机制
        rabbitTemplate.setReturnCallback(returnCallback);
        return rabbitTemplate;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 生产者
@Component
public class OrderProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendOrder(String exchange, String routingKey, OrderDto orderDto){
        CorrelationData data = new CorrelationData(orderDto.getOrderSn());
        //参数data使用来发送消息的附加参数,用来自定消息id
        rabbitTemplate.convertAndSend(exchange,routingKey,orderDto,data);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2,消费者

  • 配置手动签收
spring:
  rabbitmq:
    port: 5672
    username: root
    password: root
    virtual-host: /pms
    host: localhost
    listener:
      simple:
        acknowledge-mode: manual  # 手动签收
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消息转换器

@Configuration
public class rabbitConfig implements RabbitListenerConfigurer {

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        //注册消息处理器工厂
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    //消息处理器方法工厂
    @Bean
    public MessageHandlerMethodFactory messageHandlerMethodFactory(){
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        //设置消息转换器
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    //消息反序列化
    @Bean
    public MappingJackson2MessageConverter messageConverter(){
        return new MappingJackson2MessageConverter();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 消费者注解方式
@Component
public class MsgConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "msg_queue",declare = "true",durable = "true",exclusive = "false",autoDelete = "false"),
                    exchange = @Exchange(value = "topic_exchange",durable = "true",autoDelete = "false",type = "topic",declare = "true"),
                    key = "msg.#"
            )
    })
    @RabbitHandler//处理消息的消费者
    //@Payload要处理的消息对象
   public void processMsg(@Payload String msg, Channel channel, @Headers Map header){
        System.out.println("消费者处理消息:" + msg);
        //签收消息要先获取消息的id
        Long tag = (Long) header.get(AmqpHeaders.DELIVERY_TAG);
        try {
            //签收
            channel.basicAck(tag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 消费者方法对象绑定方式
@Component
public class OrderConsumer {

    //队列
    @Bean
    public Queue orderQueue(){
        return new Queue("order_queue",true,false,false);
    }
    //交换机
    @Bean
    public TopicExchange orderExchange(){
        return new TopicExchange("order_exchange");
    }
    //绑定队列到交换机
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.#");
    }

    @RabbitListener(queues = "order_queue")
    @RabbitHandler
   public void proccessOrder(@Payload OrderDto orderDto, Channel channel, @Headers Map header){
        System.out.println("消费者处理消息:" + orderDto.getOrderSn()+":" +orderDto.getCreatTime());
        //签收消息
        Long tag = (Long) header.get(AmqpHeaders.DELIVERY_TAG);
        try {
            channel.basicAck(tag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/139129?site
推荐阅读
相关标签
  

闽ICP备14008679号