赞
踩
RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。
消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列; 消费者只负责从消息队列中取出数据处理。
RabbitMQ防止数据丢失:
1.生产者发送消息时,使用Confirm机制,来确认消息到达消息队列
2.RabbitMQ端设置消息持久化
3. 消费者消费消息时,使用ACK事务机制,进行手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。
生产者RabbitMQClient.cs
- namespace RabbitMQClient
- {
- class RabbitMQClient
- {
- public void SendMessage()
- {
- //实例化连接工厂
- var factory = new ConnectionFactory()
- {
- HostName = "localhost", //RabbitMQ服务在本地运行
- Port = 5672, //端口号
- UserName = "guest", //默认用户名
- Password = "guest" //默认密码
- };
-
- //建立连接
- using (var connection=factory.CreateConnection())
- {
- //创建信道
- using (var channel=connection.CreateModel())
- {
- //声明队列
- /* 创建一个名为 ProcessQueue 的消息队列,如果名称相同不会重复创建,参数解释:
- * 参1:消息队列名称;
- * 参2:是否持久化,持久化的队列会存盘,服务器重启后任然存在;
- * 参3:是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
- * 参4:是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
- * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
- */
- channel.QueueDeclare("ProcessQueue", durable: true, false, false, null);
-
- //设置消息优先级(Priority)
- //channel.QueueDeclare("ProcessQueue", durable: true, false, false, new Dictionary<string, object> {
- // { "x-rnax-priority", 10 }
- //});
-
- var properties = channel.CreateBasicProperties();
- properties.DeliveryMode = 2; //持久化
-
- //设置优先级,0-9,大的优先发送,先消费
- //properties.Priority = 9;
-
- //轮询调度(默认)
- //此情形下,RabbitMQ把信息按顺序发送给每一个消费者。平均每个消费者将获得同等数量的信息。
-
- /*公平分发
- * 设置消息通道的基础 Qos 参数
- * 只有当消费者回传消息标记后,才会将下一个消息发送给它,否则将消息分发给其它空闲的消费者
- * prefetchCount:1 表示告诉 RabbitMQ, 在未接收到消费者确认消息之前,不在分发消息
- */
- channel.BasicQos(0, prefetchCount: 1, false);
- //构建字节数据包
- string message = State.Initialized.ToString();
- var body = Encoding.UTF8.GetBytes(message);
-
- //开启confirm模式
- channel.ConfirmSelect();
- //发送数据包
- channel.BasicPublish("", "ProcessQueue", properties, body);
- Console.WriteLine($"已发布消息:{message}");
-
- message = State.Terminated.ToString();
- body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish("", "ProcessQueue", properties, body);
- Console.WriteLine($"已发布消息:{message}");
-
- message = State.Error.ToString();
- body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish("", "ProcessQueue", properties, body);
- Console.WriteLine($"已发布消息:{message}");
-
- if (channel.WaitForConfirms())
- {
- Console.WriteLine("所有消息均已发送到Broker");
- }
- }
- }
- }
-
- //将生产者推送的信息发给每个订阅的消费者处理,使用Exchange实现
- public void PublishMessage()
- {
- var factory = new ConnectionFactory()
- {
- HostName ="localhost",
- Port = 5672,
- };
-
- using (var connection=factory.CreateConnection())
- {
- using (var channel=connection.CreateModel())
- {
- //声明信息交换机,
- //type: fanout(将信息分发到exchange上绑定的所有队列上);direct(消费者绑定的队列名称须和生产者发布指定的路由名称一致)
- channel.ExchangeDeclare(exchange:"fanoutTest",type:ExchangeType.Fanout);
-
- var message = "Kilter";
- byte[] body = Encoding.UTF8.GetBytes(message);
-
- //发布到指定exchange,fanout类型的会忽视routingKey的值,所以无需填写
- channel.BasicPublish(exchange:"fanoutTest",routingKey:"",null,body);
- Console.WriteLine($"发布消息:{message} 到exchange");
-
- }
- }
- }
-
-
- }
-
- public enum State
- {
- Terminated, // 终止状态
- Initialized, // 初始化状态
- Idle, // 空闲
- Busy, // 忙碌
- Pause, // 暂停状态
-
- Warning, // 警告,人工消除后可继续
- Error, // 错误,人工消除后可继续
- FatalError, // 致命错误,无法继续工作,仅可通过重新初始化
- }
- }
消费者RabbitMQServer.cs
- class RabbitMQServer
- {
- public ConcurrentDictionary<string, MethodInfo> methodDic = new ConcurrentDictionary<string, MethodInfo>();
- public object Instance { get; set; }
- public RabbitMQServer()
- {
- SetExecutor();
- }
-
- public void SetExecutor()
- {
- Type type= Assembly.Load("RabbitMQServer").GetType("RabbitMQServer.Executor");
- Instance = Activator.CreateInstance(type);
- MethodInfo[] methods = type.GetMethods();
- foreach (var item in methods)
- {
- methodDic.TryAdd(item.Name,item);
- }
- }
-
- public void ReceiveMessage()
- {
- var factory = new ConnectionFactory()
- {
- HostName = "localhost",
- Port = 5672,
- UserName = "guest",
- Password = "guest"
- };
-
- //tips: 不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
- var connection = factory.CreateConnection();
- var channel = connection.CreateModel();
- channel.QueueDeclare("ProcessQueue", durable:true, false, false, null);
- //构造消费者实例(指定消息通道)
- var consumer = new EventingBasicConsumer(channel);
- /*消费者消费消息(在当前通道中监听 ProcessQueue 队列,并进行消费)
- autoAck参数属性
- true:自动信息确认,当消费者接收到信息后,自动发送ack信号,不管信息是否处理完毕
- false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认,表示开启消息响应的功能, 当一条消息发送给消费者后,该消息必须得到消费者的“确认”后,RabbitMQ 才会将该消息删除。
- */
- channel.BasicConsume("ProcessQueue", autoAck: false, consumer);
- //绑定消息接收后的事件委托
- consumer.Received += (sender, e) =>
- {
- //ReadOnlyMemory<byte>转化为byte[] : .ToArray()
- //string转化为ReadOnlyMemory<byte>: .AsMemory()
- var body = e.Body.ToArray(); //消息字节数组
- var message = Encoding.UTF8.GetString(body); //消息内容
- Console.WriteLine($"已消费消息:{message}");
- //根据消息,添加逻辑
- if (methodDic.ContainsKey(message))
- {
- methodDic[message].Invoke(Instance, null);
- }
-
- //消息响应,在消息处理完成后回传该消息标记, 只有当响应此消息标记后,该消息才会在消息队列中删除
- //deliveryTag参数是分发的标记,multiple表示是否确认多条。
- channel.BasicAck(e.DeliveryTag, false);
-
- //用于拒绝消息,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
- //channel.BasicReject(e.DeliveryTag, requeue: false);
- };
-
- }
-
- //多个消费者订阅一个生产者
- public void SubscribeMessage()
- {
- var factory = new ConnectionFactory()
- {
- HostName = "localhost",
- Port = 5672
- };
-
- var connection = factory.CreateConnection();
- var channel = connection.CreateModel();
- //声明信息交换机
- channel.ExchangeDeclare(exchange: "fanoutTest", type: ExchangeType.Fanout);
- //循环生产3个消费者,模拟此情境
- for (int i = 0; i < 3; i++)
- {
- //生成随机队列名称
- var queuename = channel.QueueDeclare().QueueName;
- //绑定队列到指定fanout类型exchange
- channel.QueueBind(queuename, "fanoutTest", "", null);
-
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (sender, e) =>
- {
- var body = e.Body.ToArray(); //消息字节数组
- var message = Encoding.UTF8.GetString(body); //消息内容
- Console.WriteLine($"消费者{queuename}通过exchange已消费消息:{message}");
- };
-
- channel.BasicConsume(queuename, true, consumer);
- }
-
- }
-
-
- }
-
- public class Executor
- {
- public void Terminated()
- {
- Console.WriteLine("执行Terminated对应事件");
- }
-
- public void Initialized()
- {
- Console.WriteLine("执行Initialized对应事件");
- }
-
- public void Error()
- {
- Console.WriteLine("执行Error对应事件");
- }
- }
运行结果:
参考:
[1] C#调用RabbitMQ实现消息队列 - kiba518 - 博客园 (cnblogs.com)
[2] 快速掌握RabbitMQ(三)——消息确认、持久化、优先级的C#实现 - 捞月亮的猴子 - 博客园 (cnblogs.com)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。