Preface
Building an observable log analysis platform is inseparable from ETL. What is ETL? ETL stands for Extract, Transform, Load: a common data processing pattern in which data is extracted from a source, transformed, and then loaded into a target data warehouse or database. When the data source is log files, the most common stack is ELK. Most developers know these three letters: ELK is the initials of Elasticsearch, Logstash and Kibana, a very popular open-source combination widely used for collecting, transforming, searching and analyzing logs. A simple ELK architecture looks like the diagram below:
Beats: usually filebeat, a lightweight, low-overhead agent for collecting log file data. We use categraf instead of filebeat.
Kafka: transports, buffers and distributes the log data.
Logstash: collects, extracts, transforms and ships log and event data; processed logs are sent to Elasticsearch. We use vector instead of Logstash.
Elasticsearch: a distributed search and analytics engine that stores, indexes and analyzes log and event data, with powerful search and aggregation capabilities. We use ClickHouse instead of Elasticsearch.
Kibana: an open-source tool for visualizing and analyzing the data stored in Elasticsearch. We use ClickVisual or Grafana instead of Kibana.
The adjusted architecture is shown below.
The installation of categraf is documented in detail on the official site, so it is not repeated here; just follow the official docs. categraf follows an all-in-one design and can collect metrics, logs and traces at the same time.
https://flashcat.cloud/download/categraf/
https://flashcat.cloud/docs/content/flashcat-monitor/categraf/2-installation/
The main change is to the conf/config.toml configuration file. The configuration below collects both Java logs and nginx logs and sends them to different Kafka topics.
[logs]
## just a placeholder
api_key = "127.0.0.1"
## enable log collection or not
enable = true
## the server receiving logs, http/tcp/kafka; only kafka brokers can be multiple ip:ports joined with ","
send_to = "127.0.0.1:5044"
## send logs with protocol: http/tcp/kafka
send_type = "kafka"
topic = "crj_server"
## send logs with compression or not
use_compression = false
# gzip compression level, 0 = no compression, 1-9 = compression level
compression_level = 0
# compression codecs supported by kafka: none gzip snappy lz4 zstd
compression_codec = "none"
## use ssl or not
send_with_tls = false
## send logs in batches
batch_wait = 5
## save offsets in this path
run_path = "/opt/categraf/run"
## max files that can be open
open_files_limit = 100
## scan config files every 10 seconds
scan_period = 10
## read buffer of udp
frame_size = 9000
## channel size, default 100
## read buffer for log lines (number of lines)
chan_size = 10000
## number of threads processing logs
pipeline = 10
## configuration for kafka
## kafka version
kafka_version = "3.7.0"
# default 0 means writing to kafka serially in read order; keep the default if log order matters
batch_max_concurrence = 100
# send buffer size in lines; if set smaller than chan_size it is automatically raised to chan_size
batch_max_size = 10000
# max content size per send, default 1000000 bytes (whichever of batch_max_size / batch_max_content_size is reached first triggers the send)
batch_max_content_size = 900000
# client timeout in seconds
producer_timeout = 10
# enable sasl or not
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
# PLAIN
sasl_mechanism = "PLAIN"
# v1
sasl_version = 1
# set true
sasl_handshake = true
# optional
# sasl_auth_identity = ""

# added in ent-v0.3.50+: whether to collect pod logs
enable_collect_container = false
# whether to collect stdout/stderr of all pods
collect_container_all = false
## global processing rules
# [[logs.processing_rules]]

## single log configuration
[[logs.items]]
## file/journald/tcp/udp
type = "file"
## type=file, path is required; type=journald/tcp/udp, port is required
path = "/data/crj-server/bin/logs/sys-info.log"
source = "java"
service = "crj_server_info"

[[logs.items]]
type = "file"
path = "/data/crj-server/bin/logs/sys-error.log"
source = "java"
service = "crj_server_error"
[[logs.items.log_processing_rules]]
type = "multi_line"
name = "new_line_with_date"
pattern = "\\d{4}-\\d{2}-\\d{2}"

[[logs.items]]
type = "file"
path = "/usr/local/nginx/logs/error.log"
source = "nginx"
service = "crj_nginx_error"
topic = "crj_nginx"

[[logs.items]]
type = "file"
path = "/usr/local/nginx/logs/access.log"
source = "nginx"
service = "crj_nginx_access"
topic = "crj_nginx"
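After changing the configuration, categraf has to be restarted to pick it up. Assuming categraf was installed as a systemd service following the official docs (the unit name may differ on your host), and once the Kafka broker from the next section is up, a minimal restart-and-check sketch looks like this:
# restart categraf so the new [logs] section takes effect
systemctl restart categraf
# follow the agent's own output to confirm the log collector started without errors
journalctl -u categraf -f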
Download
https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
Extract
tar -zxvf kafka_2.13-3.7.0.tgz && mv kafka_2.13-3.7.0/ kafka/ && cd kafka
Kafka depends on ZooKeeper to start, so first edit config/zookeeper.properties and adjust the dataDir setting.
cd /data/kafka/config
cp zookeeper.properties zookeeper.properties.bak
vim zookeeper.properties
# data persistence directory
dataDir=/data/kafka/zookeeper/data
# log directory
dataLogDir=/data/kafka/zookeeper/logs
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
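ZooKeeper will normally create these directories itself, but creating them up front (paths taken from the config above) avoids permission surprises:
mkdir -p /data/kafka/zookeeper/data /data/kafka/zookeeper/logs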
Then edit the Kafka configuration config/server.properties and change log.dirs to a proper directory as well; do not use /tmp.
cp server.properties server.properties.bak
vim server.properties
broker.id=0
# Note: use a concrete IP here; otherwise categraf may end up pushing logs to the wrong (local) host
listeners=PLAINTEXT://127.0.0.1:5044
advertised.listeners=PLAINTEXT://127.0.0.1:5044
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
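The same goes for the Kafka data directory referenced by log.dirs above:
mkdir -p /data/kafka/logs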
Start ZooKeeper
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Start Kafka
nohup bin/kafka-server-start.sh config/server.properties &> kafka.stdout &
To check whether ZooKeeper and Kafka started successfully, look at the processes with the jps command or check the ports with netstat -tunlp.
[root@srd-127.0.0.1 kafka]# jps
10725 Jps
10025 Kafka
3724 QuorumPeerMain
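With Kafka up and categraf shipping logs, a quick sanity check from the Kafka directory is to list the topics and consume a few raw messages. This is only a sketch; the broker address and topic names are taken from the configs above:
# the crj_server and crj_nginx topics should appear once categraf has shipped its first batch
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:5044 --list
# peek at a few messages to confirm logs are arriving
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:5044 --topic crj_server --from-beginning --max-messages 5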
https://github.com/redpanda-data/console/releases/download/v2.4.5/redpanda_console_2.4.5_linux_amd64.tar.gz
tar -xvf redpanda_console_2.4.5_linux_amd64.tar.gz
export KAFKA_BROKERS=127.0.0.1:5044
nohup /data/redpanda-console/redpanda-console &> /data/redpanda-console/redpanda-console.stdout &
Redpanda Console is a web UI for browsing Kafka topics; it listens on port 8080 by default, and the page looks like this:
https://github.com/vectordotdev/vector/releases/tag/v0.37.1
rpm -ivh vector-0.37.1-1.x86_64.rpm
Vector's configuration files live under /etc/vector/ by default.
A Vector pipeline consists of three kinds of components: sources, transforms and sinks. Each kind can contain multiple elements, but their names must be unique.
For log cleaning, the recommended transform type is remap, written in VRL. VRL ships with a large number of functions for common log transformations, such as parse_nginx_log. The configuration rules are documented in great detail in the official "Remap with VRL" reference, and Vector also provides an online VRL playground where you can test whether your code is valid. Usage looks like this:
Hands-on
# Parsing rules for the info/error logs produced by Java log4j
vim /etc/vector/vector_java.yaml
sources:
  police_java_log:
    type: kafka
    bootstrap_servers: 127.0.0.1:5044
    group_id: vector_java_consumer
    topics:
      - "^.*server$"    # regular expressions are supported
transforms:
  police_java_log_transform:
    type: remap
    inputs:             # match the input source
      - police_java_log
    source: |
      . = parse_json!(.message)
      .java_log, err = parse_regex(.message, r'^(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[(?<thread>[^\]]+)\]\s+(?<logLevel>[A-Z]+)\s+(?<javaClass>[^\s]+)\s+-\s+(?<content>.*)$')
      # merge only when the regex actually matched
      if err == null {
        . = merge(., .java_log)
      }
      # the format should match the timestamp captured above; falls back to now() if parsing fails
      .timestamp = parse_timestamp(.timestamp, "%Y-%m-%d %H:%M:%S") ?? now()
      del(.status)
sinks:
  police_java_es:
    type: elasticsearch
    inputs: ["police_java_log_transform"]    # match the transform
    #mode: "data_stream"                     # write as a data stream
    auth:                                    # es credentials
      strategy: "basic"
      user: "elastic"
      password: "***********"
    endpoints: ["https://127.0.0.1:9200"]    # es endpoint
    bulk:
      index: "{{ fcservice }}-{{ fcsource }}-%Y-%m-%d"
    tls:                                     # tls certificate settings
      verify_certificate: false              # skip certificate verification
      ca_file: "/etc/vector/http_ca.crt"     # ca certificate path
Configure parsing rules for the nginx access and error logs
vim /etc/vector/vector_nginx.yaml
sources:
  police_nginx_log:
    type: kafka
    bootstrap_servers: 127.0.0.1:5044
    group_id: vector_nginx_consumer
    topics:
      - "^.*nginx$"
transforms:
  police_nginx_log_transform:
    type: remap
    inputs:             # match the input source
      - police_nginx_log
    source: |
      . = parse_json!(.message)
      # field names containing special characters must be quoted
      del(."@metadata")
      .nginx_log = parse_nginx_log(.message, "combined") ?? parse_nginx_log!(.message, "error")
      . = merge(., .nginx_log)
      del(.nginx_log)
      if .status == "info" {
        del(.status)
      }
sinks:
  police_nginx_es:
    type: elasticsearch
    inputs: ["police_nginx_log_transform"]   # match the transform
    #mode: "data_stream"                     # write as a data stream
    auth:                                    # es credentials
      strategy: "basic"
      user: "elastic"
      password: "*************"
    endpoints: ["https://127.0.0.1:9200"]    # es endpoint
    bulk:
      index: "{{ fcservice }}-{{ fcsource }}-%Y-%m-%d"   # build the es index name from the log's labels
    tls:                                     # tls certificate settings
      verify_certificate: false              # skip certificate verification
      ca_file: "/etc/vector/http_ca.crt"     # ca certificate path
Pushing the data into ClickHouse requires creating the table schema in ClickHouse beforehand, which does not fit my current needs, so for now the data is pushed to ES. For pushing to ClickHouse, the official example is already very detailed: see "Vector in practice: pushing nginx logs to ClickHouse" in the references.
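Before starting Vector it is worth validating the configuration; vector validate checks the YAML, the component graph and the VRL programs (file paths as above):
vector validate /etc/vector/vector_java.yaml /etc/vector/vector_nginx.yaml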
# Start with a single config file
nohup vector -c /etc/vector/vector_java.yaml 2>vector.log &
# Start with multiple config files
nohup vector -c /etc/vector/*.yaml -w /etc/vector/*.yaml 2>vector.log &
The -w flag watches the config files for changes and reloads them accordingly.
A reminder again: even across multiple config files, component names within the same Vector instance must not collide.
You can see that the corresponding consumer groups have been created automatically.
Checking ES, the indices have also been created.
The nginx logs are parsed correctly as well.
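If you prefer the command line to screenshots, the same checks can be sketched with the Kafka tooling and the ES _cat API; the broker address, ES endpoint and credentials are assumed from the configs above:
# the vector_java_consumer / vector_nginx_consumer groups should be listed
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:5044 --list
# the daily indices created by the elasticsearch sinks should show up here
curl -k -u elastic:'<password>' "https://127.0.0.1:9200/_cat/indices?v"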
https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-client-24.3.2.23-amd64.tgz
https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-common-static-24.3.2.23-amd64.tgz
https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-common-static-dbg-24.3.2.23-amd64.tgz
https://github.com/ClickHouse/ClickHouse/releases/download/v24.3.2.23-lts/clickhouse-server-24.3.2.23-amd64.tgz
tar -zxvf clickhouse-common-static-24.3.2.23-amd64.tgz
tar -zxvf clickhouse-common-static-dbg-24.3.2.23-amd64.tgz
tar -zxvf clickhouse-server-24.3.2.23-amd64.tgz
tar -zxvf clickhouse-client-24.3.2.23-amd64.tgz
# The installer will prompt you to set a password; remember it, or you will have to edit the XML config later
./clickhouse-common-static-24.3.2.23/install/doinst.sh
./clickhouse-common-static-dbg-24.3.2.23/install/doinst.sh
./clickhouse-server-24.3.2.23/install/doinst.sh
./clickhouse-client-24.3.2.23/install/doinst.sh
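Before wiring up systemd, a quick check that the packages installed correctly (the install scripts normally symlink the binaries into /usr/bin):
clickhouse-server --version
clickhouse-client --version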
Rewrite the systemd unit file
vim /usr/lib/systemd/system/clickhouse-server.service
[Unit]
Description=ClickHouse Server
After=network.target
[Service]
Type=simple
User=clickhouse
ExecStart=/usr/bin/clickhouse-server --config-file /etc/clickhouse-server/config.xml
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.target
Start
systemctl daemon-reload
systemctl restart clickhouse-server
systemctl status clickhouse-server
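To confirm the server actually accepts connections, run a trivial query with the client, using the password chosen during installation (placeholder below):
clickhouse-client --password '<your password>' --query "SELECT version()"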
ClickVisual depends on MySQL and Redis; see my earlier blog posts for how to install them.
https://github.com/clickvisual/clickvisual/releases/download/v1.0.1-rc1/clickvisual-v1.0.1-rc1-linux-amd64.tar.gz
mkdir clickvisual && cd clickvisual
tar -xvf clickvisual-v1.0.1-rc1-linux-amd64.tar.gz
[redis]
debug = true
addr = "127.0.0.1:6379"
writeTimeout = "3s"
password = "**********"

[mysql]
debug = true
# database DSN
dsn = "root:************@tcp(127.0.0.1:3306)/clickvisual?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local&readTimeout=1s&timeout=1s&writeTimeout=3s"
# log level
level = "debug"
# maximum number of connections in the idle connection pool for database
maxIdleConns = 5
# maximum number of open connections for database
maxOpenConns = 10
# maximum amount of time a connection may be reused
connMaxLifetime = "300s"
mysql -uroot -p < scripts/migration/database.sql
nohup ./clickvisual server &>stdout.log &
Once started it listens on port 19011. The first time you open ClickVisual in a browser it will ask you to initialize the table structure; the initial username/password is clickvisual/clickvisual.
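A quick check that the service is really listening before opening the browser:
ss -tnlp | grep 19011
curl -sI http://127.0.0.1:19011 | head -n 1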
Go to the Logs menu, where you can see the ClickHouse instance added earlier. Right-click it to add a database (a single ClickHouse instance can hold multiple databases, just like MySQL; I simply named mine db01, name yours however you like):
Then right-click db01 and create a new log library. The data table name is usually just the topic name, which is really the name of the log stream. The source field is the critical part: ClickVisual extracts log fields based on it. Fill it with a JSON structure; it does not need to be real data, only a fake skeleton from which ClickVisual can infer each field's type (a sample skeleton follows below).
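The easiest way to build that source skeleton is to consume one real message from the topic (as shown earlier with kafka-console-consumer.sh) and mirror its structure. The skeleton below is only hypothetical, based on the fields this setup appears to produce (message, status, fcservice, fcsource); adjust it to whatever your actual messages contain:
{
  "message": "2024-04-22 10:00:00 [main] INFO com.example.Demo - demo log line",
  "status": "info",
  "fcservice": "crj_server_info",
  "fcsource": "java"
}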
After confirming, ClickVisual auto-fills the related fields, and we only need to complete the remaining Kafka information. Confirm, wait a few seconds, and the data appears; my screenshot is below:
References
Building a log platform with ClickHouse + ClickVisual
Vector in practice: performance that crushes Logstash
Vector in practice: pushing nginx logs to ClickHouse
ES8 in production: log cleaning and filtering (Vector approach)
ES8 in production: data querying and visualization (Grafana)