当前位置:   article > 正文

C# RabbitMQ基本连接方式和四种模式使用Dome_c# rabbitmq demo

c# rabbitmq demo

1.先部署好RabbitMQ

2.进入登录页面

3.创建虚拟机

4.添加新用户

5.新用户绑定虚拟机

6.代码创建与rabbitmq链接

需下载引用包

 

  1. public class RabbitMQHelper
  2. {
  3. /// <summary>
  4. /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
  5. /// </summary>
  6. /// <returns></returns>
  7. public static IConnection GetConnection() {
  8. //创建连接工厂【设置相关属性】
  9. var connectionFactory = new ConnectionFactory()
  10. {
  11. //设置IP
  12. HostName = "127.0.0.1",//RabbitMQ地址
  13. Port = 5672,//端口
  14. VirtualHost = "/test",//RabbitMQ中要请求的VirtualHost名称
  15. UserName ="test",//RabbitMQ用户
  16. Password= "test"//RabbitMQ用户密码
  17. };
  18. //通过工厂创建连接对象
  19. return connectionFactory.CreateConnection();
  20. }
  21. }

 7.简单模式

生产者

  1. /// <summary>
  2. /// 简单队列模式-生产者
  3. /// </summary>
  4. public class SampleProducer
  5. {
  6. /// <summary>
  7. /// 简单队列模式
  8. /// 生产者
  9. /// 通过管道向RabbitMQ发送消息
  10. /// </summary>
  11. public void SendMessage() {
  12. //获取连接对象
  13. using var connection=RabbitMQHelper.GetConnection();
  14. //创建管道
  15. using var channel= connection.CreateModel();
  16. //创建队列
  17. channel.QueueDeclare("sample_queue",false,false,false,null);
  18. for (int i = 0; i < 5; i++)
  19. {
  20. string msg = $"Hello Word Message{i + 1}";
  21. var body = Encoding.UTF8.GetBytes(msg);
  22. //向RabbitMQ发送消息
  23. //routingKey指定发往哪个队列(如routingkey和队列名相同则为默认队列)
  24. //exchange为空则默认链接的交换机, falsenull, body为消息主体
  25. channel.BasicPublish("", "sample_queue", false, null, body);
  26. //输出发送的消息
  27. Console.WriteLine(msg);
  28. }
  29. }
  30. }

 消费者

  1. /// <summary>
  2. /// 简单队列模式-消费者
  3. /// </summary>
  4. public class SampleConsumer
  5. {
  6. /// <summary>
  7. /// 简单队列模式
  8. /// 消费者
  9. /// 通过管道向RabbitMQ获取消息且消费消息
  10. /// </summary>
  11. public static void ConsumerMessage() {
  12. //获取连接对象
  13. var connection=RabbitMQHelper.GetConnection();
  14. //创建管道
  15. var chennel=connection.CreateModel();
  16. //创建队列(如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
  17. chennel.QueueDeclare("sample_queue", false, false, false, null);
  18. //事件对象
  19. var consumer= new EventingBasicConsumer(chennel);
  20. consumer.Received += (model, ea) => {
  21. //获取消息
  22. var body=ea.Body;//获取队列中消息主体
  23. var msg=Encoding.UTF8.GetString(body.ToArray());//转为string
  24. var routingkey=ea.RoutingKey;//队列名
  25. Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
  26. };
  27. //消费消息
  28. chennel.BasicConsume("sample_queue", true, consumer);
  29. }
  30. }

8.发布订阅模式

生产者

  1. /// <summary>
  2. /// 发布订阅模式-生产者
  3. /// </summary>
  4. public class FanoutProducer
  5. {
  6. /// <summary>
  7. /// 发布订阅模式
  8. /// 生产者
  9. /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
  10. /// </summary>
  11. public void SendMessage() {
  12. //获取连接对象
  13. using var connection = RabbitMQHelper.GetConnection();
  14. //创建管道
  15. using var channel = connection.CreateModel();
  16. //创建交换机
  17. channel.ExchangeDeclare("fanout_Exchange","fanout",false,false,null);
  18. //创建队列
  19. channel.QueueDeclare("fanout_queue1",false,false,false,null);
  20. channel.QueueDeclare("fanout_queue2", false, false, false, null);
  21. channel.QueueDeclare("fanout_queue3", false, false, false, null);
  22. //把队列绑定到交换机(就是队列订阅交换机)
  23. channel.QueueBind("fanout_queue1", "fanout_Exchange", "",null);
  24. channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);
  25. channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);
  26. for (int i = 0; i < 10; i++)
  27. {
  28. string msg = $"RabbiteMQ fanout Message{i+1}";
  29. var body=Encoding.UTF8.GetBytes(msg);
  30. //发送消息
  31. //创建了交换机则不需要routingkey了,因为只需要队列订阅交换机即可
  32. //消息发送到交换机,创建了的队列订阅了交换机则会自动去交换机拿值
  33. channel.BasicPublish("fanout_Exchange","",false,null,body);
  34. Console.WriteLine(msg);
  35. }
  36. }
  37. }

消费者

  1. /// <summary>
  2. /// 发布订阅模式-消费者
  3. /// </summary>
  4. public class FanoutConsumer
  5. {
  6. /// <summary>
  7. /// 发布订阅模式
  8. /// 消费者
  9. /// 通过管道向RabbitMQ获取消息且消费消息
  10. /// </summary>
  11. public static void ConsumerMessage()
  12. {
  13. //获取连接对象
  14. var connection = RabbitMQHelper.GetConnection();
  15. //创建管道
  16. var channel = connection.CreateModel();
  17. /*
  18. * 在消费者创建队列/交换机是以防范先启动消费者而引发报错
  19. * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
  20. */
  21. //创建交换机
  22. channel.ExchangeDeclare("fanout_Exchange", "fanout", false, false, null);
  23. //创建队列
  24. channel.QueueDeclare("fanout_queue1", false, false, false, null);
  25. channel.QueueDeclare("fanout_queue2", false, false, false, null);
  26. channel.QueueDeclare("fanout_queue3", false, false, false, null);
  27. //把队列绑定到交换机(就是队列订阅交换机)
  28. channel.QueueBind("fanout_queue1", "fanout_Exchange", "", null);
  29. channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);
  30. channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);
  31. //事件对象
  32. var consumer = new EventingBasicConsumer(channel);
  33. consumer.Received += (model, ea) => {
  34. //获取消息
  35. var body = ea.Body;//获取队列中消息主体
  36. var msg = Encoding.UTF8.GetString(body.ToArray());//转为string
  37. var routingkey = ea.RoutingKey;//队列名
  38. Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
  39. };
  40. //消费消息
  41. //可一次消费多个队列消息(也可消费单个队列)
  42. channel.BasicConsume("fanout_queue1", true, consumer);
  43. channel.BasicConsume("fanout_queue2", true, consumer);
  44. channel.BasicConsume("fanout_queue3", true, consumer);
  45. }
  46. }

9.路由模式

生产者

  1. /// <summary>
  2. /// 路由模式-生产者
  3. /// </summary>
  4. public class DirectProducer
  5. {
  6. /// <summary>
  7. /// 路由模式
  8. /// 生产者
  9. /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
  10. /// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列
  11. /// </summary>
  12. public void SendMessage()
  13. {
  14. //获取连接对象
  15. using var connection = RabbitMQHelper.GetConnection();
  16. //创建管道
  17. using var channel = connection.CreateModel();
  18. //创建交换机
  19. channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);
  20. //创建队列
  21. channel.QueueDeclare("direct_queue1", false, false, false, null);
  22. channel.QueueDeclare("direct_queue2", false, false, false, null);
  23. channel.QueueDeclare("direct_queue3", false, false, false, null);
  24. //把队列绑定到交换机(就是队列订阅交换机)
  25. channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);//设置routingkey(可不同的消息发往不同的队列) info:为正常的消息
  26. channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);//warn:非正常的消息
  27. channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);//error:错误的消息
  28. for (int i = 0; i < 10; i++)
  29. {
  30. string msg = $"RabbiteMQ direct Message{i + 1}";
  31. var body = Encoding.UTF8.GetBytes(msg);
  32. //发送消息
  33. //根据已订阅交换机队列的routingkey指定发送消息
  34. channel.BasicPublish("direct_Exchange", "info", false, null, body);
  35. Console.WriteLine(msg);
  36. }
  37. }
  38. }

消费者

  1. /// <summary>
  2. /// 路由模式-消费者
  3. /// </summary>
  4. public class DirectConsumer
  5. {
  6. /// <summary>
  7. /// 路由模式-消费者
  8. /// 消费者
  9. /// 通过管道向RabbitMQ获取消息且消费消息
  10. /// </summary>
  11. public static void ConsumerMessage()
  12. {
  13. //获取连接对象
  14. var connection = RabbitMQHelper.GetConnection();
  15. //创建管道
  16. var channel = connection.CreateModel();
  17. /*
  18. * 在消费者创建队列/交换机是以防范先启动消费者而引发报错
  19. * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)
  20. */
  21. //创建交换机
  22. channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);
  23. //创建队列
  24. channel.QueueDeclare("direct_queue1", false, false, false, null);
  25. channel.QueueDeclare("direct_queue2", false, false, false, null);
  26. channel.QueueDeclare("direct_queue3", false, false, false, null);
  27. //把队列绑定到交换机(就是队列订阅交换机)
  28. channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);
  29. channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);
  30. channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);
  31. //事件对象
  32. var consumer = new EventingBasicConsumer(channel);
  33. consumer.Received += (model, ea) => {
  34. //获取消息
  35. var body = ea.Body;//获取队列中消息主体
  36. var msg = Encoding.UTF8.GetString(body.ToArray());//转为string
  37. var routingkey = ea.RoutingKey;//队列名
  38. Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");
  39. };
  40. //消费消息
  41. //可一次消费多个队列消息(也可消费单个队列)
  42. channel.BasicConsume("direct_queue1", true, consumer);
  43. channel.BasicConsume("direct_queue2", true, consumer);
  44. channel.BasicConsume("direct_queue3", true, consumer);
  45. }
  46. }

10.主题模式

生产者

 /// <summary>
    /// 主题模式-生产者
    /// </summary>
   public class TopicProducer
    {
        /// <summary>
        /// 主题模式
        /// 生产者
        /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可
        /// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列
        /// </summary>
        public void SendMessage()
        {
            //获取连接对象
            using var connection = RabbitMQHelper.GetConnection();
            //创建管道
            using var channel = connection.CreateModel();
            //创建交换机
            channel.ExchangeDeclare("topic_Exchange", "topic", false, false, null);
            //创建队列
            channel.QueueDeclare("topic_queue1", false, false, false, null);
            channel.QueueDeclare("topic_queue2", false, false, false, null);
            channel.QueueDeclare("topic_queue3", false, false, false, null);

            //把队列绑定到交换机(就是队列订阅交换机)
            channel.QueueBind("topic_queue1", "topic_Exchange", "user.insert", null);//设置routingkey(可不同的消息发往不同的队列)  info:为正常的消息
            channel.QueueBind("topic_queue2", "topic_Exchange", "user.update", null);//warn:非正常的消息
            //通配符: * 代表随意一个单词  # 代表任意组合词汇
            //(在添加其它队列时,符合通配符条件则这个队列消息也会添加进去)
            channel.QueueBind("topic_queue3", "topic_Exchange", "user.*", null);//error:错误的消息 

            for (int i = 0; i < 10; i++)
            {
                string msg = $"RabbiteMQ topic Message{i + 1}";
                var body = Encoding.UTF8.GetBytes(msg);
                //发送消息
                //根据已订阅交换机队列的routingkey指定发送消息
                channel.BasicPublish("topic_Exchange", "user.update", false, null, body);
                Console.WriteLine(msg);
            }
        }
    }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/994074
推荐阅读
相关标签
  

闽ICP备14008679号