当前位置:   article > 正文

大数据项目之Flink实时数仓(DWD/DIM层)_flink dwd

flink dwd

上文 >>>大数据项目之Flink实时数仓(数据采集/ODS层)
接着:
上一篇文章中简单把实时数仓数据采集以及ODS层搭建完成,开始搭建DWD层
DWD层搭建思路:从kafka的ods层读取用户行为数据和业务数据,进行简单处理,再写入到kafka dwd层

用户日志数据(DWD)

分析用户日志数据

首先分析ods层的用户日志数据,分为页面日志,启动日志,曝光日志,三类数据结构不同,需要进行拆分,将拆分后数据再写回到kafka,作为日志dwd层
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流\

kafka工具类提供获取kafka消费者的方法:

 public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
 //给配置信息对象添加配置项
 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 //获取 KafkaSource
 return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Flink调用消费者读取数据并进行处理:

数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
程 序:mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> kafka

        //TODO 1.获取执行环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       //TODO 2.消费 ods_base_log 主题数据创建流
       String sourceTopic = "ods_base_log";
       String groupId = "base_log_app_210325";
       DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

       //TODO 3.将每行数据转换为JSON对象(过滤脏数据)
       OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
       SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
           @Override
           public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
               try {
                   //将value转为Json格式
                   JSONObject jsonObject = JSON.parseObject(value);
                   out.collect(jsonObject);
               } catch (Exception e) {
                   //发生异常,将数据写入侧输出流
                   ctx.output(outputTag, value);
               }
           }
       });
       //TODO 4.新老用户校验  状态编程
       SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
               .map(new RichMapFunction<JSONObject, JSONObject>() {
                   private ValueState<String> valueState;
                   @Override
                   public void open(Configuration parameters) throws Exception {
                       valueState = getRuntimeContext().getState(new ValueStateDescriptor<String>("value-state", String.class));
                   }
                   @Override
                   public JSONObject map(JSONObject value) throws Exception {
                       //获取数据中的"is_new"标记
                       String isNew = value.getJSONObject("common").getString("is_new");
                       //判断isNew标记是否为"1"
                       if ("1".equals(isNew)) {
                           //获取状态数据
                           String state = valueState.value();
                           if (state != null) {
                               //修改isNew标记
                               value.getJSONObject("common").put("is_new", "0");
                           } else {
                               valueState.update("1");
                           }
                       }
                       return value;
                   }
               });
       //TODO 5.分流  侧输出流  页面:主流  启动:侧输出流  曝光:侧输出流
       OutputTag<String> startTag = new OutputTag<String>("start") {};
       OutputTag<String> displayTag = new OutputTag<String>("display") {};
       SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
           @Override
           public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
               //获取启动日志字段
               String start = value.getString("start");
               if (start != null && start.length() > 0) {
                   //将数据写入启动日志侧输出流
                   ctx.output(startTag, value.toJSONString());
               } else {
                   //将数据写入页面日志主流
                   out.collect(value.toJSONString());
                   //取出数据中的曝光数据
                   JSONArray displays = value.getJSONArray("displays");
                   if (displays != null && displays.size() > 0) {
                       //获取页面ID
                       String pageId = value.getJSONObject("page").getString("page_id");
                       for (int i = 0; i < displays.size(); i++) {
                           JSONObject display = displays.getJSONObject(i);
                           //添加页面id
                           display.put("page_id", pageId);
                           //将输出写出到曝光侧输出流
                           ctx.output(displayTag, display.toJSONString());
                       }
                   }
               }
           }
       });
       
       //TODO 6.提取侧输出流
       DataStream<String> startDS = pageDS.getSideOutput(startTag);
       DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
       
       //TODO 7.将三个流进行打印并输出到对应的Kafka主题中
       startDS.print("Start>>>>>>>>>>>");
       pageDS.print("Page>>>>>>>>>>>");
       displayDS.print("Display>>>>>>>>>>>>");
       
       startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
       pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
       displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));

       //TODO 8.启动任务
       env.execute("BaseLogApp");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96

业务数据(DWD)

分析业务数据

业务数据的变化,可以通过FlinkCDC采集到,但是FlinkCDC将数据统一写入一个Topic中,但是数据包含事实数据和维度数据,数据是从kafka业务数据ods层读取数据,经过处理后将维度数据保存在hbase中, 将事实数据写入kafka作为业务数据的dwd层。

封装Sink数据到Kafka主题方法:

public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T>
kafkaSerializationSchema) {
 properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 +
"");
 return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,
 kafkaSerializationSchema,
 properties,
 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

接受Kafka数据进行处理作为业务数据DWD层:

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2.消费Kafka ods_base_db 主题数据创建流
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_210325";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3.将每行数据转换为JSON对象并过滤(delete) 主流
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        //取出数据的操作类型
                        String type = value.getString("type");

                        return !"delete".equals(type);
                    }
                });

        //TODO 4.使用FlinkCDC消费配置表并处理成         广播流
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall2021_realtime")
                .tableList("mall2021_realtime.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserialization())
                .build();
        DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);

        //TODO 5.连接主流和广播流
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        //TODO 6.分流  处理数据  广播流数据,主流数据(根据广播流数据进行处理)
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {

        };
        SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

        //TODO 7.提取Kafka流数据和HBase流数据
        DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
        //TODO 8.将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表
        kafka.print("Kafka>>>>>>>>");
        hbase.print("HBase>>>>>>>>");

        hbase.addSink(new DimSinkFunction());
        kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(element.getString("sinkTable"),
                        element.getString("after").getBytes());
            }
        }));
        
        //TODO 9.启动任务
        env.execute("BaseDBApp");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

注意这里有个实体类:

@Data
public class TableProcess {
 //动态分流 Sink 常量
 public static final String SINK_TYPE_HBASE = "hbase";
 public static final String SINK_TYPE_KAFKA = "kafka";
 public static final String SINK_TYPE_CK = "clickhouse"; //暂时用不到
 //来源表
 String sourceTable;
 //操作类型 insert,update,delete
 String operateType;
 //输出类型 hbase kafka
 String sinkType;
 //输出表(主题)
 String sinkTable;
 //输出字段
 String sinkColumns;
 //主键字段
 String sinkPk;
 //建表扩展
 String sinkExtend;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

常用配置常量值封装

//Phoenix 库名
 public static final String HBASE_SCHEMA = "MALL2021_REALTIME";
 //Phoenix 驱动
 public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
 //Phoenix 连接参数
 public static final String PHOENIX_SERVER =
"jdbc:phoenix:master,slave,slave1:2181";
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

TableProcessFunction

    public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }
  • 1
  • 2
  • 3
  • 4

TableProcessFunction-open

    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }
  • 1
  • 2
  • 3
  • 4

TableProcessFunction-processBroadcastElement

//value:{“db”:“”,“tn”:“”,“before”:{},“after”:{},“type”:“”} //数据格式

    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //1.获取并解析数据
        JSONObject jsonObject = JSON.parseObject(value);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
        //2.建表
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }
        //3.写入状态,广播出去
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

TableProcessFunction-checkTable

create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx; //建表语句

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        PreparedStatement preparedStatement = null;
        try {
            if (sinkPk == null) {
                sinkPk = "id";
            }
            if (sinkExtend == null) {
                sinkExtend = "";
            }
            StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
                    .append(GmallConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");
            String[] fields = sinkColumns.split(",");
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                //判断是否为主键
                if (sinkPk.equals(field)) {
createTableSQL.append(field).append(" varchar primary key ");
                } else {
createTableSQL.append(field).append(" varchar ");
                }
                //判断是否为最后一个字段,如果不是,则添加","
                if (i < fields.length - 1) {
                    createTableSQL.append(",");
                }
            }
 createTableSQL.append(")").append(sinkExtend);
            //打印建表语句
            System.out.println(createTableSQL);
            //预编译SQL
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            //执行
            preparedStatement.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Phoenix表" + sinkTable + "建表失败!");
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

TableProcessFunction-processElement

value:{“db”:“”,“tn”:“”,“before”:{},“after”:{},“type”:“”}

    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //1.获取状态数据
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") + "-" + value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess != null) {
            //2.过滤字段
            JSONObject data = value.getJSONObject("after");
            filterColumn(data, tableProcess.getSinkColumns());
            //3.分流
            //将输出表/主题信息写入Value
            value.put("sinkTable", tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                //Kafka数据,写入主流
                out.collect(value);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                //HBase数据,写入侧输出流
                ctx.output(objectOutputTag, value);
            }
        } else {
            System.out.println("该组合Key:" + key + "不存在!");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

TableProcessFunction-filterColumn

@param data
{“id”:“11”,“tm_name”:“keven”,“logo_url”:“aaa”}
@param sinkColumns id,tm_name
{“id”:“11”,“tm_name”:“keven”}

    private void filterColumn(JSONObject data, String sinkColumns) {
        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }
  • 1
  • 2
  • 3
  • 4
  • 5

BaseDBApp 中调用 TableProcessFunction 进行分流

        //TODO 5.连接主流和广播流
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
        //TODO 6.分流  处理数据  广播流数据,主流数据(根据广播流数据进行处理)
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {
        };
        SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));
        //TODO 7.提取Kafka流数据和HBase流数据
        DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

维度数据(DIM)

分析维度数据

在这里插入图片描述
在这里插入图片描述
DimSink 继承了 RickSinkFunction,这个 function 得分两条时间线。
一条是任务启动时执行 open 操作,可以把连接的初始化工作放在此处一次性执行。
另一条是随着每条数据的到达反复执行 invoke(),实
现数据的保存,主要策略就是根据数据组合成 sql 提交给 hbase。

编写DimSink

public class DimSinkFunction extends RichSinkFunction<JSONObject> {
    private Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(mallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(mallConfig.PHOENIX_SERVER);
        connection.setAutoCommit(true);
    }
    //value:{"sinkTable":"dim_base_trademark","database":"mall-210325-flink","before":{"tm_name":"keven","id":12},"after":{"tm_name":"Keven","id":12},"type":"update","tableName":"base_trademark"}
    //SQL:upsert into db.tn(id,tm_name) values('...','...')
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        PreparedStatement preparedStatement = null;
        try {
            //获取SQL语句
            String sinkTable = value.getString("sinkTable");
            JSONObject after = value.getJSONObject("after");
            String upsertSql = genUpsertSql(sinkTable,
                    after);
            System.out.println(upsertSql);
            //预编译SQL
            preparedStatement = connection.prepareStatement(upsertSql);
            //执行插入操作
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }
    //data:{"tm_name":"kevenhe","id":12}
    //SQL:upsert into db.tn(id,tm_name,aa,bb) values('...','...','...','...')
    private String genUpsertSql(String sinkTable, JSONObject data) {
        Set<String> keySet = data.keySet();
        Collection<Object> values = data.values();
        //keySet.mkString(",");  =>  "id,tm_name"
        return "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(keySet, ",") + ") values('" +
                StringUtils.join(values, "','") + "')";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

分流Sink业务数据保存Kafka

封装sink方法:

public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T>
kafkaSerializationSchema) {
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 +
"");
 return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,
 kafkaSerializationSchema,
 properties,
 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

总结(DWD)

DWD的实时计算核心是数据分流,再就是状态识别,看一下使用过的一些算子RichMapFunction,ProcessFunction,RichSinkFunction。关于这些算子的选择,可以参考下面:

Function可转换结构可过滤数据测输出open方法可以使用状态输出至
MapFunctionyesnononono下游算子
FilterFunctionnoyesnonono下游算子
RichMapFunctionyesnonoyesyes下游算子
RichFilterFunctionnoyesnoyesyes下游算子
ProcessFunctionyesyesyesyesyes下游算子
SinkFunctionyesyesnonono外部
RichSinkFunctionyesyesnoyesyes外部
可以看出Rich函数功能强大,Processfunction功能更强大,但是功能越全面的函数使用越繁琐。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/943124
推荐阅读
相关标签
  

闽ICP备14008679号