High-traffic production services, and any nginx service that reports logs, generate an enormous volume of log data every day, and that data is valuable: it can feed metrics reporting, user-behavior analysis, interface quality tracking, performance monitoring, and more. With nginx's traditional logging, however, the data sits scattered across the individual nginx machines, and at high traffic volumes the log writes are themselves a real load on the disks.
We therefore want to collect these nginx logs in one place, and the collection process and its output must satisfy the following requirements:
1. Support different downstream consumers of the data, e.g. monitoring, data analysis and statistics.
2. Deliver the data in real time.
3. Guarantee high performance.
openresty: http://openresty.org
kafka: http://kafka.apache.org
lua-resty-kafka: https://github.com/doujiang24/lua-resty-kafka
Thanks to the high performance of openresty and kafka, we can implement this very efficiently with little machinery. The architecture is as follows:
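In outline, the pieces fit together like this (the consumer roles are the ones named in the requirements above):

    client request -> nginx/OpenResty (controller.lua) -> lua-resty-kafka async producer -> Kafka cluster -> downstream consumers (monitoring, data analysis/statistics, ...)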
1. Install openresty; remember to enable nginx's status-monitoring module (stub_status).
2. Install kafka.
3. Download the lua+kafka client: https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
4. Unpack the archive and copy the lua-resty-kafka-master/lib/resty/kafka folder into openresty/lualib/resty/ (a quick sanity check follows this list).
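To confirm that step 4 put the library where OpenResty can find it, a quick check can be run with OpenResty's resty command-line tool. This is a minimal sketch, assuming resty is on your PATH:

    -- save as check_kafka.lua and run:  resty check_kafka.lua
    -- pcall keeps the script from aborting if the module is missing
    local ok, mod = pcall(require, "resty.kafka.producer")
    if ok then
        print("lua-resty-kafka producer module loaded")
    else
        print("module not found: " .. tostring(mod))
    end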
Problems you may encounter mostly come down to name resolution and broker addressing: note the resolver directive in nginx.conf and the requirement that the broker IPs match Kafka's host.name setting, both called out in the files below.
Nginx configuration file: nginx.conf
worker_processes 1;

events {
    worker_connections 1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile      on;
    keepalive_timeout 65;

    # Shared dictionary, 10 MB, shared by all nginx workers (used as a counter below)
    lua_shared_dict shared_data 10m;

    # Local DNS resolution
    resolver 127.0.0.1;

    server {
        listen      80;
        server_name localhost;

        #charset koi8-r;
        #access_log logs/host.access.log main;

        location / {
            #root  html;
            #index index.html index.htm;

            # Enable the nginx status module
            stub_status on;

            # Hand the request to the Lua collection script
            default_type text/html;
            content_by_lua_file /usr/local/openresty/nginx/conf/controller.lua;
        }

        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}
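Two details in this file do most of the work: lua_shared_dict allocates a 10 MB memory zone shared across all nginx worker processes, which is what lets controller.lua keep a single round-robin counter no matter which worker handles a request, and stub_status exposes the $connections_active variable that the Lua script reads for overload protection.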
Lua file: controller.lua
The fields used in the code below are explained in the inline comments:
-- Collection threshold: if the load exceeds it, skip collection
local DEFAULT_THRESHOLD = 100000
-- Number of Kafka partitions
local PARTITION_NUM = 6
-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION3'
-- Shared-dict key for the round-robin counter
local POLLING_KEY = "POLLING_KEY"

-- Custom partitioner: the key we pass in is already the target partition number
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end

-- Kafka broker list (each IP must match the broker's host.name setting)
local BROKER_LIST = {
    { host = "192.168.56.112", port = 9092 },
    { host = "192.168.56.113", port = 9092 },
    { host = "192.168.56.154", port = 9092 },
}

-- Producer options
local CONNECT_PARAMS = {
    producer_type   = "async",
    socket_timeout  = 30000,
    flush_time      = 10000,
    request_timeout = 20000,
    partitioner     = partitioner,
}

-- Shared-memory counter used for round-robin partition selection
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end
-- Take the per-message counter modulo PARTITION_NUM to spread messages evenly
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- Overload protection: stop collecting when active connections exceed the threshold
local isGone = true
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
    isGone = false
end

-- Data collection
if isGone then
    local time_local      = ngx.var.time_local or ""
    local request         = ngx.var.request or ""
    local request_method  = ngx.var.request_method or ""
    local content_type    = ngx.var.content_type or ""
    ngx.req.read_body()
    local request_body    = ngx.var.request_body or ""
    local http_referer    = ngx.var.http_referer or ""
    local remote_addr     = ngx.var.remote_addr or ""
    local http_user_agent = ngx.var.http_user_agent or ""
    local time_iso8601    = ngx.var.time_iso8601 or ""
    local server_addr     = ngx.var.server_addr or ""
    local http_cookie     = ngx.var.http_cookie or ""

    -- Assemble the message, joining the fields with a "#CS#" separator
    local message = time_local .. "#CS#" .. request .. "#CS#" .. request_method
        .. "#CS#" .. content_type .. "#CS#" .. request_body .. "#CS#" .. http_referer
        .. "#CS#" .. remote_addr .. "#CS#" .. http_user_agent .. "#CS#" .. time_iso8601
        .. "#CS#" .. server_addr .. "#CS#" .. http_cookie

    -- Create the Kafka producer and send the message
    local producer = require "resty.kafka.producer"
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    local ok, err = bp:send(TOPIC, partitions, message)
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
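On the consuming side, whatever reads the topic has to undo the "#CS#" joining done above. As an illustration only (parse_message and the field list are hypothetical names, not part of lua-resty-kafka), a Lua consumer could split a message back into named fields like this:

    -- Field names, in the exact order controller.lua concatenates them
    local FIELDS = {
        "time_local", "request", "request_method", "content_type",
        "request_body", "http_referer", "remote_addr", "http_user_agent",
        "time_iso8601", "server_addr", "http_cookie",
    }

    -- Split a "#CS#"-delimited message into a table keyed by field name
    -- (hypothetical helper, for illustration)
    local function parse_message(message)
        local record, i = {}, 1
        -- appending one trailing separator lets the pattern capture the last field
        for value in string.gmatch(message .. "#CS#", "(.-)#CS#") do
            record[FIELDS[i] or ("extra_" .. i)] = value
            i = i + 1
        end
        return record
    end

    -- Example: empty fields survive the round trip
    local r = parse_message("07/Mar/2016:10:00:00 +0800#CS#GET / HTTP/1.1#CS#GET#CS##CS##CS##CS#127.0.0.1#CS#curl/7.29.0#CS#2016-03-07T10:00:00+08:00#CS#192.168.56.112#CS#")
    print(r.request_method, r.remote_addr)  --> GET    127.0.0.1

One caveat of this delimiter scheme: if a request body or cookie happens to contain the literal string "#CS#", the fields no longer line up; a stricter format such as JSON avoids that. Also note that with producer_type = "async", bp:send() queues the message in a buffer that is flushed to Kafka on a timer (flush_time), so a successful return means the message was buffered, not that it was delivered.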