赞
踩
1. 项目架构图:
第一步给Flink 添加CheckPoint
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 设置流式时间为EventTime
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 设置并行度为1
- env.setParallelism(1)
- // 测试一下
- env.fromCollection(List("hadoop", "hive")).print()
- /**
- * **********************添加 Checkpoint********************************
- * 保证程序长时间运行的安全性进行checkpoint操作
- */
- // 5秒钟启动一次checkpoint
- env.enableCheckpointing(5000)
- // 设置checkpoint 只 checkpoint 一次
- env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
- // 设置两次checkpoint 的最小时间间隔
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
- // checkpoint 的超时时间
- env.getCheckpointConfig.setCheckpointTimeout(60000)
- // 允许的最大checkpoint 的并行度
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
- // 当程序关闭时, 触发额外的checkpoint
- env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- // 设置checkpoint 的地址
- env.setStateBackend(new FsStateBackend("hdfs://node01.com:8020/fink-checkpoint/"))

2.第二步给flink 开始整合kafka
主要就是配置一些关于kafka 的参数等信息
- /**
- * ***************************flink 整合kafka*******************************
- */
- val properties = new Properties()
- // # Kafka集群地址
- properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
- // # ZooKeeper集群地址
- properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
- // # Kafka Topic名称
- properties.setProperty("input.topic", GlobalConfigUtil.inputTopic)
- // # 消费组ID
- properties.setProperty("group.id", GlobalConfigUtil.groupId)
- // # 自动提交拉取到消费端的消息offset到kafka
- properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
- // # 自动提交offset到zookeeper的时间间隔单位(毫秒)
- properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
- // # 每次消费最新的数据
- properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)
-
- // 反序列化器 属性集合
- val consumer = new FlinkKafkaConsumer010[String](GlobalConfigUtil.inputTopic, new SimpleStringSchema(), properties)
-
- val kafkaStream: DataStream[String] = env.addSource(consumer)
- // kafkaStream.print()

3. 第三部将 获取到的json转化为元祖
- // 将json 转化为元组
-
- val tunlpDataStream = kafkaStream.map {
- msgjson =>
- val jsonObject = JSON.parseObject(msgjson)
- val message = jsonObject.getString("message")
- val count = jsonObject.getLong("count")
- val timeStamp = jsonObject.getLong("timeStamp")
-
- Message(ClickLog(message), count, timeStamp)
- }
- tunlpDataStream.print()
4,第四部添加水印,的支持
/** * ------------------------------- Flink 添加水印的支持---------------------------------- */ val watemarkDataStream = tunlpDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] { // 当前时间搓 var currentTimeStamp = 0l // 延迟时间 var maxDelayTime = 2000l //获取当前的时间戳 override def getCurrentWatermark: Watermark = { new Watermark(currentTimeStamp - maxDelayTime) } //获取事件的时间 override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = { currentTimeStamp = Math.max(element.timeStamp, previousElementTimestamp) currentTimeStamp } })
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。