当前位置:   article > 正文

大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch_filebeats(1)_filebeat采集kafka 写入elasticsearch

filebeat采集kafka 写入elasticsearch

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!


在FileBeats中,可以读取一个或多个数据源。


![](https://img-blog.csdnimg.cn/5ebad2449d5c46fa883182ddc99d264c.png)


#### 2、******output配置******


![](https://img-blog.csdnimg.cn/8ed99d9ecd254296a9ea7e565e1bd099.png)


默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。


PS:


FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。



### 三、******配置文件******


#### 1、创建配置文件



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

cd /export/server/es/filebeat-7.6.1-linux-x86_64
vim filebeat_kafka_log.yml


#### 2、复制一下到配置文件中



  • 1
  • 2
  • 3
  • 4
  • 5

filebeat.inputs:

  • type: log
    enabled: true
    paths:
    • /export/server/es/data/kafka/server.log.*

output.elasticsearch:
hosts: [“node1:9200”, “node2:9200”, “node3:9200”]



### 四、******运行FileBeat******


#### 1、运行FileBeat



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

./filebeat -c filebeat_kafka_log.yml -e



#### 2、将日志数据上传到/var/kafka/log,并解压



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

mkdir -p /export/server/es/data/kafka/

tar -xvzf kafka_server.log.tar.gz


**注意: 文件权限的报错**


如果在启动fileBeat的时候, 报了一个配置文件权限的错误, 请修改其权限为 -rw-r--r--



### 五、查询数据


#### 1、查看索引信息


GET /\_cat/indices?v



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

{
        “health”: “green”,
        “status”: “open”,
        “index”: “filebeat-7.6.1-2021.12.05-000001”,
        “uuid”: “dplqB_hTQq2XeSk6S4tccQ”,
        “pri”: “1”,
        “rep”: “1”,
        “docs.count”: “213780”,
        “docs.deleted”: “0”,
        “store.size”: “71.9mb”,
        “pri.store.size”: “35.8mb”
    }


GET /filebeat-7.6.1-2021.12.05-000001/\_search



  • 1
  • 2
  • 3
  • 4
  • 5

{
                “_index”: “filebeat-7.6.1-2021.12.05-000001”,
                “_type”: “_doc”,
                “_id”: “-72pX3IBjTeClvZff0CB”,
                “_score”: 1,
                “_source”: {
                    “@timestamp”: “2021-12-05T09:00:40.041Z”,
                    “log”: {
                        “offset”: 55433,
                        “file”: {
                            “path”: “/var/kafka/log/server.log.2021-12-05-16”
                        }
                    },
                    “message”: “[2021-12-05 09:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)”,
                    “input”: {
                        “type”: “log”
                    },
                    “ecs”: {
                        “version”: “1.4.0”
                    },
                    “host”: {
                        “name”: “node1”
                    },
                    “agent”: {
                        “id”: “b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64”,
                        “version”: “7.6.1”,
                        “type”: “filebeat”,
                        “ephemeral_id”: “b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63”,
                        “hostname”: “node1”
                    }
                }
            }


FileBeat自动给我们添加了一些关于日志、采集类型、Host各种字段。



### 六、​​​​​​​******解决一个日志涉及到多行问题******


我们在日常日志的处理中,经常会碰到日志中出现异常的情况。类似下面的情况:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

[2021-12-05 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to node2:9092 (id: 1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)
at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2021-12-05 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)


在FileBeat中,Harvest是逐行读取日志文件的。但上述的日志会出现一条日志,跨多行的情况。有异常信息时,肯定会出现多行。我们先来看一下,如果默认不处理这种情况会出现什么问题。



#### 1、​​​​​​​******导入错误日志******


**1)在/export/server/es/data/kafka/中创建名为server.log.2021-12-05的日志文件**


**2)将资料中的err.txt日志文本贴入到该文件中**


观察FileBeat,发现FileBeat已经针对该日志文件启动了Harvester,并读取到数据数据。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2021-12-05T19:11:01.236+0800    INFO    log/harvester.go:297    Harvester started for file: /var/kafka/log/server.log.2021-12-05


**3)在Elasticsearch检索该文件**


我们发现,原本是一条日志中的异常信息,都被作为一条单独的消息来处理了~


"message":"java.io.IOException:Connection to node2:9092 (id;



这明显是不符合我们的预期的,我们想要的是将所有的异常消息合并到一条日志中。那针对这种情况该如何处理呢?



#### 2、​​​​​​​******问题分析******


每条日志都是有统一格式的开头的,就拿Kafka的日志消息来说,[2021-12-05 14:00:05,725]这是一个统一的格式,如果不是以这样的形式开头,说明这一行肯定是属于某一条日志,而不是独立的一条日志。所以,我们可以通过日志的开头来判断某一行是否为新的一条日志。



#### 3、​​​​​​​******FileBeat多行配置选项******


在FileBeat的配置中,专门有一个解决一条日志跨多行问题的配置。主要为以下三个配置:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

multiline.pattern: ^[
multiline.negate: false
multiline.match: after


multiline.pattern表示能够匹配一条日志的模式,默认配置的是以[开头的才认为是一条新的日志。


multiline.negate:配置该模式是否生效,默认为false。


multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。



#### 4、​​​​​​​******重新配置FileBeat******


**1)修改filebeat.yml,并添加以下内容**



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

filebeat.inputs:

  • type: log
    enabled: true
    paths:

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

715027242251)]
[外链图片转存中…(img-zj0jb60U-1715027242252)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/923654
推荐阅读
相关标签
  

闽ICP备14008679号