当前位置:   article > 正文

任务二:使用Flink处理Kafka中的数据_数据采集与实时计算-任务二:使用flink处理kafka中的数据

数据采集与实时计算-任务二:使用flink处理kafka中的数据

                          陶运道  QQ:275931339

题目:

环境说明:

Flink任务在Yarn上用per job模式(即Job分离模式,不采用Session模式),方便Yarn回收资源。

注:与订单金额计算相关使用order_money字段,同一个订单无需多次重复计算,需要考虑退款或者取消的订单。

编写Scala代码,使用Flink消费Kafka中Topic为ods_mall_log和ods_mall_data的数据并进行相应的数据统计计算(使用ProcessTime)。

  1. 使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的fact_order_master、fact_order_detail的Topic中(只获取data的内容,具体的内容格式考生请自查),其他的表则无需处理;

一、读题分析

涉及组件:Scala,Flink,Kafka,json

涉及知识点:Flink处理数据,json文件的处理

本题重点要搞清楚ods_mall_data的数据格式,本例是json格式

1.数据按行组织,每行一条记录包括表名:data

{"table":"ods_mall_log", "data":[{"wp_web_sk":"60","wp_web_page_id":"AAAAAAA","start_date":"2001-09-03"}]}

{"table":"order_master", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAAK","start_date":"2001-09-03"}]}

{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAA","start_date":"2001-09-03"}]} 

{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAA","start_date":"2001-09-03"}]}

{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtAAAA","id":"1"}]}

{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]}                 

{"table":"order_master", "data":[{"order_master_id":"60","name":"mtAAA","id ":"1"}]}

{"table":"order_master", "data":[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]}

2.由于主题行数据很多,读入数据包含指定表数据到f中

val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
      .filter(line => line.contains("order_master_id") || line.contains("order_detail_id"))
      .map(line => {
        import com.google.gson.JsonParser
        val jsonobj = new JsonParser().parse(line).getAsJsonObject
       // jsonobj.getAsJsonObject("data").toString
        jsonobj.get("data").toString
      })

  f是按data中是否包含字符串order_master_id与order_detail_id条件筛选出来的行

 假设原数据为

执行程序后就变成

{"order_detail_id":"60","name":"dtAAAA","id":"1"}]

[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]

[{"order_master_id":"60","name":"mtAAA","id ":"1"}]

[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]

注意取data数据,不能理解的是,不能根据表中作为选择条件。只能根据data中行中字段选择

3.利用skin将 f数据写到指定的主题中

这段程序关键要理解f与t关系 (上段程序获得行数据集 f就放到此处t),

val kafkaSink = KafkaSink.builder[String]
      .setBootstrapServers("master:9092")
      .setKafkaProducerConfig(properties)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()
        .setTopicSelector(new TopicSelector[String] {
          override def apply(t: String): String = {  //t与f都是data数据
          // println(t)
            if (t.contains("order_master_id")) "fact_order_master"
            else if (t.contains("order_detail_id")) "fact_order_detail"
            else null
          }
        })
        .setValueSerializationSchema(new SimpleStringSchema)
        .build()).build()
 二、处理过程

  注意 1.数据在主题test中,而不是ods_mall_data中,以方便阅读

       2.不必事先建立主题fact_order_detail,fact_order_master

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties
object scala_1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置流执行环境
    import org.apache.flink.streaming.api.TimeCharacteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //设置使用处理时间
    env.setParallelism(1) // 设置并行度
    env.enableCheckpointing(5000)   // 启用检查点
    // kafka source
    val kafkaSource = KafkaSource.builder[String]
      .setBootstrapServers("master:9092")
      .setTopics("test")   //从主题aa
     // .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build()
     // kafka sink
    val properties = new Properties()
    properties.setProperty("trans.timeout.ms", "7200000") // 2 hours
    // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
    val kafkaSink = KafkaSink.builder[String]
      .setBootstrapServers("master:9092")
      .setKafkaProducerConfig(properties)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()
        .setTopicSelector(new TopicSelector[String] {
          override def apply(t: String): String = {  //t与f都是data数据
          // println(t)
            if (t.contains("order_master_id")) "fact_order_master"
            else if (t.contains("order_detail_id")) "fact_order_detail"
            else null
          }
        })
        .setValueSerializationSchema(new SimpleStringSchema)
        .build()).build()
    val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
      .filter(line => line.contains("order_master") || line.contains("order_detail"))
      .map(line => {
        import com.google.gson.JsonParser
        val jsonobj = new JsonParser().parse(line).getAsJsonObject
       // jsonobj.getAsJsonObject("data").toString
        jsonobj.get("data").toString

      }) //取出包含order_master_id、order_detail_id每行data数据,即f有这两个表数据
      f.sinkTo(kafkaSink) //发至不同主题
      env.execute("Task1")
  }
}

三、重难点分析

1.主题是以行为内容的json格式,如下

例:{

     "username":"YWRtaW4=","password":"bGlhblNoaTIwMjA="

}

以下代码取出json指定字段的值

val jsonStr = "{\"username\":\"YWRtaW4=\",\"password\":\"bGlhblNoaTIwMjA=\"}";
 val jsonobj = new JsonParser().parse(jsonStr).getAsJsonObject
println(jsonobj)
val a=jsonobj.get("password") //取字段的值
println(a)

2. 取出每行data值代码

(2)val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
  .filter(line => line.contains("order_detail") || line.contains("order_master")) 只取这两种行
   .map(x => {
   import com.google.gson.JsonParser
   val jsonobj = new JsonParser().parse(x).getAsJsonObject
   jsonobj.get("data")  //取每行中data值
 })

3.熟练使用kafka以下命令

kafka-topics.sh --list --bootstrap-server  master:9092   列主题

kafka-topics.sh --delete --bootstrap-server master:9092  --topic test  删除主题

kafka-topics .sh -- create  --bootstrap-server  master:9092   --topic topic

kafka-console-producer.sh --broker-list master:9092 --topic testc  生产消息

kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/514532
推荐阅读
相关标签
  

闽ICP备14008679号