赞
踩
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
ack的默认值就是1,表示producer只要收到一个leader分区副本成功写入的通知就认为推送消息成功了。
ack=0,表示producer发送过去以后就不再管了,不管是否发送成功。
ack=-1或ack=“all”,表示producer只有收到分区内所有处于同步状态的副本(ISR)都成功写入的通知,才认为推送消息成功了(注意:是ISR副本集合,而不是分区的全部副本)。
package com;
/**
 * Shared Kafka connection constants used by the producer and consumer examples.
 *
 * NOTE(review): a constant interface is an anti-pattern (Effective Java, Item 22);
 * prefer a final utility class with a private constructor once callers can be migrated.
 */
public interface KFK_INFO {
    /** Comma-separated bootstrap broker list used by the producer. */
    String PRODUCER_SERVER_IP = "XXXXX:9092,XXXX:9092,XXXX:9092";
    /** acks=1: the leader must persist the record before the send is acknowledged. */
    String PRODUCER_ACK = "1";
    /** Bootstrap broker for the consumer. Spelling kept ("SEVER") for source compatibility. */
    String CONSUMER_SEVER_IP = "XXXX:9092";
    /** Correctly spelled alias for {@link #CONSUMER_SEVER_IP}; prefer this in new code. */
    String CONSUMER_SERVER_IP = CONSUMER_SEVER_IP;
    /** Consumer group id (matches the default test group shipped with Kafka). */
    String GROUP_ID_CONFIG = "test-consumer-group";
    /** Topic that both the producer and consumer examples use. */
    String KAFKA_TOPIC = "test-topic";
}
package com; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.*; import java.util.Properties; public class KafkaUtil { // 创建producer static KafkaProducer createProducer() { Properties pro = new Properties(); pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KFK_INFO.PRODUCER_SERVER_IP); pro.put(ProducerConfig.ACKS_CONFIG, KFK_INFO.PRODUCER_ACK); pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaProducer<String, String>(pro); } // 创建consumer static KafkaConsumer createConsumer() { Properties pro = new Properties(); pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KFK_INFO.CONSUMER_SEVER_IP); pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); pro.put(ConsumerConfig.GROUP_ID_CONFIG, KFK_INFO.GROUP_ID_CONFIG); return new KafkaConsumer<String, String>(pro); } }
package com; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Scanner; public class Producer { public static void main(String[] args) { final KafkaProducer producer = KafkaUtil.createProducer(); Thread t1 = new Thread(new Runnable() { public void run() { try { while (true) { // 输入消息 Scanner scan = new Scanner(System.in); System.out.println("-------------请输入内容-------------"); String msg = scan.nextLine(); ProducerRecord<String, String> record = new ProducerRecord<String, String>(KFK_INFO.KAFKA_TOPIC, msg); producer.send(record); } } finally { producer.close(); } } }); t1.start(); } }
package com; import org.apache.kafka.clients.consumer.*; import java.util.Collections; public class Consumer { public static void main(String[] args) { KafkaConsumer consumer = KafkaUtil.createConsumer(); // 订阅消息 consumer.subscribe(Collections.singletonList(KFK_INFO.KAFKA_TOPIC)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("-------------接收到消息-------------"); System.out.println(String.format("topic:%s ,offset:%d ,消息:%s", record.topic(), record.offset(), record.value())); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。