Create a Maven project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aura.spark</groupId>
<artifactId>1711spark</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.0</spark.version>
<hadoop.version>2.7.5</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Create the Scala code NetWordCount.scala (note that it is an object, not a class).
Be sure to use at least 2 in local[2]. With only one thread nothing will happen: the Receiver runs as a long-lived task and occupies one thread, so a single thread is spent entirely on receiving data and no thread is left to process it. Locally you therefore need at least 2.
Also note that this is micro-batch processing rather than true record-at-a-time real-time computation: with Seconds(2), each batch of data is processed every two seconds.
- import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Create NetWordCount.scala by jenrey on 2018/5/11 21:58
- */
- object NetWordCount {
- def main(args: Array[String]): Unit = {
- /**
- * Initialize the entry point
- */
- val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc,Seconds(2))
-
- /*The StreamingContext constructor can create a new SparkContext internally, so creating the SparkContext explicitly can be skipped:
- val ssc = new StreamingContext(conf,Seconds(2))*/
-
- /**
- * Obtain a DStream from the entry point.
- * We get data by listening on a hostname and port: as soon as data arrives it is received, essentially over a socket.
- */
- //ReceiverInputDStream is a DStream: it extends InputDStream which extends DStream (an abstract class that is conceptually a HashMap(Time, RDD[T]), one RDD per point in time)
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
-
- /**
- * 对DStream流进行操作
- */
- //The operations below are all transformations
- val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(line => line.split(","))
- .map((_, 1))
- .reduceByKey(_ + _)
- //Output operations are the DStream counterpart of RDD actions
- wordCountDStream.print() //print the results
- /**
- * Start the application (boilerplate)
- */
- //start the program
- ssc.start()
- //wait for termination
- ssc.awaitTermination()
- //release resources once terminated
- ssc.stop()
- }
- }
On the hadoop04 node, run the following command:
[hadoop@hadoop04 ~]$ nc -lk 9999
================================================
(To install nc:)
[hadoop@hadoop04 ~]$ sudo yum install nc
Type the nc command to check whether the installation succeeded.
==================================================
Start the code first.
Then, in the nc session on hadoop04, type hadoop,hadoop
The IDEA console then prints the word counts for each batch (e.g. (hadoop,2)).
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Run the WordCount program with Spark Streaming against HDFS in HA mode
- */
- object HDFSWordCount {
-
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc,Seconds(2))
- /**
- * Data input
- */
- //What is monitored is a directory; newly added files are picked up
- val fileDStream: DStream[String] = ssc.textFileStream("hdfs://myha01/streaming")
- /**
- * Data processing
- */
- val wordCountDStream: DStream[(String, Int)] = fileDStream.flatMap(_.split(","))
- .map((_, 1))
- .reduceByKey(_ + _)
-
- /**
- * Data output
- */
- wordCountDStream.print()
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
-
- }
1) Create a new directory on HDFS to monitor; as soon as a file is placed into the monitored directory, its data is picked up.
Note: if you are using HA mode, be sure to put the two configuration files (core-site.xml and hdfs-site.xml) under resources.
Create an empty directory named streaming on HDFS for testing the code:
[hadoop@hadoop04 ~]$ hadoop fs -mkdir /streaming
Run the code in IDEA!
Then create hello.txt with vim and write:
you,jump
i,jump
Then upload it to the streaming directory:
[hadoop@hadoop04 ~]$ hadoop fs -put hello.txt /streaming
- import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Create by jenrey on 2018/5/13 15:09
- */
- object UpdateStateByKeyWordCount {
- def main(args: Array[String]): Unit = {
- /**
- * Initialize the entry point
- */
- val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc, Seconds(2))
- //A checkpoint directory must be set, otherwise the program throws an error. You need write permission on this HDFS path; the directory does not have to exist beforehand, it is created automatically
- ssc.checkpoint("hdfs://myha01/StreamingCheckPoint")
- /**
- * Data input
- */
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
- /**
- * Data processing
- */
- val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
- .map((_, 1))
- //updateStateByKey(updateFunc: (Seq[V], Option[S]) => Option[S]) -- note that the argument is a function
- //Option: Some means there is a value, None means there is not
- //ByKey: the operation groups by key
- //you,1
- //you,1 => you,{1,1} and jump,{1}
- //jump,1
- //the function below is called once per key
- //so values: Seq[Int] is the list {1,1}, and state: Option[Int] is how many times this word appeared before: None if it never appeared, otherwise Some of that previous count
- .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
- val current: Int = values.sum
- //the previous count if there is one, otherwise 0
- val lastCount: Int = state.getOrElse(0)
- //since this function was called for the word, it appeared at least once in this batch, so current + previous is its total count so far
- Some(current + lastCount)
- })
-
- /**
- * Data output
- */
- wordCountDStream.print()
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
- }
Run the code.
Two common sources of errors:
1. Not setting the checkpoint directory causes an error.
2. Missing HDFS permissions also causes an error.
You need to grant permissions:
[hadoop@hadoop04 ~]$ hdfs dfs -chmod 777 /
Then run the code.
Send data from the hadoop04 node:
[hadoop@hadoop04 ~]$ nc -lk 9999
Why can't the plain version resume where it left off after being stopped and restarted? Because a new entry point (new StreamingContext) is created each time, so the two runs have completely different entry points. In other words it comes down to the Driver: the two Driver services are different.
Checkpointing saves the Driver's state into this directory, so the next time we start, the Driver can be restored from the checkpoint data to exactly what it was before, and subsequent computation then accumulates on top of the previous results.
- import org.apache.spark.streaming.dstream.ReceiverInputDStream
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object DriverHAWordCount {
- def main(args: Array[String]): Unit = {
- //Note: run this program once and feed it some data, then stop it and run it again; it will keep accumulating from where it left off
- val checkpointDirectory: String = "hdfs://myha01/StreamingCheckPoint3";
-
- def functionToCreateContext(): StreamingContext = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("DriverHAWordCount")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc, Seconds(2))
- ssc.checkpoint(checkpointDirectory)
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
- val wordCountDStream = dstream.flatMap(_.split(","))
- .map((_, 1))
- .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
- val currentCount = values.sum
- val lastCount = state.getOrElse(0)
- Some(currentCount + lastCount)
- })
-
- wordCountDStream.print()
-
- //do not call start()/awaitTermination() inside the factory, otherwise execution blocks here and never reaches getOrCreate() below; the last expression is the return value
- ssc
- }
-
- //Get an entry point: if checkpointDirectory already holds one, restore and reuse it; otherwise create a new entry point (i.e. a new Driver)
- val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
-
- }
-
- }
Run the program.
Stop the program, run it again, wait a moment and then input data; the counts continue from the previous run.
- import org.apache.spark.broadcast.Broadcast
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object WordBlack {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("WordBlack")
- val sc: SparkContext = new SparkContext(conf)
- val ssc = new StreamingContext(sc, Seconds(2))
- /**
- * Data input
- */
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
-
- /**
- * Simulate a blacklist ourselves (normally it would be read from MySQL, HBase or Redis)
- */
- //turn it directly into an RDD
- val wordBlackList: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("?", "!", "*"))
- .map(param => (param, true))
- /**
- * (?,true)
- * (!,true)
- * (*,true)
- */
- val blackList: Array[(String, Boolean)] = wordBlackList.collect()
- //broadcast it to the executors
- val blackListBroadcast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(blackList)
-
- /**
- * Data processing
- */
- val wordOneDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
- .map((_, 1))
- //transform exposes the underlying RDD of the DStream; the function must return a value, and that value must be an RDD
- val wordCountDStream: DStream[(String, Int)] = wordOneDStream.transform(rdd => {
- val filterRDD: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
- val resultRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD)
-
- /**
- * String, (Int, Option[Boolean])
- * String: the word
- * Int: 1
- * Option: the word may or may not have joined with the blacklist
- *
- * Idea: we want the records that did NOT join, i.e. those where Option[Boolean] = None
- * filter:
- * returning true means we keep the record
- */
- resultRDD.filter(tuple => {
- tuple._2._2.isEmpty
- }).map(_._1)
- }).map((_, 1)).reduceByKey(_ + _)
-
- /**
- * Data output
- */
- wordCountDStream.print()
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
- }
Run the code first, then send data from hadoop04.
- import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Requirement: every 4 seconds, compute the word counts over the last 6 seconds
- * reduceByKeyAndWindow
- */
- object WindowOperatorTest {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("WindowOperatorTest")
- val sc: SparkContext = new SparkContext(conf)
- val ssc = new StreamingContext(sc, Seconds(2))
- /**
- * Data input
- * So far this part has not been wired up to a production source.
- */
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
-
- /**
- * Data processing
- */
- val resultWordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
- .map((_, 1))
-
- /**
- * reduceFunc: (V, V) => V, an anonymous function that accumulates the word counts
- * windowDuration: Duration, how many seconds of data to aggregate -- the window size
- * slideDuration: Duration, how often to slide -- the slide interval
- * //numPartitions: Int, optional number of partitions (otherwise derived from the number of cores)
- * Note: both durations must be multiples of the batch interval Seconds(2)
- */
- .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(6), Seconds(4))
-
-
- /**
- * Data output
- */
- resultWordCountDStream.print()
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
- }
Note that the code directly inside foreachRDD runs on the Driver.
1. The following version will fail:
- dstream.foreachRDD { rdd =>
- val connection = createNewConnection() // executed at the driver
- rdd.foreach { record =>
- connection.send(record) // executed at the worker
- }
- }
The connection object would have to be shipped to the executors, which requires network transfer and therefore serialization. But the connection object is not serializable, so it cannot be sent and the job fails with a serialization error.
2. The following version does not fail, but it creates a connection for every single record; creating and tearing down database connections this frequently puts heavy load on the database:
- dstream.foreachRDD { rdd =>
- rdd.foreach { record =>
- val connection = createNewConnection()
- connection.send(record)
- connection.close()
- }
- }
3. This version does not fail either: it takes one partition of data at a time, creates a single connection for it, and then sends that partition's records to the database one by one:
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- connection.close()
- }
- }
Tuning: we can use a connection pool plus batching, committing to the database, say, every one hundred records.
4. No error, using a connection pool:
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- ConnectionPool.returnConnection(connection) // return to the pool for future reuse
- }
- }
Note: the connection is taken from the pool and returned to it afterwards, which removes the creation cost, but records are still submitted one at a time. To batch them, we also have to think about transactions; a sketch of the combined approach follows below.
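To combine the pool with batching, one possible sketch (an illustration only, not the exact code used later) looks like this. It assumes a DStream of (word, count) pairs named wordCountDStream like the one in the OutputTest program below, the ConnectionPool helper defined below, and a results table such as aura.1711wordcount(ts, word, count); the batch size of 100 is an arbitrary choice.
- wordCountDStream.foreachRDD(rdd => {
- rdd.foreachPartition(partition => {
- //one pooled connection per partition, with manual commits
- val connection = ConnectionPool.getConnection()
- connection.setAutoCommit(false)
- val statement = connection.prepareStatement("insert into aura.1711wordcount values(now(), ?, ?)")
- var pending = 0
- partition.foreach { case (word, count) =>
- statement.setString(1, word)
- statement.setInt(2, count)
- statement.addBatch()
- pending += 1
- //commit roughly every 100 records
- if (pending % 100 == 0) {
- statement.executeBatch()
- connection.commit()
- }
- }
- //flush whatever is left in the last batch
- statement.executeBatch()
- connection.commit()
- statement.close()
- connection.setAutoCommit(true)
- ConnectionPool.returnConnection(connection)
- })
- })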
Below is the hands-on walkthrough.
First create a results table in MySQL (with a timestamp, a word column and a count column, matching the insert statement used in the code below).
Then write a connection-pool Scala file:
- import java.sql.{Connection, DriverManager}
-
- object ConnectionPool {
- private val max=8 //total number of connections the pool is intended to hold
- private val connectionNum=10 //number of connections created whenever the pool runs dry
- private var conNum=0 //number of connections the pool has created so far
-
- import java.util
- private val pool=new util.LinkedList[Connection]() //the pool itself
-
- {
- Class.forName("com.mysql.jdbc.Driver")
- }
- /**
- * Return a connection to the pool
- */
- def returnConnection(conn:Connection):Unit={
- pool.push(conn)
- }
- /**
- * Get a connection from the pool
- */
- def getConnection():Connection={
- //synchronized block
- this.synchronized({
- if(pool.isEmpty()){
- for( i <- 1 to connectionNum){
- val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/aura","root","root")
- pool.push(conn)
- conNum += 1
- }
- }
- pool.poll()
- })
-
- }
- }
Now the main program:
- /**
- * Create by jenrey on 2018/5/13 20:27
- */
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
- /**
- * Receive data from nc and write the results into a MySQL table
- */
- object OutputTest {
-
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("OutputTest")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc,Seconds(2))
- ssc.checkpoint("hdfs://myha01/StreamingCheckPoint3")
- /**
- * Data input
- */
- val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04",9999)
-
-
- val wordCountDStream = dstream.flatMap(_.split(","))
- .map((_, 1))
- .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
- val currentCount = values.sum
- val lastCount = state.getOrElse(0)
- Some(currentCount + lastCount)
- })
-
- /**
- * Data output
- */
- wordCountDStream.foreachRDD( rdd=>{
- rdd.foreachPartition( partition =>{
- //get a connection from the pool
- val connection = ConnectionPool.getConnection()
- //get a Statement object (used to send SQL commands)
- val statement = connection.createStatement()
- partition.foreach{
- case (word,count) =>{
- val sql=s"insert into aura.1711wordcount values(now(),'$word',$count)"
- print(sql)
- //send the SQL command through the Statement
- statement.execute(sql)
- }
- }
- //return the connection to the pool
- ConnectionPool.returnConnection(connection)
- } )
- })
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
-
- }
-
Run the program first, then send data through nc.
The drawback is that every batch inserts a fresh row for each word, so the table accumulates one record per word per batch.
If you only want the latest value, store the results in HBase instead: with the word as the rowkey, HBase automatically overwrites the previous value. A sketch of that idea follows.
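A minimal sketch of that HBase idea, for illustration only: it assumes an hbase-client dependency that is not in the pom.xml above, a pre-created HBase table named wordcount with a column family cf, an hbase-site.xml on the classpath, and the (word, count) wordCountDStream from the program above.
- import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-
- wordCountDStream.foreachRDD(rdd => {
- rdd.foreachPartition(partition => {
- //one HBase connection per partition
- val hbaseConn = ConnectionFactory.createConnection(HBaseConfiguration.create())
- val table = hbaseConn.getTable(TableName.valueOf("wordcount"))
- partition.foreach { case (word, count) =>
- //rowkey = word, so each batch simply overwrites the previous count for that word
- val put = new Put(Bytes.toBytes(word))
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count))
- table.put(put)
- }
- table.close()
- hbaseConn.close()
- })
- })
With the word as the rowkey, reading the table always gives the latest accumulated count instead of one row per batch.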
- import java.sql.DriverManager
-
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.{SparkConf, SparkContext}
-
- /**
- * WordCount example: Spark Streaming consumes real-time data sent by a TCP server.
- *
- * 1. Start a Netcat server on the master node:
- * `$ nc -lk 9998` (if the nc command is not available, install it with yum install -y nc)
- *
- * 2. Create the MySQL result table:
- * create table wordcount(ts bigint, word varchar(50), count int);
- *
- * 3. Launch spark-shell with the required jars:
- * spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
- *
- */
- object NetworkWordCountForeachRDD {
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD")
- val sc = new SparkContext(sparkConf)
-
- // Create the context with a 5 second batch size
- val ssc = new StreamingContext(sc, Seconds(5))
-
- //Create a receiver (ReceiverInputDStream) that receives and processes data sent over a socket from a port on a given machine
- val lines = ssc.socketTextStream("hadoop1", 9998, StorageLevel.MEMORY_AND_DISK_SER)
-
- //The processing logic is a simple word count
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-
- //Save the results to MySQL, variant (1)
- /**
- *
- * This code will fail with a serialization error!
- */
- wordCounts.foreachRDD { (rdd, time) =>
- Class.forName("com.mysql.jdbc.Driver")
- val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- rdd.foreach { record =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, record._1)
- statement.setInt(3, record._2)
- statement.execute()
- }
- statement.close()
- conn.close()
- }
- //Start the streaming computation
- ssc.start()
-
- ssc.stop(false)
-
-
- //Save the results to MySQL, variant (2): one connection per record
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreach { record =>
- Class.forName("com.mysql.jdbc.Driver")
- val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- statement.setLong(1, time.milliseconds)
- statement.setString(2, record._1)
- statement.setInt(3, record._2)
- statement.execute()
- statement.close()
- conn.close()
- }
- }
-
- //Save the results to MySQL, variant (3): one connection per partition
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreachPartition { partitionRecords =>
- Class.forName("com.mysql.jdbc.Driver")
- val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- partitionRecords.foreach { case (word, count) =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, word)
- statement.setInt(3, count)
- statement.execute()
- }
- statement.close()
- conn.close()
- }
- }
-
- //Save the results to MySQL, variant (4): use a connection pool
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreachPartition { partitionRecords =>
- val conn = ConnectionPool.getConnection
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- partitionRecords.foreach { case (word, count) =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, word)
- statement.setInt(3, count)
- statement.execute()
- }
- statement.close()
- ConnectionPool.returnConnection(conn)
- }
- }
-
- //Save the results to MySQL, variant (5): batched inserts
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreachPartition { partitionRecords =>
- val conn = ConnectionPool.getConnection
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- partitionRecords.foreach { case (word, count) =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, word)
- statement.setInt(3, count)
- statement.addBatch()
- }
- statement.executeBatch()
- statement.close()
- ConnectionPool.returnConnection(conn)
- }
- }
-
-
- //Save the results to MySQL, variant (6): batched inserts inside a transaction
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreachPartition { partitionRecords =>
- val conn = ConnectionPool.getConnection
- //turn off auto-commit
- conn.setAutoCommit(false)
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- partitionRecords.foreach { case (word, count) =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, word)
- statement.setInt(3, count)
- statement.addBatch()
- }
- statement.executeBatch()
- statement.close()
- conn.commit()
- conn.setAutoCommit(true)
- ConnectionPool.returnConnection(conn)
- }
- }
-
-
- //Save the results to MySQL, variant (7): cap the batch size, committing every 500 records
- wordCounts.foreachRDD { (rdd, time) =>
- rdd.foreachPartition { partitionRecords =>
- val conn = ConnectionPool.getConnection
- conn.setAutoCommit(false)
- val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
- partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
- statement.setLong(1, time.milliseconds)
- statement.setString(2, word)
- statement.setInt(3, count)
- statement.addBatch()
- if (index != 0 && index % 500 == 0) {
- statement.executeBatch()
- conn.commit()
- }
- }
- statement.executeBatch()
- statement.close()
- conn.commit()
- conn.setAutoCommit(true)
- ConnectionPool.returnConnection(conn)
- }
- }
-
- //Wait for the streaming application to terminate
- ssc.awaitTermination()
- }
- }
First make sure pom.xml contains the Kafka integration dependency (spark-streaming-kafka-0-8_2.11, already listed in the pom.xml at the top of this post).
Then write the following code:
- import kafka.serializer.StringDecoder
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object KafkaTest {
-
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
- val sc = new SparkContext(conf)
- val ssc = new StreamingContext(sc,Seconds(2))
- //When using Kafka this checkpoint directory is needed, because Spark Streaming has to persist some of its own bookkeeping (such as offsets); keeping it only in memory would be easy to lose
- ssc.checkpoint("hdfs://myha01/streamingkafka")
- /**
- * Data input: KafkaUtils.createDirectStream
- *
- * def createDirectStream[K: ClassTag,V: ClassTag,KD <:Decoder[K]: ClassTag,VD <:Decoder[V]: ClassTag] (
- * It takes three parameters:
- * ssc: StreamingContext,
- * kafkaParams: Map[String, String],
- * topics: Set[String]) -- it can read several topics at once, but here a single topic is enough
- */
- //specify the Kafka broker machines, i.e. the Kafka address
- val kafkaParams = Map("metadata.broker.list" -> "hadoop03:9092")
- val topics = Set("aura")
- //Data read from Kafka comes as key/value pairs: [String is the key type (the key carries offset-related metadata), String is the value type (each Kafka record), StringDecoder is the decoder for each of them]
- //The call itself returns an InputDStream[(String, String)] of KV pairs; since the key is of no use to us here, we take only the value with .map(_._2)
- val kafkaDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics).map(_._2)
-
- /**
- * Data processing
- * (already fairly close to what you would run for real)
- */
- kafkaDStream.flatMap(_.split(","))
- .map((_,1))
- .reduceByKey(_+_)
- .print()
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
- }
Start the highly-available Kafka cluster.
Note: start ZooKeeper first.
[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop05 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
Create the topic (the aura topic used by the code).
Create a console producer:
[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 -topic aura
Then run the code.
Then type some input (comma-separated words) into the producer on hadoop03.
- import java.sql.Date
- import java.util.Properties
-
- import kafka.serializer.StringDecoder
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
- import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.{SparkConf, SparkContext}
-
- /**
- * timestamp:
- * the time at which the user clicked the ad
- * province:
- * the province in which the user clicked the ad
- * city:
- * the city in which the user clicked the ad
- * userid:
- * the unique identifier of the user
- * advid:
- * the id of the ad that was clicked
- */
- object AdvApplicationTest {
-
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- conf.setMaster("local")
- conf.setAppName("AdvApplicationTest")
- //conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //e.g. a serializer setting could go here; the original left this placeholder empty
-
- val sc = new SparkContext(conf)
-
- val ssc = new StreamingContext(sc,Seconds(5))
- //getOrCreate(): reuse the existing session if there is one, otherwise create it, similar to a singleton
- val spark: SparkSession = SparkSession.builder()
- .config(conf).getOrCreate()
-
- /**
- * Step 1: read the data from Kafka (direct approach)
- * K: ClassTag,
- V: ClassTag,
- KD <: Decoder[K]: ClassTag,
- VD <: Decoder[V]: ClassTag] (
- ssc: StreamingContext,
- kafkaParams: Map[String, String],
- topics: Set[String]
- */
- val kafkaParams = Map("metadata.broker.list" -> "hadoop03:9092")
- val topics = Set("aura")
- val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics).map(_._2)
-
- /**
- * Step 2: filter out blacklisted users
- */
- val filterLogDStream: DStream[String] = blackListFilter(logDstream,ssc)
-
-
- /**
- * A user who clicks a given ad more than 100 times within one day is a blacklisted user.
- *
- *
- * zhangsan:
- * A:50 B:60
- * lisi:
- * A:50 A:20 A:40 -- this user is blacklisted
- * If a user is blacklisted today, are they still blacklisted tomorrow?
- * That depends on the business rules.
- *
- * Step 3: generate the blacklist dynamically, in real time
- */
- DynamicGenerationBlacklists(filterLogDStream,spark)
-
- /**
- * Step 4:
- * count ad clicks per day, per province and per city, in real time
- */
- val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)
- /**
- * Step 5:
- * compute each day's hottest ads per province in real time
- * (top-N within each group)
- *
- * transform / foreachRDD
- * rdd => dataframe
- * SparkSQL:
- * SQL
- */
-
-
- /**
- * Step 6:
- * for each ad, track the click trend over a one-hour sliding window, per day, in real time
- * (a sketch of this step follows after this listing)
- */
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
-
- /**
- * Filter out blacklisted users
- * logDstream: the data read from Kafka
- * returns the data after blacklist filtering
- */
- def blackListFilter(logDstream: DStream[String],ssc:StreamingContext):DStream[String]={
-
- /**
- * In a real job this is where the blacklist would be read from a database
- * Three commonly used stores: Redis, HBase, MySQL
- * black_list
- */
-
- val blackList = List((1L,true),(2L,true),(3L,true))
- val blackListRDD = ssc.sparkContext.parallelize(blackList)
- val balckListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())
-
- /**
- * The blacklist here should really be read from a persistent store; three databases we commonly use:
- * 1) Redis -- look it up yourself
- * 2) HBase -- look it up yourself
- * 3) MySQL -- demonstrated in class
- * read either the SparkCore way
- * or via SparkSQL -> dataframe -> rdd
- */
-
- logDstream.transform( rdd =>{
- val user_lineRDD=rdd.map( line =>{
- val fields = line.split(",")
- (fields(3).toLong,line)
- })
- val blackRDD = rdd.sparkContext.parallelize(balckListBroadcast.value)
- //only key/value RDDs can be joined, hence the mapping above
- val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
- resultRDD.filter( tuple =>{
- tuple._2._2.isEmpty
- }).map(_._2._1)
-
- })
-
- }
-
- /**
- * Dynamically generate the blacklist
- * @param filterLogDStream the data after blacklist filtering
- * A user who clicks a given ad more than 100 times within one day is a blacklisted user.
- *
- * The idea:
- * this requirement is very similar to word count -- essentially counting, in real time, how often each key occurs,
- * and if some key reaches 100 it becomes a blacklisted key.
- * Approach 1:
- * map to (date_userid_advid, v)
- * keep a running count per key with updateStateByKey (needs more memory)
- * zhangsan A 80
- * lisi B 99
- * 100
- * filter out the keys whose count exceeds one hundred and write them to MySQL, Redis or HBase
- * Approach 2:
- * map to (date_userid_advid, v)
- * process only the current batch with reduceByKey (needs less memory)
- * HBase:
- * rowkey: date_userid_advid previous value 2
- * current batch 3
- * total 5
- * or Redis
- * Approach 3:
- * the MySQL approach (used below)
- */
- def DynamicGenerationBlacklists(filterLogDStream: DStream[String],spark:SparkSession):Unit={
-
- val date_userid_advid_ds=filterLogDStream.map( line =>{
- val fields = line.split(",")
- val time = new Date( fields(0).toLong)
- val date = DateUtils.formatDateKey(time)
- val userid = fields(3)
- val advid = fields(4)
- //20180512_
- (date+"_"+userid+"_"+advid,1L)
- }).reduceByKey(_+_)
-
- date_userid_advid_ds.foreachRDD( rdd =>{
- rdd.foreachPartition( partition =>{
- val connection = ConnectionPool.getConnection()
- val statement = connection.createStatement()
- partition.foreach{
- case(date_userid_advid,count) =>{
- val fields = date_userid_advid.split("_")
- val date = fields(0)
- val userid = fields(1).toLong
- val advid = fields(2).toLong
- val sql=s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)";
- statement.execute(sql);
- }
- }
- ConnectionPool.returnConnection(connection)
-
- })
- })
-
- /**
- * Generate the blacklist
- */
-
- val df: DataFrame = spark.read.format("jdbc")
- .option("url", "jdbc:mysql://localhost:3306/aura")
- .option("user", "aura")
- .option("password", "aura")
- .option("dbtable", "tmp_advclick_count")
- .load()
-
- df.createOrReplaceTempView("tmp_advclick_count")
-
- val sql=
- """
- SELECT
- userid
- FROM
- (
- SELECT
- date,userid,advid,sum(click_count) c_count
- FROM
- tmp_advclick_count
- GROUP BY
- date,userid,advid
- ) t
- WHERE
- t.c_count > 100
- """
-
- //the computed blacklist
- val blacklistdf = spark.sql(sql).distinct()
- val properties = new Properties()
- properties.put("user","aura")
- properties.put("password","aura")
- blacklistdf.write.mode(SaveMode.Append)
- .jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
- }
-
- /**
- * Count ad clicks per day, per province and per city, in real time
- * @param filterLogDStream
- */
- def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]):DStream[(String,Long)]={
- /**
- * Idea:
- * map => (k,v) => date+province+city+advid -> 1
- * then updateStateByKey
- */
- var f=(input:Seq[Long],state:Option[Long]) =>{
- val current_count = input.sum
- val last_count = state.getOrElse(0L)
- Some(current_count+last_count)
- }
-
- filterLogDStream.map( line =>{
- val fields = line.split(",")
- val time = fields(0).toLong
- val mydate = new Date(time)
- val date = DateUtils.formatDateKey(mydate)
- val province = fields(1)
- val city = fields(2)
- val advid = fields(4)
- (date+"_"+province+"_"+city+"_"+advid,1L)
- }).updateStateByKey(f)
- /**
- * If required, these results could also be written out to MySQL or HBase
- */
- }
-
- /**
- * Compute the hottest ads per province in real time
- *
- * transform : rdd -> dataframe -> table -> sql
- * @param date_province_city_advid_count
- */
- def ProvinceAdvClick_Count(date_province_city_advid_count:DStream[(String,Long)],spark:SparkSession): Unit ={
- date_province_city_advid_count.transform( rdd =>{
- var date_province_advid_count= rdd.map{
- case(date_province_city_advid,count) =>{
- val fields = date_province_city_advid.split("_")
- val date = fields(0)
- val province = fields(1)
- val advid = fields(3)
-
-
- (date+"_"+province+"_"+advid,count)
- }
- }.reduceByKey(_+_)
-
- val rowRDD=date_province_advid_count.map( tuple =>{
- val fields = tuple._1.split("_")
- val date = fields(0)
- val provnice = fields(1)
- val advid = fields(2).toLong
- val count = tuple._2
- Row(date,provnice,advid,count)
- })
-
- val schema=StructType(
- StructField("date",StringType,true)::
- StructField("province",StringType,true)::
- StructField("advid",LongType,true)::
- StructField("count",LongType,true):: Nil
-
- )
-
- val df = spark.createDataFrame(rowRDD,schema)
-
- df.createOrReplaceTempView("temp_date_province_adv_count")
-
- val sql=
- """
- select
- *
- from
- (
- select
- date,province,advid,count,row_number() over(partition by province order by count desc) rank
- from
- temp_date_province_adv_count
- ) temp
- where temp.rank < 10
- """
-
- /**
- * Persist the result to a database
- */
- spark.sql(sql)
-
- rdd
-
- })
- }
- }
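Steps 5 and 6 are left as stubs in the listing above. A minimal sketch of step 6, the per-ad click trend over a one-hour sliding window, might look like the following; the helper name and the 10-second slide interval are my own illustration (the slide only has to be a multiple of the Seconds(5) batch interval), and it reuses filterLogDStream and the DateUtils helper shown below.
- import java.sql.Date
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.streaming.{Minutes, Seconds}
-
- def AdvClickTrend_LastHour(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
- filterLogDStream.map(line => {
- val fields = line.split(",")
- //key: advid plus the click time truncated to the minute, e.g. 3_201805132015
- val advid = fields(4)
- val minute = DateUtils.formatTimeMinute(new Date(fields(0).toLong))
- (advid + "_" + minute, 1L)
- })
- //sum the clicks per (advid, minute) over the last hour, recomputed every 10 seconds
- .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(60), Seconds(10))
- }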
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
-
- /**
- * DateTime Utils
- *
- * Created by XuanYu on 2016/5/31.
- */
- public class DateUtils {
-
- public static final SimpleDateFormat TIME_FORMAT =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static final SimpleDateFormat DATE_FORMAT =
- new SimpleDateFormat("yyyy-MM-dd");
- public static final SimpleDateFormat DATEKEY_FORMAT =
- new SimpleDateFormat("yyyyMMdd");
-
- /**
- * Check whether one time is before another
- * @param time1 the first time
- * @param time2 the second time
- * @return the result of the comparison
- */
- public static boolean before(String time1, String time2) {
- try {
- Date dateTime1 = TIME_FORMAT.parse(time1);
- Date dateTime2 = TIME_FORMAT.parse(time2);
-
- if(dateTime1.before(dateTime2)) {
- return true;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
-
- /**
- * Check whether one time is after another
- * @param time1 the first time
- * @param time2 the second time
- * @return the result of the comparison
- */
- public static boolean after(String time1, String time2) {
- try {
- Date dateTime1 = TIME_FORMAT.parse(time1);
- Date dateTime2 = TIME_FORMAT.parse(time2);
-
- if(dateTime1.after(dateTime2)) {
- return true;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
-
- /**
- * Compute the difference between two times, in seconds
- * @param time1 time 1
- * @param time2 time 2
- * @return the difference
- */
- public static int minus(String time1, String time2) {
- try {
- Date datetime1 = TIME_FORMAT.parse(time1);
- Date datetime2 = TIME_FORMAT.parse(time2);
-
- long millisecond = datetime1.getTime() - datetime2.getTime();
-
- return Integer.valueOf(String.valueOf(millisecond / 1000));
- } catch (Exception e) {
- e.printStackTrace();
- }
- return 0;
- }
-
- /**
- * Get the date and hour
- * @param datetime the time (yyyy-MM-dd HH:mm:ss)
- * @return the result (yyyy-MM-dd_HH)
- */
- public static String getDateHour(String datetime) {
- String date = datetime.split(" ")[0];
- String hourMinuteSecond = datetime.split(" ")[1];
- String hour = hourMinuteSecond.split(":")[0];
- return date + "_" + hour;
- }
-
- /**
- * Get today's date (yyyy-MM-dd)
- * @return today's date
- */
- public static String getTodayDate() {
- return DATE_FORMAT.format(new Date());
- }
-
- /**
- * Get yesterday's date (yyyy-MM-dd)
- * @return yesterday's date
- */
- public static String getYesterdayDate() {
- Calendar cal = Calendar.getInstance();
- cal.setTime(new Date());
- cal.add(Calendar.DAY_OF_YEAR, -1);
-
- Date date = cal.getTime();
-
- return DATE_FORMAT.format(date);
- }
-
- /**
- * Format a date (yyyy-MM-dd)
- * @param date the Date object
- * @return the formatted date
- */
- public static String formatDate(Date date) {
- return DATE_FORMAT.format(date);
- }
-
- /**
- * Format a time (yyyy-MM-dd HH:mm:ss)
- * @param date the Date object
- * @return the formatted time
- */
- public static String formatTime(Date date) {
- return TIME_FORMAT.format(date);
- }
-
- /**
- * Parse a time string
- * @param time the time string
- * @return Date
- */
- public static Date parseTime(String time) {
- try {
- return TIME_FORMAT.parse(time);
- } catch (ParseException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * Format a date key (yyyyMMdd)
- * @param date
- * @return
- */
- public static String formatDateKey(Date date) {
- return DATEKEY_FORMAT.format(date);
- }
-
- /**
- * Parse a date key (yyyyMMdd)
- * @param datekey
- * @return
- */
- public static Date parseDateKey(String datekey) {
- try {
- return DATEKEY_FORMAT.parse(datekey);
- } catch (ParseException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * Format a time down to minute precision
- * yyyyMMddHHmm
- * @param date
- * @return
- */
- public static String formatTimeMinute(Date date) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- return sdf.format(date);
- }
-
- }
- import java.sql.{Connection, DriverManager}
- object ConnectionPool {
- private val max=8 //total number of connections the pool is intended to hold
- private val connectionNum=10 //number of connections created whenever the pool runs dry
- private var conNum=0 //number of connections the pool has created so far
-
- import java.util
- private val pool=new util.LinkedList[Connection]() //the pool itself
-
- {
-
- Class.forName("com.mysql.jdbc.Driver")
- }
- /**
- * Return a connection to the pool
- */
- def returnConnection(conn:Connection):Unit={
- pool.push(conn)
- }
- /**
- * Get a connection from the pool
- */
- def getConnection():Connection={
- //synchronized block
- this.synchronized({
- if(pool.isEmpty()){
- for( i <- 1 to connectionNum){
- val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/aura","root","root")
- pool.push(conn)
- conNum += 1
- }
- }
-
- pool.poll()
- })
-
- }
- }