当前位置:   article > 正文

Linux安装kafka服务器(已解决启动失败问题)及消息分发测试_linux测试kafka

linux测试kafka

一、提前安装基本环境

由于Kafka是用Scala语言开发的,运行在JVM上,在安装kafka服务器之前需要先具备Java环境和zookeeper环境。我这里安装的是jdk11和zookeeper3.8.3。最近在自己学习kafka消息中间件,安装的时候遇到以下问题,经过查找一些有用资料解决了问题,现总结如下:

文章末尾附上源文件链接,自取


1、对应需要的文件可到Apache官网上根据需求选择下载。

(1)使用xshell和xftp连接远程服务器,将jdk文件、zookeeper、kafka文件上传服务器中,并且解压缩到/usr/local目录下:

2、安装jdk环境

  1. #解压文件到指定位置
  2. tar -zxvf jdk-11.0.15.1_linux-x64_bin.tar.gz -C /usr/local
  3. #配置jdk环境变量
  4. vi /etc/profile
  5. ####文件最后一行添加以下环境配置####
  6. #JAVA_HOME
  7. export JAVA_HOME=/usr/jdk-11.0.15.1
  8. CLASSPATH=.:$JAVA_HOME/lib.tools.jar
  9. PATH=$JAVA_HOME/bin:$PATH
  10. export JAVA_HOME CLASSPATH PATH
  11. #使文件生效
  12. source /etc/profile

(2)由于上传文件中没有JRE文件,手动添加:

 

验证安装:

(3)有的可能会因为更改了配置文件导致linux命令失效,若没有问题则不需要配置:

这是因为在改环境变量的时候没有配置正确的原因,需要在命令行写:

export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin

此时再试一下ls,可以正常执行,但是下次连接的时候可能失效,所以要将这句话写到/etc/profile文件中,然后让这个配置生效:source /etc/profile

3、安装zookeeper环境

(1)解压缩 文件到指定目录:

  1. tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz -C /usr/local
  2. #避免启动服务报错,将config下的zoo_sample.cfg配置文件复制一份并重命名为zoo.cfg
  3. cd /usr/local
  4. mv apache-zookeeper-3.8.3-bin zookeeper-3.8.3
  5. #进入到zookeeper-3.8.3目录中
  6. cp config/zoo_sample.cfg ./zoo.cfg
  7. #vi /etc/profile 配置zookeeper环境变量
  8. #zookeeper
  9. export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.8.3
  10. export PATH=$ZOOKEEPER_HOME/bin:$PATH
  11. export PATH
  12. #使配置文件生效
  13. source /etc/profile

(2)在Windows环境下启动zkServer.cmd时可能会出现闪退问题,可在zkServer.cmd启动文件中末尾添加“pause”字段。启动zookeeper服务:

  1. #启停zookeeper服务,Windows下执行后缀为cmd,linux上为sh
  2. zkServer.sh start
  3. zkServer.sh stop
  4. #进入客户端
  5. zkCli.sh
  6. #查看节点
  7. ls /
  8. #关闭不了zookeeper服务解决办法
  9. #查看占用2181端口的进程ID,发现就是jps中的QuorumPeerMain
  10. netstat -nltp|grep 2181
  11. #然后kill对应的进程号
  12. kill -9 21569

4、安装kafka服务器

  1. tar -zxvf kafka_2.11-2.4.1.tgz -C /usr/local
  2. #在kafka同级目录下创建日志存放路径
  3. mkdir -p /usr/local/kafka_data/log/kafka
  4. #然后就是配置kafka启动配置文件,主要修改以下对应位置:
  5. broker.id=0 #每个kafka服务器对应的唯一broker id
  6. listeners=PLAINTEXT://主机地址:9092 #服务器IP地址
  7. log.dirs=/usr/local/kafka_data/log/kafka #日志存放路径,上面创建的目录
  8. zookeeper. Connect=主机地址:2181 #zookeeper地址和端口

(1) 配置完后我去启动kafka服务发现启动失败或者几秒就挂掉了!然后我就去设置开放防火墙端口号:

  1. #查看服务器防火墙状态
  2. systemctl status firewalld
  3. systemctl start firewalld.service(开启防火墙)
  4. systemctl stop firewalld.service(关闭防火墙)
  5. #开放9092、2181端口号
  6. sudo firewall-cmd --zone=public --permanent --add-port=9092/tcp
  7. sudo firewall-cmd --zone=public --permanent --add-port=2181/tcp
  8. sudo firewall-cmd --reload #重新载入配置,配置后执行此命令
  9. #查看开放的端口号
  10. firewall-cmd --list-ports

(2)然后还要去kafka中修改配置文件server.properties:

将“listeners=PLAINTEXT://主机地址:9092”前面添加#注释掉,在下面开启远程连接地址“advertised.listeners=PLAINTEXT://主机地址:9092” 。

  1. #启动顺序是先启动zookeeper再启动kafka,关闭顺序是先关闭kafka再关闭zookeeper。
  2. #进入到kafka bin目录中,执行;-deamon是以守护进程方式启动。
  3. ./kafka-server-start.sh -daemon ../config/server. Properties
  4. #启动后发现报一个提示内存不足的失败错误:Java HotSpot(TM) 64-Bit Server VM warning: INFO,或者就是启动后又挂掉。
  5. #进入到zkServer.sh中编辑:
  6. export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" #修改为
  7. export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

(3)验证kafka启动成功

执行jps命令可以看到kafka进程,进入zookeeper客户端可以看到broker id=0

二、模拟创建生产者topic和消费者topic

1、创建主题topic

进入到kafka bin目录下,创建一个topic:test

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看当前kafka中有哪些topic:

./kafka-topics.sh --list --zookeeper localhost:2181

2、模拟发送消息和消费消息

再开启一台模拟生产者和消费者的服务器。 指定发送到kafka服务器地址和topic:

创建producer并发送消息:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

consumer消费消息,默认是消费最新消息,从指定kafka服务器地址和topic中消费消息:

  1. #从最后一条消息的偏移量+1开始消费,消费最新消息
  2. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
  3. #从头开始消费
  4. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

 3、单播消息和多播消息

(1)单播消息:一个consumerGroup中只有一个消费者能消费到某一个topic中的消息,如果多个消费者同在一个消费组中,最后只有一个消费者可以收到订阅的topic中的消息。

  1. #为这个消费者创建一个消费组test-group
  2. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group --topic test

(2)多播消息:需要让一条消息给多个消费者消费,实现多播让不同的消费者处于不同的消费组。在生产者处发送消息后,这两个组中的消费者都可以收到消息。

  1. #创建不同的消费组,其中的消费者进行消费
  2. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group1 --topic test
  3. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=test-group2 --topic test

4、查看消息组和信息

#查看当前topic下有哪些消费组:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

#查看消费组中的具体信息:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group1

#参数说明
CURRENNT-OFFSET:当前消费组已消费偏移量
LOG-END-OFFSET:主题对应分区消息的结束偏移量
LAG:当前消费组未消费消息数

三、总结

以上是我初始对kafka学习的见解,后面会更多的去学习了解使用kafka消息中间件,以上可能还有欠缺之处,还望指正!总之,学习永无止境,想要学好就要坚持,细心才是重中之重(抱拳)。

链接:https://pan.baidu.com/s/1WzjA1DBoN92SmWj6afs-Vw?pwd=hhjq 
提取码:hhjq

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号