当前位置:   article > 正文

基于Canal与Flink实现数据实时增量同步(二)

flink 基于日志 同步mysql宽表数据到es

点击“蓝字”关注我吧

背景

数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来说,从MySQL等关系型数据库的业务数据进行采集,然后导入到Hive中,是进行数据仓库生产的重要环节。如何准确、高效地把MySQL数据同步到Hive中?一般常用的解决方案是批量取数并Load:直连MySQL去Select表中的数据,然后存到本地文件作为中间存储,最后把文件Load到Hive表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。

  • 直接从MySQL中Select大量数据,对MySQL的影响非常大,容易造成慢查询,影响业务线上的正常服务。

  • 由于Hive本身的语法不支持更新、删除等SQL原语(高版本Hive支持,但是需要分桶+ORC存储格式),对于MySQL中发生Update/Delete的数据无法很好地进行支持。

为了彻底解决这些问题,我们逐步转向CDC (Change Data Capture) + Merge的技术方案,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志,记录了MySQL中发生的所有数据变更,MySQL集群自身的主从同步就是基于Binlog做的。

实现思路

首先,采用Flink负责把Kafka上的Binlog数据拉取到HDFS上。

然后,对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式,可以使用Sqoop进行一次性全量导入。

最后,对每张ODS表,每天基于存量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。

Binlog是流式产生的,通过对Binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上,都会有明显地改善。Binlog本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

实现方案

Flink处理Kafka的binlog日志

使用kafka source,对读取的数据进行JSON解析,将解析的字段拼接成字符串,符合Hive的schema格式,具体代码如下:

  1. package com.etl.kafka2hdfs;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.alibaba.fastjson.parser.Feature;
  6. import org.apache.flink.api.common.functions.FilterFunction;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  9. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  10. import org.apache.flink.core.fs.Path;
  11. import org.apache.flink.runtime.state.StateBackend;
  12. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  13. import org.apache.flink.streaming.api.datastream.DataStream;
  14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  15. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  16. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  17. import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
  18. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
  19. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
  20. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  21. import java.util.Map;
  22. import java.util.Properties;
  23. /**
  24.  *  @Created with IntelliJ IDEA.
  25.  *  @author : jmx
  26.  *  @Date: 2020/3/27
  27.  *  @Time: 12:52
  28.  *  
  29.  */
  30. public class HdfsSink {
  31.     public static void main(String[] args) throws Exception {
  32.         String fieldDelimiter = ",";
  33.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34.         env.setParallelism(1);
  35.         // checkpoint
  36.         env.enableCheckpointing(10_000);
  37.         //env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
  38.         env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
  39.         CheckpointConfig config = env.getCheckpointConfig();
  40.         config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
  41.         // source
  42.         Properties props = new Properties();
  43.         props.setProperty("bootstrap.servers""kms-2:9092,kms-3:9092,kms-4:9092");
  44.         // only required for Kafka 0.8
  45.         props.setProperty("zookeeper.connect""kms-2:2181,kms-3:2181,kms-4:2181");
  46.         props.setProperty("group.id""test123");
  47.         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
  48.                 "qfbap_ods.code_city"new SimpleStringSchema(), props);
  49.         consumer.setStartFromEarliest();
  50.         DataStream<String> stream = env.addSource(consumer);
  51.         // transform
  52.         SingleOutputStreamOperator<String> cityDS = stream
  53.                 .filter(new FilterFunction<String>() {
  54.                     // 过滤掉DDL操作
  55.                     @Override
  56.                     public boolean filter(String jsonVal) throws Exception {
  57.                         JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
  58.                         return record.getString("isDdl").equals("false");
  59.                     }
  60.                 })
  61.                 .map(new MapFunction<String, String>() {
  62.                     @Override
  63.                     public String map(String value) throws Exception {
  64.                         StringBuilder fieldsBuilder = new StringBuilder();
  65.                         // 解析JSON数据
  66.                         JSONObject record = JSON.parseObject(value, Feature.OrderedField);
  67.                         // 获取最新的字段值
  68.                         JSONArray data = record.getJSONArray("data");
  69.                         // 遍历,字段值的JSON数组,只有一个元素
  70.                         for (int i = 0; i < data.size(); i++) {
  71.                             // 获取到JSON数组的第i个元素
  72.                             JSONObject obj = data.getJSONObject(i);
  73.                             if (obj != null) {
  74.                                 fieldsBuilder.append(record.getLong("id")); // 序号id
  75.                                 fieldsBuilder.append(fieldDelimiter); // 字段分隔符
  76.                                 fieldsBuilder.append(record.getLong("es")); //业务时间戳
  77.                                 fieldsBuilder.append(fieldDelimiter);
  78.                                 fieldsBuilder.append(record.getLong("ts")); // 日志时间戳
  79.                                 fieldsBuilder.append(fieldDelimiter);
  80.                                 fieldsBuilder.append(record.getString("type")); // 操作类型
  81.                                 for (Map.Entry<String, Object> entry : obj.entrySet()) {
  82.                                     fieldsBuilder.append(fieldDelimiter);
  83.                                     fieldsBuilder.append(entry.getValue()); // 表字段数据
  84.                                 }
  85.                             }
  86.                         }
  87.                         return fieldsBuilder.toString();
  88.                     }
  89.                 });
  90.         //cityDS.print();
  91.         //stream.print();
  92.         // sink
  93.         // 以下条件满足其中之一就会滚动生成新的文件
  94.         RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
  95.                 .withRolloverInterval(60L * 1000L) //滚动写入新文件的时间,默认60s。根据具体情况调节
  96.                 .withMaxPartSize(1024 * 1024 * 128L) //设置每个文件的最大大小 ,默认是128M,这里设置为128M
  97.                 .withInactivityInterval(60L * 1000L) //默认60秒,未写入数据处于不活跃状态超时会滚动新文件
  98.                 .build();
  99.         StreamingFileSink<String> sink = StreamingFileSink
  100.                 //.forRowFormat(new Path("file:///E://binlog_db/city"), new SimpleStringEncoder<String>())
  101.                 .forRowFormat(new Path("hdfs://kms-1:8020/binlog_db/code_city_delta"), new SimpleStringEncoder<String>())
  102.                 .withBucketAssigner(new EventTimeBucketAssigner())
  103.                 .withRollingPolicy(rollingPolicy)
  104.                 .withBucketCheckInterval(1000)  // 桶检查间隔,这里设置1S
  105.                 .build();
  106.         cityDS.addSink(sink);
  107.         env.execute();
  108.     }
  109. }

对于Flink Sink到HDFS,StreamingFileSink 替代了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的核心逻辑是分桶,默认的分桶方式是 DateTimeBucketAssigner,即按照处理时间分桶。处理时间指的是消息到达 Flink 程序的时间,这点并不符合我们的需求。因此,我们需要自己编写代码将事件时间从消息体中解析出来,按规则生成分桶的名称,具体代码如下:

  1. package com.etl.kafka2hdfs;
  2. import org.apache.flink.core.io.SimpleVersionedSerializer;
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
  4. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7. /**
  8.  *  @Created with IntelliJ IDEA.
  9.  *  @author : jmx
  10.  *  @Date: 2020/3/27
  11.  *  @Time: 12:49
  12.  *  
  13.  */
  14. public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
  15.     @Override
  16.     public String getBucketId(String element, Context context) {
  17.         String partitionValue;
  18.         try {
  19.             partitionValue = getPartitionValue(element);
  20.         } catch (Exception e) {
  21.             partitionValue = "00000000";
  22.         }
  23.         return "dt=" + partitionValue;//分区目录名称
  24.     }
  25.     @Override
  26.     public SimpleVersionedSerializer<String> getSerializer() {
  27.         return SimpleVersionedStringSerializer.INSTANCE;
  28.     }
  29.     private String getPartitionValue(String element) throws Exception {
  30.         // 取出最后拼接字符串的es字段值,该值为业务时间
  31.         long eventTime = Long.parseLong(element.split(",")[1]);
  32.         Date eventDate = new Date(eventTime);
  33.         return new SimpleDateFormat("yyyyMMdd").format(eventDate);
  34.     }
  35. }

离线还原MySQL数据

经过上述步骤,即可将Binlog日志记录写入到HDFS的对应的分区中,接下来就需要根据增量的数据和存量的数据还原最新的数据。Hive 表保存在 HDFS 上,该文件系统不支持修改,因此我们需要一些额外工作来写入数据变更。常用的方式包括:JOIN、Hive 事务、或改用 HBase、kudu。

如昨日的存量数据code_city,今日增量的数据为code_city_delta,可以通过 FULL OUTER JOIN,将存量和增量数据合并成一张最新的数据表,并作为明天的存量数据:

  1. INSERT OVERWRITE TABLE code_city
  2. SELECT 
  3.         COALESCE( t2.id, t1.id ) AS id,
  4.         COALESCE ( t2.city, t1.city ) AS city,
  5.         COALESCE ( t2.province, t1.province ) AS province,
  6.         COALESCE ( t2.event_time, t1.event_time ) AS event_time 
  7. FROM
  8.         code_city t1
  9.         FULL OUTER JOIN (
  10. SELECT
  11.         id,
  12.         city,
  13.         province,
  14.         event_time 
  15. FROM
  16.         (-- 取最后一条状态数据
  17. SELECT
  18.         id,
  19.         city,
  20.         province,
  21.         dml_type,
  22.         event_time,
  23.         row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank 
  24. FROM
  25.         code_city_delta 
  26. WHERE
  27.         dt = '20200324' 
  28.         ) temp 
  29. WHERE
  30.         rank = 1 
  31.         ) t2 ON t1.id = t2.id;

小结

本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面,介绍了通过Flink实现实时的ETL,此外还可以将binlog日志写入kudu、HBase等支持事务操作的NoSQL中,这样就可以省去数据表还原的步骤。本文是《基于Canal与Flink实现数据实时增量同步》的第二篇,关于canal解析Binlog日志写入kafka的实现步骤,参见《基于Canal与Flink实现数据实时增量同步一》

refrence:

[1]https://tech.meituan.com/2018/12/06/binlog-dw.html

戳原文,更有料!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/723330
推荐阅读
相关标签
  

闽ICP备14008679号