User behavior analytics is a common requirement: when a user clicks a button or opens a page, a behavior log is sent to the backend to be processed and stored. Analyzing these logs makes it easier to capture user preferences and build products users actually like.
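Throughout this article the behavior log is a small JSON record; the curl test further below sends {"uid": 1, "action": "2"}. As a minimal sketch of that record on the Java side (the class name and the field semantics are assumptions for illustration, relying on the lombok dependency declared later in the pom):

import lombok.Data;

// Sketch of one behavior-log record, matching the JSON body
// {"uid": 1, "action": "2"} used in the curl test below.
// Class name and field meanings are illustrative assumptions.
@Data
public class UsrLog {
    private long uid;      // id of the user who triggered the event
    private String action; // action code, e.g. which button was clicked
}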
# Kafka broker configuration (config/server.properties), set on every node
zookeeper.connect=zk1:2181
broker.id=x
log.dirs=/xxx/xxx
advertised.listeners=PLAINTEXT://172.23.x.x:9092
# disable automatic topic creation
auto.create.topics.enable=false
# allow topic deletion
delete.topic.enable=true
# Start the broker on each node
bin/kafka-server-start.sh config/server.properties &

# Verify registration in ZooKeeper: every broker.id should show up under /brokers/ids
./bin/zkCli.sh -server zk1:2181
ls /brokers/ids
# Create the topic with 2 partitions and a replication factor of 1
bin/kafka-topics.sh --create --zookeeper 172.23.x.x:2181 --replication-factor 1 --partitions 2 --topic usr-log

# Inspect the topic
bin/kafka-topics.sh --describe --zookeeper 172.23.x.x:2181 --topic usr-log
The topic is spread across 2 partitions, and with a replication factor of 1 each partition has only its leader replica, so there is no redundant backup.
Installing Nginx with ngx_kafka_module
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
./configure --prefix=/usr/local/src/nginx_1.18.0 --with-http_ssl_module --add-module=/usr/local/src/ngx_kafka_module
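configure only generates the Makefile; assuming the standard Nginx source-build flow, compile and install afterwards:

make && make install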
(Figure: core installation steps from the official guide.)
http {
    # enable the module and point it at the Kafka broker list
    kafka;
    kafka_broker_list 172.23.x.x:9092 172.23.x.x:9092;

    server {
        listen      80;
        server_name localhost;

        # request bodies POSTed to /log/usr are produced to the usr-log topic
        location /log/usr {
            kafka_topic usr-log;
        }
    }
}
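With the configuration in place, start the freshly built Nginx (the binary lives under the --prefix chosen above) so the test request below has something to hit:

/usr/local/src/nginx_1.18.0/sbin/nginx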
# Send a test event through Nginx into Kafka
curl -d '{"uid": 1, "action": "2"}' -H 'Content-Type: application/json' http://localhost/log/usr
# Confirm delivery: the console consumer should print {"uid": 1, "action": "2"}
./kafka-console-consumer.sh --bootstrap-server 172.23.x.x:9092,172.23.x.x:9092 --topic usr-log
<!-- pom.xml: Flink core, Kafka connector, fastjson, lombok and logging -->
<properties>
    <flink.version>1.10.3</flink.version>
    <lombok.version>1.18.10</lombok.version>
    <fastjson.version>1.2.49</fastjson.version>
    <maven.compiler.version>3.5.1</maven.compiler.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
        <version>${lombok.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Flink job entry point (wrapper class added for completeness; name illustrative)
public class UsrLogJob {

    public static void main(String[] args) throws Exception {
        // build the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.23.x.x:9092,172.23.x.x:9092");
        props.put("zookeeper.connect", "zk1:2181");
        props.put("group.id", "flink");
        props.put("client.id", "flink-1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // resume from committed offsets where they exist; otherwise consume only newly produced records
        props.put("auto.offset.reset", "latest");
        props.put("enable.auto.commit", "true");

        // read from Kafka; parallelism 1 keeps console output ordered and does not affect the result
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer<String>("usr-log", new SimpleStringSchema(), props))
                .setParallelism(1);

        // collect all records of each 5-second window into a list
        dataStreamSource
                .timeWindowAll(Time.seconds(5L))
                .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<String> iterableValues,
                                      Collector<List<String>> out) throws Exception {
                        List<String> strList = new ArrayList<>();
                        iterableValues.forEach(strList::add);
                        if (strList.isEmpty()) {
                            return;
                        }
                        out.collect(strList);
                    }
                })
                .addSink(new JdbcPersistence()); // sink to the database

        env.execute("nginx log analyse running");
    }
}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.List;

/**
 * JDBC persistence sink: receives one window's worth of raw log lines.
 */
public class JdbcPersistence extends RichSinkFunction<List<String>> {

    @Override
    public void invoke(List<String> values, Context context) throws Exception {
        // TODO parse the logs
        for (String log : values) {
            System.out.println(log);
        }
        // TODO build the aggregated result
        // TODO write it to the database
    }
}
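The three TODOs are left open above. One possible completion, as a sketch only: parse each line with fastjson, count events per (uid, action) inside the window, and batch-insert the counts over plain JDBC. The table usr_action_stat, its columns, the JDBC URL and credentials are all assumptions, and a MySQL driver on the classpath (not declared in the pom above) is assumed as well:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical completion of JdbcPersistence; schema, URL and credentials are assumed.
public class JdbcPersistence extends RichSinkFunction<List<String>> {

    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        // assumed MySQL instance, database and credentials
        conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/usr_log_db", "root", "***");
    }

    @Override
    public void invoke(List<String> values, Context context) throws Exception {
        // parse the logs: count events per (uid, action) within this window
        Map<String, Integer> counts = new HashMap<>();
        for (String log : values) {
            JSONObject json = JSONObject.parseObject(log);
            String key = json.getLongValue("uid") + ":" + json.getString("action");
            counts.merge(key, 1, Integer::sum);
        }
        // write the aggregated counts; table usr_action_stat(uid, action, cnt) is assumed
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO usr_action_stat (uid, action, cnt) VALUES (?, ?, ?)")) {
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                String[] parts = e.getKey().split(":", 2);
                ps.setLong(1, Long.parseLong(parts[0]));
                ps.setString(2, parts[1]);
                ps.setInt(3, e.getValue());
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}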