当前位置:   article > 正文

大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)_编写scala代码,使用flink消费kafka中topic为order的数据并进行相应的数据统计计

编写scala代码,使用flink消费kafka中topic为order的数据并进行相应的数据统计计算

       编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此列,则使用create_time填充,允许数据延迟5s,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。

  1. 使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  2. 在任务1进行的同时,使用侧边流,监控若发现order_status字段为退回完成, 将key设置成totalrefundordercount存入Redis中,value存放用户退款消费额。使用redis cli以get key方式获取totalrefundordercount值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  3. 在任务1进行的同时,使用侧边流,监控若发现order_status字段为取消订单,将数据存入MySQL数据库shtd_result的order_info表中,然后在Linux的MySQL命令行中根据id降序排序,查询列id、consignee、consignee_tel、final_total_amount、feight_fee,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

使用Flink处理Kafka中的数据

  1. package module_d
  2. import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema
  4. import org.apache.flink.configuration.Configuration
  5. import org.apache.flink.streaming.api.functions.ProcessFunction
  6. import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
  7. import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
  8. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  9. import org.apache.flink.streaming.connectors.redis.RedisSink
  10. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
  11. import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
  12. import org.apache.flink.util.Collector
  13. import org.apache.flink.streaming.api.scala._
  14. import java.sql.{Connection, DriverManager, PreparedStatement}
  15. import java.text.{DecimalFormat, SimpleDateFormat}
  16. import java.time.Duration
  17. import java.util.Properties
  18. /**
  19. * 编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此属性,则使用create_time填充,允许数据延迟5S,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。
  20. */
  21. object task1 {
  22. /**
  23. * 一个流分成四个流
  24. */
  25. lazy val statusother: OutputTag[String] = new OutputTag[String]("other")
  26. lazy val status1003: OutputTag[String] = new OutputTag[String]("s1003")
  27. lazy val status1005: OutputTag[String] = new OutputTag[String]("s1005")
  28. lazy val status1006: OutputTag[String] = new OutputTag[String]("s1006")
  29. def main(args: Array[String]): Unit = {
  30. /**
  31. * 1、使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  32. */
  33. val env = StreamExecutionEnvironment.getExecutionEnvironment
  34. env.setParallelism(1) //并行度
  35. //Kafka配置
  36. val properties = new Properties()
  37. properties.setProperty("bootstrap.servers", "ngc:9092") //集群地址
  38. properties.setProperty("group.id", "g1") //消费者组
  39. //原始流
  40. val stream = env.addSource(new FlinkKafkaConsumer[String]("order1", new SimpleStringSchema(), properties).setStartFromLatest())
  41. .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5))//允许数据延迟5S
  42. .withTimestampAssigner(
  43. new SerializableTimestampAssigner[String] {
  44. override def extractTimestamp(t: String, l: Long): Long = {
  45. val sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
  46. if (t.split(",")(11).equals("")) { //如果operate_time为空
  47. sdf.parse(t.split(",")(10)).getTime
  48. } else {
  49. val create_time = sdf.parse(t.split(",")(10)).getTime
  50. val operate_time = sdf.parse(t.split(",")(11)).getTime
  51. math.max(create_time, operate_time)
  52. }
  53. }
  54. }
  55. ))
  56. //设置自定义侧边流
  57. val streamProcess = stream.process(new MdSplitProcessFunction)
  58. /**
  59. * 1、使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、
  60. * 退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis
  61. * cli以get key方式获取totalprice值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图
  62. * 间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  63. */
  64. val ds1 = streamProcess
  65. .getSideOutput(statusother)
  66. .map(line => line.split(",")(3).toDouble)
  67. .keyBy(_ => true) //聚合到一起
  68. .sum(0)
  69. .map(n=>new DecimalFormat("#.#").format(n))
  70. //redis配置
  71. val conf = new FlinkJedisPoolConfig.Builder()
  72. .setHost("ngc")
  73. .setPort(6378)
  74. .setPassword("123456")
  75. .build()
  76. ds1.addSink(new RedisSink[String](conf, new MyRedisMapper("totalcount")))
  77. /**
  78. * 2、在任务1进行的同时,使用侧边流,监控若发现order_status字段为退回完成, 将key设置成totalrefundordercount存入Redis中,value存放用户退款消费额。使用redis cli以get key方式获取totalrefundordercount值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  79. */
  80. val ds2 = streamProcess
  81. .getSideOutput(status1006)
  82. .map(line => line.split(",")(3).toDouble)
  83. .keyBy(_ => true) //聚合到一起
  84. .sum(0)
  85. .map(n=>new DecimalFormat("#.#").format(n))
  86. ds2.addSink(new RedisSink[String](conf, new MyRedisMapper("totalrefundordercount")))
  87. /**
  88. * 3、在任务1进行的同时,使用侧边流,监控若发现order_status字段为取消订单,将数据存入MySQL数据库shtd_result的order_info表中,然后在Linux的MySQL命令行中根据id降序排序,查询列id、consignee、consignee_tel、final_total_amount、feight_fee,查询出前5条,将SQL语句与执行结果截图粘贴至对应报告中。
  89. */
  90. val ds3 = streamProcess
  91. .getSideOutput(status1003)
  92. ds3.addSink(new RichSinkFunction[String] {
  93. var conn: Connection = _
  94. var insertStmt: PreparedStatement = _
  95. override def open(parameters: Configuration): Unit = {
  96. conn = DriverManager.getConnection("jdbc:mysql://ngc:3307/shtd_result?useSSL=false", "root", "123456")
  97. insertStmt = conn.prepareStatement("insert into order_info (id,consignee,consignee_tel,final_total_amount,feight_fee) values (?,?,?,?,?)")
  98. }
  99. override def close(): Unit = {
  100. insertStmt.close()
  101. conn.close()
  102. }
  103. override def invoke(value: String, context: SinkFunction.Context): Unit = {
  104. val arr = value.split(",")
  105. insertStmt.setString(1, arr(0))
  106. insertStmt.setString(2, arr(1))
  107. insertStmt.setString(3, arr(2))
  108. insertStmt.setString(4, arr(3))
  109. insertStmt.setString(5, arr(19))
  110. insertStmt.execute()
  111. }
  112. })
  113. ds1.print()
  114. ds2.print()
  115. env.execute("kafka sink test")
  116. }
  117. /**
  118. * 自定义侧边流配置
  119. */
  120. class MdSplitProcessFunction extends ProcessFunction[String, String] {
  121. override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
  122. val line = value.split(",")
  123. /**
  124. * 订单状态order_status分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。
  125. */
  126. if (line(4).equals("1003")) {
  127. ctx.output(status1003, value)
  128. } else if (line(4).equals("1005")) {
  129. ctx.output(status1005, value)
  130. } else if (line(4).equals("1006")) {
  131. ctx.output(status1006, value)
  132. } else {
  133. ctx.output(statusother, value)
  134. }
  135. }
  136. }
  137. /**
  138. * Redis key——value存储 也可用RichSinkFunction建立Redis
  139. */
  140. class MyRedisMapper(key: String) extends RedisMapper[String] {
  141. override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
  142. override def getValueFromData(data: String): String = data
  143. override def getKeyFromData(data: String): String = key
  144. }
  145. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/514528
推荐阅读
相关标签
  

闽ICP备14008679号