赞
踩
先决条件
本教程假设 RabbitMQ 已安装并且正在
本地主机
的标准端口(5672
)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。获取帮助
如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。
在教程一中,我们编写程序以从命名队列中发送和接收消息。在本教程中我们将会创建一个 工作队列(Work Queue)用于在多个工作者(worker)之间分发耗时任务(竞争消费者模式)。
工作队列(又名:任务队列(Task Queue))背后的主要思想是避免立即执行资源密集型任务且必须等待它完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送至队列中。在后台运行的工作者进程将取出任务并最终执行作业。当您运行多个工作者时,任务会在它们之间共享。
这个概念在 Web 应用程序中非常有用,因为在短的 HTTP 请求窗口期间不可能处理复杂的任务。
原文链接:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
在教程一中我们曾发送过包含 “Hello World!” 的消息。现在我们将会发送代表着复杂任务的字符串。我们没有现实世界中的任务(例如调整图像大小或者渲染 pdf 文件),但我们可以通过使用 Thread.Sleep()
函数(您需要在靠近文件顶部添加 using System.Threading;
以访问 threading APIs)假装自己很忙来模拟它。我们使用字符串中“点”的数量代表其复杂程度:每个点是花费一秒钟的“工作”。例如,由 Hello...
所描述的虚假任务将会花费三秒钟来处理。
我们将稍微修改上一篇例程中的 RabbitMQProducer 程序,以允许从命令行发送任意消息。这个程序将把任务安排到我们的工作队列中,所以我们将其命名为 NewTask
:
与教程一类似我们需要创建两个项目并分别为其引入 RabbitMQ.Client 库。
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client
将旧的 RabbitMQProducer.cs 代码拷贝到 NewTask.cs 并做出如下修改:
更新 message 变量初始化:
var message = GetMessage(args);
在 NewTask 类的结尾处添加 GetMessage 方法:
static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
旧的 RabbitMQConsumer.cs 脚本同样需要做一些修改以将消息体中的每一个点伪装为耗时一秒的工作。它会处理由 RabbitMQ 转发的消息并执行任务。将其拷贝到 Worker.cs 并做出如下修改:
当我们现有的 WriteLine 接收到消息后,添加虚假任务来模拟执行时间:
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
使用任务队列的优点之一是能够轻松地并行化工作。如果我们积压了大量的工作,我们可以增加更多的工作者(worker),这样就很容易扩大规模。
首先,让我们尝试同时运行两个 Worker
实例,它们双方都会从队列中获取消息,但具体如何获取呢?让我康康。
您需要打开三个控制台,其中两个将会运行 Worker
程序。这俩控制台就是我们的两个消费者 —— C1 和 C2。
# shell 1
cd Worker
dotnet run
# => Press [enter] to exit.
# shell 2
cd Worker
dotnet run
# => Press [enter] to exit.
在第三个控制台中我们将会发布新的任务。一旦您启动了消费者之后您就可以发布一些消息了:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
让我们看看工作者(worker)们收到了什么:
# shell 1
# => Press [enter] to exit.
# => [x] Received First message.
# => [x] Done
# => [x] Received Third message...
# => [x] Done
# => [x] Received Fifth message.....
# => [x] Done
# shell 2
# => Press [enter] to exit.
# => [x] Received Second message..
# => [x] Done
# => [x] Received Fourth message....
# => [x] Done
默认情况下,RabbitMQ 会依次将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为 round-robin 轮询。您可以在三个或者更多工作者(worker)上继续尝试一下。
执行任务会花费数秒钟,你也许想知道如果一个消费者启动了一个长时间任务并在仅仅完成部分工作后就“死掉”会发生什么。以我们目前的代码,一旦 RabbitMQ 将一条消息传递给消费者后,它会立刻标记删除该消息。在这种情况下,如果您终止了某个工作者(worker)进程,我们会丢失其正在处理的消息。同样我们也会丢失分发到该工作者身上还未来得及处理的所有消息。
显然,我们是不希望丢失任何任务的。如果一个工作者(worker)“死掉”了,我们会希望将任务交给另一个工作者。
为了确保消息从不丢失,RabbitMQ 支持 消息确认。消费者回复一个 ack(nowledgement) 以告知 RabbitMQ 其已经接收、处理了某条特定消息并且 RabbitMQ 可自由删除它。
如果一个消费者在发送 ack 应答前“死掉”了(它的 channel(通道) 关闭,connection(连接) 关闭,或者 TCP 连接丢失),RabbitMQ 会知道消息还未完全得到处理并会将其重新入队。如果当前有别的消费者在线,消息将会被迅速转发给另一个消费者。通过这种方式您可以确保没有消息会丢失,即便工作者(worker)偶尔会“死掉”。
timeout 超时(默认为 30 分钟)用于消费者交付确认超时时强制执行。这有助于检测从不确认交付的有 BUG 的(卡住的)消费者。您可以按照交付确认超时中所述增加此超时。
默认情况下,手动消息确认会被启用。但在之前的例程中,我们通过将 autoAck (“automatic acknowledgement mode”) 参数设置为 true 来显式地关闭它们。现在是时候移除该标志并从工作者(worker)手动发送适当的确认应答了(一旦我们完成了某项任务)。
autoAck 笔记
在这里,有必要解释一下 autoAck 。所谓“自动确认模式”,是针对 RabbitMQ 来说的,而不是针对消费者(consumer)来说的。也就是说,当我们将 autoAck 参数设置为 true 时,一旦 RabbitMQ 将一条消息传递给消费者后,RabbitMQ 会自动确认消息已被成功传递给消费者并将消息从队列中删除,而不需要等待消费者发送确认消息。“自动确认模式”并不是指在该模式下消费者会自动发送确认应答;实际上,在该模式下,消费者根本不会发送任何确认(ack)应答给 RabbitMQ。
自动确认模式的优点是简单且快速,但缺点是如果消费者在处理消息时出现问题(例如崩溃或断开连接),则该消息可能会丢失。因此,在需要确保消息不丢失的场景,建议使用手动确认模式。
在现有的 WriteLine 之后,添加对 BasicAck 的调用并使用 autoAck:false 更新 BasicConsume:
Console.WriteLine(" [x] Done");
// here channel could also be accessed as ((EventingBasicConsumer)sender).Model
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
使用该代码,您可以确保即便您在一个工作者(worker)处理消息的过程中使用 Ctrl+C 终止了节点也不会丢失任何东西。在工作者节点终止后的很短的时间内,所有的未确认应答的消息将会被重新传递。
确认应答必须在接收消息的同一通道(channel)上发送。尝试使用不同的通道发送确认将会导致通道级别协议异常。参阅确认文档指南以了解更多信息。
忘记确认
缺失
BasicAck
是一个常见的错误。这是一个很容易犯的错误,但后果很严重。当您的客户端退出时,消息会被重新发送(这可能看起来像随机的重新发送),但是 RabbitMQ 会消耗越来越多的内存,因为它不能释放任何未确认的消息。为了调试此类错误您可以使用
rabbitmqctl
打印messages_unacknowledged
字段:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
- 1
在 Windows 平台下,则不要 sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
- 1
我们已经学习了即便在消费者“死掉”的情况下如何确保任务不会丢失。但是如果 RabbitMQ 服务器停止工作了,我们的任务仍然会丢失。
除非您告知了 RabbitMQ,否则它会在退出或者崩溃时忘记队列和消息。为了确保消息不会丢失,要求满足两件事情:我们需要将队列和消息双方都标记为持久存储的。
第一步,我们需要确保队列能够在 RabbitMQ 节点重启后幸存下来。为此,我们需要将其声明为 durable:
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
尽管这条命令本身是正确的,但它在我们当前的配置中不会生效。这是因为我们已经定义了一个名为 hello
的非持久存储队列。RabbitMQ 不允许您使用不同的参数去重定义一个已有队列,任何尝试这样做的程序会返回错误。但是有一个快速的解决方案 —— 让我们使用不同的名字声明一个队列,例如 task_queue
:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
这个 QueueDeclare
变更需要同时应用于生产者和消费者双方的代码。同样您还需要变更 BasicConsume
和 BasicPublish
队列的名称。
此时,我们确信即便 RabbitMQ 重新启动,task_queue
队列也不会丢失。现在,我们需要将我们的消息标记为持久存储。
在现有的 GetBytes 之后,设置 IBasicProperties.Persistent
为 true
:
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
消息持久存储笔记
将消息标记为持久存储并不完全保证消息不会丢失。尽管它告知 RabbitMQ 将消息存储在磁盘上,但是当 RabbitMQ 接受消息并还未将其存储起来时,仍然有很短的时间窗口。而且,RabbitMQ 并没有为每条消息执行
fsync(2)
—— 它可能只是保存在缓存中,而不是真正写入磁盘。虽然持久存储保证还不是很健壮,但对于我们简单的任务队列来说已经足够了。如果您需要更优健壮性的保证,您可以使用发布者确认。
您可能已经注意到调度仍然没有完全按照我们所期望的那样工作。例如,在两个工作者(worker)的情况下,当所有奇数消息都很繁重,偶数消息很轻松时;一个工作者将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
会发生这样的事是因为 RabbitMQ 只在消息进入队列时调度消息。它不关注消费者未确认消息的数量。它只是盲目地将第 n 条消息分派给第 n 个消费者。
为了改变这种行为,我们可以使用带有 prefetchCount
= 1
设置的 BasicQos
方法。这告诉 RabbitMQ 一次不要给一个工作者(worker)发送多条消息。或者,换句话说,在工作者线程处理并确认前一条消息之前,不要向它发送新消息。相反,RabbitMQ 会将消息分派给下一个不忙的工作者(worker)。
在 Worker.cs 中现有的 QueueDeclare 之后添加对 BasicQos
的调用:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
有关队列大小的笔记
如果所有的工作者(worker)都很忙,您的队列是可以被填满的。您会想要对此添加监视,并或许添加更多的工作者(worker),或者有一些其他策略。
打开两个终端。
首先运行消费者(即工作者 worker),以便拓扑(主要是队列)就位。下面是它的完整代码:
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory { HostName = "localhost" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] Received {message}"); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // here channel could also be accessed as ((EventingBasicConsumer)sender).Model channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
现在运行任务发布者(NewTask)。它的最终代码长这样:
using System.Text; using RabbitMQ.Client; var factory = new ConnectionFactory { HostName = "localhost" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: string.Empty, routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine($" [x] Sent {message}"); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
运行效果:
您可以使用消息确认和 BasicQos
设置一个工作队列。持久化操作让任务即便在 RabbitMQ 重启后也能幸存下来。
有关 IModel
方法和 IBasicProperties
的更多信息,您可以浏览 RabbitMQ .NET client API reference online。
现在我们可以移步至教程三,学习如何向多个消费者传递相同的消息。
请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。