当前位置:   article > 正文

zookeeper+kafka消息队列群集部署

zookeeper+kafka消息队列群集部署

单节点部署kafka

主机:

kafka1:192.168.10.101

设置hosts文件

192.168.10.101 kafka1

1:安装zookeeper

[root@kafka1 ~]# yum -y install java

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz

[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

[root@kafka1 ~]# cd /etc/zookeeper/conf

[root@kafka1 conf]# mv zoo_sample.cfg zoo.cfg

[root@kafka1 conf]# vim zoo.cfg

dataDir=/etc/zookeeper/zookeeper-data

[root@kafka1 conf]# cd /etc/zookeeper/

[root@kafka1 kafka]# mkdir zookeeper-data

[root@kafka1 zookeeper]# ./bin/zkServer.sh start

[root@kafka1 zookeeper]# ./bin/zkServer.sh status

2:安装kafka

[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz

[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka

[root@kafka1 ~]# cd /etc/kafka/

[root@kafka1 kafka]# vim config/server.properties

log.dirs=/etc/kafka/kafka-logs      #60行

[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs

[root@kafka1 kafka]# bin/kafka-server-start.sh  config/server.properties &

检查两个端口的开启状态

[root@kafka1 kafka]# netstat -anpt | grep 2181

[root@kafka1 kafka]# netstat -anpt | grep 9092

注意:启动时先启动zookeeper,关闭时先关闭kafka

如果要关闭zookeeper

[root@kafka1 zookeeper]# /etc/zookeeper/bin/zkServer.sh stop

如果要关闭kafka

[root@192 kafka]# /etc/kafka/bin/kafka-server-stop.sh

如果关不了,就kill杀死该进程

3:测试

创建topic

bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test

列出topic

bin/kafka-topics.sh --list --zookeeper kafka1:2181

查看topic

bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic test

生产消息

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息(打开另一个终端,一边生产消息,一边查看消费消息)

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

删除topic

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

五:群集部署kafka

1:修改主机hosts文件(所有主机都配置)

[root@kafka1 ~]# vim /etc/hosts

192.168.10.101 kafka1

192.168.10.102 kafka2

192.168.10.103 kafka3

2:zookeeper的部署

(1)安装zookeeper(三个节点的配置相同)

[root@kafka1 ~]# systemctl stop firewalld

[root@kafka1 ~]# setenforce 0

[root@kafka1 ~]# yum -y install java

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz

[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

(2)创建数据保存目录(三个节点的配置相同)

[root@kafka1 ~]# cd /etc/zookeeper/

[root@kafka1 zookeeper]# mkdir zookeeper-data

(3)修改配置文件(三个节点的配置相同)

[root@kafka1 zookeeper]# cd /etc/zookeeper/conf

[root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg

[root@kafka1 ~]# vim zoo.cfg

dataDir=/etc/zookeeper/zookeeper-data

clientPort=2181

server.1=192.168.10.101:2888:3888

server.2=192.168.10.102:2888:3888

server.3=192.168.10.103:2888:3888

注释:zookeeper只用的端口

2181:对cline端提供服务

3888:选举leader使用

2888:集群内机器通讯使用(Leader监听此端口)

(4)创建节点id文件(按server编号设置这个id,三个机器不同)

节点1:

[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid

节点2:

[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid

节点3:

[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid

(5)三个节点启动zookeeper进程

[root@kafka1 conf]# cd /etc/zookeeper/

[root@kafka1 zookeeper]# ./bin/zkServer.sh start

[root@kafka1 zookeeper]# ./bin/zkServer.sh status

2:kafka的部署

(1)kafka的安装(三个节点的配置相同)

[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz

[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka

(2)修改配置文件

[root@kafka1 ~]# cd /etc/kafka/

[root@kafka2 kafka]# vim config/server.properties

broker.id=1      

##21行  修改,注意其他两个的id分别是2和3

listeners=PLAINTEXT://192.168.10.101:9092     

#31行  修改,其他节点改成各自的IP地址

log.dirs=/etc/kafka/kafka-logs     

## 60行  修改

num.partitions=1     

##65行 分片数量,不能超过节点数

zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218

##123行,填写集群中各节点的地址和端口

注释:

9092是kafka的监听端口

(3)创建日志目录(三个节点的配置相同)

[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs

(3)在所有kafka节点上执行开启命令,生成kafka群集(三个节点的配置相同)

[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties &

如果启动不了,可以将/etc/kafka/kafka-logs中的数据清除再试试

3:测试

创建topic(任意一个节点)

bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test

列出topic(任意一个节点)

bin/kafka-topics.sh --list --zookeeper kafka1:2181

bin/kafka-topics.sh --list --zookeeper kafka2:2181

bin/kafka-topics.sh --list --zookeeper kafka3:2181

生产消息

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

删除topic

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

扩展:将logstash的信息输出到kafka中

[root@logstash1 logstash]# vim conf.d/beats.conf

input {

  beats {

    port => "5044"

    codec => "json"

 }

}

output {

   kafka {

    bootstrap_servers => "192.168.10.101:9092,192.168.10.102:9092,192.168.10.103:9092"

    topic_id => "httpd-filebeat"

    batch_size => "5"

    codec => "json"

   }

}

收集kafka的消息,并发送到ES

[root@logstash2 ~]#  vim /etc/logstash/conf.d/beats.conf

input{

      kafka {

        bootstrap_servers => "192.168.10.101:9092,192.168.10.102:9092,192.168.10.103:9092"

        topics => "httpd-filebeat"

        group_id => "systemlog-filebeat"

        decorate_events => true

        consumer_threads => 1

        codec => "json"

    }

}

output{

       elasticsearch {

          hosts => ["192.168.10.119:9200"]

          index => "weblog-beat-%{+YYYY.MM.dd}"

    }

}

生产消息(任意一个节点)

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息(任意一个节点)

bin/kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic test

删除topic(任意一个节点)

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

重启kafka

/etc/kafka/bin/kafka-server-stop.sh

/etc/zookeeper/bin/zkServer.sh restart

/etc/kafka/bin/kafka-server-start.sh config/server.properties &

问题解决

错误提示:

 Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.

解决方法:

[root@kafka1 kafka]# rm -rf /tmp/kafka-logs/*

[root@kafka1 kafka]# netstat -lnp|grep 9092

pkill杀死kafka的进程号

[root@kafka2 kafka]# ./bin/kafka-server-start.sh config/server.properties &

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/834821
推荐阅读
相关标签
  

闽ICP备14008679号