赞
踩
RabbitMQ是一个开源的消息中间件,它实现了AMQP标准,并且可以在分布式系统中存储、转发和接收消息,可以将消息从一个应用程序发送到另一个应用程序,即使这些应用程序不同时运行,也可以在消息队列中存储消息,确保消息的可靠传递。也就相当于快递,你发快递,你的朋友收快递,RabbitMQ就是快递公司。RabbitMQ可以用于解决各种问题,如解耦系统组件、异步处理任务、实现事件驱动架构等。
NuGet包安装:NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
CreateModel是在连接上创建一个信道,信道是RabbitMQ中用于发送和接收消息的主要路径,几乎所有的操作都是通过信道完成的,它是建立在已经建立的TCP连接之上的虚拟连接,信道的创建和销毁相对于TCP连接来说开销较小,这使得客户端可以创建多个信道来并发处理多个任务。
QueueDeclare声明一个消息队列,在 RabbitMQ 中存储消息时,首先就要创建队列。
BasicPublish用于将消息发送到队列中。
- var factory = new ConnectionFactory
- {
- HostName = "localhost", // RabbitMQ 服务器的主机名或 IP 地址,我这里为本地
- //Port = 5672, // RabbitMQ 服务器的端口号
- //UserName = "guest", // 用于身份验证的用户名
- //Password = "guest", // 用于身份验证的密码
- //VirtualHost = "/"// 虚拟主机名称
- };
- using var connection = factory.CreateConnection();//建立与 RabbitMQ 服务器的连接
- using var channel = connection.CreateModel();//在已经建立的连接上创建一个新的信道(channel)
- // 声明一个队列
- channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
- durable: false,//true为开启队列持久化
- exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
- autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
- arguments: null//用于声明队列时的其他属性设置
- );
- const string message = "你好";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: string.Empty,//交换机名称。
- routingKey: "NewRabbitMQ",//路由键。它决定了消息应该被发送到哪个队列。
- basicProperties: null,//消息的属性,如消息的持久性、优先级、内容类型等。
- body: body//消息的主体,通常是一个字节数组。
- );
- Console.WriteLine("OK");
EventingBasicConsumer(T) 是 RabbitMQ 中的一个消费者类,用于接收来自 RabbitMQ 服务器中T队列的消息。
Received事件会在队列中有新消息到达时被触发。
- //前面的和发送基本一致
- var factory = new ConnectionFactory { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
-
- channel.QueueDeclare(queue: "NewRabbitMQ",
- durable: false,//true为开启队列持久化
- exclusive: false,
- autoDelete: false,
- arguments: null);
-
- var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
- //Received在队列中有新消息到达时被触发
- consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine($" [x] Received {message}");
- };
- channel.BasicConsume(queue: "NewRabbitMQ",
- autoAck: true,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
- consumer: consumer);
- Console.ReadLine();
消息确认机制是为了确保消费者成功消费了队列中的消息,并避免因消费者处理失败而导致消息丢失或重复处理的问题。即就是消费者在对一个任务处理时,任务还没有处理完成,而消费者却意外挂了,消息确认就是处理方案。
在使用BasicAck时,消费者需要提供一个参数,即deliveryTag,这个参数用于标识要确认的消息。RabbitMQ服务器在接收到BasicAck请求后,会将该消息从队列中移除,以确保消息不会被重复处理。
- consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine($" [x] Received {message}");
- channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);//ea.DeliveryTag队列编号
- };
- channel.BasicConsume(queue: "NewRabbitMQ",
- autoAck: false,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
- consumer: consumer);
- Console.ReadLine();
注:如果忘记消息确认,会导致RabbitMQ服务器一直投送重复任务,导致内存占用严重。
消费者挂掉的解决办法有了,那如果RabbitMQ服务器整个都挂掉了呢,那会使我们原本不富裕的头顶更加贫瘠。为了避免这种事情的发生,需要使用消息持久化。
消息持久化是指将消息和队列都保存在磁盘上,以确保在RabbitMQ服务重启或其他异常情况下,消息不会丢失。首先,需要在创建队列时(生产者和消费者都要)将durable设置为true,在发送消息时,需要将消息的DeliveryMode属性设置为2。
- channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
- durable: true,//true为开启队列持久化
- exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
- autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
- arguments: null//用于声明队列时的其他属性设置
- );
- channel.CreateBasicProperties().DeliveryMode = 2;//设置消息持久化
此外需要注意的是: 队列和消息都被设置为持久化,也不能百分之一百保证消息不丢失(剩下的交给天意吧),因为RabbitMQ在接收到消息后,不会立即将其保存到磁盘上,而是会先将其存储在内存中,然后再异步地将其写入磁盘,也因为消息会写入到磁盘中,所以会对性能有一定的影响。
如果有多个消费者,就会出现一些消费者繁忙,一些消费者清闲,为了避免某些消费者压力过大,会需要使用流量控制的限制,通过BasicQos可以控制消费者从RabbitMQ队列中拉取的消息的数量和大小。
prefetchSize整数值,可以接收但尚未确认的消息的最大字节数,达到限制时,RabbitMQ将停止向该消费者发送更多消息。
prefetchCount整数值,可以接收但尚未确认的消息的最大数量。
global布尔值,是否应用于整个连接中的所有信道,true连接所有信道,false连接当前信道。
- //在消费者中设置
- channel.BasicQos(
- prefetchSize: 0,
- prefetchCount: 1,
- global: false
- );
-
- var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
注意:限流只有在autoAck为false的情况下才有用(即手动消息确认模式下),自动消息确认下不起作用。
RabbitMQ中生产者不会直接将消息发送到队列,而是将消息发送到交换机,交换机再将消息路由到一个或多个队列中。
将消息路由到与其绑定的队列中,要求消息的路由键(routingKey)与队列的绑定键完全匹配。
声明交换机使用ExchangeDeclare,包含五个参数:
exchange交换机的名称。
type交换机的类型。
durable交换机是否应该在RabbitMQ服务器重启后依然存在。
autoDelete最后一个队列与交换机解绑后,是否自动删除交换机。
arguments传递特定于交换机的参数。
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using (var channel = connection.CreateModel())
- {
- // 声明一个直连交换机
- channel.ExchangeDeclare(
- exchange: "RabbitMQ_direct",//交换机名称
- type: "direct",//交换机类型
- durable:true
- );
-
- // 声明一个队列
- channel.QueueDeclare(
- queue: "NewRabbitMQ",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
-
- // 绑定队列到直连交换机,指定路由键
- channel.QueueBind(
- queue: "NewRabbitMQ",//要绑定的队列名称。
- exchange: "RabbitMQ_direct",//要绑定的交换机名称。
- routingKey: "direct.key"//是否将消息路由到这个队列
- );
-
- // 发送消息到直连交换机
- string message = "你好";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(
- exchange: "RabbitMQ_direct",//交换机名称
- routingKey: "direct.key",
- basicProperties: null,
- body: body
- );
- Console.WriteLine("OK" + message);
- }
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using (var channel = connection.CreateModel())
- {
- //接收消息
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine(message);
- };
- channel.BasicConsume(
- queue:"NewRabbitMQ",
- autoAck: true,
- consumer: consumer
- );
-
- Console.ReadLine();
- }
根据消息的路由键和队列的绑定模式进行匹配,将消息路由到符合模式的队列中。主题交换机支持通配符匹配,如“*”表示匹配一个单词,“#”表示匹配多个单词,例如:"*.news" 会匹配所有以 "news" 结尾的路由键。
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using (var channel = connection.CreateModel())
- {
- // 声明一个直连交换机
- channel.ExchangeDeclare(
- exchange: "RabbitMQ_topic",//交换机名称
- type: "topic"//交换机类型
- );
-
- // 声明一个队列
- channel.QueueDeclare(
- queue: "NewRabbitMQ",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
-
- // 绑定队列到直连交换机,指定路由键
- channel.QueueBind(
- queue: "NewRabbitMQ",//要绑定的队列名称。
- exchange: "RabbitMQ_topic",//要绑定的交换机名称。
- routingKey: "user.*.info"//是否将消息路由到这个队列,这里表示匹配所有以 "user." 开头,并以 "info" 结尾的路由键
- );
-
- // 发送消息到直连交换机
- string message = "你好";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(
- exchange: "RabbitMQ_topic",//交换机名称
- routingKey: "user.abc.info",
- basicProperties: null,
- body: body
- );
- Console.WriteLine("OK" + message);
- }
与上面一致
将消息路由到所有与其绑定的队列中,不关心消息的路由键和队列的绑定键。
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using (var channel = connection.CreateModel())
- {
- // 声明一个直连交换机
- channel.ExchangeDeclare(
- exchange: "RabbitMQ_fanout",//交换机名称
- type: "fanout"//交换机类型
- );
-
- // 声明一个队列
- channel.QueueDeclare(
- queue: "NewRabbitMQ",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
-
- // 绑定队列到直连交换机,指定路由键
- channel.QueueBind(
- queue: "NewRabbitMQ",//要绑定的队列名称。
- exchange: "RabbitMQ_fanout",//要绑定的交换机名称。
- routingKey: ""//扇形交换机不需要路由键,因为所有消息都会被路由到所有绑定的队列
- );
-
- // 发送消息到直连交换机
- string message = "你好";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(
- exchange: "RabbitMQ_fanout",//交换机名称
- routingKey: "",
- basicProperties: null,
- body: body
- );
- Console.WriteLine("OK" + message);
- }
与上面一致
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。