赞
踩
模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka来实现,其中Ngx_kafka_module 是开源的专门用来对接Nginx和Kafka的一个组件。
用户id:user_id, 访问时间:act_time, 操作: (action,包括click,job_collect,cv_send,cv_upload), 企业编码job_code
- $ cd /usr/local/src
- $ git clone https://github.com/edenhill/librdkafka
- # 进入到librdkafka,然后进行编译
- $ cd librdkafka
- $ yum install -y gcc gcc-c++ pcre-devel zlib-devel
- $ ./configure
- $ make && make install
- $ yum -y install make zlib-devel gcc-c++ libtool openssl openssl-devel
- $ cd /opt/software
- # 1.下载
- $ wget http://nginx.org/download/nginx-1.18.0.tar.gz
- # 2.解压
- $ tar -zxf nginx-1.18.0.tar.gz -C /opt/servers
- # 3. 下载模块源码
- $ cd /opt/software
- $ git clone git@github.com:brg-liuwei/ngx_kafka_module.git
- # 4. 编译
- $ cd /opt/servers/nginx-1.18.0
- $ ./configure --add-module=/opt/software/ngx_kafka_module/
- $ make && make install
- # 5.删除Nginx安装包
- $ rm /opt/software/nginx-1.18.0.tar.gz
- # 6.启动nginx
- $ cd /opt/servers/nginx-1.18.0/sbin
- $ nginx

- #user nobody;
- worker_processes 1;
-
- #error_log logs/error.log;
- #error_log logs/error.log notice;
- #error_log logs/error.log info;
-
- #pid logs/nginx.pid;
-
-
- events {
- worker_connections 1024;
- }
-
-
- http {
- include mime.types;
- default_type application/octet-stream;
-
- #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
- # '$status $body_bytes_sent "$http_referer" '
- # '"$http_user_agent" "$http_x_forwarded_for"';
-
- #access_log logs/access.log main;
-
- sendfile on;
- #tcp_nopush on;
-
- #keepalive_timeout 0;
- keepalive_timeout 65;
-
- #gzip on;
-
- #此处配置的是和kafka进行整合,配置kafka的broker的地址
- kafka;
- kafka_broker_list linux121:9092;
-
- server {
- listen 6666;
- server_name localhost;
-
- add_header 'Access-Control-Allow-Origin' $http_origin;
- add_header 'Access-Control-Allow-Credentials' 'true';
- add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
- add_header 'Access-Control-Allow-Headers' 'DNT,web-token,app-token,Authorization,Accept,Origin,Keep-Alive,User-Agent,X-Mx-ReqToken,X-Data-Type,X-Auth-Token,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range';
- add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range';
-
- #charset koi8-r;
-
- #access_log logs/host.access.log main;
-
- location / {
- root html;
- index index.html index.htm;
- }
-
- #配置要消费的kafka分区,如果topic不存在会自动创建
- location = /kafka/mytopic {
- kafka_topic tp_individual;
- }
- #可以配置多个topic
- location = /kafka/topic2 {
- kafka_topic user;
- }
- location = /job {
- root html;
- index jobPage.html;
- }
- }
- }

- # 创建topic
- kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic tp_individual --partitions 1 --replication-factor 1
- # 创建消费者
- kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic tp_individual --from-beginning
- # 创建生产者测试
- kafka-console-producer.sh --broker-list linux121:9092 --topic tp_individual
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>kafka job</title>
- <script src="https://code.jquery.com/jquery-3.0.0.min.js"></script>
- </head>
- <body>
- <div>
- <button id="click" type="button" onclick="post('click')">点击</button>
- <button id="collect" type="button" onclick="post('job_collect')">收藏</button>
- <button id="send" type="button" onclick="post('cv_send')">上传简历</button>
- <button id="upload" type="button" onclick="post('cv_upload')">投递简历</button>
- </div>
- </body>
-
- <script type="text/javascript">
- function post(action) {
- let json = {'user_id': 'u_donald', 'act_time': current().toString(), 'action': action, 'job_code': 'donald'};
- alert("begin")
- alert(json)
- $.ajax({
- url: "http://linux121:6666/kafka/mytopic",
- type:"POST" ,
- crossDomain: true,
- data: JSON.stringify(json),
- xhrFields: {
- withCredentials: true //跨域访问
- },
- success:function (data, status, xhr) {
- alert("操作成功:'" + action)
- },
- error:function (err) {
- alert(err.responseText);
- }
- });
- }
-
- function current() {
- let d = new Date(),
- str = '';
- str += d.getFullYear() + '-';
- str += d.getMonth() + 1 + '-';
- str += d.getDate() + ' ';
- str += d.getHours() + ':';
- str += d.getMinutes() + ':';
- str += d.getSeconds();
- return str;
- }
- </script>
- </html>

运行页面点击效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。