elasticsearch-7.2.0.tar.gz
logstash-7.2.0.tar.gz
kibana-7.2.0-linux-x86_64.tar.gz
filebeat-7.2.0-linux-x86_64.tar.gz
JDK 8 or later
Distributed collection with Logstash
This architecture extends the basic setup by running multiple Logstash collection nodes across several machines. Each node parses its logs and sends the results to the Elasticsearch servers for storage, and Kibana is then used to query the data and build log reports.
Because Logstash has to be deployed on every server and is fairly CPU- and memory-hungry, this layout only suits servers with plenty of spare resources; otherwise it can degrade performance or even stop the host from working properly.
Distributed collection with Beats
This architecture introduces Beats as the log shipper. The Beats family includes several lightweight shippers, e.g. Filebeat (log files), Metricbeat (system metrics), Packetbeat (network traffic), and Winlogbeat (Windows event logs).
Beats send the collected data to Logstash, which parses and filters it and forwards it to Elasticsearch for storage; Kibana then presents it to the user.
This architecture fixes the problem of Logstash's high resource usage on each server node: compared with Logstash, the CPU and memory consumed by Beats is almost negligible. In addition, Beats and Logstash support SSL/TLS encrypted transport with mutual client/server authentication, so communication stays secure.
It therefore suits scenarios with high data-security requirements where the performance of each server matters.
If Logstash itself comes under heavy load, consider putting Redis or Kafka between Filebeat and Logstash as a buffer, as sketched below.
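A minimal sketch of that buffered layout, assuming a Kafka broker at 192.168.210.50:9092 and a topic named app-logs (both hypothetical; adjust to your environment). Filebeat writes to Kafka instead of Logstash:

# filebeat.yml (Filebeat side)
output.kafka:
  hosts: ["192.168.210.50:9092"]
  topic: "app-logs"

and Logstash consumes the same topic:

# Logstash pipeline input (Logstash side)
input {
  kafka {
    bootstrap_servers => "192.168.210.50:9092"
    topics => ["app-logs"]
    codec => "json" # Filebeat publishes events to Kafka as JSON
  }
}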
1. Install Java
Extract to /usr/local:
tar -xzvf jdk-8u144-linux-x64.tar.gz -C /usr/local/
Temporary JDK environment setup (lost after the shell exits or the machine reboots):
- export JAVA_HOME=/usr/local/jdk1.8.0_144
- export PATH=$JAVA_HOME/bin:$PATH
- export CLASSPATH=.
Make it permanent via the shell environment:
- vim ~/.bashrc
-
-
- export JAVA_HOME=/usr/local/jdk1.8.0_144
- export JRE_HOME=${JAVA_HOME}/jre
- export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
- export PATH=${JAVA_HOME}/bin:$PATH
-
-
- source ~/.bashrc
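After sourcing ~/.bashrc, a quick check that the JDK is picked up:
java -version
# should report java version "1.8.0_144"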
Elasticsearch cluster nodes:
kaikai1 192.168.210.40
kaikai2 192.168.210.44
kaikai3 192.168.210.45
- sudo vim /etc/security/limits.conf
- * soft nproc 65536
- * hard nproc 65536
- * soft nofile 65536
- * hard nofile 65536
-
- sudo vim /etc/sysctl.conf
- vm.max_map_count=655360
-
- sudo vim /etc/security/limits.d/90-nproc.conf
- * soft nproc 4096
-
- sudo sysctl -p
- mkdir -p /data/es-data
- mkdir -p /var/log/elasticsearch
-
- chown -R user:user /data/es-data
- chown -R user:user /var/log/elasticsearch
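Here user stands for the non-root account that will run Elasticsearch (it refuses to start as root). After logging in again, the limits and the kernel setting can be sanity-checked with:
ulimit -n                 # expect 65536
ulimit -u                 # expect 65536
sysctl vm.max_map_count   # expect 655360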
tar -zxvf elasticsearch-7.2.0-linux-x86_64.tar.gz
- vim elasticsearch-7.2.0/config/elasticsearch.yml
-
- # Node and cluster names
- cluster.name: es-ELK # must be identical on all three nodes
- node.name: node-1 # node name
- node.master: true # master-eligible
- node.data: true # holds data
- # Data and log directories
- path.data: /data/es-data/ # data directory
- path.logs: /var/log/elasticsearch/ # log directory
- 
- # Network host and port
- network.host: 0.0.0.0
- http.port: 9200
- 
- # Allow cross-origin requests, needed by the head UI later
- http.cors.enabled: true
- http.cors.allow-origin: "*"
- 
- # Discovery
- discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
- discovery.zen.minimum_master_nodes: 2 # deprecated and ignored in 7.x
- 
- 
- # Bootstrap the cluster from the initial set of master-eligible nodes; must be the same on every node
- cluster.initial_master_nodes: ["node-1"]
- vim elasticsearch-7.2.0/config/elasticsearch.yml
-
- # Node and cluster names
- cluster.name: es-ELK # must be identical on all three nodes
- node.name: node-2 # node name
- node.master: false # not master-eligible
- node.data: true # holds data
- # Data and log directories
- path.data: /data/es-data/ # data directory
- path.logs: /var/log/elasticsearch/ # log directory
- 
- # Network host and port
- network.host: 0.0.0.0
- http.port: 9200
- 
- # Allow cross-origin requests, needed by the head UI later
- http.cors.enabled: true
- http.cors.allow-origin: "*"
- 
- # Discovery
- discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
- discovery.zen.minimum_master_nodes: 2 # deprecated and ignored in 7.x
- 
- 
- # Bootstrap from the initial set of master-eligible nodes; list only master-eligible nodes, identical on every node
- cluster.initial_master_nodes: ["node-1"]
- vim elasticsearch-7.2.0/config/elasticsearch.yml
-
- # Node and cluster names
- cluster.name: es-ELK # must be identical on all three nodes
- node.name: node-3 # node name
- node.master: false # not master-eligible
- node.data: true # holds data
- # Data and log directories
- path.data: /data/es-data/ # data directory
- path.logs: /var/log/elasticsearch/ # log directory
- 
- # Network host and port
- network.host: 0.0.0.0
- http.port: 9200
- 
- # Allow cross-origin requests, needed by the head UI later
- http.cors.enabled: true
- http.cors.allow-origin: "*"
- 
- # Discovery
- discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
- discovery.zen.minimum_master_nodes: 2 # deprecated and ignored in 7.x
- 
- 
- # Bootstrap from the initial set of master-eligible nodes; list only master-eligible nodes, identical on every node
- cluster.initial_master_nodes: ["node-1"]
nohup ./bin/elasticsearch &
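Start Elasticsearch on every node as the non-root user prepared earlier (it will not run as root). To confirm a node is up, query its HTTP endpoint, for example:
curl http://192.168.210.40:9200
A healthy node answers with JSON like the following: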
- {
- "name" : "node-1",
- "cluster_name" : "AI-ELK",
- "cluster_uuid" : "pOTpS1cwRRGqiRU4BCY4Qg",
- "version" : {
- "number" : "7.2.0",
- "build_flavor" : "default",
- "build_type" : "tar",
- "build_hash" : "508c38a",
- "build_date" : "2019-06-20T15:54:18.811730Z",
- "build_snapshot" : false,
- "lucene_version" : "8.0.0",
- "minimum_wire_compatibility_version" : "6.8.0",
- "minimum_index_compatibility_version" : "6.0.0-beta1"
- },
- "tagline" : "You Know, for Search"
- }
Note: avoid using localhost here; since remote access is required, use the server's actual IP address.
git clone https://github.com/mobz/elasticsearch-head.git
1. Install Node.js and npm
$ sudo apt-get install nodejs
$ sudo apt-get install npm
2. Upgrade
$ sudo npm install npm -g
$ npm install -g n
$ n latest (upgrade Node.js to the latest version) or $ n stable (upgrade to the latest stable version)
n can also be followed by a specific version, e.g. $ n v0.10.26
or $ n 0.10.26
3. Switch the npm registry to the Taobao mirror
$ npm config set registry http://registry.npm.taobao.org/
$ npm get registry
- npm install
- npm run start
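Run the two commands above inside the cloned elasticsearch-head directory. head serves on port 9100 by default; open http://192.168.210.40:9100 in a browser and connect it to http://192.168.210.40:9200 (this is what the CORS settings in elasticsearch.yml are for). A quick reachability check:
curl http://192.168.210.40:9100/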
tar -zxvf logstash-7.2.0.tar.gz
Basic startup:
nohup bin/logstash -f config/app.yml &
Startup with a directory of config files:
nohup bin/logstash -f config/yml/ &
Note: yml/ is a directory that may contain any number of pipeline config files (despite the .yml extension, these are Logstash pipeline configs).
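Before backgrounding Logstash, the configuration can be validated first, for example:
bin/logstash -f config/yml/ --config.test_and_exit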
- vim log1.yml
-
-
- input {
- file {
- type => "log1"
- path => "/var/log/tdsp/t.log"
- discover_interval => 10 # interval (seconds) for discovering new files
- start_position => "beginning"
- }
- #beats{
- # port => "5045"
- # }
- }
- filter {
- mutate {
- split => {"message" => "|"}
- }
-
- mutate {
- add_field => {
- "date" => "%{[message][0]}"
- "grade" => "%{[message][1]}"
- "infosource" => "%{[message][2]}"
- "msg" => "%{[message][3]}"
- "ip2long" => "%{[message][4]}"
- }
- }
- mutate {
- convert => {
- "date" => "string"
- "grade" => "string"
- "infosource" => "string"
- "msg" => "string"
- "ip2long" => "string"
- }
- }
- }
-
-
- output {
- if [type] == "log1" {
- elasticsearch {
- hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
- index => "kaikai-%{+YYYY-MM-dd}"
- }
- }
- }
- # console output, for debugging
- #output {
- # if [type] == "log1" {
- # stdout {codec => rubydebug}
- # }
- #}
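To illustrate the filter: a pipe-delimited line such as the test line used later is split on "|", and the pieces land in the five added fields roughly as follows:
2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993
date       => "2019-11-18 13:53:35"
grade      => "ERROR"
infosource => "MQTT connected error"
msg        => "错1"
ip2long    => "2130708993"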
- vim log2.yml
-
-
- input {
- file {
- type => "log2"
- path => "/var/log/tdsp/w.log"
- discover_interval => 10 # interval (seconds) for discovering new files
- start_position => "beginning"
- }
- #beats{
- # port => "5045"
- # }
- }
- filter {
- mutate {
- split => {"message" => "|"}
- }
-
- mutate {
- add_field => {
- "date" => "%{[message][0]}"
- "grade" => "%{[message][1]}"
- "infosource" => "%{[message][2]}"
- "msg" => "%{[message][3]}"
- "ip2long" => "%{[message][4]}"
- }
- }
- mutate {
- convert => {
- "date" => "string"
- "grade" => "string"
- "infosource" => "string"
- "msg" => "string"
- "ip2long" => "string"
- }
- }
- }
-
-
- output {
- if [type] == "log2" {
- elasticsearch {
- hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
- index => "test-%{+YYYY-MM-dd}"
- }
- }
- }
-
- #output {
- # if [type] == "log2" {
- # stdout {codec => rubydebug}
- # }
- #}
Method 2 template: keeping multiple logs separate within a single config
- input {
- file {
- type => "log1"
- path => "/xxx/xxx/*.log"
- discover_interval => 10
- start_position => "beginning"
- }
- file {
- type => "log2"
- path => "/xxx/xxx/*.log"
- discover_interval => 10
- start_position => "beginning"
- }
- file {
- type => "log3"
- path => "/xxx/xxx/*.log"
- discover_interval => 10
- start_position => "beginning"
- }
- #beats{
- # port => "5045"
- # }
- }
-
-
- filter {
- if [type] == "log1" {
- mutate {
- split => {"message" => "|"} # 分割日志
- }
- mutate {
- add_field => {
- "x1" => "%{[message][0]}"
- "x2" => "%{[message][1]}"
- "x3" => "%{[message][2]}"
- }
-
- }
- mutate {
- convert => {
- "x1" => "string"
- "x2" => "string"
- "x3" => "string"
- }
- }
-
- json {
- source => "xxx"
- target => "xxx"
- }
- mutate {
- remove_field => ["xxx","xxx","xxx","xxx"] # drop unneeded fields
- }
- }
- else if [type] == "log2" {
- mutate {
- split => {"message" => "|"}
- }
-
- mutate {
- add_field => {
- "x1" => "%{[message][0]}"
- "x2" => "%{[message][1]}"
- "x3" => "%{[message][2]}"
-
- }
- }
- mutate {
- convert => {
- "x1" => "string"
- "x2" => "string"
- "x3" => "string"
- }
- }
- json {
- source => "xxx"
- target => "xxx"
- }
- mutate {
- remove_field => ["xxx","xxx","xxx","xxx"]
- }
- }
- }
-
- output {
- if [type] == "log1" {
- elasticsearch {
- hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
- index => "log1-%{+YYYY-MM-dd}"
- }
- }
- else if [type] == "log2" {
- elasticsearch {
- hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
- index => "log2-%{+YYYY-MM-dd}"
- }
- }
- }
- #output {
- # stdout {codec => rubydebug}
- #}
Start and test
Append two log lines, then check in the ES cluster whether they are collected:
echo "2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993" >> /var/log/tdsp/t.log
echo "2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993" >> /var/log/tdsp/w.log
If the two indices test-2019-11-23 and kaikai-2019-11-23 appear, collection has succeeded.
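One way to list the new indices is the cat indices API, for example:
curl 'http://192.168.210.40:9200/_cat/indices?v'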
Kibana deployment host: 192.168.210.42
tar -zxvf kibana-7.2.0-linux-x86_64.tar.gz
Edit the configuration:
- vim kibana-7.2.0-linux-x86_64/config/kibana.yml
-
-
- server.port: 5602
- server.host: "0.0.0.0"
- elasticsearch.hosts: ["http://192.168.210.40:9200"]
- # UI language
- i18n.locale: "zh-CN"
Start:
nohup ./bin/kibana &
Visit: http://192.168.210.42:5602/
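If the page does not come up, the Kibana status endpoint can also be queried from the shell, for example:
curl http://192.168.210.42:5602/api/status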
tar -zxvf filebeat-7.2.0-linux-x86_64.tar.gz
This section shows Filebeat collecting logs and shipping them to Logstash for parsing; Logstash then pushes the events into the Elasticsearch cluster, and Kibana is finally used to build log reports for analysis.
- vim app.yml
-
- filebeat.inputs:
- - type: log
- enabled: true
- paths:
- - /var/log/tdsp/*.log
- # tags: ["nginx"] # add custom tags
- setup.template.settings:
- index.number_of_shards: 3
- #output.console:
- # pretty: true
- # enable: true
-
-
- output.logstash:
- hosts: ["192.168.210.41:5044"]
- #filebeat.config.modules:
- # path: ${path.config}/modules.d/*.yml
- # reload.enabled: false
-
- #setup.kibana:
- # host: "192.168.210.42:5602"
-
- #output.elasticsearch:
- # hosts: ["192.168.210.40","192.168.210.44","192.168.210.45"]
-
- # test console input/output
- #filebeat.inputs:
- #- type: stdin
- # enabled: true
- #setup.template.settings:
- # index.number_of_shards: 3
- #output.console:
- # pretty: true
- # enable: true
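Filebeat can validate this configuration before being started, for example:
./filebeat test config -c app.yml
./filebeat test output -c app.yml
The matching Logstash pipeline on 192.168.210.41 listens for Beats on port 5044: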
- input {
- beats{
- port => "5044"
- }
- }
- filter {
- mutate {
- split => {"message" => "|"}
- }
-
- mutate {
- add_field => {
- "date" => "%{[message][0]}"
- "grade" => "%{[message][1]}"
- "infosource" => "%{[message][2]}"
- "msg" => "%{[message][3]}"
- "ip2long" => "%{[message][4]}"
- }
- }
- mutate {
- convert => {
- "date" => "string"
- "grade" => "string"
- "infosource" => "string"
- "msg" => "string"
- "ip2long" => "string"
- }
- }
- }
-
-
- output {
- elasticsearch {
- hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
- index => "kaikai-%{+YYYY-MM-dd}"
- }
- }
-
- #output {
- # stdout {codec => rubydebug}
- #}
Logstash configuration that collects the logs, parses the fields, and names the index after the source file name
- input {
- file {
- type => "info_log"
- path => "/opt/kaikai.log"
- discover_interval => 10 # interval (seconds) for discovering new files
- start_position => "end"
- # sincedb_path => "/usr/share/logstash/sincedb_kaikai"
- #start_position => "beginning"
- }
-
- file {
- type => "error_log"
- path => "/opt/error.log"
- discover_interval => 10
- start_position => "beginning"
- }
- }
-
- filter {
-
- grok {
- match => { "[log][file][path]" => "/(?<logfilename>[^/]+)\.log$" }
- }
-
- if [type] == "info_log" {
- # first, split the message field
- mutate {
- split => { "message" => "|" }
- }
- # use Ruby code to handle a varying number of fields
- ruby {
- code => "
- message = event.get('message')
- path = event.get('path')
- if message.length == 5
- # exactly 5 fields: assign them directly
- event.set('date', message[0].strip)
- event.set('grade', message[1].strip)
- event.set('pid', message[2].strip)
- event.set('code_location', message[3].strip)
- event.set('msg', message[4].strip)
- elsif message.length > 5
- # more than 5 fields: fold the extras into the last field
- event.set('date', message[0].strip)
- event.set('grade', message[1].strip)
- event.set('pid', message[2].strip)
- event.set('code_location', message[3].strip)
- # merge all remaining pieces back into the last field, re-joined with '|'
- # event.set('msg', message[4..-1].map(&:strip).join('|'))
- event.set('msg', message[4..-1].join('|'))
- else
- # fewer than 5 fields: treat the whole message as malformed
- #event.set('format_error', message.join('|').strip)
- #event.tag('format_error')
- event.set('msg', message)
- end
- "
- }
-
- # if needed, handle the malformed case here,
- # e.g. route format_error events to a dedicated file or index
- }
- else if [type] == "error_log" {
-
- }
- }
-
- output {
- if [type] == "info_log" {
- if "format_error" in [tags] {
-
- elasticsearch {
- hosts => ["192.168.142.106:9200"]
- index => "%{logfilename}-%{+YYYY-MM-dd}"
- }
-
- } else {
- elasticsearch {
- hosts => ["192.168.142.106:9200"]
- index => "%{logfilename}-%{+YYYY-MM-dd}"
- # index => "path-%{+YYYY-MM-dd}"
- }
- }
- stdout { codec => rubydebug }
- }
- else if [type] == "error_log" {
- elasticsearch {
- hosts => ["192.168.142.106:9200"]
- index => "%{filename}-%{+YYYY-MM-dd}"
- }
- }
- }
./bin/logstash -f app.yml
./filebeat -e -c app.yml
echo '2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993' >> /var/log/tdsp/a.log
The indices for the logs collected by Filebeat now appear in the cluster; finally, create the corresponding index patterns in Kibana and build the log reports.