赞
踩
1.先部署好RabbitMQ
2.进入登录页面
3.创建虚拟机
4.添加新用户
5.新用户绑定虚拟机
6.代码创建与rabbitmq链接
需下载引用包
- public class RabbitMQHelper
- {
- /// <summary>
- /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
- /// </summary>
- /// <returns></returns>
- public static IConnection GetConnection() {
- //创建连接工厂【设置相关属性】
- var connectionFactory = new ConnectionFactory()
- {
- //设置IP
- HostName = "127.0.0.1",//RabbitMQ地址
- Port = 5672,//端口
- VirtualHost = "/test",//RabbitMQ中要请求的VirtualHost名称
- UserName ="test",//RabbitMQ用户
- Password= "test"//RabbitMQ用户密码
-
- };
- //通过工厂创建连接对象
- return connectionFactory.CreateConnection();
-
- }
- }
7.简单模式
生产者
- /// <summary>
- /// 简单队列模式-生产者
- /// </summary>
- public class SampleProducer
- {
- /// <summary>
- /// 简单队列模式
- /// 生产者
- /// 通过管道向RabbitMQ发送消息
- /// </summary>
- public void SendMessage() {
- //获取连接对象
- using var connection=RabbitMQHelper.GetConnection();
- //创建管道
- using var channel= connection.CreateModel();
- //创建队列
- channel.QueueDeclare("sample_queue",false,false,false,null);
-
- for (int i = 0; i < 5; i++)
- {
- string msg = $"Hello Word Message{i + 1}";
- var body = Encoding.UTF8.GetBytes(msg);
- //向RabbitMQ发送消息
- //routingKey指定发往哪个队列(如routingkey和队列名相同则为默认队列)
- //exchange为空则默认链接的交换机, false,null, body为消息主体
- channel.BasicPublish("", "sample_queue", false, null, body);
- //输出发送的消息
- Console.WriteLine(msg);
- }
- }
- }
消费者
- /// <summary>
- /// 简单队列模式-消费者
- /// </summary>
- public class SampleConsumer
- {
- /// <summary>
- /// 简单队列模式
- /// 消费者
- /// 通过管道向RabbitMQ获取消息且消费消息
- /// </summary>
- public static void ConsumerMessage() {
- //获取连接对象
- var connection=RabbitMQHelper.GetConnection();
- //创建管道
- var chennel=connection.CreateModel();
- //创建队列(如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
- chennel.QueueDeclare("sample_queue", false, false, false, null);
-
- //事件对象
- var consumer= new EventingBasicConsumer(chennel);
- consumer.Received += (model, ea) => {
- //获取消息
- var body=ea.Body;//获取队列中消息主体
- var msg=Encoding.UTF8.GetString(body.ToArray());//转为string
- var routingkey=ea.RoutingKey;//队列名
- Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
- };
- //消费消息
- chennel.BasicConsume("sample_queue", true, consumer);
-
- }
- }
8.发布订阅模式
生产者
- /// <summary>
- /// 发布订阅模式-生产者
- /// </summary>
- public class FanoutProducer
- {
- /// <summary>
- /// 发布订阅模式
- /// 生产者
- /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
- /// </summary>
- public void SendMessage() {
- //获取连接对象
- using var connection = RabbitMQHelper.GetConnection();
- //创建管道
- using var channel = connection.CreateModel();
- //创建交换机
- channel.ExchangeDeclare("fanout_Exchange","fanout",false,false,null);
- //创建队列
- channel.QueueDeclare("fanout_queue1",false,false,false,null);
- channel.QueueDeclare("fanout_queue2", false, false, false, null);
- channel.QueueDeclare("fanout_queue3", false, false, false, null);
- //把队列绑定到交换机(就是队列订阅交换机)
- channel.QueueBind("fanout_queue1", "fanout_Exchange", "",null);
- channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);
- channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);
-
- for (int i = 0; i < 10; i++)
- {
- string msg = $"RabbiteMQ fanout Message{i+1}";
- var body=Encoding.UTF8.GetBytes(msg);
- //发送消息
- //创建了交换机则不需要routingkey了,因为只需要队列订阅交换机即可
- //消息发送到交换机,创建了的队列订阅了交换机则会自动去交换机拿值
- channel.BasicPublish("fanout_Exchange","",false,null,body);
- Console.WriteLine(msg);
- }
- }
- }
消费者
- /// <summary>
- /// 发布订阅模式-消费者
- /// </summary>
- public class FanoutConsumer
- {
- /// <summary>
- /// 发布订阅模式
- /// 消费者
- /// 通过管道向RabbitMQ获取消息且消费消息
- /// </summary>
- public static void ConsumerMessage()
- {
- //获取连接对象
- var connection = RabbitMQHelper.GetConnection();
- //创建管道
- var channel = connection.CreateModel();
-
- /*
- * 在消费者创建队列/交换机是以防范先启动消费者而引发报错
- * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
- */
-
- //创建交换机
- channel.ExchangeDeclare("fanout_Exchange", "fanout", false, false, null);
- //创建队列
- channel.QueueDeclare("fanout_queue1", false, false, false, null);
- channel.QueueDeclare("fanout_queue2", false, false, false, null);
- channel.QueueDeclare("fanout_queue3", false, false, false, null);
- //把队列绑定到交换机(就是队列订阅交换机)
- channel.QueueBind("fanout_queue1", "fanout_Exchange", "", null);
- channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);
- channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);
-
- //事件对象
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) => {
- //获取消息
- var body = ea.Body;//获取队列中消息主体
- var msg = Encoding.UTF8.GetString(body.ToArray());//转为string
- var routingkey = ea.RoutingKey;//队列名
- Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
- };
- //消费消息
- //可一次消费多个队列消息(也可消费单个队列)
- channel.BasicConsume("fanout_queue1", true, consumer);
- channel.BasicConsume("fanout_queue2", true, consumer);
- channel.BasicConsume("fanout_queue3", true, consumer);
- }
- }
9.路由模式
生产者
- /// <summary>
- /// 路由模式-生产者
- /// </summary>
- public class DirectProducer
- {
- /// <summary>
- /// 路由模式
- /// 生产者
- /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
- /// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列
- /// </summary>
- public void SendMessage()
- {
- //获取连接对象
- using var connection = RabbitMQHelper.GetConnection();
- //创建管道
- using var channel = connection.CreateModel();
- //创建交换机
- channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);
- //创建队列
- channel.QueueDeclare("direct_queue1", false, false, false, null);
- channel.QueueDeclare("direct_queue2", false, false, false, null);
- channel.QueueDeclare("direct_queue3", false, false, false, null);
-
- //把队列绑定到交换机(就是队列订阅交换机)
- channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);//设置routingkey(可不同的消息发往不同的队列) info:为正常的消息
- channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);//warn:非正常的消息
- channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);//error:错误的消息
-
- for (int i = 0; i < 10; i++)
- {
- string msg = $"RabbiteMQ direct Message{i + 1}";
- var body = Encoding.UTF8.GetBytes(msg);
- //发送消息
- //根据已订阅交换机队列的routingkey指定发送消息
- channel.BasicPublish("direct_Exchange", "info", false, null, body);
- Console.WriteLine(msg);
- }
- }
- }
消费者
- /// <summary>
- /// 路由模式-消费者
- /// </summary>
- public class DirectConsumer
- {
- /// <summary>
- /// 路由模式-消费者
- /// 消费者
- /// 通过管道向RabbitMQ获取消息且消费消息
- /// </summary>
- public static void ConsumerMessage()
- {
- //获取连接对象
- var connection = RabbitMQHelper.GetConnection();
- //创建管道
- var channel = connection.CreateModel();
-
- /*
- * 在消费者创建队列/交换机是以防范先启动消费者而引发报错
- * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
- */
-
- //创建交换机
- channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);
- //创建队列
- channel.QueueDeclare("direct_queue1", false, false, false, null);
- channel.QueueDeclare("direct_queue2", false, false, false, null);
- channel.QueueDeclare("direct_queue3", false, false, false, null);
- //把队列绑定到交换机(就是队列订阅交换机)
- channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);
- channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);
- channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);
-
- //事件对象
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) => {
- //获取消息
- var body = ea.Body;//获取队列中消息主体
- var msg = Encoding.UTF8.GetString(body.ToArray());//转为string
- var routingkey = ea.RoutingKey;//队列名
- Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
- };
-
- //消费消息
- //可一次消费多个队列消息(也可消费单个队列)
- channel.BasicConsume("direct_queue1", true, consumer);
- channel.BasicConsume("direct_queue2", true, consumer);
- channel.BasicConsume("direct_queue3", true, consumer);
- }
- }
10.主题模式
生产者
/// <summary>
/// 主题模式-生产者
/// </summary>
public class TopicProducer
{
/// <summary>
/// 主题模式
/// 生产者
/// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
/// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列
/// </summary>
public void SendMessage()
{
//获取连接对象
using var connection = RabbitMQHelper.GetConnection();
//创建管道
using var channel = connection.CreateModel();
//创建交换机
channel.ExchangeDeclare("topic_Exchange", "topic", false, false, null);
//创建队列
channel.QueueDeclare("topic_queue1", false, false, false, null);
channel.QueueDeclare("topic_queue2", false, false, false, null);
channel.QueueDeclare("topic_queue3", false, false, false, null);
//把队列绑定到交换机(就是队列订阅交换机)
channel.QueueBind("topic_queue1", "topic_Exchange", "user.insert", null);//设置routingkey(可不同的消息发往不同的队列) info:为正常的消息
channel.QueueBind("topic_queue2", "topic_Exchange", "user.update", null);//warn:非正常的消息
//通配符: * 代表随意一个单词 # 代表任意组合词汇
//(在添加其它队列时,符合通配符条件则这个队列消息也会添加进去)
channel.QueueBind("topic_queue3", "topic_Exchange", "user.*", null);//error:错误的消息
for (int i = 0; i < 10; i++)
{
string msg = $"RabbiteMQ topic Message{i + 1}";
var body = Encoding.UTF8.GetBytes(msg);
//发送消息
//根据已订阅交换机队列的routingkey指定发送消息
channel.BasicPublish("topic_Exchange", "user.update", false, null, body);
Console.WriteLine(msg);
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。