赞
踩
- Properties props = new Properties();
- props.put("bootstrap.servers", address);
- props.put("group.id", groupId);
- props.put("enable.auto.commit", "false");
- props.put("session.timeout.ms", "30000");
- props.put("auto.offset.reset", "latest");// accepted values: latest, earliest
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- // Configure JAAS (ACL) authentication; an ACL account is required
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-jaop7o37#test\" password=\"*****\";");
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
- kafkaConsumer.subscribe(Collections.singleton(topic));
- return kafkaConsumer;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。