赞
踩
在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
我们这里从 kafka 的 ods 层读取用户行为日志以及业务数据,并进行简单处理,写回到 kafka 作为 dwd 层。
分层 | 数据描述 | 生成计算工具 | 存储媒介 |
---|---|---|---|
ODS | 原始数据,日志和业务数据 日志服务器 | maxwell | kafka |
DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等 | FLINK | kafka |
DIM | 维度数据 | FLINK | HBase/Redis |
DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为依旧是明细数据 | FLINK | kafka |
DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表 | FLINK | Clickhouse |
ADS | 把 Clickhouse 中的数据根据可视化需要进行筛选聚合 | Clickhouse | 可视化展示 |
➢ 功能 1:环境搭建
➢ 功能 2:计算用户行为日志 DWD 层
➢ 功能 3:计算业务数据 DWD 层
我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。利用侧输出流实现数据拆分,将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。
2 识别新老访客
保存每个 mid 的首次访问日期,每条进入该算子的访问记录,都会把 mid 对应的首次访问时间读取出来,跟当前日期进行比较,只有首次访问时间不为空,且首次访问时间早于当日的,则认为该访客是老访客,否则是新访客。同时如果是新访客且没有访问记录的话,会写入首次访问时间。
- /**
- * Desc: 准备用户行为日志的DWD层
- */
- public class BaseLogApp {
- private static final String TOPIC_START = "dwd_start_log";
- private static final String TOPIC_DISPLAY = "dwd_display_log";
- private static final String TOPIC_PAGE = "dwd_page_log";
- public static void main(String[] args) throws Exception {
- //TODO 1.准备环境
- //1.1 创建Flink流执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //1.2设置并行度
- env.setParallelism(1);
-
- //1.3设置Checkpoint
- //每5000ms开始一次checkpoint,模式是EXACTLY_ONCE(默认)
- //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
- //env.getCheckpointConfig().setCheckpointTimeout(60000);
- //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/baselogApp"));
-
- //System.setProperty("HADOOP_USER_NAME","atguigu");
-
- //TODO 2.从Kafka中读取数据
- String topic = "ods_base_log";
- String groupId = "base_log_app_group";
-
- //2.1 调用Kafka工具类,获取FlinkKafkaConsumer
- FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
- DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
-
- //TODO 3.对读取到的数据格式进行转换 String->json
- SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
- new MapFunction<String, JSONObject>() {
- @Override
- public JSONObject map(String value) throws Exception {
- JSONObject jsonObject = JSON.parseObject(value);
- return jsonObject;
- }
- }
- );
- //jsonObjDS.print("json>>>>>>>>");
- /*
- TODO 4.识别新老访客 前端也会对新老状态进行记录,有可能会不准,咱们这里是再次做一个确认
- 保存mid某天方法情况(将首次访问日期作为状态保存起来),等后面该设备在有日志过来的时候,从状态中获取日期
- 和日志产生日志进行对比。如果状态不为空,并且状态日期和当前日期不相等,说明是老访客,如果is_new标记是1,那么对其状态进行修复
- */
- //4.1 根据mid对日志进行分组
- KeyedStream<JSONObject, String> midKeyedDS = jsonObjDS.keyBy(
- data -> data.getJSONObject("common").getString("mid")
- );
-
- //4.2 新老方法状态修复 状态分为算子状态和键控状态,我们这里要记录某一个设备的访问,使用键控状态比较合适
- SingleOutputStreamOperator<JSONObject> jsonDSWithFlag = midKeyedDS.map(
- new RichMapFunction<JSONObject, JSONObject>() {
- //定义该mid访问状态
- private ValueState<String> firstVisitDateState;
- //定义日期格式化对象
- private SimpleDateFormat sdf;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- //对状态以及日期格式进行初始化
- firstVisitDateState = getRuntimeContext().getState(
- new ValueStateDescriptor<String>("newMidDateState", String.class)
- );
- sdf = new SimpleDateFormat("yyyyMMdd");
- }
-
- @Override
- public JSONObject map(JSONObject jsonObj) throws Exception {
- //获取当前日志标记状态
- String isNew = jsonObj.getJSONObject("common").getString("is_new");
-
- //获取当前日志访问时间戳
- Long ts = jsonObj.getLong("ts");
-
- if ("1".equals(isNew)) {
- //获取当前mid对象的状态
- String stateDate = firstVisitDateState.value();
- //对当前条日志的日期格式进行抓换
- String curDate = sdf.format(new Date(ts));
- //如果状态不为空,并且状态日期和当前日期不相等,说明是老访客
- if (stateDate != null && stateDate.length() != 0) {
- //判断是否为同一天数据
- if (!stateDate.equals(curDate)) {
- isNew = "0";
- jsonObj.getJSONObject("common").put("is_new", isNew);
- }
- } else {
- //如果还没记录设备的状态,将当前访问日志作为状态值
- firstVisitDateState.update(curDate);
- }
- }
- return jsonObj;
- }
- }
- );
-
- //jsonDSWithFlag.print(">>>>>>>>>>>");
-
- //TODO 5 .分流 根据日志数据内容,将日志数据分为3类, 页面日志、启动日志和 曝光日志。
- // 页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流
- // 侧输出流:1)接收迟到数据 2)分流
-
- //定义启动侧输出流标签
- OutputTag<String> startTag = new OutputTag<String>("start"){};
- //定义曝光侧输出流标签
- OutputTag<String> displayTag = new OutputTag<String>("display"){};
- SingleOutputStreamOperator<String> pageDS = jsonDSWithFlag.process(
- new ProcessFunction<JSONObject, String>() {
- @Override
- public void processElement(JSONObject jsonObj, Context ctx, Collector<String> out) throws Exception {
- //获取启动日志标记
- JSONObject startJsonObj = jsonObj.getJSONObject("start");
- //将json格式转换为字符串,方便向侧输出流输出以及向kafka中写入
- String dataStr = jsonObj.toString();
-
- //判断是否为启动日志
- if (startJsonObj != null && startJsonObj.size() > 0) {
- //如果是启动日志,输出到启动侧输出流
- ctx.output(startTag, dataStr);
- } else {
- //如果不是启动日志,获取曝光日志标记(曝光日志中也携带了页面)
- JSONArray displays = jsonObj.getJSONArray("displays");
- //判断是否为曝光日志
- if (displays != null && displays.size() > 0) {
- //如果是曝光日志,遍历输出到侧输出流
- for (int i = 0; i < displays.size(); i++) {
- //获取每一条曝光事件
- JSONObject displaysJsonObj = displays.getJSONObject(i);
- //获取页面id
- String pageId = jsonObj.getJSONObject("page").getString("page_id");
- //给每一条曝光事件加pageId
- displaysJsonObj.put("page_id", pageId);
- ctx.output(displayTag, displaysJsonObj.toString());
- }
- } else { //如果不是启动日志 说明是页面日志 ,输出到主流
- out.collect(dataStr);
- }
-
- }
- }
- }
- );
-
- //获取侧输出流
- DataStream<String> startDS = pageDS.getSideOutput(startTag);
- DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
-
- //打印输出
- pageDS.print("page>>>>");
- startDS.print("start>>>>");
- displayDS.print("display>>>>");
-
- //TODO 6.将不同流的数据写回到kafka的不同topic中
- FlinkKafkaProducer<String> startSink = MyKafkaUtil.getKafkaSink(TOPIC_START);
- startDS.addSink(startSink);
-
- FlinkKafkaProducer<String> displaySink = MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY);
- displayDS.addSink(displaySink);
-
- FlinkKafkaProducer<String> pageSink = MyKafkaUtil.getKafkaSink(TOPIC_PAGE);
- pageDS.addSink(pageSink);
-
- env.execute();
-
- }
- }

业务数据的变化,我们可以通过 Maxwell 采集到,但是 MaxWell 是把全部数据统一写入一个 Topic 中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,在实时计算中,经过处理后, 一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase/ Redis / MySQL等。一般把事实数据写入流中,将事实数据写回 Kafka 作为业务数据的 DWD 层,进行进一步处理,最终形成宽表。
但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现:
这里选择第二种方案,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
- /**
- * Desc: 准备业务数据的DWD层
- */
- public class BaseDBApp {
- public static void main(String[] args) throws Exception {
- //TODO 1.准备环境
- //1.1 创建流处理执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //1.2 设置并新度
- env.setParallelism(1);
- //1.3 开启Checkpoint,并设置相关的参数
- //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
- //env.getCheckpointConfig().setCheckpointTimeout(60000);
- //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/basedbapp"));
- //重启策略
- // 如果说没有开启重启Checkpoint,那么重启策略就是noRestart
- // 如果说没有开Checkpoint,那么重启策略会尝试自动帮你进行重启 重启Integer.MaxValue
- //env.setRestartStrategy(RestartStrategies.noRestart());
-
- //TODO 2.从Kafka的ODS层读取数据
- String topic = "ods_base_db_m";
- String groupId = "base_db_app_group";
-
- //2.1 通过工具类获取Kafka的消费者
- FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
- DataStreamSource<String> jsonStrDS = env.addSource(kafkaSource);
-
- //TODO 3.对DS中数据进行结构的转换 String-->Json
- //jsonStrDS.map(JSON::parseObject);
- SingleOutputStreamOperator<JSONObject> jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
- //jsonStrDS.print("json>>>>");
-
- //TODO 4.对数据进行ETL 如果table为空 或者 data为空,或者长度<3 ,将这样的数据过滤掉
- SingleOutputStreamOperator<JSONObject> filteredDS = jsonObjDS.filter(
- jsonObj -> {
- boolean flag = jsonObj.getString("table") != null
- && jsonObj.getJSONObject("data") != null
- && jsonObj.getString("data").length() > 3;
- return flag;
- }
- );
-
- //filteredDS.print("json>>>>>");
-
- //TODO 5. 动态分流 事实表放到主流,写回到kafka的DWD层;如果维度表,通过侧输出流,写入到Hbase
- //5.1定义输出到Hbase的侧输出流标签
- OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE){};
-
- //5.2 主流 写回到Kafka的数据
- SingleOutputStreamOperator<JSONObject> kafkaDS = filteredDS.process(
- new TableProcessFunction(hbaseTag)
- );
-
- //5.3获取侧输出流 写到Hbase的数据
- DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
-
- kafkaDS.print("事实>>>>");
- hbaseDS.print("维度>>>>");
-
- //TODO 6.将维度数据保存到Phoenix对应的维度表中
- hbaseDS.addSink(new DimSink());
-
- //TODO 7.将事实数据写回到kafka的dwd层
- FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
- new KafkaSerializationSchema<JSONObject>() {
- @Override
- public void open(SerializationSchema.InitializationContext context) throws Exception {
- System.out.println("kafka序列化");
- }
- @Override
- public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
- String sinkTopic = jsonObj.getString("sink_table");
- JSONObject dataJsonObj = jsonObj.getJSONObject("data");
- return new ProducerRecord<>(sinkTopic,dataJsonObj.toString().getBytes());
- }
- }
- );
-
- kafkaDS.addSink(kafkaSink);
-
- env.execute();
- }
- }

2) 程序流程分析
➢ 图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是open()这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。
➢ 图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在 open()方法中加入 timer定时器实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。
➢ 图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的 tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。
- /**
- * Desc: 用于对业务数据进行分流处理的自定义处理函数
- */
- public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
-
- //因为要将维度数据通过侧输出流输出,所以我们在这里定义一个侧输出流标记
- private OutputTag<JSONObject> outputTag;
-
- //用于在内存中存放配置表信息的Map <表名:操作,tableProcess>
- private Map<String, TableProcess> tableProcessMap = new HashMap<>();
-
- //用于在内存中存放已经处理过的表(在phoenix中已经建过的表)
- private Set<String> existsTables = new HashSet<>();
-
- //声明Phoenix的连接对象
- Connection conn = null;
- ………………
- ………………
- //根据sinkType,将数据输出到不同的流
- if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
- //如果sinkType = hbase ,说明是维度数据,通过侧输出流输出
- ctx.output(outputTag, jsonObj);
- } else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
- //如果sinkType = kafka ,说明是事实数据,通过主流输出
- out.collect(jsonObj);
- }

生命周期方法,初始化连接,初始化配置表信息并开启定时任务,用于不断读取配置表信息
- //在函数被调用的时候执行的方法,执行一次
- @Override
- public void open(Configuration parameters) throws Exception {
- //初始化Phoenix连接
- Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
- conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
-
- //初始化配置表信息
- refreshMeta(); //========1.从MySQL数据库配置表中查询配置信息
-
- //开启一个定时任务
- // 因为配置表的数据可能会发生变化,每隔一段时间就从配置表中查询一次数据,更新到map,并检查建表
- //从现在起过delay毫秒后,每隔period执行一次
- Timer timer = new Timer();
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- refreshMeta();
- }
- }, 5000, 5000);
- }

注意:为了开启 hbase 的 namespace 和 phoenix 的 schema 的映射,在程序中需要加这
个配置文件,另外在 linux 服务上,也需要在 hbase 以及 phoenix 的 hbase-site.xml 配置
文件中,加上以上两个配置,并使用 xsync 进行同步
- /**
- * Desc: 通过 Phoenix 向 Hbase 表中写数据
- */
- public class DimSink extends RichSinkFunction<JSONObject> {
DWD 的实时计算核心就是数据分流,其次是状态识别。在开发过程中我们实践了几个
灵活度较强算子,比如 RichMapFunction, ProcessFunction, RichSinkFunction。 那这
几个我们什么时候会用到呢?如何选择?
从对比表中能明显看出,Rich 系列能功能强大,ProcessFunction 功能更强大,但是相对的越全面的算子使用起来也更加繁琐。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。