当前位置:   article > 正文

RabbitMQ详解与实战(绝对足够惊喜)_rabbitmq 实战

rabbitmq 实战

在这里插入图片描述

什么是RabbitMQ

RabbitMQ 是一个开源的消息队列中间件,它实现了高度可靠、灵活和可扩展的消息传递模型。它基于 AMQP(高级消息队列协议)来进行消息的传输和交互。

以下是 RabbitMQ 的一些重要组成部分和特性的详细介绍:

  1. 消息队列:RabbitMQ 使用消息队列来存储和传递消息。消息队列通过先进先出(FIFO)的方式处理消息,允许生产者将消息发送到队列,然后消费者从队列中接收这些消息。

  2. 生产者:生产者是发送消息到 RabbitMQ 交换机的应用程序。生产者将消息发布到特定的交换机,并且可以选择将消息发送到特定的队列或交换机。

  3. 交换机:交换机是 RabbitMQ 接收生产者消息并路由到相应队列的组件。它根据指定的规则(路由键)将消息发送到一个或多个绑定的队列。

  4. 队列:队列是 RabbitMQ 中消息的目的地。生产者通过交换机将消息发送到队列,而消费者从队列中接收消息以进行处理。

  5. 消费者:消费者是从 RabbitMQ 队列中获取消息并对其进行处理的应用程序。消费者订阅一个或多个队列,并接收队列中的消息。

  6. 路由:RabbitMQ 使用路由机制将消息从交换机路由到队列。这是通过在交换机和队列之间建立绑定关系,并使用路由键来匹配消息。

  7. ACK 机制:RabbitMQ 提供了 ACK(确认)机制,确保消息被正确处理。一旦消费者接收到并处理了消息,它可以发送一个 ACK 给 RabbitMQ,告知消息已被处理。如果消费者在处理消息过程中发生故障或崩溃,RabbitMQ 将重新传递未确认的消息给其他消费者。

  8. 可靠性:RabbitMQ 提供了可靠的消息传递机制。使用持久化(durable)队列和消息可以确保即使在发生故障或重启后,消息也不会丢失。

  9. 可扩展性:RabbitMQ 支持分布式部署和多节点集群,可以通过添加更多的节点来提高消息处理能力和容错性。

  10. 插件系统:RabbitMQ 提供了丰富的插件系统,允许用户根据自己的需求扩展和定制功能,例如通过 SSL 加密传输、使用不同的身份验证方式等。

总结:RabbitMQ 是一个功能强大的消息队列中间件,它提供了高度可靠、灵活和可扩展的消息传递模型。通过使用生产者、交换机、队列和消费者,开发人员可以构建可靠的分布式系统,实现异步通信和解耦应用程序的组件。

RabbitMQ与Kafka的区别

RabbitMQ 和 Kafka 都是流行的消息队列系统,它们在设计和用途上有一些区别。以下是 RabbitMQ 和 Kafka 之间的主要区别:

  1. 数据处理模型:
  • RabbitMQ:RabbitMQ 是一个传统的消息队列中间件,采用的是面向消息的数据处理模型。它接收、存储和转发消息,并使用AMQP等协议提供可靠的消息传递机制。
  • Kafka:Kafka 是一个高吞吐量的分布式流数据平台,采用发布-订阅模型。它以持久化并分区的方式存储消息,并支持批量读写,适用于大规模实时数据流处理场景。
  1. 数据保留时间:
  • RabbitMQ:RabbitMQ 默认情况下不会保留消息,即使消费者没有接收到消息,也不会在消息队列中保留太长时间。
  • Kafka:Kafka 保留所有的消息,并根据配置的时间保留策略(例如时间段或消息大小)决定消息在存储中的保留时间。
  1. 功能特性:
  • RabbitMQ:RabbitMQ 提供高级消息队列协议(AMQP)的完整实现,并且支持多种交换机类型、消息确认、消息持久化、消息优先级等功能。它还有广泛的插件生态系统可供扩展。
  • Kafka:Kafka 提供高吞吐量的消息传递,保证消息的可靠性和持久化存储。它支持流处理功能,具有日志存储和批量消费的特点。
  1. 扩展性和可靠性:
  • RabbitMQ:RabbitMQ 以队列为基本单位,并使用内存来管理消息,在高负载情况下可能会出现性能瓶颈。
  • Kafka:Kafka 具有良好的水平伸缩性,可以通过添加更多的节点来提高吞吐量和容错性。它使用磁盘文件存储消息,可以大规模地处理海

RabbitMQ与Kafka的各自适用场景

RabbitMQ适用场景:

  • 可靠性:RabbitMQ强调消息的可靠性传递,支持事务和持久化等机制,适用于需要确保每条消息都能被准确处理的场景。它适合于任务队列、工作流、订单处理等需要精确控制消息交付和顺序的应用。
  • 灵活的路由:RabbitMQ提供了灵活的路由机制,通过交换机和绑定规则将消息路由到特定的队列,适合处理复杂的消息路由需求。
  • 多语言支持:RabbitMQ提供了多种客户端库,支持多种编程语言,便于不同语言环境下的开发和集成。

Kafka适用场景:

  • 高吞吐量:Kafka注重高吞吐量和低延迟,适合处理大量数据流和日志类型的应用。它具备非常高的读写能力,能够同时处理大量的实时数据流。
  • 持久性存储:Kafka将消息以日志形式持久化到磁盘,可支持长时间存储数据,可以用作数据源和数据传输中间件,适合构建实时流处理和事件驱动的应用。
  • 分布式架构:Kafka具备分布式、可扩展的特性,支持水平扩展、副本集群和故障容错等功能,适合大规模分布式系统和多节点集群环境。

总结:
RabbitMQ适用于强调消息可靠性传递和复杂路由的场景,适合任务队列、工作流和订单处理等应用。Kafka适用于高吞吐量、低延迟、分布式架构和持久性存储,适合大规模数据流处理和实时流处理等应用。选择合适的系统取决于具体的业务需求和使用场景。

RabbitMQ的安装

Erlang下载安装

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
  • 1
  • 2
  • 3

RabbitMQ下载安装

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
  • 1
  • 2
  • 3
  • 4
  • 5

RabbotMQ的启动

systemctl start rabbitmq-server  #启动
systemctl enable rabbitmq-server #设置开机自启
systemctl status rabbitmq-server #查看状态
  • 1
  • 2
  • 3

RabbitMQ Web界面管理

默认情况下,是没有安装web端的客户端插件,需要安装插件才可以生效。执行命令:

rabbitmq-plugins enable rabbitmq_management
  • 1

然后需要重启服务

systemctl restart rabbitmq-server
  • 1

由于Web管理界面访问端口为15672,所以防火墙需要放行该端口

对于 Centos 7 上的防火墙,要放行端口 15672(默认 RabbitMQ 管理界面的端口),可以按照以下步骤进行操作:

  1. 登录到 CentOS 7 的服务器上,以具有管理员权限的用户身份。

  2. 检查防火墙状态,确认是否已安装 firewalld 防火墙:

    systemctl status firewalld
    
    • 1
  3. 如果防火墙处于开启状态,可以直接跳转到第 6 步。如果防火墙停止运行,则需要启动,请继续执行以下步骤。

  4. 启动 firewalld 服务:

    systemctl start firewalld
    
    • 1
  5. 设置 firewalld 开机自启:

    systemctl enable firewalld
    
    • 1
  6. 添加端口规则,允许在防火墙上开放 15672 端口和5672端口:

    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    
    • 1
    • 2
  7. 重新加载防火墙配置,使更改生效:

    firewall-cmd --reload
    
    • 1

现在,CentOS 7 的防火墙应该已经放行了 15672 端口和5672 端口,允许对 RabbitMQ 管理界面进行访问。请注意,为了安全起见,建议仅在需要时才开放必要的端口,并在完成使用后关闭不必要的端口。

Web界面访问管理

RabbitMQ 默认的管理界面账号和密码通常是:

  • 用户名:guest
  • 密码:guest

这对默认凭据在 RabbitMQ 安装后可用于访问管理界面(只限于本地)。然而,出于安全考虑,强烈建议在生产环境中修改默认凭据或创建新的管理员帐户,并使用更强大的密码来加强安全性。

要在 RabbitMQ 中添加新用户,您需要使用 RabbitMQ 提供的命令行工具或者管理界面进行操作。下面是两种方法的简要说明:

方法一:使用 RabbitMQ 命令行工具

  1. 打开命令行终端。

  2. 导航到 RabbitMQ 安装目录的 sbin 文件夹(例如,在 Linux 上可能是 /usr/lib/rabbitmq/sbin)。

  3. 运行以下命令来添加新用户:

    rabbitmqctl add_user <username> <password>
    
    • 1

    <username> 替换为要创建的用户名,将 <password> 替换为所需的密码。

  4. 运行以下命令来赋予用户管理员权限:

    rabbitmqctl set_user_tags <username> administrator
    
    • 1

    <username> 替换为刚创建的用户名。

方法二:使用 RabbitMQ 管理界面

  1. 打开您的浏览器并访问 RabbitMQ 管理界面。默认地址为 http://localhost:15672
  2. 使用默认的管理员账号和密码(通常是 guest/guest)登录到管理界面。
  3. 在管理界面上导航到 “Admin” -> “Users” 选项卡。
  4. 单击 “Add a user” 按钮。
  5. 输入用户名和密码,并选择 “Tag” 为 “Administrator”。
  6. 单击 “Add user” 按钮以创建新用户。

无论您使用哪种方法,确保为新用户选择一个强大的密码,并在生产环境中遵循安全最佳实践。

访问方式为 主机IP地址配合端口号,例如 192.168.18.14:15672

image-20230616170104156

image-20230616170123959

注意,此处需要进行一次授权,否则在代码中连接RabbitMQ会失败

image-20230616194600354

image-20230616194636865

SpringBoot+RabbitMQ实战

引入依赖

      <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

修改配置文件

server:
    port: 8080
spring:
    application:
        name: RabbitMQExample
    rabbitmq:
        host: 192.168.18.14
        port: 5672
        username: admin
        password: admin

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这是一个Spring Boot应用程序的配置文件,它使用RabbitMQ作为消息队列。下面是各个部分的详细解释:

  • server: 定义服务器的端口号为8080。

  • spring.application.name: 设置应用程序的名称为"RabbitMQExample"。

  • spring.rabbitmq.host: 设置RabbitMQ服务器的地址。

  • spring.rabbitmq.port: 设置RabbitMQ服务器的端口号为5672。

  • spring.rabbitmq.username: 设置连接到RabbitMQ服务器的用户名为"admin"(参照之前的rabbitmq的安装配置)。

  • spring.rabbitmq.password: 设置连接到RabbitMQ服务器的密码为"admin"(参照之前的rabbitmq的安装配置)。

RabbitMQ五种消息模型

再开始实战之前,我们在介绍一下RabbitMQ 主要的消息模式!RabbitMQ 支持多种消息模型,以下是其中五种常见的消息模型,功能设计我们也将围绕如下五种进行开展

  1. 简单模式(Simple Mode生产者消费者模式):
    在简单模式中,一个生产者将消息发送到一个队列,然后一个消费者从该队列接收并处理消息。这是最基本的消息模型,适用于简单的应用场景。

  2. 工作队列模式(Work Queue Mode 广播模式):
    工作队列模式也被称为任务队列模式。多个消费者共享一个队列,并通过轮询的方式接收消息。每个消息只会被一个消费者处理。适用于分布式任务的情况。

  3. 发布/订阅模式(Publish/Subscribe Mode):
    在发布/订阅模式中,一个生产者将消息发送到交换机(Exchange),而不是直接发送到队列。然后,绑定到该交换机的多个队列都会收到消息。适用于广播类型的消息发送。

  4. 路由模式(Routing Mode):
    路由模式中,消息根据路由键(Routing Key)的匹配规则被发送到特定的队列。生产者将消息发送到交换机,并指定一个路由键,在消费者端,队列通过绑定键(Binding Key)与交换机进行绑定。适用于根据条件筛选消息的情况。

  5. 主题模式(Topic Mode):
    主题模式是路由模式的扩展,它支持使用通配符进行更灵活的匹配。通过使用特定的通配符匹配规则,可以实现灵活而强大的消息路由策略。适用于订阅特定主题的场景。

以上是 RabbitMQ 的五种常见消息模型,每种模型都有其适用的场景和特点。您可以根据具体需求选择合适的消息模型来构建应用程序。

RabbitMQ简单模式(生产者消费者模式)

RabbitMQ简单模式,也称为基本模式(Basic Model),是RabbitMQ的最简单的消息传递模式,仅涉及到一个生产者和一个消费者。

在这个模式中,当我们启动一个程序作为生产者并向RabbitMQ发出消息时,我们希望它直接进入队列中,然后消费者会从队列中获取这个消息并进行处理。

简单模式在RabbitMQ中是一个单队列单生产者单消费者的模式,主要适用于单纯的任务处理,消息的生产者和消费者的削峰填谷能力非常高。

下面示范如何基于Spring Boot实现RabbitMQ的简单模式:

配置类
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description MQ配置类
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Configuration
public class RabbitMQConfig {
    /**
     * @return {@link Queue }
     * @Description 设置队列
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @Bean
    public Queue queue(){
        return new Queue("simple.hello");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

这是一个名为RabbitMQConfig的配置类,用于定义 RabbitMQ 的相关配置。

该类中包含了一个标有 @Bean 注解的方法 queue(),用于创建并配置队列。方法返回类型为 Queue,并指定队列名称为 "simple.hello"

通过使用 @Bean 注解,Spring Boot 将会根据该方法的返回值来创建一个名为 "simple.hello" 的队列,并将其注册到 Spring 上下文中,以供其他组件使用。

该配置类提供了创建队列的逻辑,通常在应用启动时会自动执行该方法并创建对应的队列实例。可以在其他组件中通过依赖注入(如 @Autowired)或者获取应用上下文(如 ApplicationContext.getBean())的方式来获得该队列实例,以便进行消息发送和接收的操作。

注意:此配置不可省略,否则当队列不存在的时候会报错

生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 生产者(发送消息)
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service

public class MessageSender {
    @Autowired

    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 发送消息
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    public void sendMessage(String message) {
        System.out.println("发送祝福:" + message);
        rabbitTemplate.convertAndSend("simple.hello", message);

    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

在MessageSender类中,有一个私有成员变量rabbitTemplate,类型为RabbitTemplate。RabbitTemplate是Spring提供的一个用于操作RabbitMQ消息队列的模板类。

该类中定义了一个公共方法sendMessage,接收一个字符串类型的参数message。该方法的作用是发送一条消息到名为"simple.hello"的队列中。具体实现是通过调用rabbitTemplate的convertAndSend方法来完成的。

消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 消息接收器
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service
public class MessageReceiver {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 处理消息
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @RabbitListener(queues = "simple.hello")
    public void handleMessage(String message) {
        System.out.println("我收到了你的祝福: " + message);
    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

该类中定义了一个公共方法handleMessage,接收一个字符串类型的参数message。该方法的作用是处理从名为"simple.hello"的队列中接收到的消息,负责监听消息!!!

测试
import com.xiaohui.service.MessageSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MQTestDemo {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testDemo1(){
            messageSender.sendMessage("我想跟你说:希望你开心快乐!!!");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

注意:不可以省略@SpringBootTest,否则监听不到MQ配置

@SpringBootTest是Spring框架中的一个测试注解,用于运行Spring Boot应用程序的单元测试。它可以初始化和配置Spring应用程序上下文,使测试应用程序的行为更容易进行测试。

当使用@SpringBootTest注解时,它可以在测试类或测试方法上使用。当在测试类上使用时,它会自动为该类中的所有测试方法初始化和配置Spring应用程序上下文。当在测试方法上使用时,它会为该特定方法初始化和配置Spring应用程序上下文。

使用@SpringBootTest的一些好处包括:

  • 更轻松地测试Spring管理的组件,例如bean、服务和存储库。
  • 自动配置Spring应用程序上下文,减少设置测试环境所需的代码量。
  • 支持测试不同的配置,例如数据库连接、安全设置和其他应用程序属性。
  • 与JUnit和其他测试框架集成。

image-20230616213556350

image-20230616213816564

RabbitMQ工作队列模式(广播模式)

RabbitMQ工作队列模式,也称为Task Queues或Background Tasks,是一种常见的应用场景,它用于处理大量的任务,将任务进行排队,然后分发给多个消费者进行处理。这种模式适用于需要异步处理耗时的、密集型任务并且要求可靠性的情况。

RabbitMQ工作队列模式的基本原理是,将需要处理的任务投递到RabbitMQ中,生成任务队列(Task Queues),多个消费者通过拉取任务队列中的任务进行处理。

在RabbitMQ的工作队列模式中,队列中的每个消息都会分配给一个消费者进行处理。消费者可以是不同的进程、线程或服务,从而实现可扩展性和并行性。

在一个生产者-多个消费者的场景下,生产者只需要将消息发送到一个消息队列中,消费者会自动从队列中获取消息进行处理。如果存在多个消费者,队列中的消息将会被分摊给多个消费者进行处理,即实现了任务并行处理的功能。而且如果一个消费者挂掉,该消费者所占用的任务在一定的时间内不会被重新分配,即实现了任务可靠性处理的功能。

一般情况下,RabbitMQ的工作队列模式可以应用于以下场景:

  • 任务比较繁重,处理较慢
  • 任务多且耗时,无法同步处理
  • 需要执行一些必须的后台任务,如发送邮件、生成报表等
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description MQ配置类
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Configuration
public class RabbitMQConfig {
    //队列1
    private static final String QUEUE01 = "queue01";
    //队列2
    private static final String QUEUE02 = "queue02";
    //交换机
    private static final String EXCHANGE_NAME = "fanout_exchange";

    @Bean
    public Queue queue1() {
        return new Queue(QUEUE01);
    }

    @Bean
    public Queue queue2() {
        return new Queue(QUEUE02);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding01() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding02() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这段代码是一个配置类,用于配置RabbitMQ的队列和交换机。其中:

  • QUEUE01和QUEUE02是两个队列的名称;
  • EXCHANGE_NAME是交换机的名称;
  • queue1()和queue2()分别返回两个队列对象;
  • fanoutExchange()返回一个FanoutExchange对象,表示一个广播交换机;
  • binding01()和binding02()分别将queue1()和queue2()绑定到fanoutExchange()上,表示这两个队列都发送到广播交换机上。
生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 生产者(发送消息)
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service

public class MessageSender {
    @Autowired

    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 发送消息
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    public void sendMessage(String message) {
        System.out.println( message);
        rabbitTemplate.convertAndSend("fanout_exchange","", message);

    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

这段代码是使用RabbitTemplate发送消息到名为"fanout_exchange"的交换机上。其中:

  • "fanout_exchange"是交换机的名称;
  • 第二个参数为空字符串,表示没有指定队列;
  • message是要发送的消息内容。
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 消息接收器
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service
public class MessageReceiver {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 消费者01
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @RabbitListener(queues = "queue01")
    public void receiver01(String message) {
        System.out.println("队列01:奔赴山海," + message);
    }

    /**
     * @param message 消息
     * @Description 消费者012
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @RabbitListener(queues = "queue02")
    public void receiver02(String message) {
        System.out.println("队列02:向阳而生," + message);
    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
测试
@SpringBootTest
public class MQTestDemo {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testDemo1(){
            messageSender.sendMessage("相信梦想。。。。。。");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

image-20230616222906407

RabbitMQ发布/订阅模式

RabbitMQ发布/订阅模式,也叫做“广播(Broadcast)模式”,是RabbitMQ的一种高级消息传递模式,主要用于广播消息。

在发布/订阅模式中,消息发送到Exchange(交换机)上,并携带着一个Routing Key(路由键),Exchange将收到的消息转发到绑定在它上面的所有队列。每个绑定键(Binding Key)都与一个队列相关联,而队列和消息的接收者实现了完全解耦,接收者只需要订阅(subscribe)与该队列相关联的绑定键即可。

我们将它作为“广播”模式,因为可以将一条消息同时发送到多个消费者。例如,我们可以让多个消费者接收网站上发布的新闻消息。

发布/订阅模式在RabbitMQ中的架构非常简单,主要可以描述为以下四个步骤:

  1. 生产者将消息发送到exchange中,并指定了Routing Key。

  2. Exchange将消息分发到所有绑定它的队列上。

  3. 消费者从队列中接收消息,并进行处理。

  4. 消费者对队列进行确认操作,告诉RabbitMQ该消息已经被接收并处理。

以下是如何使用Spring Boot实现RabbitMQ发布/订阅模式:

配置类


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description MQ配置类
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Configuration
public class RabbitmqConfig {
    //队列01
    private static final String QUEUE01= "queue01";
    //队列02
    private static final String QUEUE02= "queue02";
    //交换机
    private static final String EXCHANGE_NAME = "direct_exchange";
    //路由键01
    private static final String ROUTINGKEY01 = "queue_route01";
    //路由键02
    private static final String ROUTINGKEY02 = "queue_route02";

    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01);
    }
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding1(){
        //将列队01绑定到交换机上为给他设置路由键
        return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    }
    @Bean
    public Binding binding2(){
        //将列队02绑定到交换机上为给他设置路由键
        return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

在RabbitmqConfig类中,定义了多个静态变量,用于存储队列、交换机和路由键的名称。同时,还定义了多个@Bean方法,用于创建队列、交换机和绑定对象。

其中,queue01()和queue02()方法分别创建了两个队列对象,并将它们与对应的队列名称关联起来。directExchange()方法创建了一个直接交换机对象,并将其与交换机的名称关联起来。binding1()和binding2()方法则分别创建了两个绑定对象,将队列绑定到交换机上,并设置了相应的路由键。

通过这些配置,可以方便地对RabbitMQ进行测试和开发。例如,可以使用@Autowired注解注入Queue对象,然后使用send()方法发送消息到指定的队列中。

生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 生产者(发送消息)
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service

public class MessageSender {
    @Autowired

    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 发送消息
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    public void sendMessage(String message) {
        System.out.println( message);
        rabbitTemplate.convertAndSend("direct_exchange","queue_route01", message);

    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • rabbitTemplate:这是之前通过依赖注入注入到应用程序上下文中的RabbitTemplate类的实例。RabbitTemplate类提供了发送和接收RabbitMQ消息的方法。
  • convertAndSend():这是一个方法,它将要发送的消息转换为适当的格式,并将其发送到指定的交换机和路由键。在这个例子中,交换机名称是"direct_exchange",路由键是"queue_route01"。
  • message:这是要发送的消息对象。
消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description 消息接收器
 * @Author IT小辉同学
 * @Date 2023/06/16
 */
@Service
public class MessageReceiver {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @Description 消费者01
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @RabbitListener(queues = "queue01")
    public void receiver01(String message) {
        System.out.println("队列01——路由01:奔赴山海," + message);
    }

    /**
     * @param message 消息
     * @Description 消费者012
     * @Author IT小辉同学
     * @Date 2023/06/16
     */
    @RabbitListener(queues = "queue02")
    public void receiver02(String message) {
        System.out.println("队列02——路由02:向阳而生," + message);
    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
测试
import com.xiaohui.service.MessageSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MQTestDemo {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testDemo1(){
            messageSender.sendMessage("相信梦想。。。。。。");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

image-20230616224415508

RabbitMQ路由模式

RabbitMQ路由模式是一种高级消息传递模式,它可以通过选择路由键(Routing Key)将消息推送到绑定键(Binding Key)与之匹配的队列中,以满足不同的消费者需要。

路由模式主要用于单一应用程序内的消息传递,生产者将消息发送到指定的Exchange(交换机)中,并且Exchange会根据Routing Key将消息放到绑定到Exchange上的队列中。而不同的消费者使用不同的Binding Key来决定与哪个队列建立联系并接收消息。

在RabbitMQ中,路由模式有以下几个步骤:

  1. 生产者将消息发送到Exchange中,并指定了Routing Key。
  2. Exchange将消息根据Routing Key发送到绑定到Exchange中的队列。
  3. 消费者从队列中接收消息,并进行处理。
  4. 消费者对队列进行确认操作,告诉RabbitMQ该消息已经被接收并处理。

下面是如何使用Spring Boot实现RabbitMQ路由模式:

配置类

接着,定义一个交换机和两个队列:

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "direct_exchange";

    public static final String QUEUE_NAME_1 = "queue_1";
    public static final String QUEUE_NAME_2 = "queue_2";

    public static final String ROUTING_KEY_1 = "key_1";
    public static final String ROUTING_KEY_2 = "key_2";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME_1);
    }

    @Bean
    public Queue queue2() {
        return new Queue(QUEUE_NAME_2);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY_1);
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY_2);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
生产者
@Service
public class MessageProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String message, String routingKey) {
        amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, routingKey, message);
        System.out.println("Sent message: " + message + ", routing key: " + routingKey);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
消费者
@Service
public class MessageConsumer {

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_1)
    public void receiveFromQueue1(String message) {
        System.out.println("Received message from queue 1: " + message);
    }

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_2)
    public void receiveFromQueue2(String message) {
        System.out.println("Received message from queue 2: " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
测试
@Service
public class TestService {

    @Autowired
    private MessageProducer producer;

    @PostConstruct
    public void test() {
        producer.send("hello, queue 1", RabbitConfig.ROUTING_KEY_1);
        producer.send("hello, queue 2", RabbitConfig.ROUTING_KEY_2);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

运行程序后,可以看到控制台输出:

Sent message: hello, queue 1, routing key: key_1
Sent message: hello, queue 2, routing key: key_2
Received message from queue 1: hello, queue 1
Received message from queue 2: hello, queue 2
  • 1
  • 2
  • 3
  • 4

说明消息成功发送到了指定的队列。

RabbitMQ主题模式

RabbitMQ主题模式(Topic Model)是一种高级消息传递模式,它使你可以订阅一个特定的主题(Topic)并接收所有与该主题相关的消息。主题模式是在发布/订阅模式基础上进一步增强了消息传递的粒度。

在主题模式中,Exchange不仅可以使用Routing Key来将消息传递到队列中,还可以使用一个模式字符串来匹配Routing Key,这个模式字符串被称为主题(Topic)。消费者可以通过订阅不同的主题来接收不同的消息。

一个主题可以包含一个或多个单词(Word),单词之间使用"."(点号)来分割。通配符符号“#”表示跟单词数不限,而“”则表示只匹配一个单词。例如,“news.it.#”可以匹配“news.it.abc”、“news.it.cnn”、“news.it.abc.def”等,而“news.it.”只能匹配到“news.it.abc”和“news.it.cnn”,不能匹配多于一个单词的情况。

主题模式在RabbitMQ中的架构非常简单,主要可以描述为以下四个步骤:

  1. 生产者将消息发送到Exchange中,并指定了Routing Key。
  2. Exchange将消息根据匹配的主题字符串发送到绑定到Exchange中的队列。
  3. 消费者从队列中接收消息,并进行处理。
  4. 消费者对队列进行确认操作,告诉RabbitMQ该消息已经被接收并处理。

下面是如何使用Spring Boot实现RabbitMQ主题模式:

配置类
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "topic_exchange";

    public static final String QUEUE_NAME_1 = "queue_1";
    public static final String QUEUE_NAME_2 = "queue_2";
    public static final String QUEUE_NAME_3 = "queue_3";

    public static final String ROUTING_KEY_1 = "key_1.*";
    public static final String ROUTING_KEY_2 = "key_2.*";
    public static final String ROUTING_KEY_3 = "*.key_3";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME_1);
    }

    @Bean
    public Queue queue2() {
        return new Queue(QUEUE_NAME_2);
    }

    @Bean
    public Queue queue3() {
        return new Queue(QUEUE_NAME_3);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with(ROUTING_KEY_1);
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with(ROUTING_KEY_2);
    }

    @Bean
    public Binding binding3() {
        return BindingBuilder.bind(queue3()).to(topicExchange()).with(ROUTING_KEY_3);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
生产者

在生产者端,发送消息到交换机,这里使用了三种不同的路由键:

@Service
public class MessageProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String message, String routingKey) {
        amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, routingKey, message);
        System.out.println("Sent message: " + message + ", routing key: " + routingKey);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

消费者

在消费者端,使用通配符连接到交换机,并指定一个消费者:

@Service
public class MessageConsumer {

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_1)
    public void receiveFromQueue1(String message) {
        System.out.println("Received message from queue 1: " + message);
    }

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_2)
    public void receiveFromQueue2(String message) {
        System.out.println("Received message from queue 2: " + message);
    }

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_3)
    public void receiveFromQueue3(String message) {
        System.out.println("Received message from queue 3: " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
测试

现在,我们来测试一下,这里发送了三条消息,分别匹配了不同的绑定键:

@Service
public class TestService {

    @Autowired
    private MessageProducer producer;

    @PostConstruct
    public void test() {
        producer.send("hello, queue 1", RabbitConfig.ROUTING_KEY_1);
        producer.send("hello, queue 2", RabbitConfig.ROUTING_KEY_2);
        producer.send("hello, queue 3", RabbitConfig.ROUTING_KEY_3);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

运行程序后,可以看到控制台输出:

Sent message: hello, queue 1, routing key: key_1.*
Sent message: hello, queue 2, routing key: key_2.*
Sent message: hello, queue 3, routing key: *.key_3
Received message from queue 1: hello, queue 1
Received message from queue 2: hello, queue 2
Received message from queue 3: hello, queue 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我的天哪,太不容易了,坑倒是不多,文字东西太多了,还不能违背初心去抄袭,花费时间很长!如果您看到这里,祝贺你,我们一起成长了!感谢相遇,再会有期!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/374519
推荐阅读
  

闽ICP备14008679号