当前位置:   article > 正文

【docker部署rabbitmq、使用DelayExchange插件实现延时消息发送】_rabbitmq 延时队列 docker部署

rabbitmq 延时队列 docker部署

认识RabbitMQ及AMQP协议

RabbitMQ

  • RabbitMQ是采用Erlang编写的、实现高级消息队列协议(Advanced Message Queuing Protocol,AMQP)规范的一种开源消息代理服务器
  • RabbitMQ客户端官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言
  • RabbitMQ非常轻量,运行核心功能以及诸如管理界面的插件只需要不到40MB内存,性能很好,单机吞吐量可达万级
  • 依托于Erlang天生支持并发的特点,RabbitMQ支持集群部署,支持镜像队列,允许水平扩展
  • 支持众多第三方插件,社区活跃,文档较多

AMQP协议

  • AMQP协议是一个具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
  • 整个AMQP协议的过程:由生产者发布消息到交换机,交换机再通过路由规则将消息发送到不同的队列中去存储,然后消费者从队列中监听拿走对应的消息来消费

部署Rabbitmq单节点

  • 拉取rabbitmq镜像
docker pull rabbitmq:management
  • 1
  • 准备配置文件
# 创建文件夹
mkdir /app/mq/data
# 编辑配置
vim /app/mq/rabbitmq.conf
# 配置内容
#开启默认账号
loopback_users.guest = true
#监听对外通信端口配置
listeners.tcp.default = 5672
#集群节点发现配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
#集群节点1(后续基于节点1扩展多节点)
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 配置 cookie
vim /app/mq/.erlang.cookie

BVWDHYVHEZGVSKPDYUDK
**集群中各节点.erlang.cookie内容应一致**
# 修改cookie文件的权限
chmod 600 /caso/mq/.erlang.cookie
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 创建docker共享网络
docker network create mq-net
  • 1
  • 启动容器
docker run -d --hostname rabbitmq1 --restart unless-stopped --name rabbitmq1 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management

参数解释:
	 -d 容器后台运行
	--hostname rabbitmq1 容器的主机名为 rabbitmq1
	--restart=unless-stopped docker 容器重启后重启MQ
	--name rabbitmq1 容器名为rabbitma1,在宿主机上运行“docker ps”命令时显示的名称
	-p 4369:4369 EPMD(Erlang端口映射守护进程)端口号
	-p 25672:25672 节点间和CLI工具通信(Erlang分发服务器端口)
	-p 5672:5672 消息通讯端口
	-p 15672:15672 后台管理端口
	-e RABBITMQ_DEFAULT_USER=root 设置rabbitmq默认用户为root
	-e RABBITMQ_DEFAULT_PASS=123456 设置rabbitmq默认密码为123456
	-v /app/mq/data:/var/lib/rabbitmq 挂载/app/mq/data目录到容器
	-v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 映射配置文件
	-v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 映射cookie文件,也可以使用-e RABBITMQ_ERLANG_COOKIE='rabbitmq' 设置   rabbitmq的cookie为“rabbitmq”,可以自定义为其他文本,容器保持一致即可。因为新版本中RABBITMQ_ERLANG_COOKIE环境变量方式已被标记为弃用,在需要加载cookie的命令执行时会有警告,未来版本该方式会被移除,推荐映射文件方式部署
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 验证是否启动成功
    浏览器访问管理控制台 http://ip:15672/,输入账号、密码 登录
  • 安装延时消息插件
# 下载插件包
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
**注意插件版本要与rabbitmq版本对应**
mkdir /app/mq/mq-plugins
将插件上传到对应目录
# 拷贝插件到容器
docker cp /app/mq/mq-plugins/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq1:/plugins
# 进入容器
docker exec -it rabbitmq1 /bin/bash
# 验证插件是否存在
cd plugins
ls |grep delay
# 启动延时消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • rabbitmq安装包中自带消息追踪日志插件,为了方便排查问题可以提前开启插件
# 启动消息追踪日志插件
rabbitmq-plugins enable rabbitmq_tracing
  • 1
  • 2
exit
docker restart rabbitmq1
  • 1
  • 2

rabbitmq集群单节点部署完成,在spring-boot项目中实现消息发送、延时消息消费及异常消息存储

  • 依赖
<!-- 消息中间件 rabbitmq -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>3.0.4</version>                
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • rabbitmq客户端配置
spring:
  rabbitmq:
    host: 192.168.0.210
    port: 5672
    username: root
    password: 123456
    # 消费者确认机制相关配置
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
      mandatory: true
    listener:
      simple:
        # 一次拉取的数量(默认值:250,强制要求顺序消费配置1)
        prefetch: 250
        # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。) 
        concurrency: 10
        # 消费端的监听最大个数 
        max-concurrency: 100 
        # 消息手动确认
        acknowledge-mode: manual
        # 消费者retry重试机制
        retry:
          enabled: true
          # 每次失败后等待时长的翻倍数
          multiplier: 1
          # 最大重试次数
          max-attempts: 3
          # 重试状态
          stateless: true
      direct:
        auto-startup: true
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
      type: simple
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 配置延时消息生产者消费者交换机、队列、路由
@Configuration
public class MqProducerConsumerConfig {
    /**
     * 延迟交换机
     */
    public static final String DELAY_EXCHANGE_NAME = "member.delay.exchange";
    /**
     * 延迟队列
     */
    public static final String DELAY_QUEUE_NAME = "member.delay.queue";
    /**
     * 延迟队列路由key
     */
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";

    /**
     * 声明延迟交换机
     *
     * @return CustomExchange 延迟交换机
     */
    @Bean
    public CustomExchange delayExchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 声明延迟队列
     *
     * @return Queue 延迟队列
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE_NAME);
    }

    /**
     * 设置延迟队列的绑定关系
     *
     * @return Binding 延迟队列的绑定关系
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_QUEUE_ROUTING_KEY).noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 配置异常消息生产者交换机、队列、路由
@Configuration
public class MqErrorMsgConfig {
    /**
     * 交换机
     */
    public static final String EXCHANGE_NAME = "member.error.exchange";
    /**
     * 队列
     */
    public static final String QUEUE_NAME = "member.error.queue";
    /**
     * 队列路由key
     */
    public static final String QUEUE_ROUTING_KEY = "error";

    /**
     * 声明失败消息交换机
     *
     * @return 交换机
     */
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * 声明失败消息队列
     *
     * @return 消息队列
     */
    @Bean
    public Queue errorQueue() {
        // 设置队列持久化
        return new Queue(QUEUE_NAME, true);
    }

    /**
     * 绑定交换机、消息队列,指定对应的队列路由key
     *
     * @return 绑定
     */
    @Bean
    public Binding errorBind() {
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(QUEUE_ROUTING_KEY);
    }

    /**
     * 实现MessageRecover接口的子类RepublishMessageBuilder
     *
     * @param rabbitTemplate rabbitTemplate
     * @return 异常消息
     */
    @Bean
    public MessageRecoverer republishMessageBuilder(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, QUEUE_ROUTING_KEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 封装消息发送组件
@Component
public class MqProducerMsgSendUtil {
    private static final Logger log = LoggerFactory.getLogger(MqProducerMsgSendUtil.class);
    @Resource
    private RabbitTemplate rabbitTemplate;

    public MqProducerMsgSendUtil() {
    }

    public void send(String delayExchangeName, String delayQueueRoutingKey, String msg, Integer delayTime) {
        log.info("发送mq消息:{},延时:{} ms接收", msg, delayTime);
        this.rabbitTemplate.convertAndSend(delayExchangeName, delayQueueRoutingKey, msg, (message) -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setHeader("x-delay", delayTime);
            return message;
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 延时消息消费业务
@Slf4j
@Component
public class RabbitMqMessageListener {

    @Autowired
    private MqProducerMsgSendUtil mqProducerMsgSendUtil;

    /**
     * rabbitmq延迟消息监听
     *
     * @param message 消息
     * @param c       信道
     * @param msg     消息内容
     */
    @RabbitListener(queues = DELAY_QUEUE_NAME)
    public void receiveMqDelayMsg(Message message, Channel c, String msg) throws Exception {
        MessageProperties properties = message.getMessageProperties();
        String receivedExchange = properties.getReceivedExchange();
        String consumerQueue = properties.getConsumerQueue();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("收到 交换机:{} 队列:{} 路由键:{} 消息:{}", receivedExchange, consumerQueue, routingKey, msg);

        Boolean haveConsumer = false;

        // 是需要处理的消息
        if (msg.contains(RedisKeyConstants.PAY_ORDER_STATE)) {
            haveConsumer = true;
            log.info("执行xxx业务 >>> 消息:{}", msg);

            ...
            ...
            ...

            // 手动回执,签收
            long tag = properties.getDeliveryTag();
            c.basicAck(tag, false);
        }

        // 没有合适消费者,抛异常消息重发
        if (!haveConsumer) {
            long tag = properties.getDeliveryTag();
            // 应答,防止Unacked堆积
            c.basicAck(tag, false);
            log.error("交换机:{} 队列:{} 路由键:{} 消息:{} 没有正常消费,异常重试!", receivedExchange, consumerQueue, routingKey, msg);
            // 需要将异常抛出才能触发retry重试机制
            throw new Exception(MqExceptionEnum.MESSAGE_CONSUMPTION_FAILED.getMsg());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 高可用方面
    • 集群部署
      将rabbitmq1作为节点1,在另外两台服务其中部署节点2、节点3
      配置、cookie与节点1相同
# 节点2
docker network create mq-net
docker run -d --hostname rabbitmq2 --add-host rabbitmq3:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq2 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management

# 节点3
docker network create mq-net
docker run -d --hostname rabbitmq3 --add-host rabbitmq2:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq3 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management

# 节点1 host增加节点2、节点3映射
docker exec -it rabbitmq1 /bin/bash
apt-get update
apt-get install vim
vim /etc/hosts
# 节点2
x.x.x.x   rabbitmq2
# 节点3
x.x.x.x   rabbitmq3
#重新加载节点;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq1;

# 节点2加入
#进入容器
docker exec -it rabbitmq2 bash;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange;
#跟机器1的消息队列建立关系
rabbitmqctl join_cluster --ram rabbit@rabbitmq1;
#修改节点类型
rabbitmqctl change_cluster_node_type disc
#重新启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq2;

# 节点3加入
#进入容器
docker exec -it rabbitmq3 bash;
#停止MQ
rabbitmqctl stop_app;
#resetMQ
rabbitmqctl reset;
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange;
#跟机器1的消息队列建立关系
rabbitmqctl join_cluster --ram rabbit@rabbitmq1;
#修改节点类型
rabbitmqctl change_cluster_node_type disc
#重新启动MQ
rabbitmqctl start_app;
#退出并重启容器
exit;
docker restart rabbitmq3;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

三节点集群

  • 使用HAProxy/Nginx进行负载均衡,使用keepalived进行负载均衡中间件保活、路由
    keepalived原理是基于VRRP(虚拟路由选择协议)实现高可用性,即在多个服务器之间动态地分配IP地址和路由功能,可以在自建主机、虚拟机、支持VRRP的云服务器中部署,不支持虚拟IP的环境下部署无效!
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/401134?site
推荐阅读
相关标签
  

闽ICP备14008679号