当前位置:   article > 正文

大数据之使用Flink处理Kafka中的数据到Redis_使用flink消费kafka中changerecord主题的数据,实时统计每个设备从其他状态转变为

使用flink消费kafka中changerecord主题的数据,实时统计每个设备从其他状态转变为


前言

本题来源于全国职业技能大赛之大数据技术赛项赛题(其他暂不透露)

题目:使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每五分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。

注:ProduceRecord主题,生产一个产品产生一条数据;

change_handle_state字段为1代表已经检验,0代表未检验;

时间语义使用Processing Time。


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)

一、读题分析

涉及组件:Flink,Kafka,Redis

涉及知识点:

1.Flink消费Kafka中的数据

2.Flink将数据存入到Redis数据库中

3.Flink时间窗口的概念和使用(难点)

4.FlinkSQL算子的使用

二、使用步骤

1.导入配置文件到pom.xml

  1. <!--flink连接kafka配置-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-kafka_2.11</artifactId>
  5. <version>1.14.0</version>
  6. </dependency>
  7. <!--配置redis链接-->
  8. <dependency>
  9. <groupId>org.apache.bahir</groupId>
  10. <artifactId>flink-connector-redis_2.12</artifactId>
  11. <version>1.1.0</version>
  12. </dependency>

2.代码部分

直接上代码,代码如下(示例):

  1. package C.dataAndCalculation.shtd_industry.tasl2_FlinkDealKafka
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema
  3. import org.apache.flink.streaming.api.TimeCharacteristic
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  7. import org.apache.flink.streaming.connectors.redis.RedisSink
  8. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
  9. import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
  10. import java.util.Properties
  11. object FlinkToKafkaRedis {
  12. def main(args: Array[String]): Unit = {
  13. //创建FLink流执行环境
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. //设置并行度
  16. env.setParallelism(1)
  17. //指定时间语义
  18. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  19. //Kafka的配置
  20. val properties = new Properties()
  21. properties.setProperty("bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092")
  22. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  23. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
  24. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  25. properties.setProperty("auto.offset.reset", "earliest")
  26. //读取Kafka数据
  27. val kafkaStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("ProduceRecord", new
  28. SimpleStringSchema(), properties))
  29. //使用flink算子对数据进行处理
  30. val dateStream = kafkaStream
  31. .map(line => {
  32. val data = line.split(",")
  33. (data(1).toInt, data(9).toInt)
  34. })
  35. .filter(_._2 == 1)
  36. .keyBy(_._1)
  37. .timeWindow(Time.minutes(1))
  38. .sum(1)
  39. //打印做测试
  40. dateStream.print("ds")
  41. //连接Redis数据库的配置
  42. val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
  43. .setHost("bigdata1")
  44. .setPort(6379)
  45. .build()
  46. // 创建RedisSink对象,并将数据写入Redis
  47. val redisSink = new RedisSink[(Int, Int)](config, new MyRedisMapper)
  48. // 发送数据
  49. dateStream.addSink(redisSink)
  50. //执行Flink程序
  51. env.execute("FlinkToKafkaToRedis")
  52. }
  53. // 根据题目要求
  54. class MyRedisMapper extends RedisMapper[(Int, Int)] {
  55. //这里使用RedisCommand.HSET不用RedisCommand.SET,前者创建RedisHash表后者创建Redis普通的String对应表
  56. override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,
  57. "totalproduce")
  58. override def getKeyFromData(t: (Int, Int)): String = t._1 + ""
  59. override def getValueFromData(t: (Int, Int)): String = t._2 + ""
  60. }
  61. }

三、重难点分析

  1. //使用flink算子对数据进行处理
  2. val dateStream = kafkaStream
  3. .map(line => {
  4. val data = line.split(",")
  5. (data(1).toInt, data(9).toInt)
  6. })
  7. .filter(_._2 == 1)
  8. .keyBy(_._1)
  9. .timeWindow(Time.minutes(1))
  10. .sum(1)

从Kafka读取ProduceRecord的数据,格式如下:

20,116,0009,2023-03-16 15:43:01,2023-03-16 15:43:09,2023-03-16 15:43:15,20770,1900-01-01 00:00:00,184362,0

21,110,0006,2023-03-16 15:42:43,2023-03-16 15:43:13,2023-03-16 15:43:17,12794,1900-01-01 00:00:00,183215,0

22,111,0003,2023-03-16 15:42:39,2023-03-16 15:43:11,2023-03-16 15:43:19,21168,1900-01-01 00:00:00,180754,1

23,116,00010,2023-03-16 15:43:15,2023-03-16 15:43:18,2023-03-16 15:43:22,20464,1900-01-01 00:00:00,185938,0

24,116,0002,2023-03-16 15:43:22,2023-03-16 15:43:21,2023-03-16 15:43:24,18414,1900-01-01 00:00:00,188880,1

25,114,00010,2023-03-16 15:42:47,2023-03-16 15:43:18,2023-03-16 15:43:26,25280,1900-01-01 00:00:00,186866,1

26,117,0003,2023-03-16 15:42:53,2023-03-16 15:43:24,2023-03-16 15:43:28,10423,1900-01-01 00:00:00,183201,1

首先从Kafka提取到数据后是流数据,我们需要使用DatastreamAPI相关的算子进行数据处理,

1.对每一条数据进行map转换,目的就是提取到我们需要的数据。在这里使用了lambda表达式,也可以自己写一个类继承MapFunction(这里不做演示)。

2.使用filter过滤题目中“为1代表已经检验,0代表未检验”

3.使用keyby对数据进行分组操作,此时数据的类型是keyedStream,按照设备ID进行分组。

4.使用timeWindow前必须要进行keyby操作,本身就是keyedStream中的方法。根据题目“各设备每五分钟生产产品总数”使用时间窗口函数。

        注:这里的Time方法的包必须是

        org.apache.flink.streaming.api.windowing.time.Time

        否则无效,并且这里还需要指定时间语义题目中有给,在env设置指定时间语义

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

5.这里类型是windowdStream,最后对数据进行聚合操作,转变为DataStream

总结

本文仅仅介绍了Flink对Kafka中的数据提取进行一系列转换存入到Redis的操作,题目不难,难的是能否熟练使用Flink的算子和对时间窗口概念的理解。

---最后附上导入到Redis数据库的图---

如转载请标明出处

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/587261
推荐阅读
相关标签
  

闽ICP备14008679号