当前位置:   article > 正文

rabbitmq多个消费者消费同一个队列中的同一条消息_rabbitmq多个消费者消费一个消息

rabbitmq多个消费者消费一个消息

前言

使用springboot整合rabbitmq实现,一个生产者生产一条数据,多个消费者消费同一条数据案例,可以解决微服务分布式事务控制。保证最终一致性原则

rabbitmq是什么?

相对简易得一种消息中间件,功能强大,性能中等,吞吐量一般。

使用步骤

  1. 引入库

    <!-- rabbitmq消息队列-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  2. rabbitmq相关配置

    @Configuration
    public class RabbitmqConfig {
    	@Bean
        public Queue queue1() {
            return new Queue("queue1",true);
        }
    
        @Bean
        public FanoutExchange exchange1() {
            return new FanoutExchange("exchange1",true, false);
        }
    
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(exchange1());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    因为是fanout广播模式,所以不用配置路由键

  3. 生产者代码

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void pushMessage() {
        HashMap<Object, Object> hashMap = new HashMap<>();
        hashMap.put("name","zhangsan");
        hashMap.put("age","18");
        rabbitTemplate.convertAndSend("exchange1","", hashMap);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  4. 多个消费者,消费同一条数据

    消费者1号

    @Component
    public class RabbitMqConsumer {
    //    @RabbitListener(queues = "queue1")
        @RabbitListener(bindings = @QueueBinding(
                // value = @Queue(value = "queue1"),
    		value = @Queue(), //切记: 此处无需设置队列名称,否在得话,多个消费者只有一个消费者能消费数据。其它消费者无法消费数据。
            exchange = @Exchange(value = "exchange1",type = ExchangeTypes.FANOUT)
        ))
        public void getData(Message message) {
            try {
                String str = new String(message.getBody(),"utf-8");
                System.out.println(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费者2号

    @Component
    public class RabbitMqConsumer {
    	// @RabbitListener(queues = "queue1")
        @RabbitListener(bindings = @QueueBinding(
    	// value = @Queue(value = "queue1"),
    		value = @Queue(), //切记: 此处无需设置队列名称,否在得话,多个消费者只有一个消费者能消费数据。其它消费者无法消费数据。
            exchange = @Exchange(value = "exchange1",type = ExchangeTypes.FANOUT)
        ))
        public void getData(Message message) {
            try {
                String str = new String(message.getBody(),"utf-8");
                System.out.println(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/1002637
推荐阅读
相关标签
  

闽ICP备14008679号