当前位置:   article > 正文

离线二进制安装:使用categraf采集日志,vector进行日志清洗,存储到ES和ClickHouse。初体验ClickVisual

离线二进制安装:使用categraf采集日志,vector进行日志清洗,存储到ES和ClickHouse。初体验ClickVisual

前言

搭建一套可观测的日志分析平台离不开ETL技术。什么是ETL?ETL 是指 Extract、Transform、Load 的缩写,是一种常见的数据处理模式,用于将数据从一个数据源抽取(Extract)出来,经过转换(Transform)后加载(Load)到目标数据仓库或数据库中。如果数据源是日志文件,那么最通用的技术栈是ELK。相信很多研发人员对这三个字母并不陌生,ELK是Elasticsearch、Logstash、Kibana的首字母,是一个非常流行的开源工具组合,在日志的收集、转换、搜索和分析中得到非常广泛的应用。ELK 架构的简单示例如下图所示:

在这里插入图片描述

Beats:通常指filebeat,用于收集日志文件数据的工具,具有轻量级和低资源消耗的特点。我们使用categra替代filebeat.
Kafka:用于实现日志数据的传输、缓冲和分发
Logstash:用于收集、抽取、转换和发送日志和事件数据的工具,日志经过处理后发送到Elasticsearch。我们使用vector替换Logstash。
Elasticsearch:用于存储、索引和分析日志和事件数据的分布式搜索和分析引擎,具有强大的搜索和聚合功能。我们使用Clickhouse替换Elasticsearch。
Kibana:用于可视化和分析 Elasticsearch 中存储的数据的开源数据可视化工具。我们使用ClickVisual或者grafana替换Kibana。

调整后的架构如下图

在这里插入图片描述

1:安装categraf

categraf的安装部署在官网上已经写的非常详细,这里就不再写了,自行去查看官方文档就好。categraf采用了All-in-one 的设计理念,可以同时支持指标、日志、链路追踪数据的采集。

1.1、下载最新版本

https://flashcat.cloud/download/categraf/
  • 1

2.2、部署

https://flashcat.cloud/docs/content/flashcat-monitor/categraf/2-installation/
  • 1

2.3、修改配置文件

主要修改conf/config.toml配置文件,在下面这个配置文件中同时采集了java日志和nginx日志,并发送到不同的kafka的topic中。

[logs]
## just a placholder
api_key = "127.0.0.1"
## enable log collect or not
enable = true
## the server receive logs, http/tcp/kafka, only kafka brokers can be multiple ip:ports with concatenation character ","
send_to = "127.0.0.1:5044"
## send logs with protocol: http/tcp/kafka
send_type = "kafka"
topic = "crj_server"
## send logs with compression or not 
use_compression = false
# gzip压缩级别,0 表示不压缩, 1-9 表示压缩级别
compression_level=0

#kafka支持的压缩 none gzip snappy lz4 zstd
compression_codec="none"

## use ssl or not
send_with_tls = false
## send logs in batchs
batch_wait = 5
## save offset in this path 
run_path = "/opt/categraf/run"
## max files can be open 
open_files_limit = 100
## scan config file in 10 seconds
scan_period = 10
## read buffer of udp 
frame_size = 9000
## channal size, default 100 

## 读取日志缓冲区,行数
chan_size = 10000
## 有多少线程处理日志
pipeline=10
## configuration for kafka
## 指定kafka版本
kafka_version="3.7.0"
# 默认0 表示按照读取顺序串行写入kafka,如果对日志顺序有要求,保持默认配置
batch_max_concurrence = 100
# 发送缓冲区的大小(行数),如果设置比chan_size小,会自动设置为跟chan_size相同
batch_max_size=10000
# 每次最大发送的内容上限 默认1000000 Byte (与batch_max_size先到者触发发送)
batch_max_content_size=900000
# client timeout in seconds
producer_timeout= 10

# 是否开启sasl模式
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
# PLAIN 
sasl_mechanism= "PLAIN"
# v1
sasl_version=1
# set true
sasl_handshake = true
# optional
# sasl_auth_identity=""

##
#ent-v0.3.50以上版本新增,是否开启pod日志采集
enable_collect_container=false
#是否采集所有pod的stdout stderr
collect_container_all = false
  ## glog processing rules
  # [[logs.processing_rules]]
  ## single log configure
  [[logs.items]]
  ## file/journald/tcp/udp
  type = "file"
  ## type=file, path is required; type=journald/tcp/udp, port is required
  path = "/data/crj-server/bin/logs/sys-info.log"
  source = "java"
  service = "crj_server_info"
  [[logs.items]]
  type = "file"
  path = "/data/crj-server/bin/logs/sys-error.log"
  source = "java"
  service = "crj_server_error"
  [[logs.items.log_processing_rules]]
  type = "multi_line"
  name = "new_line_with_date"
  pattern="\\d{4}-\\d{2}-\\d{2}"
  [[logs.items]]
  type = "file"
  path = "/usr/local/nginx/logs/error.log"
  source = "nginx"
  service = "crj_nginx_error"
  topic = "crj_nginx"
  [[logs.items]]
  type = "file"
  path = "/usr/local/nginx/logs/access.log"
  source = "nginx"
  service = "crj_nginx_access"
  topic = "crj_nginx"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

2:安装kafka

2.1、下载安装包

下载

https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
  • 1

解压

tar -zxvf kafka_2.13-3.7.0.tgz && mv kafka_2.13-3.7.0/ kafka/ && cd kafka
  • 1

2.2、修改配置文件

kafka的启动依赖于zookeeper,先修改 config/zookeeper.properties,调整 dataDir 配置

cd /data/kafka/config
cp zookeeper.properties zookeeper.properties.bak
vim zookeeper.properties
  • 1
  • 2
  • 3
dataDir=/data/kafka/zookeeper/data #数据持久化目录
dataLogDir=/data/kafka/zookeeper/logs #日志目录
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
  • 1
  • 2
  • 3
  • 4
  • 5

修改 Kafka 的配置:config/server.properties,修改 log.dirs,也是更换一下目录,不要使用 /tmp

cp server.properties server.properties.bak
vim server.properties
  • 1
  • 2
broker.id=0
#注意这里要写成具体的IP,不然categraf推送日志会出现推送到本机的问题
listeners=PLAINTEXT://127.0.0.1:5044
advertised.listeners=PLAINTEXT://127.0.0.1:5044
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.3、启动

启动zookeeper

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 1

启动 Kafka

nohup bin/kafka-server-start.sh config/server.properties &> kafka.stdout &
  • 1

2.4、检查状态

检查 Zookeeper 和 Kafka 是否启动成功,可以使用 jps 命令查看进程,也可以使用 netstat -tunlp 查看端口。

[root@srd-127.0.0.1 kafka]# jps
10725 Jps
10025 Kafka
3724 QuorumPeerMain
  • 1
  • 2
  • 3
  • 4

3:安装kafka可视化工具Redpanda(原Kowl)

3.1、下载

https://github.com/redpanda-data/console/releases/download/v2.4.5/redpanda_console_2.4.5_linux_amd64.tar.gz
  • 1

3.2、解压

tar -xvf redpanda_console_2.4.5_linux_amd64.tar.gz
  • 1

3.3、启动

export KAFKA_BROKERS=127.0.0.1:5044
nohup /data/redpanda-console/redpanda-console &> /data/redpanda-console/redpanda-console.stdout &
  • 1
  • 2

3.4、访问

redpanda会默认启动8080端口,页面长这样

在这里插入图片描述

4:安装vector

4.1、下载

https://github.com/vectordotdev/vector/releases/tag/v0.37.1
  • 1

4.2、安装

rpm -ivh vector-0.37.1-1.x86_64.rpm
  • 1

4.3、修改配置文件

vector的配置文件默认在/etc/vector/目录下面

vector分为三个模块,分别为sourcetransformssinks,并且都可以配置多个元素,但是要保证不能同名

vector中推荐日志清洗transforms推荐使用VRL来实现,配置类型为remap。VRL中配置了大量的常见日志转换的函数,例如parse_nginx_log函数。具体的配置规则可以查看官方文档,写的非常详细。Remap with VRL参考文档,并且vector提供了在线调试平台,可以在线测试代码的合法性VRL在线调试,用法如下:
在这里插入图片描述

实战

# 配置java log4j打出来的info,error日志的解析规则
vim /etc/vector/vector_java.yaml
  • 1
  • 2
sources:
  police_java_log:
    type: kafka
    bootstrap_servers: 127.0.0.1:5044
    group_id: vector_java_consumer
    topics:
      - "^.*server$" # 支持正则表达式

transforms:
  police_java_log_transform:
    type: remap
    inputs:    # 匹配输入源
      - police_java_log
    source: |
      . = parse_json!(.message)
      .java_log,err = parse_regex(.message, r'^(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[(?<thread>[^\]]+)\]\s+(?<logLevel>[A-Z]+)\s+(?<javaClass>[^\s]+)\s+-\s+(?<content>.*)$') 
      if err != null {
        . = merge(., .java_log)
      }
      .timestamp = parse_timestamp(.timestamp, "%Y/%m/%d %H:%M:%S %z") ?? now()
      del(.status)
sinks:
  police_java_es:
    type: elasticsearch
    inputs: ["police_java_log_transform"] # 匹配转换配置
    #mode: "data_stream" # 数据流方式写入
    auth: # es认证信息 
      strategy: "basic"
      user: "elastic"
      password: "***********"
    endpoints: ["https://127.0.0.1:9200"] # es连接地址
    bulk:
      index: "{{ fcservice }}-{{ fcsource }}-%Y-%m-%d"
    tls: # tls证书配置
      verify_certificate: false # 跳过证书验证
      ca_file: "/etc/vector/http_ca.crt" # ca证书路径
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
配置nginx的access,error日志解析规则
vim /etc/vector/vector_nginx.yaml
  • 1
  • 2
sources:
  police_nginx_log:
    type: kafka
    bootstrap_servers: 127.0.0.1:5044
    group_id: vector_nginx_consumer
    topics:
      - "^.*nginx$"

transforms:
  police_nginx_log_transform:
    type: remap
    inputs:    # 匹配输入源
      - police_nginx_log
    source: |
      . = parse_json!(.message)
      del(.@metadata)
      .nginx_log = parse_nginx_log(.message, "combined") ?? parse_nginx_log!(.message, "error")
      . = merge(., .nginx_log)
      del(.nginx_log)
      if .status == "info" {
        del(.status)
      }
sinks:
  police_nginx_es:
    type: elasticsearch
    inputs: ["police_nginx_log_transform"] # 匹配转换配置
    #mode: "data_stream" # 数据流方式写入
    auth: # es认证信息 
      strategy: "basic"
      user: "elastic"
      password: "*************"
    endpoints: ["https://127.0.0.1:9200"] # es连接地址
    bulk:
      index: "{{ fcservice }}-{{ fcsource }}-%Y-%m-%d" # 根据日志的标签,按照日志创建es的索引
    tls: # tls证书配置
      verify_certificate: false # 跳过证书验证
      ca_file: "/etc/vector/http_ca.crt" # ca证书路径
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

如果把数据推送到clickhouse,需要预先在clickhosue创建表结构,不符合我目前的需求,所以这里先推送到ES。推送到ClickHouse可以查看官方的示例,已经写的非常详细了。vector清洗nginx日志推送到clickhouse

4.4、启动vecotr

#单文件启动
nohup vector -c /etc/vector/vector_java.yaml 2>vector.log & 

#多文件启动
nohup vector -c /etc/vector/*.yaml -w /etc/vector/*.yaml 2>vector.log & 
  • 1
  • 2
  • 3
  • 4
  • 5

-w参数,意思是关注配置文件中的更改,并相应地重新加载;
再次提醒:即使是多个配置文件,在同一个vector实例中各阶段的命名也不能重名

4.5、查看结果

可以看到对应的消费者已经自动创建
在这里插入图片描述
再查看ES的索引也已经创建
在这里插入图片描述
nginx日志也被正常解析
在这里插入图片描述

5:安装clickhouse

5.1、下载安装包

https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-client-24.3.2.23-amd64.tgz

https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-common-static-24.3.2.23-amd64.tgz

https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-common-static-dbg-24.3.2.23-amd64.tgz

https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-server-24.3.2.23-amd64.tgz
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

5.2、安装

tar -zxvf clickhouse-common-static-24.3.2.23-amd64.tgz 
tar -zxvf clickhouse-common-static-dbg-24.3.2.23-amd64.tgz 
tar -zxvf clickhouse-server-24.3.2.23-amd64.tgz 
tar -zxvf clickhouse-client-24.3.2.23-amd64.tgz 

# 安装的时候会提示设置密码,记得记住密码,不然要去改XML文件
./clickhouse-common-static-24.3.2.23/install/doinst.sh 
./clickhouse-common-static-dbg-24.3.2.23/install/doinst.sh 
./clickhouse-server-24.3.2.23/install/doinst.sh 
./clickhouse-client-24.3.2.23/install/doinst.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5.3、启动

重新修改托管文件

vim /usr/lib/systemd/system/clickhouse-server.service
  • 1
[Unit]
Description=ClickHouse Server
After=network.target

[Service]
Type=simple
User=clickhouse
ExecStart=/usr/bin/clickhouse-server --config-file /etc/clickhouse-server/config.xml
Restart=always
RestartSec=30

[Install]
WantedBy=multi-user.target
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

启动

systemctl daemon-reload
systemctl restart clickhouse-server
systemctl status clickhouse-server
  • 1
  • 2
  • 3

6:安装ClickVisual

ClickVisual 的安装依赖 MySQL 和 Redis,可以查看我的历史博客。

6.1、下载

https://github.com/clickvisual/clickvisual/releases/download/v1.0.1-rc1/clickvisual-v1.0.1-rc1-linux-amd64.tar.gz
  • 1

6.2、安装

mkdir clickvisual && cd clickvisual
tar -xvf clickvisual-v1.0.1-rc1-linux-amd64.tar.gz
  • 1
  • 2

6.3、修改配置文件

[redis]
debug = true
addr = "127.0.0.1:6379"
writeTimeout = "3s"
password = "**********"

[mysql]
debug = true
# database DSN
dsn = "root:************@tcp(127.0.0.1:3306)/clickvisual?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local&readTimeout=1s&timeout=1s&writeTimeout=3s"
# log level
level = "debug"
# maximum number of connections in the idle connection pool for database
maxIdleConns = 5
# maximum number of open connections for database
maxOpenConns = 10
# maximum amount of time a connection
connMaxLifetime = "300s"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

6.4、执行数据库初始文件

mysql -uroot -p < scripts/migration/database.sql
  • 1

6.5、启动

nohup ./clickvisual server &>stdout.log &
  • 1

6.6、创建实例

启动成功会监听 19011 ,第一次浏览器访问ClickVisual 会提示你进行表结构初始化,初始账号密码是 clickvisual/clickvisual。
在这里插入图片描述
在这里插入图片描述

6.7、创建数据库和表

进入日志菜单,可以看到刚才添加的 ClickHouse 实例,右键添加数据库(一个 ClickHouse 实例里可以创建多个数据库,跟 MySQL 一样,我这里直接取名 db01,你随意 ):
在这里插入图片描述
之后在 db01 上右键,新增日志库。数据表通常填成 topic 名字就行,其实就是 log stream 的名字。source 字段很关键,ClickVisual 会根据 source 来提取日志。source 里填 json 结构,不用填真实内容,只要填一个假数据结构,ClickVisual 能推断出各个字段的类型就行。
在这里插入图片描述
在这里插入图片描述
确定之后,ClickVisual 自动填充了相关字段,然后,我们补齐剩下的 Kafka 信息即可。确定之后,稍等几秒钟,就可以看到数据了,我的截图如下:
在这里插入图片描述

参考文章
ClickHouse + ClickVisual构建日志平台
vector实践–性能吊打logstash
vector实践–推送nginx日志到clickhouse
ES8生产实践——日志清洗过滤(vector方案)
ES8生产实践——数据查询与数据可视化(Grafana)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/552756
推荐阅读
  

闽ICP备14008679号