当前位置:   article > 正文

【学习笔记】Java——消息队列:RabbitMQ_java mq消息队列

java mq消息队列

1、初识RabbitMQ(官网学习):

  • 消息队列:接收并转发消息。类似于“快递公司”
  • producer:消息的发送者、生产者
  • consumer:消息的消费者,从队列获取消息,并且使用
  • queue:先进先出,一个queue可以对应多个consumer

2、为什么要用消息队列:

  • 代码解耦,提高系统的稳定性
  • 应对流量高峰,降低流量冲击(抢购)
  • 异步执行,提高系统响应速度

3、消息队列的特性:(消息队列有很多中,RabbitMQ是其中一种)

  • 性能好
  • 基础组件
  • 支持消息确认
  • 业务无关,无需考虑上层业务模型,做好消息的分发即可

4、RabbitMQ的特点:

  • 路由能力灵活强大
  • 开源免费
  • 支持编程语言多
  • 应用广泛,社区活跃
  • 有开箱即用的监控和管理后台

5、RabbitMQ的核心概念:

Producer:消息生产者

Message:消息

Exchange:交换机

Binding:绑定交换机和队列

Routing Key:路由键,决定路由规则

Queue:队列,存储消息

Connection:连接服务端

Channel:信道,读写数据

Consumer:消费者

Broker:服务实例

Virtual host:虚拟主机,用于区分不同服务,蕾丝与不同域名,不会相互影响
在这里插入图片描述

路由键(Routing key):就是专门用于决定交换机和队列如何进行绑定的

6、安装RabbitMQ(Linux)

第一步:执行

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  • 1

第二步,执行:

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  • 1

第三步:sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
第四步,看到类似以下的画面:
在这里插入图片描述
输入y
即可完成安装。

启动RabbitMQ:systemctl start rabbitmq-server

看看端口有没有起来,查看状态:rabbitmqctl status
配置阿里云安全组,打开15672端口
添加admin用户:

rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
  • 1
  • 2

浏览器访问ip: 15672
用admin,密码password即可登录
访问出现错误:
可能没有安装插件,只需安装一下即可,输入命令:rabbitmq-plugins enable rabbitmq_management

7、RabbitMQ管理后台:

在这里插入图片描述

  • 登陆到阿里云服务器,启动服务
  • 概览页面
  • 添加用户
  • 创建虚拟主机(Virtual Hosts)

8、RabbitMQ支持多语言

支持多语言,java就是支持的其中一种语言

API丰富,RabbitMQ提供的Java客户端一系列的API来完成各种操作

(1)引入依赖:

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

RabbitMQ内部会要求我们再去引入一个依赖,记录日志的,没有这个依赖,不能正常运作

<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-nop</artifactId>
   <version>1.7.29</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)发送消息
里面RabbitMQ设置的主机地址就是阿里云服务器上RabbitMQ的主机地址

public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.40.123.230");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");

        // 建立连接
        Connection connection = connectionFactory.newConnection();
        // 获得信道
        Channel channel = connection.createChannel();
        // 声明队列 hello 队列名
        channel.queueDeclare("hello", false, false, false, null);
        // 声明消息
        String publishMessage = "hello world!";
        channel.basicPublish("", "hello", null, publishMessage.getBytes());
        System.out.println("发送了消息");

        // 关闭连接
        channel.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

(3)消费者

public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.40.123.230");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");

        // 建立连接
        Connection connection = connectionFactory.newConnection();
        // 获得信道
        Channel channel = connection.createChannel();
        // 声明队列 hello 队列名
        channel.queueDeclare("hello", false, false, false, null);
        // 接收消息消费
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到消息:" + message);
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

(4)根据消息内容做处理

(5)多个消费者平均压力

交换机工作模式:
  • fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的
  • direct:根据RoutingKey匹配消息路由到指定的队列
  • topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配的方式进行相应转发
  • headers:根据发送消息内容中的headers属性来匹配

9、Sprint Boot整合RabbitMQ

(1)引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependdency>
  • 1
  • 2
  • 3
  • 4

(2)在yml配置文件中进行配置RabbitMQ

:学习来源——官网+慕课网

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/614555
推荐阅读
相关标签
  

闽ICP备14008679号