当前位置:   article > 正文

nginx日志发送到kafka_nginx 转发kafka

nginx 转发kafka

由于业务需要,原有的架构在对接过程中,收集数据存在丢失的风险,经过方案对比等确定了现有的方案,现在把部署的一些流程在此做记录,供需要有此需要的同学参考

1.nginx日志格式化
       log_format main escape=json '{ "@timestamp": "$time_iso8601", '
        '"remote_addr": "$remote_addr", '
        '"remote_user": "$remote_user", '
        '"referer": "$http_referer", '
        '"request": "$request", '
        '"status": $status, '
        '"bytes": "$body_bytes_sent", '
        '"agent": "$http_user_agent", '
        '"x_forwarded": "$http_x_forwarded_for", '
        '"http_referer": "$http_referer", '
        '"up_addr": "$upstream_addr",'
        '"dm": "$request_body",'
        '"up_host": "$upstream_http_host",'
        '"up_resp_time": "$upstream_response_time",'
        '"request_time": "$request_time"'
        ' }';


添加如上代码到nginx.conf文件中。

2.输出nginx的日志到自己想要的文件,注意此步

location /student_live{
      #proxy_set_header X-Real-IP $remote_addr;
      #proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      #proxy_set_header Host $http_host;
      proxy_set_header X-NginX-Proxy true;
      proxy_redirect off;
      proxy_method GET;
      proxy_pass https://xxxxx/aa.html;
      access_log /var/log/nginx/user_defined.log main;
    }
 

 

 首先这个请求是post进来,post请求日志不在这里自定义输出的话,日志中是没有办法收集到格式化后的请求参数,上图配置可以把请求参数打到日志文件中。(这步折腾了很久,也没有找到其他的方式,最后这么简单的一句解决。)

3.安装logstash,版本logstash-8.3.3-linux-x86_64.tar.gz

修改配置文件:
input {
    file {
        # 指定日志路径,可以采用通用如:access.*
        path => ["/var/log/nginx/user_defined.log"]
        start_position => "beginning"
        codec => "json"
        type => "nginx"
    }
}


#过滤无用字段,仅保留message内容
filter{
     mutate {
        remove_field => ["host"]
        remove_field => ["agent"]
        remove_field => ["ecs"]
        remove_field => ["tags"]
        remove_field => ["fields"]
        remove_field => ["@version"]
        remove_field => ["@timestamp"]
        remove_field => ["input"]
        remove_field => ["log"]
        remove_field => ["container"]
        remove_field => ["event"]
        remove_field => ["referer"]
        remove_field => ["up_addr"]
        remove_field => ["up_host"]
        remove_field => ["type"]
        remove_field => ["up_resp_time"]
    }

}

output{
   stdout {codec => rubydebug}
   kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topic_id => "student-live-record"
    compression_type => "snappy"
    codec => json
  }
}

 

启动命令放在这里:

 ./logstash -f /data/logstash/logstash-8.3.3/config/logstash.conf &

3.安装zookeeper,zookeeper版本apache-zookeeper-3.7.1-bin.tar.gz

wget --no-check-certificate  https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

安装zookeeper需要在bin的同级目录添加两个文件夹 data和logs

在zookper/apache-zookeeper-3.7.1-bin/conf 目录下基于zoo_sample.cfg复制一个zoo.cfg

添加如下配置:

dataDir = /data/zookper/apache-zookeeper-3.7.1-bin/data
dataLogDir = /data/zookper/apache-zookeeper-3.7.1-bin/logs
ps:此处有坑,这个版本的dataDir和dataLogDir 必须这么写,其他版本的不是这样的,如果是3.7.1之前的版本,需要注意一下这个变量的写法。

启动服务:
/usr/local/services/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
连接服务:
/usr/local/services/zookeeper/zookeeper-3.4.10/bin/zkCli.sh
查看服务状态:
/usr/local/services/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status
停止服务:
/usr/local/services/zookeeper/zookeeper-3.4.10/bin/zkServer.sh stop
4.安装kafka,kafka版本kafka_2.12-3.0.1.tgz

 修改 vim config/server.properties  advertised.listeners=PLAINTEXT://127.0.0.1:9092

开始 bin/kafka-server-start.sh -daemon config/server.properties
停止 bin/kafka-server-stop.sh

创建topic命令
linux旧版本:bin/kafka-topics.sh --create --topic student-live-record --replication-factor 1 --partitions 1 --zookeeper 127.0.0.1:2181

linux新版本:./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic student-live-record

windows旧版本:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic student-live-record

windows新版本:.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic student-live-record

生产打印: ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic student-live-record

监听打印: ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic student-live-record --from-beginning

5.环境到此基本就装好了,然后写java服务消费kafka的消息即可。文档水平不佳,希望能给需要的人提供一点帮助。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/192689
推荐阅读
相关标签
  

闽ICP备14008679号