Some common startup errors and their fixes are recorded at the end of this document.
Download the .tgz binary package from https://kafka.apache.org/downloads and extract it locally.
Edit the config/server.properties file:
broker.id=0
# On Windows use D:\\tmp\\kafka-logs instead
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092
Edit the config/zookeeper.properties file:
# On Windows use D:\\bigdata\\zk instead
dataDir=/bigdata/zk
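One thing to watch in both files: Java's .properties format treats # as a comment only at the start of a line, so a note written after a value on the same line becomes part of that value. The following is a minimal Python sketch, not part of Kafka, that flags such lines. The function name find_inline_comments and the sample text are made up for illustration, and it assumes the key=value form used above.

```python
def find_inline_comments(text):
    """Return (line_number, key) for every property whose value contains ' #'."""
    hits = []
    for number, line in enumerate(text.splitlines(), start=1):
        stripped = line.strip()
        # Blank lines and full-line comments (# or !) are fine.
        if not stripped or stripped[0] in "#!":
            continue
        key, sep, value = stripped.partition("=")
        # Heuristic: ' #' inside a value is most likely an intended comment.
        if sep and " #" in value:
            hits.append((number, key.strip()))
    return hits


sample = """\
broker.id=0
log.dirs=/tmp/kafka-logs # inline note
# a full-line comment is fine
zookeeper.connect=localhost:2181
"""

for number, key in find_inline_comments(sample):
    print(f"line {number}: the value of {key} includes the text after '#'")
```

This is only a heuristic: a value that legitimately contains " #" would also be reported.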
Start ZooKeeper first and then the Kafka server, each in its own Command Prompt, and keep both running:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
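To confirm that both processes are accepting connections, a quick check is to open a TCP connection to each port. This is a minimal Python sketch, assuming the ports 2181 and 9092 configured above. port_open is a hypothetical helper, and a successful connection only shows that something is listening, not that the broker is healthy.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False


if __name__ == "__main__":
    # Ports assumed from the configuration above: ZooKeeper 2181, Kafka 9092.
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "listening" if port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```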
Before producing and consuming messages, create a topic:
.\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Use the Kafka console producer to send messages to the topic.
Open a new Command Prompt and run:
.\bin\windows\kafka-console-producer.bat --topic test --bootstrap-server localhost:9092
Type messages in the console to send them to the Kafka topic.
Open another Command Prompt to start the consumer that reads messages from the topic.
.\bin\windows\kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092 --from-beginning
You should see messages appear in the consumer console as you type them in the producer console.
Connecting Kafka with Code
Here are examples in Java and Python.
For Java, first add the Kafka client dependency to pom.xml if you are using Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.2</version>
</dependency>
Producer Example
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; close() waits for pending records to be sent.
        producer.send(new ProducerRecord<>("test", "key", "value"));
        producer.close();
    }
}
Consumer Example
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        // Poll forever; stop the program with Ctrl+C.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            });
        }
    }
}
For Python, first install the Kafka Python client:
pip install kafka-python
Producer Example
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'Hello, Kafka!')
producer.close()
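The producer above sends raw bytes without a key and does not check the result. As a hedged variation, the sketch below sends a keyed JSON message and waits for the broker's acknowledgement. It relies on kafka-python's key_serializer and value_serializer options; serialize_value and send_event are hypothetical helper names, and the topic and address are the ones used above.

```python
import json


def serialize_value(obj):
    """Encode a Python object as UTF-8 JSON bytes."""
    return json.dumps(obj).encode("utf-8")


def send_event(event, key, topic="test", servers="localhost:9092"):
    """Send one keyed JSON message and return (partition, offset)."""
    # Imported lazily so serialize_value works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=servers,
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=serialize_value,
    )
    try:
        # send() returns a future; get() blocks until the broker acknowledges
        # the record, or raises if delivery failed or timed out.
        metadata = producer.send(topic, key=key, value=event).get(timeout=10)
        return metadata.partition, metadata.offset
    finally:
        producer.close()


print(serialize_value({"greeting": "Hello, Kafka!"}))
```

With the broker running, send_event({"greeting": "Hello, Kafka!"}, key="demo") should return the partition and offset the record was written to.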
Consumer Example
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092', auto_offset_reset='earliest')
for message in consumer:
    print(f"Key: {message.key}, Value: {message.value}")
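The loop above blocks forever and prints raw bytes. A possible variation, sketched under the assumption that values are UTF-8 text or JSON, uses kafka-python's value_deserializer and consumer_timeout_ms options so that iteration ends once the topic has been idle for a while. decode_value and read_values are hypothetical names, and the group name mirrors the Java example.

```python
import json


def decode_value(raw):
    """Decode UTF-8 JSON bytes; fall back to plain text for non-JSON payloads."""
    if raw is None:  # records without a value
        return None
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def read_values(topic="test", servers="localhost:9092", idle_ms=5000):
    """Return decoded values until no message arrives for idle_ms milliseconds."""
    from kafka import KafkaConsumer  # lazy import: requires kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id="test-group",         # offsets are tracked per group
        auto_offset_reset="earliest",  # used only when the group has no committed offset
        value_deserializer=decode_value,
        consumer_timeout_ms=idle_ms,   # stop iterating after idle_ms without messages
    )
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()


print(decode_value(b"Hello, Kafka!"))
```

Because a group_id is set and offsets are committed automatically by default, a second call to read_values() continues where the first one stopped instead of rereading the topic.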
Note
'wmic' is not recognized as an internal or external command, operable program or batch file
In the System Properties dialog, click Environment Variables. Under System variables, find PATH (in any capitalization) and add this entry:
%SystemRoot%\System32\Wbem
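To check whether the entry is already present before editing PATH, the comparison can be scripted. This is a rough Python sketch; path_has_entry is a hypothetical helper, the sample PATH strings are invented, and C:\Windows is only an assumed value for %SystemRoot%.

```python
def path_has_entry(path_value, entry, system_root=r"C:\Windows"):
    """Return True if a Windows PATH string contains entry.

    The comparison ignores case and trailing backslashes and expands the
    %SystemRoot% placeholder with system_root.
    """
    def normalize(item):
        lowered = item.strip().lower()
        lowered = lowered.replace("%systemroot%", system_root.lower())
        return lowered.rstrip("\\")

    wanted = normalize(entry)
    return any(normalize(item) == wanted for item in path_value.split(";"))


wbem = r"%SystemRoot%\System32\Wbem"
without = r"C:\Windows\system32;C:\Windows"
with_entry = without + ";" + r"C:\WINDOWS\System32\Wbem"

print(path_has_entry(without, wbem))     # False
print(path_has_entry(with_entry, wbem))  # True
```

On a real Windows machine the first argument would be os.environ["PATH"] and the third os.environ["SystemRoot"].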
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) java.nio.file.InvalidPathException: Illegal char < > at index 2: D: mpdownloadkafkakafka_2.13-3.6.2log\meta.properties.tmp
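Correct the Path Format:
In a .properties file the backslash is an escape character, so a path written with single backslashes such as D:\tmp is read as D:, a tab character, and mp. That tab is the illegal character reported at index 2. Write Windows paths with double backslashes (D:\\tmp\\kafka-logs) or forward slashes (D:/tmp/kafka-logs), and make sure the path contains no spaces or other illegal characters.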
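The effect is easy to reproduce. Below is a simplified Python imitation of how java.util.Properties unescapes a value: it covers the tab, newline, carriage-return and form-feed escapes and drops the backslash before any other character, but not \uXXXX escapes or line continuations. The helper name and the sample path are assumptions, chosen so that the result resembles the path in the error message.

```python
def load_properties_value(raw):
    """Unescape a raw .properties value (simplified, see the note above)."""
    escapes = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}
    out = []
    chars = iter(raw)
    for ch in chars:
        if ch != "\\":
            out.append(ch)
            continue
        following = next(chars, "")  # character after the backslash
        # Known escapes are translated; otherwise the backslash is dropped.
        out.append(escapes.get(following, following))
    return "".join(out)


bad = r"D:\tmp\download\kafka\kafka_2.13-3.6.2\log"       # single backslashes
good = r"D:\\tmp\\download\\kafka\\kafka_2.13-3.6.2\\log"  # doubled backslashes

print(repr(load_properties_value(bad)))
print(repr(load_properties_value(good)))
```

The first result starts with D: followed by a tab and has lost every separator, while the second is the intended path. A path written with forward slashes passes through unchanged.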
WARN [SocketServer listenerType=ZK_BROKER, nodeId=0] Unexpected error from /0:0:0:0:0:0:0:1 (channelId=0:0:0:0:0:0:0:1:9092-0:0:0:0:0:0:0:1:62710-1); closing connection (org.apache.kafka.common.network.Selector) org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
Edit server.properties:
Open the server.properties file in the Kafka config directory and increase the broker's socket.request.max.bytes property. Add or modify the following lines:
# Maximum size of a request the broker accepts; default is 104857600 (100 MB)
socket.request.max.bytes=209715200
Edit producer.properties (if applicable):
max.request.size is a producer setting (default 1048576, 1 MB), so it belongs in the producer configuration rather than in server.properties, and consumers do not use it. If you have a producer configuration, keep the value at or below the broker's socket.request.max.bytes:
max.request.size=209715200
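It may be worth decoding the size in the warning before raising any limit. Kafka requests start with a 4-byte big-endian length, so when a non-Kafka client connects, the broker interprets that client's first four bytes as the length. A small Python sketch (size_as_bytes is a hypothetical helper name):

```python
import struct


def size_as_bytes(size):
    """Return the 4 bytes that a big-endian 32-bit length prefix was read from."""
    return struct.pack(">I", size)


print(size_as_bytes(1195725856))  # b'GET '
```

Here the bytes spell the start of an HTTP GET request, which suggests that a browser or another HTTP tool connected to port 9092. If that is the cause, the warning is harmless and no size limit needs to change.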