当前位置:   article > 正文

Filebeat读取日志推送kafka(docker-compose)_filebeat采集日志到kafka

filebeat采集日志到kafka

简介

Filebeat是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tail file),并将它们转发给Elasticsearch或Logstatsh进行索引、kafka等。带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

  • 工作流程
    在这里插入图片描述
  1. 安装filebeat
mkdir filebeat/{etc,logs}
vim docker-compose-filebeat.yml
  • 1
  • 2
version: "3.3"
services:
  filebeat:
    image: filebeat:7.13.2
    container_name: filebeat
    #restart: always
    restart: unless-stopped
    user: root
    privileged: true
    volumes:
    - ./filebeat/etc/filebeat.yml:/usr/share/filebeat/filebeat.yml
    - ./filebeat/logs:/usr/share/filebeat/logs
    - /var/log/test:/logs    #映射本地需要收集的日志文件或目录
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 编辑配置文件
vim 
  • 1
filebeat.inputs:
#文件类型
- type: log
  enabled: true
  tail_files: true          #从文件末尾开始读取
#文件位置
  paths:
    - /logs/mysql.log
  scan_frequency: 10s       #每十秒扫描一次,如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s
  close_older: 1h           # 如果一个文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h
#自定义字段
  fields:
    level: notice
    appname: mysql
#日志多行合并
  multiline:
        # pattern for error log, if start with space or cause by 
    pattern: '^\\d{4}/\\d{2}/\\d{2}'    #希望匹配到的结果(正则表达式)
    negate:  true       #值为 true 或 false。使用 false 代表匹配到的行合并到上一行;使用 true 代表不匹配的行合并到上一行
    match:   after      #值为 after 或 before。after 代表合并到上一行的末尾;before 代表合并到下一行的开头
# ============================== Filebeat modules ==============================
#设置模块文件位置
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
# ======================= Elasticsearch template setting =======================
setup.template.settings:
  index.number_of_shards: 1

# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
#  hosts: ["192.168.1.253:19200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"
# ---------------------------- KAFKA Output ----------------------------
output.kafka:
   enabled: true
   hosts: ["192.168.0.161:9092","192.168.0.161:9093","192.168.0.161:9094"]
   topic: log_filebeat
   #version: "0.9.0.1"
   compression: gzip
#用于从事件中删除指定字段
processors:
- drop_fields: 
   fields: ["beat", "input", "source", "offset"]
# ================================== Logging ===================================
#设置日志级别
logging.level: debug
name: filebeat.log
# ================================= Processors =================================
#processors:
#  - add_host_metadata:
#      when.not.contains.tags: forwarded
#  - add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  1. 启动模块
docker-compose -f docker-compose-filebeat.yml up -d
  • 1
  1. 搭建测试kafka集群接收日志
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
docker run -d --name kafka0 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka
docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka
docker run --name kafka-manager -d -p 9000:9000 -e ZK_HOSTS="192.168.0.161:2181" sheepkiller/kafka-manager
  • 1
  • 2
  • 3
  • 4
  • 5

*** kafka常用命令

#查看kafka版本
docker exec -it kafka2 /bin/bash
find /opt/kafka_2.13-2.8.1/libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
#进入zookeeper客户端
zkCli.sh -server 192.168.0.161:2181
#创建topic
kafka-topics.sh --zookeeper 192.168.0.161:2181 --create --topic log_filebeat --partitions 3 --replication-factor 3
#删除topic
kafka-topics --zookeeper localhost:2181 --topic log_filebeat --delete
#查看topic
kafka-topics.sh --zookeeper 192.168.0.161:2181 --describe --topic log_filebeat
#topic消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic log_filebeat
#topic生产者
kafka-console-producer --broker-list test1:9092,test2:9092,test3:9092 --topic log_filebeat
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  1. 启动消费者监听是否收到filebeat推送的日志文件
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic log_filebeat
  • 1
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/923622
推荐阅读
相关标签
  

闽ICP备14008679号