当前位置:   article > 正文

RabbitMQ 操作测试_rabbitmq 测试

rabbitmq 测试

以下内容是团队要使用 RabbitMQ 的时候自己做的一些测试,并不是权威的,只是自己根据测试结果推测的结论。

一、使用 SpringCloud Stream 集成 RabbitMQ

1. 生产者 cloud-stream-rabbitmq-provider8801

1)pom

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-actuator</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.cloud</groupId>
  12. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  13. </dependency>
  14. <!--RabbitMQ-->
  15. <dependency>
  16. <groupId>org.springframework.cloud</groupId>
  17. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  18. </dependency>
  19. <!--基础配置-->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-devtools</artifactId>
  23. <scope>runtime</scope>
  24. <optional>true</optional>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.projectlombok</groupId>
  28. <artifactId>lombok</artifactId>
  29. <optional>true</optional>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter-test</artifactId>
  34. <scope>test</scope>
  35. </dependency>
  36. </dependencies>

2)配置文件

  1. server:
  2. port: 8801
  3. spring:
  4. application:
  5. name: cloud-stream-provider
  6. cloud:
  7. stream:
  8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  9. defaultRabbit: # 表示定义的名称,用于于binding整合
  10. type: rabbit # 消息组件类型
  11. environment: # 设置rabbitmq的相关的环境配置
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: user_lyd
  17. password: 1234
  18. virtual-host: /vhost_lyd
  19. bindings: # 服务的整合处理
  20. output: # 这个名字是一个通道的名称 (可以自己定义,这里是默认的)
  21. destination: testMaterialExchange # 表示要使用的 Exchange名称定义
  22. content-type: application/json # 设置消息类型,本次为 json,文本则设置 “text/plain”
  23. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  24. eureka:
  25. client: # 客户端进行Eureka注册的配置
  26. service-url:
  27. defaultZone: http://admin:admin@localhost:10002/eureka/eureka/
  28. instance:
  29. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  30. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  31. instance-id: send-8801.com # 在信息列表时显示主机名称
  32. prefer-ip-address: true # 访问的路径变为IP地址

注意上面写的交换机和队列,要创建好哦

3)发送消息的接口以及实现类

  1. /**
  2. * @Description 发送消息接口
  3. */
  4. public interface IMessageProvider {
  5. String send();
  6. }
  1. package com.janet.springcloud.service.impl;
  2. import com.janet.springcloud.service.IMessageProvider;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.messaging.Source;
  5. import org.springframework.messaging.MessageChannel;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import javax.annotation.Resource;
  8. import java.util.UUID;
  9. /**
  10. * @Description 发送消息接口实现类
  11. */
  12. @EnableBinding(Source.class) //定义消息的推送管道,指信道channel和exchange绑定到一起
  13. public class MessageProviderImpl implements IMessageProvider {
  14. @Resource
  15. private MessageChannel output; //消息发送管道
  16. @Override
  17. public String send() {
  18. String serial = UUID.randomUUID().toString();
  19. output.send(MessageBuilder.withPayload(serial).build());
  20. System.out.println("------发送消息:"+serial);
  21. return serial;
  22. }
  23. }

4)生产者发送消息测试

  1. import com.janet.springcloud.service.IMessageProvider;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RestController;
  4. import javax.annotation.Resource;
  5. @RestController
  6. public class SendMessageController {
  7. @Resource
  8. private IMessageProvider messageProvider;
  9. @GetMapping(value = "/sendMessage")
  10. public String SendMessage(){
  11. return messageProvider.send();
  12. }
  13. }

5)主启动类

  1. @SpringBootApplication
  2. public class StreamMQMain8801 {
  3. public static void main(String[] args) {
  4. SpringApplication.run(StreamMQMain8801.class,args);
  5. }
  6. }

2. 消费者1  cloud-stream-rabbitmq-consumer8802

1)POM

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.cloud</groupId>
  7. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-web</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-actuator</artifactId>
  16. </dependency>
  17. <!--基础配置-->
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-devtools</artifactId>
  21. <scope>runtime</scope>
  22. <optional>true</optional>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.projectlombok</groupId>
  26. <artifactId>lombok</artifactId>
  27. <optional>true</optional>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. <scope>test</scope>
  33. </dependency>

2)配置文件

  1. server:
  2. port: 8802
  3. spring:
  4. application:
  5. name: cloud-stream-consumer
  6. cloud:
  7. stream:
  8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  9. defaultRabbit: # 表示定义的名称,用于于binding整合
  10. type: rabbit # 消息组件类型
  11. environment: # 设置rabbitmq的相关的环境配置
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: user_lyd
  17. password: 1234
  18. virtual-host: /vhost_lyd
  19. bindings: # 服务的整合处理
  20. input: # 这个名字是一个通道的名称
  21. destination: testMaterialExchange # 表示要使用的Exchange名称定义
  22. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
  23. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  24. eureka:
  25. client: # 客户端进行Eureka注册的配置
  26. service-url:
  27. defaultZone: http://admin:admin@localhost:10002/eureka/eureka/
  28. instance:
  29. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  30. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  31. instance-id: send-8801.com # 在信息列表时显示主机名称
  32. prefer-ip-address: true # 访问的路径变为IP地址

3)主启动类

  1. package com.janet.springcloud;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. /**
  5. * @Description 生产者1号主启动类
  6. */
  7. @SpringBootApplication
  8. public class StreamMQMain8802 {
  9. public static void main(String[] args) {
  10. SpringApplication.run(StreamMQMain8802.class,args);
  11. }
  12. }

4)接收消息类

  1. package com.janet.springcloud.controller;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.annotation.StreamListener;
  5. import org.springframework.cloud.stream.messaging.Sink;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * @Description TODO
  10. * @Author Janet
  11. * @Date 2020/6/27
  12. */
  13. @Component
  14. @EnableBinding(Sink.class)
  15. public class ReceiveMessageListenerController {
  16. @Value("${server.port}")
  17. private String serverPort;
  18. @StreamListener(Sink.INPUT)
  19. public void input(Message<String> message){
  20. // 用 withPayload 发送,用 getPayload 接收
  21. System.out.println("消费者 1 号,------------> 接收到的消息"+message.getPayload()+"\t port: "+serverPort);
  22. }
  23. }

3. 消费者3 cloud-stream-rabbitmq-consumer8803

配置和消费者2一模一样

二、测试

1. 消费者队列名设置

1)两个消费者 group 属性都没有设置,消费者默认了两个不同的分组:

消费者发送了三条消息:

两个消费者都收到了这三条消息:

结论 1:如果消费者不设置分组信息,则每个消费者会默认不同的分组。不同的分组会重复消费生产者生产的消息。

2)两个消费者设置相同分组:

结论 2:此时两个消费者设置了同一个分组,连接了同一个队列,相当于负载均衡的情况,一个消息只能有一个消费者拿到。这里没有设置手动确认机制,默认了自动确认,可以看出消费者 1 和消费者 2 是有序的获取了消息。(轮询发送)

3)两个消费者在同一个组(绑定了同一个队列)的情况下,先关掉两个消费者的服务,生产者生产消息发送,再打开消费者1,过几秒后再打开消费者2,观察日志:

结论 3:在消费者绑定组的情况下,即使生产消息的时候不在线,后来在线了依然会收到消息(持久化)。毋庸置疑,如果消费者没有同时启动,一个先一个后的话,则只会有第一个消费者接收到全部消息。

4)两个消费者去掉组的属性,不绑定队列,然后像上一步一样,先发消息,再打开消费者服务:

此时可以看到消费者没有接收到上一次生产者发送的消息。

可以看到消费者各自有了默认的队列,如果此时关闭两个消费者服务,两个默认的队列就没了:

重启之后消费者又重新默认了一个新的队列,消费者自然也是接收不到上一次发送的信息的。


以下不用SpringCloud Stream 了,自己写一个队列。

2. 超时时间

2.1 队列设置过期时间

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testMaterialExchange1";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  8. for(int i = 0; i <10; i++){
  9. String msg = "hello mq"+i;
  10. System.out.println("生产者发送消息:"+msg);
  11. //4. 发送消息
  12. channel.basicPublish(EXCHANGE_NAME,"goods.new.add",null,msg.getBytes());
  13. System.out.println("----send"+msg);
  14. }
  15. channel.close();
  16. connection.close();
  17. }
  18. }
  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testMaterialExchange1";
  3. private static final String QUEUE_NAME = "testMaterialGroup3";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. HashMap<String, Object> argss = new HashMap<>();
  8. argss.put("x-message-ttl", 10000); //设置队列里面的消息过期时间10秒
  9. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  10. //声明队列
  11. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  13. channel.basicQos(1);
  14. //定义一个消费者
  15. Consumer consumer = new DefaultConsumer(channel) {
  16. // 消息到达,触发这个消息
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. String msg = new String(body, "utf-8");
  19. System.out.println("[1]: " + msg);
  20. try {
  21. Thread.sleep(3000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }finally {
  25. channel.basicAck(envelope.getDeliveryTag(),false);
  26. System.out.println("[1] done");
  27. }
  28. }
  29. };
  30. //4. 监听队列
  31. boolean autoAck = false; //自动应答 false
  32. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  33. }
  34. }

消费者创建队列时设置了 10 秒的超时时间,并且消费一个消息休眠 3 秒,生产者发送消息:

可以看到消费者只消费了 4 个消息之后队列中的消息都超时了,由于没有设置死信队列,消息消失了。

结论:当队列设置超时时间,并且没有死信队列时,消息过期了但没来得及消费就会被删除。队列设置过期时间,队列中所有的消息过期时间相同。

2.2 发消息时消息自己设置过期时间

1)消费者发送 10 条消息,且每条设置过期时间为10秒,消费者 10 秒消费一条消息:

可以看出,消费者才消费了两个消息,其他的过期后都消失了。

2)生产者发送 1 条消息,请设置超时时间为 10 秒,消费者接收信息,且不确认:

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testMaterialExchange1";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
  8. AMQP.BasicProperties.Builder bd = new AMQP.BasicProperties().builder();
  9. bd.deliveryMode(2);//持久化 1 是不持久化
  10. bd.expiration("10000");//设置消息有效期10秒钟
  11. AMQP.BasicProperties pros = bd.build();
  12. String message = "测试ttl消息";
  13. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true,false, pros, message.getBytes());
  14. System.out.println("----send"+message);
  15. channel.close();
  16. connection.close();
  17. }
  18. }
  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testMaterialExchange1";
  3. private static final String QUEUE_NAME = "testMaterialGroup3";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  8. //声明队列
  9. channel.queueDeclare(QUEUE_NAME,true,false,false,null);
  10. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.new.*");
  11. channel.basicQos(1);
  12. //定义一个消费者
  13. Consumer consumer = new DefaultConsumer(channel) {
  14. // 消息到达,触发这个消息
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. String msg = new String(body, "utf-8");
  17. System.out.println("[2]: " + msg);
  18. try {
  19. Thread.sleep(12000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }finally {
  23. // channel.basicAck(envelope.getDeliveryTag(),false);
  24. System.out.println("[2] done");
  25. }
  26. }
  27. };
  28. //4. 监听队列
  29. boolean autoAck = false; //自动应答 false
  30. channel.basicConsume(QUEUE_NAME,autoAck,consumer);
  31. }
  32. }

可以看到消息一直都被阻塞了:

结论:消息设置的过期时间是指消费者未拿到消息之前,如果消费者还没拿到数据,那么超过设置的有效期并且队列没有设置死信交换机的话,消息就会被删除。如果消费者已经拿到了消息,并且未确认,那么消息就会一直阻塞,不会消失。(无论是消息队列整体设置过期时间还是消息单独设置过期时间都是这样)

2.3 生产者发送 10 条消息,并设置 5 秒过期时间。两个消费者加上  channel.basicQos(1)  方法,设置手动确认且不确认。

生产者:

发送 10 条消息,并设置 5 秒过期时间

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  8. AMQP.BasicProperties.Builder bd = new AMQP.BasicProperties().builder();
  9. bd.deliveryMode(2);//持久化
  10. bd.expiration("5000");//设置消息有效期5秒钟
  11. AMQP.BasicProperties pros = bd.build();
  12. for (int i = 0; i < 10; i++) {
  13. String msg = "hello mq" + i;
  14. // 发送消息
  15. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, pros, msg.getBytes());
  16. System.out.println("生产者发送消息:" + msg);
  17. }
  18. channel.close();
  19. connection.close();
  20. }
  21. }

消费者1:

  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. //声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  10. channel.basicQos(1);
  11. //定义一个消费者
  12. Consumer consumer = new DefaultConsumer(channel) {
  13. // 消息到达,触发这个消息
  14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. String msg = new String(body, "utf-8");
  16. System.out.println("[1]: " + msg);
  17. try {
  18. Thread.sleep(5000);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }finally {
  22. // channel.basicAck(envelope.getDeliveryTag(),false);
  23. // System.out.println("[1] done");
  24. }
  25. }
  26. };
  27. //4. 监听队列
  28. boolean autoAck = false; //自动应答 false
  29. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  30. }
  31. }

消费者2:

  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  4. private static final String QUEUE_NAME = "testTopicGroup1";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtils.getConnection();
  7. final Channel channel = connection.createChannel();
  8. // channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  9. //声明死信交换机(fanout 类型)以及死信队列
  10. // channel.exchangeDeclare(DLX_EXCHANGE_NAME,"fanout",true,true,null); //声明死信交换机
  11. // Map<String, Object> arguments = new HashMap<>();
  12. // arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX (过期时间在生产者发送消息的时候设置了,这里不重复设置)
  13. // arguments.put("x-dead-letter-exchange" , DLX_EXCHANGE_NAME);//设置DLX
  14. // arguments.put("x-dead-letter-routing-key" , "");//设置DLX的路由键(这里的死信交换机是fanout类型,所以这里不设置)
  15. //声明普通队列 并添加 DLX
  16. // channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
  17. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  18. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.new.*"); //普通队列绑定生产者的交换机
  19. channel.basicQos(1); //如果要确认,生产者和消费者都要有这行代码
  20. //定义一个消费者
  21. Consumer consumer = new DefaultConsumer(channel) {
  22. // 消息到达,触发这个消息
  23. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. String msg = new String(body, "utf-8");
  25. System.out.println("[消费者 2 接收消息]: " + msg);
  26. try {
  27. Thread.sleep(100);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }finally {
  31. // channel.basicAck(envelope.getDeliveryTag(),false);
  32. // System.out.println("[消费者 2 ] done");
  33. }
  34. }
  35. };
  36. //4. 监听队列
  37. boolean autoAck = false; //自动应答 false
  38. channel.basicConsume(QUEUE_NAME,autoAck,consumer);
  39. }
  40. }

消费者 1 接收到了第一条消息,消费者 2 接收到了第二条消息,开始阻塞。5秒后队列中的消息过期消失了。

结论:(channel.basicQos(1)  方法在消费者端写)如果两个消费者绑定了同一个队列,且消费端都设置了 channel.basicQos(1) 方法,设置消息手动确认且不确认,那么第一个消费者拿到第一条数据后这条消息就会阻塞,其他的消费者就会跳开刚刚阻塞的消息拿下一条数据。

2)生产者发送 10 条消息,并设置 5 秒过期时间。两个消费者去掉  channel.basicQos(1)  方法,设置手动确认且不确认。(代码和上面一样,就是去掉了 channel.basicQos(1) )

两个消费者轮询收到了消息,但是队列中的消息没有删掉。

结论:两个消费者连接同一个队列,如果消费端没有加 channel.basicQos(n) 方法,消息默认轮询发送。且如果消息一直没有确认,队列中的消息不会删除。(可以看出如果消息消费者已经拿到了,就不会过期了)

channel.basicQos(5); //每次分发5个需要确认

把生产者和两个消费者的 basicQos 方法改为每次发 5 个消息,两个消费者都设置了手动确认,消费者 1 正常确认,消费者 2 不确认:(这个方法其实在生产端不用写也行)

可以看出,如果 channel.basicQos(5) 写的是5,这说明每发 5 个消息需要确认,否则就不会再发给这个消费者了。上述例子消费者 2 一直未确认,所以收到了 5 条消息之后就阻塞了。还可以看出,其中几条消息阻塞是不会影响队列中后面的消息被消费的(所以消费者1 拿到了被阻塞消息后面的消息),只会影响被阻塞的这个消费者以及这几条消息。(而且前 10 条消息都是轮询发送)

(以及消费者报错都会重试以及报错)

3. 死信队列

消费者发送消息时设置 5 秒过期时间:

  1. import com.janet.util.ConnectionUtils;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. /**
  6. * @Description 生产者 交换机为topic类型
  7. * @Date 2021/4/27
  8. * @Author Janet
  9. */
  10. public class Send {
  11. private static final String EXCHANGE_NAME = "testTopicExchange";
  12. public static void main(String[] args) throws Exception {
  13. Connection connection = ConnectionUtils.getConnection();
  14. Channel channel = connection.createChannel();
  15. //定义交换机
  16. channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
  17. AMQP.BasicProperties.Builder bd = new AMQP.BasicProperties().builder();
  18. bd.deliveryMode(2);//持久化
  19. bd.expiration("5000");//设置消息有效期5秒钟
  20. AMQP.BasicProperties pros = bd.build();
  21. for(int i = 0; i <10; i++){
  22. String msg = "hello mq"+i;
  23. System.out.println("生产者发送消息:"+msg);
  24. //4. 发送消息
  25. channel.basicPublish(EXCHANGE_NAME,"goods.new.add",true,false, pros, msg.getBytes());
  26. System.out.println("----send"+msg);
  27. }
  28. channel.close();
  29. connection.close();
  30. }
  31. }

消费者接收生产者的消息,并消费一次休眠 5 秒钟。生产者的队列设置死信交换机。(所以这个消费者只能正常消费 2 条消息,其他的全部过期进死信队列了)

  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  4. private static final String QUEUE_NAME = "testTopicGroup1";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtils.getConnection();
  7. final Channel channel = connection.createChannel();
  8. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  9. //声明死信交换机(fanout 类型)
  10. channel.exchangeDeclare(DLX_EXCHANGE_NAME,"fanout",true,true,null); //声明死信交换机
  11. Map<String, Object> arguments = new HashMap<>();
  12. // arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX (过期时间在生产者发送消息的时候设置了,这里不重复设置)
  13. arguments.put("x-dead-letter-exchange" , DLX_EXCHANGE_NAME);//设置DLX
  14. // arguments.put("x-dead-letter-routing-key" , "");//设置DLX的路由键(这里的死信交换机是fanout类型,所以这里不设置)
  15. //声明普通队列 并添加 DLX
  16. channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
  17. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.new.*"); //普通队列绑定生产者的交换机
  18. channel.basicQos(1);
  19. //定义一个消费者
  20. Consumer consumer = new DefaultConsumer(channel) {
  21. // 消息到达,触发这个消息
  22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  23. String msg = new String(body, "utf-8");
  24. System.out.println("[2]: " + msg);
  25. try {
  26. Thread.sleep(5000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }finally {
  30. channel.basicAck(envelope.getDeliveryTag(),false);
  31. System.out.println("[2] done");
  32. }
  33. }
  34. };
  35. //4. 监听队列
  36. boolean autoAck = false; //自动应答 false
  37. channel.basicConsume(QUEUE_NAME,autoAck,consumer);
  38. }
  39. }

再定义一个消费者消费死信队列中的消息:

  1. import com.janet.util.ConnectionUtils;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @Description 定义一个消费者消费死信队列中的消息
  6. */
  7. public class Receive3 {
  8. private static final String EXCHANGE_NAME = "dlxExchange"; //死信交换机
  9. private static final String QUEUE_NAME = "dexGroup"; //死信队列
  10. public static void main(String[] args) throws Exception {
  11. Connection connection = ConnectionUtils.getConnection();
  12. final Channel channel = connection.createChannel();
  13. channel.queueDeclare(QUEUE_NAME, true, false, false, null); //声明死信队列
  14. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "null"); //死信队列和死信交换机绑定
  15. //定义一个消费者
  16. Consumer consumer = new DefaultConsumer(channel) {
  17. // 消息到达,触发这个消息
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. String msg = new String(body, "utf-8");
  20. System.out.println("消费者3接收到死信队列中的消息: " + msg);
  21. channel.basicAck(envelope.getDeliveryTag(), false);
  22. System.out.println("[消费者3] done");
  23. }
  24. };
  25. //4. 监听队列
  26. boolean autoAck = false; //自动应答 false
  27. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  28. }
  29. }

结论:如果设置了死信队列,过期的消息会进入死信队列重新消费。

4. 拒绝接收数据

/*
         * 拒绝接收消息
         * 参数1:deliveryTag 该消息的index
         * 参数2:multiple:是否批量。true:将一次性拒绝所有小于 deliveryTag 的消息。
         * 参数3:requeue:被拒绝的消息是否重新入队列
         */

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

1)生产者发送 1 条消息,消费者拒绝接收该数据(第三个参数 requeue 设置为 false)且不设置死信队列

消费者:

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  8. String msg = "hello mq";
  9. // 发送消息
  10. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, msg.getBytes());
  11. System.out.println("生产者发送消息:" + msg);
  12. channel.close();
  13. connection.close();
  14. }
  15. }
  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  8. //声明队列
  9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  11. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  12. //定义一个消费者
  13. Consumer consumer = new DefaultConsumer(channel) {
  14. // 消息到达,触发这个消息
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. String msg = new String(body, "utf-8");
  17. System.out.println("[消费者 1 ]接收到消息: " + msg);
  18. channel.basicNack(envelope.getDeliveryTag(), false, false);
  19. System.out.println("[消费者 1 ]拒绝消息:");
  20. }
  21. };
  22. //4. 监听队列
  23. boolean autoAck = false; //自动应答 false
  24. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  25. }
  26. }

消息被消费者拒绝,且队列中没有消息。

结论:如果没有设置死信队列,消息被拒绝且不重新入队的话消息会被队列删除。

2)生产者发送 1 条消息,消费者拒绝接收该数据(第三个参数 requeue 设置为 false)并设置死信队列。

生产者代码没变。

消费者:

  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  4. private static final String QUEUE_NAME = "testTopicGroup1";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtils.getConnection();
  7. final Channel channel = connection.createChannel();
  8. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  9. //声明死信交换机(fanout 类型)以及死信队列
  10. channel.exchangeDeclare(DLX_EXCHANGE_NAME, "fanout", true, true, null); //声明死信交换机
  11. Map<String, Object> arguments = new HashMap<>();
  12. arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//设置DLX
  13. //声明普通队列 并添加 DLX
  14. channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.new.*"); //普通队列绑定生产者的交换机
  16. channel.basicQos(1); //如果要确认,生产者和消费者都要有这行代码
  17. //定义一个消费者
  18. Consumer consumer = new DefaultConsumer(channel) {
  19. // 消息到达,触发这个消息
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "utf-8");
  22. System.out.println("[消费者 2 接收消息]: " + msg);
  23. channel.basicNack(envelope.getDeliveryTag(), false, false);
  24. System.out.println("[消费者 1 ]拒绝消息:");
  25. }
  26. };
  27. //4. 监听队列
  28. boolean autoAck = false; //自动应答 false
  29. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  30. }
  31. }

定义消费者接收死信队列中的消息:

  1. public class Receive3 {
  2. private static final String EXCHANGE_NAME = "dlxExchange"; //死信交换机
  3. private static final String QUEUE_NAME = "dexGroup"; //死信队列
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. channel.queueDeclare(QUEUE_NAME, true, false, false, null); //声明死信队列
  8. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "null"); //死信队列和死信交换机绑定
  9. channel.basicQos(1);
  10. //定义一个消费者
  11. Consumer consumer = new DefaultConsumer(channel) {
  12. // 消息到达,触发这个消息
  13. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14. String msg = new String(body, "utf-8");
  15. System.out.println("消费者 3 接收到死信队列中的消息: " + msg);
  16. channel.basicAck(envelope.getDeliveryTag(), false);
  17. System.out.println("[消费者 3 ] 确认消费消息");
  18. }
  19. };
  20. //4. 监听队列
  21. boolean autoAck = false; //自动应答 false
  22. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  23. }
  24. }

结论:如果设置了死信队列,消息被拒绝且不重新入队的话消息会进入死信队列。

3)生产者发送多条消息,两个消费者绑定同一队列。消费者1正常接收数据,消费者2拒绝接收数据(第三个参数 requeue 设置为 true,再入队列)并设置死信队列。

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  8. for (int i = 0; i < 10; i++) {
  9. String msg = "hello mq" + i;
  10. // 发送消息
  11. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, msg.getBytes());
  12. System.out.println("生产者发送消息:" + msg);
  13. }
  14. channel.close();
  15. connection.close();
  16. }
  17. }
  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. // HashMap<String, Object> argss = new HashMap<>();
  8. // argss.put("x-message-ttl", 10000); //设置队列里面的消息过期时间10秒
  9. // argss.put("x-max-length",5); //队列的CAHNG
  10. // channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  11. //声明队列
  12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  13. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  14. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  15. //定义一个消费者
  16. Consumer consumer = new DefaultConsumer(channel) {
  17. // 消息到达,触发这个消息
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. String msg = new String(body, "utf-8");
  20. System.out.println("[消费者 1 ]接收到消息: " + msg);
  21. channel.basicAck(envelope.getDeliveryTag(), false);
  22. // channel.basicNack(envelope.getDeliveryTag(), false, false);
  23. System.out.println("[消费者 1 ]处理完成:");
  24. }
  25. };
  26. //4. 监听队列
  27. boolean autoAck = false; //自动应答 false
  28. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  29. }
  30. }
  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  4. private static final String QUEUE_NAME = "testTopicGroup1";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtils.getConnection();
  7. final Channel channel = connection.createChannel();
  8. // channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  9. //声明死信交换机(fanout 类型)以及死信队列
  10. // channel.exchangeDeclare(DLX_EXCHANGE_NAME, "fanout", true, true, null); //声明死信交换机
  11. // Map<String, Object> arguments = new HashMap<>();
  12. // arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//设置DLX
  13. //声明普通队列 并添加 DLX
  14. // channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
  15. // channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.new.*"); //普通队列绑定生产者的交换机
  16. channel.basicQos(1); //如果要确认,生产者和消费者都要有这行代码
  17. //定义一个消费者
  18. Consumer consumer = new DefaultConsumer(channel) {
  19. // 消息到达,触发这个消息
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "utf-8");
  22. System.out.println("[消费者 2 接收消息]: " + msg);
  23. //拒绝接收消息,且重新入队列
  24. channel.basicNack(envelope.getDeliveryTag(), false, true);
  25. // channel.basicAck(envelope.getDeliveryTag(),false);
  26. System.out.println("[消费者 2 ]拒绝消息:");
  27. }
  28. };
  29. //4. 监听队列
  30. boolean autoAck = false; //自动应答 false
  31. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  32. }
  33. }

可以看到,消费者1正常消费数据,消费者 2 一直循环拒绝消息 1 (并且把队列入队了),直到消费者1成功消费完队列中的消息。

4)把上一个情况的 channel.basicQos(1) 改为 channel.basicQos(4),其他情况不变

可以看到,消费者1和消费者2轮询获取 4 条消息,消费者1消费完成之后,继续消费下面的数据,而消费者2由于没有确认,一直在拒绝,所以得到的还是刚刚的4条数据,一直循环拒绝(消费者1每消费完1条,就少循环一条),直到消费者1消费完了队列中的所有消息。

5)以上代码不变,消费者 2 加上 3 秒的休眠时间

可以看到消费者1 消费消息的顺序没有改变,只是消费者2循环的次数减少了。

可以看到消费者根据 channel.basicQos(N) 方法轮询先拿到各自的 N 条数据,然后消费者 1 先消费自己刚刚拿到的数据,再消费没消费过的数据,最后再消费 消费者 2 拒绝过的数据;

消费者 2 一直循环拒绝刚刚未确认的 N 条数据,消费者 1 每消费 1 条,消费者 2 则少循环 1 条。一直到队列被消费者 1 消费完。(消费者 1 消费 消费者 2 拒绝过的数据的顺序,有待考究)

(这里看到,绑定同一个队列的几个消费者,拿到的数据都是顺序轮询的,那要是设置了basicQos(4)方法,且两个消费者都是正常确认,前4个消息也是顺序轮询吗?)

5. 延迟队列

Spring Cloud Stream 进阶配置——使用延迟队列实现“定时关闭超时未支付订单”_多隆的博客-CSDN博客_springcloud stream延迟队列

6. 队列声明后,参数是否可以修改

我先声明一个队列:channel.queueDeclare(QUEUE_NAME, false, false, false, null);

可以看到队列已经生成了,此时我企图重新声明这个队列来改变可持久性:channel.queueDeclare(QUEUE_NAME, true, false, false, null);

但是会报错:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'testTopicGroup1' in vhost '/vhost_lyd': received 'true' but current is 'false', class-id=50, method-id=10)

结论:队列一旦声明,参数将无法更改、添加、删除。要改变一个队列的参数,只有两种办法:

  • 删除该队列,重新创建
  • 换个名字,创建一个新的队列

7. 队列的优先级

1)生产者发送 10 条消息,消息优先级为 i ,消费者 1 的队列设置优先级为 5,消费者 2 的队列不设置优先级

  1. /**
  2. * @Description 生产者 测试队列的优先级、topic交换机
  3. */
  4. public class Send {
  5. private static final String EXCHANGE_NAME = "testTopicPriorityExchange";
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = ConnectionUtils.getConnection();
  8. Channel channel = connection.createChannel();
  9. //定义交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  11. for (int i = 1; i <= 10; i++) {
  12. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
  13. builder.priority(i);
  14. AMQP.BasicProperties pros = builder.build();
  15. String msg = "hello mq" + i;
  16. // 发送消息
  17. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, pros, msg.getBytes());
  18. System.out.println("生产者发送消息:" + msg + ",优先级为:"+i);
  19. }
  20. channel.close();
  21. connection.close();
  22. }
  23. }
  1. import com.janet.util.ConnectionUtils;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.HashMap;
  5. public class Receive1 {
  6. private static final String EXCHANGE_NAME = "testTopicPriorityExchange";
  7. private static final String QUEUE_NAME = "testTopicPriorityGroup1";
  8. public static void main(String[] args) throws Exception {
  9. Connection connection = ConnectionUtils.getConnection();
  10. final Channel channel = connection.createChannel();
  11. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  12. HashMap<String, Object> argss = new HashMap<>();
  13. argss.put("x-max-priority", 5); //设置队列优先级
  14. //声明队列
  15. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  17. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  18. //定义一个消费者
  19. Consumer consumer = new DefaultConsumer(channel) {
  20. // 消息到达,触发这个消息
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. String msg = new String(body, "utf-8");
  23. System.out.println("[消费者 1 ]接收到消息: " + msg);
  24. channel.basicAck(envelope.getDeliveryTag(), false);
  25. System.out.println("[消费者 1 ]处理完成");
  26. }
  27. };
  28. //4. 监听队列
  29. boolean autoAck = false; //自动应答 false
  30. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  31. }
  32. }
  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicPriorityExchange";
  3. private static final String QUEUE_NAME = "testTopicPriorityGroup2";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  8. //声明队列
  9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.new.*");
  11. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  12. //定义一个消费者
  13. Consumer consumer = new DefaultConsumer(channel) {
  14. // 消息到达,触发这个消息
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. String msg = new String(body, "utf-8");
  17. System.out.println("[消费者 2 ]接收到消息: " + msg);
  18. channel.basicAck(envelope.getDeliveryTag(), false);
  19. System.out.println("[消费者 2 ]处理完成");
  20. }
  21. };
  22. //4. 监听队列
  23. boolean autoAck = false; //自动应答 false
  24. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  25. }
  26. }

生产者发送 10 条消息,消息优先级为 i ,消费者 1 的队列设置最大优先级为 5,消费者 2 的队列不设置优先级 ,先启动消费者,再启动生产者:

可以看到,消费者 1 除了第 1 条消息以外(一会再验证这一条),5-10 条消息先消费,5 以内的消息按照我们刚刚设置的优先级消费。所以我们猜测如果消息的优先级 > 队列的最大优先级 ,那么这些消息的优先级就不起效果了,所以 5-10条消息按顺序先消费了(至于为什么是5-10条数据先消费,而不是2-4条消息先消费,这里我猜测可能是原来消息设置的优先级就比较大,虽然这几个不按优先级排序了,但还是在 2-4 条消息的优先级之前,所以先消费了),最后才按优先级消费队列最大优先级 5 以内的 3 条数据。

2)设置消费者 1 的队列优先级为 20,消费者 2 消费 1 条消息后休眠 3 秒,先启动两个消费者,再启动生产者:

消费者1:

  1. HashMap<String, Object> argss = new HashMap<>();
  2. argss.put("x-max-priority", 20); //设置队列优先级
  3. //声明队列
  4. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);

消费者2:

Thread.sleep(3000);

可以看到,有优先级的队列消费者1拿到第一条数据后,其余的都是按照优先级消费。消费者2队列没有设置优先级,所以消息也没有按照优先级排序,是按照发消息的顺序消费的。

3)设置消费者 1 的队列优先级为 5,设置每消费一条消息休眠 5 秒,先启动消费者1,再启动生产者:

  1. HashMap<String, Object> argss = new HashMap<>();
  2. argss.put("x-max-priority", 5); //设置队列优先级
  3. //声明队列
  4. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  5. ...
  6. Thread.sleep(3000);

4)生产者 5 秒发送一条消息,消费者及时消费消息(消费者的速度 >消费者速度),先启动消费者,再启动生产者:

可以看到消费者按顺序消费消息,说明如果消费者速度 > 生产者速度的话,发来一条就消费一条了,没时间排序。这时设置优先级就没有意义。

5)生产者 5 秒发送一条消息,消费者及时消费消息(消费者的速度 >消费者速度),先启动生产者几秒后,再启动消费者(之前是消费者1启动之前会删除队列的,避免有误差,这里就不要删除队列了,确保生产者发消息的时候,消费者的队列是存在的):

  1. public class Send {
  2. private static final String EXCHANGE_NAME = "testTopicPriorityExchange";
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. //定义交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  8. for (int i = 1; i <= 10; i++) {
  9. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
  10. builder.priority(i);
  11. AMQP.BasicProperties pros = builder.build();
  12. String msg = "hello mq" + i;
  13. Thread.sleep(3000);
  14. // 发送消息
  15. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, pros, msg.getBytes());
  16. System.out.println("生产者发送消息:" + msg + ",优先级为:"+i);
  17. }
  18. channel.close();
  19. connection.close();
  20. }
  21. }
  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicPriorityExchange";
  3. private static final String QUEUE_NAME = "testTopicPriorityGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. // channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  8. HashMap<String, Object> argss = new HashMap<>();
  9. argss.put("x-max-priority", 5); //设置队列优先级
  10. //声明队列
  11. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  13. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  14. //定义一个消费者
  15. Consumer consumer = new DefaultConsumer(channel) {
  16. // 消息到达,触发这个消息
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. String msg = new String(body, "utf-8");
  19. System.out.println("[消费者 1 ]接收到消息: " + msg);
  20. channel.basicAck(envelope.getDeliveryTag(), false);
  21. System.out.println("[消费者 1 ]处理完成");
  22. }
  23. };
  24. //4. 监听队列
  25. boolean autoAck = false; //自动应答 false
  26. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  27. }
  28. }

先启动生产者之后,发送消息,队列里面已经有数据了,但是消费者还没连接:

生产者 5 秒发送一条消息,消费者及时消费消息(消费者的速度 >消费者速度),先启动生产者几秒后,再启动消费者(之前是消费者1启动之前会删除队列的,避免有误差,这里就不要删除队列了,确保生产者发消息的时候,消费者的队列是存在的)。打开消费者:

发现第 5、6条消息先消费了,再是 4、3、2、1,再是7、8...根据上面的测试,可以推测出:消费者连接队列的时候,队列中已经有了 1-6 条数据,所以前 6 条消息已经排序了(因为这个队列的最大优先级设置了 5,所以第 5 条消息先消费,再根据优先级消费队列最大优先级以内的),后 4 条因为消费者速度 > 生产者速度,所以来一条消费一条,就没有排序。

6)消费者队列最大优先级为 5 ,生产者发消息时,1-4条消息不设置优先级,5-10条消息设置优先级,先启动生产者再启动消费者:

这里可以看出,只要设置了优先级,无论生不生效,都会比没有设置优先级的消息优先执行。

结论:

① 队列可以设置最大优先级N(1-255的整数),消息也可以设置优先级。如果要保证消息按照优先级消费,那么队列和消息都要设置优先级。如果只有消息设置了优先级,而消息进入的队列没有设置最大优先级,那么这些消息设置的优先级就不会起效果,依然还是按照消息进入队列的顺序消费。

② 在队列设置最大优先级的前提下,设置优先级的消息比没有设置优先级的消息先消费。

③ 优先级大的消息先消费。

④ 消息设置的优先级 i 必须小于所在队列的最大优先级 N 才会起效果,否则 i >= N 的消息不会按照优先级排序,还是按照入队列的顺序消费。而 i<N 的消息才会按照优先级消费。并且此时 i >= N 的消息比 i<N 的消息先消费。

⑤ 要想设置的优先级有意义,必须保证生产者的速度 > 消费者的速度,队列中有消息堆积才会排序,不然生产者发来一条就被消费者消费了,队列中至多只有一条消息,根本不会去排序。

8. Max length : 队列可以容纳的消息的最大条数

队列可以容纳的消息的最大条数,超过这个条数,队列头部的消息将会被丢弃。

测试 : 我们设置消费者队列最多只能容纳 1 条消息,生产者一次性发送 10 条消息。

生产者:

  1. for (int i = 0; i < 10; i++) {
  2. String msg = "hello mq" + i;
  3. // 发送消息
  4. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, msg.getBytes());
  5. System.out.println("生产者发送消息:" + msg);
  6. }

消费者:

  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. HashMap<String, Object> argss = new HashMap<>();
  8. // argss.put("x-message-ttl", 10000); //设置队列里面的消息过期时间10秒
  9. argss.put("x-max-length", 1); //队列的长度 设置队列中的消息的最大条数为 1 条,超过1条,则遵循队列的"先进先出(丢)"原则.
  10. //声明队列
  11. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  13. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  14. boolean autoAck = false; //自动应答 false
  15. //定义一个消费者
  16. channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel){
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. String msg = new String(body, "utf-8");
  19. System.out.println("[消费者 1 ]接收到消息: " + msg);
  20. channel.basicAck(envelope.getDeliveryTag(), false);
  21. System.out.println("[消费者 1 ]处理完成");
  22. }
  23. });
  24. }
  25. }

先启动消费者,再启动生产者:

当生产者发送消息的时候,消费者已经连接队列等待消费了,所以第一条消息进来的时候,队列中有且仅有一条消息,并推给消费者消费。当后面的数据发送之后,由于队列中最多只能有一条消息,所以遵循 “先进先出(丢)” 的原则,消息1-8都被丢弃了,最后剩下消息 9 退给消费者。

所以我们猜测,如果是生产者先发送消息,再启动消费者,消费者只会拿到第 9 条消息,其他的消息会被丢掉。

启动生产者,再启动消费者:

结果确实是这样子的。

结论:如果队列设置了 Max length 为 n,说明队列里面任何时候都有且仅有 n 条数据,且先进先出(丢)。

9. Max length bytes : 队列可以容纳的消息的最大字节数

队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃。

消费者:

  1. HashMap<String, Object> argss = new HashMap<>();
  2. argss.put("x-max-length-bytes", 10); //设置队列中的消息的最大字节数
  3. //声明队列
  4. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);

1)发送一条 18 个字节的消息(我们采用UTF8编码,1个汉字占3个字节),先启动消费者,再启动生产者:

生产者:

  1. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  2. String msg = "旧故里草木深";
  3. // 发送消息
  4. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, msg.getBytes());
  5. System.out.println("生产者发送消息:" + msg);

2)发送一条 18 个字节的消息,先启动生产者,再启动消费者(这里的意思是队列还是存在的,只是消费者不在线而已哈):

队列里面直接没有数据,好像是直接扔掉了

3)发送一条 9 个字节的消息。这条和下面的如果没有说明,都是先启动生产者,再启动消费者。

此时这个消息的字节数没有超过 10,所以消息还是在队列里面的。

4)消费者同时发送两条消息,第一条字节数 9,第二条字节数 18。

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草木深".getBytes());
  2. System.out.println("生产者发送消息:" + "草木深");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧故里草木深".getBytes());
  4. System.out.println("生产者发送消息:" + "旧故里草木深");

发现队列里面一条数据也没有,好像是直接扔掉了。

5)把上两个消息的顺序换掉

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧故里草木深".getBytes());
  2. System.out.println("生产者发送消息:" + "旧故里草木深");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草木深".getBytes());
  4. System.out.println("生产者发送消息:" + "草木深");

发现只有一条数据了,第一条被删掉了。

6)

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草木深".getBytes());
  2. System.out.println("生产者发送消息:" + "草木深");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧".getBytes());
  4. System.out.println("生产者发送消息:" + "旧");

7)

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草木".getBytes());
  2. System.out.println("生产者发送消息:" + "草木深");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧".getBytes());
  4. System.out.println("生产者发送消息:" + "旧");
  5. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧".getBytes());
  6. System.out.println("生产者发送消息:" + "旧");

8)

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧".getBytes());
  2. System.out.println("生产者发送消息:" + "旧");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "故".getBytes());
  4. System.out.println("生产者发送消息:" + "故");
  5. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "里".getBytes());
  6. System.out.println("生产者发送消息:" + "里");
  7. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草".getBytes());
  8. System.out.println("生产者发送消息:" + "草");

9)

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧".getBytes());
  2. System.out.println("生产者发送消息:" + "旧");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "故".getBytes());
  4. System.out.println("生产者发送消息:" + "故");
  5. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "里".getBytes());
  6. System.out.println("生产者发送消息:" + "里");
  7. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草".getBytes());
  8. System.out.println("生产者发送消息:" + "草");
  9. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "木".getBytes());
  10. System.out.println("生产者发送消息:" + "深");

10)定义两个消费者,第一个消费者不确认,第二个消费者正常确认消费:

  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = ConnectionUtils.getConnection();
  6. final Channel channel = connection.createChannel();
  7. HashMap<String, Object> argss = new HashMap<>();
  8. argss.put("x-max-length-bytes", 10); //设置队列中的消息的最大字节数
  9. //声明队列
  10. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  12. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  13. boolean autoAck = false; //自动应答 false
  14. //定义一个消费者
  15. channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel){
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. String msg = new String(body, "utf-8");
  18. System.out.println("[消费者 1 ]接收到消息: " + msg);
  19. // channel.basicAck(envelope.getDeliveryTag(), false);
  20. // System.out.println("[消费者 1 ]处理完成");
  21. }
  22. });
  23. }
  24. }
  1. public class Receive2 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  4. private static final String QUEUE_NAME = "testTopicGroup1";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtils.getConnection();
  7. final Channel channel = connection.createChannel();
  8. boolean autoAck = false; //自动应答 false
  9. //定义一个消费者
  10. channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel){
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. String msg = new String(body, "utf-8");
  13. System.out.println("[消费者 2 ]接收到消息: " + msg);
  14. channel.basicAck(envelope.getDeliveryTag(), false);
  15. System.out.println("[消费者 2 ]处理完成");
  16. }
  17. });
  18. }
  19. }

先启动消费者1,再启动生产者发消息,再启动消费者2:

先启动消费者1,再启动消费者2再,启动生产者发消息:

结论:如果设置了 Max length bytes 参数为 n,说明队列里面所有消息的字节数都不会超过 n,遵循先进先出(丢)的原理,而且丢消息是一整条丢。(如果设置了死信队列,那么就进入死信队列)(如果消费者在生产者发消息的时候一直待机,且队列里面已经没有消息了,那么生产者刚发送的消息一下就会发给消费者消费了,设置的队列字节长度就布包括已经发送给消费者的消息,包括未确认的)

10. Overflow behaviour : 队列中的消息溢出后如何处理

队列中的消息溢出时,如何处理这些消息:要么丢弃队列头部的消息(drop-head ),要么丢掉后面生产者发送过来的所有消息(reject-publish)

1)设置队列中最大允许 1 条消息存在,并且丢掉后面生产者发送过来的所有消息(reject-publish)。生产者一次发送 5 条消息。

  1. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "旧啊啊啊啊".getBytes());
  2. System.out.println("生产者发送消息:" + "旧啊啊啊啊");
  3. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "故".getBytes());
  4. System.out.println("生产者发送消息:" + "故");
  5. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "里".getBytes());
  6. System.out.println("生产者发送消息:" + "里");
  7. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "草".getBytes());
  8. System.out.println("生产者发送消息:" + "草");
  9. channel.basicPublish(EXCHANGE_NAME, "goods.new.add", true, false, null, "木".getBytes());
  10. System.out.println("生产者发送消息:" + "木");
  1. public class Receive1 {
  2. private static final String EXCHANGE_NAME = "testTopicExchange";
  3. private static final String QUEUE_NAME = "testTopicGroup1";
  4. private static final String DLX_EXCHANGE_NAME = "dlxExchange";
  5. private static final String DEX_QUEUE_NAME = "dexGroup"; //死信队列
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = ConnectionUtils.getConnection();
  8. final Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(DLX_EXCHANGE_NAME, "fanout", true, true, null); //声明死信交换机
  10. channel.queueDeclare(DEX_QUEUE_NAME, true, false, false, null); //声明死信队列
  11. channel.queueBind(DEX_QUEUE_NAME, DLX_EXCHANGE_NAME, "null"); //死信队列和死信交换机绑定
  12. HashMap<String, Object> argss = new HashMap<>();
  13. argss.put("x-max-length", 1); //队列的长度 设置队列中的消息的最大条数为 1 条,超过1条,则遵循队列的"先进先出(丢)"原则.
  14. argss.put("x-overflow", "reject-publish"); //队列中的消息溢出时,如何处理这些消息:要么丢弃队列头部的消息(drop-head ),要么拒绝接收后面生产者发送过来的所有消息(reject-publish)
  15. // argss.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//设置DLX
  16. // channel.queueDelete(QUEUE_NAME); //创建队列前先删除队列,避免已存在队列造成误差
  17. //声明队列
  18. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  19. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
  20. channel.basicQos(1); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个消息
  21. boolean autoAck = false; //自动应答 false
  22. //定义一个消费者
  23. channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel){
  24. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  25. String msg = new String(body, "utf-8");
  26. System.out.println("[消费者 1 ]接收到消息: " + msg);
  27. channel.basicAck(envelope.getDeliveryTag(), false);
  28. System.out.println("[消费者 1 ]处理完成");
  29. }
  30. });
  31. }
  32. }

消费者消费了第 1 条消息,其他的消息丢掉了了。

2)设置队列中最大允许 1 条消息存在,并且丢弃队列头部的消息(drop-head )。生产者一次发送 5 条消息。

  1. HashMap<String, Object> argss = new HashMap<>();
  2. argss.put("x-max-length", 1); //队列的长度 设置队列中的消息的最大条数为 1 条,超过1条,则遵循队列的"先进先出(丢)"原则.
  3. argss.put("x-overflow", "drop-head"); //设置队列中的消息的最大字节数
  4. //声明队列
  5. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);

消费者消费了最后 1 条消息,前面的消息丢掉了

结论:队列中的消息溢出时,如果队列设置了 x-overflow 参数,那么值为 drop-head  时会丢弃队列头部的消息,如果值为 reject-publish,就会拒绝后面生产者发送过来的所有消息。

而且,如果此队列设置有死信队列,第一种情况会进入死信队列,第二种不会(下一部分论证)。

11. Dead letter exchange 死信交换机

死信交换机前面已经讲很多了,这里就直接举个例子。

在前一部分中,我们给队列设置了 “x-overflow”的值,丢弃队列头部的消息(drop-head ),或者丢掉后面生产者发送过来的所有消息(reject-publish)

1)当 argss.put("x-overflow", "drop-head"); 时,设置死信交换机

  1. HashMap<String, Object> argss = new HashMap<>();
  2. argss.put("x-max-length", 1); //队列的长度 设置队列中的消息的最大条数为 1 条,超过1条,则遵循队列的"先进先出(丢)"原则.
  3. argss.put("x-overflow", "drop-head"); //设置队列中的消息的最大字节数
  4. argss.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//设置DLX
  5. //声明队列
  6. channel.queueDeclare(QUEUE_NAME, false, false, false, argss);
  1. public class Receive3 {
  2. private static final String DEX_QUEUE_NAME = "dexGroup"; //死信队列
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtils.getConnection();
  5. final Channel channel = connection.createChannel();
  6. channel.basicQos(1);
  7. //定义一个消费者
  8. Consumer consumer = new DefaultConsumer(channel) {
  9. // 消息到达,触发这个消息
  10. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  11. String msg = new String(body, "utf-8");
  12. System.out.println("消费者 3 接收到死信队列中的消息: " + msg);
  13. channel.basicAck(envelope.getDeliveryTag(), false);
  14. System.out.println("[消费者 3 ] 确认消费消息");
  15. }
  16. };
  17. //4. 监听队列
  18. boolean autoAck = false; //自动应答 false
  19. channel.basicConsume(DEX_QUEUE_NAME, autoAck, consumer);
  20. }
  21. }

看队列中如果消息条数超过了设置的 1,那么扔掉的数据是否会进死信队列:

说明前面扔掉的消息确实进了死信队列。

2)当 argss.put("x-overflow", "reject-publish"); 时,设置死信交换机

argss.put("x-overflow", "reject-publish"); 

嗯,如果是设置了 “reject-publish”,消息就直接扔掉了,就算有死信队列也不会进入死信队列。

12. 消息的持久化

换器的持久化、队列的持久化和消息的持久化。

1)换器的持久化

在声明交换机时 durable 参数置为 true 实现的。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失。

这里我声明一个交换机,durable 参数置为 false:

channel.exchangeDeclare(EXCHANGE_NAME, "topic", false);

在重启 RabbitMQ 时,交换机消失了:

  1. //重启步骤:
  2. rabbitmqctl stop :停止rabbitmq
  3. rabbitmq-server restart : 重启rabbitmq

2)队列的持久化

在声明队列时将 durable 参数置为 true 实现的。如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所谓“皮之不存,毛将焉附”,队列都没有了,消息又能存在哪里呢? 

声明一个队列:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

在重启 RabbitMQ 时,队列消失了:

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化


关联数据库:GitHub - skonline/canal-client: spring boot canal starter 易用的canal 客户端 canal client

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/393680
推荐阅读
相关标签
  

闽ICP备14008679号