当前位置:   article > 正文

Flink电商指标分析项目(1)_项目整体工程搭建_flink 指标分析

flink 指标分析

一.Flink电商指标分析项目

项目背景

项目简介

公司有一个正在运营中的电商网站,名称叫做【品优购】。这是一个B2B2C的电商平台,类似京东。
现在我们想了解一下该电商网站的各种用户行为(访问行为、购物行为、点击行为等),统计出PV、UV等数据。
针对这样的大数据分析项目,我们可以采用MapReduce、Spark或者Flink来进行开发。
由于本项目会对实时数据和静态数据进行分析,所以我们采用性能更加优越的Flink来开发。
业务目标

  • 帮助产品经理、数据分析师以及管理人员分析现有产品的情况
  • 根据用户行为分析结果持续改进产品的设计
  • 调整公司战略和业务
  • 用大数据技术来帮助提升业绩、营业额以及市场占有率
**常见电商模式**
C2C--个人对个人 			案例:淘宝、瓜子二手车
B2B--企业对企业 			案例:阿里巴巴、慧聪网
B2C--企业对个人			案例:唯品会、乐蜂网
B2B2C -企业-企业-个人	案例:京东商城、天猫商城
C2B--个人对企业			案例:海尔商城、 尚品宅配
O2O--线上到线下			案例:美团、饿了么
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

多维度指标分析

用户指标
在这里插入图片描述
商品指标
在这里插入图片描述

**电商小概念: **
**SPU** = Standard Product Unit (标准产品单位)SPU是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息的集合,该集合描述了一个产品的特性。	例如:iPhone X 可以确定一个产品即为一个SPU。
**SKU **= Stock Keeping Unit(库存量单位)。一款商品有多个颜色,则是有多个SKU,例:一件衣服,有红色、白色、蓝色,则SKU编码也不相同,如相同则会出现混淆,发错货。	例如:iPhone X 64G 银色 则是一个SKU。
  • 1
  • 2
  • 3

项目整体介绍

使用到的技术

  • 语言
    Java、scala
  • 框架
    Spring Boot、Hadoop、HBase、Kafka、Flink、Canal

项目整体流程

在这里插入图片描述

  1. 上报服务系统将商城访问日志推送到kafka
  2. 数据库同步系统将mysql数据推送到kafka
  3. 实时分析系统消费kafka数据,经过分析后,下沉到HBase
  4. 实时同步系统消费kafka数据,下沉到HBase
  5. 批处理分析系统从hbase取出数据,进行数据分析
  6. web可视化平台展示HBase中的分析结果数据

项目的技术选型

为什么要选择架构中的技术?

  • Kafka
  • Hbase
  • Canal
  • Flink

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统。

  • 吞吐量高
    • 每秒达到几十万的数据
  • 速度快
    • 内存接力(消费者直接消费PageCache内存区数据)
  • 数据安全
    • 冗余机制可以有效避免消息丢失错误
    • 其中几台实例挂掉也可以继续工作
  • 高并发
    • 突发型流量可以将峰值的数据持久化,等到流量正常或低估时再取出消费

HBase

HBase是一个分布式的、面向列的开源数据库。

  • 业务不复杂
    • PV/UV、页面点击量、新鲜度(没有复杂SQL查询)
  • 实时电商平台
    • 存取海量的数据,一个表可以有上亿行,上百万列
  • 社区活跃
    Hbase社区非常大,Facebook、小米、网易等公司都在使用HBase
  • 高可用
    没有单点故障,高可用性高

Canal

数据库同步常用的有两种方案:

  • 方案1
    mysql --> logstash --> kafka --> flink --> hbase
  • 方案2
    mysql --> sqoop --> kafka --> flink-->hbase
    上述方案存在的问题
    logstash、sqoop还是需要使用SQL语句查询mysql,会给mysql增加压力,如果要跑大量数据的同步,会拖垮mysql
    解决方案
    mysql --> cannal(binlog) --> kafka --> flink --> hbase
    cannal
    Canal可以实时解析mysql的binlog日志,通过读取binlog日志,将数据输出到Kafka。不需要执行SQL语句,不会增加mysql压力

Flink

  • 速度要比Spark、MapReduce更快
  • 保证EXACTLY_ONCE
  • 容错更轻量级
  • 自动程序调优,自动避免性能消耗较大的操作(例如:shuffle、sort)
  • 高吞吐
问题:
为什么要选择基于canal来进行数据库同步技术?
  • 1
  • 2

项目整体工程搭建

工程结构

本项目采用Maven构建,下面是我们的项目的整体工程架构。总工程为pyg,下面包含我们要开发的5个子模块。
在这里插入图片描述
IDEA创建工程

  1. 在指定目录创建父模块pyg删除总工程的src目录
    在父模块配置Java版本为1.8
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 创建report模块
  2. 创建real-process实时处理Maven模块
  3. 创建canal-kafka数据采集Maven模块
  4. 创建sync-db数据库同步处理Maven模块
  5. 创建batch-process批数据处理Maven模块

上报服务系统开发

在这里插入图片描述
上报服务系统是一个Java Web工程,为了快速开发web项目,我们采用时下JavaWeb最流行的技术Spring Boot

Spring Boot简介

Spring Boot是一个基于Spring之上的快速应用构建框架。使用Spring Boot可以快速开发出基于Spring的应用。Spring Boot主要解决两方面的问题。

  • 依赖太多问题
    • 轻量级JavaEE开发,需要导入大量的依赖
    • 依赖之间还存在版本冲突
  • 配置太多问题
    • 大量的XML配置
      Spring Boot内部整合了大量的依赖,而且经过大量测试,选择的依赖都是没有版本冲突的。Spring Boot简化了大量的配置,通过少量的配置,就可以让程序工作。
      开发Spring Boot程序的基本步骤
  • 导入Spring Boot依赖(起步依赖)
  • 编写application.properties配置文件
  • 编写Application入口程序

导入Maven依赖

从目录中的pom.xml中拷贝依赖到report中的pom文件中
主要导入以下依赖:

  1. 导入Spring Boot依赖
  2. 操作JSON导入FastJSON依赖
  3. 导入Kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.itheima</groupId>
    <artifactId>report</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>report</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <!-- 阿里云 maven-->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>alimaven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        
		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
                
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.0.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>http-client</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

创建项目包结构

包名说明
com.itheima.report.controller存放Spring MVC的controller
com.itheima.report.bean存放相关的Java Bean实体类
com.itheima.report.util用来存放相关的工具类

验证Spring Boot工程是否创建成功

步骤

  1. 创建SpringBoot入口程序Application
  2. 创建application.properties配置文件
  3. 编写一个简单Spring MVC Controller/Handler,接收浏览器请求参数并打印回显
  4. 打开浏览器测试

实现

1.创建SpringBoot入口程序ReportApplication,用来启动SpringBoot程序
在类上添加以下注解

@SpringBootApplication
  • 1

在main方法中添加以下代码,用来运行Spring Boot程序

SpringApplication.run(ReportApplication.class);
  • 1

在这里插入图片描述

2.在创建一个TestController
注意在类上要添加@RestController注解

@RestController
public class TestController {

}
  • 1
  • 2
  • 3
  • 4

3.编写一个test Handler,从浏览器上接收一个叫做json参数,并打印回显

@RequestMapping("/test")
public String test(String json) {
    System.out.println(json);
    return json;
}
  • 1
  • 2
  • 3
  • 4
  • 5

4.编写application.properties配置文件
配置端口号(8888)

server.port=8888
  • 1

5.启动Spring Boot程序
6.打开浏览器测试Handler是否能够接收到数据
http://localhost:8888/test?json=123123
在这里插入图片描述

安装Kafka-Manager

Kafka-manager是Yahoo!开源的一款Kafka监控管理工具。

链接:https://pan.baidu.com/s/1K7DTfS-b0oI9D6tDhbeY0A 
提取码:0fbr 
  • 1
  • 2

安装步骤

  1. 上传kafka-manager-1.3.3.7.tar.gz
  2. 解压到/export/servers
tar -zxf kafka-manager-1.3.3.7.tar.gz -C /export/servers/
  • 1

3.修改conf/application.conf

kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
  • 1

4.启动zookeeper

zk.sh start
  • 1

5.启动kafka

kf.sh start
  • 1

6.直接运行bin/kafka-manager

cd /export/servers/kafka-manager-1.3.3.7
nohup bin/kafka-manager 2>&1 &
  • 1
  • 2

7.浏览器中使用hadoop102:9000访问即可

默认kafka-manager的端口号为`9000`,如果该端口被占用,请使用下面的命令修改端口

bin/kafka-manager -Dconfig.file=/export/servers/kafka-manager-1.3.3.7/conf/application.conf -Dhttp.port=10086
  • 1
  • 2
  • 3

编写Kafka生产者配置工具类

由于我们项目要操作Kafka, 我们先来构建出KafkaTemplate, 这是一个Kafka的模板对象, 通过它我们可以很方便的发送消息到Kafka.
开发步骤

  1. 编写Kafka生产者配置
  2. 编写Kafka生产者SpringBoot配置工具类KafkaProducerConfig,构建KafkaTemplate
    实现
    1.导入Kafka生产者配置文件
    将下面的代码拷贝到application.properties
#
# kakfa
#
#kafka的服务器地址
kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092
#如果出现发送失败的情况,允许重试的次数
kafka.retries_config=0
#每个批次发送多大的数据
kafka.batch_size_config=4096
#定时发送,达到1ms发送
kafka.linger_ms_config=1
#缓存的大小
kafka.buffer_memory_config=40960
#TOPIC的名字
kafka.topic=pyg
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
**定时定量**
1. kafka生产者发送一批数据的大小:kafka.producer.batch.size=4096  (单位:字节)
   实际环境可以调大一些,提高效率
2. 定时发送:kafka.producer.linger=1
   达到一毫秒后发送
  • 1
  • 2
  • 3
  • 4
  • 5

2.编写KafkaProducerConfig,主要创建KafkaTemplate,用于发送Kafka消息

  • 使用@Value(“${配置项}”)来读取配置
  • 构建DefaultKafkaProducerFactory
  • 构建KafkaTemplate
package com.itheima.report.util;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

@Configuration //表示该类是一个配置类
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap_servers_config}")
    private String bootstrap_servers_config;

    @Value("${kafka.retries_config}")
    private int retries_config;

    @Value("${kafka.batch_size_config}")
    private int batch_size_config;

    @Value("${kafka.linger_ms_config}")
    private int linger_ms_config;

    @Value("${kafka.buffer_memory_config}")
    private int buffer_memory_config;


    @Bean  //表示该对象是受spring所管理的一个bean
    public KafkaTemplate kafkaTemplate(){

        //构建工厂需要的配置
        HashMap<Object, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);

        //设置key value的序列化器
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

        //创建生产者工厂
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory(configs);

        //要返回一个kafkaTemplate对象
        return new KafkaTemplate(producerFactory);
        }

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

3.在test测试源码中创建一个Junit测试用例

  • 整合Spring Boot Test
  • 注入KafkaTemplate
  • 测试发送100条消息到testtopic
    在这里插入图片描述
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkTest {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void sendTest01() {
        for(int i = 0; i < 100; i++) {
            kafkaTemplate.send("test", "key", "test msg!");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

4.在KafkaManager创建testtopic,三个分区、两个副本
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
5.启动kafka-console-consumer

  bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic test
  • 1

6.打开kafka-manager的consumer监控页面,查看对应的logsize参数,消息是否均匀的分布在不同的分区中
在这里插入图片描述
在这里插入图片描述

均匀分区

编写RoundRobbinPartitioner,实现Partitioner接口,确保消息能够发送到Kafka的每个分区

  • 实现Partitioner接口的partition方法
  • 创建一个AtomicInteger变量,用来保存当前的计数器,每次生产一条消息加1
  • 使用计数器变量模除以分区数量得到当前消息需要发送的分区号
    参考代码
    在util里面编写
    RoundRobinPartitioner.java

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinPartitioner implements Partitioner {

    //并发包下的线程安全的整型类
    AtomicInteger counter = new AtomicInteger(0);

    //返回值为分区号  0  1
    @Override
    public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

        //获取分区的数量
        Integer partitions = cluster.partitionCountForTopic(topic);

        int curpartition = counter.incrementAndGet() % partitions;

        if (counter.get() > 65535){
            counter.set(0);
        }

        return curpartition;
}

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在util下KafkaProducerConfig中添加

configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);
  • 1

在这里插入图片描述

上报服务开发

上报服务系统要能够接收http请求,并将http请求中的数据写入到kafka
在这里插入图片描述
步骤

  1. 创建Message实体类对象
    所有的点击流消息都会封装到Message实体类中
  2. 设计一个Controller来接收http请求
  3. 将http请求发送的消息封装到一个Message实体类对象
  4. 使用FastJSONMessage实体类对象转换为JSON字符串
  5. 将JSON字符串使用kafkaTemplate写入到kafka
  6. 返回给客户端一个写入结果JSON串
    在这里插入图片描述
    实现
    创建Message实体类
  • 包含以下字段:消息次数(count)、消息时间(timeStamp)、消息体(message)
  • 生成getter/setter、toString方法
    在bean下面创建
    Message.java
public class Message {
    //消息次数
    private int count;

    //消息的时间戳
    private long timeStamp;

    //消息体
    private String message;

    public int getCount() {
        return count;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public String getMessage() {
        return message;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "count=" + count +
                ", timeStamp=" + timeStamp +
                ", message='" + message + '\'' +
                '}';
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

2.在com.itheima.report.controller包下创建ReportController

  • 编写receiveDataHandler接收从客户端JSON数据,并将响应结果封装到Map结构中,返回给客户端
    注意:接收JSON数据要在参数前面添加@RequestBody注解
  • 将接收的参数封装到Message实体类
  • 使用FastJSONMessage实体类对象转换为JSON字符串
  • 将JSON字符串发送到Kafka的pygtopic
  • 将响应结果封装到Map结构中,返回给客户端
注意:
1. 在ReportController类上要添加`@RestController`注解
2. 需要添加`@AutoWired`注解来注入KafkaTemplate
3. 请求参数上要加上`@RequestBody`注解
  • 1
  • 2
  • 3
  • 4

ReportController.java

import com.alibaba.fastjson.JSON;
import com.itheima.report.bean.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
public class ReportController {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @RequestMapping("/receive")
    public Map<String,String> receive(@RequestBody String json){

        Map<String,String> map = new HashMap<>();

        try {
            //构建message
            Message msg = new Message();
            msg.setMessage(json);
            msg.setCount(1);
            msg.setTimeStamp(System.currentTimeMillis());

            String msgJSON = JSON.toJSONString(msg);

            //发送messsage到kafka
            kafkaTemplate.send("pyg",msgJSON);

            map.put("sucess","true");
        } catch (Exception e) {
            e.printStackTrace();
            map.put("sucess","false");
        }
        return map;

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

模拟生产点击流日志消息到Kafka

为了方便进行测试,我们可以使用一个消息生成工具来生成点击流日志,然后发送给上报服务系统。该消息生成工具可以一次生成100条ClickLog信息,并转换成JSON,通过HttpClient把消息内容发送到我们编写好的ReportController。
在这里插入图片描述
步骤

链接:https://pan.baidu.com/s/1EJSDOCXlgKvdNdOsGCGDuw 
提取码:jdt4 
  • 1
  • 2
  1. 导入ClickLog实体类(ClickLog.java)放在bean下
package com.itheima.report.bean;

/**
 * 点击流日志
 */
public class ClickLog {
    //频道ID
    private long channelID ;
    //产品的类别ID
    private long categoryID ;
    //产品ID
    private long produceID ;
    //用户的ID
    private long userID ;

    //国家
    private String country ;
    //省份
    private String province ;
    //城市
    private String city ;


    //网络方式
    private String network ;
    //来源方式
    private String source ;

    //浏览器类型
    private String browserType;

    //进入网站时间
    private Long entryTime ;

    //离开网站时间
    private long leaveTime ;






    public long getChannelID() {
        return channelID;
    }

    public void setChannelID(long channelID) {
        this.channelID = channelID;
    }

    public long getCategoryID() {
        return categoryID;
    }

    public void setCategoryID(long categoryID) {
        this.categoryID = categoryID;
    }

    public long getProduceID() {
        return produceID;
    }

    public void setProduceID(long produceID) {
        this.produceID = produceID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getNetwork() {
        return network;
    }

    public void setNetwork(String network) {
        this.network = network;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getBrowserType() {
        return browserType;
    }

    public void setBrowserType(String browserType) {
        this.browserType = browserType;
    }

    public Long getEntryTime() {
        return entryTime;
    }

    public void setEntryTime(Long entryTime) {
        this.entryTime = entryTime;
    }

    public long getLeaveTime() {
        return leaveTime;
    }

    public void setLeaveTime(long leaveTime) {
        this.leaveTime = leaveTime;
    }

    public long getUserID() {
        return userID;
    }

    public void setUserID(long userID) {
        this.userID = userID;
    }

    @Override
    public String toString() {
        return "ClickLog{" +
                "channelID=" + channelID +
                ", categoryID=" + categoryID +
                ", produceID=" + produceID +
                ", country='" + country + '\'' +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", network='" + network + '\'' +
                ", source='" + source + '\'' +
                ", browserType='" + browserType + '\'' +
                ", entryTime=" + entryTime +
                ", leaveTime=" + leaveTime +
                ", userID=" + userID +
                '}';
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  1. 导入点击流日志生成器(ClickLogGenerator.java)放在util下
import com.alibaba.fastjson.JSONObject;
import com.itheima.report.bean.ClickLog;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.Test;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * 点击流日志模拟器
 */
public class ClickLogGenerator {
    private static Long[] channelID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//频道id集合
    private static Long[] categoryID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//产品类别id集合
    private static Long[] produceID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//产品id集合
    private static Long[] userID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//用户id集合

    /**
     * 地区
     */
    private static String[] contrys = new String[]{"china"};//地区-国家集合
    private static String[] provinces = new String[]{"HeNan","HeBeijing"};//地区-省集合
    private static String[] citys = new String[]{"ShiJiaZhuang","ZhengZhou", "LuoYang"};//地区-市集合

    /**
     *网络方式
     */
    private static String[] networks = new String[]{"电信","移动","联通"};

    /**
     * 来源方式
     */
    private static String[] sources = new String[]{"直接输入","百度跳转","360搜索跳转","必应跳转"};

    /**
     * 浏览器
     */
    private static String[] browser = new String[]{"火狐","qq浏览器","360浏览器","谷歌浏览器"};

    /**
     * 打开时间 离开时间
     */
    private static List<Long[]> usetimelog = producetimes();
    //获取时间
    public static List<Long[]> producetimes(){
        List<Long[]> usetimelog = new ArrayList<Long[]>();
        for(int i=0;i<100;i++){
            Long [] timesarray = gettimes("2018-12-12 24:60:60:000");
            usetimelog.add(timesarray);
        }
        return usetimelog;
    }

    private static Long [] gettimes(String time){
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss:SSS");
        try {
            Date date = dateFormat.parse(time);
            long timetemp = date.getTime();
            Random random = new Random();
            int randomint = random.nextInt(10);
            long starttime = timetemp - randomint*3600*1000;
            long endtime = starttime + randomint*3600*1000;
            return new Long[]{starttime,endtime};
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return new Long[]{0l,0l};
    }


    /**
     * 模拟发送Http请求到上报服务系统
     * @param url
     * @param json
     */
    public static void send(String url, String json) {
        try {
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();
            HttpPost post = new HttpPost(url);
            JSONObject response = null;
            try {
                StringEntity s = new StringEntity(json.toString(), "utf-8");
                s.setContentEncoding("utf-8");
                // 发送json数据需要设置contentType
                s.setContentType("application/json");
                post.setEntity(s);

                HttpResponse res = httpClient.execute(post);
                if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                    // 返回json格式:
                    String result = EntityUtils.toString(res.getEntity());
                    System.out.println(result);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            //频道id 类别id 产品id 用户id 打开时间 离开时间 地区 网络方式 来源方式 浏览器
            ClickLog clickLog = new ClickLog();

            clickLog.setChannelID(channelID[random.nextInt(channelID.length)]);
            clickLog.setCategoryID(categoryID[random.nextInt(categoryID.length)]);
            clickLog.setProduceID(produceID[random.nextInt(produceID.length)]);
            clickLog.setUserID(userID[random.nextInt(userID.length)]);
            clickLog.setCountry(contrys[random.nextInt(contrys.length)]);
            clickLog.setProvince(provinces[random.nextInt(provinces.length)]);
            clickLog.setCity(citys[random.nextInt(citys.length)]);
            clickLog.setNetwork(networks[random.nextInt(networks.length)]);
            clickLog.setSource(sources[random.nextInt(sources.length)]);
            clickLog.setBrowserType(browser[random.nextInt(browser.length)]);

            Long[] times = usetimelog.get(random.nextInt(usetimelog.size()));
            clickLog.setEntryTime(times[0]);
            clickLog.setLeaveTime(times[1]);

            String jonstr = JSONObject.toJSONString(clickLog);
            System.out.println(jonstr);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            send("http://localhost:8888/receive", jonstr);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146

需要改下端口号8888
在这里插入图片描述
点击流日志字段

字段说明
channelID频道ID
categoryID产品的类别ID
produceID产品ID
country国家
province省份
city城市
network网络方式(移动、联通、电信…)
source来源方式
browserType浏览器类型
entryTime进入网站时间
leaveTime离开网站时间
userID用户ID

验证测试代码

步骤

  1. 创建Kafka的topic(pyg
  2. 使用kafka-console-consumer.sh消费 topic中的数据
  3. 启动上报服务
  4. 执行ClickLogGenerator的main方法,生成一百条用户浏览消息到Kafka

实现
1.创建kafka topic

bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 2 --partitions 3 --topic pyg
  • 1

2.启动kafka消费者

bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic pyg
  • 1

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/431091
推荐阅读
相关标签
  

闽ICP备14008679号