当前位置:   article > 正文

延迟队列_delayqueue 弊端

delayqueue 弊端

1. 场景:“订单下单成功后,15分钟未支付自动取消”

1.传统处理超时订单
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
然后再做其他的业务操作

2.rabbitMQ延时队列方案
一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

2. TTL和DLX

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

 设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
 设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。
  • 1
  • 2

2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

 死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
 这个队列,就是死信队列

 成为死信一般有以下几种情况:
 消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
 消息的TTL-存活时间已经过期
 队列长度限制被超越(队列满)

 
 注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
 注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
      x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3. 延迟队列
通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费
在这里插入图片描述

4. 开发步骤
1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数
QueueDelayConfig

package com.lyl.rabbitmqprovider.rabbitmq;

import org.omg.CORBA.PUBLIC_MEMBER;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @authorlyl
 * @site
 * @company
 * @create  2019-12-25 15:26
 */
@Configuration
public class QueueDelayConfig {
    /**
     * 定义正常的队列、交换机、路由键
     */
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_ROUTINGKEY="normal_routingkey";


    /**
     * 定义死信的队列、交换机、路由键
     */
    public static final String DELAY_QUEUE="delay-queue";
    public static final String DELAY_EXCHANGE="delay-exchange";
    public static final String DELAY_ROUTINGKEY="delay-routingkey";

    /**
     * 定义正常队列
     * @return
     */
    @Bean
    public Queue normalQueue(){
//        设置消息过期时间/死信交换机/死信路由键这3个参数
        Map<String,Object> map = new HashMap<String,Object>();
        map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为10秒
        map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
        return new Queue(NORMAL_QUEUE, true, false, false, map);
    }

    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXCHANGE,true,false);
    }

    @Bean
    public Binding normalRoutingKey(){
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(NORMAL_ROUTINGKEY);
    }

    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE,true);
    }

    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE);
    }

    @Bean
    public Binding delayRoutingKey(){
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(DELAY_ROUTINGKEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

2.消费者A
正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期
在这里插入图片描述
3.消费者B
消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中
QueueRecevier

package com.lyl.rabbitmqconsumer.controller;

import com.lyl.commonvo.vo.OrderVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @authorlyl
 * @site
 * @company
 * @create  2019-12-25 16:53
 */
@Component
@Slf4j
@RabbitListener(queues = {"delay-queue"})
public class QueueRecevier {

    @RabbitHandler
    public void handlerMessage(OrderVo orderVo){
        log.info("QueueRecevier.handlerMessage,data={}",orderVo);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

在这里插入图片描述

5. 子模块间共享Model
1.创建公共子模块common
添加公共的JavaBean对象,并使用lombok简化代码

	 @Data:会为类的所有属性自动生成setter/getter、equals、canEqual、hashCode、toString方法
     @NoArgsConstructor:无参构造器
     @AllArgsConstructor:全参构造器
  • 1
  • 2
  • 3

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lyl</groupId>
    <artifactId>commonvo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>commonvo</name>
    <description>Demo project for Spring Boot</description>
    <packaging>jar</packaging>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

2.主模块

     <!-- 添加子模块 -->
     <modules>
        <module>rabbitmq-provider</module>
        <module>rabbitmq-consumer</module>
        <module>common</module>
     </modules>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

3.各子模块

	<!-- 1.packaging模式改为jar --> 
     <packaging>jar</packaging>
  • 1
  • 2

在这里插入图片描述
4.配置公共common模块

在主模块的POM的中添加公共子模块common

  <dependencies>
     <!--添加子模块common-->
     <dependency>
       <groupId>com.lyl</groupId>
       <artifactId>common</artifactId>
       <version>0.0.1-SNAPSHOT</version>
     </dependency>
     ...
   </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. json转换
1.生产者
SendController

package com.lyl.rabbitmqprovider.controller;

import com.lyl.commonvo.vo.OrderVo;
import com.lyl.rabbitmqprovider.rabbitmq.QueueDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
 * @authorlyl
 * @site
 * @company
 * @create  2019-12-25 16:12
 */
@RestController
@Slf4j
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sender")
    public Map<String,Object> sender(){
//        Map<String,Object> data = this.createData();

        OrderVo orderVo = new OrderVo();
        orderVo.setOrderId(1);
        orderVo.setOrderNo("P001");

        rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE,
                QueueDelayConfig.NORMAL_ROUTINGKEY,orderVo);

        Map<String,Object> result=new HashMap<String,Object>();
        result.put("msg","ok");
        result.put("code","1");
        return result;
    }

    private Map<String,Object> createData(){
        Map<String,Object> map = new HashMap<String,Object>();

        String date=LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        map.put("msg","hello,rabbitmq");
        map.put("success",true);
        map.put("createdate",date);

        return map;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

QueueProviderMessageConvert

package com.lyl.rabbitmqprovider.rabbitmq;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @authorlyl
 * @site
 * @company
 * @create  2019-12-26 16:28
 */
@Configuration
public class QueueProviderMessageConvert {
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);//指定json转换器
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.消费者
QueueRecevierMessageConvert

package com.lyl.rabbitmqconsumer.rabbitmq;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @authorlyl
 * @site
 * @company
 * @create  2019-12-26 16:42
 */
@Configuration
public class QueueRecevierMessageConvert {
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter);
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/689500
推荐阅读
相关标签
  

闽ICP备14008679号