当前位置:   article > 正文

rabbitmq的4种交换机_rabbitmq交换机类型

rabbitmq交换机类型

RabbitMQ交换机类型

1、Direct exchange: 直连交换机,根据Routing Key(路由键)进行投递到不同队列。

路由键1
路由键2
Direct exchange
消息队列1
消息队列2

代码实现:

消息发送者,在pom.xml导入相关依赖
  • 1
	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
     </dependencies>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
在application.yml自定义direct交换机的属性
  • 1
# 应用名称
spring:
  application:
    name:
      001-rabbitmq-send
  rabbitmq:
    host: 192.168.126.124 #rabbitmq服务器地址
    port: 5672 #AMQP协议端口
    username: admin #账号
    password: admin #密码
    virtual-host: / #虚拟机
server:
  port:
    1001

#自定义消息队列的属性
custom:
  rabbitmq:
    # direct类型的消息的发送
    direct:
      queue-name: directQueue
      exchange-name: directExchange
      routing-key: directRoutingKey
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
读取application.yml自定义属性付给实体类对象
  • 1
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.direct")
public class ExchangeProperties {
    private String queueName;
    private String exchangeName;
    private String routingKey;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明Direct类型交换机、消息队列、绑定关系
  • 1
@Configuration
public class RabbitMqConfiguration {
    @Autowired
    private ExchangeProperties exchangeProperties

    //声明Direct类型交换机、消息队列、绑定关系
    @Bean
    public DirectExchange directExchange(){
        //默认创建的交换机为持久化、非自动删除(自动删除没有监听的交换机,该交换机会被自动删除)的交换机
        return new DirectExchange(exchangeProperties.getExchangeName());
    }

    @Bean
    public Queue directQueue(){
        //默认创建的消息队列持久化、非自动删除、非排外(当前消息可以被任意消息消费,但是只能有一个消费者消费)
        return new Queue(exchangeProperties.getQueueName());
    }

    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange){
        return BindingBuilder
                .bind(directQueue)
                .to(directExchange)
                .with(exchangeProperties.getRoutingKey());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
消息发送者发送消息
  • 1
@Service
public class SendMsgImpl implements SendMsg {
    @Autowired
    private RabbitTemplate rabbitTemplate;

 
    @Autowired
    private ExchangeProperties exchangeProperties;


    @Override
    public void sendDirectMessage() throws JsonProcessingException {


        rabbitTemplate.convertAndSend(
                exchangeProperties.getExchangeName(),
                exchangeProperties.getRoutingKey(),
                "这是第2条测试消息"
        );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
消息接收者,application.yml
  • 1
# 应用名称
spring:
  application:
    name:
      001-rabbitmq-receive
  rabbitmq:
    host: 192.168.126.124 #rabbitmq服务器地址
    port: 5672 #AMQP协议端口
    username: admin #账号
    password: admin #密码
    virtual-host: / #虚拟机
    listener:
      simple:
        #消息确认机制,manual表示手动确认,auto代表自动确认
        acknowledge-mode: manual
# 应用服务 WEB 访问端口
server:
  port:
    2001
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
消息接收者接收消息并手动确认消息
  • 1
package com.wen.servive.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.wen.servive.ReceiveMsg;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@Service
public class ReceiveMsgImpl implements ReceiveMsg {

    @RabbitListener(queues = "directQueue")
    public void receiveDirectMessage(Message msg, Channel channel) throws IOException {
        System.out.println(msg);
        //手动消费,注入一个参数Channel对象
        channel.basicAck(
                //根据消息标识,来手动消费消息
                msg.getMessageProperties().getDeliveryTag(),false);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

2、Fanout exchange
不使用路由键,使用广播模式,将消息传递到所有绑定Fanout exchang的消息队列,当消息队列未被消息消费者监听时,自动删除消息队列,所以消息队列的消息是非持久化。

binding
binding
Direct exchange
消息队列1
消息队列2
消息发送者,application.yml自定义fanout交换机的属性
  • 1
#自定义消息队列的属性
custom:
  rabbitmq:
    # fanout类型的消息的发送
    fanout:
      queue-name: fanoutQueue
      exchange-name: fanoutExchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
读取application.yml自定义属性付给实体类对象
  • 1
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.fanout")
public class FanoutProperties {

    private String queueName;
    private String exchangeName;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
消息发送者发送消息
  • 1
   @Override
    public void sendFanoutMessage() throws JsonProcessingException {
        rabbitTemplate.convertAndSend(
                //Fanout交换机没有路由键
                fanoutProperties.getExchangeName(), "","这是第3条测试消息");

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
消息接收者接收消息服务
 - 声明fanout类型交换机、消息队列、绑定关系 
 - 接收消息
  • 1
  • 2
  • 3
   @RabbitListener(bindings = {
            @QueueBinding(
                    //声明绑定关系中的消息队列,如果没有命名消息队列名称,是随机命名的名称
                    value = @Queue,
                    exchange = @Exchange(name="fanoutExchange",type = "fanout")
            )
    })
    public void receiveFanoutMessage(Message msg, Channel channel) throws IOException {
        System.out.println(msg);
        //手动消费,注入一个参数Channel对象
        channel.basicAck(
                //根据消息标识,来手动消费消息
                msg.getMessageProperties().getDeliveryTag(), false
        );
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3、Topic exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,.表示一个词。

例子:
路由键为routingKeyA aa,routingKeyB aa.bb,routingKeyC aa.bb.cc
Topic exchange的路由通配规则如下
aa.*  必须以aa开头的两个单词,只有routingKeyB符合
aa.# 必须以aa开头的零个或多个单词,routingKeyA routingKeyB routingKeyC符合
  • 1
  • 2
  • 3
  • 4
  • 5
消息发送者,application.yml自定义Topic 交换机的属性
  • 1
#自定义消息队列的属性
custom:
  rabbitmq:
      # topic类型的消息的发送
    topic:
      queue-name1: topicQueue1
      queue-name2: topicQueue2
      queue-name3: topicQueue3
      binding-key1: aa
      binding-key2: aa.*
      binding-key3: aa.#
      exchange-name: topicExchange
      routing-key1: aa
      routing-key2: aa.bb
      routing-key3: aa.bb.cc
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
读取application.yml自定义属性付给实体类对象
  • 1
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.topic")
public class TopicProperties {
    private String queueName1;
    private String queueName2;
    private String queueName3;
    private String bindingKey1;
    private String bindingKey2;
    private String bindingKey3;
    private String exchangeName;
    private String routingKey1;
    private String routingKey2;
    private String routingKey3;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
声明Topic类型交换机、消息队列、绑定关系
  • 1
@Configuration
public class RabbitMqConfiguration {
   
    @Autowired
    private TopicProperties topicProperties;

    //声明 Topic类型交换机(1)、消息队列(3)、绑定关系(3)
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(topicProperties.getExchangeName());
    }
    @Bean
    public Queue topicQuene1(){
        return new Queue(topicProperties.getQueueName1());
    }

    @Bean
    public Queue topicQuene2(){
        return new Queue(topicProperties.getQueueName2());
    }
    @Bean
    public Queue topicQuene3(){
        return new Queue(topicProperties.getQueueName3());
    }

    @Bean
    public Binding topicBinding1(){
        return BindingBuilder
                .bind(topicQuene1())
                .to(topicExchange())
                .with(topicProperties.getBindingKey1());
    }

    @Bean
    public Binding topicBinding2(){
        return BindingBuilder
                .bind(topicQuene2())
                .to(topicExchange())
                .with(topicProperties.getBindingKey2());
    }
    @Bean
    public Binding topicBinding3(){
        return BindingBuilder
                .bind(topicQuene3())
                .to(topicExchange())
                .with(topicProperties.getBindingKey3());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
消息发送者发送消息
  • 1
 @Override
    public void sendTopicMessage() throws JsonProcessingException {
        rabbitTemplate.convertAndSend(
                topicProperties.getExchangeName(), topicProperties.getRoutingKey1(),"这是第4条测试消息");

        rabbitTemplate.convertAndSend(
                //Fanout交换机没有路由键
                topicProperties.getExchangeName(), topicProperties.getRoutingKey2(),"这是第5条测试消息");

        rabbitTemplate.convertAndSend(
                //Fanout交换机没有路由键
                topicProperties.getExchangeName(), topicProperties.getRoutingKey3(),"这是第6条测试消息");
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
消息接收者接收消息
  • 1
   @RabbitListener(queues = {"topicQueue1",
            "topicQueue2",
            "topicQueue3"
            })
    public void receiveTopicMessage(Message msg, Channel channel) throws IOException {
        System.out.println(msg);
        //手动消费,注入一个参数Channel对象
        channel.basicAck(
                //根据消息标识,来手动消费消息
                msg.getMessageProperties().getDeliveryTag(), false
        );
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4、Header exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。

死信队列:存放过期消息的队列,对应有死信交换机和死信消费者

消息发送者,application.yml自定义正常交换机和死信交换机的属性
  • 1
#自定义消息队列的属性
custom:
  rabbitmq:
    #模拟死性队列的场景 ,direct类型的交换机
    model:
      queue-name: modelQuene
      exchange-name: modelExchange
      routing-key: modelRoutingKey
    #direct类型的交换机
    dead:
      queue-name: deadQuene
      exchange-name: deadExchange
      routing-key: deadRoutingKey

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
读取application.yml自定义属性付给实体类对象
  • 1
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.model")
public class ModelProperties {
    private String queueName;
    private String exchangeName;
    private String routingKey;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
@Data
@Component
@ConfigurationProperties(prefix = "custom.rabbitmq.dead")
public class DeadProperties {
    private String queueName;
    private String exchangeName;
    private String routingKey;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明正常交换机和死信交换机、消息队列、绑定关系
  • 1
@Configuration
public class RabbitMqConfiguration {
    @Autowired
    private ModelProperties modelProperties;

    @Autowired
    private DeadProperties deadProperties;
    
    //声明死性队列场景
    //向Model中发送消息,等待消息过期,会自动发送到死信交换机中,根据死信routingKey转发到死信队列中
    @Bean
    public DirectExchange modelExchange(){
        return new DirectExchange(modelProperties.getExchangeName());
    }
    @Bean
    public Queue modelQuene(){
        Map<String,Object> arguments = new HashMap<>();
        //指定消息过期的时间,30秒过期
        arguments.put("x-message-ttl",30*1000);
        //指定死信交换机
        arguments.put("x-dead-letter-exchange",deadProperties.getExchangeName());
        //指定死信routingKey
        arguments.put("x-dead-letter-routing-key",deadProperties.getRoutingKey());
        return new Queue(modelProperties.getQueueName(),true,false,false,arguments);
    }

    @Bean
    public Binding modelBinding(){
        return BindingBuilder
                .bind(modelQuene())
                .to(modelExchange())
                .with(modelProperties.getRoutingKey());
    }

    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange(deadProperties.getExchangeName());
    }
    @Bean
    public Queue deadQuene(){
        return new Queue(deadProperties.getQueueName());
    }
    @Bean
    public Binding deadBinding(){
        return BindingBuilder
                .bind(deadQuene())
                .to(deadExchange())
                .with(deadProperties.getRoutingKey());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
消息发送者发送消息到正常交换机
  • 1
    @Override
    public void sendModelMessage() throws JsonProcessingException {
        rabbitTemplate.convertAndSend(
                modelProperties.getExchangeName(), modelProperties.getRoutingKey(),"这是第7条测试消息");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

死信消息接收者接收消息

@Service
public class ReceiveMsgImpl implements ReceiveMsg {
   
    //接收modelQueue的消息队列,测试死信队列,这里不能消费消息
   // @RabbitListener(queues = "modelQueue")
    public void receiveModelMessage(Message msg, Channel channel) throws IOException {
        System.out.println(msg);
        //手动消费,注入一个参数Channel对象
        channel.basicAck(
                //根据消息标识,来手动消费消息
                msg.getMessageProperties().getDeliveryTag(), false
        );
    }

    //  当modelQueue消息超时时,会被转发到死信队列中进行处理
    @RabbitListener(queues = "deadQuene")
    public void receiveDeadMessage(Message msg, Channel channel) throws IOException {
        System.out.println("receiveDeadMessage:"+msg);
        //手动消费,注入一个参数Channel对象
        channel.basicAck(
                //根据消息标识,来手动消费消息
                msg.getMessageProperties().getDeliveryTag(), false);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/633713
推荐阅读
相关标签
  

闽ICP备14008679号