当前位置:   article > 正文

RabbitMQ安装,工作实战,消息可靠性及高级特性_rabittmq vb60

rabittmq vb60

RabbitMQ安装,工作实战,消息可靠性及高级特性

一.下载RabbitMQ安装包 这里还需要下载对应的erlang安装包
1.1 推荐大家直接下载我的

链接: https://pan.baidu.com/s/1aPx1kUupn_vd9oVNjhOxCA 提取码: 1999

1.2 在linux中输入如下命令
1.3 进入local目录 创建rabbitmq文件夹
[root@VM-12-10-centos ~]# cd /usr/local/
[root@VM-12-10-centos local]# mkdir rabbitmq
  • 1
  • 2
1.4 上传erlang-23.2.4-1.el7.x86_64.rpm,rabbitmq-server-3.7.28-1.el7.noarch.rpm到/usr/local/rabbitmq路径下
[root@VM-12-10-centos local]# cd rabbitmq/
[root@VM-12-10-centos rabbitmq]# ls
erlang-23.2.4-1.el7.x86_64.rpm  rabbitmq-server-3.7.28-1.el7.noarch.rpm
  • 1
  • 2
  • 3
1.5 解压,安装erlang
[root@VM-12-10-centos rabbitmq]# rpm -Uvh erlang-23.2.4-1.el7.x86_64.rpm 
[root@VM-12-10-centos rabbitmq]# yum install -y erlang
## 查看版本号
[root@VM-12-10-centos rabbitmq]# erl -v  #这里可以连续按两下Ctrl + C即可退出查看版本操作
  • 1
  • 2
  • 3
  • 4
1.6 安装rabbitmq
[root@VM-12-10-centos rabbitmq]# yum install -y socat
[root@VM-12-10-centos rabbitmq]# rpm -Uvh rabbitmq-server-3.7.28-1.el7.noarch.rpm
[root@VM-12-10-centos rabbitmq]# yum install -y rabbitmq-server
  • 1
  • 2
  • 3
1.7 启动rabbitmq
[root@VM-12-10-centos rabbitmq]# systemctl start rabbitmq-server
## 查看rabbitmq状态
[root@VM-12-10-centos rabbitmq]# systemctl status rabbitmq-server
  • 1
  • 2
  • 3

在这里插入图片描述

1.8 安装启动rabbitmq管理界面
## 打开RabbitMQWeb管理界面插件
[root@VM-12-10-centos rabbitmq]# rabbitmq-plugins enable rabbitmq_management
## 添加用户
[root@VM-12-10-centos rabbitmq]# rabbitmqctl add_user 用户名 密码
## 设置用户角色,分配操作权限
[root@VM-12-10-centos rabbitmq]# rabbitmqctl set_user_tags 用户 密码 administrator
## 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
[root@VM-12-10-centos rabbitmq]# rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
## 关闭防火墙 同时打开服务器的安全组 打开端口号为15672
[root@VM-12-10-centos rabbitmq]# systemctl stop firewalld.service
## 访问rabbitMQ管理界面
http://118.126.91.205:15672/#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述

1.8 其他命令操作
## 修改密码
[root@VM-12-10-centos rabbitmq]# rabbitmqctl change_ password 用户名 新密码
## 删除用户
[root@VM-12-10-centos rabbitmq]# rabbitmqctl delete_user 用户名
## 查看用户清单
[root@VM-12-10-centos rabbitmq]# rabbitmqctl list_users
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
二.RabbitMQ工作实战(不同消息模式的使用)

以下放入代码 大家创建springboot项目复制粘贴类文件即可

2.1 项目目录

在这里插入图片描述

2.1 项目代码

pom.xml

 <dependencies>
         <!--Springboot项目自带 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--Springboot Web项目 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

        <!-- Rabbitmq依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Rabbitmq单元测试 不需要启动服务-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

entity

package com.weige.javaskillpoint.entity;

import lombok.Data;

@Data
public
class Consume {
    private String mode;
    private String name;
    private String sex;
    private String content;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: 118.126.91.205
    port: 5672
    username: weikai
    password: ******
    virtual-host: /

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

生产者
JavaSkillPointApplicationTests 单元测试

package com.weige.javaskillpoint;

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class JavaSkillPointApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // Hello World模式
    @Test
    void HelloWorld() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为Hello World");
        map.put("mode_description", "点对点模式,生产者将消息发送到默认交换机后分到队列,队列只有一个消费者");
        map.put("name", "刘德华");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        rabbitTemplate.convertAndSend("helloWorldQueue", JSON.toJSONString(map));
    }

    // World queues模式
    @Test
    void World() {
        Map<Object, Object> map;
        for (int i = 1; i <= 5; i++) {
            map = new HashMap<>();
            map.put("mode", "SpringBoot整合MQ发送的消息,模式为World queues");
            map.put("mode_description", "工作队列模式,一个队列有多个消费者,但是最终只有一个消费者可以消费信息");
            map.put("name", "周杰伦" + i);
            map.put("sex", "男");
            map.put("content", "用户注册成功,异步发送给用户发送短信");
            rabbitTemplate.convertAndSend("WorldQueues", JSON.toJSONString(map));
        }
    }

    // Pub/Sub(广播模式)模式
    @Test
    void pubSub() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为publish/subscribe");
        map.put("mode_description", "发布订阅模式,生产者将消息发送到默认交换机,分发给多个队列,最终到多个消费者");
        map.put("name", "黄家驹");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        // 指定交换机
        rabbitTemplate.convertAndSend("boot-pubsub-exchange", "", JSON.toJSONString(map));
    }

    // routing(路由)模式
    @Test
    void route() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为route");
        map.put("mode_description", "路由模式,在不同情况希望有不同的消费者来收到消息,路由模式可以让我们自定义规则");
        map.put("name", "黎明");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        // 指定交换机
        rabbitTemplate.convertAndSend("boot-route-exchange", "info", JSON.toJSONString(map));
    }

    // topic模式
    @Test
    void topic() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为route");
        map.put("mode_description", "routing模式,在不同情况希望有不同的消费者来收到消息,路由模式可以让我们自定义规则");
        map.put("name", "周润发");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        // 指定交换机
        rabbitTemplate.convertAndSend("boot-topic-exchange", "black.dog.and.cat", JSON.toJSONString(map));
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

consume 消费者
hello World模式

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hello World模式
@Component
public class HelloWorldConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(queuesToDeclare = @Queue(name = "helloWorldQueue"))
    public void receive(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

World queues模式

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// World queues模式
@Component
public class WorldQueuesConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(queuesToDeclare = @Queue(name = "WorldQueues", durable = "false"))
    public void receive1(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println("消费者1接收到的消息是:" + consume);
        System.out.println("进行逻辑操作1");
    }

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(queuesToDeclare = @Queue(name = "WorldQueues", durable = "false"))
    public void receive2(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println("消费者2接收到的消息是:" + consume);
        System.out.println("进行逻辑操作2");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

发布订阅模式

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// 发布订阅模式
@Component
public class PubSubConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,   // 创建临时队列
                    exchange = @Exchange(value = "boot-pubsub-exchange", type = "fanout")) // 绑定的交换机
    })
    public void receive1(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者1接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,   // 创建临时队列
                    exchange = @Exchange(value = "boot-pubsub-exchange", type = "fanout")) // 绑定的交换机
    })
    public void receive2(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者2接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

路由模式

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// 路由模式
@Component
public class RoutingConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,   // 创建临时队列
                    exchange = @Exchange(value = "boot-route-exchange", type = "direct"),
                    key = {"info", "error"})})
    public void receive1(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者1接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,   // 创建临时队列
                    exchange = @Exchange(value = "boot-route-exchange", type = "direct"),
                    key = {"hh", "error"})})
    public void receive2(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者2接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

Topic模式

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//  Topic模式
@Component
public class TopicConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange", type = "topic"),
                            key = {"*.red.*"}
                    )
            }
    )
    public void receive1(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者1接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange", type = "topic"),
                            key = {"*.red.*", "black.*.#"}
                    )
            }
    )
    public void receive2(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者2接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
三.保证RabittMQ消息可靠性
3.1 springboot 实现rabbitmq消息的可靠性

编写application.yml配置文件 开启Confirm 和 Return 机制

spring:
  rabbitmq:
    host: 118.126.91.205
    port: 5672
    username: weikai
    password: ******
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual    #  手动指定 ack
    publisher-confirm-type: simple    #  确认
    publisher-returns: true           #  消息可靠性
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

指定RabbitTemplate 对象 开启 Confirm 和 Return 并编写回调方法
生产者 消费者 没有什么变化

package com.weige.javaskillpoint.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MqConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标识" + correlationData);
        System.out.println("确认结果" + ack);
        System.out.println("失败原因" + cause);
        if (ack) {
            System.out.println("消息已经送到了exchange");
        } else {
            System.out.println("消息没有送到exchange");
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 如果执行了这个方法  说明消息没有送到 queue 中
        System.out.println("消息没有送到 queue ");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

结果演示 随便写一个队列123456 该队列不存在消费者

package com.weige.javaskillpoint;

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class JavaSkillPointApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // Hello World模式
    @Test
    void HelloWorld() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为Hello World");
        map.put("mode_description", "点对点模式,生产者将消息发送到默认交换机后分到队列,队列只有一个消费者");
        map.put("name", "刘德华");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        // 这里的123456是随便写的 不存在123456这个队列
        rabbitTemplate.convertAndSend("123456", JSON.toJSONString(map));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

![在这里插入图片描述](https://img-blog.csdnimg.cn/8b4c7f434aed46809080f60f4b90ebc6.png

3.2 避免消息的重复消费

重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

导入pom.xml依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

编写配置文件

spring:
  redis:
    host: 118.126.91.205
    port: 6379
  • 1
  • 2
  • 3
  • 4

修改生产者

package com.weige.javaskillpoint;

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class JavaSkillPointApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // topic模式 避免消息重复消费
    @Test
    void topic() {
        Map<Object, Object> map = new HashMap<>();
        map.put("mode", "SpringBoot整合MQ发送的消息,模式为route");
        map.put("mode_description", "routing模式,在不同情况希望有不同的消费者来收到消息,路由模式可以让我们自定义规则");
        map.put("name", "周润发");
        map.put("sex", "男");
        map.put("content", "用户注册成功,异步发送给用户发送短信");
        // 指定交换机
         rabbitTemplate.convertAndSend("boot-topic-exchange", "black.dog.and.cat", JSON.toJSONString(map),new CorrelationData());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

修改消费者

package com.weige.javaskillpoint.ack;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

//  Topic模式
@Component
public class TopicAckConsume {

    @Resource
    private StringRedisTemplate redisTemplate;

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "boot-topic-exchange", type = "topic"), key = {"*.red.*", "black.*.#"})})
    public void receive1(String msg, Channel channel, Message message) throws IOException {
        //0. 获取MessageId
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        //1. 设置key到Redis
        if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
            //2. 消费消息
            Consume consume = JSON.parseObject(msg, Consume.class);
            System.out.println(consume.getName());
            System.out.println("消费者1接收到的消息是:" + consume);
            System.out.println("进行逻辑操作");
            //3. 设置key的value为1
            redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);
            //4.  手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            //5. 获取Redis中的value即可 如果是1,手动ack
            if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }

    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "boot-topic-exchange", type = "topic"), key = {"*.red.*", "black.*.#"})})
    public void receive2(String msg) {
        Consume consume = JSON.parseObject(msg, Consume.class);
        System.out.println(consume.getName());
        System.out.println("消费者2接收到的消息是:" + consume);
        System.out.println("进行逻辑操作");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
四.高级特征
4.1 TTL

介绍

Time to live 过期时间 设置消息的过期时间 有两种方式
指定一条消息的过期时间
给队列设置消息过期时间,队列中所有的消息都有同样的过期时间
应用场景:比如 下单未支付 则订单自动删除的实现

设置消息的过期时间

发送一条设置了过期时间的消息
细节: 过期时间 指的是 消息在 队列中的存活时间,所以 此时 为了看到效果 不用设置消费者监听队列 一直消费消息,如果 一直监听队列 消费消息的话 就看不到消息过期之后 消息从队列中消失的的效果了
所以 创建 交换机 创建 队列 别创建消费者

创建一个工具类 封装 交换机名字 队列名字 等 这些常量值

package com.weige.javaskillpoint.config;

public class RabbitMQCommonConfig {

    //ttl-direct-exchange 交换机
    public static final String TTL_DIRECT_EXCHANGE = "ttldirectExchange";
    //ttl-direct-queue 队列
    public static final String TTL_DIRECT_QUEUE = "ttldirectQueue";
    //ttl-direct-routingkey  路由key
    public static final String TTL_DIRECT_ROUTINGKEY = "ttl_direct_routingkey";

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在消费者端 创建 交换机 队列 Binging 这些bean ,注意 不创建消费者 是为了看到消息的过期时间属性效果

package com.weige.javaskillpoint;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.config.RabbitMQCommonConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@SpringBootTest
class JavaSkillPointApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
 	// TTL 过期队列 第一种方式  设置队列的过期时间
    @Test
    public void ttl(){
        String msg = "测试设置了过期时间的消息"+new Date().toLocaleString();
        MessageProperties messageProperties=new MessageProperties();   // 消息属性对象
        messageProperties.setMessageId(UUID.randomUUID().toString());
        messageProperties.setExpiration("10000");  // 设置消息的过期时间为10秒
        Message message = new Message(msg.getBytes(),messageProperties);
        rabbitTemplate.send(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE,RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY,message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

测试结果 发现消息在队列10 秒后 消失

在这里插入图片描述
在这里插入图片描述
第二种方式设置队列的过期时间

直接在队列上设置消息的过期时间 这样 队列中的消息过期时间也都跟 队列设置的过期时间相同
如果 消息也设置了过期时间 谁小谁优先级高

package com.weige.javaskillpoint.ttl;

import com.weige.javaskillpoint.config.RabbitMQCommonConfig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static com.weige.javaskillpoint.config.RabbitMQCommonConfig.TTL_DIRECT_QUEUE;

@Configuration
public class TTLConfig {

    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE);
    }

//    @Bean
//    public Queue ttlDirectQueue() {
//        return new Queue(TTL_DIRECT_QUEUE);
//    }

    @Bean
    public Queue ttlDirectQueue(){
        // 在队列上 设置  此队列中  消息的过期时间
        Map<String,Object> map=new HashMap<>();
        // 队列中 所有的消息的过期时间 为 20秒
        //map.put("x-message-ttl",20000);
        // 队列中 所有的消息的过期时间 为 5秒
        map.put("x-message-ttl",5000);
        //return new Queue(TTL_DIRECT_QUEUE,true,false,false);
        return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
    }


    @Bean
    public Binding ttlDirectBinding() {
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

说明: 如果同时指定了Message的TTL 和 Queue 的 TTL ,则优先较小的那一个
所以 最佳实践 是 在 队列上设置过期时间

TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!
举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!

4.2 死信队列

在这里插入图片描述

队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:
1,消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
2,消息由于消息有效期(per-message TTL)过期
3,消息由于队列超过其长度限制而被丢弃

注意,队列的有效期并不会导致其中的消息过期

创建一个工具类 封装 交换机名字 队列名字 等 这些常量值

package com.weige.javaskillpoint.config;

public class RabbitMQCommonConfig {

    //ttl-direct-exchange 交换机
    public static final String TTL_DIRECT_EXCHANGE = "ttldirectExchange";
    //ttl-direct-queue 队列
    public static final String TTL_DIRECT_QUEUE = "ttldirectQueue";
    //ttl-direct-routingkey  路由key
    public static final String TTL_DIRECT_ROUTINGKEY = "ttl_direct_routingkey";


    //dead-direct-exchange 交换机
    public static final String DEAD_DIRECT_EXCHANGE = "deaddirectExchange";
    //dead-direct-queue 队列
    public static final String DEAD_DIRECT_QUEUE = "deaddirectQueue";
    //dead-direct-routingkey  路由key
    public static final String DEAD_DIRECT_ROUTINGKEY = "dead_direct_routingkey";

    //direct-exchange 交换机
    public static final String DIRECT_EXCHANGE = "directExchange";
    //direct-queue 队列
    public static final String DIRECT_QUEUE = "directQueue";
    //direct-routingkey  路由key
    public static final String DIRECT_ROUTINGKEY = "direct_routingkey";

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

绑定延迟队列到死信队列 超时未消费则进入死信队列

package com.weige.javaskillpoint.deadletter;

import com.weige.javaskillpoint.config.RabbitMQCommonConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static com.weige.javaskillpoint.config.RabbitMQCommonConfig.*;
import static com.weige.javaskillpoint.config.RabbitMQCommonConfig.DEAD_DIRECT_ROUTINGKEY;

//死信队列
@Configuration
public class DEADConfig {

    // 配置正常队列
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitMQCommonConfig.DIRECT_EXCHANGE);
    }

    // 配置死信队列
    @Bean
    public DirectExchange deadDirectExchange() {
        return new DirectExchange(RabbitMQCommonConfig.DEAD_DIRECT_EXCHANGE);
    }


    @Bean
    public Queue directQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_DIRECT_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", DEAD_DIRECT_ROUTINGKEY);
        return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
    }

    @Bean
    public Queue deadDirectQueue() {
        return new Queue(DEAD_DIRECT_QUEUE,true);
    }

    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(RabbitMQCommonConfig.DIRECT_ROUTINGKEY);
    }

    @Bean
    public Binding deadDirectBinding() {
        return BindingBuilder.bind(deadDirectQueue()).to(deadDirectExchange()).with(RabbitMQCommonConfig.DEAD_DIRECT_ROUTINGKEY);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

生产者

package com.weige.javaskillpoint;

import com.alibaba.fastjson.JSON;
import com.weige.javaskillpoint.config.RabbitMQCommonConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@SpringBootTest
class JavaSkillPointApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
    
 	 // 死信队列
    @Test
    public void dead() {
        String msg = "测试设置了过期时间的消息" + new Date().toLocaleString();
        MessageProperties messageProperties = new MessageProperties();   // 消息属性对象
        messageProperties.setMessageId(UUID.randomUUID().toString());
        messageProperties.setExpiration("3000");  // 设置消息的过期时间为3秒
        Message message = new Message(msg.getBytes(), messageProperties);
        // 发送的消息3秒钟未处理 放入死信队列中
        for (int i = 0; i < 4; i++) {
            rabbitTemplate.send(RabbitMQCommonConfig.DIRECT_EXCHANGE, RabbitMQCommonConfig.DIRECT_ROUTINGKEY, message);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

消费者 这个地方我们可以作用于订单半个小时未消费 则自动取消订单
延迟队列半个小时 未消费 进入死信队列 死信队列进行消费 手动确认ack
若逻辑异常 则重新放入死信队列中

package com.weige.javaskillpoint.consume;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.weige.javaskillpoint.entity.Consume;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DeadConsume {

    @RabbitListener(queues = "deaddirectQueue")
    public void listener(Message message, Channel channel) throws IOException {
        System.out.println("三十分钟没有处理订单,取消订单");
        // 进行确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        // 重新将当前消息放入死信队列
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/871498
推荐阅读
相关标签
  

闽ICP备14008679号