当前位置:   article > 正文

kafka部署

kafka部署

目录

一、安装zookeeper集群

1.1安装zookeeper集群

 1.2下载安装包并安装kafka

1.3修改配置文件

1.4修改环境变量

1.5 配置 kafka 启动脚本并设置开机自启,启动kafka

1.6加权和将服务加到系统中

1.7分别启动kafka

1.8 Kafka 命令行操作

1.创建topic

2.查看当前服务器中的所有 topic

3.查看某个 topic 的详情

4.发布消息

5.消费消息

6.修改分区数

7.删除topic


一、安装zookeeper集群

1.1安装zookeeper集群

可见上篇文章,接着做就行(部署所有集群服务器)

 1.2下载安装包并安装kafka

  1. cd /opt/
  2. tar zxvf kafka_2.13-2.7.1.tgz

mv kafka_2.13-2.7.1 /usr/local/kafka

1.3修改配置文件

  1. cd /usr/local/kafka/config/
  2. cp server.properties{,.bak}
  3. vim server.properties
  4. broker.id=0 #21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
  5. listeners=PLAINTEXT://192.168.2.33:9092 #31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
  6. num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
  7. num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
  8. socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
  9. socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
  10. socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
  11. log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
  12. num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
  13. num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量
  14. log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
  15. log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
  16. zookeeper.connect=192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181 #123行,配置连接Zookeeper集群地址

1.4修改环境变量

  1. vim /etc/profile
  2. export KAFKA_HOME=/usr/local/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin
  4. source /etc/profile 

1.5 配置 kafka 启动脚本并设置开机自启,启动kafka

  1. #!/bin/bash
  2. #chkconfig:2345 22 88
  3. #description:Kafka Service Control Script
  4. KAFKA_HOME='/usr/local/kafka'
  5. case $1 in
  6. start)
  7. echo "---------- Kafka 启动 ------------"
  8. ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
  9. ;;
  10. stop)
  11. echo "---------- Kafka 停止 ------------"
  12. ${KAFKA_HOME}/bin/kafka-server-stop.sh
  13. ;;
  14. restart)
  15. $0 stop
  16. $0 start
  17. ;;
  18. status)
  19. echo "---------- Kafka 状态 ------------"
  20. count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
  21. if [ "$count" -eq 0 ];then
  22. echo "kafka is not running"
  23. else
  24. echo "kafka is running"
  25. fi
  26. ;;
  27. *)
  28. echo "Usage: $0 {start|stop|restart|status}"
  29. esac

1.6加权和将服务加到系统中

  1. chmod +x /etc/init.d/kafka
  2. chkconfig --add kafka 

1.7分别启动kafka

service kafka start  

1.8 Kafka 命令行操作

1.创建topic

  1. kafka-topics.sh --create --zookeeper 192.168.91.103:2181,192.168.91.104:2181,192.168.91.105:2181 --replication-factor 2 --partitions 3 --topic test
  2. -------------------------------------------------------------------------------------
  3. --zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
  4. --replication-factor:定义分区副本数,1 代表单副本,建议为 2
  5. --partitions:定义分区数
  6. --topic:定义 topic 名称
  7. ------------------------------------ 

2.查看当前服务器中的所有 topic

kafka-topics.sh --list --zookeeper 192.168.91.103:2181,192.168.91.104:2181,192.168.91.105:2181

3.查看某个 topic 的详情

kafka-topics.sh --describe --zookeeper 192.168.91.103:2181,192.168.91.104:2181,192.168.91.105:2181  

4.发布消息

kafka-console-producer.sh --broker-list 192.168.91.103:9092,192.168.91.104:9092,192.168.91.105:9092 --topic test

5.消费消息

  1. kafka-console-consumer.sh --bootstrap-server 192.168.91.103:9092,192.168.91104:9092,192.168.91.105:9092 --topic test --from-beginning
  2. -------------------------------------------------------------------------------------
  3. --from-beginning:会把主题中以往所有的数据都读取出来
  4. -------------------------------------------------------------------------------------  

6.修改分区数

kafka-topics.sh --zookeeper 192.168.91.103:2181,192.168.91.104:2181,192.168.91.105:2181 --alter --topic test  --partitions 6  

7.删除topic

kafka-topics.sh --delete --zookeeper 192.168.91.103:2181,192.168.91.104:2181,192.168.91.105:2181 --topic  test

二、部署Filebeat+Kafka+ELK

1.1部署Zookeeper+Kafka集群

接着上面实验做

1.2部署Filebeat

要搭建ELK,可见之前的文章,接着写

  1. cd /usr/local/filebeat
  2. vim filebeat.yml
  3. filebeat.prospectors:
  4. - type: log
  5. enabled: true
  6. paths:
  7. - /var/log/messages
  8. - /var/log/*.log
  9. ......
  10. #添加输出到 Kafka 的配置
  11. output.kafka:
  12. enabled: true
  13. hosts: ["192.168.91.103:9092","192.168.91.104:9092","192.168.91.105:9092"] #指定 Kafka 集群配置
  14. topic: "filebeat_test" #指定 Kafka 的 topic
  15. #启动 filebeat
  16. ./filebeat -e -c filebeat.yml  

1.3部署ELK,在logstash组件所在节点上新建一个Logstash配置文件

  1. cd /etc/logstash/conf.d/
  2. vim filebeat.conf
  3. input {
  4. kafka {
  5. bootstrap_servers => "192.168.91.103:9092,192.168.91.104:9092,192.168.91.105:9092"
  6. topics => "filebeat_test"
  7. group_id => "test123"
  8. auto_offset_reset => "earliest"
  9. }
  10. }
  11. output {
  12. elasticsearch {
  13. hosts => ["192.168.91.102:9200"]
  14. index => "filebeat_test-%{+YYYY.MM.dd}"
  15. }
  16. stdout {
  17. codec => rubydebug
  18. }
  19. }  

1.4启动logstash

logstash -f filebeat.conf  

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/415636
推荐阅读
相关标签
  

闽ICP备14008679号