赞
踩
打开config/server.properties 文件,修改broker.id,listeners,port,log.dirs
vi config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.105.110:9092
port=9092
log.dirs=kafka-logs
zookeeper.connect=192.168.105.110:2181
备注:
listeners一定要配置成为IP地址;
如果配置为localhost或服务器的hostname,在使用Java获取数据时会拿不到数据,或者发送数据时就会抛出异常:org.apache.kafka.common.errors.TimeoutException: Batch Expired。
因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
bin/kafka-topics.sh --create --zookeeper 192.168.105.110:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
bin/kafka-topics.sh --list --zookeeper 192.168.105.110:2181
bin/kafka-console-producer.sh --broker-list 192.168.105.110:9092 --topic Hello-Kafka
bin/kafka-console-consumer.sh --zookeeper 192.168.105.110:2181 --topic Hello-Kafka --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.105.110:9092 --topic Hello-Kafka --from-beginning
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.io.BufferedReader;
- import java.io.File;
- import java.io.FileReader;
- import java.util.Properties;
- import java.util.Random;
- import java.util.UUID;
-
- /**
- * @author admin
- * @title SimpleProducer
- * @projectName KafkaStreamDemo
- * @description TODO
- * @date 2019/9/1016:45
- */
- public class SimpleProducer {
- public static void main(String[] args) throws Exception {
-
- // Assign topicName to string variable
- String topicName = "Hello-Kafka";
-
- // create instance for properties to access producer configs
- Properties props = new Properties();
-
- // Assign localhost id, 参考http://kafka.apache.org/documentation/#producerapi
- props.put("bootstrap.servers", "192.168.105.110:9092");
-
- // Set acknowledgements for producer requests.
- props.put("acks", "all");
-
- // If the request fails, the producer can automatically retry,
- props.put("retries", 0);
-
- // Specify buffer size in config
- props.put("batch.size", 16384);
-
- // Reduce the no of requests less than 0
- props.put("linger.ms", 1);
-
- // The buffer.memory controls the total amount of memory available to the
- // producer for buffering.
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<String, String>(props);
-
-
- int i = 0 ;
- while (i < 100) {
- String tempString = UUID.randomUUID().toString();
- System.out.println("----------"+tempString);
- producer.send(new ProducerRecord<String, String>(topicName, tempString));
- Thread.sleep(1000);
- i++ ;
- }
-
- System.out.println("Message sent successfully");
- producer.close();
- }
- }
-
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
-
- /**
- * @author admin
- * @title SimpleConsumer
- * @projectName KafkaStreamDemo
- * @description TODO
- * @date 2019/9/1016:47
- */
- public class SimpleConsumer {
- public static void main(String[] args) throws Exception {
- // Kafka consumer configuration settings
- String topicName = "Hello-Kafka";
-
-
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.105.110:9092");
-
-
- props.put("group.id", "CountryCounter");
-
- props.put("auto.offset.reset", "latest");
-
- props.put("enable.auto.commit", "true");
-
- props.put("auto.commit.interval.ms", "1000");
-
- props.put("session.timeout.ms", "30000");
-
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
- // Kafka Consumer subscribes list of topics here.
- kafkaConsumer.subscribe(Collections.singletonList(topicName) );
-
- while (true) {
- ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- // print the offset,key and value for the consumer records.
- System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。