
Best Practices for Kafka Business Log Collection

Introduction

Apache Kafka is a distributed streaming platform, used mainly to build real-time data pipelines and streaming applications. In business log collection scenarios, Kafka acts as message middleware that receives, stores, and forwards large volumes of log data. It is commonly integrated with other systems (such as Elasticsearch, Flume, or Spark Streaming) for richer log processing and analysis. This article covers integration with Guance (观测云): Guance's collector, DataKit, consumes business logs from Kafka. The examples below walk through the integration and show the results.

Environment

Prerequisites

Software and middleware

  • Kafka 3.2.0
  • DataKit collector
  • JDK 8

Hardware

  • One cloud server: CentOS 7.9 64-bit, 4 vCPUs, 8 GB RAM, 100 GB cloud disk.

Integration steps

Prepare the Kafka environment

Install Kafka

Download version 3.2.0; extracting the archive is all that is needed.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
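
Extract the archive and switch into the Kafka directory (the commands that follow are run from there):

$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0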

Note: DataKit currently supports Kafka versions 0.8.2 through 3.2.0.

Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server
$ bin/kafka-server-start.sh config/server.properties
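
Optionally, confirm that the broker is listening on its default port, 9092 (this check assumes the ss utility from iproute, which ships with CentOS 7.9):

$ ss -lntp | grep 9092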
Create a topic

Create a topic named testlog:

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
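
To verify the topic was created, describe it:

$ bin/kafka-topics.sh --describe --topic testlog --bootstrap-server localhost:9092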
Start a console producer
$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092
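
As an optional check that messages actually reach the topic, independently of DataKit, a console consumer in a second shell can read them back:

$ bin/kafka-console-consumer.sh --topic testlog --from-beginning --bootstrap-server localhost:9092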

Install DataKit

Install the DataKit collector as described in the official documentation.

Fill in TOKEN according to your Guance workspace, then run:

DK_DATAWAY=https://openway.guance.com?token=<TOKEN> bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"
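
After the script finishes, you can confirm the collector is running; DataKit is installed as a systemd service on CentOS:

$ systemctl status datakit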

Enable the Kafka collector

Go to the kafkamq directory under DataKit's configuration directory (by default /usr/local/datakit/conf.d/) and copy kafkamq.conf.sample to kafkamq.conf.
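
For example:

$ cd /usr/local/datakit/conf.d/kafkamq
$ cp kafkamq.conf.sample kafkamq.conf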

The directory listing should then look similar to this:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

Adjust the Kafka collector configuration as follows (the complete file is shown after the list):

  • addrs = ["localhost:9092"]: in this article DataKit and Kafka run on the same host, so localhost is sufficient.
  • kafka_version = "3.2.0": the Kafka version used in this article.
  • [inputs.kafkamq.custom]: remove the leading comment marker "#".
  • [inputs.kafkamq.custom.log_topic_map]: remove the leading comment marker "#".
  • "testlog"="log.p": testlog is the topic name, and log.p is the field-extraction rule script for Guance's Pipeline programmable data processor. The business log samples and the content of log.p are detailed in "Using Pipeline" below.
  1. # {"version": "1.28.1", "desc": "do NOT edit this line"}
  2. [[inputs.kafkamq]]
  3. addrs = ["localhost:9092"]
  4. # your kafka version:0.8.2 ~ 3.2.0
  5. kafka_version = "3.2.0"
  6. group_id = "datakit-group"
  7. # consumer group partition assignment strategy (range, roundrobin, sticky)
  8. assignor = "roundrobin"
  9. ## rate limit.
  10. #limit_sec = 100
  11. ## sample
  12. # sampling_rate = 1.0
  13. ## kafka tls config
  14. # tls_enable = true
  15. # tls_security_protocol = "SASL_PLAINTEXT"
  16. # tls_sasl_mechanism = "PLAIN"
  17. # tls_sasl_plain_username = "user"
  18. # tls_sasl_plain_password = "pw"
  19. ## -1:Offset Newest, -2:Offset Oldest
  20. offsets=-1
  21. ## skywalking custom
  22. #[inputs.kafkamq.skywalking]
  23. ## Required!send to datakit skywalking input.
  24. #dk_endpoint="http://localhost:9529"
  25. #thread = 8
  26. #topics = [
  27. # "skywalking-metrics",
  28. # "skywalking-profilings",
  29. # "skywalking-segments",
  30. # "skywalking-managements",
  31. # "skywalking-meters",
  32. # "skywalking-logging",
  33. #]
  34. #namespace = ""
  35. ## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!
  36. #[inputs.kafkamq.jaeger]
  37. ## Required! ipv6 is "[::1]:9529"
  38. #dk_endpoint="http://localhost:9529"
  39. #thread = 8
  40. #source: agent,otel,others...
  41. #source = "agent"
  42. ## Required! topics
  43. #topics=["jaeger-spans","jaeger-my-spans"]
  44. ## user custom message with PL script.
  45. [inputs.kafkamq.custom]
  46. #spilt_json_body = true
  47. #thread = 8
  48. ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
  49. #[inputs.kafkamq.custom.spilt_topic_map]
  50. # "log_topic"=true
  51. # "log01"=false
  52. [inputs.kafkamq.custom.log_topic_map]
  53. "testlog"="log.p"
  54. # "log01"="log_01.p"
  55. #[inputs.kafkamq.custom.metric_topic_map]
  56. # "metric_topic"="metric.p"
  57. # "metric01"="rum_apm.p"
  58. #[inputs.kafkamq.custom.rum_topic_map]
  59. # "rum_topic"="rum_01.p"
  60. # "rum_02"="rum_02.p"
  61. #[inputs.kafkamq.remote_handle]
  62. ## Required!
  63. #endpoint="http://localhost:8080"
  64. ## Required! topics
  65. #topics=["spans","my-spans"]
  66. # send_message_count = 100
  67. # debug = false
  68. # is_response_point = true
  69. # header_check = false
  70. ## Receive and consume OTEL data from kafka.
  71. #[inputs.kafkamq.otel]
  72. #dk_endpoint="http://localhost:9529"
  73. #trace_api="/otel/v1/trace"
  74. #metric_api="/otel/v1/metric"
  75. #trace_topics=["trace1","trace2"]
  76. #metric_topics=["otel-metric","otel-metric1"]
  77. #thread = 8
  78. ## todo: add other input-mq

Note: after enabling or changing a DataKit configuration, restart the collector (run datakit service -R in a shell).

Using Pipeline

Content of the log.p rules:

# Parse the JSON log body
data = load_json(message)

# Promote protocol and response_code to tags
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol, protocol)
set_tag(response_code, response_code)

# Map the HTTP response code to a log status: 2xx -> info, 4xx -> warning, 5xx -> error
group_between(response_code, [200, 300], "info", "status")
group_between(response_code, [400, 499], "warning", "status")
group_between(response_code, [500, 599], "error", "status")

# Use the log's own start_time as the record's timestamp
time = data["start_time"]
set_tag(time, time)
default_time(time)
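
Before sending real traffic, the script can be tried against a sample message with DataKit's pipeline debugging subcommand. The -P/-T flags below match recent DataKit releases (check the CLI help of your version), and the JSON is a trimmed sample for illustration:

$ datakit pipeline -P log.p -T '{"protocol":"HTTP/1.1","response_code":204,"start_time":"2024-05-01T00:37:11.230Z"}'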

Results

Sample business logs

The sample business log file looks like this:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

Sending the logs

After the producer starts, send the following three log lines at its prompt: the first is info level ("response_code":204), the second is error level ("response_code":504), and the last is warn level ("response_code":404).

  1. >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
  2. >
  3. >
  4. >
  5. >
  6. >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
  7. >
  8. >
  9. >
  10. >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

The three business logs collected from Kafka by DataKit

Field extraction from the business logs with Pipeline

In the figure below, the protocol, response_code, and time fields all show the result of Pipeline extraction.
