当前位置:   article > 正文

图文并茂的RabbitMQ工作模式_basicqos

basicqos

一、简单模式

1、简单模式

一对一模式,一个生产者、一个消费者,生产者发送消息,消费者消费消息。

生产者:也就是要发送消息的程序

消费者:消息的接收者,会一直等待消息到来

消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

2、实例代码:

  1. //生产者
  2. public class Producer {
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("118.31.55.110");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setUsername("guest");
  8. connectionFactory.setPassword("guest");
  9. connectionFactory.setVirtualHost("/");
  10. Connection connecte = connectionFactory.newConnection();
  11. Channel channel = connecte.createChannel();
  12. channel.queueDeclare("queue", false, false, false, null);
  13. String message = "Jaosn Hello World!";
  14. channel.basicPublish("","queue",null,message.getBytes());
  15. channel.close();
  16. connecte.close();
  17. System.out.println("=====消息发送成功======");
  18. }
  19. }
  20. //消费者
  21. public class Consumer {
  22. public static void main(String[] args) throws IOException, TimeoutException {
  23. ConnectionFactory connectionFactory = new ConnectionFactory();
  24. connectionFactory.setHost("118.31.55.110");
  25. connectionFactory.setPort(5672);
  26. connectionFactory.setUsername("guest");
  27. connectionFactory.setPassword("guest");
  28. connectionFactory.setVirtualHost("/");
  29. Connection connecte = connectionFactory.newConnection();
  30. Channel channel = connecte.createChannel();
  31. channel.queueDeclare("queue", false, false, false, null);
  32. //从mq服务器获取数据
  33. channel.basicConsume("queue",false,new Reciver(channel));
  34. }
  35. }
  36. class Reciver extends DefaultConsumer {
  37. private Channel channel;
  38. //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
  39. public Reciver(Channel channel) {
  40. super(channel);
  41. this.channel = channel;
  42. }
  43. @Override
  44. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45. String message = new String(body);
  46. System.out.println("消费者接收到的消息:"+message);
  47. System.out.println("消息的TagId:"+envelope.getDeliveryTag());
  48. //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
  49. channel.basicAck(envelope.getDeliveryTag(), false);
  50. }
  51. }

二、工作队列模式

1、工作队列模式

一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  • 工作模式对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

2、代码:

  1. //订单系统
  2. public class OrderSystem {
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. Connection connection = RabbitUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  7. for(int i = 1 ; i <= 100 ; i++) {
  8. SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
  9. String jsonSMS = new Gson().toJson(sms);
  10. channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
  11. }
  12. System.out.println("发送数据成功");
  13. channel.close();
  14. connection.close();
  15. }
  16. }
  17. //短信服务1
  18. public class SMSSender1 {
  19. public static void main(String[] args) throws IOException {
  20. Connection connection = RabbitUtils.getConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  23. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  24. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  25. channel.basicQos(1);//处理完一个取一个
  26. channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  29. String jsonSMS = new String(body);
  30. System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
  31. try {
  32. Thread.sleep(10);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. channel.basicAck(envelope.getDeliveryTag() , false);
  37. }
  38. });
  39. }
  40. }
  41. //短信服务2
  42. public class SMSSender2 {
  43. public static void main(String[] args) throws IOException {
  44. Connection connection = RabbitUtils.getConnection();
  45. final Channel channel = connection.createChannel();
  46. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  47. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  48. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  49. channel.basicQos(1);//处理完一个取一个
  50. channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
  51. @Override
  52. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  53. String jsonSMS = new String(body);
  54. System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
  55. try {
  56. Thread.sleep(100);
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. channel.basicAck(envelope.getDeliveryTag() , false);
  61. }
  62. });
  63. }
  64. }
  65. //短信服务3
  66. public class SMSSender3 {
  67. public static void main(String[] args) throws IOException {
  68. Connection connection = RabbitUtils.getConnection();
  69. final Channel channel = connection.createChannel();
  70. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  71. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  72. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  73. channel.basicQos(1);//处理完一个取一个
  74. channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
  75. @Override
  76. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  77. String jsonSMS = new String(body);
  78. System.out.println("SMSSender3-短信发送成功:" + jsonSMS);
  79. try {
  80. Thread.sleep(500);
  81. } catch (InterruptedException e) {
  82. e.printStackTrace();
  83. }
  84. channel.basicAck(envelope.getDeliveryTag() , false);
  85. }
  86. });
  87. }
  88. }

三、发布订阅模式

1、示意图:

2、发布订阅模式

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

消费者,消息的接收者,会一直等待消息到来

消息队列,接收消息、缓存消息

交换机一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

交换机:只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

3、实例:

(1)图

(2)创建队列

(3)创建交换机

(4)队列跟交换机绑定

(5)代码:

  1. //发布者
  2. public class WeatherBureau {
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = RabbitUtils.getConnection();
  5. String input = new Scanner(System.in).next();
  6. Channel channel = connection.createChannel();
  7. //第一个参数交换机名字 其他参数和之前的一样
  8. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
  9. channel.close();
  10. connection.close();
  11. }
  12. }
  13. //订阅者
  14. public class Sina {
  15. public static void main(String[] args) throws IOException {
  16. //获取TCP长连接
  17. Connection connection = RabbitUtils.getConnection();
  18. //获取虚拟连接
  19. final Channel channel = connection.createChannel();
  20. //声明队列信息
  21. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  22. //queueBind用于将队列与交换机绑定
  23. //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
  24. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
  25. channel.basicQos(1);
  26. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  29. System.out.println("新浪天气收到气象信息:" + new String(body));
  30. channel.basicAck(envelope.getDeliveryTag() , false);
  31. }
  32. });
  33. }
  34. }
  35. public class BiaDu {
  36. public static void main(String[] args) throws IOException {
  37. //获取TCP长连接
  38. Connection connection = RabbitUtils.getConnection();
  39. //获取虚拟连接
  40. final Channel channel = connection.createChannel();
  41. //声明队列信息
  42. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  43. //queueBind用于将队列与交换机绑定
  44. //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
  45. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  46. channel.basicQos(1);
  47. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  48. @Override
  49. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  50. System.out.println("百度天气收到气象信息:" + new String(body));
  51. channel.basicAck(envelope.getDeliveryTag() , false);
  52. }
  53. });
  54. }

(6)小结:

a、交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

b、发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

四、路由模式

1、路由模式:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

2、图:

图解:

  • 生产者:向交换机发送消息,发送消息时,会指定一个routing key
  • 交换机:接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • 消费者1:其所在队列指定了需要 routing key 为 info、error 的消息
  • 消费者2:其所在队列指定了需要 routing key 为 error、warning 的消息

3、实例图:

4、代码:

  1. //发布者
  2. public class WeatherBureau {
  3. public static void main(String[] args) throws Exception {
  4. Map area = new LinkedHashMap<String, String>();
  5. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
  6. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
  7. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
  8. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
  9. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
  10. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
  11. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
  12. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
  13. Connection connection = RabbitUtils.getConnection();
  14. Channel channel = connection.createChannel();
  15. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
  16. while (itr.hasNext()) {
  17. Map.Entry<String, String> me = itr.next();
  18. //第一个参数交换机名字 第二个参数作为 消息的routing key
  19. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
  20. }
  21. channel.close();
  22. connection.close();
  23. }
  24. }
  25. //消费者
  26. public class Sina {
  27. public static void main(String[] args) throws IOException {
  28. //获取TCP长连接
  29. Connection connection = RabbitUtils.getConnection();
  30. //获取虚拟连接
  31. final Channel channel = connection.createChannel();
  32. //声明队列信息
  33. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  34. //指定队列与交换机以及routing key之间的关系
  35. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
  36. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
  37. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
  38. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
  39. channel.basicQos(1);
  40. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  41. @Override
  42. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  43. System.out.println("新浪天气收到气象信息:" + new String(body));
  44. channel.basicAck(envelope.getDeliveryTag() , false);
  45. }
  46. });
  47. }
  48. }
  49. public class BiaDu {
  50. public static void main(String[] args) throws IOException {
  51. Connection connection = RabbitUtils.getConnection();
  52. final Channel channel = connection.createChannel();
  53. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  54. //queueBind用于将队列与交换机绑定
  55. //参数1:队列名 参数2:交互机名 参数三:路由key
  56. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
  57. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
  58. channel.basicQos(1);
  59. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  60. @Override
  61. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  62. System.out.println("百度天气收到气象信息:" + new String(body));
  63. channel.basicAck(envelope.getDeliveryTag() , false);
  64. }
  65. });
  66. }
  67. }

5、小结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五、通配符模式

1、通配符模式

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

2、实例代码:

  1. //发布者
  2. public class WeatherBureau {
  3. public static void main(String[] args) throws Exception {
  4. Map area = new LinkedHashMap<String, String>();
  5. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
  6. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
  7. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
  8. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
  9. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
  10. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
  11. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
  12. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
  13. Connection connection = RabbitUtils.getConnection();
  14. Channel channel = connection.createChannel();
  15. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
  16. while (itr.hasNext()) {
  17. Map.Entry<String, String> me = itr.next();
  18. //第一个参数交换机名字 第二个参数作为 消息的routing key
  19. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
  20. }
  21. channel.close();
  22. connection.close();
  23. }
  24. }
  25. //消费者
  26. public class Sina {
  27. public static void main(String[] args) throws IOException {
  28. //获取TCP长连接
  29. Connection connection = RabbitUtils.getConnection();
  30. //获取虚拟连接
  31. final Channel channel = connection.createChannel();
  32. //声明队列信息
  33. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  34. //指定队列与交换机以及routing key之间的关系
  35. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
  36. channel.basicQos(1);
  37. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  38. @Override
  39. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  40. System.out.println("新浪天气收到气象信息:" + new String(body));
  41. channel.basicAck(envelope.getDeliveryTag() , false);
  42. }
  43. });
  44. }
  45. }
  46. public class BiaDu {
  47. public static void main(String[] args) throws IOException {
  48. Connection connection = RabbitUtils.getConnection();
  49. final Channel channel = connection.createChannel();
  50. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  51. //queueBind用于将队列与交换机绑定
  52. //参数1:队列名 参数2:交互机名 参数三:路由key
  53. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
  54. channel.basicQos(1);
  55. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  56. @Override
  57. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  58. System.out.println("百度天气收到气象信息:" + new String(body));
  59. channel.basicAck(envelope.getDeliveryTag() , false);
  60. }
  61. });
  62. }
  63. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/594758
推荐阅读
相关标签
  

闽ICP备14008679号