赞
踩
建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
这里从kafka的ods层读取用户行为日志以及业务数据,并进行简单处理,写回到kafka作为dwd层。
分层 | 数据描述 | 生成计算工具 | 存储媒介 |
---|---|---|---|
ODS | 原始数据,日志和业务数据 | 日志服务器,maxwell | kafka |
DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等。 | FLINK | kafka |
DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。 | FLINK | kafka |
DIM | 维度数据 | FLINK | HBase |
DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。 | FLINK | Clickhouse |
ADS | 把Clickhouse中的数据根据可视化需要进行筛选聚合。 | Clickhouse SQL | 可视化展示 |
根据日志的不同类别做分流。
前端埋点中的数据全部放在kafka中ods_base_log主题中,如启动日志,页面访问日志,曝光日志等。虽然同是日志,但是却分为不同的种类,将来做数据统计时,全部从这一个主题中获取数据不方便。所以需要从ods_base_log主题中将数据取出来,根据日志的类型,将不同类型的数据放到不同的主题中,进行分流操作,如启动日志放到启动主题中,曝光日志放到曝光主题中,页面日志放到日志主题中。
根据业务数据的类型(维度 or 事实)做分流。
MySQL存储的业务数据中有很多张表,这些表分为两类,一类是事实表,一类是维度表。在采集数据时,只要业务数据发生变化就会通过maxwell采集到kafka的ods_base_db_m主题中,并没有区分事实和维度。如果是事实数据,希望将其放到kafka的不同单独主题中,如订单主题,订单明细主题,支付主题等。对于维度数据,不适合存放在kafka中,kafka不适合做长期存储,默认存储7天。海量数据的分析计算,同样不适合存放到MySQL中,因为在做分析计算时要不停的进行查询操作,给业务数据库造成很大的压力,且MySQL对于大量数据的查询,性能也较差。
在使用维度数据时,需要根据维度id查询出具体的数据,K-V型数据库比较适合存储维度数据,根据K获取V效率较高,KV数据库包括Redis和Hbase,Redis对于长期存储压力比较大,最终选择Hbase存储维度数据。
创建maven工程,gmall2022-realtime。
<properties> <java.version>1.8</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <flink.version>1.12.0</flink.version> <scala.version>2.12</scala.version> <hadoop.version>3.1.3</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <!--如果保存检查点到hdfs上,需要引入此依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,这里使用log4j作为具体的日志实现--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
在resources目录下创建log4j.properties配置文件
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
目录 | 作用 |
---|---|
app | 产生各层数据的flink任务 |
bean | 数据对象 |
common | 公共常量 |
utils | 工具类 |
前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类,页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流。
整体流程如下图:
三者之间的关系和区别如下图:
/**
* 操作kafka工具类
*/
public class MyKafkaUtil {
private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
// 获取kafka的消费者
public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
// 定义消费者组
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);
}
}
/** * 对日志数据进行分流操作 * 启动、曝光、页面 * 启动日志放到启动侧输出流中 * 曝光日志放到曝光侧输出流中 * 页面日志放到主流中 * 将不同流的数据写回到kafka的dwd主题中 */ public class BaseLogApp { public static void main(String[] args) throws Exception{ // TODO 1 基本环境准备 // 流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(4); // TODO 2 检查点相关设置 // 开启检查点 // 每5S中开启一次检查点,检查点模式为EXACTLY_ONCE env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE); // 设置检查点超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000L); // 设置重启策略 // 重启三次,每次间隔3s钟 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L)); // 设置job取消后,检查点是否保留 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB // 内存:状态存在TaskManager内存中,检查点存在JobManager内存中 // 文件系统:状态存在TaskManager内存中,检查点存在指定的文件系统路径中 // RocksDB:看做和Redis类似的数据库,状态存在TaskManager内存中,检查点存在JobManager内存和本地磁盘上 // hadoop中nm的地址 env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall")); // 指定操作HDFS的用户 System.setProperty("HADOOP_USER_NAME","hzy"); // TODO 3 从kafka读取数据 // 声明消费的主题和消费者组 String topic = "ods_base_log"; String groupId = "base_log_app_group"; // 获取kafka消费者 FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId); // 读取数据,封装为流 DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); // TODO 4 对读取的数据进行结构的转换 jsonStr -> jsonObj // // 匿名内部类实现 // SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map( // new MapFunction<String, JSONObject>() { // @Override // public JSONObject map(String jsonStr) throws Exception { // return JSON.parseObject(jsonStr); // } // } // ); // // lambda表达式实现 // kafkaDS.map( // jsonStr -> JSON.parse(jsonStr) // ); // 方法的默认调用,注意导入的是alibaba JSON包 SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject); jsonObjDS.print(">>>"); // TODO 5 修复新老访客状态 // TODO 6 按照日志类型对日志进行分流 // TODO 7 将不同流的数据写到kafka的dwd不同主题中 env.execute(); } }
# 启动zookeeper
# 启动kafka
# 启动采集服务
logger.sh start
# 启动nm以将检查点保存到hdfs上
start-dfs.sh
# 等待安全模式关闭,启动主程序,如果出现权限问题,可以将权限放开
hdfs dfs -chmod -R 777 /
# 或者增加以下代码到主程序中
System.setProperty("HADOOP_USER_NAME","hzy");
# 程序运行起来后,启动模拟生成日志数据jar包,在主程序中可以接收到数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。