赞
踩
由于Kafka是用Scala语言开发的,运行在JVM上,在安装kafka服务器之前需要先具备Java环境和zookeeper环境。我这里安装的是jdk11和zookeeper3.8.3。最近在自己学习kafka消息中间件,安装的时候遇到以下问题,经过查找一些有用资料解决了问题,现总结如下:
文章末尾附上源文件链接,自取
(1)使用xshell和xftp连接远程服务器,将jdk文件、zookeeper、kafka文件上传服务器中,并且解压缩到/usr/local目录下:
- #解压文件到指定位置
- tar -zxvf jdk-11.0.15.1_linux-x64_bin.tar.gz -C /usr/local
- #配置jdk环境变量
- vi /etc/profile
-
- ####文件最后一行添加以下环境配置####
-
- #JAVA_HOME
- export JAVA_HOME=/usr/jdk-11.0.15.1
- CLASSPATH=.:$JAVA_HOME/lib.tools.jar
- PATH=$JAVA_HOME/bin:$PATH
- export JAVA_HOME CLASSPATH PATH
-
- #使文件生效
- source /etc/profile
-
-
(2)由于上传文件中没有JRE文件,手动添加:
验证安装:
(3)有的可能会因为更改了配置文件导致linux命令失效,若没有问题则不需要配置:
这是因为在改环境变量的时候没有配置正确的原因,需要在命令行写:
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
此时再试一下ls,可以正常执行,但是下次连接的时候可能失效,所以要将这句话写到/etc/profile文件中,然后让这个配置生效:source /etc/profile
(1)解压缩 文件到指定目录:
- tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz -C /usr/local
-
- #避免启动服务报错,将config下的zoo_sample.cfg配置文件复制一份并重命名为zoo.cfg
-
- cd /usr/local
- mv apache-zookeeper-3.8.3-bin zookeeper-3.8.3
- #进入到zookeeper-3.8.3目录中
- cp config/zoo_sample.cfg ./zoo.cfg
-
- #vi /etc/profile 配置zookeeper环境变量
- #zookeeper
- export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.8.3
- export PATH=$ZOOKEEPER_HOME/bin:$PATH
- export PATH
-
- #使配置文件生效
- source /etc/profile
-
(2)在Windows环境下启动zkServer.cmd时可能会出现闪退问题,可在zkServer.cmd启动文件中末尾添加“pause”字段。启动zookeeper服务:
- #启停zookeeper服务,Windows下执行后缀为cmd,linux上为sh
- zkServer.sh start
- zkServer.sh stop
-
- #进入客户端
- zkCli.sh
-
- #查看节点
- ls /
-
- #关闭不了zookeeper服务解决办法
- #查看占用2181端口的进程ID,发现就是jps中的QuorumPeerMain
- netstat -nltp|grep 2181
- #然后kill对应的进程号
- kill -9 21569
- tar -zxvf kafka_2.11-2.4.1.tgz -C /usr/local
- #在kafka同级目录下创建日志存放路径
- mkdir -p /usr/local/kafka_data/log/kafka
-
- #然后就是配置kafka启动配置文件,主要修改以下对应位置:
- broker.id=0 #每个kafka服务器对应的唯一broker id
- listeners=PLAINTEXT://主机地址:9092 #服务器IP地址
- log.dirs=/usr/local/kafka_data/log/kafka #日志存放路径,上面创建的目录
- zookeeper. Connect=主机地址:2181 #zookeeper地址和端口
-
(1) 配置完后我去启动kafka服务发现启动失败或者几秒就挂掉了!然后我就去设置开放防火墙端口号:
- #查看服务器防火墙状态
- systemctl status firewalld
- systemctl start firewalld.service(开启防火墙)
- systemctl stop firewalld.service(关闭防火墙)
-
- #开放9092、2181端口号
- sudo firewall-cmd --zone=public --permanent --add-port=9092/tcp
- sudo firewall-cmd --zone=public --permanent --add-port=2181/tcp
- sudo firewall-cmd --reload #重新载入配置,配置后执行此命令
-
- #查看开放的端口号
- firewall-cmd --list-ports
(2)然后还要去kafka中修改配置文件server.properties:
将“listeners=PLAINTEXT://主机地址:9092”前面添加#注释掉,在下面开启远程连接地址“advertised.listeners=PLAINTEXT://主机地址:9092” 。
- #启动顺序是先启动zookeeper再启动kafka,关闭顺序是先关闭kafka再关闭zookeeper。
- #进入到kafka bin目录中,执行;-deamon是以守护进程方式启动。
- ./kafka-server-start.sh -daemon ../config/server. Properties
-
-
- #启动后发现报一个提示内存不足的失败错误:Java HotSpot(TM) 64-Bit Server VM warning: INFO,或者就是启动后又挂掉。
- #进入到zkServer.sh中编辑:
- export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" #修改为
- export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
(3)验证kafka启动成功
执行jps命令可以看到kafka进程,进入zookeeper客户端可以看到broker id=0
进入到kafka bin目录下,创建一个topic:test
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看当前kafka中有哪些topic:
./kafka-topics.sh --list --zookeeper localhost:2181
再开启一台模拟生产者和消费者的服务器。 指定发送到kafka服务器地址和topic:
创建producer并发送消息:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
consumer消费消息,默认是消费最新消息,从指定kafka服务器地址和topic中消费消息:
- #从最后一条消息的偏移量+1开始消费,消费最新消息
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
- #从头开始消费
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
(1)单播消息:一个consumerGroup中只有一个消费者能消费到某一个topic中的消息,如果多个消费者同在一个消费组中,最后只有一个消费者可以收到订阅的topic中的消息。
- #为这个消费者创建一个消费组test-group
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group --topic test
(2)多播消息:需要让一条消息给多个消费者消费,实现多播让不同的消费者处于不同的消费组。在生产者处发送消息后,这两个组中的消费者都可以收到消息。
- #创建不同的消费组,其中的消费者进行消费
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group1 --topic test
-
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group2 --topic test
#查看当前topic下有哪些消费组:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
#查看消费组中的具体信息:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group1
#参数说明
CURRENNT-OFFSET:当前消费组已消费偏移量
LOG-END-OFFSET:主题对应分区消息的结束偏移量
LAG:当前消费组未消费消息数
以上是我初始对kafka学习的见解,后面会更多的去学习了解使用kafka消息中间件,以上可能还有欠缺之处,还望指正!总之,学习永无止境,想要学好就要坚持,细心才是重中之重(抱拳)。
链接:https://pan.baidu.com/s/1WzjA1DBoN92SmWj6afs-Vw?pwd=hhjq
提取码:hhjq
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。