赞
踩
Kafka是一款分布式的消息发布订阅系统,具有高性能,高吞吐量的特点而被广泛应用于大数据传输场景。它是由Linkedin公司开发,使用Scale语言开发,之后成为Apache基金会的一个顶级的项目。Kafka提供了类似JMS的特性,但特性和设计上完全不同,而且它也不是基于JMS规范的实现。
Kafka作为一个消息系统,最初设计的目的是用作Linkedin的活动流(ActivityStream)和运营数据管理通道(pipline)。活动流数据是所有的网站对用户的使用情况做分析的时候要用到的最常规的部分,活动数据包括页面的访问量(pv),被查看内容方面的信息以及搜索内容。这种数据的处理方式通常是,先把各种活动以日志的形式写入到某种文件中,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等)
由于Kafka具有更好的吞吐量、内置分区、冗余、容错性的优点(Kafka每秒可以处理几十万的消息),让Kafka成为一个很优秀的大规模消息处理应用的解决方案。在企业级应用上,Kafka主要应用于以下几个方面:
[root@util bin]# sh zkServer.
zkServer.cmd zkServer.sh
[root@util bin]# sh zkServer.sh start
/bin/java
ZooKeeper JMX enabled by default
Using config: /home/apache-zookeeper-3.8.2-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@zk2 home]# wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
--2023-07-26 15:57:34-- https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
正在解析主机 downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f8:10a:201a::2, ...
正在连接 downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度:106956505 (102M) [application/x-gzip]
正在保存至: “kafka_2.12-3.5.1.tgz”
100%[===================================================================================================================================>] 106,956,505 24.9KB/s 用时 63m 29s
2023-07-26 17:01:05 (27.4 KB/s) - 已保存 “kafka_2.12-3.5.1.tgz” [106956505/106956505])
[root@zk2 home]# ls
admin java kafka_2.12-3.5.1.tgz zookeeper
[root@zk2 home]# tar -zxvf kafka_2.12-3.5.1.tgz
kafka_2.12-3.5.1/
kafka_2.12-3.5.1/LICENSE
kafka_2.12-3.5.1/NOTICE
...
[root@zk2 home]# ls
admin java kafka_2.12-3.5.1 kafka_2.12-3.5.1.tgz zookeeper
[root@zk2 home]# vim kafka_2.12-3.5.1/config/server.properties
zookeeper.connect=192.168.50.115:2181 ## 将属性值改为目标zookeeper服务器ip:port
kafka默认存储消息的消息数据的目录为/tmp//kafka-logs,可以根据自己的实际需要,在service.propoties文件中进行配置
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
配置kafka环境变量,vim /etc/profile
#this is for kafka enviroment
export KAFKA_HOME=/home/kafka_2.12-3.5.1
export PATH=$KAFKA_HOME/bin:$PATH
使环境变量生效
[root@zk3 home]# source /etc/profile
[root@zk2 bin]# sh kafka-server-start.sh ../config/server.properties
[2023-07-26 17:55:14,596] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-07-26 17:55:18,963] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2023-07-26 17:55:19,921] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
...
[2023-07-26 17:55:20,915] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2023-07-26 17:55:24,261] INFO Cluster ID = nv2qazaBQk2I3xcM0WziGQ (kafka.server.KafkaServer)
...
看到[ZooKeeperClient Kafka server] Connected启动成功
5. 另外新开一个终端,创建一个topic
[root@zk2 bin]# sh kafka-topics.sh --create --topic --quickstart-events --bootstrap-server 192.168.50.112:9092
Created topic --quickstart-events.
[root@zk2 bin]# sh kafka-topics.sh --list --bootstrap-server 192.168.50.112:9092 #查看所有的topic列表
--quickstart-events
__consumer_offsets
test
[root@zk2 bin]# sh kafka-topics.sh --describe --topic --quickstart-events --bootstrap-server 192.168.50.112:9092 #查看某一个topic的具体参数
Topic: --quickstart-events TopicId: W3k6ScaISqWEy2vaQynOPA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: --quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@zk2 bin]# sh kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.50.112:9092
>this is my first message
[root@zk2 bin]# sh kafka-console-consumer.sh --topic quickstart-events --bootstrap-server 192.168.50.112:9092
this is my first message
参考下图,将3台Kafka注册到3台zookeeper组成的集群上。
zookeeper | IP:port |
---|---|
zookeeper1 | 192.168.50.114:2181 |
zookeeper2 | 192.168.50.112:2181 |
zookeeper3 | 192.168.50.113:2181 |
kafka | IP:port |
---|---|
kafka1 | 192.168.50.114:9092 |
kafka2 | 192.168.50.112:9092 |
kafka3 | 192.168.50.113:9092 |
broker.id=1 #每个broker对应唯一一个broker.id
listeners = PLAINTEXT://192.168.50.112:9092 #配置本机的地址,即kafka监听的ip:port
zookeeper.connect=192.168.50.114:2181,192.168.50.112:2181,192.168.50.113:2181 #zookeeper集群地址
[root@zk2 config]# scp -r /home/kafka_2.12-3.5.1 zk1:/home/
[root@zk2 config]# scp -r /home/kafka_2.12-3.5.1 zk3:/home/
并修改对应的配置文件信息
[root@zk2 bin]# vim kafka.sh #!/bin case $1 in "start"){ for i in zk1 zk2 zk3 do echo ----------kafka $i 启动---------- ssh $i "/home/kafka_2.12-3.5.1/bin/kafka-server-start.sh -daemon /home/kafka_2.12-3.5.1/config/server.properties" done } ;; "stop"){ for i in zk1 zk2 zk3 do echo ----------kafka $i 停止---------- ssh $i "/home/kafka_2.12-3.5.1/bin/kafka-server-stop.sh" done } ;; "status"){ for i in zk1 zk2 zk3 do echo ----------kafka $i Status---------- if ping -c 1 -w 1 $i > /dev/null ; then num=$( ssh $i ps -ef | grep kafka | grep -vc grep ) if [ $num -eq 0 ]; then echo "Kafka未启动" else echo "Kafka已启动" fi else echo "kafka $i 网络断开连接,无法获取状态信息" fi done } ;; "topic"){ for i in zk1 zk2 zk3 do echo ----------kafka $i Topic---------- ssh $i "/home/kafka_2.12-3.5.1/bin/kafka-topics.sh --bootstrap-server $i:9092 --list" done } ;; esac
[root@zk2 bin]# chmod 777 zk.sh
----------kafka zk1 启动----------
----------kafka zk2 启动----------
/home/kafka_2.12-3.5.1/bin/kafka-run-class.sh: 第 346 行:exec: java: 未找到
----------kafka zk3 启动----------
/home/kafka_2.12-3.5.1/bin/kafka-run-class.sh: 第 346 行:exec: java: 未找到
问题出在:
通过ssh登陆之后会找不到JAVA_HOME ,JAVA_HOME是定义在/etc/profile 里面的。 远程登录和直接登录执行的文件是不一样的: /etc/profile: 当用户登录时,该文件被执行. /etc/bashrc: 当bash shell被打开时,该文件被执行. ssh作为远程登录的方式进入,无法触发/etc/profile的执行,找不到JAVA_HOME, 将java的配置信息配置到bashrc的文件中去,配置步骤如下所示: (1) 命令 vim ~/.bashrc 进入到文件; (2) 添加配置的环境变量: #this is for java enviroment export JAVA_HOME=/home/java/jdk8 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH #this is for zookeeper enviroment export ZOOKEEPER_HOME=/home/zookeeper/zookeeper-bin export PATH=$ZOOKEEPER_HOME/bin:$PATH #this is for kafka enviroment export KAFKA_HOME=/home/kafka_2.12-3.5.1 export PATH=$KAFKA_HOME/bin:$PATH (3) 命令 :source ~/.bashrc 更新 .bashrc 对该文件进行修改保存后,在执行kafka集群启动,就不会发生如上的问题
如果在启动kafka集群的时候报了如下错误
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID 5wuVzKv8ST-yi888x5F5kg doesn't match stored clusterId Some(b6IB_--tRdml_53SAMp_sA) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:218)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:84)
at kafka.Kafka.main(Kafka.scala)
意思是集群id跟元数据meta.properties中存储的不一致,导致启动失败。
因此去查看meta.properties文件中的元数据信息。这个文件的存储路径是通过/config/server.properties配置文件中的log.dirs属性配置的。所以通过配置文件找到meta.properties,修改里面的cluster.id即可。
重新启动kafka集群
[root@zk2 bin]# sh kafka.sh start
----------kafka zk1 启动----------
----------kafka zk2 启动----------
----------kafka zk3 启动----------
查看每个节点的启动状态
[root@zk2 bin]# sh kafka.sh status
----------kafka zk1 Status----------
Kafka已启动
----------kafka zk2 Status----------
Kafka已启动
----------kafka zk3 Status----------
Kafka已启动
查看每个节点下的Topic
[root@zk2 kafka-logs]# sh kafka.sh topic
----------kafka zk1 Topic----------
__consumer_offsets
test
----------kafka zk2 Topic----------
__consumer_offsets
test
----------kafka zk3 Topic----------
__consumer_offsets
test
启动后的kafka集群
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。