赞
踩
1. 一个消费者,一个队列,一个消费者。 2. 消息产生消息放入队列,消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)
获取RabbitMQ连接帮助类
后面代码,这部分创建连接共用
public class RabbitMQHelper { /// <summary> /// 获取RabbitMQ连接 /// </summary> /// <returns></returns> public static IConnection GetConnection() { //实例化连接工厂 var factory = new ConnectionFactory { HostName = "127.0.0.1", //ip Port = 5672, // 端口 UserName = "Admin", // 账户 Password = "Admin", // 密码 VirtualHost = "/" // 虚拟主机 }; return factory.CreateConnection(); } }
生产者
public class Send { public static void SendMessage() { string queueName = "normal"; //1.创建链接 using (var connection = RabbitMQHelper.GetConnection()) { // 2.创建信道 using(var channel = connection.CreateModel()) { // 3.声明队列 channel.QueueDeclare(queueName, false, false, false, null); // 没有绑定交换机,怎么找到路由队列的呢? for (int i = 1; i <= 30; i++) { //4.构建Byte消息数据包 string message =$"第{i}条消息"; var body = Encoding.UTF8.GetBytes(message);//消息以二进制形式传输 // 发送消息到rabbitmq,使用rabbitmq中默认提供交换机路由,默认的路由Key和队列名称完全一致 //5.发送数据包 channel.BasicPublish(exchange: "", routingKey: queueName, null, body); Thread.Sleep(1000);//添加延迟 Console.WriteLine("生产:" + message); } } } } }
消费者
public class Receive { public static void ReceiveMessage() { // 消费者消费是队列中消息 string queueName = "normal"; //1.建立链接链接 var connection = RabbitMQHelper.GetConnection(); { //2.建立信道 var channel = connection.CreateModel(); { //3.声明队列:如果你先启动是消费端就会异常 channel.QueueDeclare(queueName, false, false, false, null); //4.创建一个消费者实例 var consumer = new EventingBasicConsumer(channel); //5.绑定消息接收后的事件委托 consumer.Received +=(model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Thread.Sleep(1000); Console.WriteLine(" Normal Received => {0}", message); }; //6.启动消费者 channel.BasicConsume( queue: queueName, autoAck:true, consumer);//开始消费 } } } }
一个消费者,一个队列,多个消费者。但多个消费者中只会有一个会成功地消费消息
消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用。
应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
生产者
public class WorkerSend { public static void SendMessage() { string queueName = "Worker_Queue"; using (var connection = RabbitMQHelper.GetConnection()) { using(var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, false, false, false, null); for (int i = 0; i < 30; i++) { string message = $"RabbitMQ Worker {i + 1} Message"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queueName, null, body); Console.WriteLine("send Task {0} message",i + 1); } } } } }
消费者
public class WorkerReceive { public static void ReceiveMessage() { string queueName = "Worker_Queue"; var connection = RabbitMQHelper.GetConnection(); { var channel = connection.CreateModel(); { channel.QueueDeclare(queueName, false, false, false, null); var consumer = new EventingBasicConsumer(channel); //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); consumer.Received +=(model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(" Worker Queue Received => {0}", message); }; channel.BasicConsume(queueName,true, consumer); } } } }
一个消息生产者,一个交换机,多个队列,多个消息消费者。每个消费队列中消息一致,且每个消息消费者都从自己的消息队列的第一个消息开始消费,直到最后。
交换机为rabbitMQ中内部组件。消息生产者将消息发送给rabbitMQ后,rabbitMQ会根据订阅的消费者个数,生成对应数目的消息队列,这样每个消费者都能获取生产者发送的全部消息。
一旦消费者断开与rabbitMQ的连接,队列就会消失。如果消费者数目很多,对于rabbitMQ而言,也是个重大负担,订阅模式是个长连接,占用并发数,且每个消费者一个队列会占用大量空间
相关应用场景:邮件群发,群聊,广播
public static void SendMessage() { //1.创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //2.创建信道 using(var channel = connection.CreateModel()) { // 3.声明交换机对象 channel.ExchangeDeclare("fanout_exchange", "fanout"); // 4.创建队列 string queueName1 = "fanout_queue1"; channel.QueueDeclare(queueName1, false, false, false, null); string queueName2 = "fanout_queue2"; channel.QueueDeclare(queueName2, false, false, false, null); string queueName3 = "fanout_queue3"; channel.QueueDeclare(queueName3, false, false, false, null); // 5.绑定到交互机 // fanout_exchange 绑定了 3个队列 channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交换机 channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: ""); channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: ""); for (int i = 0; i < 10; i++) { //6.构建消息byte数组 string message = $"RabbitMQ Fanout {i + 1} Message"; var body = Encoding.UTF8.GetBytes(message); //7.发送消息 channel.BasicPublish("fanout_exchange", "", null, body);//同时把消息发送到订阅的三个队列 Console.WriteLine("Send Fanout {0} message",i + 1); } } } } }
public class FanoutConsumer { public static void ConsumerMessage() { //1.创建连接 var connection = RabbitMQHelper.GetConnection(); { //2,。创建信道 var channel = connection.CreateModel(); { //3.申明exchange channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout"); // 4.创建队列 string queueName1 = "fanout_queue1"; channel.QueueDeclare(queueName1, false, false, false, null); string queueName2 = "fanout_queue2"; channel.QueueDeclare(queueName2, false, false, false, null); string queueName3 = "fanout_queue3"; channel.QueueDeclare(queueName3, false, false, false, null); // 5.绑定到交互机 channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: ""); channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: ""); channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: ""); Console.WriteLine("[*] Waitting for fanout logs."); //6.申明consumer var consumer = new EventingBasicConsumer(channel); //绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine("[x] {0}", message); }; //7.启动消费者 channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只会消费队列queueName1中的消息,其他队列中订阅的消息仍然存在 Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。
消息生产者将消息发送给交换机,交换机按照路由判断,将路由到的RouteKey的消息,推送与之绑定的队列,交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
public static void SendMessage() { //1.创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //2.创建信道 using(var channel = connection.CreateModel()) { // 3.声明Direct交换机 channel.ExchangeDeclare("direct_exchange", "direct"); // 4.创建队列 string queueName1 = "direct_queue1"; channel.QueueDeclare(queueName1, false, false, false, null); string queueName2 = "direct_queue2"; channel.QueueDeclare(queueName2, false, false, false, null); string queueName3 = "direct_queue3"; channel.QueueDeclare(queueName3, false, false, false, null); // 5.绑定到交互机 指定routingKey channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red"); channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow"); channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green"); for (int i = 0; i < 10; i++) { string message = $"RabbitMQ Direct {i + 1} Message =>green"; var body = Encoding.UTF8.GetBytes(message); // 发送消息的时候需要指定routingKey发送 channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只发布到RouteKey:green的队列 Console.WriteLine("Send Direct {0} message",i + 1); } } } } }
public class DirectConsumer { public static void ConsumerMessage() { //1.创建连接 var connection = RabbitMQHelper.GetConnection(); //2.创建通信 var channel = connection.CreateModel(); //3.声明交换机 channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct"); //4.绑定交换机 var queueName = "direct_queue2";//队列direct_queue3绑定有red,yellow,green共3个RouteKey channel.QueueDeclare(queueName, false, false, false, null); //此处消费通信没有必要绑定所有的RouteKey,根据前生产者通信的路由规则,每个队列中只会路由到一种消息 channel.QueueBind(queue: queueName, exchange: "direct_exchange", routingKey: "red"); channel.QueueBind(queue: queueName, exchange: "direct_exchange", routingKey: "yellow"); channel.QueueBind(queue: queueName, exchange: "direct_exchange", routingKey: "green"); Console.WriteLine(" [*] Waiting for messages."); //5.实例化消费者 var consumer = new EventingBasicConsumer(channel); //6.为消费者绑定消费委托事件 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); // 消费完成后需要手动签收消息,如果不写该代码就容易导致重复消费问题 //7.手动确认签收消息 channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次签收性能损耗 }; // 消息签收模式 // 手动签收 保证正确消费,不会丢消息(基于客户端而已) // 自动签收 容易丢消息 // 签收:意味着消息从队列中删除 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);//设置为不自动签收,进行手动签收 Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。
此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。
生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。
规则:
*代表多个单词
public static void SendMessage() { //1.创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //2.创建信道 using (var channel = connection.CreateModel()) { //3.声明交换机 channel.ExchangeDeclare("topic_exchange", "topic"); //4.声明队列 string queueName1 = "topic_queue1"; channel.QueueDeclare(queueName1, false, false, false, null); string queueName2 = "topic_queue2"; channel.QueueDeclare(queueName2, false, false, false, null); string queueName3 = "topic_queue3"; channel.QueueDeclare(queueName3, false, false, false, null); //5.绑定到交互机 channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*"); channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete"); channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update"); for (int i = 0; i < 10; i++) { //6.准备发送字节数组 string message = $"RabbitMQ Topic {i + 1} Delete Message"; var body = Encoding.UTF8.GetBytes(message); //7.根据RouteKey发布消息 channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//会发布到queueName1,queueName2 Console.WriteLine("Send Topic {0} message", i + 1); } } } }
public static void ConsumerMessage() { //1.创建连接 var connection = RabbitMQHelper.GetConnection(); //2.创建通信 var channel = connection.CreateModel(); //3.声明交换机 channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic"); //4.声明队列 var queueName = "topic_queue3"; channel.QueueDeclare(queueName, false, false, false, null); //5.绑定交换机 channel.QueueBind(queue: queueName, exchange: "topic_exchange", routingKey: "user.data.*"); Console.WriteLine(" [*] Waiting for messages."); //6.创建消费者 var consumer = new EventingBasicConsumer(channel); //7.绑定消费委托事件 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; //8.启动消费 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3、服务端将RPC方法 的结果发送到RPC响应队列。
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。