当前位置:   article > 正文

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)_filebeat pipeline

filebeat pipeline

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

下载地址

https://www.elastic.co/cn/downloads/past-releases#filebeat

目的

使用FileBeat收集日志,Pipeline解析日志,最终写入ES

日志数据

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
  • 1

模拟Pipeline

注意:如果同时通过setscript设置字段,会以script为准。

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"
      }
    }
  ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211

创建pipeline

PUT _ingest/pipeline/logdatapipeline
{
  "description" : "outer pipeline",
  "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203

查看Pipeline是否创建成功

GET _ingest/pipeline/logDataPipeline?pretty
  • 1

创建FileBeat配置文件 filebeat.yml

读取 /var/log2/*.log 文件写入ES

filebeat.inputs:
- type: log
  enabled: true
#读取的文件
  paths:
    - /var/log2/*.log
# 标记,在后面用于判断写入的索引
  fields:
    type: logDataPipeline
    source: common
- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception
- type: log
  enabled: true
  paths:
    - /var/log/3.log


filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  # 索引默认分片数
  index.number_of_shards: 1
  # 索引默认副本数
  index.number_of_replicas: 1
  #index.codec: best_compression
  #_source.enabled: false
  # # 生成index模板的名称
#允许自动生成index模板
setup.template.enabled: true
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # # 生成index模板时字段配置文件
setup.template.fields: fields.yml
setup.template.name: "logdata" 
# # # 生成index模板匹配的index格式       
setup.template.pattern: "logdata-*" 
setup.ilm.enabled: auto
# 这里一定要注意 会在alias后面自动添加-*
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# # # 生成kibana中的index pattern,便于检索日志
# #setup.dashboards.index: myfilebeat-7.0.0-*
# #filebeat默认值为auto,创建的elasticsearch索引生命周期为50GB+30天。如果不改,可以不用设置
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals: 
        fields: 
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

创建自定义字段 FileBeat fields.yml

# 我们自定义的
- key: rbt
  title: rbt
  description: rbt log data fields 
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date
			
# FileBeat自带的
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
  - name: '@timestamp'
    level: core
    required: true
    type: date
    description: 'Date/time when the event originated.

      This is the date/time extracted from the event, typically representing when
      the event was generated by the source.

      If the event source has no original timestamp, this value is typically populated
      by the first time the event was received by the pipeline.

      Required field for all events.'
    example: '2016-05-23T08:05:34.853Z'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

执行 FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e 
  • 1
  • 2
  • 3
  • 4

filebeat 启动命令说明

-c 指定配置文件
-d "*" 报错时候,查看具体的错误原因。
  • 1
  • 2

测试

新增数据到 vim /var/log2/test.log

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
  • 1

查询结果发现日志已经进入到ES
在这里插入图片描述

个人公众号(大数据学习交流): hadoopwiki

Pipeline 配置详解

1. 根据日志数据指定索引 _id

每个文档都会有一些元数据字段信息(metadata filed),比如_id,_index,_type 等,我们在 processors 中也可以直接访问这些信息的,比如下面的例子:

{
  "set": {
  "field": "_id",
  "value": "{{logdata.6}}"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

FileBeat 配置详解

注意:首次创建的时候FileBeat会在ElasticSearch设置我们再FileBeat配置的_template索引模板,后续重启服务即便配置改了都不会更新该模板,比如下面的分片副本数量,首次启动后,该配置会写入索引模板中,后续修改不起作用。需要重新配置修改,需要删除filebeat目录下的data目录。

1.设置Filebeat保存到ElasticSearch索引副本、分片数量

修改 filebeat.yml 文件中下面参数

setup.template.settings:
  # 索引默认分片数
  index.number_of_shards: 1
  # 索引默认副本数
  index.number_of_replicas: 1
  • 1
  • 2
  • 3
  • 4
  • 5

异常处理

提示 ERROR instance/beat.go:802 Exiting: error initializing processors:

异常内容如下

2022-01-20T14:39:22.441+0800    ERROR   instance/beat.go:802    Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
  • 1
  • 2

解决方法
注释掉filebeat.yml文件中的add_docker_metadataadd_kubernetes_metadata

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号