当前位置:   article > 正文

【消息队列】消息中间件RabbitMQ急速入门_如何将socket转成rebbitmq模式

如何将socket转成rebbitmq模式

1.RabbitMQ消息队列和核心概念

1.1.RabbitMQ介绍

RabbitMQ是一个开源的AMQP实现,采用erlang语言编写,支持多种客户端,如:Python、java、C、.NET,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错。

1.2.RabbitMQ图解

在这里插入图片描述

1.3.RabbitMQ核心概念

Broker

RabbitMQ的服务端程序,一个mq节点就是一个broker
  • 1

Producer生产者

创建Messsage发送到RabbitMQ中
  • 1

Consumer消费者

消费队列中的消息
  • 1

Message消息

生产消费的内容,有消息头和消息体,包括多个属性的配置,如RoutingKey
  • 1

Channel信道

一条支持多路复用的通道,独立的双向数据流通道,可以发布订阅、接收消息、信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道
  • 1

Connection连接

RabbitMQ的socket链接,他封装了socket协议相关的部分逻辑,一个链接上可以有多个信道进行通信
  • 1

Exchange交换机

生产者将消息通过信道发送给交换机,交换机将消息路由给一个或者多个队列中,交换机和对列是多对多的关系
  • 1

RoutingKey路由键

生产者将消息发送给路由的时候,都会指定一个RoutingKey,交换机和队列之间绑定一个BindingKey,交换机同过RoutingKey匹配BindingKey发送到指定的队列
  • 1

BindingKey绑定key

交换机和队列绑定的key
  • 1

Virtual host虚拟主机

用于不同的业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同的名称的Exchange和Queue
  • 1

默认是:/

自行添加:/dev
		/prod
		/test
  • 1
  • 2
  • 3
1.4.容器化部署RabbitMQ
#拉取镜像
docker pull rabbitmq:management

docker run -d --hostname rabbit_host --name rabbitmq -e RABBITMQ_DEFAULT_USER=lixiang -e RABBITMQ_DEFAULT_PASS=992184xiang. -p 15672:15672 -p 5672:5672 rabbitmq:management

#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中

-e 参数
  RABBITMQ_DEFAULT_USER 用户名
  RABBITMQ_DEFAULT_PASS 密码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 主要端口介绍:
4369 erlang 发现口

5672 client 端通信口

15672 管理界面 ui 端口

25672 server 间内部通信口
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 控制台介绍:
    • 默认rabbitmq账号密码 guest/guest,我们这里指定了lixiang和992184xiang.

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.5.Java项目创建整合RabbitMQ
  • 创建SpringBoot项目,加入依赖
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <dependencies>

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.10.0</version>
    </dependency>

  </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.RabbitMQ工作队列模型实战

2.1.简单队列

在这里插入图片描述

简单队列就是最简单的一种模式,有生产者,消费者还有队列组成,生产者将消息发送给队列,消费者从队列中读取消息完成消费。

编码实现

生产者

编码流程:创建连接信息->创建信道->设置队列信息->发布消息
核心API:connection.createChannel()、channel.queueDeclare()、channel.basicPublish()
  • 1
  • 2
/**
 * 消息的生产者
 */
public class Send {


    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {

        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");

        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();

            //设置队列参数
            /**
             * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
             * 第一个参数:队列名称
             * 第二个参数:持久话配置
             * 第三个参数:是否独占,只能有一个消费者监听队列,发布订阅是独占
             * 第四个参数:当没有消费者消费的时候,消息自动删除
             * 第五个参数:其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            //设置消息内容
            String msg = "Hello word";

            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(StandardCharsets.UTF_8));

            System.out.println("消息成功发送到mq中一条");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

消费者

编码流程:创建连接->创建信道->设置队列信息->创建消费者回调函数->消费消息
核心API:queueDeclare()、new DefualtConusmer(channel)、channel.basicConsume()
  • 1
  • 2
/**
 * 消息的消费者(持续监听)
 */
public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同。

2.2.工作队列

在这里插入图片描述

工作队列:使用于生产者能力大于消费者的场景,增多消费者节点,有两中策略,默认是round robin轮询策略,还有一个是公平策略

轮询策略

在简单队列的基础上,把消费确认机制改成手动确认,并且加上延时。

核心API:channel.basicAck(envelope.getDeliveryTag(),false)

/**
 * 消息的消费者(持续监听)
 */
public class Recv1 {

    private final static String QUEUE_NAME = "work_ms_rr";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

公平策略

解决消费者能力不足的问题,降低消费时间问题

channel中设置消费者每次消费一个,消费完在进入下一个

//限制消费者每次消费一个,消费完在消费下一个
channel.basicQos(1);
  • 1
  • 2
/**
 * 消息的消费者(持续监听)
 */
public class Recv1 {

    private final static String QUEUE_NAME = "work_ms_fair";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //限制消费者每次消费一个,消费完在消费下一个
        channel.basicQos(1);

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


                System.out.println("body:"+new String(body,"utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

3.RabbitMQ交换机和发布订阅模型实战

3.1.RabbitMQ常见的Exchange交换机类型

生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有四个类型,队列和交换机是多对多的关系。

交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定,或者没有符合规则的路由,消息将被丢失。

RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange

交换机类型

  • Direct Exchange定向
    • 将一个队列绑定到交换机上,要求消息的routingKey与路由匹配的bindingKey完全匹配。
    • 例子:如果一个队列绑定到改交换机上要求路由键“aabb”,则只有被标记为aabb的消息才会被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb。
  • Fanout Exchange广播
    • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
    • Fanout Exchange 交换机转发消息是最快的,用于发布订阅、广播形式。
    • 不处理路由键
  • Topic Exchange主题
    • 主题交换机是一种发布/订阅的模式,结合了直连交换机和扇形交换机的特点。
    • 将路由键和某种模式进行匹配,此时队列上需要绑定在一个模式上
    • 符号#匹配一个或者多个词,符号*匹配一个词
    • 例子:“abc.#”能够匹配abc.gc.bs,“abc.*”只能匹配abc.hs
  • Headers Exchange(少用)
    • 根据发送的消息内容中的headers属性进行匹配,在绑定Queue与Exchange时指定一组键值对
    • 当消息发送到RabbitMQ时会取到该消息的headers与与Exchange绑定时指定的键值对进行匹配
    • 完全匹配则消息会路由到该队列,否则不会路由到该队列

RabbitMQ默认自带7个交换机

3.2.RabbitMQ发布订阅模型实战

RabbitMQ发布订阅模型

发布-订阅模型中,消息生产者不在直接面对queue,而是直面exchange,都需要经过exchange俩进行消息的转发,不需要指定routingKey,所有发送到同一个fanout交换机的消息都会被监听这个交换机的队列收到。

发布订阅模型应用场景

  • 微信公众号
  • 新浪微博关注

发布订阅模型通过把消息发送给交换机,交换机转发给对应的绑定队列,交换机绑定的队列是排它独占队列,自动删除。

在这里插入图片描述

编码实战

生产者

编码流程:创建连接对象->创建信道->绑定交换机->推送消息

核心API:channel.exchangeDeclare(EXCHANGE_NAME,“fanout”)

/**
 * 消息的生产者
 */
public class Send {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) {

        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");

        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();

            //绑定交换机,fanout_exchange,广播,第一个参数交换机名称,第二个参数交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            //设置消息内容
            String msg = "小滴课堂大课训练营发布";

            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("广播消息发送成功");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

消费者

创建连接对象->创建信道->绑定交换机->获取队列名称->绑定交换机和队列->消费者消费消息

核心API:channel.queueDeclare().getQueue(),channel.queueBind(queue,EXCHANGE_NAME,“”)

/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);

        //获取对列名称
        String queue = channel.queueDeclare().getQueue();

        //绑定交换机和队列,队列名称,交换机名称,routingKey
        channel.queueBind(queue,EXCHANGE_NAME,"");

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


                System.out.println("body:"+new String(body,"utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queue,false,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
3.3.RabbitMQ路由模式实战

路由模式简介

交换机类型:Direct

队列和交换机绑定,需要指定一个路由key(BindingKey)

消息生产者发送消息到交换机,需要指定RoutingKey

交换机根据消息的路由key,转发给对应的队列

例子:日志采集系统,一个队列收集错误日志,一个队列收集全部日志

在这里插入图片描述

编码实战

/**
 * 消息的生产者
 */
public class Send {


    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) {

        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");

        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();

            //绑定交换机,直连交换机direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            //设置消息内容
            String errMsg = "错误日志";
            String infoMsg = "日常日志";
            String warnMsg = "警告日志";

            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            //发送消息指定routingKey
            channel.basicPublish(EXCHANGE_NAME,"errRoutingKey",null,errMsg.getBytes(StandardCharsets.UTF_8));

            channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,infoMsg.getBytes(StandardCharsets.UTF_8));

            channel.basicPublish(EXCHANGE_NAME,"warnRoutingKey",null,warnMsg.getBytes(StandardCharsets.UTF_8));

            System.out.println("消息成功发送到mq中一条");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
/**
 * 消息的消费者(持续监听)
 */
public class Recv {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");

        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");

        channel.queueBind(queueName,EXCHANGE_NAME,"warnRoutingKey");



        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
3.4.RabbitMQ主题模式实战

背景

  • 如果业务很多路由key,怎末维护?
  • topic交换机,支持通配符,功能强大
  • 工作中基本都是topic模式

主题模式简介

交换机是topic,可以实现发布订阅模式fanout和路由模式Direct的功能,更加灵活,支持模式匹配和通配符匹配。

交换机通过通配符转发到相应的队列,*代表一个词,#代表一个或者多个词。

注意:交换机和队列绑定时用的binding使用通配符的路由键,生产者和交换机绑定要使用具体的路由键。

在这里插入图片描述

例子:日志采集系统,一个队列中收集全部的订单日志信息,order.log.#,一个队列中收集群全部的日志消息*.log.#

编码实战

/**
 * 消息的生产者
 */
public class Send {


    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) {

        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");

        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();

            //绑定交换机,直连交换机direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            //设置消息内容
            String errMsg = "错误日志";
            String infoMsg = "日常日志";
            String warnMsg = "警告日志";

            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,errMsg.getBytes(StandardCharsets.UTF_8));

            channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,infoMsg.getBytes(StandardCharsets.UTF_8));

            channel.basicPublish(EXCHANGE_NAME,"order.log.warn",null,warnMsg.getBytes(StandardCharsets.UTF_8));

            System.out.println("消息成功发送到mq中一条");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
/**
 * 消息的消费者(持续监听)
 */
public class Recv {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.#");

        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };

        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
3.5.RabbitMQ多种工作模式的总结

简单模式

一个生产者、一个消费者,不用指定交换机,使用默认的交换机。

在这里插入图片描述

工作模式

一个生产者,多个消费,可以有轮询和公平策略,不用指定交换机,使用默认的交换机。

在这里插入图片描述

发布订阅模式

fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingKey路由键

在这里插入图片描述

路由模式

direct类型交换机,交换机和队列绑定,指定的绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息指定的RoutingKey要和交换机绑定队列的bindingKey一致进行转发。

在这里插入图片描述

主题模式

topic交换机,交换机和队列绑定,指定绑定的通配符,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要制定routingKey路由键

在这里插入图片描述

3.6.JAVA整合RabbitMQ的核心API

创建配置连接对象

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort("5672");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/dev");
//创建连接对象
Connection connection = factory.newConnection();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建信道

Channel channel = connection.createChannel();
  • 1

绑定队列

/**
  * 第一个参数:队列的名称 
  * 第二个参数:是否持久化配置
  * 第三个参数:是否独占,发布订阅模型一般独占
  * 第四个参数:自动删除,当没有消费者的消费消息的时候,是否自动删除消息
  * 第五个参数:其他参数
  **/
channel.queueDeclare(queueName,false,false,false,null);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

推送消息

/**
  * 第一个参数:交换机名称 
  * 第二个参数:队列名称
  * 第三个参数:配置信息
  * 第四个参数:发送消息的字节数组
  **/
channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

消费消息

        //消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
/**
  * 第一个参数:队列名称 
  * 第二个参数:是否自动消费完成
  * 第三个参数:消费者的回调函数
  **/
channel.basicConsume(queueName,true,consumer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

限制消费者每次只能消费一个

channel.basicQos(1);
  • 1

手工确认消费消息

channel.basicAck(envelope.getDeliveryTag,false);
  • 1

绑定交换机

channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
  • 1

获取队列名称

String queueName = channel.queueDeclare().getQueue();
  • 1

绑定交换机和队列

channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
  • 1

4.SpringBoot整合Spring-AMPQ

4.1.什么是Spring-AMQP
Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等.
提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发.
总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter.
  • 1
  • 2
  • 3
4.2.引入AMQP-starter依赖
<!-- 代码库 -->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
 <!--引入AMQP-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
4.3.SpringBoot整合RabbitMQ编码

yml配置文件修改

#消息队列
spring: 
  rabbitmq:
    host: 10.211.63.14
    port: 5672
    virtual-host: /dev
    password: 123456
    username: admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

RabbitMQConfig配置类

@Configuration
public class RabbitMQConfig{
    public static final String EXCHANGE_NAME = "exchange_order";
    public static final String QUEUE = "order_queue";
    
    /**
     * 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(ture).build();
    }
    
    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }
    
    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

生产者发送消息

rabbitTemplate.converAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单");
  • 1

消费者监听消息

@Component
@RabbitMQListener(queue = "order_queue")
public class OrderMQListener{
    
    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void messageHandler(String body,Message message){
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

5.RabbitMQ的消息可靠性投递

5.1.什么是RabbitMQ的消息可靠性投递
  • 保证mq节点成功接收到消息,消息发送端需要接收到mq服务端接受到消息的确认应答,完善消息补偿机制,发送失败的消息可以二次感知,并进行二次处理。
5.2.RabbitMQ消息投递路径
  • 生产者->交换机->队列->消费者
5.3.通过两个点控制消息的可靠性投递
  • 生产者到交换机:confirmCallback

  • 交换机到队列:returnCallback

**注意:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体的新跟那个效率会变低,吞吐量下降严重,不是非常重要的消息不建议开启消息确认机制

5.4.RabbitMQ消费可靠性投递confirmCallback实战

生产者到交换机

通过confirmCallback

生产者投递消息后,如果Broker收到消费后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种法师是方式可靠性投递的核心。

开启confirmCallback

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值事发布消息成功到交换机后触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
  • 1
  • 2
  • 3
  • 4

编码实战

核心API:setConfirmCallback() 、confirm(配置,是否接到消息,失败的原因)

@Test
    void testConfirmCallback(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             * @param correlationData 配置
             * @param ack 交换机是否收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback======>");
                System.out.println("correlationData======>"+correlationData);
                System.out.println("ack======>"+ack);
                System.out.println("cause======>"+cause);

                if(ack){
                    System.out.println("发送成功");
                    //更新数据库的状态,状态为成功
                }else {
                    System.out.println("发送失败,记录到日志或者数据库");
                    //更新数据库的状态,状态为失败
                }

            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

**模拟异常:**修改交换机名称

5.5.RabbitMQ消费可靠性投递returnCallback实战

交换机到队列

  • 通过returnCallback
    • 消息从交换机发送到对应的队列失败时,触发
  • 两种模式:
    • 交换机到队列不成功,则丢弃消息(默认)
    • 交换机到队列不成功,发挥生产者,触发returnCallback
#配置为true,则交换机处理消息到路由失败,会返回给生产者
spring.rabbitmq.template.mandatory=true
#或者在temlate对象上设置
template.setMandatory(true);
  • 1
  • 2
  • 3
  • 4

第一步:开启returnCallback配置

#新版
spring.rabbitmq.publisher-returns=true
  • 1
  • 2

第二步:修改交换机投递到队列失败的策略

#为true,则交换机处理消息到路由失败会返回给生产者
spring.rabbitmq.template.mandatory=true
  • 1
  • 2

编码实战

    @Test
    void testReturnCallback(){
		//publisher-returns为true则交换机处理消息到路由失败,返回给生产者
        //mandatory为true则消息未被路由到任何一个queue,则回退一条消息给生产者
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("ReturnedMessage:" + returned.toString());

                int replyCode = returned.getReplyCode();

                System.out.println("_______________________");

                System.out.println("replyCode:" + replyCode);
            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
5.6.Rabbitmq的消息确机制ACK讲解

背景:消费者从broker中监听消息,需要确保消息被合理的处理掉

RabbitMQ的ACK介绍

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,按摩就不会有ACK反馈,RabbitMQ回认为i这个消息没有正常消费,会将消息重新放入队列中。
  • 只有当消费者正确的发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如违背进行ACK消息确认机制,这条消息将被锁定Unacked

确认方式

  • 自动确认(默认)
  • 手动确认(manual)
spring:
  rabbitmq: 
    #开启手动确认消息,如果消息重新入队,进行重试
    listener: 
      simple: 
        acknowledge-mode: manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
5.7.编码实战
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    /**
     * 处理器,适配器,加上@RabbitHandler注解
     * 加上Channel这个参数
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();

        System.out.println("message:"+message.toString());
        System.out.println("==============");

        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);

        //第一个参数是该消息的index,第二个是是否批量操作
        channel.basicAck(tag,false);
        //第一个参数是index,第二个是否批量,第三个是失败后是否重新返回给生产者重新投递
        //channel.basicNack(msgTag,false,true);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

deliveryTage介绍

  • 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

basicNack和basicReject介绍

  • basicReject一次只能拒绝接收一个消息,可以设置是否requeue
  • basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue

6.RabbitMQ死信队列

6.1.什么是TTL
  • time to live消息存活时间
  • 如果消息在存活时间内未被消费,就会清除
  • RabbitMQ支持两种TTL
    • 单独消息进行配置ttl
    • 个队列进行配置ttl
6.2.什么是RabbitMQ的死信队列
  • 没有被及时消费的消息存放的队列
6.3.什么是RabbitMQ的死信交换机
  • Dead Letter Exchange(死信交换机,缩写:DLX)当消息过期未被消费后,会通过死信交换机,转发到死信队列,这个交换机就是DLX死信交换机

在这里插入图片描述

6.4.消息有哪几种情况成为死信
  • 消费者拒收消息(basicNack或者basicReject),并且没有重新入队requeue=false
  • 消息在队列中未被消费,且到了过期时间的消息(TTL)
  • 队列长度达到极限后,如果绑定死信交换机和死信队列就会被投放到死信队列
6.5.什么是延迟队列
  • 一种带有延迟功能的消息队列,Producer将消息发送到消息队列的服务端,但并不希望该条消息立马投递,而是推迟到当前时间的之后的一个时间Consumer进行消费,也可以叫定时消息

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送
  • 订单超时未支付关闭订单场景

业界一些实现方式

  • 定时任务高精度轮回
  • 采用RocketMQ自带延迟消息功能
  • RabbitMQ结合死信交换机死信队列做到延迟消息
6.6.死信队列+死信交换机实现延迟消息

在这里插入图片描述

/**
 * mq配置类
 * 新商家上架->new_merchant_queue->死信交换机->死信队列
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";

    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";

    /**
     * 进入死信队列的路由key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";


    /**
     * 创建死信交换机
     * @return
     */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
    }

    /**
     * 创建死信队列
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue(){
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     * @return
     */
    @Bean
    public Binding lockMerchantBinding(){

        return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
    }


    /**
     * 普通队列,绑定的个死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";

    /**
     * 普通的topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";

    /**
     * 路由key
     */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";


    /**
     * 创建普通交换机
     * @return
     */
    @Bean
    public Exchange newMerchantExchange(){
        return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
    }

    /**
     * 创建普通队列
     * @return
     */
    @Bean
    public Queue newMerchantQueue(){

        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl",20000);


        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding newMerchantBinding(){

        return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
@Component
@RabbitListener(queues = RabbitMQConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {

    /**
     * 处理器,适配器,加上@RabbitHandler注解
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();

        System.out.println("message:"+message.toString());
        System.out.println("==============");

        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);

        //如果发生异常就重新入队
        try{
            channel.basicAck(tag,false);
        }catch (Exception e){
            //拒收消息,重新入队
            channel.basicNack(tag,false,true);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

7.RabbitMQ高可用集群模式

7.1.RabbitMQ集群模式介绍

普通集群

  • 默认的集群模式,普通集群模式下同步交换机、队列、虚拟主机元数据,不同步消息信息,当消费者去B节点访问数据,数据在A节点,这时A节点先把数据转发给B节点,消费者在从B节点中pull消息。
  • 存在问题:假如存在消息的节点宕机了,那么消费者想要消费这个消息,就要等当前节点恢复后才可以恢复正常,如果没有做消息持久化,则消息会丢失

镜像集群

  • 队列做成镜像队列,让队列存在于各个节点中,和普通集群比较大的区别就是queue的message在各个节点之间同步,并不是在consumer获取时拉去,转发。
  • 存在问题:由于镜像队列模式下,消息数量过去,大量的消息同步也会加大网络带宽的开销,适合高可用的项目,多节点性能会收到影响。

注意:集群需要保证各个节点有相同的token令牌,集群内各个节点的erlang.cookie需要相同,才可以相互通信

在这里插入图片描述

7.2.RabbitMQ搭建普通集群

准备三个mq节点

#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168  -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  • 1
  • 2
#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2  -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
  • 1
  • 2
#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
  • 1
  • 2

参数说明:

--hostname:自定义Docker容器的hostname
--link:容器之间连接,link不可缺,使得三个容器相互通信
--privileged=true:使用该参数,container内的root拥有真正的root权限,否则容器出现perission denied
-v:宿主机和容器路径映射
RABBITMQ_DEFAULT_USER=admin:配置用户名
RABBITMQ_DEFAULT_PASS=123456:配置密码
Erlang Cookie值必须相同,相当于不同节点间通信的密钥。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

配置集群:

#节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

#节点二假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit

#节点三假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit

#查看集群节点状态,配置启动三个节点,1个磁盘节点和2个内存节点
rabbitmqctl cluster_status
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

访问节点1的web控制台,可以看到多个节点

在这里插入图片描述

测试:node1节点创建队列,发送消息(可以选择消息的持久化,Spring-AMPQ中默认就是持久化),node2和node3都可以看到消息同步,kill掉node1节点,发现node2和node3为NaN模式,如果是非主节点创建队列和发送消息,则其他队列也可以显示。

7.3.RabbitMQ高可用mirror镜像集群配置策略

背景:解决普通集群主节点宕机出现数据丢失的现象

RabbitMQ的策略policy是用来控制和修改集群的bhost队列和Exchange复制行为,就是要设置那些Exchange或者queue的数据需要复制、同步,以及如何复制同步

创建一个策略来匹配队列

路径:rabbitmq管理界面->Admin->Policies->Add/update a policy

参数:策略会同步一个VirtualHost中的交换机和队列数据

name:自定义策略名称
Pattern:^匹配符,代表匹配所有
Definition:ha-mode=all为匹配类型,分为3中模式
  • 1
  • 2
  • 3
ha-model:指明镜像队列的模式,可选下面一种
all:表示在集群中所有的节点上进行镜像同步
exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像同步,界定啊名称由ha-params指定
ha-sync-mode:镜像消息同步方式automatic(自动),manually(手动)
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

配置好后,+2的意思就是由三个节点,一个节点本身和两个镜像节点

在这里插入图片描述

集群重启顺序

  • 集群重启的顺序是固定的,并且是相反的
  • 启动顺序:磁盘节点=》内存节点
  • 关闭顺序:内存节点=》磁盘节点
7.4.SpringBoot配置RabbitMQ集群
把host和port节点去掉换成addresses :10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/430543
推荐阅读
相关标签
  

闽ICP备14008679号