
Building a Kafka-Based Log Collection and Analysis Platform


Contents

I. Architecture Overview
II. Environment Preparation
  1. Prepare eight VMs (CentOS 7, 2 cores / 1 GB) for the nginx and kafka clusters
  2. Configure a static IP address
  3. Set the hostname
  4. Add hosts entries on every machine
  5. Install basic tools
  6. Install the time synchronization service
  7. Disable the firewall and SELinux
III. Building the nginx Cluster
  1. Installation
  2. Start nginx and enable it at boot
  3. Edit the configuration file
  4. Check the syntax and reload nginx
IV. nginx Reverse Proxy Configuration
  1. Edit the configuration file
  2. Check the syntax and reload nginx
V. Keepalived Dual-VIP High Availability
  1. Installation
  2. Edit the configuration files
  3. Start the keepalived service
  4. Verify the result
VI. Building the kafka and zookeeper Cluster
  1. Installation
  2. Unpack
  3. Configure kafka
  4. Configure zookeeper
  5. Start zookeeper
  6. Start kafka
VII. Deploying filebeat on the nginx Cluster
  1. Installation
  2. Configuration
  3. Start filebeat
  4. Test
VIII. Accessing the nginx Cluster to Generate Logs
  1. Add hosts entries
  2. Access www.test.com
  3. Create a consumer to read the logs
IX. Loading the Data into the Database
  1. Create the table
  2. Write the Python script
  3. Runtime output
  4. Check the database


I. Architecture Overview

Traffic flows: client -> keepalived VIP -> nginx reverse proxy -> nginx web cluster. filebeat on each web node ships the nginx access logs to kafka, and a Python consumer cleans the records and writes them to MySQL.

Hostname        IP               Description
keepalived01    192.168.226.13   Reverse proxy 1, made highly available with keepalived
keepalived02    192.168.226.14   Reverse proxy 2, made highly available with keepalived
nginx01         192.168.226.101  Runs the nginx and filebeat services
nginx02         192.168.226.102  Runs the nginx and filebeat services
nginx03         192.168.226.103  Runs the nginx and filebeat services
nginx-kafka01   192.168.226.149  Runs the kafka and zookeeper services
nginx-kafka02   192.168.226.150  Runs the kafka and zookeeper services
nginx-kafka03   192.168.226.151  Runs the kafka and zookeeper services

II. Environment Preparation

1. Prepare eight VMs (CentOS 7, 2 cores / 1 GB) for the nginx and kafka clusters

Give every machine its hostname and IP address; the kafka nodes also need hosts entries for name resolution. nginx-kafka01 is used as the example below.

2. Configure a static IP address

[root@nginx-kafka01 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
IPADDR=192.168.226.149    # static IP
NETMASK=255.255.255.0     # netmask
GATEWAY=192.168.226.2     # gateway
DNS1=8.8.8.8              # DNS server
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=0bb9f3ba-4245-4281-9844-e9c8e7be74af
DEVICE=ens33
ONBOOT=yes

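After saving, restart the network service so the static address takes effect, and verify it (CentOS 7 still ships the legacy network service):

[root@nginx-kafka01 ~]# systemctl restart network
[root@nginx-kafka01 ~]# ip addr show ens33
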
3. Set the hostname

[root@nginx-kafka01 ~]# vim /etc/hostname
# set the hostname
nginx-kafka01

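Editing /etc/hostname only takes effect after a reboot; hostnamectl applies the change immediately:

[root@nginx-kafka01 ~]# hostnamectl set-hostname nginx-kafka01
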
4. Add hosts entries on every machine

[root@nginx-kafka01 ~]# vim /etc/hosts
# name resolution for the kafka nodes
192.168.226.149 nginx-kafka01
192.168.226.150 nginx-kafka02
192.168.226.151 nginx-kafka03

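A quick check that the names resolve once the entries are in place:

[root@nginx-kafka01 ~]# ping -c 2 nginx-kafka02
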
5. Install basic tools

[root@nginx-kafka01 ~]# yum install wget lsof vim -y

6. Install the time synchronization service

[root@nginx-kafka01 ~]# yum install chrony -y
# enable at boot
[root@nginx-kafka01 ~]# systemctl enable chronyd
[root@nginx-kafka01 ~]# systemctl start chronyd
# set the timezone
[root@nginx-kafka01 ~]# cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

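To confirm the clock is actually syncing, chrony's client can list its sources:

[root@nginx-kafka01 ~]# chronyc sources -v
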
7. Disable the firewall and SELinux

# disable the firewall
[root@nginx-kafka01 ~]# systemctl stop firewalld
[root@nginx-kafka01 ~]# systemctl disable firewalld
# disable SELinux
[root@nginx-kafka01 ~]# vim /etc/selinux/config
# change SELINUX from enforcing to disabled
SELINUX=disabled
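
The config file change only applies after a reboot; to switch SELinux off for the current session as well:

[root@nginx-kafka01 ~]# setenforce 0
[root@nginx-kafka01 ~]# getenforce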

III. Building the nginx Cluster

Prepare three machines for the nginx cluster and set their hostnames to nginx01, nginx02, and nginx03.

1. Installation

# install the EPEL repository first
[root@nginx01 ~]# yum install epel-release -y
[root@nginx01 ~]# yum install nginx -y

2. Start nginx and enable it at boot

# start nginx
[root@nginx01 ~]# systemctl start nginx
# enable at boot
[root@nginx01 ~]# systemctl enable nginx

3. Edit the configuration file

[root@nginx01 ~]# cd /etc/nginx/
[root@nginx01 nginx]# ls
conf.d fastcgi_params.default nginx.conf uwsgi_params.default
default.d koi-utf nginx.conf.default win-utf
fastcgi.conf koi-win scgi_params
fastcgi.conf.default mime.types scgi_params.default
fastcgi_params mime.types.default uwsgi_params
[root@nginx01 nginx]# vim nginx.conf
# In nginx.conf, make the following changes:
# 1. Change:
#        listen 80 default_server;
#    to:
#        listen 80;
# 2. In the http block, add: include /etc/nginx/conf.d/*.conf;
# 3. Save and exit.
# 4. Create a new chenl.conf in the conf.d directory:

[root@nginx01 conf.d]# vim chenl.conf
# i.e. /etc/nginx/conf.d/chenl.conf -- add the following:
server {
    listen 80 default_server;
    server_name www.test.com;
    root /usr/share/nginx/html;                       # document root
    access_log /var/log/nginx/chenl/access.log main;  # log location, using the "main" log_format from the main config
    location / {
    }
}

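The nginx package does not create the access_log directory, and nginx -t will fail if it is missing, so create it first:

[root@nginx01 nginx]# mkdir -p /var/log/nginx/chenl
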
4. Check the syntax and reload nginx

# syntax check; using the absolute path is recommended
[root@nginx01 nginx]# /usr/sbin/nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
# reload nginx
[root@nginx01 nginx]# /usr/sbin/nginx -s reload

IV. nginx Reverse Proxy Configuration

Prepare two machines as reverse proxies and set their hostnames to keepalived01 and keepalived02. Install nginx on them the same way as above.

1. Edit the configuration file

[root@keepalived01 ~]# cd /etc/nginx/
[root@keepalived01 nginx]# ls
conf.d fastcgi_params.default nginx.conf uwsgi_params.default
default.d koi-utf nginx.conf.default win-utf
fastcgi.conf koi-win scgi_params
fastcgi.conf.default mime.types scgi_params.default
fastcgi_params mime.types.default uwsgi_params
[root@keepalived01 nginx]# vim nginx.conf
# 1. Change:
#        listen 80 default_server;
#    to:
#        listen 80;
# 2. In the http block, add: include /etc/nginx/conf.d/*.conf;
# 3. Save and exit.
# 4. Create a new chenl.conf in the conf.d directory:

[root@keepalived01 conf.d]# vim chenl.conf
upstream nginx_backend {
    server 192.168.226.101:80;  # nginx01
    server 192.168.226.102:80;  # nginx02
    server 192.168.226.103:80;  # nginx03
}
server {
    listen 80 default_server;
    root /usr/share/nginx/html;
    location / {
        proxy_pass http://nginx_backend;
    }
}

2. Check the syntax and reload nginx

# syntax check; using the absolute path is recommended
[root@keepalived01 nginx]# /usr/sbin/nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
# reload nginx
[root@keepalived01 nginx]# /usr/sbin/nginx -s reload

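With no balancing directive in the upstream block, nginx distributes requests round-robin by default. A quick check that the proxy actually forwards to the backends (assuming they are up):

[root@keepalived01 nginx]# curl -s http://127.0.0.1/ | head -n 3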

V. Keepalived Dual-VIP High Availability

1. Installation

# install on both keepalived01 and keepalived02
[root@keepalived01 ~]# yum install keepalived -y

2. Edit the configuration files

# in the /etc/keepalived directory
[root@keepalived01 keepalived]# vim keepalived.conf

# global definitions (identical on both machines)
! Configuration File for keepalived
global_defs {
   notification_email {
     acassen@firewall.loc
     failover@firewall.loc
     sysadmin@firewall.loc
   }
   notification_email_from Alexandre.Cassen@firewall.loc
   smtp_server 192.168.200.1
   smtp_connect_timeout 30
   router_id LVS_DEVEL
   vrrp_skip_check_adv_addr
   # vrrp_strict   # strict VRRP compliance; left commented out
   vrrp_garp_interval 0
   vrrp_gna_interval 0
}

# keepalived01
vrrp_script chk_http_port {
    script "/opt/check_nginx.sh"
    interval 2
    weight -60
}
vrrp_instance VI_1 {
    state MASTER               # master for the first VIP
    interface ens33
    virtual_router_id 60
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        chk_http_port
    }
    virtual_ipaddress {
        192.168.1.250
    }
}
vrrp_instance VI_2 {
    state BACKUP               # backup for the second VIP
    interface ens33            # interface the VIP is bound to
    virtual_router_id 61       # virtual router id (0-255); distinguishes keepalived
                               # clusters on the same LAN -- every host in the same
                               # cluster must use the same id
    priority 50                # priority (0-255); the higher value holds the VIP
    advert_int 1               # send a VRRP advertisement every second
    authentication {           # peer authentication
        auth_type PASS
        auth_pass 1111
    }
    track_script {             # track nginx health for this VIP as well
        chk_http_port
    }
    virtual_ipaddress {
        192.168.1.251          # the virtual IP (any address this host can reach works)
    }
}

# keepalived02 (the mirror image: backup for VI_1, master for VI_2)
vrrp_script chk_http_port {
    script "/opt/check_nginx.sh"
    interval 2
    weight -60
}
vrrp_instance VI_1 {
    state BACKUP               # backup for the first VIP
    interface ens33
    virtual_router_id 60
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        chk_http_port
    }
    virtual_ipaddress {
        192.168.1.250
    }
}
vrrp_instance VI_2 {
    state MASTER               # master for the second VIP
    interface ens33
    virtual_router_id 61
    priority 50
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        chk_http_port
    }
    virtual_ipaddress {
        192.168.1.251
    }
}

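Both configurations reference /opt/check_nginx.sh, which is not shown above. A minimal sketch of such a health-check script (the exact logic is an assumption; any script that exits non-zero when nginx is down will do):

#!/bin/bash
# /opt/check_nginx.sh -- exit non-zero when nginx is not running, so
# keepalived applies the weight -60 penalty and the VIP fails over
if ! pidof nginx > /dev/null; then
    exit 1
fi
exit 0

Remember to make it executable on both machines: chmod +x /opt/check_nginx.sh.
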
3. Start the keepalived service

# run on both keepalived01 and keepalived02
[root@keepalived01 keepalived]# systemctl start keepalived

4. Verify the result

[root@keepalived01 keepalived]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 00:0c:29:21:dc:2b brd ff:ff:ff:ff:ff:ff
    inet 192.168.226.13/24 brd 192.168.226.255 scope global noprefixroute ens33
       valid_lft forever preferred_lft forever
    inet 192.168.1.250/32 scope global ens33
       valid_lft forever preferred_lft forever
    inet6 fe80::e634:2b63:9731:1da2/64 scope link noprefixroute
       valid_lft forever preferred_lft forever
[root@keepalived02 keepalived]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 00:0c:29:9c:d1:c7 brd ff:ff:ff:ff:ff:ff
    inet 192.168.226.14/24 brd 192.168.226.255 scope global noprefixroute ens33
       valid_lft forever preferred_lft forever
    inet 192.168.1.251/32 scope global ens33
       valid_lft forever preferred_lft forever
    inet6 fe80::e634:2b63:9731:1da2/64 scope link tentative noprefixroute dadfailed
       valid_lft forever preferred_lft forever
    inet6 fe80::b3a5:597f:b278:1602/64 scope link noprefixroute
       valid_lft forever preferred_lft forever

Each proxy now holds one VIP: 192.168.1.250 on keepalived01 and 192.168.1.251 on keepalived02.
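
To watch failover happen, stop nginx on keepalived01; the check script fails, the priority drops by 60, and 192.168.1.250 should move to keepalived02 (a quick test, assuming the check script above is installed):

[root@keepalived01 ~]# systemctl stop nginx
[root@keepalived02 ~]# ip a | grep 192.168.1.250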

VI. Building the kafka and zookeeper Cluster

1. Installation

# install Java
[root@nginx-kafka01 opt]# yum install java wget -y
# download kafka
[root@nginx-kafka01 opt]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
# download zookeeper
[root@nginx-kafka01 opt]# wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

2. Unpack

[root@nginx-kafka01 opt]# tar xzvf kafka_2.12-2.8.1.tgz
[root@nginx-kafka01 opt]# tar xzvf apache-zookeeper-3.6.4-bin.tar.gz

3. Configure kafka

Go to /opt/kafka_2.12-2.8.1/config:

[root@nginx-kafka01 config]# vim server.properties
# every broker needs a unique id (1, 2, 3 across the three nodes)
broker.id=1
# format: listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://nginx-kafka01:9092
zookeeper.connect=192.168.226.149:2181,192.168.226.150:2181,192.168.226.151:2181

4. Configure zookeeper

Go to /opt/apache-zookeeper-3.6.4-bin/conf:

[root@nginx-kafka01 conf]# cp zoo_sample.cfg zoo.cfg
[root@nginx-kafka01 conf]# vim zoo.cfg
# add these three lines at the bottom of the file
server.1=192.168.226.149:3888:4888
server.2=192.168.226.150:3888:4888
server.3=192.168.226.151:3888:4888
# in server.X=host:portA:portB, the first port (3888 here) carries
# follower-to-leader quorum traffic and the second (4888 here) is used
# for leader election
# double-check that "dataDir" and "clientPort" are set correctly

# write this node's unique id into the myid file under dataDir; it must match
# the X in the server.X line above (it also lines up with broker.id here)
[root@nginx-kafka01 ~]# mkdir /tmp/zookeeper
[root@nginx-kafka01 ~]# echo 1 > /tmp/zookeeper/myid
# on nginx-kafka02: echo 2 > /tmp/zookeeper/myid
# on nginx-kafka03: echo 3 > /tmp/zookeeper/myid

5. Start zookeeper

Go to the apache-zookeeper-3.6.4-bin directory; all the executables live under bin. Make sure the firewall is off before starting.

[root@nginx-kafka01 apache-zookeeper-3.6.4-bin]# bin/zkServer.sh start
# check the status of the zookeeper service
[root@nginx-kafka01 apache-zookeeper-3.6.4-bin]# bin/zkServer.sh status

6. Start kafka

Go to the kafka_2.12-2.8.1 directory; all the executables live under bin.

[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-server-start.sh -daemon config/server.properties

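To confirm the broker came up, check that port 9092 is listening (lsof was installed back in the environment-preparation step):

[root@nginx-kafka01 kafka_2.12-2.8.1]# lsof -i:9092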

VII. Deploying filebeat on the nginx Cluster

1. Installation

# import the GPG key for the Elastic packages
[root@nginx01 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
# point yum at the Elastic Stack 7.x repository and tell it how to verify the packages
[root@nginx01 ~]# vim /etc/yum.repos.d/fb.repo
# add the following
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

# install with yum
[root@nginx01 ~]# yum install filebeat -y
# confirm it is installed (rpm -qa lists every installed package)
[root@nginx01 ~]# rpm -qa | grep filebeat
# see where filebeat was installed and which files it owns
[root@nginx01 ~]# rpm -ql filebeat
# enable at boot
[root@nginx01 ~]# systemctl enable filebeat

2. Configuration

Edit /etc/filebeat/filebeat.yml (repeat on each nginx node):

filebeat.inputs:
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/chenl/access.log

# ---------------------------- kafka output ----------------------------
output.kafka:
  hosts: ["192.168.226.149:9092","192.168.226.150:9092","192.168.226.151:9092"]
  topic: nginxlog
  keep_alive: 10s

3. Start filebeat

[root@nginx01 ~]# systemctl start filebeat

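filebeat can validate its own configuration and its connection to the kafka output; both subcommands ship with the package:

[root@nginx01 ~]# filebeat test config
[root@nginx01 ~]# filebeat test output
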
4. Test

# create the nginxlog topic (run from the kafka directory on a kafka node)
[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.226.149:2181 --replication-factor 1 --partitions 1 --topic nginxlog
# start a console consumer to read the data
[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.226.149:9092 --topic nginxlog --from-beginning

VIII. Accessing the nginx Cluster to Generate Logs

The request below is issued from keepalived01; the consumer runs on nginx-kafka01.

1. Add hosts entries

# add to /etc/hosts: map both VIPs to the test domain
192.168.1.250 www.test.com
192.168.1.251 www.test.com

2. Access www.test.com

[root@keepalived01 ~]# curl www.test.com

3. Create a consumer to read the logs

[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.226.149:9092 --topic nginxlog --from-beginning
# one access record from 192.168.1.250 has been consumed:
{"@timestamp":"2023-09-05T05:35:04.248Z",
"@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.12"},
"log":{"offset":3211,"file":{"path":"/var/log/nginx/chenl/access.log"}},
"message":"192.168.1.250 - - [05/Sep/2023:13:34:59 +0800] \"GET / HTTP/1.1\" 200 4833 \"-\" \"curl/7.29.0\" \"-\"","input":{"type":"log"},
"ecs":{"version":"1.12.0"},"host":{"name":"nginx-kafka01"},
"agent":{"type":"filebeat","version":"7.17.12",
"hostname":"nginx-kafka01",
"ephemeral_id":"4a96ba90-3583-4c3d-b194-8dfec062e0d8",
"id":"b4cb67b1-2919-4e65-ad2b-8bdd665f2093",
"name":"nginx-kafka01"}}

IX. Loading the Data into the Database

1. Create the table

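The Python script in the next step connects to a database named nginx_log, so create it before the table; a sketch, assuming MySQL/MariaDB is already running on nginx-kafka01 with the credentials the script uses:

[root@nginx-kafka01 ~]# mysql -uroot -p123456 -e "CREATE DATABASE IF NOT EXISTS nginx_log;"
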
CREATE TABLE `log` (
    `ip` varchar(50) DEFAULT NULL,
    `time` datetime DEFAULT NULL,
    `dk` varchar(30) DEFAULT NULL,   -- bandwidth (bytes sent)
    `isp` varchar(50) DEFAULT NULL,  -- carrier
    `prov` varchar(30) DEFAULT NULL  -- province
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2. Write the Python script

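The script depends on pymysql, requests, and pykafka; one way to install them (assuming python3/pip3 are available on the machine that runs the script):

[root@nginx-kafka01 ~]# yum install python3 -y
[root@nginx-kafka01 ~]# pip3 install pymysql requests pykafka
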
#!/bin/python3
# encoding: utf-8
import json
import time

import pymysql
import requests
from pykafka import KafkaClient

# connect to the database
db = pymysql.connect(
    host="192.168.226.149",
    user="root",
    passwd="123456",
    database="nginx_log"
)
cursor = db.cursor()

taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="

# look up an ip's province and carrier (isp) via the taobao api
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None

# convert the timestamp from the nginx log format to our own
def trans_time(dt):
    # parse the string into a time struct
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    # timeStamp = int(time.mktime(timeArray))
    # format the time struct back into a string
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time

# pull records from kafka and extract the fields we need: ip, time, bandwidth
client = KafkaClient(hosts="192.168.226.149:9092,192.168.226.150:9092,192.168.226.151:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='192.168.226.149:2181,192.168.226.150:2181,192.168.226.151:2181'
)
# consumer = topic.get_simple_consumer()
for message in balanced_consumer:
    if message is not None:
        line = json.loads(message.value.decode("utf-8"))
        log = line["message"]
        tmp_lst = log.split()
        ip = tmp_lst[0]
        dt = tmp_lst[3].replace("[", "")
        bt = tmp_lst[9]          # bytes sent, stored in the dk column
        dt = trans_time(dt)
        prov, isp = resolv_ip(ip)
        insert = "insert into log(ip, time, dk, prov, isp) values(%s, %s, %s, %s, %s)"
        values = (ip, dt, bt, prov, isp)
        cursor.execute(insert, values)
        db.commit()
        if prov and isp:
            print(prov, isp, dt)
db.close()

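The consumer loop runs forever, so start it in the background; the filename kafka_to_mysql.py below is just an illustrative choice:

[root@nginx-kafka01 ~]# nohup python3 kafka_to_mysql.py > consumer.log 2>&1 &
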
3. Runtime output

For each record consumed, the script prints the province, carrier, and timestamp.

4. Check the database
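
A quick way to confirm rows are landing, plus a sample aggregation by province (assuming the mysql client is available):

[root@nginx-kafka01 ~]# mysql -uroot -p123456 -e "USE nginx_log; SELECT * FROM log LIMIT 5;"
[root@nginx-kafka01 ~]# mysql -uroot -p123456 -e "USE nginx_log; SELECT prov, COUNT(*) FROM log GROUP BY prov;"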
