当前位置:   article > 正文

Kafka与Spark:整合练习题_spark kafka练习

spark kafka练习

目录

前言:

一:题目:

 二:原始数据

答案如下:


前言:

  1. 需求: 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
  2. 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
  3. 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

使用Java对数据预处理,把有效数据落入到Kafka,使用SparkStreaming实时消费Kafka并计算,把计算结果实时落入到Mysql。

一:题目:

1、以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论

数据说明:

rng_comment.txt文件中的数据

字段

字段含义

index

数据id

child_comment

回复数量

comment_time

评论时间

content

评论内容

da_v

微博个人认证

like_status

pic

图片评论url

user_id

微博用户id

user_name

微博用户名

vip_rank

微博会员等级

stamp

时间戳

1.1、在kafak中创建rng_comment主题,设置2个分区2个副本

1.2、数据预处理,把空行过滤掉

1.3、请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区

1.4、使用Spark Streaming对接kafka

1.5、使用Spark Streaming对接kafka之后进行计算

在mysql中创建一个数据库rng_comment

在数据库rng_comment创建vip_rank表,字段为数据的所有字段

在数据库rng_comment创建like_status表,字段为数据的所有字段

在数据库rng_comment创建count_conmment表,字段为 时间,条数

       1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

       1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

       1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

 二:原始数据

点击【免费下载数据

答案如下:

1、在kafak中创建rng_comment主题,设置2个分区2个副本

  • 启动Zookeeper集群【启动zk
  • 启动Kafka集群【启动kafka
  • 创建 rng_comment Topic  设置2个分区2个副本
  • bin/kafka-topics.sh --create --zookeeper 主机名01:2181,主机名02:2181,主机名03:2181 --replication-factor 2 --partitions 2 --topic rng_comment
    
  • 查看 Topic
  • bin/kafka-topics.sh  --list --zookeeper 主机名01:2181,主机名02:2181,主机名03:2181
    

     

 2、数据预处理,把空行过滤掉并按照ID进行分区奇数1号分区,偶数0号分区落入到Kafka集群

  1. public static void main(String[] args) throws Exception {
  2. //1、配置kafka集群
  3. Properties props = new Properties();
  4. //kafka服务器地址
  5. props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
  6. //消息确认机制
  7. props.put("acks", "all");
  8. //重试机制
  9. props.put("retries", 1);
  10. //批量发送的大小
  11. props.put("batch.size", 16384);
  12. //消息延迟
  13. props.put("linger.ms", 1);
  14. //批量的缓冲区大小
  15. props.put("buffer.memory", 33554432);
  16. //kafka数据中key value的序列化
  17. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  18. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  19. //设置自定义分区
  20. props.put("partitioner.class", "day25.zuoye.MyPartition");
  21. //创建Kafka生产者对象
  22. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  23. //使用 java缓冲流读取文本文件数据
  24. BufferedReader bufferedReader = new BufferedReader(new FileReader(new File("D:\\第四学期\\网课-上课资料\\05-Spark\\Spark题库\\4.15号练习题\\rng_comment.txt")));
  25. //循环读取每一行数据
  26. String line = "";
  27. while ((line = bufferedReader.readLine()) != null) {
  28. //判断当前行数据是否为空 不为空则落入到Kafka集群
  29. if (!line.trim().equals("")) {
  30. //发送数据到 rng_comment
  31. producer.send(new ProducerRecord("rng_comment", line));
  32. //打印查看数据
  33. System.out.println(line);
  34. //睡眠等待一秒钟
  35. Thread.sleep(1000);
  36. }
  37. }
  38. //释放资源
  39. producer.close();
  40. bufferedReader.close();
  41. }

自定义分区规则

  1. public class MyPartition implements Partitioner {
  2. @Override
  3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4. //获取到落入Kafka的一行数据 按照tab 切割 获取到id 用ID 与2取余
  5. String id = value.toString().split("\t")[0];
  6. if (Integer.parseInt(id) % 2 == 0) {
  7. //偶数 放到 0号分区
  8. return 0;
  9. } else {
  10. //奇数 放到 1号分区
  11. return 1;
  12. }
  13. }
  14. @Override
  15. public void close() {
  16. }
  17. @Override
  18. public void configure(Map<String, ?> configs) {
  19. }
  20. }

 

3、创建三张数据库表 vip_rank 、like_status  (前两张表字段一样)与count_conmment

  1. create table vip_rank
  2. (
  3. indexx varchar(255) not null
  4. primary key,
  5. child_comment varchar(255) not null,
  6. comment_time varchar(255) not null,
  7. content varchar(255) not null,
  8. da_v varchar(255) not null,
  9. like_status varchar(255) not null,
  10. pic varchar(255) not null,
  11. user_id varchar(255) not null,
  12. user_name varchar(255) not null,
  13. vip_rank varchar(255) not null,
  14. stamp varchar(255) not null
  15. );
  1. create table like_status
  2. (
  3. indexx varchar(255) not null
  4. primary key,
  5. child_comment varchar(255) not null,
  6. comment_time varchar(255) not null,
  7. content varchar(255) not null,
  8. da_v varchar(255) not null,
  9. like_status varchar(255) not null,
  10. pic varchar(255) not null,
  11. user_id varchar(255) not null,
  12. user_name varchar(255) not null,
  13. vip_rank varchar(255) not null,
  14. stamp varchar(255) not null
  15. );
  1. create table count_conmment
  2. (
  3. id varchar(100) not null,
  4. comment_time varchar(255) not null
  5. primary key,
  6. comment_time_count bigint(100) null
  7. );

 

4、使用SparkStreaming消费Kafka数据,计算数据并实时落入Mysql数据库

  1. import java.sql.{Connection, DriverManager, PreparedStatement}
  2. import org.apache.kafka.clients.consumer.ConsumerRecord
  3. import org.apache.kafka.common.serialization.StringDeserializer
  4. import org.apache.spark.SparkContext
  5. import org.apache.spark.rdd.RDD
  6. import org.apache.spark.sql.SparkSession
  7. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  8. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  9. import org.apache.spark.streaming.{Seconds, StreamingContext}
  10. def main(args: Array[String]): Unit = {
  11. //创建 Sparksql 对象
  12. val spark: SparkSession = SparkSession.builder().master("local[*]").appName("ConsumerSpakStreaming").getOrCreate()
  13. // 使用SparkSQL 创建 Spark 上下文对象
  14. val sc: SparkContext = spark.sparkContext
  15. //设置日志级别为警告
  16. sc.setLogLevel("WARN")
  17. // 创建 SparkStreaming 上下文对象 并指定
  18. val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
  19. //准备连接Kafka的参数
  20. val kafkaParams = Map[String, Object](
  21. "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
  22. "key.deserializer" -> classOf[StringDeserializer],
  23. "value.deserializer" -> classOf[StringDeserializer],
  24. "group.id" -> "SparkKafkaDemo",
  25. //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  26. //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  27. //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  28. //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
  29. "auto.offset.reset" -> "latest",
  30. //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
  31. "enable.auto.commit" -> (false: java.lang.Boolean))
  32. // 使用 kafka 0.10 消费 指定topic 的数据
  33. val kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]] =
  34. KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
  35. ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))
  36. //1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
  37. fiveDatas(kafkaConsumerSumDatas)
  38. // 1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
  39. like_statusCount(kafkaConsumerSumDatas)
  40. // 1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中
  41. count_conmment(kafkaConsumerSumDatas, sc)
  42. //开启 实时任务
  43. ssc.start()
  44. //等待关闭
  45. ssc.awaitTermination()
  46. }
  47. //1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
  48. def fiveDatas(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]]) = {
  49. val fiveDatas: DStream[ConsumerRecord[String, String]] = kafkaConsumerSumDatas.filter(x => {
  50. val line: String = x.value()
  51. if (line.split("\t")(9) == "5") true else false
  52. })
  53. //把等于是5的数据落入到Mysql数据库
  54. fiveDatas.foreachRDD(x => x.foreach(y => {
  55. saveDataToMysql(y.value(), "vip_rank")
  56. }))
  57. }
  58. // 1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
  59. def like_statusCount(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]]) = {
  60. val like_statusCount: DStream[ConsumerRecord[String, String]] = kafkaConsumerSumDatas.filter(x => {
  61. val like_status: String = x.value().split("\t")(5)
  62. if ((like_status).toInt > 10) true else false
  63. })
  64. like_statusCount.foreachRDD(x => x.foreach(y => {
  65. saveDataToMysql(y.value(), "like_status")
  66. }))
  67. }
  68. // 1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中
  69. def count_conmment(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]], sc: SparkContext) = {
  70. var id = ""
  71. val cumsumerData: DStream[Array[String]] = kafkaConsumerSumDatas.map(_.value().split("\t"))
  72. val legitimateData: DStream[Array[String]] = cumsumerData.filter(i => {
  73. //获取到时间 格式:2018/10/20 21:30
  74. val time: String = i(2)
  75. //过滤符合条件的数据
  76. if (time.startsWith("2018/10/20") || time.startsWith("2018/10/21") || time.startsWith("2018/10/22") || time.startsWith("2018/10/23")) {
  77. id = i(0)
  78. true
  79. } else false
  80. })
  81. //循环所有有效数据转换成RDD
  82. legitimateData.foreachRDD(rdd => {
  83. //按照日期进行分区 得到日期 与这个日期出现的个数
  84. val time: RDD[(String, Iterable[Array[String]])] = rdd.groupBy(x => x(2).split(" ")(0))
  85. val value1: RDD[(String, Int)] = time.map(y => y._1 -> y._2.size)
  86. value1.foreachPartition(i => {
  87. val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
  88. val sql = "REPLACE into count_conmment values(?,?,?)"
  89. i.foreach(line => {
  90. val ps: PreparedStatement = connection.prepareStatement(sql)
  91. ps.setString(1, id)
  92. ps.setString(2, line._1)
  93. ps.setString(3, line._2 + "")
  94. ps.executeUpdate()
  95. ps.close()
  96. })
  97. })
  98. })
  99. }
  100. def saveDataToMysql(str: String, tableName: String) = {
  101. val datas = str.split("\t")
  102. //获得数据库连接
  103. val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
  104. //设置SQL
  105. var sql = s"insert into ${tableName} (indexx,child_comment,comment_time,content,da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"
  106. //设置value
  107. val ps = connection.prepareStatement(sql)
  108. //设置参数
  109. ps.setString(1, datas(0))
  110. ps.setString(2, datas(1))
  111. ps.setString(3, datas(2))
  112. ps.setString(4, datas(3))
  113. ps.setString(5, datas(4))
  114. ps.setString(6, datas(5))
  115. ps.setString(7, datas(6))
  116. ps.setString(8, datas(7))
  117. ps.setString(9, datas(8))
  118. ps.setString(10, datas(9))
  119. ps.setString(11, datas(10))
  120. //执行
  121. ps.executeUpdate()
  122. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/1017794
推荐阅读
相关标签
  

闽ICP备14008679号