当前位置:   article > 正文

rabbitmq(二),Exchang,Queue,4种交换机类型详解(附源码)_rabbitmq exchanges 和 queues的区别

rabbitmq exchanges 和 queues的区别

本文主要介绍Rabbitmq消息中间件里面的,Exchang(交换机)、交换机类型解释、Queue(队列)大家一起探讨 -

1、RabbitMQ来源

1.RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息中间件),RabbitMQ服务器是用Erlang语言编写的。是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名。
2.AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

2、RabbitMQ优点

1.开源、可靠性。提供可靠性消息投递模式(confirm)、返回模式(return),消息确认模式(ack)
2.灵活的路由
3.集群、高可用
4.支持市面上大部分的语言(java、c、Python、PHP、Swift、GO等等)

3、名词解释

1.Exchang
也称为交换机,和路由配合,用于接收生产者的消息,根据路由规则将消息投递到绑定的队列。
2.Routing Key
路由规则,交换机通过路由规则来投递消息到对应的队列
3.Binding
用于将Exchang和Queue、RoutingKey进行绑定,建立关系
4.Queue
消息队列,用于接收Exchang投递的消息

4、Exchang,交换机

RabbitMQ的交换机有4中类型:direct、fanout、topic、headers
1.direct类型,直连交换机,使用这种类型的交换机,投递消息的时候,生产者生产消息的Routing Key要和队列绑定的Routing Key完全匹配,才能将消息投递到对应的队列。
获取连接工厂:

public class RabbitmqConnectionFactory {
    /**
     * 获取一个rabbitmq连接工厂
     * @return
     */
    public static ConnectionFactory getConnectionFactory(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");// 主机
        factory.setPort(5672);// 端口号
        factory.setVirtualHost("/");// 虚拟机
        factory.setUsername("admin");// 用户名
        factory.setPassword("admin");// 密码
        factory.setAutomaticRecoveryEnabled(true);// 是否支持自动重连
        factory.setNetworkRecoveryInterval(3000);// 多久重连一次
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

创建消费者:

public class DirectConsumer {

    // 队列
    private static final String QUEUE = "test.direct.queue";
    // 交换机
    private static final String EXCHANG_NAME = "test.direct.exchang";
    // 路由
    private static final String ROUTING_KEY = "test.direct";

    public static void main(String args[]) throws Exception {
        // 1、获取连接工厂
        ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
        // 2、创建连接
        Connection connection = connectionFactory.newConnection();
        // 3、创建通道
        Channel channel = connection.createChannel();
        // 4、创建交换机指定类型为direct
        channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 创建队列
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 队列、交换机和路由绑定
        channel.queueBind(QUEUE, EXCHANG_NAME, ROUTING_KEY);

        // 5、创建消费者
        MyConsumer consumer = new MyConsumer(channel);
        channel.basicConsume(QUEUE, true, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

创建生产者:

public class DirectProducer {

    // 交换机
    private static final String EXCHANG_NAME = "test.direct.exchang";
    // 这个路由要和队列绑定的路由完全相同才能投递到
    private static final String ROUTING_KEY = "test.direct";

    public static void main(String args[]) throws Exception {
        // 1、获取连接工厂
        ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
        // 2、创建连接
        Connection connection = connectionFactory.newConnection();
        // 3、创建 通道
        Channel channel = connection.createChannel();
        // 4、发送消息到交换机,交换机根据路由规则投递到队列
        String msg = "this is direct exchang msg!";
        channel.basicPublish(EXCHANG_NAME, ROUTING_KEY, null, msg.getBytes());
        // 5、关闭
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.fanout,扇形交换机,也可以称为广播交换机,这种类型的交换机可以忽略路由Routing Key,可以将消息投递到所有绑定了这个交换机的队列

3.topic,主题交换机,Routing Key==Binding Key,支持模糊匹配,这种类型的交换机可以支持路由key全匹配,也支持模糊匹配,模糊匹配有两种方式:
test.topic.*,*号可以匹配一个单词,比如test.topic.user,test.topic.order都能匹配到,但是test.topic.user.save就不能匹配。
test.popic.#,#号支持多个单词匹配,比如test.topic.user,test.topic.user.save,test.topic.user.delete都可以匹配。

4、headers,头交换机,这个类型的交换机和fanout类型类似,在没有指定RoutingKey的时候,可以投递到所有绑定这个交换机的队列,这个类型的交换机主要根据header里面的参数来进行路由,在队列和交换机,路由建立绑定关系的时候配置头信息,里面有一个参数:x-match,这个参数有两个属性:
all:一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any:一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
创建消费者:

public class HeadersConsumer {

    // 队列
    public static final String QUEUE = "test.header.queue";
    // 交换机
    public static final String EXCHANG_NAME = "test.header.exchang";

    public static void main(String args[]) throws Exception {
        // 1、获取连接工厂
        ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
        // 2、创建连接
        Connection connection = connectionFactory.newConnection();
        // 3、创建通道
        Channel channel = connection.createChannel();
        // 4、创建队列、交换机、绑定
        channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.HEADERS, true);
        channel.queueDeclare(QUEUE, true, false,false, null);
        // x-match属性:
        // all:一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
        // any:一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
        Map<String, Object> headers = new HashMap<>();
        headers.put("name", "zhangsan");// 消费者设置了name属性
        headers.put("x-match", "all");
        channel.queueBind(QUEUE, EXCHANG_NAME, "", headers);
        // 5、创建消费者
        MyConsumer consumer = new MyConsumer(channel);
        channel.basicConsume(QUEUE, true, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

创建生产者:

public class HeadersProducer {

    // 交换机
    public static final String EXCHANG_NAME = "test.header.exchang";

    public static void main(String args[]) throws Exception {
        // 1、获取连接工厂
        ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
        // 2、创建连接
        Connection connection = connectionFactory.newConnection();
        // 3、创建通道
        Channel channel = connection.createChannel();
        // 4、发送消息,这里的头信息
        Map<String, Object> headers = new HashMap<>();
        // 这个属性要根据消费者设置的x-match的属性才能投递到
        // 消费者如果设置x-match=all,那么这里的key要和消费者设置的可以完全匹配,才能投递到
        // 消费者如果设置x-match=any,那么这里的可以和消费者设置的其中一个key匹配就能投递到
        headers.put("name", "zhangsan");
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
        String msg = "this is headers exchang msg !";
        channel.basicPublish(EXCHANG_NAME, "", properties, msg.getBytes());
        // 5、关闭
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

源码地址:https://github.com/wangyi0102/RabbitmqDemo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/633729
推荐阅读
相关标签
  

闽ICP备14008679号