赞
踩
注意:
1 kafka偏移量有建议state中flink自己存储的为准,除非flink自己维护的状态丢了 这是才会考虑使用kafka中存储的偏移量.
2 flink不同版本 kafka 消费者写法不同 文中已贴官网连接
3 自己测试可以配合datagrip的bigdatatools实现kafka的数据发送 方便测试
- package com.dahuatech.dm.fc.demo
-
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
- import org.apache.flink.api.common.eventtime.WatermarkStrategy
- import org.apache.flink.api.common.restartstrategy.RestartStrategies
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.api.common.time.Time
- import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
- import org.apache.flink.streaming.api.CheckpointingMode
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.api.scala._
- import org.apache.flink.streaming.connectors.kafka.Fl
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。