当前位置:   article > 正文

Springboot 集成 RabbitMQ —— 发布订阅模式的广播实现_springboot rabbitmq 广播消息

springboot rabbitmq 广播消息

通过前面的学习,可以发现,消息都是通过交换器发送至队列的,一条消息只能被一个消费者处理,实际开发中还会有一种情况,就是一条消息需要被多个消费者处理,就是广播的形式;广播的模式需要使用到 FanoutExchange (散列交换器),FanoutExchange 会将消息发送至每一个与之绑定的队列中

FanoutExchange

代码主体没有太大的改动,增加了 FanoutExchange,并且将队列绑定至 FanoutExchange
在这里插入图片描述
截图中被标记的部分如下

@Configuration
public class MQTopicConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.publisher-returns}")
    private boolean publisherReturns;


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherReturns(publisherReturns);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    public Queue coreQueue() {
        return new Queue(Constant.HOSH_TOPIC);
    }

    @Bean
    public Queue subCoreQueue() {
        return new Queue(Constant.HOSH_TOPIC_NEW);
    }

    @Bean
    public TopicExchange coreTopicExchange() {
        return new TopicExchange(Constant.HOSH_TOPIC_EXC);
    }

    @Bean
    FanoutExchange fanoutExchange() {       // 广播交换器
        return new FanoutExchange(Constant.HOSH_BROADCAST_EXC);
    }

    @Bean
    public Binding bindingFanoutExchange() {    // coreQueue 队列绑定至广播交换器
        return BindingBuilder.bind(coreQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingSubFanoutExchange() {     // subCoreQueue 队列绑定至广播交换器
        return BindingBuilder.bind(subCoreQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingCoreExchange() {
        return BindingBuilder.bind(coreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC);
    }

    @Bean
    public Binding bindingSubCoreExchange() {
        return BindingBuilder.bind(subCoreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC_NEW);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

生产者

把消息发送至散列交换器 FanoutExchange

@Component
public class HoshMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HoshMQSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
    }

    public void send(String str) {
        sendMessageWithAck(str);
    }

    private void sendMessageWithAck(String str) {
        // 消息内容
        byte[] toSendBytes = str.getBytes();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(String.valueOf(System.currentTimeMillis()));
        Message msg = new Message(toSendBytes, messageProperties);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
//        rabbitTemplate.convertAndSend(Constant.HOSH_TOPIC_EXC, Constant.HOSH_TOPIC, msg, correlationData);
        rabbitTemplate.convertAndSend(Constant.HOSH_BROADCAST_EXC, "", msg, correlationData);

    }

    // 队列内容发送到 MQ 的确认
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
//        if (b) {
//            System.out.println("recv HoshMQSender confirm id=" + correlationData.getId());
//        } else {
//            System.out.println("not recv " + s);
//        }
    }

    /**
     * exchange 到达 queue, 则 returnedMessage 不回调
     * exchange 到达 queue 失败, 则 returnedMessage 回调
     * 需要设置spring.rabbitmq.publisher-returns=true
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("sender return success" + message.toString()
                +"\n replyCode "+replyCode+"\n replyText "+replyText
                +"\n exchange "+exchange + "\n routingKey "+routingKey);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

消费者

为了实现广播的效果,需要两个消费者

1、消费者1

@Component
public class HoshMqReceiver {

    @RabbitHandler
    @RabbitListener(queues = Constant.HOSH_TOPIC_NEW)
    public void onReceiver(String info, Channel channel, Message msg) throws IOException {

        System.out.println("HoshMqReceiver msg " + msg);
        System.out.println("HoshMqReceiver info " + info);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2、消费者2

@Component
public class HoshMqReceiver2 {

    @RabbitHandler
    @RabbitListener(queues = Constant.HOSH_TOPIC)
    public void onReceiver(String info, Channel channel, Message msg) {
        try {
            // 开启手动应答 ack 以后,只有当程序明确回复,数据已经被处理,
            // 对应数据才会被 RabbitMQ server 清除,否则保留在 RabbitMQ server 上
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
            System.out.println("HoshMqReceiver2 msg " + JSON.toJSONString(msg));
            System.out.println("HoshMqReceiver2 info " + info);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

测试效果

发送消息测试一下,先看看测试结果
在这里插入图片描述
可以发现,在使用 FanoutExchange 后,一条消息会发送至所有与其绑定的队列中,而后,监听了对应队列的消费者就可以获取到同一条消息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/1001395
推荐阅读
相关标签
  

闽ICP备14008679号