赞
踩
目录
为了防止收集日志信息太多或者服务器down机导致的信息丢失,我们这里引入kafka消息队列服务器也起到了日志缓冲的作用,我们这里搭建单节点的kafka,在实际环境下应该使用集群方式部署
zookeeper依赖 java 环境 ,所以我们需要安装 jdk官网建议 最低安装 jdk 1.8 版本
zookeeper下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.5-bin.tar.gz
tar zxf jdk-8u171-linux-x64.tar.gz -C /usr/local/
vim /etc/profile
- JAVA_HOME=/usr/local/jdk1.8.0_171
- PATH=$JAVA_HOME/bin:$PATH
- CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
- export PATH JAVA_HOME CLASSPATH
source /etc/profile #使环境变量生效
java -version
vim /etc/hosts
192.168.30.13 cong13
tar -zxf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local/
mkdir -p /data/zk/data
mkdir -p /data/zk/datalog
- cd /usr/local/apache-zookeeper-3.5.5-bin/conf/
- cp zoo_sample.cfg zoo.cfg #复制一份zoo_sample.cfg文件并命名为zoo.cfg
vim zoo.cfg
- dataDir=/data/zk/data #修改这一行为我们创建的目录
- dataLogDir=/data/zk/datalog #添加这一行
- export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.5.5-bin
- export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
- [root@cong13 ~]# zkServer.sh start
- ZooKeeper JMX enabled by default
- Using config: /usr/local/apache-zookeeper-3.5.5-bin/bin/../conf/zoo.cfg
- Starting zookeeper ... STARTED
查看zookeeper的状态
zkServer.sh status
或者执行jps查看状态,其中QuorumPeerMain是zookeeper进程
方法1:添加/etc/rc.local文件
echo "source /etc/profile" >> /etc/rc.local
echo "cd /usr/local/apache-zookeeper-3.5.5-bin/bin && ./zkServer.sh start " >> /etc/rc.local
chmod +x /etc/rc.local
方法二:将zookeeper添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service
vim /lib/systemd/system/zookeeper.service
- [Unit]
- Description=Zookeeper service
- After=network.target
-
- [Service]
- Type=forking
- Environment="JAVA_HOME=/usr/local/jdk1.8.0_171"
- User=root
- Group=root
- ExecStart=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start
- ExecStop=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh stop
-
- [Install]
- WantedBy=multi-user.target
- systemctl daemon-reload
- systemctl enable zookeeper
官网地址:Apache Kafka
tar zxf kafka_2.12-2.2.0.tgz -C /usr/local/
vim /usr/local/kafka_2.12-2.2.0/config/server.properties
- # broker的全局唯一编号,不能重复
- broker.id=0
- # 监听
- listeners=PLAINTEXT://:9092 #开启此项
- # 日志目录
- log.dirs=/data/kafka/log #修改日志目录
- # 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)
- zookeeper.connect=localhost:2181
mkdir -p /data/kafka/log
- export KAFKA_HOME=/usr/local/kafka_2.12-2.2.0
- export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile
后台启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties
将kafka添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service
- [Unit]
- Description=Apache Kafka server (broker)
- After=network.target zookeeper.service
-
- [Service]
- Type=simple
- Environment="PATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"
- User=root
- Group=root
- ExecStart=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.2.0/config/server.properties
- ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
- Restart=on-failure
-
- [Install]
- WantedBy=multi-user.target
- systemctl daemon-reload
- systemctl enable kafka
测试:
kill 3607
systemctl start kafka
修改 filebeat 配置文件,把filebeat收集到的nginx日志保存到kafka消息队列中。把output.elasticsearch和output.logstash都给注释掉,添加kafka项
[root@cong12 ~]# vim /usr/local/filebeat/filebeat.yml
- - type: log
- enabled: true #开启此配置
- paths:
- - /usr/local/nginx/logs/*.log #添加收集nginx服务日志
- #- /var/log/*.log #注释该行
- #-------------------------- Elasticsearch output ------------------------------
- #output.elasticsearch: # Elasticsearch这部分全部注释掉
- # Array of hosts to connect to.
- #hosts: ["localhost:9200"]
- #----------------------------- Logstash output --------------------------------
- #output.logstash: # logstash这部分全部注释掉
- # The Logstash hosts
- #hosts: ["192.168.30.11:5044"]
- #在Logstash后面添加如下行
- #----------------------------- KAFKA output --------------------------------
- output.kafka: #把日志发送给kafka
- enabled: true #开启kafka模块
- hosts: ["192.168.30.13:9092"] #填写kafka服务器地址
- topic: nginx_logs #填写kafka的topic(主题),自定义的
把output.elasticsearch和output.logstash都给注释掉,然后在output.logstash结尾添加KAFKA output,把日志数据发送给kafka。需要注意的是kafka中如果不存在这个topic,则会自动创建。如果有多个kafka服务器,可用逗号分隔
这里需要添加kafka的hosts解析,如不添加则会报错
[root@cong12 ~]# vim /etc/hosts
- 192.168.30.12 cong12
- 192.168.30.13 cong13
- kill 919
- cd /usr/local/filebeat/ && ./filebeat -e -c filebeat.yml
注: 当启动时没有显示kafka,一、需要访问nginx的测试页,目的是产生日志记录,filebeat作为消息的生产者才可以将日志数据写到kafka
二、配置文件配置错误
在kafka服务器上查看filebeat保存的数据,topice为nginx_logs
kafka-topics.sh --list --zookeeper localhost:2181
启动一个消费者去查看filebeat发送过来的消息,能看到消息说明我们的filebeat的output.kafka配置成功。接下来配置logstash去kafka消费数据
配置logstash去kafka拿取数据,进行数据格式化,然后把格式化的数据保存到Elasticsearch,通过kibana展示给用户。kibana是通过Elasticsearch进行日志搜索的
vim /etc/host
- 192.168.30.11 cong11
- 192.168.30.12 cong12
- 192.168.30.13 cong13
vim /usr/local/logstash-7.3.0/config/http_logstash.conf
- input{
- kafka {
- codec => plain{charset => "UTF-8"}
- bootstrap_servers => "192.168.30.13:9092"
- client_id => "httpd_logs" #这里设置client.id和group.id是为了做标识
- group_id => "httpd_logs"
- consumer_threads => 5 #设置消费kafka数据时开启的线程数,一个partition对应一个消费者消费,设置多了不影
- 响,在kafka中一个进程对应一个线程
- auto_offset_reset => "latest" #从最新的偏移量开始消费
- decorate_events => true #此属性会将当前topic,offset,group,partition等信息>也带到message中
- topics => "nginx_logs"
- }
- }
- output {
- stdout {
- codec => "rubydebug"
- }
- elasticsearch {
- hosts => [ "192.168.30.11:9200" ]
- index => "nginx-logs-%{+YYYY.MM.dd}"
- }
-
- }
可以使用相同的group_id方式运行多个Logstash实例,以跨物理机分布负载。主题中的消息将分发到具有相同的所有Logstash实例group_id
Kafka input参数详解:
Kafka input plugin | Logstash Reference [8.2] | Elastic
kill 941
nohup logstash -f /usr/local/logstash-7.3.0/config/http_logstash.conf &
查看 nohup输出文件:
[root@cong11 ~]# tail -0f nohup.out
刷新nginx页面,产生数据
可以在kibana上查看我们最新收集到的nginx服务日志。到这里我们的ELK+filebeat+kafka部署完成
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。