当前位置:   article > 正文

RabbitMQ(03)——RabbitMQ的Fanout消息模型_rabbitmq fanout 生产者发送消息

rabbitmq fanout 生产者发送消息

RabbitMQ——RabbitMQ的Fanout消息模型

Fanout消息模型结构

在这里插入图片描述
P:生产者,向Exchange发送消息
X: Exchange(交换机),接收生产者的消息
C:消费者,领取消息并消费消息

Fanout消息模型可以有多个消费者;

每个消费者都绑定有自己的队列queue(临时队列);

每个队列绑定到交换机exchange,这里使用的交换机是扇型交换机(funout exchange);

生产者生产的消息,只能发送到交换机,由交换机决定发送给哪个队列,生产者通常不知道消息是否会被传递到哪个队列;

交换机把消息发送给绑定到该交换机的所有队列,这也是扇型交换机的特点,所以也叫广播模型;

队列的消费者都能拿到消息,实现一条消息被多个消费者消费。

1、Fanout消息模型之发布者发布消息

消息生产者的开发

package com.cheng.fanout;

import com.cheng.utils.ConnectUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        Channel channel = connection.createChannel();

        //声明与通道连接的交换机,参数一:交换机名称,如果没有,会自动创建,参数二:交换机类型 fanout扇形交换机
        channel.exchangeDeclare("logs","fanout");
        //发送消息
        channel.basicPublish("logs","",false,"fanout rabbitmq".getBytes());

        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

运行后,查看rabbitmq的管理控制页面:

在这里插入图片描述

发送了一条消息,Exchanges里面多了一个名为logs交换机。

2、Fanout消息模型之消费者消费消息

消息消费者的开发

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        final Channel channel = connection.createChannel();
        //声明与通道连接的交换机
        channel.exchangeDeclare("logs","fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //通道绑定队列和交换机
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1 fanout rabbitmq" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Consumer2:

public class Consumer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        final Channel channel = connection.createChannel();
        //声明与通道连接的交换机
        channel.exchangeDeclare("logs","fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //通道绑定队列和交换机
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2 fanout rabbitmq" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Consumer3:

public class Consumer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        final Channel channel = connection.createChannel();
        //声明与通道连接的交换机
        channel.exchangeDeclare("logs","fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //通道绑定队列和交换机
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2 fanout rabbitmq" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

先执行三个消息消费者,监听队列中的消息,再执行消息生产者发送消息,查看控制台的输出信息:

consumer1:

在这里插入图片描述

consumer2:

在这里插入图片描述

consumer3:

在这里插入图片描述

实现了同一条消息被多个消费者消费。

扇型交换机的应用案例:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/430637
推荐阅读
相关标签
  

闽ICP备14008679号