当前位置:   article > 正文

RabbitMQ的六种工作模式以及代码实现_rabbitmq 模式

rabbitmq 模式

一、什么是RabbitMQ

rabbitmq是基于amqp协议实现一套高效的数据传输组件,MQ(消息队列)。
常见的MQ:ActiveMQ、Kafka、RocketMQ、RabbitMQ
官方文档:https://www.rabbitmq.com/getstarted.html

二、MQ的应用场景

1、消息异步通知(注册时邮箱认证、添加商品生成详情页和将商品添加到搜索库等)
2、消息顺序处理
3、消息延迟处理
4、请求削峰

三、六种工作模式

1.1 simple简单模式
在这里插入图片描述
1)消息产生后将消息放入队列
2)消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

1.2 work工作模式(资源的竞争)
在这里插入图片描述
1)消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样。保证一条消息只能被一个消费者使用)
2)应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

1.3 publish/subscribe发布订阅(共享资源)
在这里插入图片描述
1)X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
2)相关场景:邮件群发,群聊天,广播(广告)

1.4 routing路由模式
在这里插入图片描述
1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
2)根据业务功能定义路由字符串
3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误

1.5 topic 主题模式(路由模式的一种)
在这里插入图片描述
1)星号、#号代表通配符
2)星号代表一个单词,#号代表一个或多个单词
3)路由功能添加模糊匹配
4)消息产生者产生消息,把消息交给交换机
5)交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

1.6 RPC
在这里插入图片描述
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1)客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2)服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3)服务端将RPC方法 的结果发送到RPC响应队列。
4)客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

四、工作模式的代码实现

1.1 simple简单模式
工具类

public class ConnectionUtil {
    //连接rabbitmq服务,共享一个工厂对象
    private static ConnectionFactory factory;
    static {
        factory=new ConnectionFactory();
        //设置rabbitmq属性
        factory.setHost("192.168.65.128");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/admin");
        factory.setPort(5672);
    }
    public static Connection getConnection(){
        Connection connection=null;
        try {
            //获取连接对象
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

提供方

public class Provider {
    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)
            channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            channel.basicPublish("","queue1",null,"Hello RabbitMQ!!!".getBytes());
            //断开连接
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费方

public class Consumer {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息(消费的是队列,而不是交换机)
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

1.2 work工作模式(资源的竞争)
提供方

public class Provider {
    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列
            channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            for(int i=1;i<=10;i++){
                channel.basicPublish("","queue1",null,("Hello RabbitMQ!!!"+i).getBytes());
            }
            //断开连接
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消费方1

public class Consumer {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

消费方2

public class Consumer2 {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

1.3 publish/subscribe发布订阅(共享资源)
提供方

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建
public class Provider {
    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)  
            //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("fanout_exchange","fanout");
            //通过通道创建队列
            //channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            for(int i=1;i<=10;i++){
                channel.basicPublish("fanout_exchange","",null,("Hello RabbitMQ!!!"+i).getBytes());
            }
            //断开连接
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

消费方1

public class Consumer {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue1",false,false,false,null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue1","fanout_exchange","");
            //监听队列中的消息
            channel.basicConsume("fanout_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

消费方2

public class Consumer2 {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue2",false,false,false,null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue2","fanout_exchange","");
            //监听队列中的消息
            channel.basicConsume("fanout_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1.4 routing路由模式(不支持通配符)
提供方

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建
public class Provider {
    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("direct_exchange","direct");
            //向队列中发送消息
            for(int i=1;i<=10;i++){
                channel.basicPublish("direct_exchange",
                        "insert",  //设置路由键,符合路由键的队列,才能拿到消息
                        null,
                        ("Hello RabbitMQ!!!"+i).getBytes());
            }
            //断开连接
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

消费方1

public class Consumer {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue1",false,false,false,null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue1","direct_exchange","select");
            channel.queueBind("direct_queue1","direct_exchange","insert");
            //监听队列中的消息
            channel.basicConsume("direct_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

消费方2

public class Consumer2 {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue2",false,false,false,null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue2","direct_exchange","delete");
            channel.queueBind("direct_queue2","direct_exchange","select");
            //监听队列中的消息
            channel.basicConsume("direct_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1.5 topic 主题模式(路由模式的一种,支持通配符)
提供方


//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建
public class Provider {
    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("topic_exchange","topic");
            //向队列中发送消息
            for(int i=1;i<=10;i++){
                channel.basicPublish("topic_exchange",
                        "emp.hello world",  // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)
                        null,
                        ("Hello RabbitMQ!!!"+i).getBytes());
            }
            //断开连接
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

消费方1

public class Consumer {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue1",false,false,false,null);
            //绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue1","topic_exchange","emp.#");
            //监听队列中的消息
            channel.basicConsume("topic_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

消费方2

public class Consumer2 {
    public static void main(String[] args) {
        Connection connection = ConnectionUtil.getConnection();
        try {
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue2",false,false,false,null);
            //绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue2","topic_exchange","emp.*");
            //监听队列中的消息
            channel.basicConsume("topic_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:"+new String(body,"utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            //connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

总结

1、交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建。
2、交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失。
3、通配符:#:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况) 、 *(匹配一个单词)

有不清楚的地方可以在评论下方留言。。。既然来了,不妨点个关注,点个赞吧!!!

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号