当前位置:   article > 正文

rabbitmq死信队列以及延迟_channel.queue_declare(queue='dead_letter_queue')

channel.queue_declare(queue='dead_letter_queue')

死信队列介绍

死信(Dead Letter)是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

1)消息被拒绝
2)消息在队列的存活时间超过设置的TTL时间。
3)消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列

消息变成死信有以下几种情况

1)消息被拒绝
2)消息TTL过期 (延迟队列)
3)队列达到最大长度

在这里插入图片描述

延迟队列

延时队列就是用来存放需要在指定时间被处理的元素的队列

应用场景

1)订单在十分钟之内未支付则自动取消
2)账单在一周内未支付,则自动结算
3)用户注册成功后,如果三天内没有登陆则进行短信提醒
4)用户发起退款,如果三天内没有得到处理则通知相关运营人员

延迟队列实现

消息发送时需要带延迟时间

在这里插入图片描述

等待20秒未进行消费,数据进入到进入死信队列中
在这里插入图片描述
生产端
执行生产端如果报错可以先将生成的队列进行删除后运行

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name ='exc_pay';
//指定routing_key
$routing_key = 'route_pay';
//声明队列名称
$queue_name = 'queue_pay';
//设置延迟时间20s过期
$ttl = 20000;

//死信队列
$dead_exc_name = 'dead_exc_pay';
//死信路由key
$dead_routing_key = 'dead_route_pay';
//声明死信队列名称
$dead_queue_name = 'dead_queue_pay';

//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'direct', false, false, false);

//设置队列中数据存活时间、死信队列、死信路由key
$args = new AMQPTable(['x-message-ttl' => $ttl, 'x-dead-letter-exchange' => $dead_exc_name, 'x-dead-letter-routing-key' => $dead_routing_key]);
//设置回调有效期
$channel->queue_declare($queue_name, false, true, false, false, false, $args);
//绑定
$channel->queue_bind($queue_name, $exc_name, $routing_key);

//声明死信交换器队列
$channel->exchange_declare($dead_exc_name, 'direct', false, false, false);
$channel->queue_declare($dead_queue_name, false, true, false, false);
$channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key);

//声明数据
$data = 'this is dead message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

消费端

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//死信队列
$dead_exc_name = 'dead_exc_pay';
//死信路由key
$dead_routing_key = 'dead_route_pay';
//声明死信队列名称
$dead_queue_name = 'dead_queue_pay';

$channel->exchange_declare($dead_exc_name, 'direct', false, false, false);

//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($dead_queue_name, '', false, false, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

rabbitmq延迟插件安装

rabbitmq 3.6版本以后有插件

# 下载插件,注意要下载与rabbit相对应的版本,此处下载3.8.x
http://www.rabbitmq.com/community-plugins.html

# 找到指定的版本
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述
在这里插入图片描述
下载后直接放入plugins下
在这里插入图片描述


# 上传到
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.19/plugins/
# 查看插件列表
rabbitmq-plugins list

# 启动指定列表中的插件rabbitmq_delayed_message_exchange
rabbitmq-plugins enable  rabbitmq_delayed_message_exchange
# 查看插件启动情况
rabbitmq-plugins list
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述
在这里插入图片描述

延迟插件实现延迟队列功能

手动新增交换器
在这里插入图片描述
生产端代码:

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'delay_exc_pay';
//指定routing_key
$routing_key = 'delay_route_pay';
//声明队列名称
$queue_name = 'delay_queue_pay';
//设置延迟时间20s过期
$ttl = 20000;

//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false);
$args = new AMQPTable(['x-delayed-type' => 'direct']);

$channel->queue_declare($queue_name, false, true, false, false, false, $args);

//声明数据
$data = 'this is dead message';

//绑定
$channel->queue_bind($queue_name, $exc_name, $routing_key);

//创建消息
$arr = ['delivery_mode' => AMQPMEssage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $ttl])];

$msg = new AMQPMessage($data, $arr);

//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

消费端代码

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//声明交换器
$exc_name = 'delay_exc_pay';
//指定routing_key
$routing_key = 'delay_route_pay';
//声明队列名称
$queue_name = 'delay_queue_pay';

//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false);

//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name, $exc_name, $routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/509792
推荐阅读
相关标签
  

闽ICP备14008679号