1. Storing data in Kafka for use as a dimension table
For data stored in Kafka to be usable as a dimension table by multiple programs, the full set of dimension data must be retained in Kafka. This means the topic's log cleanup policy cannot be delete: that policy removes historical data and cannot guarantee that the latest record for every join key is kept. Instead, use the compact cleanup policy, under which at least the latest record for each key is retained. When compaction is actually triggered is controlled by the parameters below.
A test example of creating such a topic (the parameters can be adjusted):
./kafka-topics.sh --zookeeper zookeeper:2181 --create --topic iot_device_update_kafka_time --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config segment.ms=5000 --config min.cleanable.dirty.ratio=0.001 --config delete.retention.ms=1000
cleanup.policy=compact enables log compaction
segment.ms=5000 controls how often a new log segment is rolled; shortened here to make testing easier (compaction only runs on closed segments)
min.cleanable.dirty.ratio is one of the triggers for compaction: a larger value means fewer, more efficient compaction runs; a smaller value means more frequent runs at a performance cost. Lower it so compaction triggers more often during testing.
delete.retention.ms=1000 controls how long tombstones (records whose value is null) are retained before being removed; the default is 24h
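To make the compaction semantics concrete, here is a minimal illustrative sketch in plain Python (not Kafka itself) of what a compaction pass does: only the latest record per key survives, and a record with a null value acts as a tombstone that deletes the key once its retention window passes.

```python
# Illustrative simulation of Kafka log compaction semantics (not real Kafka):
# - later records for the same key overwrite earlier ones
# - a None value is a tombstone; after delete.retention.ms the key disappears
def compact(log):
    latest = {}
    for key, value in log:
        latest[key] = value  # keep only the newest value per key
    # drop tombstoned keys (assume their retention window has elapsed)
    return {k: v for k, v in latest.items() if v is not None}

log = [
    ("device-1", '{"state": 0}'),
    ("device-2", '{"state": 1}'),
    ("device-1", '{"state": 1}'),  # newer value for device-1
    ("device-2", None),            # tombstone: device-2 is deleted
]
print(compact(log))  # {'device-1': '{"state": 1}'}
```

This is why a compacted topic can serve as a full dimension snapshot: every join key still resolves to its most recent value, no matter how old it is.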
2. Code implementation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Topic_stream_test {
  def main(args: Array[String]): Unit = {
    // TODO Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // TODO Select the Table API planner (Blink, streaming mode)
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    // TODO Create the table environment
    val btEnv = StreamTableEnvironment.create(env, bsSettings)
    // TODO Create the main-stream source table.
    // Both the main table and the dimension table must define a watermark;
    // the temporal join is triggered by watermark alignment.
    btEnv.executeSql(
      """CREATE TABLE topic_resource_stream (
        | `device_id` STRING,
        | `resource_id` STRING,
        | `s1` STRING,
        | `timeStamp` TIMESTAMP(3) ,
        | WATERMARK FOR `timeStamp` as `timeStamp` - INTERVAL '10' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'topic_resource_stream5',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'properties.group.id' = 'topic_resource_stream3',
        | 'scan.startup.mode' = 'earliest-offset',
        | 'format' = 'json'
        |)""".stripMargin)
    // TODO Create the MySQL CDC source holding the dimension data
    btEnv.executeSql(
      """CREATE TABLE iot_device (
        | id BIGINT,
        | device_id STRING,
        | state INT ,
        | model STRING,
        | model_type INT ,
        | update_time TIMESTAMP(3) ,
        | WATERMARK FOR `update_time` as `update_time` - INTERVAL '10' SECOND
        |) WITH (
        | 'connector' = 'mysql-cdc',
        | 'hostname' = 'localhost',
        | 'port' = '3306',
        | 'username' = 'root',
        | 'password' = '123456',
        | 'database-name' = 'tableTest',
        | 'table-name' = 'iot_device_time'
        |)""".stripMargin)
    // TODO Create the upsert-kafka table backed by the compacted topic
    btEnv.executeSql(
      """
        |CREATE TABLE iot_device_update_kafka (
        | id BIGINT,
        | device_id STRING,
        | state INT ,
        | model STRING,
        | model_type INT ,
        | update_time TIMESTAMP(3) ,
        | WATERMARK FOR `update_time` as `update_time` - INTERVAL '10' SECOND ,
        | PRIMARY KEY (device_id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'upsert-kafka',
        | 'topic' = 'iot_device_update_kafka_time',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'key.format' = 'json',
        | 'value.format' = 'json'
        |)
        |""".stripMargin)
    // TODO Mirror the CDC table into the compacted Kafka topic
    btEnv.executeSql(
      """
        |insert into iot_device_update_kafka select id ,device_id,state ,model ,model_type,update_time from iot_device
        |""".stripMargin)
    // TODO Event-time temporal join against the Kafka-backed dimension table
    btEnv.executeSql(
      """
        |select
        |topic_resource_stream.device_id,
        |resource_id,
        |s1,
        |state,
        |model
        |from topic_resource_stream
        |left join iot_device_update_kafka FOR SYSTEM_TIME AS OF topic_resource_stream.`timeStamp`
        |on topic_resource_stream.device_id=iot_device_update_kafka.device_id
        |""".stripMargin).print()
  }
}
The dimension-table join above is triggered by watermark alignment: if the dimension table's watermark never advances, the main stream's records pile up in a buffer, waiting for the dimension side's watermark to catch up before the join fires. Also note that changelog-backed dimension tables currently only support event time.
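The buffering behavior described above can be sketched in plain Python (an illustrative simulation, not Flink's actual implementation): a main-stream row is only emitted once the dimension side's watermark has passed the row's timestamp, and it joins against the dimension version as of its own event time.

```python
# Illustrative simulation of an event-time temporal join (not Flink itself).
import bisect

class TemporalJoin:
    def __init__(self):
        self.dim_versions = {}   # key -> list of (ts, value), appended in ts order
        self.dim_watermark = -1
        self.buffer = []         # buffered main-stream rows: (ts, key, payload)

    def on_dim_update(self, ts, key, value):
        self.dim_versions.setdefault(key, []).append((ts, value))

    def on_dim_watermark(self, wm):
        # Advancing the dimension watermark releases buffered rows.
        self.dim_watermark = wm
        return self._flush()

    def on_main_row(self, ts, key, payload):
        self.buffer.append((ts, key, payload))
        return self._flush()

    def _lookup(self, key, ts):
        # Dimension version as of ts; None if no version yet (left join).
        versions = self.dim_versions.get(key, [])
        i = bisect.bisect_right([v[0] for v in versions], ts)
        return versions[i - 1][1] if i else None

    def _flush(self):
        # Only rows whose timestamp the dimension watermark has passed may join.
        ready = [r for r in self.buffer if r[0] <= self.dim_watermark]
        self.buffer = [r for r in self.buffer if r[0] > self.dim_watermark]
        return [(ts, key, p, self._lookup(key, ts)) for ts, key, p in ready]

j = TemporalJoin()
j.on_dim_update(100, "device-1", {"state": 1})
print(j.on_main_row(150, "device-1", "r1"))  # [] - dim watermark still behind
print(j.on_dim_watermark(200))               # row joins state as of ts=150
```

This is why a stalled dimension watermark stalls the whole join: until `on_dim_watermark` advances past 150, the `r1` row stays in the buffer.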
Not yet finished; to be continued...