赞
踩
1 安装kafka需要安装JDK
2 下载kafka
- cd /usr/local
- # 下载kafka安装包
- wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz
3 解压
- # 解压kafka安装包
- tar -zxvf kafka_2.13-3.1.0.tgz
- mv kafka_2.13-3.1.0 kafka
4 修改配置文件
- listeners=SASL_PLAINTEXT://0.0.0.0:9092
- security.inter.broker.protocol=SASL_PLAINTEXT
- sasl.mechanism.inter.broker.protocol=PLAIN
- sasl.enabled.mechanisms=PLAIN
- allow.everyone.if.no.acl.found=false
- #超级管理员权限用户
- super.users=User:admin
- advertised.listeners=SASL_PLAINTEXT://公网IP:9092
-
- log.dirs=/tmp/kafka-logs
-
- zookeeper.connect=172.19.115.100:2181,172.19.115.99:2181,172.19.115.98:2181
5 分发kafka安装目录
- # 分发kafka安装目录给其他集群节点
- scp -r /usr/local/kafka/ 内网IP2:/usr/local
- scp -r /usr/local/kafka/ 内网IP3:/usr/local
6 配置环境
- # 导入java环境
- vim /etc/profile
- # 添加如下内容(注意:填写自己的java安装目录)
- export JAVA_HOME=/usr/java/jdk1.8.0_131
- export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
- export PATH=$PATH:$JAVA_HOME/bin
7 配置zookeeper
- dataDir=/usr/local/kafka_2.13-3.1.0/zkdata
- # the port at which the clients will connect
- clientPort=2181
- # disable the per-ip limit on the number of connections since this is a non-production config
- maxClientCnxns=0
- # Disable the adminserver by default to avoid port conflicts.
- # Set the port to something non-conflicting if choosing to enable this
- admin.enableServer=false
- # admin.serverPort=8080
- tickTime=2000
- initLimit=10
- syncLimit=5
- server.0=172.19.115.100:2888:3888
- server.1=172.19.115.98:2888:3888
- server.2=172.19.115.99:2888:3888
-
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- requireClientAuthScheme=sasl
- jaasLoginRenew=3600000
- #zookeeper.sasl.client=true
将dataDir = /tmp/zookeeper 目录改为 dataDir=/usr/local/kafka_2.13-3.1.0/zkdata
在kafka目录下新建 zkdata目录 mkdir zkdata
默认在tmp下的目录系统会定时清除,所以务必修改目录
8 新建/tmp/kafka-logs目录
9 在/zkdata/下 新建 myid文件,内容填写节点数字 0 其他节点分别填写 1 和2
10 在kafka/conf/下新增 kafka_server_jaas.conf
kafkaserver中定义了kafka的访问用户名密码 user_admin 必须和username password一致
client中定义了和zookeeper通信的用户名密码,必须和下方的zoo_jaas.conf里定义的一致
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin"
- user_admin="admin"
- user_producer="producer@123"
- user_consumer="consumer@123";
- };
-
- Client {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin";
- };
11 配置 consumer.properties
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
-
- # list of brokers used for bootstrapping knowledge about the rest of the cluster
- # format: host1:port1,host2:port2 ...
- bootstrap.servers=localhost:9092
-
- # consumer group id
- group.id=test-consumer-group
-
- ##username 和 password 对应kafka_server_jaas.conf中的用户名密码
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
-
- # What to do when there is no initial offset in Kafka or if the current
- # offset does not exist any more on the server: latest, earliest, none
- #auto.offset.reset=
12 配置 producer.properties
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # see org.apache.kafka.clients.producer.ProducerConfig for more details
-
- ############################# Producer Basics #############################
-
- # list of brokers used for bootstrapping knowledge about the rest of the cluster
- # format: host1:port1,host2:port2 ...
- bootstrap.servers=localhost:9092
-
- # specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
- compression.type=none
-
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
-
- # name of the partitioner class for partitioning events; default partition spreads data randomly
- #partitioner.class=
-
- # the maximum amount of time the client will wait for the response of a request
- #request.timeout.ms=
-
- # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
- #max.block.ms=
-
- # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
- #linger.ms=
-
- # the maximum size of a request in bytes
- #max.request.size=
-
- # the default batch size in bytes when batching multiple records sent to a partition
- #batch.size=
-
- # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
- #buffer.memory=
13 kafka_consumer_jaas.conf
- Client {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="consumer"
- password="consumer@123";
- };
14 kafka_producer_jaas.conf
- Client {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="producer"
- password="producer@123";
- };
15 zoo_jaas.conf zookeeper和kafka通信的用户名密码,必须user_admin 和username password必须一致
- Server {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin"
- user_admin="admin";
- };
16 sasl.properties
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
17 修改bin下命令
- #zookeeper-server-start.sh
-
- export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/zoo_jaas.conf"
- exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
-
-
- #kafka-server-start.sh
-
- export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
- exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
-
-
- #kafka-console-consumer.sh
-
- export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
-
-
-
- #kafka-console-producer.sh
-
- export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
-
-
-
- #kafka-topics.sh
-
- export KAFKA_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
- exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
18 kafka命令汇总
-
- 切换主目录
- cd /usr/local/kafka/
-
- 开启zk
- ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- 守护进程方式开启
- ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
-
- 开启kafka
- ./bin/kafka-server-start.sh ./config/server.properties
- 守护方式开启
- ./bin/kafka-server-start.sh -daemon ./config/server.properties
-
- 停止kafka
- ./bin/kafka-server-stop.sh
- 停止zk
- ./bin/zookeeper-server-stop.sh
-
- 开启顺序:先启动zk,在启动kafka
- 停止顺序:先停止kafka,再停止zk
-
- 查看服务运行状态
- jps
-
- 强关服务
- kill -s kill 【pid】
-
-
- 查看topic列表
- ./bin/kafka-topics.sh --list --bootstrap-server 172.19.115.100:9092 --command-config ./config/sasl.properties
-
- 查看topic详情
- ./bin/kafka-topics.sh --describe --bootstrap-server 172.19.115.100:9092 --command-config ./config/sasl.properties
-
- 创建topic 3分区 3副本
- ./bin/kafka-topics.sh --create --bootstrap-server 172.19.115.100:9092 --replication-factor 3 --partitions 3 --topic chint02 --command-config ./config/sasl.properties
-
- 生产消息
- ./bin/kafka-console-producer.sh --broker-list 172.19.115.100:9092 --topic chint01 -producer.config ./config/producer.properties
-
- 消费消息
- ./bin/kafka-console-consumer.sh --bootstrap-server 172.19.115.100:9092 --topic chint01 --group test-consumer-group --consumer.config ./config/consumer.properties
-
- 查看group列表
- bin/kafka-consumer-groups.sh --bootstrap-server 172.19.115.100:9092 --list --command-config ./config/sasl.properties
-
- 查看 Group 详情
- bin/kafka-consumer-groups.sh --bootstrap-server 172.19.115.100:9092 --group test-consumer-group --describe --command-config ./config/sasl.properties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。