当前位置:   article > 正文

Rabbitmq取消预取机制配置,配置手动确认后仍然java.lang.IllegalStateException: Channel closed; cannot ack/nack的问题_rabbitmq java.lang.illegalstateexception: connecti

rabbitmq java.lang.illegalstateexception: connection returned a null channel

目录

背景

配置

 报错

代码-生产者

 代码-消费者

思考

结果

 总结

背景

 根据教程写了工作队列,然后发现2个消费者是公平消费的,即10条消息,2个队列,分别取到了5条消息进行消费,由于教程设置2个消费者分别线程休眠时间不一样,目的是让其消费速度不一致,模仿实际生产中消息消费过程时间有长有短。(一个消费慢,一个消费快,若公平消费的话,即平分当前消息量,快的消费者已经干完事在休息了,慢的消费者还有一大堆消息等着处理呢,明显不符合资源调配利用,应该能者多劳)

配置

因为想要实现“能者多劳”的功能,所以消息的应答方式一定要设置成手动应答manual,参数prefetch的含义表示RabbitMQ服务器节点上最多的没有确认的消息个数. 这个参数设置为0时,表示不对消息个数进行限制, RabbitMQ服务器会投递尽可能多的消息到consumer,这里我设置成1

  1. #yml配置
  2. spring:
  3. rabbitmq:
  4. host: 127.0.0.1
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. listener:
  10. simple:
  11. prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
  12. acknowledge-mode: manual #一定改为要手动确认模式

 报错

  1. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.consumer.PrefetchReceiver.myListener2(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException,java.lang.InterruptedException' threw exception
  2. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.6.jar:2.4.6]
  3. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.6.jar:2.4.6]
  4. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.6.jar:2.4.6]
  5. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1670) ~[spring-rabbit-2.4.6.jar:2.4.6]
  6. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1589) ~[spring-rabbit-2.4.6.jar:2.4.6]
  7. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1577) ~[spring-rabbit-2.4.6.jar:2.4.6]
  8. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1568) ~[spring-rabbit-2.4.6.jar:2.4.6]
  9. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1512) ~[spring-rabbit-2.4.6.jar:2.4.6]
  10. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:993) [spring-rabbit-2.4.6.jar:2.4.6]
  11. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:940) [spring-rabbit-2.4.6.jar:2.4.6]
  12. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) [spring-rabbit-2.4.6.jar:2.4.6]
  13. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1317) [spring-rabbit-2.4.6.jar:2.4.6]
  14. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1223) [spring-rabbit-2.4.6.jar:2.4.6]
  15. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
  16. Caused by: java.lang.IllegalStateException: Channel closed; cannot ack/nack
  17. at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1153) ~[spring-rabbit-2.4.6.jar:2.4.6]
  18. at com.sun.proxy.$Proxy81.basicAck(Unknown Source) ~[na:na]
  19. at com.example.rabbitmq.consumer.PrefetchReceiver.myListener2(PrefetchReceiver.java:27) ~[classes/:na]
  20. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
  21. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_181]
  22. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
  23. at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
  24. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.21.jar:5.3.21]
  25. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.21.jar:5.3.21]
  26. at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.6.jar:2.4.6]
  27. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.6.jar:2.4.6]
  28. ... 13 common frames omitted

代码-生产者

  1. @Component
  2. public class PrefetchSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void prefetch() {
  6. String exchange = "prefetch";
  7. String queue = "prefetch.queue";
  8. for (int i = 0; i < 10; i++) {
  9. rabbitTemplate.convertAndSend(exchange, queue, "消息" + i);
  10. }
  11. }
  12. }
  1. // 测试类方法
  2. @Test
  3. void prefetch() {
  4. prefetchSender.prefetch();
  5. }

 代码-消费者

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class PrefetchReceiver {
  9. // 消费者1,快
  10. @RabbitListener(queuesToDeclare = @Queue("prefetch.queue")) // queuesToDeclare 自动声明队列
  11. public void myListener1(Message message, Channel channel) throws IOException, InterruptedException {
  12. Thread.sleep(200);
  13. System.out.println("1收到的消息是:" + new String(message.getBody(), "utf-8"));
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  15. }
  16. // 消费者2,慢
  17. @RabbitListener(queuesToDeclare = @Queue("prefetch.queue")) // queuesToDeclare 自动声明队列
  18. public void myListener2(Message message, Channel channel) throws IOException, InterruptedException {
  19. Thread.sleep(1000);
  20. System.out.println("2收到的消息是:" + new String(message.getBody(), "utf-8"));
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  22. }
  23. }

思考

已经配置了手动确认(spring.rabbitmq.listener.simple.acknowledge-mode: manual),还有每次获取消息处理是1条,处理完消息才能拿下1条(spring.rabbitmq.listener.simple.prefetch: 1),为什么通道关闭,确认消息失败呢?

翻看一些帖子后,发现有些项目使用了消息转换json,我这个demo也使用了消息转json,目的是后台管理的界面可以看到发送的消息是json格式(未加密,但中文会乱码),但是消息接收的时候也需要进行json转换,需要在项目中加rabbitmq的配置类,连接工厂对象会在这个类里加工一下,所以发现问题所在了,配置类加工连接的时候漏了,需要我们"手动加上的"。

  1. import org.springframework.amqp.core.AcknowledgeMode;
  2. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
  5. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class RabbitMQConfig {
  10. @Bean
  11. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  12. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  13. factory.setConnectionFactory(connectionFactory);
  14. factory.setMessageConverter(new Jackson2JsonMessageConverter()); // json转消息
  15. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认(手动加上的)
  16. factory.setPrefetchCount(1); // 每次只能获取一条,处理完成才能获取下一条(手动加上的)
  17. return factory;
  18. }
  19. }

结果

 总结

生产者项目配置了消息转json,那么,消费者项目是需要用配置类设置json转消息的,如果在yml或者properties配置文件修改了参数,不起作用的时候,需要检查配置类,手动加上变更的参数。(自己写demo的时候知道消息体可以转json就好了,还是不配置比较好。)

最后,谢谢大佬帖子给我启发,大家可以见原帖rabbitmq设置手动ack报错:Channel closed; cannot ack/nack_qq_42894258的博客-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/625967
推荐阅读
相关标签
  

闽ICP备14008679号