getKafkaConsumer() throws Exception { Properties properties = new Properties(); properties.putAll(xxx.getClientConfigs(kafkaClientId,kafkaUsername,kaf...">
当前位置:   article > 正文

Kafka Consumer 简易配置

Kafka Consumer 简易配置
@Bean(value = "kafkaConsumer", destroyMethod = "close")
public KafkaConsumer<String, String> getKafkaConsumer() throws Exception {
    Properties properties = new Properties();
  properties.putAll(xxx.getClientConfigs(kafkaClientId,kafkaUsername,kafkaPassword));
    //不设置默认值是latest(第一次消费或者越界从最新开始消费);earliest:第一次消费或者越界从最小位点开始消费数据
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    //是否自动提交位点,默认是true.默认的自动提交位点的时间间隔是5000ms,false的情况下是需要用户自己调用commit方法自己手动提交位点信息的
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
    kafkaConsumer.subscribe(Collections.singleton(kafkaTopic));
    return kafkaConsumer;
}

Kafka Consumer

kafka消

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/657662
推荐阅读
相关标签
  

闽ICP备14008679号