赞
踩
第一章:快速入门案例-Spark Streaming运行WC
主要是Flume01中的课程回顾:
主要是source、sink的选择,生产上只有两个sink:Sink --> HDFS(离线)、Sink --> Kafka(实时)
注意:提交Spark作业,配置的时候提交中带" \ "符可能会出问题
TailDir只要遇到一个:按照条数、大小、时间,触发一个就会生效。
真正的小文件是ETL后的小文件,ETL过后在HDFS上的小文件处理。ETL后的小文件定时处理。
TimeStamp要设置为true,否则会报空指针异常。一个event = header + body(字节数组),时间戳的概念是在header中去取值的。
<!--Spark Streaming依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
根据官网上的案例:Streaming编程和RDD除了头、尾不一样,其它都是一样的。
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf,Seconds(10))
}
StreamingContext中的代码:
1、主构造器: class StreamingContext private[streaming] ( _sc: SparkContext, _cp: Checkpoint, _batchDur: Duration //批次是多少 ) extends Logging { 2、附属构造器: def this(sparkContext: SparkContext, batchDuration: Duration) = { this(sparkContext, null, batchDuration) } 3、附属构造器:传入的是SparkConf, def this(conf: SparkConf, batchDuration: Duration) = { this(StreamingContext.createNewSparkContext(conf), null, batchDuration) } 4、点击createNewSparkContext,new了一个SparkContext,把conf传入了 private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { new SparkContext(conf) }
package SparkStreaming01 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object SocketWCApp { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("SocketWCAPP").setMaster("local[2]") val ssc = new StreamingContext(sparkConf,Seconds(10)) //10秒处理一个批次 //TODO业务逻辑 //已经把数据从server ==> DStream val lines = ssc.socketTextStream("hadoop002",9000) val result = lines.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
Seconds和Duration的关系:
伴生对象:Duration的单位是毫秒:
object Seconds {
def apply(seconds: Long): Duration = new Duration(seconds * 1000)
}
socketTextStream方法:
传入参数:hostname和port,默认的存储级别格式是:MEMORY_AND_DISK_SER_2,和Spark Core中又是不一样的。
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
返回值是一个:ReceiverInputDStream
一、ReceiverInputDStream方法:
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
extends InputDStream[T](_ssc) {
二、点进InputDStream方法:
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {
注意:Spark Core是处理完就结束的,而Spark Streaming是7 x 24h不间断不会停的。
1、 在当前窗口上输入:nc -lk 9000,继续输入数据:ruoze ruoze ruoze jepson jepson 以tab键分割,然后回车
2、在idea中看是否输出:(jepson,2)、(ruoze,3)
1、第一点这个job是一直在的,只要应用程序保持启动就一直在,只要到10秒就会跑任务(不管是否输入)
2、UI界面中多出了一个Streaming页面,页面信息解读:Running batches of 10 seconds for 6 minutes 22 seconds since 2019/09/02 13:51:46 (38 completed batches, 2 records)
运行每10秒一个批次,10秒就是我们idea中设置的参数,从什么时间开始,已经完成了几条数据。
Input Rate 、Scheduling Delay、Processing Time(Kafka的数据处理不过来会有延迟,生产上的数据要保证每一个批次的作业要及时完成,不然导致作业会越排越多。)
Kafka的限速和备压原理
Spark Streaming is an extension of the core Spark API that enables scalable,high-throughput,fault-tolerant stream processing of live data streams,Data can be ingested (接收)from many sources like Kafka,Flume,Kinesis, or TCP sockets, and can be processed using complex algorithms(复杂算法) expressed with high-level functions like map,reduce,join, and window. Finally,processed data can be pushed out to filesystems,databases,and live dashboards(在线指示板), in fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
它是一个API的扩展能够保证可靠的、高吞吐的、高容错的的在线数据流处理,数据能够从很多的数据源比如Kafka,Flume读入(不推荐flume–>Spark Streaming,数据量大它就扛不住),
flume --> Kafka --> Spark Streaming
Spark Streaming的用处:
实时消费数据,实时处理数据,把结果存在一个地方中。
Internally, it works as follows .Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
翻译:本质上,Spark Streaming工作流程如下:接收在线的数据流,把它转换成微批处理;这些小批次将被spark引擎处理,然后产生最终的结果。
==>
一个数据实时进来,按照指定的时间分割,把它拆成一个个小的批次,交由Spark处理。
Spark:以批处理为主,使用微批来解决实时问题。
Flink:以Stream为主,来解决批处理问题;bounded 和 unbounded是Flink中的两个概念。
DStream:是Spark Streaming中的编程模型。
RDD中有的方法,Spark Streaming中都有。
Spark Streaming provides a high-level abstraction(高级别的抽象的) called discretized stream or DStream, which represents a continuous stream of data(持续性的数据流)
引出Flink中的两个概念bounded(有界)和unbounded(无界),flume采集过来的是流数据、无界的,只是根据时间采到不同的目录上去的。有界是无界的一种特例。
翻译:要么从输入数据源创建,要么通过高级转换算子转换得来,本质上,一个DStream代表了一连串的RDD.
First, we import the names of the Spark Streaming classes and some implicit conversions from StreamingContext into our environment in order to add useful methods to other classes we need (like DStream). StreamingContext is the main entry point for all Streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
首先,我们导入Spark Streaming中的一些类和一些隐式转换到idea环境中为了能够让我们的DStream方法能够调用. StreamingContext是所有Streaming函数的主要入口点,我们创建了一个本地的StreamingContext使用两个执行线程,批处理间隔为1秒。
如何创建:
到目前为止记住两点:作业运行是一直在跑着的,至少要启动两个线程。
赘述一遍:
为了初始化一个Spark Streaming程序,一个StreamingContext对象必须被创建(这是SparkStreaming的主要入口点)
翻译:appName参数是展示在UI界面上的一个名字,master是Spark、Mesos、YARN集群模式上的运行线程数;
最佳实践:当程序运行在一个集群上的时候,你无需使用硬编码,而是更好的使用spark-submit,只需要在提交的时候写明appName和Master即可。
翻译:对于本地测试和开发,使用local即可
1、Define the input sources by creating input DStreams.
2、Define the Streaming computations by applying transformation and output operations to DStreams.
3、Start receiving data and processing it using streamingContext.start().
4、Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5、The processing can be manually stopped using streamingContext.stop().
==>
imput data streams --> transformations --> output
1、Once a context has been started, no new streaming computations can be set up or added to it.
1、Discretized Streams or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.
2、Internally,a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable,distributed dataset (see Spark Programming Guide for more details). Each RDD is a DStream contains data from a certain interval, as shown in the following figure.
对一个DStream做了某个操作其实就是对底层的RDD都做了某个操作,一个DStream就是一个序列的RDD;对一个RDD做操作,其实就是对所有的Partition做了一个操作。
DStream就是对每个RDD进行操作,DStream按照时间拆分成RDD.
Spark Streaming提供两种内置的数据源
Spark Streaming不管数据是从socket还是从flume中来,Receiver是接收数据给SparkStreaming处理用的。
去到Receiver.scala源码中查看定义:
后面重点讲的两个算子:
其实就是一些action触发计算
小结我们需要掌握的如下:
StreamingContext、Input DStream、Receiver、Transformation、Output
回到IDEA中的WordCount代码:
修改代码:
val sparkConf = new SparkConf().setAppName(“SocketWCAPP”).setMaster(“local[2]”)
==> 修改为:
setMaster(“local[1]”)测试:
只设置了1个core的话,这样是有问题的,因为Receiver已经占用了一个core了,此时已经没有core来处理数据了。
图解local和local[1]:
官网描述如下:
Therefore,it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receivers.
为什么HDFS不需要Receiver?直接使用HDFS API即可。挂了重新读取即可
它只需要一个core,无Receiver,所以是local[1].
package SparkStreaming01 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object HDFSWCApp { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HDFSWCApp").setMaster("local[1]") val ssc = new StreamingContext(sparkConf,Seconds(10)) val lines = ssc.textFileStream("hdfs://10.0.0.132:9000/streamingg6") val result = lines.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
读HDFS文件系统的API:ssc.textFileStream
==>这个方法推荐使用moving移动数据,使用put的话,数据正在上传上去的过程中使用ssc去读就会产生问题。
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
从上面看出来:textFileStream返回值是DStream,根本不带Receiver。
hdfs上准备文件:
hdfs dfs -put ruozeinput.txt /streamingg6test
数据也会出来,在UI页面上的Streaming页面,都没有监控到数据记录,因为没有receiver。
HDFS directory to monitor for new file
如上我们得知:使用fileStream它只认程序启动后的数据,在程序启动前就把数据put进去是读取不出来的。
通过Transformation on DStream引出:
我们之前讲的都是统计本批次的数据的结果:这是无状态的
**需求变更:**统计今天的到现在的结果:有状态的,比如这个批次与前面几个批次的数据都有关联。
you will have to do two steps:
这个操作允许我们允许我们维护任意状态并且不断的用信息去更新它。
1、Define the state - The state can be an arbitrary data type(抽象数据类型).
2、Define the state update function - Specify with a function how to do update the state using the previous state and the new values from an input stream.
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
问题:为什么这个方法中要使用option,因为有的值是原来当中没有的。
eg:传入(hello,hello,John)此时计数:(hello,2)(John,1)
又传入:(tom,tom)累加计数:(hello,2)(John,1)(tom,2)
package SparkStreaming01 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WCApp { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("WCAPP").setMaster("local[2]") val ssc = new StreamingContext(sparkConf,Seconds(10)) //TODO业务逻辑 val lines = ssc.socketTextStream("hadoop002",8888) val result = lines.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_) val state = result.updateStateByKey(updateFunction) ssc.start() ssc.awaitTermination() } //newValues ==>(hello,1) (hello,1) ==> (1,1) def updateFunction(newValues: Seq[Int], preValues: Option[Int]): Option[Int] = { val newCount = newValues.sum // add the new values with the previous running count to get the new count val oldCount = preValues.getOrElse(0) Some(newCount + oldCount) } }
运行这段代码抛错:
需要指定输出文件目录:
实际我在运行的时候没加输出路径并没有报错
开始在Hadoop002控制台上测试:
输入:ruoze,ruoze,jepson,jepson
输出:(ruoze,2) (jepson,2)
输入:ruoze,17
输出:(ruoze,3)(jepson,2) (17,1)
问题:会在本地中输出很多个目录,如果批次很多的话,能否抗住呢?
Answer:每个批次的结果写到外部存储,使用upsert,并且加上时间戳,然后就可以使用DB的方式查询。
如果是Socket模式的话,肯定是有两个executor的:
分析:Driver + executors,Driver中有一个Application,程序中底层有一个SparkContext,sc的上面有一个ssc,我们开始启动作业,如果是socekt模式的话,至少两个executor,任意一个中运行了Receiver,从外部(Socket、Flume、Kafka)接收数据,以Socket为例子,它是存储级别是:MEMORY_AND_DISK_SER_2,假如存储占4个block,在另一个executor中也有一个4 Block的复制;
达到我们设定的10秒时间,开始处理数据,走业务逻辑这条线;要把block信息告诉ssc,ssc的底层使用SparkContext去提交,产生job,SparkContext去进行分发。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。