Author: 陶运道 (QQ: 275931339)
Problem:
Environment notes:
The Flink job runs on YARN in per-job mode (i.e. one cluster per job rather than session mode), which makes it easier for YARN to reclaim resources.
Note: order-amount calculations use the order_money field; the same order must not be counted more than once, and refunded or cancelled orders need to be taken into account.
Write Scala code that uses Flink to consume the Kafka topics ods_mall_log and ods_mall_data and perform the required statistics (using processing time).
I. Reading the Problem
Components involved: Scala, Flink, Kafka, JSON
Knowledge points: processing data with Flink, handling JSON
The key to this task is understanding the data format of ods_mall_data; in this example it is JSON.
1. The data is organized line by line; each line is one record carrying a table name and a data array, for example (a small parsing sketch follows the samples):
{"table":"ods_mall_log", "data":[{"wp_web_sk":"60","wp_web_page_id":"AAAAAAA","start_date":"2001-09-03"}]}
{"table":"order_master", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAAK","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAA","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAA","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtAAAA","id":"1"}]}
{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]}
{"table":"order_master", "data":[{"order_master_id":"60","name":"mtAAA","id ":"1"}]}
{"table":"order_master", "data":[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]}
2. Since the topic contains many other lines, read only the records of the required tables into f:
val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
  .filter(line => line.contains("order_master_id") || line.contains("order_detail_id"))
  .map(line => {
    import com.google.gson.JsonParser
    val jsonobj = new JsonParser().parse(line).getAsJsonObject
    // jsonobj.getAsJsonObject("data").toString
    jsonobj.get("data").toString
  })
f contains only those lines whose data includes the string order_master_id or order_detail_id.
Assuming the input is the sample data above,
after the program runs it becomes:
{"order_detail_id":"60","name":"dtAAAA","id":"1"}]
[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]
[{"order_master_id":"60","name":"mtAAA","id ":"1"}]
[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]
Note that only the data value is kept. A point that is easy to miss: after this map the table field is gone, so the table name can no longer be used as a selection condition downstream; routing can only be based on fields inside data (order_master_id / order_detail_id).
3. Use a Kafka sink to write the data in f to the target topics.
The key here is the relationship between f and t: each data string in the stream f obtained above is passed into the topic selector below as t.
val kafkaSink = KafkaSink.builder[String]
  .setBootstrapServers("master:9092")
  .setKafkaProducerConfig(properties)
  .setRecordSerializer(KafkaRecordSerializationSchema.builder[String]
    .setTopicSelector(new TopicSelector[String] {
      override def apply(t: String): String = { // t is one data string from f
        if (t.contains("order_master_id")) "fact_order_master"
        else if (t.contains("order_detail_id")) "fact_order_detail"
        else null // unreachable after the filter above
      }
    })
    .setValueSerializationSchema(new SimpleStringSchema)
    .build())
  .build()
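The sink is attached to the stream from step 2 with a single call; each data string in f is then handed to the topic selector above as t:
f.sinkTo(kafkaSink)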
II. Implementation
Notes: 1. For readability, the sample data is placed in the topic test rather than ods_mall_data.
2. The topics fact_order_detail and fact_order_master do not need to be created in advance.
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Properties

object scala_1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // stream execution environment
    import org.apache.flink.streaming.api.TimeCharacteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // use processing time
    env.setParallelism(1)         // parallelism
    env.enableCheckpointing(5000) // enable checkpointing every 5 s

    // Kafka source
    val kafkaSource = KafkaSource.builder[String]
      .setBootstrapServers("master:9092")
      .setTopics("test") // the sample data lives in the topic test
      // .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build()

    // Kafka sink
    val properties = new Properties()
    properties.setProperty("transaction.timeout.ms", "7200000") // 2 hours
    // KafkaSink writes a stream of records to one or more Kafka topics.
    val kafkaSink = KafkaSink.builder[String]
      .setBootstrapServers("master:9092")
      .setKafkaProducerConfig(properties)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String]
        .setTopicSelector(new TopicSelector[String] {
          override def apply(t: String): String = { // t is one data string from f
            if (t.contains("order_master_id")) "fact_order_master"
            else if (t.contains("order_detail_id")) "fact_order_detail"
            else null // unreachable after the filter below
          }
        })
        .setValueSerializationSchema(new SimpleStringSchema)
        .build())
      .build()

    val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
      .filter(line => line.contains("order_master_id") || line.contains("order_detail_id"))
      .map(line => {
        import com.google.gson.JsonParser
        val jsonobj = new JsonParser().parse(line).getAsJsonObject
        // jsonobj.getAsJsonObject("data").toString
        jsonobj.get("data").toString
      }) // keep only lines containing order_master_id or order_detail_id and extract their data value

    f.sinkTo(kafkaSink) // route each record to fact_order_master or fact_order_detail
    env.execute("Task1")
  }
}
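To match the per-job deployment described in the environment notes, the packaged job could be submitted roughly like this (a hedged sketch; the jar name and path are assumptions and the exact flags depend on the Flink version):
flink run -t yarn-per-job -c scala_1 /path/to/task1.jar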
III. Key Points and Difficulties
1. Each line in the topic is a JSON string, for example:
{
  "username":"YWRtaW4=","password":"bGlhblNoaTIwMjA="
}
The following code extracts the value of a specified field from such a JSON string:
val jsonStr = "{\"username\":\"YWRtaW4=\",\"password\":\"bGlhblNoaTIwMjA=\"}";
val jsonobj = new JsonParser().parse(jsonStr).getAsJsonObject
println(jsonobj)
val a = jsonobj.get("password") // get the value of a field (returns a JsonElement)
println(a)
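Note that get returns a JsonElement, so println(a) prints the value with its surrounding quotes; calling getAsString instead yields the raw string:
println(jsonobj.get("password").getAsString) // bGlhblNoaTIwMjA=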
2. Code that extracts the data value of each line:
val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
  .filter(line => line.contains("order_detail") || line.contains("order_master")) // keep only these two kinds of lines
  .map(x => {
    import com.google.gson.JsonParser
    val jsonobj = new JsonParser().parse(x).getAsJsonObject
    jsonobj.get("data") // the data value of this line
  })
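Here get("data") returns a Gson JsonElement (a JsonArray); in the full program above it is converted with .toString so that the sink can serialize it with SimpleStringSchema.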
3. Be comfortable with the following Kafka commands:
kafka-topics.sh --list --bootstrap-server master:9092                                      # list topics
kafka-topics.sh --delete --bootstrap-server master:9092 --topic test                       # delete a topic
kafka-topics.sh --create --bootstrap-server master:9092 --topic test                       # create a topic
kafka-console-producer.sh --broker-list master:9092 --topic test                           # produce messages
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test     # consume messages from the beginning
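After the job has run, the routing can be verified by consuming the two output topics in the same way:
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic fact_order_master
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic fact_order_detail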