当前位置:   article > 正文

RabbitMQ详解(二)五种队列及其实现_rabbitmq发送消息给queue

rabbitmq发送消息给queue

1. 简单模式

一个生产者P发送消息到队列Q,一个消费者C接收。

1.1 代码示例

pom文件

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.6.0</version>
  5. </dependency>

RabbitMQUtil工具类

  1. package util;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * RabbitMQ Util
  8. *
  9. * @author HarryZhang
  10. */
  11. public class RabbitMQUtil {
  12. private static Connection connection;
  13. /**
  14. * getConnection
  15. *
  16. * @return Connection
  17. */
  18. public static Connection getConnection() {
  19. if (connection != null) {
  20. return connection;
  21. }
  22. ConnectionFactory factory = new ConnectionFactory();
  23. factory.setHost("127.0.0.1");//MQ ip address
  24. factory.setPort(5672);//port
  25. factory.setUsername("HarryZhang");//username
  26. factory.setPassword("123456");//password
  27. try {
  28. connection = factory.newConnection();
  29. } catch (IOException | TimeoutException e) {
  30. e.printStackTrace();
  31. }
  32. return connection;
  33. }
  34. }

消息生产者

  1. import com.rabbitmq.client.Channel;
  2. import util.RabbitMQUtil;
  3. import java.io.IOException;
  4. public class Producer {
  5. //队列名
  6. private static final String QUEUE_NAME="debugers_test";
  7. public static void main(String[] args) throws IOException {
  8. Channel channel = RabbitMQUtil.getConnection().createChannel();
  9. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  10. String message="Hello HarryZhang";
  11. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  12. System.out.println("推送成功");
  13. }
  14. }

消息消费者

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.DeliverCallback;
  3. import util.RabbitMQUtil;
  4. import java.io.IOException;
  5. public class Consumer {
  6. private static final String QUEUE_NAME="debugers_test";
  7. public static void main(String[] args) throws IOException {
  8. Channel channel = RabbitMQUtil.getConnection().createChannel();
  9. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  10. DeliverCallback deliverCallback=(consumerTag, delivery) -> {
  11. String message=new String(delivery.getBody(),"utf-8");
  12. System.out.println("接收消息:"+message);
  13. };
  14. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  15. }
  16. }

2. 工作队列(Work Queues)

一个生产者,多个消费者,一个消息只能被一个消费者获取。多个消费者只有一个队列。

python-two

2.1 轮询分发(Round Robin)

使用工作队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ采用轮询分发策略将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。

消费者

  1. package worker;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv1 {
  7. private static final String QUEUE_NAME="debugers_test";
  8. public static void main(String[] args) throws Exception {
  9. Connection connection = RabbitMQUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  12. DeliverCallback deliverCallback=(consumerTag, delivery) -> {
  13. String message=new String(delivery.getBody(),"utf-8");
  14. System.out.println(" [x] Received '" + message + "'");
  15. // 休眠1秒
  16. try {
  17. Thread.sleep(1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. };
  22. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  23. }
  24. }

生产者

  1. package workerfair;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import util.RabbitMQUtil;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. private static final String QUEUE_NAME="debugers_test";
  9. public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. for (int i = 0; i < 100; i++) {
  14. // 消息内容
  15. String message = "" + i;
  16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  17. System.out.println(" [x] Sent '" + message + "'");
  18. Thread.sleep(i * 10);
  19. }
  20. channel.close();
  21. connection.close();
  22. }
  23. }

2.2 公平分发(Fair Dispatch)

有可能消费者处理消息的能力有差异(硬件设备,网络原因),我们期望处理能力强的消费者多处理消息,处理能力弱的消费者少处理消息。通过basicQos(perfetch)autoAck配合也可以实现。

  • basicQos:设置同一时刻服务器只会发perfetch**(此处为1)**条消息给消费者
  • autoAck:将自动应答改为手动。就处理完一条消息后手动提交。

消费者

  1. package workerfair;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv1 {
  7. private static final String QUEUE_NAME="debugers_test";
  8. public static void main(String[] args) throws Exception {
  9. Connection connection = RabbitMQUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  12. // 同一时刻服务器只会发一条消息给消费者
  13. channel.basicQos(1);
  14. DeliverCallback deliverCallback=(consumerTag, delivery) -> {
  15. String message=new String(delivery.getBody(),"utf-8");
  16. System.out.println(" [x] Received '" + message + "'");
  17. // 休眠1秒
  18. try {
  19. Thread.sleep(1000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }finally {
  23. System.out.println("[x] Done");
  24. channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
  25. }
  26. };
  27. //修改为手动应答,true为自动应答,false相反
  28. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  29. }
  30. }

注意:使用公平分发,必须关闭自动应答ack,然后改成手动应答方式。

3. 发布订阅(Publish/Subscribe)

img

一个生产者发送的消息可能会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者。

注:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers。这里采用的是fanout类型。后面会详细介绍这几种交换器。

3.1 模式特点

  • 一个生产者,多个消费者
  • 每个消费者都有自己的队列
  • 生产者没有直接将消息发送到队列,而是发送到交换机(Exchange
  • 每个队列都要绑定交换机
  • 生产者发送到消息经过交换机 --> 到达队列–> 可以实现一个消息被多个消费者消

3.2 代码示例

生产者

  1. package ps;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import util.RabbitMQUtil;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  7. public static void main(String[] args) throws Exception {
  8. Connection connection = RabbitMQUtil.getConnection();
  9. Channel channel = connection.createChannel();
  10. //声明交换机
  11. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  12. // 消息内容
  13. String message = "Hello World!";
  14. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  15. System.out.println(" [x] Sent '" + message + "'");
  16. //关闭通道连接
  17. channel.close();
  18. connection.close();
  19. }
  20. }

注意: 此时的交换机是没有绑定队列的,发送消息肯定会丢失。交换机本身是不能存储数据的,队列才是存储数据的

消费者

  1. package ps;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv1 {
  7. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  8. private final static String QUEUE_NAME="debugers_test_sms";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  16. // 同一时刻服务器只会发一条消息给消费者
  17. channel.basicQos(1);
  18. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "utf-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. //修改为手动应答,true为自动应答,false相反
  24. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  25. });
  26. }
  27. }
  1. package ps;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv2 {
  7. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  8. private final static String QUEUE_NAME="debugers_test_email";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  16. // 同一时刻服务器只会发一条消息给消费者
  17. channel.basicQos(1);
  18. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "utf-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. //修改为手动应答,true为自动应答,false相反
  24. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  25. });
  26. }
  27. }

队列和交换机绑定关系

1563099102615

4. 路由模式(Routing)

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。

4.1 代码示例

绑定 

绑定(binding)是队列和交换机之间的关系。你可以简单的理解为:队列对来自此交换机的消息感兴趣。

绑定可以采用额外的routingKey参数。千万别和basicPublish混淆了。

  1. //绑定队列到交换机
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

生产者

  1. package routing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import util.RabbitMQUtil;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_direct";
  7. public static void main(String[] args) throws Exception {
  8. Connection connection = RabbitMQUtil.getConnection();
  9. Channel channel = connection.createChannel();
  10. //声明交换机
  11. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
  12. // 消息内容
  13. String message = "Hello World!";
  14. //参数2为routingKey
  15. channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  16. System.out.println(" [x] Sent '" + message + "'");
  17. //关闭通道连接
  18. channel.close();
  19. connection.close();
  20. }
  21. }

消费者1

  1. package routing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv1 {
  7. private final static String EXCHANGE_NAME = "test_exchange_direct";
  8. private static final String QUEUE_NAME="debugers_test_sms";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  18. // 同一时刻服务器只会发一条消息给消费者
  19. channel.basicQos(1);
  20. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  21. String message = new String(delivery.getBody(), "utf-8");
  22. System.out.println(" [x] Received '" + message + "'");
  23. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24. };
  25. //修改为手动应答,true为自动应答,false相反
  26. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  27. });
  28. }
  29. }

消费者2

  1. package routing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv2 {
  7. private final static String EXCHANGE_NAME = "test_exchange_direct";
  8. private static final String QUEUE_NAME="debugers_test_email";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  16. // 同一时刻服务器只会发一条消息给消费者
  17. channel.basicQos(1);
  18. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "utf-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. //修改为手动应答,true为自动应答,false相反
  24. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  25. });
  26. }
  27. }

5. 主题模式(topic)

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

发送到topic交换的消息不能具有任意的 routing_key- 它必须是由点(.)分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。

一些有效的路由键示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。路由密钥中可以包含任意数量的字符,最多可达255个字节。

绑定键有两个重要的特殊特性:(是用.分割的单词,而不是字符)

  • *:可以替代一个单词。
  • #:可以替换零个或多个单词。

5.1 代码示例

生产者

  1. package topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import util.RabbitMQUtil;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_topic";
  7. public static void main(String[] args) throws Exception {
  8. Connection connection = RabbitMQUtil.getConnection();
  9. Channel channel = connection.createChannel();
  10. //声明交换机
  11. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  12. // 消息内容
  13. String message = "Hello World!";
  14. //参数2为routingKey
  15. channel.basicPublish(EXCHANGE_NAME, "routeKey.a", null, message.getBytes());
  16. System.out.println(" [x] Sent '" + message + "'");
  17. //关闭通道连接
  18. channel.close();
  19. connection.close();
  20. }
  21. }

 消费者1

  1. package topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv1 {
  7. private final static String EXCHANGE_NAME = "test_exchange_topic";
  8. private static final String QUEUE_NAME="debugers_test_sms";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
  16. // 同一时刻服务器只会发一条消息给消费者
  17. channel.basicQos(1);
  18. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "utf-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. //修改为手动应答,true为自动应答,false相反
  24. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  25. });
  26. }
  27. }

消费者2

  1. package topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import util.RabbitMQUtil;
  6. public class Recv2 {
  7. private final static String EXCHANGE_NAME = "test_exchange_topic";
  8. private static final String QUEUE_NAME="debugers_test_email";
  9. public static void main(String[] args) throws Exception {
  10. Connection connection = RabbitMQUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. //声明队列
  13. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  14. //绑定队列到交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routeKey.*");
  16. // 同一时刻服务器只会发一条消息给消费者
  17. channel.basicQos(1);
  18. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "utf-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. //修改为手动应答,true为自动应答,false相反
  24. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  25. });
  26. }
  27. }

6. 四种交换器(Exchange)

前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。

交换器分为四种,分别是:direct、fanout、topic和 headers。

前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。

6.1 direct

如果路由键完全匹配的话,消息才会被投放到相应的队列。

6.2 fanout

当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

6.3 topic

设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。

7. 总结

关于 RabbitMQ 的五种队列,其实实际使用最多的是最后一种主题模式,通过模糊匹配,使得操作更加自如。那么我们总结一下有交换器参与的队列(最后三种队列)工作方式如下:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/808765
推荐阅读
相关标签
  

闽ICP备14008679号