
Building a Kafka Log Collection and Analysis Platform

Project name: Kafka log collection and analysis platform

Project environment: CentOS 7.9, nginx-1.20.2, filebeat-7.17.15, zookeeper-3.7.2, kafka_2.13-2.7.2, mysql-5.7.1

Project description: Build a reliable, high-performance log collection and analysis platform that uses Kafka as the message queue and data pipeline for real-time processing and analysis of large-scale data.

Project steps:

1. Prepare four virtual machines, build an nginx cluster, and initialize the environment
2. Prepare three virtual machines and build a Kafka cluster managed by ZooKeeper
3. Install filebeat on the nginx cluster and verify that logs reach the Kafka cluster
4. Write a Python consumer that cleans the data from Kafka and stores it in MySQL

Detailed steps:

1. Prepare four virtual machines, build the nginx cluster, and initialize the environment

1.1. Create four virtual machines, assign static IPs, and disable the firewall

nginx00  192.168.118.162 (load balancer)
nginx01  192.168.118.163
nginx02  192.168.118.164
nginx03  192.168.118.165

Set the hostnames to nginx01, nginx02, nginx03, and nginx00:

  [root@nginx01 ~]# hostnamectl set-hostname nginx01
  [root@nginx02 ~]# hostnamectl set-hostname nginx02
  [root@nginx03 ~]# hostnamectl set-hostname nginx03
  [root@nginx00 ~]# hostnamectl set-hostname nginx00

On all four nginx machines, change the dynamic IP to a static IP address:

  [root@nginx01 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
  BOOTPROTO="none"
  IPADDR=192.168.118.163
  NETMASK=255.255.255.0
  GATEWAY=192.168.118.2
  DNS1=192.168.118.2
  DNS2=114.114.114.114

The other three hosts use the same settings with only IPADDR changed: 192.168.118.164 (nginx02), 192.168.118.165 (nginx03), and 192.168.118.162 (nginx00).

Disable the firewall and SELinux on all four machines:

  [root@nginx01 ~]# iptables -F && systemctl stop firewalld && systemctl disable firewalld
  [root@nginx01 ~]# setenforce 0
  [root@nginx01 ~]# vim /etc/selinux/config
  SELINUX=disabled

Run the same commands on nginx02, nginx03, and nginx00; `systemctl disable firewalld` keeps the firewall off after a reboot.

1.2. Use nginx01/02/03 as web servers and nginx00 as the load balancer

Download the nginx tarball with wget on one machine, then scp it to the other three:

  [root@nginx01 ~]# wget https://nginx.org/download/nginx-1.20.2.tar.gz
  [root@nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.164:/root
  [root@nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.165:/root
  [root@nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.162:/root

Install the build dependencies, extract the tarball, then configure, compile, and install nginx:

  [root@nginx01 ~]# yum -y install pcre-devel openssl openssl-devel gd-devel gcc gcc-c++
  [root@nginx01 ~]# tar -xvf nginx-1.20.2.tar.gz
  [root@nginx01 ~]# cd nginx-1.20.2
  [root@nginx01 nginx-1.20.2]# ./configure
  [root@nginx01 nginx-1.20.2]# make
  [root@nginx01 nginx-1.20.2]# make install

Repeat the same steps on nginx02, nginx03, and nginx00.

Start nginx, make it start at boot, and add the nginx binary directory to PATH:

  [root@nginx01 ~]# /usr/local/nginx/sbin/nginx
  [root@nginx01 ~]# vim /etc/rc.local
  /usr/local/nginx/sbin/nginx
  [root@nginx01 ~]# chmod +x /etc/rc.d/rc.local
  [root@nginx01 ~]# echo "PATH=$PATH:/usr/local/nginx/sbin" >> ~/.bashrc
  [root@nginx01 ~]# source ~/.bashrc

Edit the nginx configuration: the three web servers serve content and log in a custom format, while nginx00 load-balances across them:

  [root@nginx01 conf]# vim /usr/local/nginx/conf/nginx.conf
  # in the http block: define the log format; it logs $http_x_real_ip,
  # which the load balancer sets below
  log_format main '$remote_addr -$http_x_real_ip - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" '
                  '"$http_user_agent" "$http_x_forwarded_for"';
  server {
      listen 80 default_server;
      server_name www.plf.com;
      root /usr/share/nginx/html;
      access_log /usr/local/nginx/logs/access.log main;
      location / {
      }
  }

Apply the same configuration on nginx02 and nginx03. On the load balancer nginx00:

  [root@nginx00 conf]# vim /usr/local/nginx/conf/nginx.conf
  upstream app {
      server 192.168.118.163 weight=3;
      server 192.168.118.164 weight=1;
      server 192.168.118.165 weight=1;
  }
  server {
      listen 80;
      server_name www.feng.com;
      location / {
          proxy_pass http://app;
          # X-Real-IP preserves the client's real IP for the backends
          proxy_set_header X-Real-IP $remote_addr;
      }
  }
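The weight=3/1/1 upstream sends roughly three of every five requests to 192.168.118.163. A minimal Python sketch of the smooth weighted round-robin scheme nginx uses for weighted upstreams (server addresses taken from the config above):

```python
# Smooth weighted round-robin: each pick, every server gains its
# weight; the current leader is chosen and pays back the total weight.
def smooth_wrr(servers, n):
    current = {s: 0 for s in servers}
    total = sum(servers.values())
    picks = []
    for _ in range(n):
        for s, w in servers.items():
            current[s] += w
        best = max(current, key=current.get)
        current[best] -= total
        picks.append(best)
    return picks

# Upstream weights from the nginx00 config
picks = smooth_wrr({"192.168.118.163": 3,
                    "192.168.118.164": 1,
                    "192.168.118.165": 1}, 5)
print(picks)
```

Over any window of five requests, .163 receives three and the other two one each, spread out rather than in a burst of three.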

2. Prepare three virtual machines, build the Kafka cluster, and manage it with ZooKeeper

kafka01  192.168.118.166
kafka02  192.168.118.167
kafka03  192.168.118.168

Set the hostname on each of the three machines and map the hostnames to their IPs:

  [root@nginx-kafka01 ~]# hostnamectl set-hostname nginx-kafka01
  [root@nginx-kafka01 ~]# vim /etc/hosts
  192.168.118.166 nginx-kafka01
  192.168.118.167 nginx-kafka02
  192.168.118.168 nginx-kafka03

Repeat on the other two nodes (`hostnamectl set-hostname nginx-kafka02` and `nginx-kafka03`) with the same three /etc/hosts entries.

Install the Java environment, Kafka, and ZooKeeper:

  [root@nginx-kafka01 ~]# yum install java wget -y
  [root@nginx-kafka01 ~]# wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.13-2.7.2.tgz
  [root@nginx-kafka01 ~]# tar xf kafka_2.13-2.7.2.tgz
  [root@nginx-kafka01 ~]# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz
  [root@nginx-kafka01 ~]# tar xf apache-zookeeper-3.7.2-bin.tar.gz

Configure Kafka (each broker gets a unique broker.id and advertises its own hostname):

  [root@nginx-kafka01 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
  broker.id=1
  listeners=PLAINTEXT://nginx-kafka01:9092
  zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
  [root@nginx-kafka02 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
  broker.id=2
  listeners=PLAINTEXT://nginx-kafka02:9092
  zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
  [root@nginx-kafka03 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
  broker.id=3
  listeners=PLAINTEXT://nginx-kafka03:9092
  zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
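With three brokers, a topic's partitions and their replicas are spread across broker.id 1–3, and a keyed message's partition is just a hash modulo the partition count. A simplified sketch of that mapping (Kafka's real default partitioner uses murmur2, and un-keyed messages are distributed round-robin; crc32 here is a stand-in with the same key property):

```python
import zlib

# Equal keys always map to the same partition, so per-key ordering
# is preserved within that partition.
def pick_partition(key: bytes, num_partitions: int) -> int:
    return zlib.crc32(key) % num_partitions

p = pick_partition(b"client-42", 3)
print(p)
```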

Configure ZooKeeper (the same zoo.cfg on all three nodes):

  [root@nginx-kafka01 conf]# cd /root/apache-zookeeper-3.7.2-bin/conf
  [root@nginx-kafka01 conf]# cp zoo_sample.cfg zoo.cfg
  [root@nginx-kafka01 conf]# vim zoo.cfg
  server.1=192.168.118.166:3888:4888
  server.2=192.168.118.167:3888:4888
  server.3=192.168.118.168:3888:4888
  # 3888 and 4888 are both ports: one for data transfer between nodes,
  # the other for liveness checks and leader election

Create the /tmp/zookeeper data directory and write each node's assigned ZooKeeper id into a myid file inside it:

  [root@nginx-kafka01 ~]# mkdir /tmp/zookeeper
  [root@nginx-kafka01 ~]# echo 1 > /tmp/zookeeper/myid
  [root@nginx-kafka02 ~]# mkdir /tmp/zookeeper
  [root@nginx-kafka02 ~]# echo 2 > /tmp/zookeeper/myid
  [root@nginx-kafka03 ~]# mkdir /tmp/zookeeper
  [root@nginx-kafka03 ~]# echo 3 > /tmp/zookeeper/myid

Start ZooKeeper on every node and check its status; ZooKeeper must be running before Kafka starts:

  [root@nginx-kafka01 bin]# ./zkServer.sh start
  [root@nginx-kafka01 bin]# ./zkServer.sh status
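`zkServer.sh status` reports one node as leader and the others as followers. The ensemble stays available only while a strict majority of nodes is up, which is the arithmetic behind choosing three nodes:

```python
# ZooKeeper needs a strict majority (quorum) of voting members.
def quorum(n: int) -> int:
    return n // 2 + 1

# Failures the ensemble survives while keeping a quorum.
def tolerated_failures(n: int) -> int:
    return n - quorum(n)

print(quorum(3), tolerated_failures(3))
```

A 4-node ensemble tolerates no more failures than a 3-node one, which is why ensembles are sized odd.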

Start Kafka on every node:

  [root@nginx-kafka01 ~]# cd kafka_2.13-2.7.2
  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-server-start.sh -daemon config/server.properties

Test the cluster: create a topic, list topics, then run a console producer and a console consumer:

  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --create --zookeeper 192.168.118.166:2181 --replication-factor 3 --partitions 3 --topic sc
  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --list --zookeeper 192.168.118.166:2181
  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-producer.sh --broker-list 192.168.118.166:9092 --topic sc
  >xixi
  >haha
  >didi
  >hello
  >world!!!!!
  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.118.167:9092 --topic sc --from-beginning
  haha
  hello
  didi
  xixi
  world!!!!!
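Note that the consumer prints the messages in a different order than they were produced: topic sc has 3 partitions, and Kafka guarantees ordering only within a single partition. A small sketch of why a multi-partition topic interleaves (the round-robin assignment here is illustrative, not Kafka's exact behavior):

```python
from collections import defaultdict

# The five test messages, in production order
msgs = ["xixi", "haha", "didi", "hello", "world!!!!!"]

# Spread them over the topic's 3 partitions
partitions = defaultdict(list)
for i, m in enumerate(msgs):
    partitions[i % 3].append(m)

# A consumer may drain partitions in any order: per-partition order
# is preserved, global order is not.
consumed = partitions[1] + partitions[2] + partitions[0]
print(consumed)
```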

3. Install filebeat on the nginx cluster and verify that logs reach the Kafka cluster

Install filebeat, add its yum repository, and enable it at boot:

  [root@nginx01 /]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  [root@nginx01 /]# vim /etc/yum.repos.d/fb.repo
  [elastic-7.x]
  name=Elastic repository for 7.x packages
  baseurl=https://artifacts.elastic.co/packages/7.x/yum
  gpgcheck=1
  gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
  enabled=1
  autorefresh=1
  type=rpm-md
  [root@nginx01 /]# yum install filebeat -y
  [root@nginx01 /]# systemctl enable filebeat

Configure filebeat to read the nginx access log and output to Kafka:

  [root@nginx01 /]# cd /etc/filebeat
  [root@nginx01 filebeat]# cp filebeat.yml filebeat.yml.bak
  [root@nginx01 filebeat]# >filebeat.yml
  [root@nginx01 filebeat]# vim filebeat.yml
  filebeat.inputs:
  - type: log
    # Change to true to enable this input configuration.
    enabled: true
    # Paths that should be crawled and fetched. Glob based paths.
    paths:
      - /usr/local/nginx/logs/access.log
  #-------------------------------- kafka --------------------------------
  output.kafka:
    hosts: ["192.168.118.166:9092","192.168.118.167:9092","192.168.118.168:9092"]
    topic: nginxlog
    keep_alive: 10s
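Filebeat does not ship raw lines: each event is a JSON document, and the original log line sits in its "message" field, which is why the consumer in step 4 calls json.loads first. A sketch with a hypothetical, heavily trimmed event:

```python
import json

# Hypothetical Filebeat event (most metadata fields omitted); the raw
# nginx access-log line is carried under "message".
event = json.dumps({
    "@timestamp": "2024-01-01T02:51:35.000Z",
    "message": '192.168.118.1 - - - [01/Jan/2024:10:51:35 +0800] '
               '"GET / HTTP/1.1" 200 555 "-" "curl/7.29.0"',
    "log": {"file": {"path": "/usr/local/nginx/logs/access.log"}},
})

# What the consumer does with each Kafka message:
line = json.loads(event)["message"]
ip = line.split()[0]
print(ip)
```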

Create the nginxlog topic, start filebeat, and consume the topic to verify:

  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --create --zookeeper 192.168.118.167:2181 --replication-factor 3 --partitions 1 --topic nginxlog
  [root@nginx01 ~]# systemctl start filebeat
  [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.118.168:9092 --topic nginxlog --from-beginning

4. Write a Python consumer that cleans the data from Kafka and stores it in MySQL

Requirements analysis:

- Extract the IP, time, and bandwidth fields from the nginx log
- Resolve each IP to its province and ISP
- Database fields: id, time, province, ISP, bandwidth

Prepare a MySQL server and create the database, tables, and columns:

  create database sanchuang;
  use sanchuang;
  CREATE TABLE `isp_index` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `isp_name` varchar(256) DEFAULT NULL,
      PRIMARY KEY (`id`)
  );
  CREATE TABLE `prov_index` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `prov_name` varchar(256) DEFAULT NULL,
      PRIMARY KEY (`id`)
  );
  CREATE TABLE `nginxlog` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `dt` datetime NOT NULL,
      `prov` varchar(256) DEFAULT NULL,
      `isp` varchar(256) DEFAULT NULL,
      `bd` float DEFAULT NULL,
      PRIMARY KEY (`id`)
  );
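prov_index and isp_index are dimension tables: each distinct province or ISP name gets an auto-increment id, so repeated names can be normalized into a compact integer key. The get-or-create lookup the helper script implements, sketched with an in-memory dict standing in for the table:

```python
def get_index(name, table):
    # `table` is a dict name -> id, standing in for a dimension table.
    # Return the existing id, or insert the name with the next id
    # (an AUTO_INCREMENT stand-in).
    if name not in table:
        table[name] = len(table) + 1
    return table[name]

prov_table = {}
first = get_index("Beijing", prov_table)   # hypothetical province names
second = get_index("Fujian", prov_table)
again = get_index("Beijing", prov_table)
print(first, second, again)
```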

Write the Python scripts that consume the logs from Kafka.

The consumer script:

  [root@nginx-kafka01 opt]# cat python_consumer.py
  from pykafka import KafkaClient
  import json
  import requests
  import time
  import pymysql

  import mysql_connect

  # Taobao IP lookup API: resolves an IP to its province and ISP
  taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
  prov_dict = mysql_connect.prov_dict
  isp_dict = mysql_connect.isp_dict

  def import_data(dt, prov, isp, bd):
      conn = pymysql.connect(host='192.168.118.157', port=3306, user='root',
                             passwd='123456', db='sanchuang')
      cursor = conn.cursor()
      cursor.execute('insert into nginxlog(dt,prov,isp,bd) values("%s","%s","%s","%s")'
                     % (dt, prov, isp, bd))
      conn.commit()
      cursor.close()   # close the cursor before the connection
      conn.close()

  def resolv_ip(ip):
      response = requests.get(taobao_url + ip)
      if response.status_code == 200:
          tmp_dict = json.loads(response.text)
          prov = tmp_dict['data']['region']
          isp = tmp_dict['data']['isp']
          return prov, isp
      return None, None

  def trans_time(dt):
      timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
      new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
      return new_time

  client = KafkaClient(hosts="192.168.118.166:9092,192.168.118.167:9092,192.168.118.168:9092")
  topic = client.topics['nginxlog']
  balanced_consumer = topic.get_balanced_consumer(
      consumer_group='testgroup',
      auto_commit_enable=True,
      zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
  )
  for message in balanced_consumer:
      if message is not None:
          try:
              line = json.loads(message.value.decode("utf-8"))
          except json.JSONDecodeError:
              continue   # skip events that are not valid JSON
          log = line["message"]
          tmp_lst = log.split()
          ip = tmp_lst[0]
          dt = tmp_lst[4].replace("[", "")
          bt = tmp_lst[10]
          dt = trans_time(dt)
          prov, isp = resolv_ip(ip)
          if prov and isp:
              prov_index, prov_dict = mysql_connect.get_index(prov, prov_dict, "prov_index", "prov_name")
              print(prov_index, prov_dict)
              import_data(dt, prov, isp, bt)
The MySQL helper script:

  [root@nginx-kafka01 opt]# cat mysql_connect.py
  import pymysql

  db = pymysql.connect(host='192.168.118.157',
                       user='root',
                       password='123456',
                       database='sanchuang')
  cursor = db.cursor()

  def get_prov_isp(tablename):
      # load the whole dimension table and invert it: name -> id
      cursor.execute(f"select * from {tablename}")
      data = cursor.fetchall()
      tmp_dict = dict(data)
      result_dict = {v: k for k, v in tmp_dict.items()}
      return result_dict

  prov_dict = get_prov_isp("prov_index")
  isp_dict = get_prov_isp("isp_index")

  def get_index(message, index_dict, tablename, field_name):
      # return the id for `message`, inserting a new row if unseen
      if message in index_dict:
          return index_dict[message], index_dict
      else:
          cursor.execute(f'insert into {tablename}({field_name}) values("{message}")')
          db.commit()
          result_dict = get_prov_isp(tablename)
          return result_dict[message], result_dict

Run the script and check the database; the data is stored successfully:

  [root@nginx-kafka01 opt]# python3 python_consumer.py
  root@sanchuang 10:20 mysql> select * from nginxlog;
  +----+---------------------+------+----------+------+
  | id | dt                  | prov | isp      | bd   |
  +----+---------------------+------+----------+------+
  |  1 | 2024-01-01 10:51:35 | XX   | 内网IP   |    0 |
  |  2 | 2024-01-01 10:51:35 | XX   | 内网IP   |  555 |
  |  3 | 2024-01-01 10:51:45 | XX   | 内网IP   |    0 |
  |  4 | 2024-01-01 10:51:45 | XX   | 内网IP   |    0 |
  |  5 | 2024-01-01 10:51:45 | XX   | 内网IP   |    0 |
  +----+---------------------+------+----------+------+
