赞
踩
死信(Dead Letter)是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
1)消息被拒绝
2)消息在队列的存活时间超过设置的TTL时间。
3)消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列
消息变成死信有以下几种情况
1)消息被拒绝
2)消息TTL过期 (延迟队列)
3)队列达到最大长度
延时队列就是用来存放需要在指定时间被处理的元素的队列
应用场景
1)订单在十分钟之内未支付则自动取消
2)账单在一周内未支付,则自动结算
3)用户注册成功后,如果三天内没有登陆则进行短信提醒
4)用户发起退款,如果三天内没有得到处理则通知相关运营人员
消息发送时需要带延迟时间
等待20秒未进行消费,数据进入到进入死信队列中
生产端
执行生产端如果报错可以先将生成的队列进行删除后运行
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name ='exc_pay'; //指定routing_key $routing_key = 'route_pay'; //声明队列名称 $queue_name = 'queue_pay'; //设置延迟时间20s过期 $ttl = 20000; //死信队列 $dead_exc_name = 'dead_exc_pay'; //死信路由key $dead_routing_key = 'dead_route_pay'; //声明死信队列名称 $dead_queue_name = 'dead_queue_pay'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'direct', false, false, false); //设置队列中数据存活时间、死信队列、死信路由key $args = new AMQPTable(['x-message-ttl' => $ttl, 'x-dead-letter-exchange' => $dead_exc_name, 'x-dead-letter-routing-key' => $dead_routing_key]); //设置回调有效期 $channel->queue_declare($queue_name, false, true, false, false, false, $args); //绑定 $channel->queue_bind($queue_name, $exc_name, $routing_key); //声明死信交换器队列 $channel->exchange_declare($dead_exc_name, 'direct', false, false, false); $channel->queue_declare($dead_queue_name, false, true, false, false); $channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key); //声明数据 $data = 'this is dead message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费端
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //死信队列 $dead_exc_name = 'dead_exc_pay'; //死信路由key $dead_routing_key = 'dead_route_pay'; //声明死信队列名称 $dead_queue_name = 'dead_queue_pay'; $channel->exchange_declare($dead_exc_name, 'direct', false, false, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($dead_queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
rabbitmq 3.6版本以后有插件
# 下载插件,注意要下载与rabbit相对应的版本,此处下载3.8.x
http://www.rabbitmq.com/community-plugins.html
# 找到指定的版本
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
下载后直接放入plugins下
# 上传到
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.19/plugins/
# 查看插件列表
rabbitmq-plugins list
# 启动指定列表中的插件rabbitmq_delayed_message_exchange
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查看插件启动情况
rabbitmq-plugins list
手动新增交换器
生产端代码:
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'delay_exc_pay'; //指定routing_key $routing_key = 'delay_route_pay'; //声明队列名称 $queue_name = 'delay_queue_pay'; //设置延迟时间20s过期 $ttl = 20000; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false); $args = new AMQPTable(['x-delayed-type' => 'direct']); $channel->queue_declare($queue_name, false, true, false, false, false, $args); //声明数据 $data = 'this is dead message'; //绑定 $channel->queue_bind($queue_name, $exc_name, $routing_key); //创建消息 $arr = ['delivery_mode' => AMQPMEssage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $ttl])]; $msg = new AMQPMessage($data, $arr); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费端代码
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'delay_exc_pay'; //指定routing_key $routing_key = 'delay_route_pay'; //声明队列名称 $queue_name = 'delay_queue_pay'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name, $exc_name, $routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。