赞
踩
目录
需求: 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中使用Java对数据预处理,把有效数据落入到Kafka,使用SparkStreaming实时消费Kafka并计算,把计算结果实时落入到Mysql。
1、以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论
数据说明:
rng_comment.txt文件中的数据
字段 | 字段含义 |
index | 数据id |
child_comment | 回复数量 |
comment_time | 评论时间 |
content | 评论内容 |
da_v | 微博个人认证 |
like_status | 赞 |
pic | 图片评论url |
user_id | 微博用户id |
user_name | 微博用户名 |
vip_rank | 微博会员等级 |
stamp | 时间戳 |
1.1、在kafak中创建rng_comment主题,设置2个分区2个副本
1.2、数据预处理,把空行过滤掉
1.3、请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区
1.4、使用Spark Streaming对接kafka
1.5、使用Spark Streaming对接kafka之后进行计算
在mysql中创建一个数据库rng_comment
在数据库rng_comment创建vip_rank表,字段为数据的所有字段
在数据库rng_comment创建like_status表,字段为数据的所有字段
在数据库rng_comment创建count_conmment表,字段为 时间,条数
1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中
点击【免费下载数据】
1、在kafak中创建rng_comment主题,设置2个分区2个副本
2、数据预处理,把空行过滤掉并按照ID进行分区奇数1号分区,偶数0号分区落入到Kafka集群
- public static void main(String[] args) throws Exception {
- //1、配置kafka集群
- Properties props = new Properties();
- //kafka服务器地址
- props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
- //消息确认机制
- props.put("acks", "all");
- //重试机制
- props.put("retries", 1);
- //批量发送的大小
- props.put("batch.size", 16384);
- //消息延迟
- props.put("linger.ms", 1);
- //批量的缓冲区大小
- props.put("buffer.memory", 33554432);
- //kafka数据中key value的序列化
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //设置自定义分区
- props.put("partitioner.class", "day25.zuoye.MyPartition");
-
- //创建Kafka生产者对象
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
- //使用 java缓冲流读取文本文件数据
- BufferedReader bufferedReader = new BufferedReader(new FileReader(new File("D:\\第四学期\\网课-上课资料\\05-Spark\\Spark题库\\4.15号练习题\\rng_comment.txt")));
-
- //循环读取每一行数据
- String line = "";
- while ((line = bufferedReader.readLine()) != null) {
- //判断当前行数据是否为空 不为空则落入到Kafka集群
- if (!line.trim().equals("")) {
- //发送数据到 rng_comment
- producer.send(new ProducerRecord("rng_comment", line));
- //打印查看数据
- System.out.println(line);
- //睡眠等待一秒钟
- Thread.sleep(1000);
- }
- }
-
- //释放资源
- producer.close();
- bufferedReader.close();
- }
自定义分区规则
- public class MyPartition implements Partitioner {
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- //获取到落入Kafka的一行数据 按照tab 切割 获取到id 用ID 与2取余
- String id = value.toString().split("\t")[0];
- if (Integer.parseInt(id) % 2 == 0) {
- //偶数 放到 0号分区
- return 0;
- } else {
- //奇数 放到 1号分区
- return 1;
- }
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
- }
3、创建三张数据库表 vip_rank 、like_status (前两张表字段一样)与count_conmment
- create table vip_rank
- (
- indexx varchar(255) not null
- primary key,
- child_comment varchar(255) not null,
- comment_time varchar(255) not null,
- content varchar(255) not null,
- da_v varchar(255) not null,
- like_status varchar(255) not null,
- pic varchar(255) not null,
- user_id varchar(255) not null,
- user_name varchar(255) not null,
- vip_rank varchar(255) not null,
- stamp varchar(255) not null
- );
- create table like_status
- (
- indexx varchar(255) not null
- primary key,
- child_comment varchar(255) not null,
- comment_time varchar(255) not null,
- content varchar(255) not null,
- da_v varchar(255) not null,
- like_status varchar(255) not null,
- pic varchar(255) not null,
- user_id varchar(255) not null,
- user_name varchar(255) not null,
- vip_rank varchar(255) not null,
- stamp varchar(255) not null
- );
- create table count_conmment
- (
- id varchar(100) not null,
- comment_time varchar(255) not null
- primary key,
- comment_time_count bigint(100) null
- );
4、使用SparkStreaming消费Kafka数据,计算数据并实时落入Mysql数据库
- import java.sql.{Connection, DriverManager, PreparedStatement}
-
- import org.apache.kafka.clients.consumer.ConsumerRecord
- import org.apache.kafka.common.serialization.StringDeserializer
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.streaming.dstream.{DStream, InputDStream}
- import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- def main(args: Array[String]): Unit = {
-
- //创建 Sparksql 对象
- val spark: SparkSession = SparkSession.builder().master("local[*]").appName("ConsumerSpakStreaming").getOrCreate()
- // 使用SparkSQL 创建 Spark 上下文对象
- val sc: SparkContext = spark.sparkContext
- //设置日志级别为警告
- sc.setLogLevel("WARN")
- // 创建 SparkStreaming 上下文对象 并指定
- val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
- //准备连接Kafka的参数
- val kafkaParams = Map[String, Object](
- "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
- "key.deserializer" -> classOf[StringDeserializer],
- "value.deserializer" -> classOf[StringDeserializer],
- "group.id" -> "SparkKafkaDemo",
- //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
- "auto.offset.reset" -> "latest",
- //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
- "enable.auto.commit" -> (false: java.lang.Boolean))
-
- // 使用 kafka 0.10 消费 指定topic 的数据
- val kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]] =
- KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
- ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))
- //1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
- fiveDatas(kafkaConsumerSumDatas)
-
- // 1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
- like_statusCount(kafkaConsumerSumDatas)
-
- // 1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中
- count_conmment(kafkaConsumerSumDatas, sc)
- //开启 实时任务
- ssc.start()
- //等待关闭
- ssc.awaitTermination()
- }
-
- //1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中
- def fiveDatas(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]]) = {
- val fiveDatas: DStream[ConsumerRecord[String, String]] = kafkaConsumerSumDatas.filter(x => {
- val line: String = x.value()
- if (line.split("\t")(9) == "5") true else false
- })
-
- //把等于是5的数据落入到Mysql数据库
- fiveDatas.foreachRDD(x => x.foreach(y => {
- saveDataToMysql(y.value(), "vip_rank")
- }))
- }
-
- // 1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中
- def like_statusCount(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]]) = {
- val like_statusCount: DStream[ConsumerRecord[String, String]] = kafkaConsumerSumDatas.filter(x => {
- val like_status: String = x.value().split("\t")(5)
- if ((like_status).toInt > 10) true else false
- })
- like_statusCount.foreachRDD(x => x.foreach(y => {
- saveDataToMysql(y.value(), "like_status")
- }))
- }
-
- // 1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中
- def count_conmment(kafkaConsumerSumDatas: InputDStream[ConsumerRecord[String, String]], sc: SparkContext) = {
- var id = ""
- val cumsumerData: DStream[Array[String]] = kafkaConsumerSumDatas.map(_.value().split("\t"))
- val legitimateData: DStream[Array[String]] = cumsumerData.filter(i => {
- //获取到时间 格式:2018/10/20 21:30
- val time: String = i(2)
- //过滤符合条件的数据
- if (time.startsWith("2018/10/20") || time.startsWith("2018/10/21") || time.startsWith("2018/10/22") || time.startsWith("2018/10/23")) {
- id = i(0)
- true
- } else false
- })
- //循环所有有效数据转换成RDD
- legitimateData.foreachRDD(rdd => {
- //按照日期进行分区 得到日期 与这个日期出现的个数
- val time: RDD[(String, Iterable[Array[String]])] = rdd.groupBy(x => x(2).split(" ")(0))
- val value1: RDD[(String, Int)] = time.map(y => y._1 -> y._2.size)
- value1.foreachPartition(i => {
- val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
- val sql = "REPLACE into count_conmment values(?,?,?)"
- i.foreach(line => {
- val ps: PreparedStatement = connection.prepareStatement(sql)
- ps.setString(1, id)
- ps.setString(2, line._1)
- ps.setString(3, line._2 + "")
- ps.executeUpdate()
- ps.close()
- })
- })
- })
- }
-
- def saveDataToMysql(str: String, tableName: String) = {
- val datas = str.split("\t")
- //获得数据库连接
- val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
-
- //设置SQL
- var sql = s"insert into ${tableName} (indexx,child_comment,comment_time,content,da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"
- //设置value
- val ps = connection.prepareStatement(sql)
- //设置参数
- ps.setString(1, datas(0))
- ps.setString(2, datas(1))
- ps.setString(3, datas(2))
- ps.setString(4, datas(3))
- ps.setString(5, datas(4))
- ps.setString(6, datas(5))
- ps.setString(7, datas(6))
- ps.setString(8, datas(7))
- ps.setString(9, datas(8))
- ps.setString(10, datas(9))
- ps.setString(11, datas(10))
- //执行
- ps.executeUpdate()
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。