当前位置:   article > 正文

kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

直接上代码

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka整体配置类
 *
 * @author Dean
 */
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.admin.client-id}")
    private String adminClientId;

    @Bean
    public AdminClient adminClient() {
        Map<String, Object> configs = new HashMap<>(5);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(AdminClientConfig.CLIENT_ID_CONFIG, adminClientId);
        return AdminClient.create(configs);
    }

    /**
     * 如果有多个消费组,需要定义多个不同的ConcurrentKafkaListenerContainerFactory
     *
     * @return ConcurrentKafkaListenerContainerFactory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        String groupId = "dmGroup";
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //是否自动提交ack
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //一次拉取最大数据量,默认值为500,如果拉取时不足配置的条数则有多少拉取多少
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        //是否批量这个设置好像只对配置了@KafkaListener的方法有用
        factory.setBatchListener(false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        //手动提交ack
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/**
 * kafka中间件的逻辑封装类
 */
@Slf4j
@Service
public class KafkaListenerManagement {
    private final Map<String, MessageListenerContainer> containers = new ConcurrentHashMap<>();
    /**
     * 如果有多个消费组,需要注入多个不同的ConcurrentKafkaListenerContainerFactory
     */
    private final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;

    @Autowired
    public KafkaListenerManagement(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /**
     * 开启Topic的监听
     *
     * @param topic            topic
     * @param bizLogicConsumer 消息的业务逻辑处理
     */
    public void startListening(String topic, BiConsumer<String, Acknowledgment> bizLogicConsumer) {
        //必须手动提交ACK,否则停止监听后重新监听可能导致拉取到重复的记录
        AcknowledgingMessageListener<String, String> messageListener =
                (message, acknowledgment) -> bizLogicConsumer.accept(message.value(), acknowledgment);

        MessageListenerContainer container = containerFactory.createContainer(topic);
        container.setupMessageListener(messageListener);
        container.start();
        containers.put(topic, container);
    }

    /**
     * 暂停监听
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        container.pause();
    }

    /**
     * 暂停后继续监听
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        container.resume();
    }

    /**
     * 停止监听
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        MessageListenerContainer container = containers.remove(topic);
        if (container != null) {
            container.stop();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
/**
 * Kafka生产者
 *
 * @author LiuChang
 */
@Service
public class KafkaProducerManagement {
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 异步发送
     *
     * @param topic   topic
     * @param message 消息
     * @return ListenableFuture
     */
    public ListenableFuture<SendResult<String, String>> send(String topic, String message) {
        return kafkaTemplate.send(topic, message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
import com.feiynn.kafka.management.KafkaListenerManagement;
import com.feiynn.kafka.management.KafkaProducerManagement;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 业务逻辑类
 * 注意:业务逻辑类不建议直接调用kafka的API,都调用封装后的Kafka相关的Management类
 *
 * @author Dean
 */
@Slf4j
@Service
public class BizService {
    @Resource
    private KafkaListenerManagement kafkaListenerManagement;
    @Resource
    private KafkaProducerManagement kafkaProducerManagement;

    /**
     * 开启topic监听后的业务逻辑
     *
     * @param topic topic
     */
    public void startListening(String topic) {
        kafkaListenerManagement.startListening(topic, (data, acknowledgment) -> {
            //消息处理业务逻辑
            log.info("Received message value: [{}]", data);
            try {
                //降低消费速率,方便观察日志
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            acknowledgment.acknowledge();
        });
    }

    /**
     * 停止topic监听
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        kafkaListenerManagement.stopListening(topic);
    }

    /**
     * 暂停监听
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        kafkaListenerManagement.pauseListening(topic);
    }

    /**
     * 暂停后继续监听
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        kafkaListenerManagement.resumeListening(topic);
    }

    /**
     * 发送消息
     *
     * @param topic   topic
     * @param message 消息
     */
    public void sendMsg(String topic, String message) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaProducerManagement.send(topic, message);
        //添加回调逻辑,异步获取发送结果
        listenableFuture.addCallback((sendResult) -> {
            //发送成功
            log.trace("Send [{}] success", message);
        }, (e) -> {
            //发送失败,可以执行降级策略,或者把消息写入日志后续进行统一处理
            log.error("Send [{}] failed", message, e);
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = KafkaAdvancedApplication.class)
public class KafkaAdvancedTest {
    @Resource
    private BizService bizService;

    /**
     * 测试topic监听的开启、暂停、暂停后重新开始、停止
     */
    @Test
    public void startStopListening() throws InterruptedException {
 
        String topicDm = "dm0";

        //开启topic监听
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(2);
        //消息前缀,用来区分是上一次发送的未消费完的消息还是本次发送的消息
        String msgPre = LocalTime.now().toString();
        log.info("msgPre=[{}]", msgPre);
        for (int i = 0; i < 2000; i++) {
            bizService.sendMsg(topicDm, "Msg_" + msgPre + "_" + i);
        }
        TimeUnit.SECONDS.sleep(5);

        log.info("pause listening begin");
        bizService.pauseListening(topicDm);
        log.info("pause listening success");
        //暂停监听成功后,消费者会把配置max.poll.records条数的消息消费完才会真正停止,因此停顿足够长的时间后观察消息消费的日志是否会暂停输出
        TimeUnit.SECONDS.sleep(20);
        log.info("resume listening");
        //暂停后重新开启消息监听
        bizService.resumeListening(topicDm);
        TimeUnit.SECONDS.sleep(20);

        //新一轮暂停与重启
        log.info("pause listening again");
        bizService.pauseListening(topicDm);
        TimeUnit.SECONDS.sleep(10);
        log.info("resume listening again");
        bizService.resumeListening(topicDm);

        //继续消费一段时间
        TimeUnit.SECONDS.sleep(10);
        //消费一段时间后停止监听
        log.info("stop listening");
        bizService.stopListening(topicDm);
        TimeUnit.SECONDS.sleep(20);

        //重新开启topic监听
        log.info("start listening again");
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(120);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

直接运行测试用例,通过观察日志,即可看出各种操作效果

遇到的问题

遇到停止监听topic后,从看到消费消息的日志观察,有时会一直打印,有时会打印一段时间就停止打印的问题,最终发现暂停监听方法调用成功后,消费者会把配置max.poll.records条数的消息消费完才会真正暂停或者停止。
另外如果不是手动提交ack,停止stop(不是暂停pause)订阅topic然后后重新开始订阅(start),可能会出现重复消费消息的问题,改成手动提交ack后问题不再出现。
还考虑到max.poll.interval.ms 最大拉取时间间隔是5分钟,尝试了暂停5分30秒看是否消费者会被因为rebalance,导致在resume重新监听无法成功,测试结果是没有问题,可以成功继续监听并消费消息。
代码已使用到生产环境。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/478858
推荐阅读
相关标签
  

闽ICP备14008679号