当前位置:   article > 正文

.NET中间件 -- 消息队列- RabbitMQ_net rabbitmq

net rabbitmq

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列; 消费者只负责从消息队列中取出数据处理。

启用之前下载安装RabbitMQ

RabbitMQ防止数据丢失:

1.生产者发送消息时,使用Confirm机制,来确认消息到达消息队列

2.RabbitMQ端设置消息持久化

3. 消费者消费消息时,使用ACK事务机制,进行手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。

生产者RabbitMQClient.cs

  1. namespace RabbitMQClient
  2. {
  3. class RabbitMQClient
  4. {
  5. public void SendMessage()
  6. {
  7. //实例化连接工厂
  8. var factory = new ConnectionFactory()
  9. {
  10. HostName = "localhost", //RabbitMQ服务在本地运行
  11. Port = 5672, //端口号
  12. UserName = "guest", //默认用户名
  13. Password = "guest" //默认密码
  14. };
  15. //建立连接
  16. using (var connection=factory.CreateConnection())
  17. {
  18. //创建信道
  19. using (var channel=connection.CreateModel())
  20. {
  21. //声明队列
  22. /* 创建一个名为 ProcessQueue 的消息队列,如果名称相同不会重复创建,参数解释:
  23. * 参1:消息队列名称;
  24. * 参2:是否持久化,持久化的队列会存盘,服务器重启后任然存在;
  25. * 参3:是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
  26. * 参4:是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  27. * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
  28. */
  29. channel.QueueDeclare("ProcessQueue", durable: true, false, false, null);
  30. //设置消息优先级(Priority)
  31. //channel.QueueDeclare("ProcessQueue", durable: true, false, false, new Dictionary<string, object> {
  32. // { "x-rnax-priority", 10 }
  33. //});
  34. var properties = channel.CreateBasicProperties();
  35. properties.DeliveryMode = 2; //持久化
  36. //设置优先级,0-9,大的优先发送,先消费
  37. //properties.Priority = 9;
  38. //轮询调度(默认)
  39. //此情形下,RabbitMQ把信息按顺序发送给每一个消费者。平均每个消费者将获得同等数量的信息。
  40. /*公平分发
  41. * 设置消息通道的基础 Qos 参数
  42. * 只有当消费者回传消息标记后,才会将下一个消息发送给它,否则将消息分发给其它空闲的消费者
  43. * prefetchCount:1 表示告诉 RabbitMQ, 在未接收到消费者确认消息之前,不在分发消息
  44. */
  45. channel.BasicQos(0, prefetchCount: 1, false);
  46. //构建字节数据包
  47. string message = State.Initialized.ToString();
  48. var body = Encoding.UTF8.GetBytes(message);
  49. //开启confirm模式
  50. channel.ConfirmSelect();
  51. //发送数据包
  52. channel.BasicPublish("", "ProcessQueue", properties, body);
  53. Console.WriteLine($"已发布消息:{message}");
  54. message = State.Terminated.ToString();
  55. body = Encoding.UTF8.GetBytes(message);
  56. channel.BasicPublish("", "ProcessQueue", properties, body);
  57. Console.WriteLine($"已发布消息:{message}");
  58. message = State.Error.ToString();
  59. body = Encoding.UTF8.GetBytes(message);
  60. channel.BasicPublish("", "ProcessQueue", properties, body);
  61. Console.WriteLine($"已发布消息:{message}");
  62. if (channel.WaitForConfirms())
  63. {
  64. Console.WriteLine("所有消息均已发送到Broker");
  65. }
  66. }
  67. }
  68. }
  69. //将生产者推送的信息发给每个订阅的消费者处理,使用Exchange实现
  70. public void PublishMessage()
  71. {
  72. var factory = new ConnectionFactory()
  73. {
  74. HostName ="localhost",
  75. Port = 5672,
  76. };
  77. using (var connection=factory.CreateConnection())
  78. {
  79. using (var channel=connection.CreateModel())
  80. {
  81. //声明信息交换机,
  82. //type: fanout(将信息分发到exchange上绑定的所有队列上);direct(消费者绑定的队列名称须和生产者发布指定的路由名称一致)
  83. channel.ExchangeDeclare(exchange:"fanoutTest",type:ExchangeType.Fanout);
  84. var message = "Kilter";
  85. byte[] body = Encoding.UTF8.GetBytes(message);
  86. //发布到指定exchange,fanout类型的会忽视routingKey的值,所以无需填写
  87. channel.BasicPublish(exchange:"fanoutTest",routingKey:"",null,body);
  88. Console.WriteLine($"发布消息:{message} 到exchange");
  89. }
  90. }
  91. }
  92. }
  93. public enum State
  94. {
  95. Terminated, // 终止状态
  96. Initialized, // 初始化状态
  97. Idle, // 空闲
  98. Busy, // 忙碌
  99. Pause, // 暂停状态
  100. Warning, // 警告,人工消除后可继续
  101. Error, // 错误,人工消除后可继续
  102. FatalError, // 致命错误,无法继续工作,仅可通过重新初始化
  103. }
  104. }

消费者RabbitMQServer.cs

  1. class RabbitMQServer
  2. {
  3. public ConcurrentDictionary<string, MethodInfo> methodDic = new ConcurrentDictionary<string, MethodInfo>();
  4. public object Instance { get; set; }
  5. public RabbitMQServer()
  6. {
  7. SetExecutor();
  8. }
  9. public void SetExecutor()
  10. {
  11. Type type= Assembly.Load("RabbitMQServer").GetType("RabbitMQServer.Executor");
  12. Instance = Activator.CreateInstance(type);
  13. MethodInfo[] methods = type.GetMethods();
  14. foreach (var item in methods)
  15. {
  16. methodDic.TryAdd(item.Name,item);
  17. }
  18. }
  19. public void ReceiveMessage()
  20. {
  21. var factory = new ConnectionFactory()
  22. {
  23. HostName = "localhost",
  24. Port = 5672,
  25. UserName = "guest",
  26. Password = "guest"
  27. };
  28. //tips: 不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
  29. var connection = factory.CreateConnection();
  30. var channel = connection.CreateModel();
  31. channel.QueueDeclare("ProcessQueue", durable:true, false, false, null);
  32. //构造消费者实例(指定消息通道)
  33. var consumer = new EventingBasicConsumer(channel);
  34. /*消费者消费消息(在当前通道中监听 ProcessQueue 队列,并进行消费)
  35. autoAck参数属性
  36. true:自动信息确认,当消费者接收到信息后,自动发送ack信号,不管信息是否处理完毕
  37. false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认,表示开启消息响应的功能, 当一条消息发送给消费者后,该消息必须得到消费者的“确认”后,RabbitMQ 才会将该消息删除。
  38. */
  39. channel.BasicConsume("ProcessQueue", autoAck: false, consumer);
  40. //绑定消息接收后的事件委托
  41. consumer.Received += (sender, e) =>
  42. {
  43. //ReadOnlyMemory<byte>转化为byte[] : .ToArray()
  44. //string转化为ReadOnlyMemory<byte>: .AsMemory()
  45. var body = e.Body.ToArray(); //消息字节数组
  46. var message = Encoding.UTF8.GetString(body); //消息内容
  47. Console.WriteLine($"已消费消息:{message}");
  48. //根据消息,添加逻辑
  49. if (methodDic.ContainsKey(message))
  50. {
  51. methodDic[message].Invoke(Instance, null);
  52. }
  53. //消息响应,在消息处理完成后回传该消息标记, 只有当响应此消息标记后,该消息才会在消息队列中删除
  54. //deliveryTag参数是分发的标记,multiple表示是否确认多条。
  55. channel.BasicAck(e.DeliveryTag, false);
  56. //用于拒绝消息,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
  57. //channel.BasicReject(e.DeliveryTag, requeue: false);
  58. };
  59. }
  60. //多个消费者订阅一个生产者
  61. public void SubscribeMessage()
  62. {
  63. var factory = new ConnectionFactory()
  64. {
  65. HostName = "localhost",
  66. Port = 5672
  67. };
  68. var connection = factory.CreateConnection();
  69. var channel = connection.CreateModel();
  70. //声明信息交换机
  71. channel.ExchangeDeclare(exchange: "fanoutTest", type: ExchangeType.Fanout);
  72. //循环生产3个消费者,模拟此情境
  73. for (int i = 0; i < 3; i++)
  74. {
  75. //生成随机队列名称
  76. var queuename = channel.QueueDeclare().QueueName;
  77. //绑定队列到指定fanout类型exchange
  78. channel.QueueBind(queuename, "fanoutTest", "", null);
  79. var consumer = new EventingBasicConsumer(channel);
  80. consumer.Received += (sender, e) =>
  81. {
  82. var body = e.Body.ToArray(); //消息字节数组
  83. var message = Encoding.UTF8.GetString(body); //消息内容
  84. Console.WriteLine($"消费者{queuename}通过exchange已消费消息:{message}");
  85. };
  86. channel.BasicConsume(queuename, true, consumer);
  87. }
  88. }
  89. }
  90. public class Executor
  91. {
  92. public void Terminated()
  93. {
  94. Console.WriteLine("执行Terminated对应事件");
  95. }
  96. public void Initialized()
  97. {
  98. Console.WriteLine("执行Initialized对应事件");
  99. }
  100. public void Error()
  101. {
  102. Console.WriteLine("执行Error对应事件");
  103. }
  104. }

运行结果:

参考:

[1] C#调用RabbitMQ实现消息队列 - kiba518 - 博客园 (cnblogs.com)

[2] 快速掌握RabbitMQ(三)——消息确认、持久化、优先级的C#实现 - 捞月亮的猴子 - 博客园 (cnblogs.com)

[3] C# 消息队列之 RabbitMQ 进阶篇 - Abeam - 博客园 (cnblogs.com)

[4] C#教程之RabbitMQ基础入门篇|C#教程 (xin3721.com)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/998687
推荐阅读
相关标签
  

闽ICP备14008679号