赞
踩
下面是 Apache Kafka 单机和集群环境部署的详细教程,包括部署过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处理平台,广泛用于实时数据处理、日志收集、消息队列等场景。
在 Ubuntu 中:
sudo apt update
sudo apt install openjdk-11-jdk
在 CentOS 中:
sudo yum install java-11-openjdk
验证 Java 安装:
java -version
Kafka 使用 ZooKeeper 进行节点管理和协调,需要先安装并启动 ZooKeeper。
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper
创建数据目录:
mkdir -p /var/lib/zookeeper
复制配置文件:
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
编辑配置文件 /usr/local/zookeeper/conf/zoo.cfg
:
dataDir=/var/lib/zookeeper
clientPort=2181
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/zookeeper/bin/zkCli.sh -server localhost:2181
在连接成功后输入 ls /
,若返回空列表([]
),则说明连接成功。
访问 Kafka 官网 下载最新版本的 Kafka。
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar -xzvf kafka_2.12-3.5.0.tgz
mv kafka_2.12-3.5.0 /usr/local/kafka
编辑 Kafka 的配置文件 /usr/local/kafka/config/server.properties
:
# Kafka Broker ID,唯一标识符
broker.id=0
# 监听的接口和端口
listeners=PLAINTEXT://:9092
# 日志文件存储路径
log.dirs=/var/lib/kafka-logs
# Zookeeper 连接地址
zookeeper.connect=localhost:2181
mkdir -p /var/lib/kafka-logs
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
创建一个测试 Topic:
/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
列出 Topic:
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
你应该看到 test-topic
在列出的 Topic 中。
zookeeper.connect
地址配置正确。listeners
配置了正确的监听地址。Kafka 集群由多个 Kafka Broker 组成,能够提供高可用性和水平扩展。
在每台服务器上按照单机部署的步骤安装 ZooKeeper,并进行以下配置:
编辑每个节点的 zoo.cfg
文件,添加如下配置:
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
在每台服务器上创建 myid
文件,用于标识节点:
echo "1" > /var/lib/zookeeper/myid # 在 zookeeper1 上
echo "2" > /var/lib/zookeeper/myid # 在 zookeeper2 上
echo "3" > /var/lib/zookeeper/myid # 在 zookeeper3 上
在每台服务器上启动 ZooKeeper:
/usr/local/zookeeper/bin/zkServer.sh start
在每台服务器上按照单机部署的步骤安装 Kafka,并进行以下配置:
编辑每个节点的 server.properties
文件,添加如下配置:
broker.id=0 # 每个 Broker 唯一 ID
listeners=PLAINTEXT://:9092
log.dirs=/var/lib/kafka-logs
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
在每台服务器上启动 Kafka Broker:
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
在任一 Kafka Broker 上执行以下命令:
/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 3
列出集群中的 Topic:
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
查看 Topic 详细信息:
/usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:9092
myid
,并且所有节点可以互相通信。broker.id
。在 Maven 项目中添加 Kafka 的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { // Kafka 生产者配置 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 创建生产者 Producer<String, String> producer = new KafkaProducer<>(props); // 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "Message " + i); producer.send(record); } // 关闭生产者 producer.close(); } }
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { // Kafka 消费者配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Collections.singletonList("test-topic")); // 轮询消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value()); } } } }
编译并运行生产者:
mvn compile
mvn exec:java -Dexec.mainClass="SimpleProducer"
编译并运行消费者:
mvn exec:java -Dexec.mainClass="SimpleConsumer"
pip install kafka-python
from kafka import KafkaProducer
# 创建 Kafka 生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 发送消息
for i in range(10):
producer.send('test-topic', key=bytes(str(i), encoding='utf-8'), value=bytes(f'Message {i}', encoding='utf-8'))
# 关闭生产者
producer.close()
from kafka import KafkaConsumer
# 创建 Kafka 消费者
consumer = KafkaConsumer(
'test-topic',
bootstrap_servers='localhost:9092',
group_id='test-group',
auto_offset_reset='earliest'
)
# 轮询消息
for message in consumer:
print(f'Offset = {message.offset}, Key = {message.key.decode()}, Value = {message.value.decode()}')
运行生产者:
python kafka_producer.py
运行消费者:
python kafka_consumer.py
bootstrap.servers
、key.serializer
、value.serializer
、group.id
等参数。通过以上步骤,我们成功部署了 Kafka 单机和集群环境,并实现了一个简单的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,适合用于实时流处理和数据管道。
通过合理的配置和优化,Kafka 可以为应用程序提供可靠的消息传递和流处理服务,是构建实时数据管道和事件驱动架构的重要组件。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。