赞
踩
通过listenter中errorhandler监听异常进行重试。
@KafkaListener(errorHandler = "consumerAwareListenerErrorHandler")
package cn.gov.chinatax.gt4.szc.sj.sjszzh.listenter; import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.ElasticsearchService; import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.model.SsRzSsjl; import cn.gov.chinatax.gt4.szc.sj.sjszzh.pojo.dto.SsContextDTO; import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.CollectionType; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; /** * kafka消费时异常重试 */ @Component @Slf4j public class KafkaConsumerListenter{ public final String ES_INDEX_SS_RZ_KAFKA_FAIL = "ss_rz_kafkasend_fail"; @Resource @Qualifier("kafkaSjSsTemplate") private KafkaTemplate kafkaSsTemplate; @Resource protected ElasticsearchService esService; @Bean public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){ return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { if(!ObjectUtil.isEmpty(e)){ ObjectMapper objectMapper = new ObjectMapper(); List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>(); try { consumerRecords = (List<ConsumerRecord<String, String>>) message.getPayload(); for(ConsumerRecord<String, String> t : consumerRecords){ SsContextDTO ssContextDTO = objectMapper.readValue(JSONUtil.toJsonStr(t.value()),SsContextDTO.class); ssContextDTO.setKafkasendnum(ssContextDTO.getKafkasendnum()+1); if(ssContextDTO.getKafkasendnum()<3){ log.info("topic{},consumer num{}",t.topic(),ssContextDTO.getKafkasendnum()); kafkaSsTemplate.send(t.topic(),t.key(),JSONUtil.toJsonStr(ssContextDTO)); }else { SsRzSsjl rzSsjl = new SsRzSsjl(); rzSsjl.setXwlsh(ssContextDTO.getRowkey()); rzSsjl.setRzmx(Arrays.asList(JSONUtil.toJsonStr(ssContextDTO))); esService.saveSsRzSsjl(rzSsjl,ES_INDEX_SS_RZ_KAFKA_FAIL); } } } catch (JsonProcessingException ex) { log.info("consumer message fail"); throw new RuntimeException(ex); } }else { log.info("consumer message fail"); } return null; } }; } }
通过ProducerListener 中onerror进行处理。
package cn.gov.chinatax.gt4.szc.sj.sjszzh.listenter; import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.ElasticsearchService; import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.model.SsRzSsjl; import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.utils.SsToolUtils; import cn.gov.chinatax.gt4.szc.sj.sjszzh.pojo.dto.SsContextDTO; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.ProducerListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; /** * kafka发送时异常重试 */ @Component @Slf4j public class KafkaProducerListenter implements ProducerListener { public final String ES_INDEX_SS_RZ_KAFKA_FAIL = "ss_rz_kafkasend_fail"; @Resource @Qualifier("kafkaSjSsTemplate") private KafkaTemplate kafkaSsTemplate; @Resource protected ElasticsearchService esService; @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { log.info("message send sucess:topic:{}",producerRecord.topic()); } @Override public void onError(ProducerRecord producerRecord, Exception exception) { ObjectMapper objectMapper = new ObjectMapper(); SsContextDTO ssContextDTO = new SsContextDTO(); try { ssContextDTO = objectMapper.readValue(JSONUtil.toJsonStr(producerRecord.value()), SsContextDTO.class); } catch (JsonProcessingException e) { log.info("message retry exception"+ SsToolUtils.getExcepitonInfo(e)); throw new RuntimeException(e); } ssContextDTO.setKafkasendnum(ssContextDTO.getKafkasendnum()+1); if(ssContextDTO.getKafkasendnum()<3){ log.info("topic{},consumer num{}",producerRecord.topic(),ssContextDTO.getKafkasendnum()); kafkaSsTemplate.send(producerRecord.topic(),producerRecord.key(),JSONUtil.toJsonStr(ssContextDTO)); }else { SsRzSsjl rzSsjl = new SsRzSsjl(); rzSsjl.setXwlsh(ssContextDTO.getRowkey()); rzSsjl.setRzmx(Arrays.asList(JSONUtil.toJsonStr(ssContextDTO))); esService.saveSsRzSsjl(rzSsjl,ES_INDEX_SS_RZ_KAFKA_FAIL); } } }
kafkaSsTemplate.setProducerListener(kafkaProducerListenter);
kafkaSsTemplate.send(yhsjgywxl, gfdjxh + xfdjxh, CssJsonUtils.toJson(ssContextDTO));
https://blog.csdn.net/yangshangwei/article/details/113846000
https://www.jianshu.com/p/9bf9809b7491
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。