赞
踩
环境:
我们知道,进入队列的消息需要按顺序分发给消费者,但可能不同的消息处理的时间不同,后面的消息仅需要几秒,而前面有很多耗时的消息(比如需要几分钟),这就很不利于消息的快速处理(各个消息之间没有严格的顺序关系,使用消息队列仅仅是为了降低并发对系统的压力)。
那么有没有什么办法能让消息B快速的跳过消息A被处理呢?
答:有的,在发布消息时,给消息B增加了优先级就行。
我们可能会想,这两种消息是这样的不同(耗时差别大),为什么不拆分到两个队列呢?
按道理,是要拆分两个队列的,但实际情况却可能是各种各样。
发送端代码如下:
using RabbitMQ.Client; using System; using System.Text; namespace Send { class Program { static void Main(string[] args) { //1. 实例化连接工厂 var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, VirtualHost = "/", UserName = "guest", Password = "guest" }; //2. 建立连接 using (var connection = factory.CreateConnection()) { //3. 创建信道 using (var channel = connection.CreateModel()) { //4. 声明队列(注意声明:x-max-priority) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { {"x-max-priority",9 }//取值范围[0,255] }); //发送多个无优先级的消息 var index = 1; for (int i = 0; i < 2; i++) { string message = $"消息:{index},无优先级"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($"已发送:{message}"); index++; } //发送多个优先级消息 for (int i = 0; i < 6; i++) { var priority = (byte)(i / 2); string message = $"消息:{index},优先级:{priority}"; var body = Encoding.UTF8.GetBytes(message); var prop = channel.CreateBasicProperties(); prop.Priority = priority; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: prop, body: body); Console.WriteLine($"已发送:{message}"); index++; } Console.WriteLine("ok"); } } } } }
先运行发送端,结果如下:
此时rabbitmq中也积累了8条消息,我们可以观察到:
其实,我们从rabbitmq的管理端就已经能看出来消息可被消费的先后顺序了。
但我们还是用代码验证一下:
消费端代码:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace Receive { class Program { static void Main(string[] args) { //1. 实例化连接工厂 var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, VirtualHost = "/", UserName = "guest", Password = "guest" }; //2. 建立连接 using (var connection = factory.CreateConnection()) { //3. 创建信道 using (var channel = connection.CreateModel()) { //4. 声明队列(和发送端保持一致) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { {"x-max-priority",9 } }); //5. 构造消费者实例 var consumer = new EventingBasicConsumer(channel); //6. 绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(1000);//模拟耗时 Console.WriteLine(" [x] Done"); }; //7. 启动消费者 channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
现在先让我们运行
我们需要声明 x-max-priority 属性;
除了在程序中声明,我们还可以在rabbitmq端建队列时声明,如下:
优先级属性是 byte
类型的,所以它的取值范围:[0,255];
消息优先级为0或不设优先级的效果一样;
消息优先级大于设定的优先级最大值时,效果同优先级的最大值(我们可以自行实验);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。