Project name: Kafka log collection and analysis platform
Project environment: CentOS 7.9, nginx-1.20.2, filebeat-7.17.15, zookeeper-3.7.2, kafka_2.13-2.7.2, mysql-5.7.1
Project description: build a reliable, high-performance log collection and analysis platform that uses Kafka as the message queue and data pipeline for real-time processing and analysis of large volumes of log data.
Project steps:
1. Prepare four virtual machines, build the nginx cluster, and initialize the environment
2. Prepare three virtual machines and build a Kafka cluster managed by ZooKeeper
3. Install filebeat on the nginx cluster and verify that logs reach the Kafka cluster
4. Write a Python consumer that cleans the data from Kafka and stores it in MySQL
Detailed steps:
1. Prepare four virtual machines, build the nginx cluster, and initialize the environment
1.1 Create four virtual machines, assign static IPs, and disable the firewall
nginx1 | 192.168.118.163 |
nginx2 | 192.168.118.164 |
nginx3 | 192.168.118.165 |
nginx0 | 192.168.118.162 |
Set the hostnames to nginx01, nginx02, nginx03, and nginx00:
- [root@nginx-nginx01 ~]# hostnamectl set-hostname nginx01
- [root@nginx-nginx02 ~]# hostnamectl set-hostname nginx02
- [root@nginx-nginx03 ~]# hostnamectl set-hostname nginx03
- [root@nginx-nginx00 ~]# hostnamectl set-hostname nginx00
On each of the four nginx machines, replace the DHCP address with a static IP:
- [root@nginx-nginx01 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
- BOOTPROTO="none"
- IPADDR=192.168.118.163
- NETMASK=255.255.255.0
- GATEWAY=192.168.118.2
- DNS1=192.168.118.2
- DNS2=114.114.114.114
-
- [root@nginx-nginx02 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
- BOOTPROTO="none"
- IPADDR=192.168.118.164
- NETMASK=255.255.255.0
- GATEWAY=192.168.118.2
- DNS1=192.168.118.2
- DNS2=114.114.114.114
-
- [root@nginx-nginx03 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
- BOOTPROTO="none"
- IPADDR=192.168.118.165
- NETMASK=255.255.255.0
- GATEWAY=192.168.118.2
- DNS1=192.168.118.2
- DNS2=114.114.114.114
-
- [root@nginx-nginx00 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
- BOOTPROTO="none"
- IPADDR=192.168.118.162
- NETMASK=255.255.255.0
- GATEWAY=192.168.118.2
- DNS1=192.168.118.2
- DNS2=114.114.114.114

Flush iptables and disable the firewall and SELinux on all four machines:
- [root@nginx-nginx01 ~]# iptables -F && systemctl stop firewalld && systemctl disable firewalld
- [root@nginx-nginx01 ~]# setenforce 0
- [root@nginx-nginx01 ~]# vim /etc/selinux/config
- SELINUX=disabled
-
- [root@nginx-nginx02 ~]# iptables -F && systemctl stop firewalld && systemctl disable firewalld
- [root@nginx-nginx02 ~]# setenforce 0
- [root@nginx-nginx02 ~]# vim /etc/selinux/config
- SELINUX=disabled
-
- [root@nginx-nginx03 ~]# iptables -F && systemctl stop firewalld && systemctl disable firewalld
- [root@nginx-nginx03 ~]# setenforce 0
- [root@nginx-nginx03 ~]# vim /etc/selinux/config
- SELINUX=disabled
-
- [root@nginx-nginx00 ~]# iptables -F && systemctl stop firewalld && systemctl disable firewalld
- [root@nginx-nginx00 ~]# setenforce 0
- [root@nginx-nginx00 ~]# vim /etc/selinux/config
- SELINUX=disabled

1.2 Use nginx01/02/03 as web servers and nginx00 as the load balancer
Download the nginx tarball with wget on one machine, then scp it to the other three:
- [root@nginx-nginx01 ~]# wget https://nginx.org/download/nginx-1.20.2.tar.gz
- [root@nginx-nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.164:/root
- [root@nginx-nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.165:/root
- [root@nginx-nginx01 ~]# scp ~/nginx-1.20.2.tar.gz 192.168.118.162:/root
-
Install the build dependencies, then unpack the tarball and configure, build, and install nginx on each machine:
- yum -y install pcre-devel openssl openssl-devel gd-devel gcc gcc-c++
-
- [root@nginx-nginx01 ~]# tar -xvf nginx-1.20.2.tar.gz
- [root@nginx-nginx01 ~]# cd nginx-1.20.2
- [root@nginx-nginx01 nginx-1.20.2]# ./configure
- [root@nginx-nginx01 nginx-1.20.2]# make
- [root@nginx-nginx01 nginx-1.20.2]# sudo make install
-
- [root@nginx-nginx02 ~]# tar -xvf nginx-1.20.2.tar.gz
- [root@nginx-nginx02 ~]# cd nginx-1.20.2
- [root@nginx-nginx02 nginx-1.20.2]# ./configure
- [root@nginx-nginx02 nginx-1.20.2]# make
- [root@nginx-nginx02 nginx-1.20.2]# sudo make install
-
- [root@nginx-nginx03 ~]# tar -xvf nginx-1.20.2.tar.gz
- [root@nginx-nginx03 ~]# cd nginx-1.20.2
- [root@nginx-nginx03 nginx-1.20.2]# ./configure
- [root@nginx-nginx03 nginx-1.20.2]# make
- [root@nginx-nginx03 nginx-1.20.2]# sudo make install
-
- [root@nginx-nginx00 ~]# tar -xvf nginx-1.20.2.tar.gz
- [root@nginx-nginx00 ~]# cd nginx-1.20.2
- [root@nginx-nginx00 nginx-1.20.2]# ./configure
- [root@nginx-nginx00 nginx-1.20.2]# make
- [root@nginx-nginx00 nginx-1.20.2]# sudo make install

Start nginx, make it start at boot, and add the nginx binary directory to PATH:
- [root@nginx-nginx01 ~]# /usr/local/nginx/sbin/nginx
- [root@nginx-nginx01 ~]# vim /etc/rc.local
- /usr/local/nginx/sbin/nginx
-
- [root@nginx-nginx01 ~]# chmod +x /etc/rc.d/rc.local
- [root@nginx-nginx01 ~]# echo "PATH=$PATH:/usr/local/nginx/sbin" >> ~/.bashrc
- [root@nginx-nginx01 ~]# source ~/.bashrc
-
Edit the nginx configs so nginx01/02/03 serve web pages and nginx00 load-balances across them. On the web servers, first create the access-log directory (mkdir -p /var/log/nginx/sc) and make sure a log_format named main is defined, or the configs below will fail to load:
- [root@nginx-nginx01 conf]# vim /usr/local/nginx/conf/nginx.conf
- server {
- listen 80 default_server;
- server_name www.plf.com;
-
- root /usr/share/nginx/html;
-
- access_log /var/log/nginx/sc/access.log main;
-
- location / {
-
- }
- }
-
- [root@nginx-nginx02 conf]# vim /usr/local/nginx/conf/nginx.conf
- server {
- listen 80 default_server;
- server_name www.plf.com;
-
- root /usr/share/nginx/html;
-
- access_log /var/log/nginx/sc/access.log main;
-
- location / {
-
- }
- }
-
- [root@nginx-nginx03 conf]# vim /usr/local/nginx/conf/nginx.conf
- server {
- listen 80 default_server;
- server_name www.plf.com;
-
- root /usr/share/nginx/html;
-
- access_log /var/log/nginx/sc/access.log main;
-
- location / {
-
- }
- }
-
-
- [root@nginx-nginx00 conf]# vim /usr/local/nginx/conf/nginx.conf
-
- log_format main '$remote_addr -$http_x_real_ip - $remote_user [$time_local] "$request" '
- '$status $body_bytes_sent "$http_referer" '
- '"$http_user_agent" "$http_x_forwarded_for"';
-
-
- upstream app{
- server 192.168.118.163 weight=3;
- server 192.168.118.164 weight=1;
- server 192.168.118.165 weight=1;
-
- }
- server {
- listen 80;
- server_name www.feng.com;
- location / {
- proxy_pass http://app;
- proxy_set_header X-Real-IP $remote_addr;
- #X-Real-IP carries the client's real IP to the backends
- }
- }
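The weight=3/1/1 upstream above sends roughly three of every five requests to 192.168.118.163. A minimal Python sketch of the smooth weighted round-robin idea used by nginx's upstream module (illustrative only, not nginx's exact implementation):

```python
def smooth_wrr(servers, n):
    """Pick n backends from [(name, weight), ...] using smooth weighted round-robin."""
    current = {name: 0 for name, _ in servers}   # running current_weight per server
    total = sum(w for _, w in servers)
    picks = []
    for _ in range(n):
        # every round, each server gains its configured weight...
        for name, w in servers:
            current[name] += w
        # ...the highest current_weight wins and pays back the total
        best = max(servers, key=lambda s: current[s[0]])[0]
        current[best] -= total
        picks.append(best)
    return picks

backends = [("192.168.118.163", 3), ("192.168.118.164", 1), ("192.168.118.165", 1)]
print(smooth_wrr(backends, 5))
```

Over any window of five requests, .163 is chosen three times and the two weight-1 backends once each, and the picks are interleaved rather than bursty.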

2. Prepare three virtual machines and build a Kafka cluster managed by ZooKeeper
kafka01 | 192.168.118.166 |
kafka02 | 192.168.118.167 |
kafka03 | 192.168.118.168 |
Set the hostname on each of the three machines and map the hostnames to their IPs:
- [root@nginx-kafka01 ~]# hostnamectl set-hostname nginx-kafka01
- [root@nginx-kafka01 ~]# vim /etc/hosts
- 192.168.118.166 nginx-kafka01
- 192.168.118.167 nginx-kafka02
- 192.168.118.168 nginx-kafka03
-
- [root@nginx-kafka02 ~]# hostnamectl set-hostname nginx-kafka02
- [root@nginx-kafka02 ~]# vim /etc/hosts
- 192.168.118.166 nginx-kafka01
- 192.168.118.167 nginx-kafka02
- 192.168.118.168 nginx-kafka03
-
- [root@nginx-kafka03 ~]# hostnamectl set-hostname nginx-kafka03
- [root@nginx-kafka03 ~]# vim /etc/hosts
- 192.168.118.166 nginx-kafka01
- 192.168.118.167 nginx-kafka02
- 192.168.118.168 nginx-kafka03

Install the Java runtime, then download Kafka and ZooKeeper (versions matching the project environment):
- [root@nginx-kafka01 ~]# yum install java wget -y
- [root@nginx-kafka01 ~]# wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.13-2.7.2.tgz
- [root@nginx-kafka01 ~]# tar xf kafka_2.13-2.7.2.tgz
- [root@nginx-kafka01 ~]# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz
- [root@nginx-kafka01 ~]# tar xf apache-zookeeper-3.7.2-bin.tar.gz
Configure Kafka (each broker gets its own broker.id and listener):
- [root@nginx-kafka01 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
- broker.id=1
- listeners=PLAINTEXT://nginx-kafka01:9092
- zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
-
- [root@nginx-kafka02 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
- broker.id=2
- listeners=PLAINTEXT://nginx-kafka02:9092
- zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
-
- [root@nginx-kafka03 ~]# vim ~/kafka_2.13-2.7.2/config/server.properties
- broker.id=3
- listeners=PLAINTEXT://nginx-kafka03:9092
- zookeeper.connect=192.168.118.166:2181,192.168.118.167:2181,192.168.118.168:2181
Configure ZooKeeper:
- [root@nginx-kafka01 conf]# cd /root/apache-zookeeper-3.7.2-bin/conf
- [root@nginx-kafka01 conf]# cp zoo_sample.cfg zoo.cfg
- [root@nginx-kafka01 conf]# vim zoo.cfg
- server.1=192.168.118.166:3888:4888
- server.2=192.168.118.167:3888:4888
- server.3=192.168.118.168:3888:4888
- #3888 and 4888 are the quorum ports: one carries data between followers and the leader, the other is used for liveness checks and leader election (ZooKeeper's defaults are 2888:3888; any ports work as long as all nodes agree)
Create the /tmp/zookeeper data directory and write each machine's ZooKeeper id into a myid file inside it:
- [root@nginx-kafka01 ~]# mkdir -p /tmp/zookeeper
- [root@nginx-kafka01 ~]# echo 1 > /tmp/zookeeper/myid
-
- [root@nginx-kafka02 ~]# mkdir -p /tmp/zookeeper
- [root@nginx-kafka02 ~]# echo 2 > /tmp/zookeeper/myid
-
- [root@nginx-kafka03 ~]# mkdir -p /tmp/zookeeper
- [root@nginx-kafka03 ~]# echo 3 > /tmp/zookeeper/myid
Start ZooKeeper on every node (always before Kafka) and check its status:
- [root@kafka01 bin]# ./zkServer.sh start
- [root@kafka01 bin]# ./zkServer.sh status
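A quick sizing sanity check: a ZooKeeper ensemble keeps serving only while a strict majority of its servers are alive, which is why three nodes are used here. A tiny illustrative helper:

```python
def quorum_size(n: int) -> int:
    # Minimum number of live servers for an n-node ensemble to keep serving.
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    # How many servers can fail before the ensemble loses its majority.
    return n - quorum_size(n)

print(quorum_size(3), tolerated_failures(3))   # a 3-node ensemble tolerates 1 failure
```

Note that a 4-node ensemble still tolerates only one failure, so odd sizes are the norm.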
Start Kafka:
- [root@nginx-kafka01 ~]# cd kafka_2.13-2.7.2
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-server-start.sh -daemon config/server.properties
Test the Kafka cluster: create a topic, list topics, then run a console producer and a console consumer:
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --create --zookeeper 192.168.118.166:2181 --replication-factor 3 --partitions 3 --topic sc
-
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --list --zookeeper 192.168.118.166:2181
-
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-producer.sh --broker-list 192.168.118.166:9092 --topic sc
- >xixi
- >haha
- >didi
- >hello
- >world!!!!!
-
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.118.167:9092 --topic sc --from-beginning
- haha
- hello
- didi
- xixi
- world!!!!!
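The sc topic above has 3 partitions, and keyed messages are routed to a partition by hashing the key. Kafka's default partitioner uses murmur2; the md5-based stand-in below is a hypothetical illustration of the idea only:

```python
import hashlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Illustration of key -> partition routing: hash the key, take it modulo
    # the partition count. (Kafka really uses murmur2; md5 is just a
    # deterministic stand-in here.)
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions
```

The property that matters: the same key always lands on the same partition, so per-key ordering is preserved.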

3. Install filebeat on the nginx cluster and verify that logs reach the Kafka cluster
Install filebeat, set up its yum repository, and enable it at boot:
- [root@nginx01 /]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
- [root@nginx01 /]# vim /etc/yum.repos.d/fb.repo
- [elastic-7.x]
- name=Elastic repository for 7.x packages
- baseurl=https://artifacts.elastic.co/packages/7.x/yum
- gpgcheck=1
- gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
- enabled=1
- autorefresh=1
- type=rpm-md
- [root@nginx01 /]# yum install filebeat -y
- [root@nginx01 /]# systemctl enable filebeat
Configure filebeat to read the nginx access log and ship it to Kafka:
- [root@nginx01 /]# cd /etc/filebeat
- [root@nginx01 filebeat]# cp filebeat.yml filebeat.yml.bak
- [root@nginx01 filebeat]# >filebeat.yml
- [root@nginx01 filebeat]# vim filebeat.yml
- filebeat.inputs:
- - type: log
-   # Change to true to enable this input configuration.
-   enabled: true
-   # Paths that should be crawled and fetched. Glob based paths.
-   paths:
-     - /usr/local/nginx/logs/access.log
- #-------------------------------- kafka --------------------------------
- output.kafka:
-   hosts: ["192.168.118.166:9092","192.168.118.167:9092","192.168.118.168:9092"]
-   topic: nginxlog
-   keep_alive: 10s

Create the nginxlog topic, start filebeat, and run a consumer to verify:
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-topics.sh --create --zookeeper 192.168.118.167:2181 --replication-factor 3 --partitions 1 --topic nginxlog
- [root@nginx01 kafka_2.13-2.7.2]# systemctl start filebeat
- [root@nginx-kafka01 kafka_2.13-2.7.2]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.118.168:9092 --topic nginxlog --from-beginning
4. Write a Python consumer that cleans the data from Kafka and stores it in MySQL
Requirements analysis:
We need the IP, time, and bandwidth fields from the nginx log.
The IP field is resolved to its province and ISP.
Fields stored in the database: id, time, province, ISP, bandwidth.
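The field extraction itself is simple: split the access-log line on whitespace, take field 0 (client IP), field 4 (timestamp, minus its leading "["), and field 10 (body bytes), matching the log_format defined on the load balancer. A standalone sketch of that cleaning step:

```python
import time

def parse_line(line):
    # Index-based split of one access-log line in the log_format used above:
    # field 0 = client IP, field 4 = "[dd/Mon/yyyy:HH:MM:SS", field 10 = bytes sent.
    parts = line.split()
    ip = parts[0]
    ts = time.strptime(parts[4].lstrip("["), "%d/%b/%Y:%H:%M:%S")
    dt = time.strftime("%Y-%m-%d %H:%M:%S", ts)   # normalize to MySQL datetime form
    bd = float(parts[10])
    return ip, dt, bd
```

The consumer script in this section uses exactly these three fields.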
Prepare a MySQL server and create the database, tables, and fields:
- create database sanchuang;
- use sanchuang;
- CREATE TABLE `isp_index` (
-   `id` int(11) NOT NULL AUTO_INCREMENT,
-   `isp_name` varchar(256) DEFAULT NULL,
-   PRIMARY KEY (`id`)
- );
- CREATE TABLE `prov_index` (
-   `id` int(11) NOT NULL AUTO_INCREMENT,
-   `prov_name` varchar(256) DEFAULT NULL,
-   PRIMARY KEY (`id`)
- );
- CREATE TABLE `nginxlog` (
-   `id` int(11) NOT NULL AUTO_INCREMENT,
-   `dt` datetime NOT NULL,
-   `prov` int(11) DEFAULT NULL,
-   `isp` int(11) DEFAULT NULL,
-   `bd` float DEFAULT NULL,
-   PRIMARY KEY (`id`)
- );
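prov_index and isp_index are dimension tables: each province or ISP name gets a stable integer id, and nginxlog stores only the id. An in-memory sketch of that lookup-or-insert pattern (the MySQL-backed version lives in mysql_connect.py below):

```python
def get_or_add(name, index_dict):
    # Assign the next integer id the first time a name is seen,
    # mimicking an AUTO_INCREMENT insert into a dimension table.
    if name not in index_dict:
        index_dict[name] = len(index_dict) + 1
    return index_dict[name]

provs = {}
print(get_or_add("Hunan", provs), get_or_add("Beijing", provs), get_or_add("Hunan", provs))
```

Repeated names always map to the same id, so the fact table stays small and joinable.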

Write the Python scripts that pull the logs from Kafka.
The Kafka consumer script:
- [root@nginx-kafka01 opt]# cat python_consumer.py
- from pykafka import KafkaClient
- import json
- import requests
- import time
- import mysql_connect
- import pymysql
-
- taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
- prov_dict = mysql_connect.prov_dict
- isp_dict = mysql_connect.isp_dict
-
- def import_data(dt, prov, isp, bd):
-     conn = pymysql.connect(host='192.168.118.157', port=3306, user='root',
-                            passwd='123456', db='sanchuang')
-     cursor = conn.cursor()
-     # insert one cleaned record; prov and isp are dimension-table ids
-     cursor.execute('insert into nginxlog(dt,prov,isp,bd) values(%s,%s,%s,%s)',
-                    (dt, prov, isp, bd))
-     conn.commit()
-     cursor.close()
-     conn.close()
-
- def resolv_ip(ip):
-     # resolve an IP to (province, isp) via taobao's geoip API
-     response = requests.get(taobao_url + ip)
-     if response.status_code == 200:
-         tmp_dict = json.loads(response.text)
-         prov = tmp_dict['data']['region']
-         isp = tmp_dict['data']['isp']
-         return prov, isp
-     return None, None
-
- def trans_time(dt):
-     # "01/Jan/2024:10:51:35" -> "2024-01-01 10:51:35"
-     timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
-     return time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
-
- client = KafkaClient(hosts="192.168.118.166:9092,192.168.118.167:9092,192.168.118.168:9092")
- topic = client.topics['nginxlog']
- balanced_consumer = topic.get_balanced_consumer(
-     consumer_group='testgroup',
-     auto_commit_enable=True,
-     zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
- )
-
- for message in balanced_consumer:
-     if message is None:
-         continue
-     try:
-         line = json.loads(message.value.decode("utf-8"))
-     except ValueError:
-         continue          # skip records that are not valid JSON
-     log = line["message"]
-     tmp_lst = log.split()
-     ip = tmp_lst[0]
-     dt = trans_time(tmp_lst[4].replace("[", ""))
-     bt = tmp_lst[10]
-     prov, isp = resolv_ip(ip)
-     if prov and isp:
-         prov_index, prov_dict = mysql_connect.get_index(prov, prov_dict, "prov_index", "prov_name")
-         isp_index, isp_dict = mysql_connect.get_index(isp, isp_dict, "isp_index", "isp_name")
-         import_data(dt, prov_index, isp_index, bt)

The MySQL helper script:
- [root@nginx-kafka01 opt]# cat mysql_connect.py
- import pymysql
-
- db = pymysql.connect(host='192.168.118.157',
-                      user='root',
-                      password='123456',
-                      database='sanchuang')
- cursor = db.cursor()
-
- def get_prov_isp(tablename):
-     # load a dimension table and return {name: id}
-     cursor.execute(f"select * from {tablename}")
-     data = cursor.fetchall()
-     tmp_dict = dict(data)                      # {id: name}
-     result_dict = {v: k for k, v in tmp_dict.items()}
-     return result_dict
-
- prov_dict = get_prov_isp("prov_index")
- isp_dict = get_prov_isp("isp_index")
-
- def get_index(message, index_dict, tablename, field_name):
-     # return the id for a name, inserting a new dimension row on first sight
-     if message in index_dict:
-         return index_dict[message], index_dict
-     cursor.execute(f'insert into {tablename}({field_name}) values("{message}")')
-     db.commit()
-     result_dict = get_prov_isp(tablename)
-     return result_dict[message], result_dict

Run the consumer, then check the database; the records are arriving:
- [root@nginx-kafka01 opt]# python3 python_consumer.py
-
- root@sanchuang 10:20 mysql>select * from nginxlog;
- +----+---------------------+------+----------+------+
- | id | dt | prov | isp | bd |
- +----+---------------------+------+----------+------+
- | 1 | 2024-01-01 10:51:35 | XX | 内网IP | 0 |
- | 2 | 2024-01-01 10:51:35 | XX | 内网IP | 555 |
- | 3 | 2024-01-01 10:51:45 | XX | 内网IP | 0 |
- | 4 | 2024-01-01 10:51:45 | XX | 内网IP | 0 |
- | 5 | 2024-01-01 10:51:45 | XX | 内网IP | 0 |
- +----+---------------------+------+----------+------+