当前位置:   article > 正文

RabbitMq常见的消息模型,及交换机类型_rabbitmq 普通发送消息和交换机发送消息

rabbitmq 普通发送消息和交换机发送消息

RabbitMq常见的消息模型,及交换机类型

1、简单队列模型

请添加图片描述

代码实现

1).生产者和消费者都要写配置文件

spring:
  rabbitmq:
    host: 192.168.4.216 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: username # 用户名
    password: password # 密码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2).生产者发送信息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3).消费者监听队列

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2、任务模型(Work Queue)

简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

请添加图片描述

应用场景:当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

1).消息发送

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2).消息接收

模拟多个消费者绑定同一个队列

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3).测试结果

请添加图片描述

本次测试让两个线程睡眠的时间不同,模拟的就是,一个消费者消费信息的速度比另一个快

消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的

4)能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个
  • 1
  • 2
  • 3
  • 4
  • 5

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

3、发布/订阅模型

请添加图片描述

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1、Fanout - 广播

在广播模式下,消息发送流程是这样的:

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

代码实现:

1).写一个配置类来声明队列和交换机

@Configuration
public class NotifyConfig {
    //交换机
    public static final String EXCHANGE_FANOUT = "fanout";
    //队列名称
    public static final String NOTIFY_QUEUE = "notify_queue";
    public static final String REPLY_QUEUE = "reply_queue";
    //声明交换机
    @Bean(EXCHANGE_FANOUT)
    public FanoutExchange exchange_direct(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(EXCHANGE_FANOUT, true, false);
    }
    //声明队列
    @Bean("SIMPLE_QUEUE")
    public Queue simple_queue(){
        return QueueBuilder.durable("simple.queue").build();
    }
    //声明队列
    @Bean(NOTIFY_QUEUE)
    public Queue publish_queue(){
        return QueueBuilder.durable(NOTIFY_QUEUE).build();
    }
    //交换机和队列绑定
    @Bean
    public Binding binding_publish_queue(@Qualifier(NOTIFY_QUEUE) Queue queue, @Qualifier(EXCHANGE_FANOUT) FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

2).消息发送

void convertAndSend(String exchange, String routingKey, Object object)
//参数信息故名思意,中间这个routingkey到Direct来详细讲讲
rabbitTemplate.convertAndSend(NotifyConfig.EXCHANGE_FANOUT, "",message);

  • 1
  • 2
  • 3
  • 4

3).消息接收

@RabbitListener(queues = NotifyConfig.NOTIFY_QUEUE)
public void receive(String message) {
    System.out.println("数据类型为string :spring  notify 消费者接收到消息:【" + message + "】");

}
  • 1
  • 2
  • 3
  • 4
  • 5

2、Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

请添加图片描述

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

1).消费者绑定

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

注意:指定的key不一样

2).消息发送

rabbitTemplate.convertAndSend(exchangeName, "red", message);
  • 1

Fanout和Direct的差别

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3、Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

请添加图片描述

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

1)、消息发送

rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
  • 1

2)、消息接收

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3)、结果

" + msg + “】”);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “topic.queue2”),
exchange = @Exchange(name = “itcast.topic”, type = ExchangeTypes.TOPIC),
key = “#.news”
))
public void listenTopicQueue2(String msg){
System.out.println(“消费者接收到topic.queue2的消息:【” + msg + “】”);
}




3)、结果

两个队列都能收到消息。


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/598226
推荐阅读
相关标签
  

闽ICP备14008679号