getKafkaConsumer() throws Exception { Properties properties = new Properties(); properties.putAll(xxx.getClientConfigs(kafkaClientId,kafkaUsername,kaf...">
赞
踩
@Bean(value = "kafkaConsumer", destroyMethod = "close") public KafkaConsumer<String, String> getKafkaConsumer() throws Exception { Properties properties = new Properties(); properties.putAll(xxx.getClientConfigs(kafkaClientId,kafkaUsername,kafkaPassword)); //不设置默认值是latest(第一次消费或者越界从最新开始消费);earliest:第一次消费或者越界从最小位点开始消费数据 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //是否自动提交位点,默认是true.默认的自动提交位点的时间间隔是5000ms,false的情况下是需要用户自己调用commit方法自己手动提交位点信息的 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); kafkaConsumer.subscribe(Collections.singleton(kafkaTopic)); return kafkaConsumer; }
kafka消
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。