当前位置:   article > 正文

If the serialization is only done by a trusted source, you can also enable trust all (*).解决方法

If the serialization is only done by a trusted source, you can also enable trust all (*).解决方法

在写 kafka-消费者-消费对象消息(了解)(SpringBoot整合Kafka)这篇博客时遇到的bug

1、反序列化kafka消息出现 IllegalArgumentException

Caused by: java.lang.IllegalArgumentException: The class 'com.atguigu.kafka.bean.UserDTO' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.3.2.jar:na]
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

2024-06-06T20:37:12.361+08:00 ERROR 6592 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1996) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-3.0.5.jar:3.0.5]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition my_topic1-1 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1438) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1661) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1497) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:717) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:683) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1291) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247) ~[kafka-clients-3.3.2.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.2.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1711) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1686) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1487) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1379) ~[spring-kafka-3.0.5.jar:3.0.5]
	... 2 common frames omitted
Caused by: java.lang.IllegalArgumentException: The class 'com.atguigu.kafka.bean.UserDTO' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-3.0.5.jar:3.0.5]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.3.2.jar:na]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

这个错误是由于Spring Kafka在反序列化Kafka消息时,无法确定消息体的类型所导致的。在Spring Kafka中,为了安全起见,只有在Java包java.util和java.lang中的类才能被反序列化。如果你的类不在这些包中,你需要将其添加到可信包中,或者选择信任所有的类。

2、解决办法 appication.yml中修改生产者序列化器

server:
  port: 8120
# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      # 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      # earliest:从最早的消息开始消费
      # latest:第一次从LEO位置开始消费
      # none:如果主题分区没有偏移量,则抛出异常
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # json反序列化器
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    #      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "*"
    listener:
      # 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual
      # 手动ack:消费有异常时停止
      ack-mode: manual_immediate

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

在这里插入图片描述

spring:
  Kafka:
      properties:
        spring.json.trusted.packages: "*"
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/708744
推荐阅读
相关标签
  

闽ICP备14008679号