赞
踩
公司有一个正在运营中的电商网站,名称叫做【品优购】。这是一个B2B2C的电商平台,类似京东。
现在我们想了解一下该电商网站的各种用户行为(访问行为、购物行为、点击行为等),统计出PV、UV等数据。
针对这样的大数据分析项目,我们可以采用MapReduce、Spark或者Flink来进行开发。
由于本项目会对实时数据和静态数据进行分析,所以我们采用性能更加优越的Flink来开发。
业务目标
**常见电商模式**
C2C--个人对个人 案例:淘宝、瓜子二手车
B2B--企业对企业 案例:阿里巴巴、慧聪网
B2C--企业对个人 案例:唯品会、乐蜂网
B2B2C -企业-企业-个人 案例:京东商城、天猫商城
C2B--个人对企业 案例:海尔商城、 尚品宅配
O2O--线上到线下 案例:美团、饿了么
用户指标
商品指标
**电商小概念: **
**SPU** = Standard Product Unit (标准产品单位)SPU是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息的集合,该集合描述了一个产品的特性。 例如:iPhone X 可以确定一个产品即为一个SPU。
**SKU **= Stock Keeping Unit(库存量单位)。一款商品有多个颜色,则是有多个SKU,例:一件衣服,有红色、白色、蓝色,则SKU编码也不相同,如相同则会出现混淆,发错货。 例如:iPhone X 64G 银色 则是一个SKU。
上报服务系统
将商城访问日志推送到kafka
数据库同步系统
将mysql数据推送到kafka
实时分析系统
消费kafka
数据,经过分析后,下沉到HBase
实时同步系统
消费kafka数据,下沉到HBase
批处理分析系统
从hbase取出数据,进行数据分析web可视化平台
展示HBase中的分析结果数据为什么要选择架构中的技术?
Kafka是一种高吞吐量的分布式发布订阅消息系统。
HBase是一个分布式的、面向列的开源数据库。
数据库同步常用的有两种方案:
mysql --> logstash --> kafka --> flink --> hbase
mysql --> sqoop --> kafka --> flink-->hbase
使用SQL语句查询mysql
,会给mysql增加压力,如果要跑大量数据的同步,会拖垮mysqlmysql --> cannal(binlog) --> kafka --> flink --> hbase
binlog
日志,通过读取binlog日志,将数据输出到Kafka。不需要执行SQL语句
,不会增加mysql压力问题:
为什么要选择基于canal来进行数据库同步技术?
本项目采用Maven
构建,下面是我们的项目的整体工程架构。总工程为pyg,下面包含我们要开发的5个子模块。
IDEA创建工程
pyg
,删除
总工程的src
目录<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
report
模块real-process
实时处理Maven模块canal-kafka
数据采集Maven模块sync-db
数据库同步处理Maven模块batch-process
批数据处理Maven模块
上报服务系统是一个Java Web工程,为了快速开发web项目,我们采用时下JavaWeb最流行的技术Spring Boot
。
Spring Boot是一个基于Spring之上的快速应用构建框架。使用Spring Boot可以快速开发出基于Spring的应用。Spring Boot主要解决两方面的问题。
application.properties
配置文件Application
入口程序从目录中的pom.xml
中拷贝依赖到report中的pom文件中
主要导入以下依赖:
Spring Boot
依赖FastJSON
依赖Kafka
依赖<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itheima</groupId>
<artifactId>report</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>report</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<!-- 阿里云 maven-->
<repositories>
<repository>
<id>alimaven</id>
<name>alimaven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
包名 | 说明 |
---|---|
com.itheima.report.controller | 存放Spring MVC的controller |
com.itheima.report.bean | 存放相关的Java Bean实体类 |
com.itheima.report.util | 用来存放相关的工具类 |
步骤
入口程序
Applicationapplication.properties
配置文件Spring MVC
Controller/Handler,接收浏览器请求参数并打印回显实现
1.创建SpringBoot入口程序ReportApplication
,用来启动SpringBoot程序
在类上添加以下注解
@SpringBootApplication
在main方法中添加以下代码,用来运行Spring Boot
程序
SpringApplication.run(ReportApplication.class);
2.在创建一个TestController
注意在类上要添加@RestController
注解
@RestController
public class TestController {
}
3.编写一个test Handler
,从浏览器上接收一个叫做json参数,并打印回显
@RequestMapping("/test")
public String test(String json) {
System.out.println(json);
return json;
}
4.编写application.properties配置文件
配置端口号(8888)
server.port=8888
5.启动Spring Boot程序
6.打开浏览器测试Handler是否能够接收到数据
http://localhost:8888/test?json=123123
Kafka-manager是Yahoo!开源的一款Kafka监控管理工具。
链接:https://pan.baidu.com/s/1K7DTfS-b0oI9D6tDhbeY0A
提取码:0fbr
安装步骤
kafka-manager-1.3.3.7.tar.gz
/export/servers
tar -zxf kafka-manager-1.3.3.7.tar.gz -C /export/servers/
3.修改conf/application.conf
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
4.启动zookeeper
zk.sh start
5.启动kafka
kf.sh start
6.直接运行bin/kafka-manager
cd /export/servers/kafka-manager-1.3.3.7
nohup bin/kafka-manager 2>&1 &
7.浏览器中使用hadoop102:9000
访问即可
默认kafka-manager的端口号为`9000`,如果该端口被占用,请使用下面的命令修改端口
bin/kafka-manager -Dconfig.file=/export/servers/kafka-manager-1.3.3.7/conf/application.conf -Dhttp.port=10086
由于我们项目要操作Kafka, 我们先来构建出KafkaTemplate, 这是一个Kafka的模板对象, 通过它我们可以很方便的发送消息到Kafka.
开发步骤
KafkaProducerConfig
,构建KafkaTemplate
application.properties
中#
# kakfa
#
#kafka的服务器地址
kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092
#如果出现发送失败的情况,允许重试的次数
kafka.retries_config=0
#每个批次发送多大的数据
kafka.batch_size_config=4096
#定时发送,达到1ms发送
kafka.linger_ms_config=1
#缓存的大小
kafka.buffer_memory_config=40960
#TOPIC的名字
kafka.topic=pyg
**定时定量**
1. kafka生产者发送一批数据的大小:kafka.producer.batch.size=4096 (单位:字节)
实际环境可以调大一些,提高效率
2. 定时发送:kafka.producer.linger=1
达到一毫秒后发送
2.编写KafkaProducerConfig
,主要创建KafkaTemplate
,用于发送Kafka消息
${配置项}
”)来读取配置DefaultKafkaProducerFactory
KafkaTemplate
package com.itheima.report.util;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
@Configuration //表示该类是一个配置类
public class KafkaProducerConfig {
@Value("${kafka.bootstrap_servers_config}")
private String bootstrap_servers_config;
@Value("${kafka.retries_config}")
private int retries_config;
@Value("${kafka.batch_size_config}")
private int batch_size_config;
@Value("${kafka.linger_ms_config}")
private int linger_ms_config;
@Value("${kafka.buffer_memory_config}")
private int buffer_memory_config;
@Bean //表示该对象是受spring所管理的一个bean
public KafkaTemplate kafkaTemplate(){
//构建工厂需要的配置
HashMap<Object, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
//设置key value的序列化器
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
//创建生产者工厂
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory(configs);
//要返回一个kafkaTemplate对象
return new KafkaTemplate(producerFactory);
}
}
3.在test
测试源码中创建一个Junit测试用例
KafkaTemplate
test
topicimport org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkTest {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void sendTest01() {
for(int i = 0; i < 100; i++) {
kafkaTemplate.send("test", "key", "test msg!");
}
}
}
4.在KafkaManager创建test
topic,三个分区、两个副本
5.启动kafka-console-consumer
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic test
6.打开kafka-manager的consumer监控页面,查看对应的logsize
参数,消息是否均匀的分布在不同的分区中
编写RoundRobbinPartitioner
,实现Partitioner
接口,确保消息能够发送到Kafka的每个分区
Partitioner
接口的partition方法AtomicInteger
变量,用来保存当前的计数器,每次生产一条消息加1分区数量
得到当前消息需要发送的分区号
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinPartitioner implements Partitioner {
//并发包下的线程安全的整型类
AtomicInteger counter = new AtomicInteger(0);
//返回值为分区号 0 1
@Override
public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//获取分区的数量
Integer partitions = cluster.partitionCountForTopic(topic);
int curpartition = counter.incrementAndGet() % partitions;
if (counter.get() > 65535){
counter.set(0);
}
return curpartition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
在util下KafkaProducerConfig中添加
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);
上报服务系统要能够接收http请求,并将http请求中的数据写入到kafka
步骤
Message
实体类对象Message
实体类对象FastJSON
将Message
实体类对象转换为JSON字符串kafkaTemplate
写入到kafka
Message
实体类public class Message {
//消息次数
private int count;
//消息的时间戳
private long timeStamp;
//消息体
private String message;
public int getCount() {
return count;
}
public long getTimeStamp() {
return timeStamp;
}
public String getMessage() {
return message;
}
public void setCount(int count) {
this.count = count;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "Message{" +
"count=" + count +
", timeStamp=" + timeStamp +
", message='" + message + '\'' +
'}';
}
}
2.在com.itheima.report.controller
包下创建ReportController
类
receiveData
Handler接收从客户端JSON数据,并将响应结果封装到Map
结构中,返回给客户端@RequestBody
注解Message
实体类FastJSON
将Message
实体类对象转换为JSON字符串pyg
topicMap
结构中,返回给客户端注意:
1. 在ReportController类上要添加`@RestController`注解
2. 需要添加`@AutoWired`注解来注入KafkaTemplate
3. 请求参数上要加上`@RequestBody`注解
ReportController.java
import com.alibaba.fastjson.JSON;
import com.itheima.report.bean.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class ReportController {
@Autowired
KafkaTemplate kafkaTemplate;
@RequestMapping("/receive")
public Map<String,String> receive(@RequestBody String json){
Map<String,String> map = new HashMap<>();
try {
//构建message
Message msg = new Message();
msg.setMessage(json);
msg.setCount(1);
msg.setTimeStamp(System.currentTimeMillis());
String msgJSON = JSON.toJSONString(msg);
//发送messsage到kafka
kafkaTemplate.send("pyg",msgJSON);
map.put("sucess","true");
} catch (Exception e) {
e.printStackTrace();
map.put("sucess","false");
}
return map;
}
}
为了方便进行测试,我们可以使用一个消息生成工具来生成点击流日志,然后发送给上报服务系统。该消息生成工具可以一次生成100条ClickLog信息,并转换成JSON,通过HttpClient把消息内容发送到我们编写好的ReportController。
步骤
链接:https://pan.baidu.com/s/1EJSDOCXlgKvdNdOsGCGDuw
提取码:jdt4
package com.itheima.report.bean;
/**
* 点击流日志
*/
public class ClickLog {
//频道ID
private long channelID ;
//产品的类别ID
private long categoryID ;
//产品ID
private long produceID ;
//用户的ID
private long userID ;
//国家
private String country ;
//省份
private String province ;
//城市
private String city ;
//网络方式
private String network ;
//来源方式
private String source ;
//浏览器类型
private String browserType;
//进入网站时间
private Long entryTime ;
//离开网站时间
private long leaveTime ;
public long getChannelID() {
return channelID;
}
public void setChannelID(long channelID) {
this.channelID = channelID;
}
public long getCategoryID() {
return categoryID;
}
public void setCategoryID(long categoryID) {
this.categoryID = categoryID;
}
public long getProduceID() {
return produceID;
}
public void setProduceID(long produceID) {
this.produceID = produceID;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getNetwork() {
return network;
}
public void setNetwork(String network) {
this.network = network;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public String getBrowserType() {
return browserType;
}
public void setBrowserType(String browserType) {
this.browserType = browserType;
}
public Long getEntryTime() {
return entryTime;
}
public void setEntryTime(Long entryTime) {
this.entryTime = entryTime;
}
public long getLeaveTime() {
return leaveTime;
}
public void setLeaveTime(long leaveTime) {
this.leaveTime = leaveTime;
}
public long getUserID() {
return userID;
}
public void setUserID(long userID) {
this.userID = userID;
}
@Override
public String toString() {
return "ClickLog{" +
"channelID=" + channelID +
", categoryID=" + categoryID +
", produceID=" + produceID +
", country='" + country + '\'' +
", province='" + province + '\'' +
", city='" + city + '\'' +
", network='" + network + '\'' +
", source='" + source + '\'' +
", browserType='" + browserType + '\'' +
", entryTime=" + entryTime +
", leaveTime=" + leaveTime +
", userID=" + userID +
'}';
}
}
import com.alibaba.fastjson.JSONObject;
import com.itheima.report.bean.ClickLog;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.Test;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
/**
* 点击流日志模拟器
*/
public class ClickLogGenerator {
private static Long[] channelID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//频道id集合
private static Long[] categoryID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//产品类别id集合
private static Long[] produceID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//产品id集合
private static Long[] userID = new Long[]{1l,2l,3l,4l,5l,6l,7l,8l,9l,10l,11l,12l,13l,14l,15l,16l,17l,18l,19l,20l};//用户id集合
/**
* 地区
*/
private static String[] contrys = new String[]{"china"};//地区-国家集合
private static String[] provinces = new String[]{"HeNan","HeBeijing"};//地区-省集合
private static String[] citys = new String[]{"ShiJiaZhuang","ZhengZhou", "LuoYang"};//地区-市集合
/**
*网络方式
*/
private static String[] networks = new String[]{"电信","移动","联通"};
/**
* 来源方式
*/
private static String[] sources = new String[]{"直接输入","百度跳转","360搜索跳转","必应跳转"};
/**
* 浏览器
*/
private static String[] browser = new String[]{"火狐","qq浏览器","360浏览器","谷歌浏览器"};
/**
* 打开时间 离开时间
*/
private static List<Long[]> usetimelog = producetimes();
//获取时间
public static List<Long[]> producetimes(){
List<Long[]> usetimelog = new ArrayList<Long[]>();
for(int i=0;i<100;i++){
Long [] timesarray = gettimes("2018-12-12 24:60:60:000");
usetimelog.add(timesarray);
}
return usetimelog;
}
private static Long [] gettimes(String time){
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss:SSS");
try {
Date date = dateFormat.parse(time);
long timetemp = date.getTime();
Random random = new Random();
int randomint = random.nextInt(10);
long starttime = timetemp - randomint*3600*1000;
long endtime = starttime + randomint*3600*1000;
return new Long[]{starttime,endtime};
} catch (ParseException e) {
e.printStackTrace();
}
return new Long[]{0l,0l};
}
/**
* 模拟发送Http请求到上报服务系统
* @param url
* @param json
*/
public static void send(String url, String json) {
try {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
HttpPost post = new HttpPost(url);
JSONObject response = null;
try {
StringEntity s = new StringEntity(json.toString(), "utf-8");
s.setContentEncoding("utf-8");
// 发送json数据需要设置contentType
s.setContentType("application/json");
post.setEntity(s);
HttpResponse res = httpClient.execute(post);
if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
// 返回json格式:
String result = EntityUtils.toString(res.getEntity());
System.out.println(result);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Random random = new Random();
for (int i = 0; i < 100; i++) {
//频道id 类别id 产品id 用户id 打开时间 离开时间 地区 网络方式 来源方式 浏览器
ClickLog clickLog = new ClickLog();
clickLog.setChannelID(channelID[random.nextInt(channelID.length)]);
clickLog.setCategoryID(categoryID[random.nextInt(categoryID.length)]);
clickLog.setProduceID(produceID[random.nextInt(produceID.length)]);
clickLog.setUserID(userID[random.nextInt(userID.length)]);
clickLog.setCountry(contrys[random.nextInt(contrys.length)]);
clickLog.setProvince(provinces[random.nextInt(provinces.length)]);
clickLog.setCity(citys[random.nextInt(citys.length)]);
clickLog.setNetwork(networks[random.nextInt(networks.length)]);
clickLog.setSource(sources[random.nextInt(sources.length)]);
clickLog.setBrowserType(browser[random.nextInt(browser.length)]);
Long[] times = usetimelog.get(random.nextInt(usetimelog.size()));
clickLog.setEntryTime(times[0]);
clickLog.setLeaveTime(times[1]);
String jonstr = JSONObject.toJSONString(clickLog);
System.out.println(jonstr);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
send("http://localhost:8888/receive", jonstr);
}
}
}
需要改下端口号8888
点击流日志字段
字段 | 说明 |
---|---|
channelID | 频道ID |
categoryID | 产品的类别ID |
produceID | 产品ID |
country | 国家 |
province | 省份 |
city | 城市 |
network | 网络方式(移动、联通、电信…) |
source | 来源方式 |
browserType | 浏览器类型 |
entryTime | 进入网站时间 |
leaveTime | 离开网站时间 |
userID | 用户ID |
步骤
pyg
)kafka-console-consumer.sh
消费 topic中的数据ClickLogGenerator
的main方法,生成一百条用户浏览消息到Kafka实现
1.创建kafka topic
bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 2 --partitions 3 --topic pyg
2.启动kafka消费者
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic pyg
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。