Previous post >>> Big Data Project: Flink Real-Time Data Warehouse (Data Collection / ODS Layer)
Continuing from there: the previous post set up data collection and the ODS layer of the real-time data warehouse, so now we build the DWD layer.
DWD layer approach: read the user-behavior data and the business data from the Kafka ODS layer, do some light processing, and write the results back to Kafka as the DWD layer.
Start with the user log data in the ODS layer. It contains three kinds of records with different structures: page logs, start-up logs, and display (exposure) logs. They need to be split apart and written back to Kafka as the log DWD layer: page logs go to the main stream, start-up logs to a start side-output stream, and display logs to a display side-output stream.
//MyKafkaUtil helper, called below as MyKafkaUtil.getKafkaConsumer; "properties" is a static
//Properties object assumed to already contain the bootstrap.servers setting
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
//Add the consumer group id to the configuration
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//Build the Kafka source
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
}
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> Kafka
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2. Consume the ods_base_log topic and create a stream
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3. Convert each line into a JSONObject (filtering out dirty data)
OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
try {
//Parse the value as JSON
JSONObject jsonObject = JSON.parseObject(value);
out.collect(jsonObject);
} catch (Exception e) {
//On a parse failure, write the record to the dirty-data side output
ctx.output(outputTag, value);
}
}
});
//TODO 4. New/returning-user check with keyed state
SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
.map(new RichMapFunction<JSONObject, JSONObject>() {
private ValueState<String> valueState;
@Override
public void open(Configuration parameters) throws Exception {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<String>("value-state", String.class));
}
@Override
public JSONObject map(JSONObject value) throws Exception {
//Get the "is_new" flag from the record
String isNew = value.getJSONObject("common").getString("is_new");
//Only records flagged as new ("1") need to be checked
if ("1".equals(isNew)) {
//Read the state
String state = valueState.value();
if (state != null) {
//This mid has been seen before, so correct the flag
value.getJSONObject("common").put("is_new", "0");
} else {
valueState.update("1");
}
}
return value;
}
});
//TODO 5. Split the stream: page logs to the main stream, start logs and display logs to side outputs
OutputTag<String> startTag = new OutputTag<String>("start") {};
OutputTag<String> displayTag = new OutputTag<String>("display") {};
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
//Get the start-log field
String start = value.getString("start");
if (start != null && start.length() > 0) {
//Write the record to the start-log side output
ctx.output(startTag, value.toJSONString());
} else {
//Write the record to the page-log main stream
out.collect(value.toJSONString());
//Extract the display (exposure) records from the page log
JSONArray displays = value.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
//Get the page ID
String pageId = value.getJSONObject("page").getString("page_id");
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
//Attach the page id to each display record
display.put("page_id", pageId);
//Write the record to the display side output
ctx.output(displayTag, display.toJSONString());
}
}
}
}
});
//TODO 6. Extract the side outputs
DataStream<String> startDS = pageDS.getSideOutput(startTag);
DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
//TODO 7.将三个流进行打印并输出到对应的Kafka主题中
startDS.print("Start>>>>>>>>>>>");
pageDS.print("Page>>>>>>>>>>>");
displayDS.print("Display>>>>>>>>>>>>");
startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));
//TODO 8. Start the job
env.execute("BaseLogApp");
Changes to the business data are captured with Flink CDC, but Flink CDC writes everything into a single topic even though the records include both fact data and dimension data. The DWD job therefore reads from the business-data ODS topic in Kafka, processes each record, saves dimension data to HBase (through Phoenix), and writes fact data back to Kafka as the business-data DWD layer.
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,
kafkaSerializationSchema,
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
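Note that FlinkKafkaProducer.Semantic.EXACTLY_ONCE only commits its Kafka transactions when a Flink checkpoint completes, so checkpointing has to be enabled in the job for the data to become visible to read_committed consumers. A minimal sketch (the interval and timeout values here are just examples, not taken from the original project):
//Enable checkpointing so the exactly-once Kafka producer can commit its transactions
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);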
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2. Consume the ods_base_db topic and create a stream
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3. Convert each line into a JSONObject and filter out delete operations (main stream)
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
//Get the operation type of the record
String type = value.getString("type");
return !"delete".equals(type);
}
});
//TODO 4. Consume the configuration table with Flink CDC and turn it into a broadcast stream
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("master")
.port(3306)
.username("root")
.password("000000")
.databaseList("mall2021_realtime")
.tableList("mall2021_realtime.table_process")
.startupOptions(StartupOptions.initial())
.deserializer(new CustomerDeserialization())
.build();
DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);
//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6. Split the stream: the broadcast records carry the configuration, and the main-stream records are routed according to it
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));
//TODO 7. Extract the Kafka stream and the HBase stream
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
//TODO 8. Write the Kafka records to their Kafka topics and the HBase records to Phoenix tables
kafka.print("Kafka>>>>>>>>");
hbase.print("HBase>>>>>>>>");
hbase.addSink(new DimSinkFunction());
kafka.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(element.getString("sinkTable"),
element.getString("after").getBytes());
}
}));
//TODO 9. Start the job
env.execute("BaseDBApp");
Note the entity class used here (Lombok's @Data generates the getters and setters):
@Data
public class TableProcess {
//Sink type constants for dynamic splitting
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse"; //not used yet
//source table
String sourceTable;
//operation type: insert, update, delete
String operateType;
//sink type: hbase or kafka
String sinkType;
//sink table (or topic)
String sinkTable;
//sink columns
String sinkColumns;
//primary key column
String sinkPk;
//table-creation extension clause
String sinkExtend;
}
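For intuition, one row of the table_process configuration table for the trademark dimension might look roughly like this (the values are illustrative, pieced together from the examples used later in this post, not copied from the real table):
//source_table=base_trademark, operate_type=insert, sink_type=hbase,
//sink_table=dim_base_trademark, sink_columns=id,tm_name, sink_pk=id, sink_extend=null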
//Phoenix schema name
public static final String HBASE_SCHEMA = "MALL2021_REALTIME";
//Phoenix driver
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix connection URL
public static final String PHOENIX_SERVER = "jdbc:phoenix:master,slave,slave1:2181";
public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.objectOutputTag = objectOutputTag;
this.mapStateDescriptor = mapStateDescriptor;
}
public void open(Configuration parameters) throws Exception {
Class.forName(GmallConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
//value: {"db":"","tableName":"","before":{},"after":{},"type":""}  (input data format)
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
//1. Parse the configuration record
JSONObject jsonObject = JSON.parseObject(value);
String data = jsonObject.getString("after");
TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
//2. For HBase sinks, create the Phoenix table
if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
checkTable(tableProcess.getSinkTable(),
tableProcess.getSinkColumns(),
tableProcess.getSinkPk(),
tableProcess.getSinkExtend());
}
//3. Put the configuration into broadcast state so every parallel instance receives it
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
broadcastState.put(key, tableProcess);
}
//DDL template: create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
PreparedStatement preparedStatement = null;
try {
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
.append(GmallConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append("(");
String[] fields = sinkColumns.split(",");
for (int i = 0; i < fields.length; i++) {
String field = fields[i];
//Is this the primary key column?
if (sinkPk.equals(field)) {
createTableSQL.append(field).append(" varchar primary key ");
} else {
createTableSQL.append(field).append(" varchar ");
}
//If this is not the last column, append a comma
if (i < fields.length - 1) {
createTableSQL.append(",");
}
}
createTableSQL.append(")").append(sinkExtend);
//Print the create-table statement
System.out.println(createTableSQL);
//Prepare the SQL
preparedStatement = connection.prepareStatement(createTableSQL.toString());
//Execute it
preparedStatement.execute();
} catch (SQLException e) {
throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!");
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
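For example, with sinkTable = "dim_base_trademark", sinkColumns = "id,tm_name", a null sinkPk (defaulting to "id") and a null sinkExtend, the statement printed and executed would come out as: create table if not exists MALL2021_REALTIME.dim_base_trademark(id varchar primary key ,tm_name varchar ).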
//value: {"db":"","tableName":"","before":{},"after":{},"type":""}
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
//1. Look up the configuration in broadcast state
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String key = value.getString("tableName") + "-" + value.getString("type");
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null) {
//2. Filter the columns
JSONObject data = value.getJSONObject("after");
filterColumn(data, tableProcess.getSinkColumns());
//3. Route the record
//Attach the sink table/topic name to the record
value.put("sinkTable", tableProcess.getSinkTable());
String sinkType = tableProcess.getSinkType();
if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
//Kafka data goes to the main stream
out.collect(value);
} else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
//HBase data goes to the side output
ctx.output(objectOutputTag, value);
}
} else {
System.out.println("No configuration found for key: " + key);
}
}
//@param data         {"id":"11","tm_name":"keven","logo_url":"aaa"}
//@param sinkColumns  id,tm_name
//=> data is trimmed to {"id":"11","tm_name":"keven"}
private void filterColumn(JSONObject data, String sinkColumns) {
String[] fields = sinkColumns.split(",");
List<String> columns = Arrays.asList(fields);
data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}
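A quick illustration of filterColumn's effect, using the example values from the comment above:
JSONObject data = JSON.parseObject("{\"id\":\"11\",\"tm_name\":\"keven\",\"logo_url\":\"aaa\"}");
filterColumn(data, "id,tm_name");
//data now contains only {"id":"11","tm_name":"keven"}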
DimSinkFunction extends RichSinkFunction, which has two lifecycles to keep in mind.
One runs when the task starts: open() executes once, so one-time connection initialization belongs there.
The other runs for every incoming record: invoke() performs the actual save, assembling an upsert SQL statement from the record and submitting it to HBase through Phoenix.
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName(GmallConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
connection.setAutoCommit(true);
}
//value:{"sinkTable":"dim_base_trademark","database":"mall-210325-flink","before":{"tm_name":"keven","id":12},"after":{"tm_name":"Keven","id":12},"type":"update","tableName":"base_trademark"}
//SQL:upsert into db.tn(id,tm_name) values('...','...')
@Override
public void invoke(JSONObject value, Context context) throws Exception {
PreparedStatement preparedStatement = null;
try {
//Build the SQL statement
String sinkTable = value.getString("sinkTable");
JSONObject after = value.getJSONObject("after");
String upsertSql = genUpsertSql(sinkTable,
after);
System.out.println(upsertSql);
//Prepare the SQL
preparedStatement = connection.prepareStatement(upsertSql);
//Execute the upsert
preparedStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (preparedStatement != null) {
preparedStatement.close();
}
}
}
//data:{"tm_name":"kevenhe","id":12}
//SQL:upsert into db.tn(id,tm_name,aa,bb) values('...','...','...','...')
private String genUpsertSql(String sinkTable, JSONObject data) {
Set<String> keySet = data.keySet();
Collection<Object> values = data.values();
//StringUtils.join(keySet, ",") => "id,tm_name"
return "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(" +
StringUtils.join(keySet, ",") + ") values('" +
StringUtils.join(values, "','") + "')";
}
}
The core of the DWD real-time computation is splitting the data streams, plus state-based identification (new vs. returning users). The operators used above include RichMapFunction, ProcessFunction, and RichSinkFunction; the table below summarizes how to choose among them:

Function | Can transform records | Can filter records | Side output | open() method | Can use state | Outputs to
---|---|---|---|---|---|---
MapFunction | yes | no | no | no | no | downstream operator
FilterFunction | no | yes | no | no | no | downstream operator
RichMapFunction | yes | no | no | yes | yes | downstream operator
RichFilterFunction | no | yes | no | yes | yes | downstream operator
ProcessFunction | yes | yes | yes | yes | yes | downstream operator
SinkFunction | yes | yes | no | no | no | external system
RichSinkFunction | yes | yes | no | yes | yes | external system

As the table shows, the Rich functions are powerful and ProcessFunction even more so, but the more capable the function, the more boilerplate it takes to use.
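As a tiny illustration of what the Rich variants add (this snippet is not from the project, just a sketch): on a keyed stream, a RichFilterFunction can create per-key state in open() and use it in filter(), which a plain FilterFunction cannot do.
public class FirstSeenFilter extends RichFilterFunction<JSONObject> {
    //Keyed state: remembers whether the current key has been seen before
    private transient ValueState<Boolean> seenState;
    @Override
    public void open(Configuration parameters) throws Exception {
        seenState = getRuntimeContext().getState(new ValueStateDescriptor<>("seen-state", Boolean.class));
    }
    @Override
    public boolean filter(JSONObject value) throws Exception {
        //Keep only the first record per key and drop the rest
        if (seenState.value() == null) {
            seenState.update(true);
            return true;
        }
        return false;
    }
}
For example, jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid")).filter(new FirstSeenFilter()) would keep one record per mid.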