
Kafka retry example: producer-side retry and consumer-side retry (@KafkaListener retry configuration)


Consumer-side exception retry

Consumer-side retries are implemented by handling listener exceptions in an errorHandler registered on the @KafkaListener:

 @KafkaListener(errorHandler = "consumerAwareListenerErrorHandler")
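
The error handler below casts the message payload to a List of ConsumerRecord, so it is intended for a batch listener. A minimal sketch of such a listener method, assuming a hypothetical topic name, a batch-mode container factory, and a process() helper for the business logic:

@KafkaListener(topics = "sj_ss_topic", containerFactory = "batchContainerFactory",
        errorHandler = "consumerAwareListenerErrorHandler")
public void onSsMessages(List<ConsumerRecord<String, String>> records) {
    for (ConsumerRecord<String, String> record : records) {
        // any exception thrown here is routed to consumerAwareListenerErrorHandler
        process(record.value());
    }
}

The error handler itself re-sends each failed record until a retry counter in the payload reaches 3, after which the record is written to Elasticsearch instead:
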
package cn.gov.chinatax.gt4.szc.sj.sjszzh.listenter;

import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.ElasticsearchService;
import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.model.SsRzSsjl;
import cn.gov.chinatax.gt4.szc.sj.sjszzh.pojo.dto.SsContextDTO;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Retries on exceptions thrown while consuming from Kafka.
 */
@Component
@Slf4j
public class KafkaConsumerListenter{

    public final String ES_INDEX_SS_RZ_KAFKA_FAIL = "ss_rz_kafkasend_fail";
    @Resource
    @Qualifier("kafkaSjSsTemplate")
    private KafkaTemplate kafkaSsTemplate;

    @Resource
    protected ElasticsearchService esService;

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                if(!ObjectUtil.isEmpty(e)){
                    ObjectMapper objectMapper = new ObjectMapper();
                    List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();
                    try {
                        consumerRecords = (List<ConsumerRecord<String, String>>) message.getPayload();
                        for(ConsumerRecord<String, String> t : consumerRecords){
                            SsContextDTO ssContextDTO = objectMapper.readValue(JSONUtil.toJsonStr(t.value()),SsContextDTO.class);
                            ssContextDTO.setKafkasendnum(ssContextDTO.getKafkasendnum()+1);
                            if(ssContextDTO.getKafkasendnum()<3){
                                log.info("topic{},consumer num{}",t.topic(),ssContextDTO.getKafkasendnum());
                                kafkaSsTemplate.send(t.topic(),t.key(),JSONUtil.toJsonStr(ssContextDTO));
                            }else {
                                SsRzSsjl rzSsjl = new SsRzSsjl();
                                rzSsjl.setXwlsh(ssContextDTO.getRowkey());
                                rzSsjl.setRzmx(Arrays.asList(JSONUtil.toJsonStr(ssContextDTO)));
                                esService.saveSsRzSsjl(rzSsjl,ES_INDEX_SS_RZ_KAFKA_FAIL);
                            }
                        }
                    } catch (JsonProcessingException ex) {
                        log.info("consumer message fail");
                        throw new RuntimeException(ex);
                    }

                }else {
                    log.info("consumer message fail");
                }
                return null;
            }
        };
    }
}
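
Both the consumer and producer handlers rely on a retry counter (kafkasendnum) carried inside the message payload itself, so the count survives the round trip through Kafka. The DTO is not shown in the source; a minimal sketch of just the fields the handlers touch, assuming Lombok is available (the real class certainly carries more business fields):

import lombok.Data;

@Data
public class SsContextDTO {
    // business correlation id, written to Elasticsearch when retries are exhausted
    private String rowkey;
    // how many times this record has already been sent; incremented on each retry
    private int kafkasendnum;
    // ... other business fields omitted
}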


Producer-side exception retry

Producer-side failures are handled in the onError callback of a ProducerListener registered on the KafkaTemplate.

package cn.gov.chinatax.gt4.szc.sj.sjszzh.listenter;

import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.ElasticsearchService;
import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.elasticsearch.model.SsRzSsjl;
import cn.gov.chinatax.gt4.szc.sj.sjszzh.common.utils.SsToolUtils;
import cn.gov.chinatax.gt4.szc.sj.sjszzh.pojo.dto.SsContextDTO;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;

/**
 * Retries on exceptions thrown while sending to Kafka.
 */
@Component
@Slf4j
public class KafkaProducerListenter implements ProducerListener {

    public final String ES_INDEX_SS_RZ_KAFKA_FAIL = "ss_rz_kafkasend_fail";
    @Resource
    @Qualifier("kafkaSjSsTemplate")
    private KafkaTemplate kafkaSsTemplate;

    @Resource
    protected ElasticsearchService esService;

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("message send sucess:topic:{}",producerRecord.topic());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        ObjectMapper objectMapper = new ObjectMapper();
        SsContextDTO ssContextDTO = new SsContextDTO();
        try {
            ssContextDTO = objectMapper.readValue(JSONUtil.toJsonStr(producerRecord.value()), SsContextDTO.class);
        } catch (JsonProcessingException e) {
            log.info("message retry exception"+ SsToolUtils.getExcepitonInfo(e));
            throw new RuntimeException(e);
        }
        ssContextDTO.setKafkasendnum(ssContextDTO.getKafkasendnum()+1);
        if(ssContextDTO.getKafkasendnum()<3){
            log.info("topic{},consumer num{}",producerRecord.topic(),ssContextDTO.getKafkasendnum());
            kafkaSsTemplate.send(producerRecord.topic(),producerRecord.key(),JSONUtil.toJsonStr(ssContextDTO));
        }else {
            SsRzSsjl rzSsjl = new SsRzSsjl();
            rzSsjl.setXwlsh(ssContextDTO.getRowkey());
            rzSsjl.setRzmx(Arrays.asList(JSONUtil.toJsonStr(ssContextDTO)));
            esService.saveSsRzSsjl(rzSsjl,ES_INDEX_SS_RZ_KAFKA_FAIL);
        }
    }
}


Registering the listener

kafkaSsTemplate.setProducerListener(kafkaProducerListenter);
kafkaSsTemplate.send(yhsjgywxl,  gfdjxh + xfdjxh, CssJsonUtils.toJson(ssContextDTO));
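
Alternatively, the listener can be registered once when the kafkaSjSsTemplate bean is created, so every send() is covered without repeating setProducerListener at each call site. A possible configuration sketch (the producer factory wiring here is an assumption):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaSsTemplateConfig {

    @Bean("kafkaSjSsTemplate")
    public KafkaTemplate<String, String> kafkaSjSsTemplate(
            ProducerFactory<String, String> producerFactory,
            KafkaProducerListenter kafkaProducerListenter) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        // register the retry listener once; onError will fire for any failed send
        template.setProducerListener(kafkaProducerListenter);
        return template;
    }
}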

https://blog.csdn.net/yangshangwei/article/details/113846000
https://www.jianshu.com/p/9bf9809b7491
