当前位置:   article > 正文

RabbitMQ广播模式(动态生成queue)_rabbitmq 广播一对多

rabbitmq 广播一对多

RabbitMQ的广播机制和ActiveMQ有所不同。
先来梳理下RabbitMQ中消息从产生到消费的流程吧:
在这里插入图片描述
而exchange 存在多种类型,这里就只说广播模式(fanout)了。在广播模式中,一个exchange对应多个queue,会向每个queue都发送信息,然后不同的queue再由其对应的消费者消费信息,即完成了广播。
因为广播模式中不关注routingkey和queue,只需要queue的queue name唯一即可,所以这里把routingkey移出来了,实际上还是会经过的哦。

在这里插入图片描述

1.新建一个spring boot 项目并引入官方的amqp包

<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

2.添加RabbitMQ连接参数

spring.rabbitmq.host=xx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xx
spring.rabbitmq.password=xx
  • 1
  • 2
  • 3
  • 4

3.创建生产者

同时添加一个配置参数rabbit.exchange,用来动态指定exchange(比如使用apollo或者spring cloud config)
增加配置参数

rabbit.exchange=testExchange
  • 1

增加生产者

@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Value("${rabbit.exchange}")
    private String exchange;

    public void sendInfo(){
        Message message = new Message("123".getBytes(),new MessageProperties());
        rabbitTemplate.send(exchange,"",message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4.动态创建queue和消费者

因为需要执行createQueue方法才能生成一个queue和消费者,所以这里先用@Component指定扫描当前类,再用@PostConstruct指定扫描时执行该方法。
因为创建的queue是临时queue,当消费者消失时,该queue就会自动删除,因为创建queue也是由rabbitMQ自行生成的,所以queue name一定是唯一的。这样在集群部署时,就可以做到即开即用了,就算关闭了服务,对应的queue也会自动消失。

@Component
public class Consumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Value("${rabbit.exchange}")
    private String exchange;

    @PostConstruct
    public void createQueue(){
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);

        try{
            /**
             * 与生产者使用同一个交换机
             */
            channel.exchangeDeclare(exchange, "fanout",true);
            /**
             * 获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除
             */
            String queueName = channel.queueDeclare().getQueue();

            /**
             * 关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串
             */
            // channel.queueBind(queue, exchange, routingKey)
            channel.queueBind(queueName, exchange, "");

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");

                    if(StringUtils.isEmpty(message)){
                        return;
                    }

                    /**
                     * 对信息做操作
                     */
                }

            };
            //true 自动回复ack
            channel.basicConsume(queueName, true, consumer);
        }catch (Exception ex){
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

5.手动在RabbitMQ控制界面创建exchange

如下图选择广播模式,并且为永久exchange,如果要设置临时exchange的话要修改4中的如下语句最后一个参数为false

channel.exchangeDeclare(exchange, "fanout",true);
  • 1

在这里插入图片描述

6.启动项目,测试

Consumer中打个断点,就能看到在启动时调用createQueue后所产生的队列名
在这里插入图片描述
到RabbitMQ控制台上搜索下,确实存在
在这里插入图片描述
点开看下对应的exchange
在这里插入图片描述
完全OK,发送条信息试下,也能够接收到
在这里插入图片描述
在这里插入图片描述
可以试下再重复第4步,再建一个queue,或者再起一个项目,你会发现两个消费者都能接收到信息。

然后我们把项目关闭,看看queue还在不在,明显已经木有了。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/688500
推荐阅读
相关标签
  

闽ICP备14008679号