当前位置:   article > 正文

基于FlinkCDC 和upsert-kafka的flinkSQL的纬度表关联_flink cdc upsert-kafka

flink cdc upsert-kafka

基于FlinkCDC 和upsert-kafka的flinkSQL的纬度表关联

一、数据存入kafka作为纬度表关联
要想存入kafka的数据能在多个程序中作为纬度表关联使用,则必须要保存全量的的纬度数据在kafka中,这就要求kafka的日志清理策略不能为delete,因为这种策略会删除历史数据且无法证每个join的key保留到最新的数据,所以需要采用compact的清理策略,相同key的数据至少会保留一条最新的数据,这个清理策略的触发由相关参数控制。

创建topic的测试实例 相关参数可进行调整

./kafka-topics.sh --zookeeper zookeeper:2181 --create --topic iot_device_update_kafka_time --partitions 1    --replication-factor 1  --config cleanup.policy=compact --config segment.ms=5000 --config min.cleanable.dirty.ratio=0.001 --config delete.retention.ms=1000  
  • 1

cleanup.policy=compact 开启compact
segment.ms=5000 多久生成segment 这里为了测试方便进行修改
min.cleanable.dirty.ratio 触发compact的一种机制 越大越高效越小频率越高。耗性能 测试方便触发频率跳高则该数值调小
delete.retention.ms=1000 valuew值为null的清理事件触发条件 默认为24h

二、相关代码实现

object Topic_stream_test {

  def main(args: Array[String]): Unit = {
    //TODO 创建运行时环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //TODO 设置使用的tableAPI类型
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    //TODO 穿件table的运行时环境
    val btEnv = StreamTableEnvironment.create(env, bsSettings)

    //TODO 创建主流的Source表

    //    需要主流表和纬度表 都要有watermark 都是根据wartermark来触发的
    btEnv.executeSql(
      """CREATE TABLE topic_resource_stream (
        |  `device_id` STRING,
        |  `resource_id` STRING,
        |  `s1` STRING,
        |  `timeStamp` TIMESTAMP(3) ,
        |  WATERMARK FOR `timeStamp` as `timeStamp` - INTERVAL '10' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'topic_resource_stream5',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'properties.group.id' = 'topic_resource_stream3',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json'
        |)""".stripMargin)


    btEnv.executeSql(
      """CREATE TABLE iot_device (
        |  id BIGINT,
        |  device_id STRING,
        |  state INT ,
        |  model STRING,
        |  model_type INT ,
        |  update_time TIMESTAMP(3) ,
        |  WATERMARK FOR `update_time` as `update_time` - INTERVAL '10' SECOND
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'database-name' = 'tableTest',
        |  'table-name' = 'iot_device_time'
        |)""".stripMargin)



    btEnv.executeSql(
      """
        |CREATE TABLE iot_device_update_kafka (
        |  id BIGINT,
        |  device_id STRING,
        |  state INT ,
        |  model STRING,
        |  model_type INT ,
        |  update_time TIMESTAMP(3) ,
        |  WATERMARK FOR `update_time` as `update_time` - INTERVAL '10' SECOND ,
        |  PRIMARY KEY (device_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'iot_device_update_kafka_time',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json'
        |)
        |""".stripMargin)

    btEnv.executeSql(
      """
        |insert into iot_device_update_kafka select id ,device_id,state ,model ,model_type,update_time from iot_device
        |""".stripMargin)

    btEnv.executeSql(
      """
        |select
        |topic_resource_stream.device_id,
        |resource_id,
        |s1,
        |state,
        |model
        |from topic_resource_stream
        |left join iot_device_update_kafka FOR SYSTEM_TIME AS OF topic_resource_stream.`timeStamp`
        |on topic_resource_stream.device_id=iot_device_update_kafka.device_id
        |""".stripMargin).print()

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

上述的纬表关联都是通过watermark对齐后触发的,如果纬度表的水位线一直未更新,则主流的数据就会挤压缓存起来等待纬表水位线对齐然后关联,切changelog的纬度表目前只支持事件事件。
还未总结完 待更。。。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/723328
推荐阅读
相关标签
  

闽ICP备14008679号