当前位置:   article > 正文

死信队列详解

死信队列

什么是死信队列

消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。

为了避免这些未成功处理的消息导致程序异常或对系统造成影响,我们需要使用死信队列(Dead Letter Queue)。当我们设置死信队列后,所有无法成功处理的消息将被捕获并重定向到指定的死信交换机中。消费者可以从该交换机中读取并处理这些“死信”。

死信队列的优点

使用死信队列有以下优点:

  • 提高系统可靠性:避免因未处理的死信而导致程序异常,提高系统的可靠性。
  • 实现延迟消息:可以通过设置TTL时间,将超时未消费的消息转移到死信队列中,实现延迟消息的功能。
  • 防止滥用:当某些生产者恶意发送低质量的消息或进行滥用时,可以通过丢弃或重定向死信消息来防止滥用和恶意攻击。

死信队列的应用场景

死信队列在以下场景下是非常有用的:

  • 消息格式错误:当消息格式错误时,可能会导致消费者无法正确地解析或处理该消息,这个问题通常与生产者的代码有关。为了避免消息失效,并提高系统可靠性,我们可以使用死信队列。
  • 消费者故障:另一个常见的场景是消息处理者无法正确地处理或响应到推入到队列中的消息,例如消费者创建了一个协程并在逻辑执行完成后未正确地关闭该协程。由于该协程始终处于打开状态,它将一直阻止该消费者对其他消息进行正确消费。为了避免这种消息挂起并影响其他消息的正常处理,可以将其加入死信中心。

死信队列的实现方式

下面通过RabbitMQ和Spring Boot,演示如何实现死信队列。

RabbitMQ实现

创建交换机和队列
import pika

def main():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    # 创建死信交换机
    dlx_exchnage_name = 'my-dlx-exchange'
    with pika.BlockingConnection(parameters) as connection:
        channel = connection.channel()
        channel.exchange_declare(exchange=dlx_exchnage_name, exchange_type='fanout', durable=True)

    # 创建死信队列和交换机
    dead_letter_queue_name = 'dead-letter-queue'
    with pika.BlockingConnection(parameters) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=dead_letter_queue_name, durable=True)
        channel.queue_bind(queue=dead_letter_queue_name, exchange=dlx_exchnage_name)

    # 创建消息队列,并将其绑定到死信队列上  
    queue_name = "job-queue"
    arguments = {"x-dead-letter-exchange": dlx_exchnage_name} 
    with pika.BlockingConnection(parameters) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True,
                              arguments=arguments)
        channel.queue_bind(exchange='', queue=queue_name, routing_key=queue_name)
                              
    print("Queue is created")
                              
if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

以上代码创建了两个队列,一个是my-dlx-exchange,一个是dead-letter-queue。同时创建另外一个名为job-queue的队列,它绑定了dead-letter-exchange这个交换机。

在发送消息时,需要提供一些属性来指定该队列应采取哪些步骤来防止该类丢失的消息。 这里我们可以使用x-dead-letter-exchangex-message-ttl两个特殊属性,告诉RabbitMQ,如果消息在某段时间内无法正确处理,则将其放入死信队列中。

发送和接收消息
import pika

def send_message():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    # 发送一个消息5秒后过期,并且未被消费端确认
    queue_name = "job-queue"
    properties = pika.BasicProperties(delivery_mode=2,
                                      expiration="5000")
    channel.basic_publish(exchange='', routing_key=queue_name, body='Hello World!', properties=properties)

    channel.close()
    connection.close()


def receive_message():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    dlx_exchnage_name = 'my-dlx-exchange'

    def callback(ch, method, properties, body):
        print("Receivedmessage: %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 消费来自job-queue的消息
    queue_name = "job-queue"
    channel.basic_consume(queue_name, callback)

    channel.start_consuming()


if __name__ == '__main__':
    send_message()
    receive_message()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

以上代码是一个简单的生产者和消费者。send_message()函数发送一个消息,并且未被消费端确认;receive_message()函数从job-queue中接收消息。

Spring Boot实现

在Spring Boot中,我们可以使用RabbitMQ来实现死信队列。

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
配置文件
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
创建死信交换机、队列和绑定
@Configuration
public class RabbitConfig {

    // 死信交换机
    public static final String DLX_EXCHANGE_NAME = "my-dlx-exchange";

    // 死信队列
    public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";

    // 业务处理队列
    public static final String PROCESS_QUEUE_NAME = "process-queue";

    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange(DLX_EXCHANGE_NAME);
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .build();
    }

    @Bean
    public Queue processQueue() {
        return QueueBuilder.durable(PROCESS_QUEUE_NAME)
                .withArgument("x-message-ttl", 5000)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .build();
    }

    @Bean
    public Binding dlxBinding(Queue deadLetterQueue, TopicExchange dlxExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

以上代码创建了my-dlx-exchange死信交换机和dead-letter-queue死信队列。同时创建一个process-queue业务处理队列,该队列设置了消息的生存时间为5s,并在该时间内未被消费者消费,则将该消息转移到死信队列中。

发送和接收消息
@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("", RabbitConfig.PROCESS_QUEUE_NAME, message);
    }

}

@Component
public class MessageReceiver {

    @RabbitListener(queues = "process-queue")
    public void receive(String message) throws Exception {
        System.out.println("Received message: " + message);
        Thread.sleep(10000);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

以上代码是一个简单的生产者和消费者。生产者使用RabbitTemplate来发送消息到process-queue队列中;消费者通过使用注解@RabbitListener监听process-queue队列的消息并进行处理。

结语

通过本篇文章,我们详细介绍了什么是死信队列、死信队列的优点、应用场景以及如何实现死信队列。通过RabbitMQ和Spring Boot的实现方式,不难看出死信队列在项目中的重要性和实际价值。在工程实践中,我们可以根据具体业务需求,结合技术选型,灵活运用死信队列来提高系统的可靠性和稳定性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/569741
推荐阅读
相关标签
  

闽ICP备14008679号