当前位置:   article > 正文

RabbitMQ五种模式和使用场景_rabbitmq发送消息5中模式使用场景

rabbitmq发送消息5中模式使用场景

RabbitMQ
在这里插入图片描述

RabbitMQ的五种工作模式

  1. 简单模式
    在这里插入图片描述

说明: 生产者将消息交给默认的交换机(AMQP default),交换机将获取到的信息绑定这个生产者对应的队列上,监听当前队列的消费者获取消息,执行消息消费。
应用场景: 短信

  • 代码实例:
发布者
  • 1
// An highlighted block

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        cf.setPort(5672);
        Connection connection = cf.newConnection();

        //建立信道
        Channel channel = connection.createChannel();

        //创建交换机

        //创建消息队列
         /*
                queue – 队列名
                durable – 是否持久化,如果为true则会持久化 (服务器重启后仍存在)
                exclusive – 是否独占,true则为独占队列,其他链接不能使用
                autoDelete – 是否自动删除消息,true则在使用完后删除
                arguments – map类型,其他参数
            * */
        channel.queueDeclare("queue52",false,false,true,null);

        //发送消息
        /*
         exchange 交换机
         routing-key 路由键
         properties 属性
         body 消息体
         */
        String message = "苟哥遛狗";
        channel.basicPublish("","queue52",null,message.getBytes("UTF-8"));
        System.out.println("[x] 消息发送成功! "+message);
    }
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
消费者
  • 1
// An highlighted block
 public static void main(String[] args) throws IOException {
        //建立连接

        //建立信道
        Channel channel = ConnUtils.connect();

        //创建交换机

        //创建消息队列

        //接收消息
        /**
         * queue  队列名
         * deliverCallback  收到消息处理
         * cancelCallback   取消消息处理
         */
        channel.basicConsume(Constants.SIMPLE_QUEUE,
        (consumerTag,delivery)->{
            //处理消息
            byte[] body = delivery.getBody();
            String message = new String(body,"utf-8");
            System.out.println("[x] 收到消息"+message);
        },(consumerTag)->{});
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  1. 工作模式
    在这里插入图片描述

说明: 生产者将消息交给默认的交换机(AMQP default),交换机将获取到的信息绑定这个生产者对应的队列. 由于监听这个队列的消费者较多,并且消息只能有一个被消费,就会造成消息竞争
应用场景: 抢红包,和资源任务调度

  • 代码实例:看图,参照第一种模式
  1. 发布订阅 publish/subscribe
    在这里插入图片描述
    注:此处的x代表交换机,发布订阅模式需要使用交换机,其类型为FANOUT(群发)

说明: 生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑
应用场景:广告

  • 代码实例
发布者
  • 1
        //建立连接


        //建立信道
        Channel channel = ConnUtils.connect();

        //(1)创建交换机,类型是FANOUT
        channel.exchangeDeclare(Constants.SUB_EXCHANGE, BuiltinExchangeType.FANOUT);

        //(2)创建队列移到消费者端执行

        //发送消息
        /*
         exchange 交换机
         routing-key 路由键
         properties 属性
         body 消息体
         */
        Scanner scanner = new Scanner(System.in);
        while(true) {
            print("请输入您要发送的消息");
            String message = scanner.next();
            if("exit".equals(message)){
                break;
            }
            //(3) 发给交换机,不提供routing-key
            channel.basicPublish(Constants.SUB_EXCHANGE, "", null, message.getBytes("UTF-8"));
            print("[x] 消息发送成功! " + message);
        }
    }

    private static void print(String str) {
        System.out.println("发布者:"+str);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
消费者
  • 1
public class SubConsumer {

    private String name;

    public SubConsumer(String name) {
        this.name = name;
    }


    public void consume() throws IOException {
        //建立信道
        Channel channel = ConnUtils.connect();

        //创建交换机

        //创建消息队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机
        channel.queueBind(queueName,Constants.SUB_EXCHANGE,"");

        //接收消息
        /**
         * queue  队列名
         * autoAck 自动确认收到
         * deliverCallback  收到消息处理
         * cancelCallback   取消消息处理
         */
        channel.basicConsume(queueName,true,
                (consumerTag,delivery)->{
                    //处理消息
                    byte[] body = delivery.getBody();
                    String message = new String(body,"utf-8");
                    System.out.println(name + "[x] 收到消息"+message);
                },(consumerTag)->{});
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  1. 路由模式 routing
    在这里插入图片描述
    说明: 生产者将消息发送到交换机信息携带具体的路由key,交换机的类型是direct,将接收到的信息中的routingKey,比对与之绑定的队列routingkey。消费者监听一个队列,获取消息,执行消费逻辑,
    应用场景: 根据生产者的要求发送给特定的一个或者一批队列发送信息。
  • 代码实例
发布者
  • 1
  //建立连接


        //建立信道
        Channel channel = ConnUtils.connect();

        //(1)创建交换机,类型是DIRECT
        channel.exchangeDeclare(Constants.ROUTE_EXCHANGE, BuiltinExchangeType.DIRECT);

        //创建队列移到消费者端执行

        //发送消息
        /*
         exchange 交换机
         routing-key 路由键
         properties 属性
         body 消息体
         */
        Scanner scanner = new Scanner(System.in);
        while(true) {
            print("请输入您要发送的消息");
            String message = scanner.next();
            print("请输入您的routing-key");
            String routingKey = scanner.next();
            if("exit".equals(message)){
                break;
            }
            //(2) 发给交换机,提供routing-key
            channel.basicPublish(Constants.ROUTE_EXCHANGE, routingKey, null, message.getBytes("UTF-8"));
            print("[x] 消息发送成功! " + message);
        }
    }

    private static void print(String str) {
        System.out.println("发布者:"+str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

下面展示一些 内联代码片

消费者
  • 1
private String name;
    //(1) 加入bindingKey,用于队列与交换机的绑定
    private String bindingKey;

    public RouteConsumer(String name, String bindingKey) {
        this.name = name;
        this.bindingKey = bindingKey;
    }

    public void consume() throws IOException {
        //建立信道
        Channel channel = ConnUtils.connect();

        //创建交换机

        //创建消息队列
        String queueName = channel.queueDeclare().getQueue();

        //(2)通过bindingkey来绑定交换机
        channel.queueBind(queueName,Constants.ROUTE_EXCHANGE,bindingKey);

        //接收消息
        /**
         * queue  队列名
         * autoAck 自动确认收到
         * deliverCallback  收到消息处理
         * cancelCallback   取消消息处理
         */
        channel.basicConsume(queueName,true,
                (consumerTag,delivery)->{
                    //处理消息
                    byte[] body = delivery.getBody();
                    String message = new String(body,"utf-8");
                    System.out.println(name + "[x] 收到消息"+message);
                },(consumerTag)->{});
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
测试
  • 1
public class RouteApp
{
    public static void main( String[] args ) throws IOException {

        new RouteConsumer("小周","hotpot").consume();
        new RouteConsumer("小李","cc").consume();
        new RouteConsumer("小朱","hotpot").consume();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 主题订阅模式 topic
    在这里插入图片描述
    交换机类型:topic
    bindingkey可以使用通配符 * # 代表通配符 , . 分隔符
    • * 代表一个单词
    • #代表零个或多个单词
    路由功能添加模糊匹配
  • 示例代码
发布者
  • 1
  //建立连接


        //建立信道
        Channel channel = ConnUtils.connect();

        //(1)创建交换机,类型是TOPIC
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        //创建队列移到消费者端执行

        //发送消息
        /*
         exchange 交换机
         routing-key 路由键
         properties 属性
         body 消息体
         */
        Scanner scanner = new Scanner(System.in);
        while(true) {
            print("请输入您要发送的消息");
            String message = scanner.next();
            print("请输入您的routing-key");
            String routingKey = scanner.next();
            if("exit".equals(message)){
                break;
            }
            //(2) 发给交换机,提供routing-key
            channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, null, message.getBytes("UTF-8"));
            print("[x] 消息发送成功! " + message);
        }
    }

    private static void print(String str) {
        System.out.println("发布者:"+str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
消费者
  • 1
public class TopicConsumer {

    private String name;
    //(1) 加入bindingKey,用于队列与交换机的绑定
    private String bindingKey;

    public TopicConsumer(String name, String bindingKey) {
        this.name = name;
        this.bindingKey = bindingKey;
    }

    public void consume() throws IOException {
        //建立信道
        Channel channel = ConnUtils.connect();

        //创建交换机

        //创建消息队列
        String queueName = channel.queueDeclare().getQueue();

        //(2)通过bindingkey来绑定交换机
        channel.queueBind(queueName,Constants.TOPIC_EXCHANGE,bindingKey);

        //接收消息
        /**
         * queue  队列名
         * autoAck 自动确认收到
         * deliverCallback  收到消息处理
         * cancelCallback   取消消息处理
         */
        channel.basicConsume(queueName,true,
                (consumerTag,delivery)->{
                    //处理消息
                    byte[] body = delivery.getBody();
                    String message = new String(body,"utf-8");
                    System.out.println(name + "[x] 收到消息"+message);
                },(consumerTag)->{});
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
测试
  • 1
public class TopicApp
{
    public static void main( String[] args ) throws IOException {
        // 特点.颜色.动物名   lazy.orange.rabbit  happy.pink.pig
        new TopicConsumer("*.orange.* ","*.orange.*").consume();
        new TopicConsumer("*.*.rabbit ","*.*.rabbit").consume();
        new TopicConsumer("lazy.# ","lazy.#").consume();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/742539
推荐阅读
相关标签
  

闽ICP备14008679号