赞
踩
目录
04.查看ids,kafka的broker.id以及节点下存储的详细信息
4.测试Kafka中创建Topic,Zookeeper中的接受情况
- yum search java | greo jdk
- yum install -y java-1.8.0-openjdk
- java -verison
- # 下载包
- mkdir /usr/local/java && cd /usr/local/java
- wget https://download.oracle.com/java/22/latest/jdk-22_linux-x64_bin.tar.gz
- tar -zxvf jdk-22_linux-x64_bin.tar.gz
-
- # 配置环境变量
- vi /etc/profile
- #set jdk
- JAVA_HOME=/usr/local/java/jdk-22.0.1
- CLASSPATH=$JAVA_HOME/lib/tools.jar
- PATH=$JAVA_HOME/bin:$PATH
- export JAVA_HOME CLASSPATH PATH
-
- # 使配置生效
- source /etc/profile
-
- # 查看版本
- java -version
- # zookeeper 官网
- https://zookeeper.apache.org/
- # 下载安装最新稳定版 3.8.4 版本
- wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz --no-check-certificate
- #创建工作目录
- mkdir /app/zookeeper
- tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
- #声明环境变量
- vim /etc/profile
- #zookeeper
- export ZOOKEEPER_HOME=/app/zookeeper/apache-zookeeper-3.8.4-bin
- export PATH=$PATH:${ZOOKEEPER_HOME}/bin
- # 使环境变量生效
- source /etc/profile
- #配置zookeeper
- cd /app/zookeeper/apache-zookeeper-3.8.4-bin/conf
-
- cp -p zoo_sample.cfg zoo.cfg
-
- vim zoo.cfg
- #用于计算的时间单元,以毫秒为单位,比如session超时:N*tickTime
- tickTime=2000
- #用于集群,允许从节点链接并同步到master节点的初始化连接时间,以tickTime的倍数来表示
- initLimit=10
- #用于集群,master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制)
- syncLimit=5
- #连接服务器的端口,默认是2181
- clientPort=2181
-
- #存放数据文件夹
- dataDir=/app/zookeeper/apache-zookeeper-3.8.4-bin/data
- #添加日志存放文件夹
- dataLogDir=/app/zookeeper/apache-zookeeper-3.8.4-bin/dataLog
- #集群信息,地址:LF通信端口:选举端口
- server.1=10.88.62.179:2888:3888
- server.2=10.88.62.180:2888:3888
- server.3=10.88.62.181:2888:3888
- #在/app/zookeeper/apache-zookeeper-3.8.4-bin/data数据目录下创建新文件 myid ,分别写入1,2,3
- #主节点
- echo 1 > /app/zookeeper/apache-zookeeper-3.8.4-bin/data/myid
-
- #从节点1
- echo 2 > /app/zookeeper/apache-zookeeper-3.8.4-bin/data/myid
-
- #从节点2
- echo 3 > /app/zookeeper/apache-zookeeper-3.8.4-bin/data/myid
- #启动zk服务
- zkServer.sh start
-
- #查看集群状态
- zkServer.sh status
- # 官网地址
- https://kafka.apache.org/downloads
- https://kafka.apache.org/documentation.html#gettingStarted
- # 下载3.7.0版本
- wget https://www.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz --no-check-certificate
- #创建工作目录
- mkdir /app/kafka
- tar -zxvf kafka_2.13-3.7.0.tgz
- #配置环境变量
- vim /etc/profile
- #kafka
- KAFKA_HOME=/app/kafka/kafka_2.13-3.7.0/
- PATH=$PATH:$KAFKA_HOME/bin/
- #配置文件
- cd /app/kafka/kafka_2.13-3.7.0/config
- vim server.properties
- #broker 的全局唯一id值,用于集群下每个 kafka 的唯一标识,可以是任意的整数值,默认为 0。
- #三个节点,分别修改为1,2,3
- broker.id=0
-
- #删除 topic 时是否物理删除。默认为 false 或者无此配置项(此时手动添加即可)
- #1、如果没有配置 delete.topic.enable,或者值为 false,则删除 topic 时是标记删除,不是真正的物理删除,在 log.dirs 配置的目录下仍然能看到数据,以及在 zk 服务器的 /brokers/topics 路径也能看到数据。
- #2、想要删除 topic 时真正的物理删除,此必须配置为 true.
- delete.topic.enable=true
-
- #处理网络请求与响应的线程数量,默认为 3
- num.network.threads=3
-
- #服务器用于处理请求的线程数,可能包括磁盘I/O,默认为 8
- num.io.threads=8
-
- #发送套接字的缓冲区大小,默认为 102400,100 kb
- socket.send.buffer.bytes=102400
-
- #接收套接字的缓冲区大小,默认为 102400,100 kb
- socket.receive.buffer.bytes=102400
-
- #请求套接字的缓冲区大小,默认为 104857600,100M
- socket.request.max.bytes=104857600
-
- #kafka 运行日志存放的路径,改成自定义的即可,kafka 以日志的形式存储数据,这个路径不能随意删除。
- log.dirs=/app/kafka/kafka_2.13-3.7.0/logs
-
- #topic在当前broker上的分区个数,默认为 1
- num.partitions=1
-
- #用来恢复和清理data下数据的线程数量,默认为 1
- num.recovery.threads.per.data.dir=1
-
- #日志文件保留的最长时间,超时将被删除,默认 168 小时,7 天
- log.retention.hours=168
-
- #日志文件最大大小,超过时会新建
- log.segment.bytes=1073741824
-
- #基于大小的日志(数据)保留策略,当存储的数据量超过此大小时,就删除旧数据。默认为 1G。此配置默认是被注释的。
- log.retention.bytes=1073741824
-
- #检查日志段以查看是否可以根据保留策略删除日志段的间隔
- log.retention.check.interval.ms=300000
-
- #配置连接 Zookeeper 集群地址,默认为 localhost:2181
- zookeeper.connect=10.88.62.179:2181,10.88.62.180:2181,10.88.62.181:2181
-
- #连接到zookeeper的超时(毫秒),默认 18 秒
- zookeeper.connection.timeout.ms=18000
- #三个kafka节点,添加hosts文件,解析主机名
- vim /etc/hosts
- 10.88.62.182 iZl4y0116dv9vahi5mtdjtZ
- 10.88.62.183 iZl4y0116dv9vahi5mtdjuZ
- 10.88.62.184 iZl4y0116dv9vahi5mtdjvZ
- #前台启动
- kafka-server-start.sh /app/kafka/kafka_2.13-3.7.0/config/server.properties
- #后台启动
- nohup kafka-server-start.sh -daemon /app/kafka/kafka_2.13-3.7.0/config/server.properties &
- #停止
- kafka-server-stop.sh
- #列出所有topic
- kafka-topics.sh --list --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092
- #查看topic详细信息
- kafka-topics.sh --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --describe --topic mytopic
- # 调用生产者生产消息
- kafka-console-producer.sh --broker-list 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --topic mytopic
- # 调用消费者消费消息,from-beginning表示读取全部的消息
- vim config/consumer.properties
-
- # 配置 kafka服务地址,多个服务使用逗号分隔
- bootstrap.servers=localhost:9092
- # 消费者组id
- group.id=test-consumer-group
- #启动消费者
- kafka-console-consumer.sh --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --topic mytopic --from-beginning
- #消费者拉取消息
- kafka-console-consumer.sh --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --topic test --consumer.config ../config/consumer.properties
- # 删除topic
- kafka-topics.sh --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --delete --topic mytopic
kafka-topics.sh --create --topic my-topic --bootstrap-server 10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092 --partitions 3 --replication-factor 1
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class KafkaProducerExample {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
- String topic = "my-topic";
-
- for (int i = 0; i < 10; i++) {
- String message = "Message " + i;
- producer.send(new ProducerRecord<>(topic, message));
- System.out.println("Sent: " + message);
- }
-
- producer.close();
- }
- }
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- public class KafkaConsumerExample {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.88.62.182:9092,10.88.62.183:9092,10.88.62.184:9092");
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- Consumer<String, String> consumer = new KafkaConsumer<>(properties);
- String topic = "my-topic";
-
- consumer.subscribe(Collections.singletonList(topic));
-
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- records.forEach(record -> {
- System.out.println("Received: " + record.value());
- });
- }
- }
- }
- #打开一个终端窗口,运行 Kafka 生产者示例
- java KafkaProducerExample
-
- #打开另一个终端窗口,运行 Kafka 消费者示例:
- java KafkaConsumerExample
- #Docker拉取Zookeeper的镜像文件
- docker pull wurstmeister/zookeeper
-
- #Docker创建 bridge 网络,创建自定义网络
- docker network create --driver bridge --subnet 172.18.0.0/16 zookeeper_network
-
- #查看已经存在的网络
- docker network ls
主节点创建目录
- # 创建 zookeeper 主节点配置存放目录
- mkdir -p /mydata/zookeeper_kafka/master/conf
- # 创建 zookeeper 主节点数据存放目录
- mkdir -p /mydata/zookeeper_kafka/master/data
- # 创建 zookeeper 主节点数据日志存放目录
- mkdir -p /mydata/zookeeper_kafka/master/datalog
- # 创建 zookeeper 主节点日志存放目录
- mkdir -p /mydata/zookeeper_kafka/master/logs
从节点1创建目录
- # 创建 zookeeper 节点1 配置存放目录
- mkdir -p /mydata/zookeeper_kafka/node1/conf
- # 创建 zookeeper 节点1 数据存放目录
- mkdir -p /mydata/zookeeper_kafka/node1/data
- # 创建 zookeeper 节点1 数据日志存放目录
- mkdir -p /mydata/zookeeper_kafka/node1/datalog
- # 创建 zookeeper 节点1 日志存放目录
- mkdir -p /mydata/zookeeper_kafka/node1/logs
从节点2创建目录
- # 创建 zookeeper 节点2 配置存放目录
- mkdir -p /mydata/zookeeper_kafka/node2/conf
- # 创建 zookeeper 节点2 数据存放目录
- mkdir -p /mydata/zookeeper_kafka/node2/data
- # 创建 zookeeper 节点2 数据日志存放目录
- mkdir -p /mydata/zookeeper_kafka/node2/datalog
- # 创建 zookeeper 节点2 日志存放目录
- mkdir -p /mydata/zookeeper_kafka/node2/logs
主节点创建配置文件
- # zookeeper 主节点配置存放目录
- cd /mydata/zookeeper_kafka/master/conf
- # 编辑配置文件
- vim zoo.cfg
-
- #创建myid文件
- echo 1 > /mydata/zookeeper_kafka/master/data/myid
从节点1配置文件
- # zookeeper 节点1 配置存放目录
- cd /mydata/zookeeper_kafka/node1/conf
- # 编辑配置文件
- vim zoo.cfg
-
- #创建myid文件
- echo 2 > /mydata/zookeeper_kafka/node1/data/myid
从节点2配置文件
- # zookeeper 节点2 配置存放目录
- cd /mydata/zookeeper_kafka/node2/conf
- # 编辑配置文件
- vim zoo.cfg
-
- #创建myid文件
- echo 3 > /mydata/zookeeper_kafka/node2/data/myid
- 补充:也可以直接使用cp命令,将文件复制拷贝到其他目录下:
-
- cp zoo.cfg /mydata/zookeeper_kafka/node1/conf
- cp zoo.cfg /mydata/zookeeper_kafka/node2/conf
- cd /mydata/zookeeper_kafka/node2/conf
- # Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里
- dataDir=/opt/zookeeper-3.4.13/data
-
- # 事物日志存储地点,如果没提供的话使用的则是 dataDir
- dataLogDir=/opt/zookeeper-3.4.13/datalog
-
- # 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位
- tickTime=2000
-
- # 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
- initLimit=5
-
- # 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)
- syncLimit=2
-
- # 默认值为3,不支持以系统属性方式配置。用于配置Zookeeper在自动清理的时候需要保留的快照数据文件数量和对应的事务日志文件。此参数的最小值为3,如果配置的值小于3会自动调整到3
- autopurge.snapRetainCount=3
-
- # 默认值为0,单位为小时,不支持以系统属性方式配置。用于配置Zookeeper进行历史文件自动清理的频率。如果配置为0或负数,表示不需要开启定时清理功能
- autopurge.purgeInterval=0
-
- # 默认为60,不支持以系统属性方式配置。从Socket层面限制单个客户端与单台服务器之间的并发连接数,即以ip地址来进行连接数的限制。
- # 如果设置为0,表示不做任何限制。仅仅是单台客户端与单个Zookeeper服务器连接数的限制,不能控制所有客户端的连接数总和
- maxClientCnxns=60
-
- # 3.5.0中的新功能:当设置为false时,可以在复制模式下启动单个服务器,单个参与者可以使用观察者运行,并且群集可以重新配置为一个节点,并且从一个节点。
- # 对于向后兼容性,默认值为true。可以使用QuorumPeerConfig的setStandaloneEnabled方法或通过将“standaloneEnabled = false”或“standaloneEnabled = true”添加到服务器的配置文件来设置它。
- standaloneEnabled=false
-
- # 内嵌的管理控制台,停用这个服务
- admin.enableServer=false
-
- # 开启四字命令,将所有命令添加到白名单中
- 4lw.commands.whitelist=*
-
- # 服务端口
- clientPort=2181
-
- # 集群中服务的列表
- server.1=172.18.0.6:2888:3888
- server.2=172.18.0.7:2888:3888
- server.3=172.18.0.8:2888:3888
- # 启动命令
- docker run -d --restart always \
- --name zookeeper_kafka_master \
- --network zookeeper_network \
- --ip 172.18.0.6 \
- -p 2181:2181 \
- -e ZOO_MY_ID=1 \
- -v /mydata/zookeeper_kafka/master/conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg \
- -v /mydata/zookeeper_kafka/master/data:/opt/zookeeper-3.4.13/data \
- -v /mydata/zookeeper_kafka/master/datalog:/opt/zookeeper-3.4.13/datalog \
- -v /mydata/zookeeper_kafka/master/logs:/opt/zookeeper-3.4.13/logs \
- wurstmeister/zookeeper
- # 启动命令
- docker run -d --restart always \
- --name zookeeper_kafka_node1 \
- --network zookeeper_network \
- --ip 172.18.0.7 \
- -p 2182:2181 \
- -e ZOO_MY_ID=2 \
- -v /mydata/zookeeper_kafka/node1/conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg \
- -v /mydata/zookeeper_kafka/node1/data:/opt/zookeeper-3.4.13/data \
- -v /mydata/zookeeper_kafka/node1/datalog:/opt/zookeeper-3.4.13/datalog \
- -v /mydata/zookeeper_kafka/node1/logs:/opt/zookeeper-3.4.13/logs \
- wurstmeister/zookeeper
- # 启动命令
- docker run -d --restart always \
- --name zookeeper_kafka_node2 \
- --network zookeeper_network \
- --ip 172.18.0.8 \
- -p 2183:2181 \
- -e ZOO_MY_ID=3 \
- -v /mydata/zookeeper_kafka/node2/conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg \
- -v /mydata/zookeeper_kafka/node2/data:/opt/zookeeper-3.4.13/data \
- -v /mydata/zookeeper_kafka/node2/datalog:/opt/zookeeper-3.4.13/datalog \
- -v /mydata/zookeeper_kafka/node2/logs:/opt/zookeeper-3.4.13/logs \
- wurstmeister/zookeeper
- # 在容器 zookeeper-master 中开启一个交互模式的终端
- docker exec -it zookeeper_kafka_master /bin/bash
-
- # 查看 zookeeper 状态
- cd bin
- zkServer.sh status
docker pull wurstmeister/kafka
docker run -d --name kafka_zookeeper_master --network zookeeper_network -p 9096:9096 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.18.0.6:2181,172.18.0.7:2181,172.18.0.8:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.4.9:9096 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9096 wurstmeister/kafka
docker run -d --name kafka_zookeeper_node1 --network zookeeper_network -p 9097:9097 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=172.18.0.6:2181,172.18.0.7:2181,172.18.0.8:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.4.9:9097 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9097 wurstmeister/kafka
docker run -d --name kafka_zookeeper_node2 --network zookeeper_network -p 9098:9098 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=172.18.0.6:2181,172.18.0.7:2181,172.18.0.8:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.4.9:9098 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9098 wurstmeister/kafka
zkCli.sh
- ls /
- get /controller
ls /brokers
- ls /brokers
- ls /brokers/ids
- ls /brokers/ids/0
- ls /brokers/ids/1
- ls /brokers/ids/2
kafka-topics.sh --create --topic topic_example_1 --zookeeper 172.18.0.6:2181,172.18.0.7:2181,172.18.0.8:2181 --replication-factor 3 --partitions 3
- #节点123都要查看
- ls /brokers
- ls /brokers/topics
- ls /brokers/topics/topic_ex
- ...
kafka-topics.sh --zookeeper 172.18.0.6:2181,172.18.0.7:2181,172.18.0.8:2181 --list
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。