赞
踩
ElasticSearch(弹性搜索):是一款开源的分布式、RESTful 风格的搜索和数据分析引擎,它底层基于 Apache Lucene 开源库进行封装,其不仅仅提供分布式多用户能力的全文搜索引擎,还可以被准确形容为:
1、一个分布式的实时文档存储,每个字段可以被索引与搜索;
2、一个分布式实时分析搜索引擎;
3、能胜任上百个节点的扩展,并支持 PB 级别结构化和非结构化数据。
Elasticsearch 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个 Elasticsearch 实例。单个 Elasticsearch 实例称为一个节点(Node),一组节点构成一个集群(Cluster)
1 什么是 index (索引) ?
一个 索引 就是一个拥有几分相似特征的文档的集合。ES 将数据存储于一个或多个索引中,索引 就相当于 SQL 中的一个数据库。
2 什么是 Type(类型)?
类型是索引内部的逻辑分区(category/partition),然而其意义完全取决于用户需求。因此,一个索引内部可定义一个或多个类型(type)。一般来说,类型就是为那些拥有相同的域的文档做的预定义。类比传统的关系型数据库领域来说,类型 相当于 表。在 5.X 版本中,一个 index 下可以创建多个 type
在 6.X 版本中,一个 index 下只能存在一个 type
在 7.X 版本中,默认可以支持 type ,但可以禁用
在 8.X 版本中,直接就删除 type,即 index 不再支持 type
3 什么是 Document(文档)?
文档是 Lucene 索引和搜索的 原子单位,它是包含了一个或多个域的容器,基于 Json 格式进行表示。文档有一个或多个域组成,每个域拥有一个名字及一个或多个值,有多个值的域通常被称为 多值域,每个文档可以存储不同的域集,但同一类型下的文档至应该有某种程度上的相似之处。相当于 mysql 表中的 row 。
4 什么是 Field (字段)?
Field 是相当于数据库中的 Column
5 什么是 Mapping(映射)?
Mapping 是定义文档及其包含的字段如何存储和索引的过程。Mapping 是 ES 中的一个很重要的内容,它类似于传统关系型数据中 table 的 schema,用于定义一个索引(index)的某个类型(type)的数据结构。
1 什么是 Shard (分片)?
一个 索引 可以存储超出单个结点硬件限制的大量数据。比如,一个具有 10亿文档的索引占据 1TB 的磁盘空间,而任一节点都没有这样大的磁盘空间;或者单个节点处理搜索请求,响应太慢。
为了解决这个问题,Elasticsearch 提供了将索引划分成多份的能力,这些份就叫做 分片。当你创建一个索引的时候,你可以指定你想要的 分片的数量。每个分片本身也是一个功能完善并且独立的 索引,这个 索引 可以被放置到集群中的任何节点上。
分片之所以重要,主要有两方面的原因:
允许你水平分割/扩展你的内容容量
允许你在分片(潜在地,位于多个节点上)之上进行分布式的、并行的操作,进而提高性能/吞吐量
一个分片怎样分布,它的文档怎样聚合回搜索请求,是完全由 Elasticsearch 管理的,对于作为用户的你来说,这些都是透明的
2 什么是 Replica (副本)?
副本是一个分片的精确复制,每个分片可以有零个或多个副本。副本的作用:
提高系统的容错性,当某个节点某个分片损坏或丢失时,可以从副本中恢复。
提高 ES 查询效率,ES 会自动对搜索请求进行负载均衡。
1 写数据
segment file(磁盘文件)
存储倒排索引的文件,每个segment本质上就是一个倒排索引,每秒都会生成一个segment文件,当文件过多时es会自动进行segment merge(合并文件),合并时会同时将已经标注删除的文档物理删除
commit point(磁盘文件)
记录当前所有可用的segment,每个commit point都会维护一个.del文件,即每个.del文件都有一个commit point文件(es删除数据本质是不属于物理删除),当es做删改操作时首先会在.del文件中声明某个document已经被删除,文件内记录了在某个segment内某个文档已经被删除,当查询请求过来时在segment中被删除的文件是能够查出来的,但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉
translog日志文件(磁盘文件)
为了防止elasticsearch宕机造成数据丢失保证可靠存储,es会将每次写入数据同时写到translog日志中
os cache
操作系统里面,磁盘文件其实都有一个东西,叫做os cache,操作系统缓存,就是说数据写入磁盘文件之前,会先进入os cache,先进入操作系统级别的一个内存缓存中去
写数据的流程
客户端通过路由节点到达指定的shard分片,也就是Lucene的index
数据首先写被入es的jvm内存中,此时数据对外不可见.
默认每隔1s将jvm的数据写入到底层Lucene的os cache中,一旦写入到os cache中数据即可查询出来.所以es可以做到近实时的搜索.
默认每隔30分钟将数据从os cache中flush到磁盘上,成为一个个的segment file
为了防止如果节点宕机数据丢失,会写translog,也是先写到os cache中的,默认每隔5s中生成translog日志持久化到磁盘上,用于节点重启的时候进行数据恢复使用(可以理解成mysql中的binlog)
translog文件是会不断变大的,当到达一定值的时候就会触发commit操作,生成commit point磁盘文件,将os cache数据强制刷入磁盘,再清空translog文件
主分片写入成功后会同步到其他的副本分片去.
当主分片和副本分片节点都写完后,协调节点会返回给客户端数据写入成功的响应
es会定期进行merge操作,进行segment file的合并,当进行segment 合并的时候会将标记删除的文件进行物理删除操作.
读数据
客户端发送请求到一个 coordinate node(协调节点),事随意选择的 。
协调节点将搜索请求转发到所有的 shard 对应的 primary shard(主分片) 或 replica shard(副本分片) ,都可以。
query phase:每个 shard 将自己的搜索结果(其实就是一些 doc id )返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。
fetch phase:接着由协调节点根据 doc id 去各个节点上拉取实际的 document 数据,最终返回给客户端。
注意点
写请求是写入 primary shard,然后同步给所有的 replica shard;读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法。
关系型数据库 | Elasticsearch |
---|---|
数据库 | 索引 Index,支持全文检索 |
表 Table | 类型 Type(废弃) |
数据行 Row | 文档 Document,但不需要固定结构,不同文档可以具有不同字段集合 |
数据列 Column | 字段 Field |
IP | 主机名 | 角色 | 组件 |
---|---|---|---|
11.0.1.133 | es01 | es集群 | elasticsearch |
11.0.1.136 | es02 | es集群 | elasticsearch |
11.0.1.137 | es03 | es集群 | elasticsearch |
1)添加hosts解析 (所有机器) cat >> /etc/hosts <<EOF 11.0.1.133 es01 11.0.1.134 es02 11.0.1.139 es03 EOF 2)关闭防火墙 (所有机器) systemctl stop firewalld systemctl disable firewalld 3)时钟同步 (所有机器) 略 4)关闭selinux (所有机器) sed -i 's#SELINUX=enforcing#SELINUX=disabled#g' /etc/selinux/config setenforce 0 getenforce
1)配置免密登录(测试可配置,生产严谨) # es1节点上生成密钥对 [root@es1 ~]# ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa -q # es1配置所有集群节点的免密登录 [root@es1 ~]# for i in 11.0.1.134 11.0.1.139;do ssh-copy-id $i;done #测试 [root@es1 ~]# ssh 11.0.1.134 2) 配置同步(测试可配置,生产严谨) [root@es1 ~]# yum -y install rsync [root@es1 ~]# cat /usr/local/sbin/data_rsync.sh #!/bin/bash if [ $# -ne 1 ];then echo "Usage: $0 /path/to/file(绝对路径)" exit fi #判断文件是否存在 if [ ! -e $1 ];then echo " {$1} dir or file not find!" exit fi #获取父路径 fullpath=`dirname $1` #获取子路径 basename=`basename $1` #进入父路径 cd $fullpath for IP in 11.0.1.134 11.0.1.139 do # 使得终端输出变为绿色 tput setaf 2 echo ===== rsyncing $IP: $basename ===== # 使得终端恢复原来的颜色 tput setaf 7 #将数据同步到其他两个节点 rsync -az $basename `whoami`@$IP:$fullpath if [ $? -eq 0 ];then echo "命令执行成功" fi done # 添加执行权限 [root@es1 ~]# chmod +x /usr/local/sbin/data_rsync.sh
es三台节点需要操作
1)修改资源限制配置 cat >>/etc/security/limits.conf <<EOF * soft nofile 65536 * hard nofile 65536 * soft nproc 65536 * hard nproc 65536 * soft stack 10240 EOF 2)增加最大内存映射数 # 内核参数 vm.max_map_count 用于限制一个进程可以拥有的VMA(虚拟内存区域)的数量 echo "vm.max_map_count = 262144" >> /etc/sysctl.conf # 设置系统最大打开的文件描述符数 echo "fs.file-max = 1000000" >> /etc/sysctl.conf 3)加载生效 sysctl -p 4)可以查看默认值 sysctl -a |grep vm.max_map_count
【1】集群相关操作
################################ ################ ########################################### ################################ 三个节点都需要执行 ########################################### 1)下载地址,我这里下载的版本为8.12.0 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.0-linux-x86_64.tar.gz 2)二进制包 [root@es03 src]# ll /home/weihu/src/ 总用量 593M -rw-r--r-- 1 root root 593M 1月 17 22:03 elasticsearch-8.12.0-linux-x86_64.tar.gz 3)解压 [root@es01 src]# tar -xf elasticsearch-8.12.0-linux-x86_64.tar.gz -C /usr/local/ [root@es01 src]# cd /usr/local/ [root@es01 local]# ls bin elasticsearch-8.12.0 etc games include lib lib64 libexec sbin share src [root@es01 local]# mv elasticsearch-8.12.0 elasticsearch 4)创建es数据目录 [root@es01 local]# mkdir -p /data/elasticsearch/data 5)创建用户 # 因为安全问题es不让用root用户直接运行,所以要创建普通用户启动 # 我这里已经有普通用户了 [root@es01 local]# id weihu 用户id=1000(weihu) 组id=1000(weihu) 组=1000(weihu) # 如果没有创建一个即可 useradd elasticsearch 6)修改文件属性,一定要修改 [root@es01 local]# chown -R weihu.weihu /usr/local/elasticsearch/ [root@es01 local]# chown -R weihu.weihu /data/elasticsearch/ 7)切换普通账户 [root@es01 ~]# su - weihu 8)查看elasticsearch目录 [weihu@es01 ~]$ ls /usr/local/elasticsearch/ bin config jdk lib LICENSE.txt logs modules NOTICE.txt plugins README.asciidoc ################################ ################ ########################################### 7)配置文件修改 #### 配置文件详解 #ELK集群名称,单节点无需配置,同一个集群内每个节点的此项必须相同,新加集群的节点此项和其它节点相同即可加入集群,而无需再验证 cluster.name: my-application #当前节点在集群内的节点名称,同一集群中每个节点要确保此名称唯一 node.name: node-1 #ES 数据保存目录,包安装默认路径:/var/lib/elasticsearch/ path.data: /path/to/data #ES 日志保存目录,包安装默认路径:/var/llog/elasticsearch/ path.logs: /path/to/logs #服务启动的时候立即分配(锁定)足够的内存,防止数据写入swap,提高启动速度,但是true会导致启动失败,需要优化 bootstrap.memory_lock: true #指定该节点用于集群的监听IP,默认监听在127.0.0.1:9300,集群模式必须修改此行,单机默认即可 network.host: 0.0.0.0 #监听端口 http.port: 9200 #发现集群的node节点列表,可以添加部分或全部节点IP #在新增节点到已有集群时,此处需指定至少一个已经在集群中的节点地址 discovery.seed_hosts: ["host1", "host2"] #集群初始化时指定希望哪些节点可以被选举为 master,只在初始化时使用,新加节点到已有集群时此项可不配置 cluster.initial_master_nodes: ["node-1", "node-2"] #一个集群中的 N 个节点启动后,才允许进行数据恢复处理,默认是1,一般设为为所有节点的一半以上,防止出现脑裂现象 #当集群无法启动时,可以将之修改为1,或者将下面行注释掉,实现快速恢复启动 gateway.recover_after_nodes: 2 #设置是否可以通过正则表达式或者_all匹配索引库进行删除或者关闭索引库,默认true表示必须需要明确指定索引库名称,不能使用正则表达式和_all,生产环境建议设置为 true,防止误删索引库。 action.destructive_requires_name: true #如果不参与主节点选举设为false,默认值为true node.master: false #存储数据,默认值为true,此值为false则不存储数据而成为一个路由节点 #如果将原有的true改为false,需要先执行/usr/share/elasticsearch/bin/elasticsearch-node repurpose 清理数据 node.data: true #7.x以后版本下面指令已废弃,在2.x 5.x 6.x 版本中用于配置节点发现列表 discovery.zen.ping.unicast.hosts: ["10.0.0.101", "10.0.0.102","10.0.0.103"] #8.X版后默认即开启Xpack功能,可以修改为false禁用 xpack. security. enabled : true #开启跨域访问支持,默认为false http.cors.enabled: true # 跨域访问允许的域名地址,(允许所有域名)以上使用正则 http.cors.allow-origin: "*" #### 生产要先备份 [weihu@es01 ~]$ cd /usr/local/elasticsearch/config/ [weihu@es01 config]$ cp elasticsearch.yml elasticsearch.yml20240621 #### 修改后的配置文件 (节点之间只需要修改 node.name 即可) # es-01 [weihu@es01 config]$ egrep -v "^#|^$" /usr/local/elasticsearch/config/elasticsearch.yml cluster.name: cs-cluster node.name: node-01 path.data: /data/elasticsearch/data path.logs: /usr/local/elasticsearch/logs/ bootstrap.memory_lock: false network.host: 0.0.0.0 http.port: 9200 transport.port: 9300 discovery.seed_hosts: ["11.0.1.131","11.0.1.136","11.0.1.137"] cluster.initial_master_nodes: ["11.0.1.131","11.0.1.136","11.0.1.137"] http.cors.enabled: true http.cors.allow-origin: "*" http.cors.allow-methods: OPTIONS,HEAD,GET,POST,PUT,DELETE http.cors.allow-headers: "X-Requested-With,Content-Type,Content-Length,X-User" xpack.security.enabled: true xpack.security.transport.ssl.enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12 xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12 #es-02 [weihu@es02 local]$ egrep -v "^#|^$" /usr/local/elasticsearch/config/elasticsearch.yml cluster.name: cs-cluster node.name: node-02 path.data: /data/elasticsearch/data path.logs: /usr/local/elasticsearch/logs/ bootstrap.memory_lock: false network.host: 0.0.0.0 http.port: 9200 transport.port: 9300 discovery.seed_hosts: ["11.0.1.131","11.0.1.136","11.0.1.137"] cluster.initial_master_nodes: ["11.0.1.131","11.0.1.136","11.0.1.137"] http.cors.enabled: true http.cors.allow-origin: "*" http.cors.allow-methods: OPTIONS,HEAD,GET,POST,PUT,DELETE http.cors.allow-headers: "X-Requested-With,Content-Type,Content-Length,X-User" xpack.security.enabled: true xpack.security.transport.ssl.enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12 xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12 #es-03 [weihu@es03 elasticsearch]$ egrep -v "^#|^$" /usr/local/elasticsearch/config/elasticsearch.yml cluster.name: cs-cluster node.name: node-03 path.data: /data/elasticsearch/data path.logs: /usr/local/elasticsearch/logs/ bootstrap.memory_lock: false network.host: 0.0.0.0 http.port: 9200 transport.port: 9300 discovery.seed_hosts: ["11.0.1.131","11.0.1.136","11.0.1.137"] cluster.initial_master_nodes: ["11.0.1.131","11.0.1.136","11.0.1.137"] http.cors.enabled: true http.cors.allow-origin: "*" http.cors.allow-methods: OPTIONS,HEAD,GET,POST,PUT,DELETE http.cors.allow-headers: "X-Requested-With,Content-Type,Content-Length,X-User" xpack.security.enabled: true xpack.security.transport.ssl.enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12 xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12
【2】ELK优化配置
1)开启 bootstrap.memory_lock 优化 开启 bootstrap.memory_lock: true 可以优化性能,但会导致无法启动的错误解决方法 注意:开启 bootstrap.memory_lock: true 需要足够的内存,建议4G以上,否则内存不足,启动会很慢 #方法1:直接修改elasticsearch.service [root@node1 ~]#vim /lib/systemd/system/elasticsearch.service [Service] #加下面一行 LimitMEMLOCK=infinity #方法2:新建文件 [root@node1 ~]#systemctl edit elasticsearch ### Anything between here and the comment below will become the new contents of the file #加下面两行,注意加在中间位置 [Service] LimitMEMLOCK=infinity ### Lines below this comment will be discarded [root@node1 ~]#cat /etc/systemd/system/elasticsearch.service.d/override.conf [Service] LimitMEMLOCK=infinity [root@node1 ~]#systemctl daemon-reload [root@node1 ~]#systemctl restart elasticsearch.service 2)内存优化 内存优化建议:为了保证性能,每个ES节点的JVM内存设置具体要根据 node 要存储的数据量来估算,建议符合下面约定 * 在内存和数据量有一个建议的比例:对于一般日志类文件,1G 内存能存储48G~96GB数据 * JVM 堆内存最大不要超过30GB * 单个分片控制在30-50GB,太大查询会比较慢,索引恢复和更新时间越长;分片太小,会导致索引碎片化越严重,性能也会下降 #默认为4G,根据业务需求自己定义,但是要符合jvm堆内存的原则 [root@elk ~]# sed -i 's/## -Xms4g/-Xms4g/' /usr/local/elasticsearch/config/jvm.options [root@elk ~]# sed -i 's/## -Xmx4g/-Xmx4g/' /usr/local/elasticsearch/config/jvm.options
【3】配置证书,ELK8默认是开启安全认证的
# 创建证书存放目录 [weihu@es01 local]$ mkdir elasticsearch/config/certs [weihu@es02 local]$ mkdir elasticsearch/config/certs [weihu@es03 local]$ mkdir elasticsearch/config/certs # 生成证书 # 为Elasticsearch集群创建一个证书颁发机构 [weihu@es01 ~]$ cd /usr/local/elasticsearch/ [weihu@es01 elasticsearch]$ bin/elasticsearch-certutil ca This tool assists you in the generation of X.509 certificates and certificate signing requests for use with SSL/TLS in the Elastic stack. The 'ca' mode generates a new 'certificate authority' This will create a new X.509 certificate and private key that can be used to sign certificate when running in 'cert' mode. Use the 'ca-dn' option if you wish to configure the 'distinguished name' of the certificate authority By default the 'ca' mode produces a single PKCS#12 output file which holds: * The CA certificate * The CA's private key If you elect to generate PEM format certificates (the -pem option), then the output will be a zip file containing individual files for the CA certificate and private key Please enter the desired output file [elastic-stack-ca.p12]: #按回车键即可 Enter password for elastic-stack-ca.p12 : #按回车键即可 [weihu@es01 elasticsearch]$ # 证书生成完以后在/usr/local/elasticsearch目录下会有一个 elastic-stack-ca.p12 证书文件 [weihu@es01 elasticsearch]$ ll elastic-stack-ca.p12 -rw------- 1 weihu weihu 2.7K 6月 23 11:23 elastic-stack-ca.p12 # 为集群中的每个节点生成证书和私钥 # 可以在创建过程中加入密码,我这里就不设置了 [weihu@es01 elasticsearch]$ bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 This tool assists you in the generation of X.509 certificates and certificate signing requests for use with SSL/TLS in the Elastic stack. The 'cert' mode generates X.509 certificate and private keys. * By default, this generates a single certificate and key for use on a single instance. * The '-multiple' option will prompt you to enter details for multiple instances and will generate a certificate and key for each one * The '-in' option allows for the certificate generation to be automated by describing the details of each instance in a YAML file * An instance is any piece of the Elastic Stack that requires an SSL certificate. Depending on your configuration, Elasticsearch, Logstash, Kibana, and Beats may all require a certificate and private key. * The minimum required value for each instance is a name. This can simply be the hostname, which will be used as the Common Name of the certificate. A full distinguished name may also be used. * A filename value may be required for each instance. This is necessary when the name would result in an invalid file or directory name. The name provided here is used as the directory name (within the zip) and the prefix for the key and certificate files. The filename is required if you are prompted and the name is not displayed in the prompt. * IP addresses and DNS names are optional. Multiple values can be specified as a comma separated string. If no IP addresses or DNS names are provided, you may disable hostname verification in your SSL configuration. * All certificates generated by this tool will be signed by a certificate authority (CA) unless the --self-signed command line option is specified. The tool can automatically generate a new CA for you, or you can provide your own with the --ca or --ca-cert command line options. By default the 'cert' mode produces a single PKCS#12 output file which holds: * The instance certificate * The private key for the instance certificate * The CA certificate If you specify any of the following options: * -pem (PEM formatted output) * -multiple (generate multiple certificates) * -in (generate certificates from an input file) then the output will be be a zip file containing individual certificate/key files Enter password for CA (elastic-stack-ca.p12) : #按回车键即可 Please enter the desired output file [elastic-certificates.p12]: #按回车键即可 Enter password for elastic-certificates.p12 : #按回车键即可 Certificates written to /usr/local/elasticsearch/elastic-certificates.p12 This file should be properly secured as it contains the private key for your instance. This file is a self contained file and can be copied and used 'as is' For each Elastic product that you wish to configure, you should copy this '.p12' file to the relevant configuration directory and then follow the SSL configuration instructions in the product guide. For client applications, you may only need to copy the CA certificate and configure the client to trust this certificate. # 签发好节点证书 同样在 /usr/local/elasticsearch目录下会有一个 elastic-certificates.p12 证书文件 [weihu@es01 elasticsearch]$ ll elastic-certificates.p12 -rw------- 1 weihu weihu 3.6K 6月 23 11:29 elastic-certificates.p12 # 将证书拷贝到elasticsearch的每个节点下面config/certs目录下 [weihu@es01 elasticsearch]$ mv elastic-certificates.p12 elastic-stack-ca.p12 config/certs/ [weihu@es01 elasticsearch]$ scp config/certs/elastic-* weihu@11.0.1.136:/usr/local/elasticsearch/config/certs/ [weihu@es01 elasticsearch]$ scp config/certs/elastic-* weihu@11.0.1.137:/usr/local/elasticsearch/config/certs/ # 如果在创建证书的过程中加了密码,需要将你的密码加入到你的Elasticsearch keystore中去。每个节点都需要 bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password bin/elasticsearch-keystore add xpack.security.transport.ssl.truststore.secure_password # 先启动es集群,否则创建密码会失败 [weihu@es01 elasticsearch]$ bin/elasticsearch -d [weihu@es02 elasticsearch]$ bin/elasticsearch -d [weihu@es03 elasticsearch]$ bin/elasticsearch -d # 给认证的集群创建用户密码所有机器 # 按照命令提示输入所有用户的初始化密码,全部输入password就OK,我这里全部是redhat [weihu@es01 elasticsearch]$ bin/elasticsearch-setup-passwords interactive ****************************************************************************** Note: The 'elasticsearch-setup-passwords' tool has been deprecated. This command will be removed in a future release. ****************************************************************************** Initiating the setup of passwords for reserved users elastic,apm_system,kibana,kibana_system,logstash_system,beats_system,remote_monitoring_user. You will be prompted to enter passwords as the process progresses. Please confirm that you would like to continue [y/N]y Enter password for [elastic]: Reenter password for [elastic]: Enter password for [apm_system]: Reenter password for [apm_system]: Enter password for [kibana_system]: Reenter password for [kibana_system]: Enter password for [logstash_system]: Reenter password for [logstash_system]: Enter password for [beats_system]: Reenter password for [beats_system]: Enter password for [remote_monitoring_user]: Reenter password for [remote_monitoring_user]: Changed password for user [apm_system] Changed password for user [kibana_system] Changed password for user [kibana] Changed password for user [logstash_system] Changed password for user [beats_system] Changed password for user [remote_monitoring_user] Changed password for user [elastic] #elastic 账号:拥有 superuser 角色,是内置的超级用户。 #kibana 账号:拥有 kibana_system 角色,用户 kibana 用来连接 elasticsearch 并与之通信。Kibana 服务器以该用户身份提交请求以访问集群监视 API 和 .kibana 索引。不能访问 index。 #logstash_system 账号:拥有 logstash_system 角色。用户 Logstash 在 Elasticsearch 中存储监控信息时使用。 #beats_system账号:拥有 beats_system 角色。用户 Beats 在 Elasticsearch 中存储监控信息时使用。 #elastic是超级用户,它可以做任何事情
【4】验证集群
# 集群设置了安全认证,如果直接访问,会报一下错误 [weihu@es01 elasticsearch]$ curl http://11.0.1.131:9200 {"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication credentials for REST request [/]","header":{"WWW-Authenticate":["Basic realm=\"security\" charset=\"UTF-8\"","ApiKey"]}}],"type":"security_exception","reason":"missing authentication credentials for REST request [/]","header":{"WWW-Authenticate":["Basic realm=\"security\" charset=\"UTF-8\"","ApiKey"]}},"status":401}[weihu@es01 elasticsearch]$ # 进行安全访问 [weihu@es01 elasticsearch]$ curl http://elastic:redhat@11.0.1.131:9200 { "name" : "node-01", "cluster_name" : "cs-cluster", "cluster_uuid" : "H_MMkElRT72JvGKpFFWaBg", "version" : { "number" : "8.12.0", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "1665f706fd9354802c02146c1e6b5c0fbcddfbc9", "build_date" : "2024-01-11T10:05:27.953830042Z", "build_snapshot" : false, "lucene_version" : "9.9.1", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } # 查看集群状态 [weihu@es01 elasticsearch]$ curl http://elastic:redhat@11.0.1.131:9200/_cat/health 1719116523 04:22:03 cs-cluster green 3 3 2 1 0 0 0 0 - 100.0% # 查看所有节点信息 [weihu@es01 elasticsearch]$ curl http://elastic:redhat@11.0.1.131:9200/_cat/nodes?v ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name 11.0.1.131 64 88 0 0.04 0.01 0.00 cdfhilmrstw * node-01 11.0.1.137 69 88 0 0.01 0.02 0.00 cdfhilmrstw - node-03 11.0.1.136 64 75 0 0.02 0.02 0.00 cdfhilmrstw - node-02 #查看集群健康性,获取到的是一个json格式的返回值,那就可以通过python等工具对其中的信息进行分析 # 注意:status 字段为green才是正常状态 [weihu@es01 elasticsearch]$ curl http://elastic:redhat@11.0.1.131:9200/_cluster/health?pretty=true { "cluster_name" : "cs-cluster", "status" : "green", "timed_out" : false, "number_of_nodes" : 3, "number_of_data_nodes" : 3, "active_primary_shards" : 1, "active_shards" : 2, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0 } # 列出所有的索引 以及每个索引的相关信息 [weihu@es01 elasticsearch]$ curl http://elastic:redhat@11.0.1.131:9200/_cat/indices?v health status index uuid pri rep docs.count docs.deleted store.size pri.store.size dataset.size green open index1 OhGi9QaXT3SnFkviCxRRZQ 1 1 0 0 498b 249b 249b ### 如果用浏览器访问,会直接跳出登录页面,输入账号密码即可
【5】nginx代理
upstream elasticsearch { zone elasticsearch 64K; server 11.0.1.131:9200; server 11.0.1.136:9200; server 11.0.1.137:9200; } server { listen 9200; server_name 11.0.1.137; location / { proxy_pass http://elasticsearch; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } access_log logs/es_access.log; }
Kibana是一个开源的分析与可视化平台,设计出来用于和Elasticsearch一起使用的。你可以用kibana搜索、查看存放在Elasticsearch中的数据。Kibana与Elasticsearch的交互方式是各种不同的图表、表格、地图等,直观的展示数据,从而达到高级的数据分析与可视化的目的。
可以通过包或者二进制的方式进行安装,可以安装在独立服务器,或者也可以和elasticsearch的主机安装在一起
注意: Kibana的版本要和 Elasticsearch 相同的版本,否则可能会出错
1) 下载资源包 https://artifacts.elastic.co/downloads/kibana/kibana-8.12.0-linux-x86_64.tar.gz 2)上传资源包到服务器 [root@web src]# ll kibana-8.12.0-linux-x86_64.tar.gz -rw-r--r-- 1 root root 298M 6月 23 20:55 kibana-8.12.0-linux-x86_64.tar.gz 3)解压 [root@web src]# tar -xf kibana-8.12.0-linux-x86_64.tar.gz -C /usr/local/ [root@web src]# cd /usr/local/ [root@web local]# mv kibana-8.12.0 kibana 4)修改配置文件 # 最好先备份 [weihu@web ~]$ cd /usr/local/kibana/ [weihu@web kibana]$ cp config/kibana.yml config/kibana.yml20240623 [weihu@web kibana]$ egrep -v "^#|^$" config/kibana.yml server.port: 5601 server.host: "0.0.0.0" server.publicBaseUrl: "http://kibana.wjj.com:5601" elasticsearch.hosts: ["http://11.0.1.131:9200","http://11.0.1.136:9200","http://11.0.1.137:9200"] elasticsearch.username: "kibana" elasticsearch.password: "redhat" i18n.locale: "zh-CN"
配置文件详解
# Kibana服务相关 1 server.host: "localhost",kibana的主机地址。 2 server.port: 5601,kibana的默认监听端口。 3 server.basePath: "",如果您在代理后面运行,则可以指定安装Kibana的路径。使用server.rewriteBasePath设置告诉Kibana是否应从其收到的请求中删除basePath,并防止在启动时发生弃用警告。此设置不能以斜杠/结尾。 4 server.rewriteBasePath: false,默认值为false,指定Kibana是否应重写以server.basePath为前缀的请求,或者要求它们由反向代理重写。在Kibana 6.3之前,此设置实际上始终为false,并且在Kibana 7.0中默认为true。 5 server.maxPayloadBytes: 1048576 传入服务器请求的最大有效负载大小(以字节为单位)。 6 server.name: "your-hostname",kibana的服务名称。 # elasticsearch相关配置 1 elasticsearch.url: "http://localhost:9200",kibana访问es的URL。 2 elasticsearch.preserveHost: true,默认值为true,当此设置的值为true时,Kibana使用server.host设置中指定的主机名。当此设置的值为false时,Kibana使用连接到此Kibana实例的主机的主机名。 4 kibana.index: ".kibana",默认值:.kibana,Kibana使用Elasticsearch中的索引来存储已保存的搜索,可视化和仪表板。如果索引尚不存在,Kibana会创建一个新索引。 6 kibana.defaultAppId: "home", 默认值home,要加载的默认应用程序。 8 elasticsearch.username: "user"和elasticsearch.password: "pass",如果您的Elasticsearch受基本身份验证保护,则这些设置提供Kibana服务器用于在启动时对Kibana索引执行维护的用户名和密码。您的Kibana用户仍需要使用通过Kibana服务器代理的Elasticsearch进行身份验证。 10 server.ssl.enabled: false,从Kibana服务器到浏览器的传出请求启用SSL。设置为true时,需要server.ssl.certificate和server.ssl.key。 11 server.ssl.certificate: /path/to/your/server.crt,PEM格式的SSL证书。 12 server.ssl.key: /path/to/your/server.key,SSL秘钥文件的路径。 14 elasticsearch.ssl.certificate: /path/to/your/client.crt和elasticsearch.ssl.key: /path/to/your/client.key,可选设置,提供PEM格式SSL证书和密钥文件的路径。这些文件用于验证Kibana到Elasticsearch的身份,并且在Elasticsearch中的xpack.ssl.verification_mode设置为certificate或full时是必需的。 15 elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ],可选设置,使您可以为Elasticsearch实例的证书颁发机构指定PEM文件的路径列表。 16 elasticsearch.ssl.verificationMode: full,完全控制Elasticsearch提供的证书验证。有效值为none,certificate和full。完整执行主机名验证,证书不执行。 17 elasticsearch.pingTimeout: 1500,elasticsearch.requestTimeout设置的值等待Elasticsearch响应ping的时间(以毫秒为单位)。 18 elasticsearch.requestTimeout: 30000,等待后端或Elasticsearch响应的时间(以毫秒为单位)。这个值必须是一个正整数。 19 elasticsearch.requestHeadersWhitelist: [ authorization ],要发送到Elasticsearch的Kibana客户端标头列表。如不发送客户端标头,请将此值设置为空。 21 elasticsearch.customHeaders: {},要发送到Elasticsearch的标题名称和值。 无论elasticsearch.requestHeadersWhitelist配置如何,客户端标头都不能覆盖任何自定义标头。 22 elasticsearch.shardTimeout: 30000, Elasticsearch等待分片响应的时间(以毫秒为单位)。设置为0以禁用。 23 elasticsearch.startupTimeout: 5000,在Kibana刚启动时等待Elasticsearch的时间(以毫秒为单位),然后重试。 24 elasticsearch.logQueries: false,记录发送到Elasticsearch的查询。 需要将logging.verbose设置为true。 # 日志相关 1 logging.verbose: false,将此设置的值设置为true,以记录所有事件,包括系统使用信息 和所有请求。 2 logging.dest: stdout,允许指定Kibana存储日志输出的文件。 3 logging.silent: false, 将此设置的值设置为true,以禁用所有日志输出。 4 logging.quiet: false,将此设置的值设置为true,以抑制除错误消息之外的所有日志输出。 # 其它配置 1 ops.interval: 5000,将间隔(以毫秒为单位)设置为采样系统和处理性能 指标。最低是100 ms。默认为5000。 2 i18n.locale: "en",配置多语言版本配置,目前貌似汉语支持还不太行。 3 pid.file: /var/run/kibana.pid,指定Kibana创建进程标识文件的路径。
1)Filebeat 是什么?
Filebeat 是用于转发和集中日志数据的轻量级传送程序。作为服务器上的代理安装,Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或Logstash进行索引。
2) Filebeat 工作原理
启动Filebeat时,它将启动一个或多个输入源,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器harvester进程。每个收集器harvester都读取一个日志以获取新内容,并将新日志数据发送到libbeatlibbeat会汇总事件并将汇总的数据发送到为Filebeat配置的输出。
Filebeat可以保持每个文件的状态,并且频繁地把文件状态从注册表里更新到磁盘。这里所说的文件状态是用来记录上一次Harvster读取文件时读取到的位置,以保证能把全部的日志数据都读取出来,然后发送给output。如果在某一时刻,作为output的ElasticSearch或者Logstash变成了不可用,Filebeat将会把最后的文件读取位置保存下来,直到output重新可用的时候,快速地恢复文件数据的读取。在Filebaet运行过程中,每个Prospector的状态信息都会保存在内存里。如果Filebeat出行了重启,完成重启之后,会从注册表文件里恢复重启之前的状态信息,让FIlebeat继续从之前已知的位置开始进行数据读取。
3 )Filebeat 和 Logstash区别
logstash是jvm跑的,资源消耗比较大,启动一个logstash就至少需要消耗500M左右的内存(这就是为什么logstash启动特别慢的原因),而filebeat只需要10来M内存资源。常用的ELK日志采集方案中,大部分的做法就是将所有节点的日志内容通过filebeat发送到logstash,logstash根据配置文件进行过滤。然后将过滤之后的文件输送到elasticsearch中,通过kibana去展示。
注意:Filebeat 支持多个输入,但不支持同时有多个输出
1) 下载地址 # 我这里直接用rpm包装了 https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-x86_64.rpm 2)上传至需要收集日志的服务器中 # 生产中需要收集哪台服务器上面的日志就在哪台装即可 # 我这里实验就在一台装了 [root@web src]# ll filebeat-8.12.0-x86_64.rpm -rw-r--r-- 1 root root 48M 6月 24 20:10 filebeat-8.12.0-x86_64.rpm 3)安装 [root@web src]# rpm -ivh filebeat-8.12.0-x86_64.rpm 警告:filebeat-8.12.0-x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID d88e42b4: NOKEY Verifying... ################################# [100%] 准备中... ################################# [100%] 正在升级/安装... 1:filebeat-8.12.0-1 ################################# [100%] [root@web src]# rpm -ql filebeat |head /etc/filebeat/fields.yml /etc/filebeat/filebeat.reference.yml /etc/filebeat/filebeat.yml /etc/filebeat/modules.d/activemq.yml.disabled /etc/filebeat/modules.d/apache.yml.disabled /etc/filebeat/modules.d/auditd.yml.disabled /etc/filebeat/modules.d/aws.yml.disabled /etc/filebeat/modules.d/awsfargate.yml.disabled /etc/filebeat/modules.d/azure.yml.disabled /etc/filebeat/modules.d/barracuda.yml.disabled
1) 临时测试filebeat # 测试时要关闭filebeat服务 # systemctl stop filebeat.service [root@web filebeat]# vim stdin.yml filebeat.inputs: - type: log enabled: true json.keys_under_root: true #默认False会将json数据存储至message,true则会将数据以独立字段存储,并且删除message字段,如果是文>本还是放在message字段中 paths: - /usr/local/nginx/logs/access.log #--------------- 打印到屏幕上 --------------- #output.console: # pretty: true # enable: true #-------------- 打印到指定文件中 -------------- output.file: path: "/tmp" filename: "test.log" # 检测配置文件格式是否正确 [root@web filebeat]# filebeat test config -c stdin.yml Config OK # 执行 [root@web filebeat]# filebeat -e -c stdin.yml # 查看/tmp目录下是否有生产的文件 [root@web filebeat]# ll /tmp/ -rw------- 1 root root 1.7K 6月 25 19:34 test.log-20240625.ndjson [root@web filebeat]# cat /tmp/test.log-20240625.ndjson |jq { "@timestamp": "2024-06-25T11:34:24.050Z", "@metadata": { "beat": "filebeat", "type": "_doc", "version": "8.12.0" }, "log": { "file": { "path": "/usr/local/nginx/logs/access.log" }, "offset": 31072 }, "json": {}, "message": "11.0.1.1 - - [25/Jun/2024:19:34:18 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36\"", "input": { "type": "log" }, "ecs": { "version": "8.0.0" }, "host": { "name": "web" }, "agent": { "version": "8.12.0", "ephemeral_id": "481a87e3-0b15-4979-8aef-6407ea1d3ca0", "id": "8fa65b8e-a6fe-47e8-9377-319c48a3b1ec", "name": "web", "type": "filebeat" } } # 测试还有很多,这里就不做介绍了
1) 自定义索引名称收集Nginx日志到 ELasticsearch # 修改nginx日志格式 log_format access_json '{"@timestamp":"$time_iso8601",' '"host":"$server_addr",' '"clientip":"$remote_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"upstreamtime":"$upstream_response_time",' '"upstreamhost":"$upstream_addr",' '"http_host":"$host",' '"uri":"$uri",' '"domain":"$host",' '"xff":"$http_x_forwarded_for",' '"referer":"$http_referer",' '"tcp_xff":"$proxy_protocol_addr",' '"http_user_agent":"$http_user_agent",' '"status":"$status"}'; # 修改filebeat配置文件 [root@web filebeat]# vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access_json.log json.keys_under_root: true #默认false,只识别为普通文本,会将全部日志数据存储至message字段,改为true则会以Json格式存储 json.overwrite_keys: true #设为true,使用json格式日志中自定义的key替代默认的message字段,此项可选 tags: ["nginx-access"] #指定tag,用于分类 - type: log enabled: true paths: - /var/log/nginx/error.log tags: ["nginx-error"] output.elasticsearch: hosts: ["11.0.1.131:9200","11.0.1.136:9200","11.0.1.137:9200"] indices: - index: "nginx-access-%{[agent.version]}-%{+yyy.MM.dd}" when.contains: tags: "nginx-access" #如果记志中有access的tag,就记录到nginx-access的索引中 - index: "nginx-error-%{[agent.version]}-%{+yyy.MM.dd}" when.contains: tags: "nginx-error" #如果记志中有error的tag,就记录到nginx-error的索引中 setup.ilm.enabled: false #关闭索引生命周期ilm功能,默认开启时索引名称只能为filebeat-* setup.template.name: "nginx" #定义模板名称,要自定义索引名称,必须指定此项,否则无法启动 setup.template.pattern: "nginx-*" #定义模板的匹配索引名称,要自定义索引名称,必须指定此项,否则无法启动 # 改完配置重启 [root@web filebeat]# systemctl restart filebeat.service 2)filebeat送日志到redis [root@web filebeat]# cat /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access_json.log json.keys_under_root: true #默认False会将json数据存储至message,改为true则会独立message外存储 json.overwrite_keys: true #设为true,覆盖默认的message字段,使用自定义json格式中的key tags: ["nginx-access"] - type: log enabled: true paths: - /var/log/nginx/error.log tags: ["nginx-error"] output.redis: hosts: ["10.0.0.104:6379"] key: "filebeat" #password: "123456" #db: 0 # 改完配置重启 [root@web filebeat]# systemctl restart filebeat.service 3)从标准输入读取再输出至 Kafka [root@elk-web1 ~]#vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: stdin enabled: true output.kafka: hosts: ["10.0.0.201:9092", "10.0.0.202:9092", "10.0.0.203:9092"] topic: filebeat-log #指定kafka的topic partition.round_robin: reachable_only: true #true表示只发布到可用的分区,false时表示所有分区,如果一个节点down,会block required_acks: 1 #如果为0,错误消息可能会丢失,1等待写入主分区(默认),-1等待写入副本分区 compression: gzip max_message_bytes: 1000000 #每条消息最大长度,以字节为单位,如果超过将丢弃 5)收集java程序日志,tomcat举例 #filebeat是按行读取的,一行形成一个文档,如果碰到java程序报错按行读取就不太合理,就需要filebeat收集java程序报错信息时配置合并行。 # 我这里拿tomcat来测试的,如果需要收集其他java程序的错误日志,合并条件根据程序报错信息来定(如下配置) # multiline.type: pattern #*此为默认值,可省略 # multiline.pattern: '^[0-3][0-9]-' #*正则表达式匹配以两位,或者为'^\d{2}' # multiline.negate: true #*negate否定无效 # multiline.match: after # multiline.max_lines: 5000 #*默认只合并500行,指定最大合并5000行 [root@elk-web1 ~]#vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /usr/local/tomcat/logs/localhost_access_log.* #- /var/log/tomcat9/localhost_access_log.* #包安装 json.keys_under_root: true json.overwrite_keys: false tags: ["tomcat-access"] - type: log enabled: true paths: - /usr/local/tomcat/logs/catalina.*.log tags: ["tomcat-error"] multiline.type: pattern #*此为默认值,可省略 multiline.pattern: '^[0-3][0-9]-' #*正则表达式匹配以两位,或者为'^\d{2}' multiline.negate: true #*negate否定无效 multiline.match: after multiline.max_lines: 5000 #*默认只合并500行,指定最大合并5000行 output.elasticsearch: hosts: ["10.0.0.201:9092", "10.0.0.202:9092", "10.0.0.203:9092"] indices: - index: "tomcat-access-%{[agent.version]}-%{+yyy.MM.dd}" when.contains: tags: "tomcat-access" - index: "tomcat-error-%{[agent.version]}-%{+yyy.MM.dd}" when.contains: tags: "tomcat-error" setup.ilm.enabled: false setup.template.name: "tomcat" setup.template.pattern: "tomcat-*"
1) logstash是什么?
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的一个或多个“存储库”中。
2)logstash目标
收集来自不同来源的日志数据,包括文件、网络流、数据库查询等
对日志数据进行过滤和转换,以提取有用的信息和字段(logstash的强大之处)
将处理后的日志数据发送到目标系统,如Elasticsearch、Kibana、Redis等
支持插件机制,可以轻松地扩展和定制Logstash的功能
3)logstash架构
1)下载地址 https://artifacts.elastic.co/downloads/logstash/logstash-8.12.0-linux-x86_64.tar.gz 2)解压缩 [root@logstash src]# tar -xf logstash-8.12.0-linux-x86_64.tar.gz -C /usr/local/ 3)查看 [root@logstash ~]# cd /usr/local/logstash-8.12.0/ [root@logstash logstash-8.12.0]# ll -tr 总用量 740K -rw-r--r-- 1 root wheel 630K 1月 10 18:26 NOTICE.TXT -rw-r--r-- 1 root wheel 14K 1月 10 18:26 LICENSE.txt -rw-r--r-- 1 root wheel 16 1月 10 18:26 JDK_VERSION -rw-r--r-- 1 root wheel 31K 1月 10 18:26 Gemfile.lock -rw-r--r-- 1 root wheel 4.2K 1月 10 18:26 Gemfile drwxr-xr-x 2 root wheel 4.0K 1月 10 18:26 data -rw-r--r-- 1 root wheel 2.3K 1月 10 18:26 CONTRIBUTORS drwxr-xr-x 2 root root 4.0K 6月 29 17:08 bin drwxr-xr-x 2 root root 4.0K 6月 29 17:08 config drwxr-xr-x 4 root root 4.0K 6月 29 17:08 modules drwxr-xr-x 6 root root 4.0K 6月 29 17:08 lib drwxr-xr-x 3 root root 4.0K 6月 29 17:08 tools drwxr-xr-x 3 root root 4.0K 6月 29 17:08 logstash-core-plugin-api drwxr-xr-x 4 root root 4.0K 6月 29 17:08 logstash-core drwxr-xr-x 4 root root 4.0K 6月 29 17:08 vendor drwxr-xr-x 9 root root 4.0K 6月 29 17:08 x-pack drwxr-xr-x 9 root root 4.0K 6月 29 17:08 jdk 4)配置文件 # 根据实际情况调整 [root@logstash config]# vim /usr/local/logstash-8.12.0/config/logstash.yml [root@logstash config]# grep -Ev '#|^$' /usr/local/logstash-8.12.0/config/logstash.yml node.name: logstash-node01 pipeline.workers: 2 pipeline.batch.size: 1000 #批量从IPNPUT读取的消息个数,可以根据ES的性能做性能优化 pipeline.batch.delay: 5 #处理下一个事件前的最长等待时长,以毫秒ms为单位,可以根据ES的性能做性能优化 path.data: /var/lib/logstash #默认值 path.logs: /var/log/logstash #默认值 5)内存优化 # 根据实际情况调整 [root@logstash config]# vim /usr/local/logstash-8.12.0/configjvm.options -Xms1g -Xmx1g
1)logstash的命令
# 查看帮助
[root@logstash bin]# /usr/local/logstash-8.12.0/bin/logstash --help
# 常用选项
-e 指定配置内容
-f 指定配置文件,支持绝对路径,如果用相对路径,是相对于/usr/share/logstash/的路径
-t 语法检查
-r 修改配置文件后自动加载生效,注意:有时候修改配置还需要重新启动生效
2)logstach基本测试
[root@logstash logstash-8.12.0]# mkdir /usr/local/logstash-8.12.0/test/ [root@logstash logstash-8.12.0]# cd /usr/local/logstash-8.12.0/test/ [root@logstash test]# vim std_std.conf input { stdin{ } } output { stdout{ } } #语法检查 [root@logstash ~]# cd /usr/local/logstash-8.12.0/bin/ [root@logstash bin]# ./logstash -f /usr/local/logstash-8.12.0/test/std_std.conf -t ...... [2024-06-29T18:42:57,046][INFO ][logstash.javapipeline ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise. Configuration OK [2024-06-29T18:42:57,046][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash # 启动 [root@logstash bin]# ./logstash -f /usr/local/logstash-8.12.0/test/std_std.conf ..... [2024-06-29T18:43:40,791][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.68} The stdin plugin is now waiting for input: [2024-06-29T18:43:40,835][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} [2024-06-29T18:43:40,852][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} aaaaa #输入的信息 { "@version" => "1", "event" => { "original" => "aaaaa" }, "@timestamp" => 2024-06-29T10:44:08.062827917Z, "host" => { "hostname" => "logstash" }, "message" => "aaaaa" } hellord,word #输入的信息 { "@version" => "1", "event" => { "original" => "hellord,word" }, "@timestamp" => 2024-06-29T10:44:39.306328184Z, "host" => { "hostname" => "logstash" }, "message" => "hellord,word" }
【1】标准输入
codec 用于输入数据的编解码器,默认值为plain表示单行字符串,若设置为json,表示按照json方式解析
# 以配置文件实现标准输入
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/stdin_to_stdout.conf
input {
stdin {
type => "stdin_type" #自定义事件类型,可用于后续判断
tags => "stdin_tag" #自定义事件tag,可用于后续判断
codec => "json" #指定Json 格式
}
}
output {
stdout {
codec => "rubydebug" #输出格式,此为默认值,可省略
}
}
【2】从文件输入
Logstash 会记录每个文件的读取位置,下次自动从此位置继续向后读取
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/file_to_stdout.conf input { file { path => "/tmp/wu.*" type => "wulog" # 自定义的type字段 exclude => "*.txt" # 排除.txt文件 start_position => "beginning" # 从头开始读取 stat_interval => "3" # 检查文件更新的时间间隔,单位为秒 codec => json # 如果文件是Json格式,需要指定此项才能解析,如果不是Json格式而添加此行也不会影响结果 } file { path => "/var/log/messages" # 监控的系统日志文件路径 type => "syslog" # 指定数据类型为syslog start_position => "beginning" # 从头开始读取 stat_interval => 3 # 检查文件更新的时间间隔,单位为秒 } } output { stdout { codec => "rubydebug" # 输出的编解码器 } }
【3】从 Http 请求中取数据
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/http_to_stdout.conf
input {
http {
port =>6666
codec => json
}
}
output {
stdout {
codec => rubydebug
}
}
【4】从 Filebeat 读取数据
[root@elk-web1 ~]#vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true #开启日志 paths: - /var/log/nginx/access_json.log #指定收集的日志文件 json.keys_under_root: true tags: ["nginx-access"] filebeat.inputs: - type: log enabled: true #开启日志 paths: - /var/log/* #指定收集的日志文件 output.logstash: hosts: ["11.0.1.139:5044"] #指定Logstash服务器的地址和端口 #Logstash配置 [root@logstash ~]#cat /etc/logstash/conf.d/filebeat_to_stdout.conf input { beats { port => 5044 # 端口号,必须与 Filebeat 配置一致 } } output { stdout { codec => "rubydebug" # 使用 rubydebug 编解码器 } }
【5】从 Redis 中读取数据
#支持由多个 Logstash 从 Redis 读取日志,提高性能
#Logstash 从 Redis 收集完数据后,将删除对应的列表Key
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/redis_to_stdout.conf input { redis { host => 'Redis_IP' port => "6379" password => "123456" db => "0" data_type => 'list' key => "nginx-accesslog" } } output { stdout { codec => rubydebug } }
【6】从 Kafka 中读取数据
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/kakfa_to_stdout.conf input { kafka { bootstrap_servers => "10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092" # Kafka 集群的地址 group_id => "logstash" # Kafka 消费者组的ID topics => ["nginx-accesslog", "nginx-errorlog"] # 订阅的Kafka主题列表 codec => "json" # 指定编解码器 consumer_threads => 8 # Kafka消费者线程数 } } output { stdout { codec => "rubydebug" # 使用rubydebug编解码器输出到标准输出 } }
【1】Stdout 插件
stdout 插件将数据输出到屏幕终端,主要用于调试
output {
stdout {
codec => rubydebug
}
}
【2】File 插件
输出到文件,可以将分散在多个文件的数据统一存放到一个文件
output {
stdout {
codec => rubydebug
}
file {
path => "/var/log/test.log"
}
}
【3】Elasticsearch 插件
官方说明
https://www.elastic.co/guide/en/logstash/7.6/plugins-outputs-elasticsearch.html
索引的时间格式说明
https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html
# 当日志量较小时,可以按月或周生成索引,当日志量比较大时,会按天生成索引,以方便后续按天删除 output { elasticsearch { hosts =>["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"] #一般写ES中data节点地址 index => "app-%{+YYYY.MM.dd}" #指定索引名称,建议加时间,按天建立索引 #index => "%{[@metadata][target_index]}" #使用字段[@metadata][target_index]值做为索引名 template_overwrite => true #覆盖索引模板,此项可选,默认值为false } } # 配置文件实现 [root@logstash test]# vim /usr/local/logstash-8.12.0/test/logstash-test.conf input { file { path => "/var/log/syslog" start_position => "beginning" stat_interval => 3 } } output { elasticsearch { hosts => ["10.0.0.101:9200"] index => "syslog-%{+YYYY.MM.dd}" } }
【4】Redis 插件
Logstash 支持将日志转发至 Redis
官方说明
https://www.elastic.co/guide/en/logstash/7.6/plugins-outputs-redis.html
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/file_to_redis.conf input { file { path => "/var/log/nginx/access.log" type => 'nginx-accesslog' start_position => "beginning" stat_interval => "3" codec ==> json } } output { if [type] == 'nginx-accesslog' { redis { host => 'Redis_IP' port => "6379" password => "123456" db => "0" data_type => 'list' key => "nginx-accesslog" } }
【5】 Kafka 插件
Logstash 支持将日志转发至 Kafka
官方链接:
https://www.elastic.co/guide/en/logstash/7.6/plugins-outputs-kafka.html
[root@logstash test]# vim /usr/local/logstash-8.12.0/test/file_to_kafka.conf input { file { path => "/var/log/nginx/access.log" type => 'nginx-accesslog' start_position => "beginning" stat_interval => 3 codec => json # 指定编解码器,如果日志是 JSON 格式 } file { path => "/var/log/nginx/error.log" type => 'nginx-errorlog' start_position => "beginning" stat_interval => 3 # 如果日志不是 JSON 格式,这里不需要 codec 配置 } } filter { # 这里可以添加 filter 来处理日志数据,如果需要 } output { if [type] == 'nginx-accesslog' { kafka { bootstrap_servers => '11.0.1.120:9092, 11.0.1.121:9092, 11.0.1.122:9092' topic_id => 'nginx-accesslog' # 使用 topics 而不是 topic_id codec => json } } if [type] == 'nginx-errorlog' { kafka { bootstrap_servers => 'Kafka_IP1:9092, Kafka_IP2:9092, Kafka_IP3:9092' topic_id => 'nginx-errorlog' # 使用 topics codec => json } } }
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值
Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响
常用filter插件总结:
grok:正则匹配任意文本
date:处理时间字段
user_agent:分析客户端设备
geoip:分析客户端IP地理位置
mutate:对字符串文本进行处理,转换,字母大小写,切分,重命名,添加字段,删除字段
if分支语句:
if [type] == "xxx" {
} else if [type] == "xxx" {
} else {
}
【1】Grok 介绍
Grok 是一个过滤器插件,可帮助您描述日志格式的结构。有超过200种 grok模式抽象概念,如IPv6地址,UNIX路径和月份名称。
为了将日志行与格式匹配, 生产环境常需要将非结构化的数据解析成 json 结构化数据格式
[root@logstash test]# cat nginx_grok_stdout.conf input { file { start_position => "beginning" path => "/var/log/nginx/access.log" } } filter{ grok{ match => { "message" => "%{COMMONAPACHELOG}" #官方自带的Apache转换模块,与nginx格式一致 } } } output { stdout {} } # 插入一条访问日志 [root@logstash test]# echo '139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] "GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1" 200 1949 "https://www.aaa.com/" "Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36" "-"' >> /var/log/nginx/access.log # 看结果 [root@logstash bin]# ./logstash -f /usr/local/logstash-8.12.0/test/nginx_grok_stdout.conf ...... { "http" => { "response" => { "status_code" => 200, "body" => { "bytes" => 1949 } }, "request" => { "method" => "GET" }, "version" => "1.1" }, "event" => { "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"" }, "url" => { "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION" }, "@timestamp" => 2024-06-30T03:02:08.019405702Z, "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"", "timestamp" => "05/Jul/2022:19:48:03 +0000", "host" => { "name" => "logstash" }, "@version" => "1", "source" => { "address" => "139.226.81.24" }, "log" => { "file" => { "path" => "/var/log/nginx/access.log" } } }
【2】Geoip 插件
geoip 根据 ip 地址提供的对应地域信息,比如:经纬度,国家,城市名等,以方便进行地理数据分析
[root@logstash test]# cat nginx-geoip_stdout.conf input { file { start_position => "beginning" path => "/var/log/nginx/access.log" } } filter{ grok{ match => { "message" => "%{COMMONAPACHELOG}" #官方自带的Apache转换模块,与nginx格式一致 } } geoip { source => "[source][address]" target => "geoip" } } output { stdout {} } # 执行结果 [root@logstash bin]# ./logstash -f /usr/local/logstash-8.12.0/test/nginx-geoip_stdout.conf ...... { "geoip" => { "geo" => { "location" => { "lon" => 121.4581, "lat" => 31.2222 }, "region_name" => "Shanghai", "region_iso_code" => "CN-SH", "city_name" => "Shanghai", "country_iso_code" => "CN", "country_name" => "China", "continent_code" => "AS", "timezone" => "Asia/Shanghai" }, "ip" => "139.226.81.24" }, "timestamp" => "05/Jul/2022:19:48:03 +0000", "event" => { "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"" }, "url" => { "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION" }, "http" => { "request" => { "method" => "GET" }, "response" => { "body" => { "bytes" => 1949 }, "status_code" => 200 }, "version" => "1.1" }, "log" => { "file" => { "path" => "/var/log/nginx/access.log" } }, "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"", "source" => { "address" => "139.226.81.24" }, "@timestamp" => 2024-06-30T03:16:44.384058522Z, "@version" => "1", "host" => { "name" => "logstash" } }
【3】Date 插件
Date插件可以将日志中的指定的日期字符串对应的源字段生成新的目标字段。
然后替换@timestamp 字段(此字段默认为当前写入logstash的时间而非日志本身的时间)或指定的其他字段
match #类型为数组,用于指定需要使用的源字段名和对应的时间格式
target #类型为字符串,用于指定生成的目标字段名,默认是 @timestamp
timezone #类型为字符串,用于指定时区域
官方说明
https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
[root@logstash test]# cat nginx-grok_date_stdout.conf input { file { start_position => "beginning" path => "/var/log/nginx/access.log" } } filter{ grok{ match => { "message" => "%{COMMONAPACHELOG}" #官方自带的Apache转换模块,与nginx格式一致 } } date{ match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] #target => "access_time" #将时间写入新生成的access_time字段,源字段仍保留 target => "@timestamp" #将时间覆盖原有的@timestamp字段 timezone => "Asia/Shanghai" } } output { stdout {} } # 将UNIX时间转换指定格式 date { match => ["timestamp","UNIX","YYYY-MM-dd HH:mm:ss"] target =>"@timestamp" timezone => "Asia/shanghai" }
【4】 Useragent 插件
useragent 插件可以根据请求中的 user-agent 字段,解析出浏览器设备、操作系统等信息, 以方便后续的分析使用
[root@logstash test]# cat nginx-grok_useragent_stdout.conf input { file { start_position => "beginning" path => "/var/log/nginx/access.log" } } filter{ grok{ match => { "message" => "%{COMMONAPACHELOG}" #官方自带的Apache转换模块,与nginx格式一致 } } date{ match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] #target => "access_time" target => "@timestamp" timezone => "Asia/Shanghai" } useragent{ source => "message" target => "useragent" } } output { stdout {} } # 模拟生成nginx日志 [root@logstash test]# echo '139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] "GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1" 200 1949 "https://www.aaa.com/" "Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36" "-"' >> /var/log/nginx/access.log # 结果 [root@logstash bin]# ./logstash -f /usr/local/logstash-8.12.0/test/nginx-grok_useragent_stdout.conf ... { "@timestamp" => 2022-07-05T19:48:03.000Z, "host" => { "name" => "logstash" }, "event" => { "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"" }, "http" => { "request" => { "method" => "GET" }, "response" => { "status_code" => 200, "body" => { "bytes" => 1949 } }, "version" => "1.1" }, "log" => { "file" => { "path" => "/var/log/nginx/access.log" } }, "@version" => "1", "timestamp" => "05/Jul/2022:19:48:03 +0000", "useragent" => { "name" => "Chrome Mobile WebView", "device" => { "name" => "XiaoMi Redmi K30 Pro" }, "os" => { "full" => "Android 11", "name" => "Android", "version" => "11" }, "version" => "78.0.3904.108" }, "source" => { "address" => "139.226.81.24" }, "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"", "url" => { "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION" } }
【5】Mutate 插件
Mutate 插件主要是对字段进行、类型转换、删除、替换、更新等操作,可以使用以下函数
remove_field #删除字段
split #字符串切割,相当于awk取列
add_field #添加字段
convert #类型转换,支持的数据类型:integer,integer_eu,float,float_eu,string,boolean
gsub #字符串替换
rename #字符串改名
lowercase #转换字符串为小写
# Split 切割 # mutate 中的 split 字符串切割,指定字符做为分隔符,切割的结果用于生成新的列表元素 filter { mutate { #字段分隔符 split => { "message" => "|" } #将message字段按 | 分割成名称message列表中多个列表元素 } } # remove_field 删除字段 filter { mutate { #remove_field => ["headers","message", "agent"] #7.X remove_field => ["timestamp","message", "http"] #8.X } } # add_field 添加字段 filter { #mutate 切割操作 mutate { #字段分隔符 split => { "message" => "|" } #添加字段,将message的列表的第0个元素添加字段名user_id add_field => { "user_id" => "%{[message][0]}" "action" => "%{[message][1]}" "time" => "%{[message][2]}" } #添加字段做索引名 #add_field => {"[@metadata][target_index]" => "app-%{+YYY.MM.dd}"} #删除无用字段 remove_field => ["headers","message"] } } # convert 转换 filter { #mutate 切割操作 mutate { #字段分隔符 split => { "message" => "|" } #添加字段 add_field => { "user_id" => "%{[message][0]}" "action" => "%{[message][1]}" "time" => "%{[message][2]}" } #删除无用字段 remove_field => ["headers","message"] #对新添加字段进行格式转换 convert => { "user_id" => "integer" "action" => "string" "time" => "string" } #convert => ["excute_time","float] #此格式也可以支持 #convert => ["time","string" ] } } # gsub 替换 filter { mutate { gsub=>["message","\n", " "] #将message字段中的换行替换为空格 } }
【6】条件判断
Filter 语句块中支持 if 条件判断功能
filter { mutate { gsub=>["message","\n", " "] #将message字段中的换行替换为空格 } } filter { if "access" in [tags][0] { mutate { add_field => { "target_index" => "access-%{+YYYY.MM.dd}"} } } else if "error" in [tags][0] { mutate { add_field => { "target_index" => "error-%{+YYYY.MM.dd}"} } } else if "system" in [tags][0] { mutate { add_field => { "target_index" => "system-%{+YYYY.MM.dd}"} } } }
【具体配置】
[root@web1 ~]#grep -Ev "#|^$" /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access_json.log json.keys_under_root: true json.overwrite_keys: true tags: ["nginx-access"] - type: log enabled: true paths: - /var/log/nginx/error.log tags: ["nginx-error"] - type: log enabled: true paths: - /var/log/syslog tags: ["syslog"] output.kafka: hosts: ["11.0.1.120:9092, 11.0.1.121:9092, 11.0.1.122:9092"] topic: filebeat-log #指定kafka的topic partition.round_robin: reachable_only: true #true表示只发布到可用的分区,false时表示所有分区,如果一个节点down会导致block required_acks: 1 #如果为0,表示不确认,错误消息可能会丢失,1等待写入主分区(默认),-1等待写入副本分区 compression: gzip max_message_bytes: 1000000 #每条消息最大长度,以字节为单位,如果超过将丢弃
[root@logstash ~]#cat /etc/logstash/conf.d/kafka-to-es.conf input { kafka { bootstrap_servers => "11.0.1.120:9092, 11.0.1.121:9092, 11.0.1.122:9092" topics => "filebeat-log" codec => "json" #group_id => "logstash" #消费者组的名称 #consumer_threads => "3" #建议设置为和kafka的分区相同的值为线程数 #topics_pattern => "nginx-.*" #通过正则表达式匹配topic,而非用上面topics=>指定固定值 } } output { #stdout {} #调试使用 if "nginx-access" in [tags] { elasticsearch { hosts => ["11.0.1.123:9200","11.0.1.136:9200","11.0.1.137:9200"] index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}" #user => "elastic" #password => "123456" 如果es设置有账号密码需要加上 } } if "nginx-error" in [tags] { elasticsearch { hosts => ["11.0.1.123:9200","11.0.1.136:9200","11.0.1.137:9200"] index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}" } } if "syslog" in [tags] { elasticsearch { hosts => ["11.0.1.123:9200","11.0.1.136:9200","11.0.1.137:9200"] index => "logstash-kafka-syslog-%{+YYYY.MM.dd}" } } } [root@logstash ~]#systemctl restart logstash.service
ES中的日志后续会被删除,但有些重要数据,比如状态码、客户端IP、客户端浏览器版本等,后期可以会按月或年做数据统计等,因此需要持久保存
可以将重要数据写入数据库达到持久保存目的
具体部署
1)数据库操作
1) 需要安装数据库,生产中要安装集群模式,这里就省略了 2)创建库和表并授权用户登录 mysql> create database elk character set utf8 collate utf8_bin; Query OK, 1 row affected, 2 warnings (0.02 sec) mysql> create user elk@"11.0.1.%" identified by '123456'; Query OK, 0 rows affected (0.01 sec) mysql> grant all privileges on elk.* to elk@"11.0.1.%"; Query OK, 0 rows affected (0.00 sec) mysql> flush privileges; Query OK, 0 rows affected (0.00 sec) # #创建表,字段对应需要保存的数据字段 mysql> use elk; Database changed mysql> CREATE TABLE elklog ( clientip VARCHAR ( 39 ),uri VARCHAR ( 256 ),STATUS CHAR ( 3 ),time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); Query OK, 0 rows affected (0.01 sec)
2)Logstash操作
1 Logstash 配置 mysql-connector-java 包
MySQL Connector/J是MySQL官方JDBC驱动程序,JDBC(Java Data Base Connectivity,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成
【1】下载 #官方下载地址:https://dev.mysql.com/downloads/connector/ wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.28-1.el8.noarch.rpm 【2】 安装 [root@logstash src]# yum install /home/weihu/src/mysql-connector-j-8.4.0-1.el8.noarch.rpm [root@logstash src]# rpm -ql mysql-connector* 警告:mysql-connector-j-8.4.0-1.el8.noarch.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID a8d3785c: NOKEY /usr/share/doc/mysql-connector-j /usr/share/doc/mysql-connector-j/CHANGES /usr/share/doc/mysql-connector-j/INFO_BIN /usr/share/doc/mysql-connector-j/INFO_SRC /usr/share/doc/mysql-connector-j/LICENSE /usr/share/doc/mysql-connector-j/README /usr/share/java/mysql-connector-j.jar /usr/share/java/mysql-connector-java.jar /usr/share/doc/mysql-connector-j /usr/share/doc/mysql-connector-j/CHANGES /usr/share/doc/mysql-connector-j/INFO_BIN /usr/share/doc/mysql-connector-j/INFO_SRC /usr/share/doc/mysql-connector-j/LICENSE /usr/share/doc/mysql-connector-j/README /usr/share/java/mysql-connector-j.jar /usr/share/java/mysql-connector-java.jar 【3】 复制jar文件到logstash指定的目录下 # mysql-connector-java.jar是链接文件,其实复制哪个都可以 [root@logstash src]# ll /usr/share/java/mysql-connector-java.jar lrwxrwxrwx 1 root root 21 3月 13 09:04 /usr/share/java/mysql-connector-java.jar -> mysql-connector-j.jar [root@logstash src]# ll /usr/share/java/mysql-connector-j.jar -rw-r--r-- 1 root root 2.5M 3月 13 09:04 /usr/share/java/mysql-connector-j.jar [root@logstash ~]# mkdir /usr/local/logstash-8.12.0/liboutput/ [root@logstash ~]# cp /usr/share/java/mysql-connector-j.jar /usr/local/logstash-8.12.0/liboutput/ [root@logstash ~]# ll /usr/local/logstash-8.12.0/liboutput/ 总用量 2.5M -rw-r--r-- 1 root root 2.5M 7月 5 21:58 mysql-connector-j.jar 【4】更改 Gem 源 #Logstash 基于 Ruby 语言实现 , 后续需要安装相关插件,需要从 gem 源下载依赖包 #ruby 语言使用国外的gem源, 由于网络原因,从国内访问很慢而且不稳定 [root@logstash ~]# yum -y install ruby [root@logstash ~]# gem -v 3.2.32 [root@logstash ~]# gem sources -l *** CURRENT SOURCES *** https://rubygems.org/ #修改指定国内的源 [root@logstash ~]# gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/ [root@logstash ~]# gem sources -l *** CURRENT SOURCES *** https://gems.ruby-china.com/ 【5】安装logstash-output-jdbc插件 #查看安装插件,默认只有input-jdbc插件,需要安装output-jdbc插件 [root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash-plugin list |grep jdbc logstash-integration-jdbc ├── logstash-input-jdbc ├── logstash-filter-jdbc_streaming └── logstash-filter-jdbc_static ####在线安装output-jdbc插件,可能会等较长时间 [root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash-plugin install logstash-output-jdbc Using bundled JDK: /usr/local/logstash-8.12.0/jdk Validating logstash-output-jdbc Resolving mixin dependencies Installing logstash-output-jdbc Installation successful ####离线安装 ####如果无法在线安装,可以先从已经安装的主机导出插件,再导入 #导出插件 [root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash-plugin prepare-offline-pack logstash-output-jdbc Using bundled JDK: /usr/local/logstash-8.12.0/jdk Offline package created at: /usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip You can install it with this command `bin/logstash-plugin install file:///usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip` [root@logstash ~]# ll /usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip -rw-r--r-- 1 root root 2.9M 7月 5 17:28 /usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip #离线导入插件 /usr/local/logstash-8.12.0/bin/logstash-plugin install file:///usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip #再次检查安装拆插件 [root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash-plugin list |grep jdbc logstash-integration-jdbc ├── logstash-input-jdbc ├── logstash-filter-jdbc_streaming └── logstash-filter-jdbc_static logstash-output-jdbc
3)配置filebeate
[root@web ~]# cat /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true json.keys_under_root: true paths: - /var/log/nginx/error.log tags: [wuuuuuu] - type: log enable: true paths: - /var/log/messages output.logstash: hosts: ["11.0.1.139:5044"] #修改完重启filebeat即可 [root@web ~]# systemctl restart filebeat
4)配置logstash
[root@logstash ~]# cat /usr/local/logstash-8.12.0/test/mysq_filebeat.conf input { beats { port => 5044 } } filter { if "wuuuuuu" in [tags] { mutate { rename => { "@timestamp" => "times" } } } } output { stdout { codec=>rubydebug } if "wuuuuuu" in [tags] { jdbc { driver_jar_path => "/usr/local/logstash-8.12.0/liboutput/mysql-connector-j.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://11.0.1.138:3306/elk?user=elk&password=123456&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT" statement => ["INSERT INTO elklog (clientip,uri,STATUS,time) VALUES(?,?,?,?)","clientip","uri","status","times"] } } }
# 检查语法
[root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/test/mysq_filebeat.conf -t
# 后台启动
[root@logstash ~]# /usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/test/mysq_filebeat.conf &
5)产生nginx日志
我这里提前准备了一些nginx的访问日志(json格式的),然后直接模拟导入
[root@web tmp]# head -n10 access_json.log >> /var/log/nginx/access_json.log
----
{"@timestamp":"2023-02-20T03:56:01+08:00","host":"172.17.180.135","clientip":"47.92.67.105","size":7067,"responsetime":0.158,"upstreamtime":"0.157","upstreamhost":"127.0.0.1:9000","http_host":"www.xxxx.com","uri":"/index.php","domain":"www.wangxiaochun.com","xff":"-","referer":"-","tcp_xff":"","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36","status":"200"}
----
6)验证数据是否有数据
也可以使用MySQL连接工具,比如navicat
[root@test-db01 ~]# mysql -h11.0.1.138 -uelk -p -e 'select * from elk.elklog' Enter password: +----------------+---------------+--------+---------------------+ | clientip | uri | STATUS | time | +----------------+---------------+--------+---------------------+ | 20.203.179.186 | /.git/config | 302 | 2024-07-06 16:33:58 | | 47.92.67.105 | /index.php | 200 | 2024-07-06 16:33:58 | | 20.203.179.186 | /.git/config | 302 | 2024-07-06 16:33:58 | | 218.94.106.239 | / | 302 | 2024-07-06 16:33:58 | | 47.92.78.126 | / | 302 | 2024-07-06 16:33:58 | | 39.103.133.249 | - | 400 | 2024-07-06 16:33:58 | | 123.56.174.200 | /wp-cron.php | 499 | 2024-07-06 16:33:58 | | 148.72.244.186 | /wp-login.php | 200 | 2024-07-06 16:33:58 | | 47.92.33.185 | / | 302 | 2024-07-06 16:33:58 | | 123.56.174.200 | /wp-cron.php | 499 | 2024-07-06 16:33:58 | +----------------+---------------+--------+---------------------+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。