当前位置:   article > 正文

Rabbit消费对象天坑(序列化/反序列化)_error handler converted exception to fatal

error handler converted exception to fatal

一、问题描述

  1. 2022-05-03 14:01:40.630 WARN 16876 --- [ntContainer#0-2] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
  2. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
  3. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:155) ~[spring-rabbit-2.4.2.jar:2.4.2]
  4. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1665) ~[spring-rabbit-2.4.2.jar:2.4.2]
  5. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1584) ~[spring-rabbit-2.4.2.jar:2.4.2]
  6. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1572) ~[spring-rabbit-2.4.2.jar:2.4.2]
  7. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1563) ~[spring-rabbit-2.4.2.jar:2.4.2]
  8. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1507) ~[spring-rabbit-2.4.2.jar:2.4.2]
  9. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967) [spring-rabbit-2.4.2.jar:2.4.2]
  10. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.4.2.jar:2.4.2]
  11. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.4.2.jar:2.4.2]
  12. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.4.2.jar:2.4.2]
  13. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.4.2.jar:2.4.2]
  14. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_301]
  15. Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert serialized Message content
  16. at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:117) ~[spring-amqp-2.4.2.jar:2.4.2]
  17. at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342) ~[spring-rabbit-2.4.2.jar:2.4.2]
  18. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:365) ~[spring-rabbit-2.4.2.jar:2.4.2]
  19. at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132) ~[spring-amqp-2.4.2.jar:2.4.2]
  20. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:242) ~[spring-rabbit-2.4.2.jar:2.4.2]
  21. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:145) ~[spring-rabbit-2.4.2.jar:2.4.2]
  22. ... 11 common frames omitted
  23. Caused by: java.lang.IllegalStateException: Could not deserialize object type
  24. at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:97) ~[spring-amqp-2.4.2.jar:2.4.2]
  25. at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:113) ~[spring-amqp-2.4.2.jar:2.4.2]
  26. ... 16 common frames omitted
  27. Caused by: java.lang.ClassNotFoundException: com.example.cloud.producer.rabbitmq.entity.RabbitOrder
  28. at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_301]
  29. at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_301]
  30. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_301]
  31. at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_301]
  32. at java.lang.Class.forName0(Native Method) ~[na:1.8.0_301]
  33. at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_301]
  34. at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
  35. at org.springframework.core.ConfigurableObjectInputStream.resolveClass(ConfigurableObjectInputStream.java:76) ~[spring-core-5.3.16.jar:5.3.16]
  36. at org.springframework.amqp.support.converter.SimpleMessageConverter$1.resolveClass(SimpleMessageConverter.java:183) ~[spring-amqp-2.4.2.jar:2.4.2]
  37. at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1984) ~[na:1.8.0_301]
  38. at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1848) ~[na:1.8.0_301]
  39. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2158) ~[na:1.8.0_301]
  40. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) ~[na:1.8.0_301]
  41. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) ~[na:1.8.0_301]
  42. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) ~[na:1.8.0_301]
  43. at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:91) ~[spring-amqp-2.4.2.jar:2.4.2]
  44. ... 17 common frames omitted
  45. 2022-05-03 14:01:40.630 WARN 16876 --- [ntContainer#0-2] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[serialized object]' MessageProperties [headers={spring_listener_return_correlation=b2329a07-f83e-4999-b7b4-5d1c7b65c767, spring_returned_message_correlation=1651557699966$3b0f387d-b17f-42df-b060-06ffa0a71910}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order-exchange, receivedRoutingKey=order.user, deliveryTag=1, consumerTag=amq.ctag-sxIMscspKJ98oEOaFyIyPQ, consumerQueue=order-queue])
  46. 2022-05-03 14:01:40.630 ERROR 16876 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception
  47. org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
  48. at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146) ~[spring-rabbit-2.4.2.jar:2.4.2]
  49. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1469) [spring-rabbit-2.4.2.jar:2.4.2]
  50. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1753) [spring-rabbit-2.4.2.jar:2.4.2]
  51. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1528) [spring-rabbit-2.4.2.jar:2.4.2]
  52. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967) [spring-rabbit-2.4.2.jar:2.4.2]
  53. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.4.2.jar:2.4.2]
  54. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.4.2.jar:2.4.2]
  55. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.4.2.jar:2.4.2]
  56. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.4.2.jar:2.4.2]
  57. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_301]
  58. Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
  59. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:155) ~[spring-rabbit-2.4.2.jar:2.4.2]
  60. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1665) [spring-rabbit-2.4.2.jar:2.4.2]
  61. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1584) [spring-rabbit-2.4.2.jar:2.4.2]
  62. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1572) [spring-rabbit-2.4.2.jar:2.4.2]
  63. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1563) [spring-rabbit-2.4.2.jar:2.4.2]
  64. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1507) [spring-rabbit-2.4.2.jar:2.4.2]
  65. ... 6 common frames omitted
  66. Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert serialized Message content
  67. at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:117) ~[spring-amqp-2.4.2.jar:2.4.2]
  68. at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342) ~[spring-rabbit-2.4.2.jar:2.4.2]
  69. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:365) ~[spring-rabbit-2.4.2.jar:2.4.2]
  70. at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132) ~[spring-amqp-2.4.2.jar:2.4.2]
  71. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:242) ~[spring-rabbit-2.4.2.jar:2.4.2]
  72. at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:145) ~[spring-rabbit-2.4.2.jar:2.4.2]
  73. ... 11 common frames omitted
  74. Caused by: java.lang.IllegalStateException: Could not deserialize object type
  75. at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:97) ~[spring-amqp-2.4.2.jar:2.4.2]
  76. at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:113) ~[spring-amqp-2.4.2.jar:2.4.2]
  77. ... 16 common frames omitted
  78. Caused by: java.lang.ClassNotFoundException: com.example.cloud.producer.rabbitmq.entity.RabbitOrder
  79. at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_301]
  80. at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_301]
  81. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_301]
  82. at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_301]
  83. at java.lang.Class.forName0(Native Method) ~[na:1.8.0_301]
  84. at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_301]
  85. at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
  86. at org.springframework.core.ConfigurableObjectInputStream.resolveClass(ConfigurableObjectInputStream.java:76) ~[spring-core-5.3.16.jar:5.3.16]
  87. at org.springframework.amqp.support.converter.SimpleMessageConverter$1.resolveClass(SimpleMessageConverter.java:183) ~[spring-amqp-2.4.2.jar:2.4.2]
  88. at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1984) ~[na:1.8.0_301]
  89. at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1848) ~[na:1.8.0_301]
  90. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2158) ~[na:1.8.0_301]
  91. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) ~[na:1.8.0_301]
  92. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) ~[na:1.8.0_301]
  93. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) ~[na:1.8.0_301]
  94. at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:91) ~[spring-amqp-2.4.2.jar:2.4.2]
  95. ... 17 common frames omitted

二、解决方案

方案一:共同使用一个对象

描述:我目前生产者和消费者都有一个对象,内容是一模一样的包括序列化,可就是消费者接收时报序列化错误

解决方案:把这个对象放到公共模块,然后生产者和消费者服务引入这个公共模块,生产者和消费者发送和消费时共用一个对象。这样完全保证了两个项目中JavaBean是一致的,所以能解决反序列失败的问题

缺点:局限性太小,这种模式生产者和消费者只能在同一个微服务下才能使用

方案二:消息JSON序列化(推荐)

生产者确认机制配置

  1. package com.vueadmin.security.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. @Configuration
  10. public class RabbitConfig {
  11. @Autowired
  12. private CachingConnectionFactory cachingConnectionFactory;
  13. private final String EXCHANGE_NAME="nome_exchange";
  14. // private final String DeadEXCHANGE_NAME="dead_nome_exchange";
  15. private final String QUEUE_NAME="nome_queue";
  16. // private final String DeadQUEUE_NAME="dead_nome_queue";
  17. @Bean
  18. public RabbitTemplate rabbitTemplate(){
  19. RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
  20. rabbitTemplate.setConfirmCallback((data,ask,cause)->{
  21. if (ask){
  22. System.out.println("发送交换机成功"+data.getId());
  23. }else
  24. System.out.println("发送交换机失败"+data.getId());
  25. });
  26. rabbitTemplate.setMandatory(true);
  27. rabbitTemplate.setReturnsCallback((data)->{
  28. System.out.println("发送队列失败"+new String(data.getMessage().getBody()));
  29. });
  30. return rabbitTemplate;
  31. }
  32. // @Bean
  33. // public RabbitTemplate rabbitTemplate(){
  34. // RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
  35. // rabbitTemplate.setConfirmCallback((data,ask,cause)->{
  36. // if (ask){
  37. // System.out.println("发送交换机成功"+data.getId());
  38. // 修改库中消息状态
  39. // updateStatusById(data.getId());
  40. // }else
  41. // System.out.println("发送交换机失败"+data.getId());
  42. // });
  43. // rabbitTemplate.setMandatory(true);
  44. // rabbitTemplate.setReturnsCallback((data)->{
  45. // System.out.println("发送队列失败"+new String(data.getMessage().getBody()));
  46. // });
  47. //
  48. // return rabbitTemplate;
  49. // }
  50. @Bean("nome_queue")
  51. public Queue queue(){
  52. // HashMap<String, Object> map = new HashMap<>();
  53. // map.put("x-dead-letter-exchange",DeadEXCHANGE_NAME);
  54. // map.put("x-max-length",5);
  55. // map.put("x-message-ttl",5000);
  56. // map.put("x-message-ttl",50000);
  57. return new Queue(QUEUE_NAME,true,false,false);
  58. }
  59. // @Bean("dead_nome_queue")
  60. // public Queue deadQueue(){
  61. // return new Queue(DeadQUEUE_NAME,true,false,false);
  62. // }
  63. @Bean("nome_exchange")
  64. public DirectExchange directExchange(){
  65. return new DirectExchange(EXCHANGE_NAME);
  66. }
  67. // @Bean("dead_nome_exchange")
  68. // public DirectExchange deadDirectExchange(){
  69. // return new DirectExchange(DeadEXCHANGE_NAME);
  70. // }
  71. @Bean
  72. public Binding binding(){
  73. return BindingBuilder.bind(queue()).to(directExchange()).with("nome");
  74. }
  75. // @Bean
  76. // public Binding deadBinding(){
  77. // return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("nome");
  78. // }
  79. }

yml配置

  1. spring:
  2. rabbitmq:
  3. host: 192.168.72.128
  4. virtual-host: /emp
  5. username: user
  6. password: 123456
  7. port: 5672
  8. publisher-returns: true
  9. listener:
  10. simple:
  11. acknowledge-mode: manual

生产者序列化

  1. package com.vueadmin.security.config;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  4. import org.springframework.beans.factory.InitializingBean;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitmqConfig implements InitializingBean {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @Override
  12. public void afterPropertiesSet() throws Exception {
  13. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  14. }
  15. }
  1. @Test
  2. void TextDirect(){
  3. CorrelationData data = new CorrelationData();
  4. String s = UUID.randomUUID().toString();
  5. data.setId(s);
  6. // MessagePostProcessor message = new MessagePostProcessor() {
  7. // @Override
  8. // public Message postProcessMessage(Message message) throws AmqpException {
  9. // message.getMessageProperties().setExpiration("5000");
  10. // return message;
  11. // }
  12. // };
  13. Emp emp = new Emp();
  14. emp.setId(1);
  15. emp.setName("张三");
  16. // String str = JSONUtil.toJsonStr(employee);
  17. rabbitTemplate.convertAndSend("nome_exchange","nome",emp,data);
  18. }

消费者反序列化

  1. @Bean
  2. public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
  3. return new Jackson2JsonMessageConverter(objectMapper);
  4. }

消费者接收消息

  1. package com.test.csfl.pojo;
  2. import com.rabbitmq.client.Channel;
  3. import com.test.csfl.entity.Emp;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Headers;
  7. import org.springframework.messaging.handler.annotation.Payload;
  8. import org.springframework.stereotype.Component;
  9. import java.io.IOException;
  10. import java.util.Map;
  11. @Component
  12. public class Consume {
  13. @RabbitListener(queues = "nome_queue")
  14. public void Handler(@Payload Emp emp, @Headers Map<String,Object> headers,Channel channel) throws IOException {
  15. System.out.println(emp.getName());
  16. long o = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
  17. String s = (String) headers.get("spring_returned_message_correlation");
  18. System.out.println(s);
  19. channel.basicAck(o,false);
  20. }
  21. }

根据此文章信息进行书写

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/625950
推荐阅读
相关标签
  

闽ICP备14008679号