当前位置:   article > 正文

.Net中RabbitMQ的使用详情_net rabbitmq

net rabbitmq

一、什么是RabbitMQ

RabbitMQ是一个开源的消息中间件,它实现了AMQP标准,并且可以在分布式系统中存储、转发和接收消息,可以将消息从一个应用程序发送到另一个应用程序,即使这些应用程序不同时运行,也可以在消息队列中存储消息,确保消息的可靠传递。也就相当于快递,你发快递,你的朋友收快递,RabbitMQ就是快递公司。RabbitMQ可以用于解决各种问题,如解耦系统组件、异步处理任务、实现事件驱动架构等。

NuGet包安装:NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

二、发送

CreateModel是在连接上创建一个信道,信道是RabbitMQ中用于发送和接收消息的主要路径,几乎所有的操作都是通过信道完成的,它是建立在已经建立的TCP连接之上的虚拟连接,信道的创建和销毁相对于TCP连接来说开销较小,这使得客户端可以创建多个信道来并发处理多个任务。

QueueDeclare声明一个消息队列,在 RabbitMQ 中存储消息时,首先就要创建队列。

BasicPublish用于将消息发送到队列中。

  1. var factory = new ConnectionFactory
  2. {
  3. HostName = "localhost", // RabbitMQ 服务器的主机名或 IP 地址,我这里为本地
  4. //Port = 5672, // RabbitMQ 服务器的端口号
  5. //UserName = "guest", // 用于身份验证的用户名
  6. //Password = "guest", // 用于身份验证的密码
  7. //VirtualHost = "/"// 虚拟主机名称
  8. };
  9. using var connection = factory.CreateConnection();//建立与 RabbitMQ 服务器的连接
  10. using var channel = connection.CreateModel();//在已经建立的连接上创建一个新的信道(channel)
  11. // 声明一个队列
  12. channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
  13. durable: false,//true为开启队列持久化
  14. exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
  15. autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
  16. arguments: null//用于声明队列时的其他属性设置
  17. );
  18. const string message = "你好";
  19. var body = Encoding.UTF8.GetBytes(message);
  20. channel.BasicPublish(exchange: string.Empty,//交换机名称。
  21. routingKey: "NewRabbitMQ",//路由键。它决定了消息应该被发送到哪个队列。
  22. basicProperties: null,//消息的属性,如消息的持久性、优先级、内容类型等。
  23. body: body//消息的主体,通常是一个字节数组。
  24. );
  25. Console.WriteLine("OK");

三、接收

EventingBasicConsumer(T) 是 RabbitMQ 中的一个消费者类,用于接收来自 RabbitMQ 服务器中T队列的消息。

Received事件会在队列中有新消息到达时被触发。

  1. //前面的和发送基本一致
  2. var factory = new ConnectionFactory { HostName = "localhost" };
  3. using var connection = factory.CreateConnection();
  4. using var channel = connection.CreateModel();
  5. channel.QueueDeclare(queue: "NewRabbitMQ",
  6. durable: false,//true为开启队列持久化
  7. exclusive: false,
  8. autoDelete: false,
  9. arguments: null);
  10. var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
  11. //Received在队列中有新消息到达时被触发
  12. consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
  13. {
  14. var body = ea.Body.ToArray();
  15. var message = Encoding.UTF8.GetString(body);
  16. Console.WriteLine($" [x] Received {message}");
  17. };
  18. channel.BasicConsume(queue: "NewRabbitMQ",
  19. autoAck: true,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
  20. consumer: consumer);
  21. Console.ReadLine();

四、消息确认

消息确认机制是为了确保消费者成功消费了队列中的消息,并避免因消费者处理失败而导致消息丢失或重复处理的问题。即就是消费者在对一个任务处理时,任务还没有处理完成,而消费者却意外挂了,消息确认就是处理方案。

在使用BasicAck时,消费者需要提供一个参数,即deliveryTag,这个参数用于标识要确认的消息。RabbitMQ服务器在接收到BasicAck请求后,会将该消息从队列中移除,以确保消息不会被重复处理。

  1. consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
  2. {
  3. var body = ea.Body.ToArray();
  4. var message = Encoding.UTF8.GetString(body);
  5. Console.WriteLine($" [x] Received {message}");
  6. channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);//ea.DeliveryTag队列编号
  7. };
  8. channel.BasicConsume(queue: "NewRabbitMQ",
  9. autoAck: false,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
  10. consumer: consumer);
  11. Console.ReadLine();

 注:如果忘记消息确认,会导致RabbitMQ服务器一直投送重复任务,导致内存占用严重。

五、消息持久化

消费者挂掉的解决办法有了,那如果RabbitMQ服务器整个都挂掉了呢,那会使我们原本不富裕的头顶更加贫瘠。为了避免这种事情的发生,需要使用消息持久化。

消息持久化是指将消息和队列都保存在磁盘上,以确保在RabbitMQ服务重启或其他异常情况下,消息不会丢失。首先,需要在创建队列时(生产者和消费者都要)将durable设置为true,在发送消息时,需要将消息的DeliveryMode属性设置为2。

  1. channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
  2. durable: true,//true为开启队列持久化
  3. exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
  4. autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
  5. arguments: null//用于声明队列时的其他属性设置
  6. );
  7. channel.CreateBasicProperties().DeliveryMode = 2;//设置消息持久化

此外需要注意的是: 队列和消息都被设置为持久化,也不能百分之一百保证消息不丢失(剩下的交给天意吧),因为RabbitMQ在接收到消息后,不会立即将其保存到磁盘上,而是会先将其存储在内存中,然后再异步地将其写入磁盘,也因为消息会写入到磁盘中,所以会对性能有一定的影响。

六、限流

如果有多个消费者,就会出现一些消费者繁忙,一些消费者清闲,为了避免某些消费者压力过大,会需要使用流量控制的限制,通过BasicQos可以控制消费者从RabbitMQ队列中拉取的消息的数量和大小。

prefetchSize整数值,可以接收但尚未确认的消息的最大字节数,达到限制时,RabbitMQ将停止向该消费者发送更多消息。

prefetchCount整数值,可以接收但尚未确认的消息的最大数量。

global布尔值,是否应用于整个连接中的所有信道,true连接所有信道,false连接当前信道。

  1. //在消费者中设置
  2. channel.BasicQos(
  3. prefetchSize: 0,
  4. prefetchCount: 1,
  5. global: false
  6. );
  7. var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息

注意:限流只有在autoAck为false的情况下才有用(即手动消息确认模式下),自动消息确认下不起作用。

七、交换机

RabbitMQ中生产者不会直接将消息发送到队列,而是将消息发送到交换机,交换机再将消息路由到一个或多个队列中。

直连交换机

将消息路由到与其绑定的队列中,要求消息的路由键(routingKey)与队列的绑定键完全匹配。

发送

声明交换机使用ExchangeDeclare,包含五个参数:

exchange交换机的名称。

type交换机的类型。

durable交换机是否应该在RabbitMQ服务器重启后依然存在。

autoDelete最后一个队列与交换机解绑后,是否自动删除交换机。

arguments传递特定于交换机的参数。

  1. var factory = new ConnectionFactory() { HostName = "localhost" };
  2. using var connection = factory.CreateConnection();
  3. using (var channel = connection.CreateModel())
  4. {
  5. // 声明一个直连交换机
  6. channel.ExchangeDeclare(
  7. exchange: "RabbitMQ_direct",//交换机名称
  8. type: "direct"//交换机类型
  9. durable:true
  10. );
  11. // 声明一个队列
  12. channel.QueueDeclare(
  13. queue: "NewRabbitMQ",
  14. durable: false,
  15. exclusive: false,
  16. autoDelete: false,
  17. arguments: null
  18. );
  19. // 绑定队列到直连交换机,指定路由键
  20. channel.QueueBind(
  21. queue: "NewRabbitMQ",//要绑定的队列名称。
  22. exchange: "RabbitMQ_direct",//要绑定的交换机名称。
  23. routingKey: "direct.key"//是否将消息路由到这个队列
  24. );
  25. // 发送消息到直连交换机
  26. string message = "你好";
  27. var body = Encoding.UTF8.GetBytes(message);
  28. channel.BasicPublish(
  29. exchange: "RabbitMQ_direct",//交换机名称
  30. routingKey: "direct.key",
  31. basicProperties: null,
  32. body: body
  33. );
  34. Console.WriteLine("OK" + message);
  35. }

接收

  1. var factory = new ConnectionFactory() { HostName = "localhost" };
  2. using var connection = factory.CreateConnection();
  3. using (var channel = connection.CreateModel())
  4. {
  5. //接收消息
  6. var consumer = new EventingBasicConsumer(channel);
  7. consumer.Received += (model, ea) =>
  8. {
  9. var body = ea.Body.ToArray();
  10. var message = Encoding.UTF8.GetString(body);
  11. Console.WriteLine(message);
  12. };
  13. channel.BasicConsume(
  14. queue:"NewRabbitMQ",
  15. autoAck: true,
  16. consumer: consumer
  17. );
  18. Console.ReadLine();
  19. }

主题交换机

根据消息的路由键和队列的绑定模式进行匹配,将消息路由到符合模式的队列中。主题交换机支持通配符匹配,如“*”表示匹配一个单词,“#”表示匹配多个单词,例如:"*.news" 会匹配所有以 "news" 结尾的路由键。

发送

  1. var factory = new ConnectionFactory() { HostName = "localhost" };
  2. using var connection = factory.CreateConnection();
  3. using (var channel = connection.CreateModel())
  4. {
  5. // 声明一个直连交换机
  6. channel.ExchangeDeclare(
  7. exchange: "RabbitMQ_topic",//交换机名称
  8. type: "topic"//交换机类型
  9. );
  10. // 声明一个队列
  11. channel.QueueDeclare(
  12. queue: "NewRabbitMQ",
  13. durable: false,
  14. exclusive: false,
  15. autoDelete: false,
  16. arguments: null
  17. );
  18. // 绑定队列到直连交换机,指定路由键
  19. channel.QueueBind(
  20. queue: "NewRabbitMQ",//要绑定的队列名称。
  21. exchange: "RabbitMQ_topic",//要绑定的交换机名称。
  22. routingKey: "user.*.info"//是否将消息路由到这个队列,这里表示匹配所有以 "user." 开头,并以 "info" 结尾的路由键
  23. );
  24. // 发送消息到直连交换机
  25. string message = "你好";
  26. var body = Encoding.UTF8.GetBytes(message);
  27. channel.BasicPublish(
  28. exchange: "RabbitMQ_topic",//交换机名称
  29. routingKey: "user.abc.info",
  30. basicProperties: null,
  31. body: body
  32. );
  33. Console.WriteLine("OK" + message);
  34. }

接收 

与上面一致

扇形交换机

将消息路由到所有与其绑定的队列中,不关心消息的路由键和队列的绑定键。

发送

  1. var factory = new ConnectionFactory() { HostName = "localhost" };
  2. using var connection = factory.CreateConnection();
  3. using (var channel = connection.CreateModel())
  4. {
  5. // 声明一个直连交换机
  6. channel.ExchangeDeclare(
  7. exchange: "RabbitMQ_fanout",//交换机名称
  8. type: "fanout"//交换机类型
  9. );
  10. // 声明一个队列
  11. channel.QueueDeclare(
  12. queue: "NewRabbitMQ",
  13. durable: false,
  14. exclusive: false,
  15. autoDelete: false,
  16. arguments: null
  17. );
  18. // 绑定队列到直连交换机,指定路由键
  19. channel.QueueBind(
  20. queue: "NewRabbitMQ",//要绑定的队列名称。
  21. exchange: "RabbitMQ_fanout",//要绑定的交换机名称。
  22. routingKey: ""//扇形交换机不需要路由键,因为所有消息都会被路由到所有绑定的队列
  23. );
  24. // 发送消息到直连交换机
  25. string message = "你好";
  26. var body = Encoding.UTF8.GetBytes(message);
  27. channel.BasicPublish(
  28. exchange: "RabbitMQ_fanout",//交换机名称
  29. routingKey: "",
  30. basicProperties: null,
  31. body: body
  32. );
  33. Console.WriteLine("OK" + message);
  34. }

接收

与上面一致

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/1013199
推荐阅读
相关标签
  

闽ICP备14008679号