当前位置:   article > 正文

『RabbitMQ』入门指南(安装,配置,应用)_laravel rabbitmq 只作为 消费端 怎么配置?

laravel rabbitmq 只作为 消费端 怎么配置?

前言

RabbitMQ 是在 AMQP(Advanced Message Queuing Protocol) 协议标准基础上完整的,可复用的企业消息系统。它遵循 Mozilla Public License 开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,建立在 Erlang OTP 平台上(因为采用 Erlang 开发,所以 RabbitMQ 稳定性和可靠性比较高

其他主流 MQ 产品

  • ActiveMQ:Apache 出品,最流行的,能力强劲的开源消息总线,基于 JMS(Java Message Service)规范
  • RocketMQ:阿里低延迟、高并发、高可用、高可靠的分布式消息中间件,基于 JMS,目前由 Apache 基金会维护
  • Kafka:分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等

本文为 RabbitMQ 入门教程,主要将会讲解 RabbitMQ 安装配置(Windows),相关概念,及项目中具体应用

image.png

安装

Erlang

官网下载链接:Downloads - Erlang/OTP

RabbitMQ 服务器必须首先安装 Erlang 运行环境,同时安装时需要注意 RabbityMQ 所依赖的 Erlang 版本,我们可以查看下方官方版本对应信息

版本对应:RabbitMQ Erlang Version Requirements — RabbitMQ

本次使用版本 Erlang OTP 25.3(点击跳转下载链接)

image.png

双击执行 exe 安装程序,除了安装路径其他都按照默认即可

然后配置环境变量

ERLANG_HOME = D:\Erlang\Erlang\Erlang OTP
  • 1

并且添加 /bin 目录到 Path 环境变量中,即添加 %ERLANG_HOME%\bin 到 Path 中

安装配置之后,打开 CMD,输入 erl 然后回车键,会弹出版本信息,表示 Erlang 安装成功

RabbitMQ

官方下载页面:RabbitMQ Changelog — RabbitMQ

下载链接: RabbitMQ 3.12.0

安装 exe 文件,执行安装包,同样除了安装路径外其他保持默认

配置环境变量

RABBITMQ_SERVER = D:\RabbitMQ\RabbitMQ\rabbitmq_server-3.12.0
  • 1

然后添加 %RABBITMQ_SERVER%\sbin 到 Path 环境变量中

查看所有插件

rabbitmq-plugins list
  • 1

注:如果出现问题请参考最后一章 彻底卸载

image.png

之后我们需要安装 rabbitmq_management 插件,可以使用可视化的方式查看 RabbitMQ 服务器实例的状态,以及操控 RabbitMQ 服务器

# 安装插件
rabbitmq-plugins enable rabbitmq_management
  • 1
  • 2

访问管理界面: http://localhost:15672/ (账号密码:guest / guest)

image.png

前期安装配置完毕,下面可以配合官方入门文档学习

官方文档:RabbitMQ Tutorials — RabbitMQ

消息队列

定义

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据

image.png

作用

解耦。如图所示。假设有系统 B、C、D 都需要系统 A 的数据,于是系统 A 调用三个方法发送数据到 B、C、D。这时,系统 D 不需要了,那就需要在系统 A 把相关的代码删掉。假设这时有个新的系统 E 需要数据,这时系统 A 又要增加调用系统 E 的代码。为了降低这种强耦合,就可以使用 MQ,系统 A 只需要把数据发送到 MQ,其他系统如果需要数据,则从 MQ 中获取即可

image.png

异步。如图所示。一个客户端请求发送进来,系统 A 会调用系统 B、C、D 三个系统,同步请求的话,响应时间就是系统 A、B、C、D 的总和,也就是 800ms。如果使用 MQ,系统 A 发送数据到 MQ,然后就可以返回响应给客户端,不需要再等待系统 B、C、D 的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用 MQ

image.png

削峰。如图所示。这其实是 MQ 一个很重要的应用。假设系统 A 在某一段时间请求数暴增,有 5000 个请求发送过来,系统 A 这时就会发送 5000 条 SQL 进入 MySQL 进行执行,MySQL 对于如此庞大的请求当然处理不过来,MySQL 就会崩溃,导致系统瘫痪。如果使用 MQ,系统 A 不再是直接发送 SQL 到数据库,而是把数据发送到 MQ,MQ 短时间积压数据是可以接受的,然后由消费者每次拉取 2000 条进行处理,防止在请求峰值时期大量的请求直接发送到 MySQL 导致系统崩溃

image.png

特点

可靠性:通过支持消息持久化,支持事务,支持消费和传输的 ack 等来确保可靠性

路由机制:支持主流的订阅消费模式,如广播,订阅,headers 匹配等

扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点

高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用

多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP,MQTT 等多种消息中间件协议

多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等

管理界面:RabbitMQ 提供了易用的用户界面,使得用户可以监控和管理消息、集群中的节点等

插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件

应用

本章将会集成 rabbitmq 到 SpringBoot 中,并使用 rabbitmq-provider (生产者)和 rabbitmq-consumer(消费者) 两个项目进行具体讲解, 也可以在父项目中创建这两个模块(本文采用父子模块方式)

所有代码示例已经上传到 GitHub 仓库

仓库地址:ReturnTmp/rabbitmq-demo: rabbitmq 实例代码 (github.com)

生产者

配置

创建子模块 rabbitmq-provider

依赖配置(也可以 IDEA 初始化模块直接勾选)

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

application.yml

server:  
  port: 8021  
spring:  
  application:  
    name: rabbitmq-provider  
  rabbitmq:  
    host: 127.0.0.1  
    port: 5672  
    username: root  
    password: 111111  
    virtual-host: RootHost
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

其中虚拟 host 配置项不是必须的,需要自行创建 vhost,如果未自行创建,默认为 virtual-host: /

注:vhost 可以理解为虚拟 broker,即 mini-RabbitMQ server,其内部均含有独立的 queue、bind、exchange 等,最重要的是拥有独立的权限系统,可以做到 vhost 范围内的用户控制。当然,从 RabbitMQ 全局角度,vhost 可以作为不同权限隔离的手段

可以按照如下步骤创建 vhost

image.png

然后创建用户(管理员)

image.png

然后我们需要为用户分配权限,指定使用我们刚刚创建的 vhost

image.png

代码

创建直连交换机配置类

注:RabbitMQ 共有四种交换机,分别为:直连交换机,扇形交换机,主题交换机,首部交换机。这里使用直连交换机演示,其他读者可以自行尝试

@Configuration
public class DirectRabbitConfig {

    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue", true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange", true, false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }


    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

然后写简单的接口进行消息推送(可以视情况写为定时任务)

@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

启动项目,调用接口: http://localhost:8021/sendDirectMessage

查看 RabbitMQ 管理页面查看是否推送成功

image.png

image.png

消费者

配置

创建子模块 rabbitmq-consumer

依赖配置

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

application.yml

server:  
  port: 8022  
spring:  
  application:  
    name: rabbitmq-consumer  
  rabbitmq:  
    host: 127.0.0.1  
    port: 5672  
    username: root  
    password: 111111  
    virtual-host: RootHost
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
代码

创建消息接收监听类

@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver receive message: " + testMessage.toString());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

之后启动项目,查看消费者接收情况

image.png

序列化

发送接收消息可能出现 Failed to convert message 问题,可以通过使用 JSON 序列化传输信息方式解决

生产者
@Configuration
public class RabbitMQConfig implements InitializingBean {

    /**
     * 自动注入RabbitTemplate模板
     */
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息JSON序列化
     */
    @Override
    public void afterPropertiesSet() {
        //使用JSON序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
消费者
@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

彻底卸载

我们安装中可能出现各种问题,一般情况下是 RabbitMQ 和 Erlang 版本不对应,需要完全卸载 RabbitMQ 和 Erlang,可以按照如下步骤卸载

注:博主首次安装使用的是 Erlang 20.3 Rabbit 3.7.15 ,之后似乎小版本不对应,出现问题,需要重新卸载安装

(1)打开 Windows 控制面板,双击“程序和功能”。

(2)在当前安装的程序列表中,右键单击 RabbitMQ Server,然后单击“卸载”。

(3)在当前安装的程序列表中,右键单击“Erlang OTP”,然后单击“卸载”。

(4)打开 Windows 任务管理器。

(5)在任务管理器中,查找进程 epmd.exe。 如果此进程仍在运行,请右键单击该进程,然后单击“结束进程”。

(6)删除 RabbitMQ 和 Erlang 的所有安装目录。

(7)删除文件 C:\Windows\System32\config\systemprofile.erlang.cookie如果存在)。

(8)转到用户文件夹:C:\Users\[username],然后删除文件.erlang.cookie。

(9)同样在 User 文件夹中,转到 AppData \ Roaming \ RabbitMQ。删除 RabbitMQ 文件夹。

(10)删除注册表 HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 的子项。

(11)打开运行 cmd->sc delete RabbitMQ。

(12)打开运行->regedit 找到 RabbitMQ 节点,删掉即可(如果存在

参考链接

本文由博客一文多发平台 OpenWrite 发布!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/226860
推荐阅读
相关标签
  

闽ICP备14008679号