当前位置:   article > 正文

Flink 实时项目_实时flink项目

实时flink项目

1. 项目架构图:

第一步给Flink 添加CheckPoint

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // 设置流式时间为EventTime
  3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  4. // 设置并行度为1
  5. env.setParallelism(1)
  6. // 测试一下
  7. env.fromCollection(List("hadoop", "hive")).print()
  8. /**
  9. * **********************添加 Checkpoint********************************
  10. * 保证程序长时间运行的安全性进行checkpoint操作
  11. */
  12. // 5秒钟启动一次checkpoint
  13. env.enableCheckpointing(5000)
  14. // 设置checkpoint 只 checkpoint 一次
  15. env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  16. // 设置两次checkpoint 的最小时间间隔
  17. env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
  18. // checkpoint 的超时时间
  19. env.getCheckpointConfig.setCheckpointTimeout(60000)
  20. // 允许的最大checkpoint 的并行度
  21. env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
  22. // 当程序关闭时, 触发额外的checkpoint
  23. env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  24. // 设置checkpoint 的地址
  25. env.setStateBackend(new FsStateBackend("hdfs://node01.com:8020/fink-checkpoint/"))

2.第二步给flink 开始整合kafka

主要就是配置一些关于kafka 的参数等信息

  1. /**
  2. * ***************************flink 整合kafka*******************************
  3. */
  4. val properties = new Properties()
  5. // # Kafka集群地址
  6. properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
  7. // # ZooKeeper集群地址
  8. properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
  9. // # Kafka Topic名称
  10. properties.setProperty("input.topic", GlobalConfigUtil.inputTopic)
  11. // # 消费组ID
  12. properties.setProperty("group.id", GlobalConfigUtil.groupId)
  13. // # 自动提交拉取到消费端的消息offset到kafka
  14. properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
  15. // # 自动提交offset到zookeeper的时间间隔单位(毫秒)
  16. properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
  17. // # 每次消费最新的数据
  18. properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)
  19. // 反序列化器 属性集合
  20. val consumer = new FlinkKafkaConsumer010[String](GlobalConfigUtil.inputTopic, new SimpleStringSchema(), properties)
  21. val kafkaStream: DataStream[String] = env.addSource(consumer)
  22. // kafkaStream.print()

3. 第三部将  获取到的json转化为元祖

  1. // 将json 转化为元组
  2. val tunlpDataStream = kafkaStream.map {
  3. msgjson =>
  4. val jsonObject = JSON.parseObject(msgjson)
  5. val message = jsonObject.getString("message")
  6. val count = jsonObject.getLong("count")
  7. val timeStamp = jsonObject.getLong("timeStamp")
  8. Message(ClickLog(message), count, timeStamp)
  9. }
  10. tunlpDataStream.print()

4,第四部添加水印,的支持

  1. /**
  2. * ------------------------------- Flink 添加水印的支持----------------------------------
  3. */
  4. val watemarkDataStream = tunlpDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {
  5. // 当前时间搓
  6. var currentTimeStamp = 0l
  7. // 延迟时间
  8. var maxDelayTime = 2000l
  9. //获取当前的时间戳
  10. override def getCurrentWatermark: Watermark = {
  11. new Watermark(currentTimeStamp - maxDelayTime)
  12. }
  13. //获取事件的时间
  14. override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
  15. currentTimeStamp = Math.max(element.timeStamp, previousElementTimestamp)
  16. currentTimeStamp
  17. }
  18. })

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/597945
推荐阅读
相关标签
  

闽ICP备14008679号