当前位置:   article > 正文

Spark Streaming综合实战(一)(史上最详细)_不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,

       

  

很多企业为了支持决策分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据。技术人员可以利用数据挖掘和OLAP(On-Line Analytical Processing)分析工具从静态数据中找到对企业有价值的信息 

对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算和实时计算     

 批量计算:充裕时间处理静态数据,如Hadoop 流数据不适合采用批量计算,因为流数据不适合用传统的关系模型建模 流数据必须采用实时计算,响应时间为秒级 数据量少时,不是问题,但是,在大数据时代,数据格式复杂、来源众多、数据量巨大,对实时计算提出了很大的挑战。因此,针对流数据的实时计算——流计算,应运而生.

                                            

流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息

传统的数据处理流程,需要先采集数据并存储在关系数据库等数据管理系统中,之后由用户通过查询操作和数据管理系统进行交互

                                     

                                                                 

流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务

                                           

                                                                           

 

数据实时采集阶段通常采集多个数据源的海量数据,需要保证实时性、低延迟与稳定可靠 以日志数据为例,由于分布式集群的广泛应用,数据分散存储在不同的机器上,因此需要实时汇总来自不同机器上的日志数据 目前有许多互联网公司发布的开源分布式日志采集系统均可满足每秒数百MB的数据采集和传输需求,

如: Facebook的Scribe LinkedIn的Kafka 淘宝的Time Tunnel 基于Hadoop的Chukwa和Flume

                                          

数据实时计算阶段对采集的数据进行实时的分析和计算,并反馈实时结果 经流处理系统处理后的数据,可视情况进行存储,以便之后再进行分析计算。在时效性要求较高的场景中,处理之后的数据也可以直接丢弃

                                                               

实时查询服务:经由流计算框架得出的结果可供用户进行实时查询、展示或储存 传统的数据处理流程,用户需要主动发出查询才能获得想要的结果。而在流处理流程中,实时查询服务可以不断更新结果,并将用户所需的结果实时推送给用户 虽然通过对传统的数据处理系统进行定时查询,也可以实现不断地更新结果和结果推送,但通过这样的方式获取的结果,仍然是根据过去某一时刻的数据得到的结果,与实时结果有着本质的区别

                                                  

7.2.1 Spark Streaming设计

7.2.2 Spark Streaming与Storm的对比

7.2.3 从“Hadoop+Storm”架构转向Spark架构 

                         

                                                      图 Spark Streaming支持的输入、输出数据源 

                 

            

                                                                           图  Spark Streaming执行流程 

         

                      

                                                                                               图 DStream操作示意图

                                     

                                                                图  采用Hadoop+Storm部署方式的一个案例

采用Spark架构具有如下优点: 实现一键式安装和配置、线程级别的任务监控和告警; 降低硬件集群、软件维护、任务监控和应用开发的难度; 便于做成统一的硬件、计算平台资源池。

                                   

                                   图   用Spark架构满足批处理和流处理需求

7.3.1 Spark Streaming工作机制

7.3.2 Spark Streaming程序的基本步骤

编写Spark Streaming程序的基本步骤是:

1.通过创建输入DStream来定义输入源

2.通过对DStream应用转换操作和输出操作来定义流计算

3.用streamingContext.start()来开始接收数据和处理流程

4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)

5.可以通过streamingContext.stop()来手动结束流计算进程

7.3.3 创建StreamingContext对象 

如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口

可以从一个SparkConf对象创建一个StreamingContext对象 登录Linux系统后,启动spark-shell。进入spark-shell以后,就已经获得了一个默认的SparkConext,也就是sc。因此,可以采用如下方式来创建StreamingContext对象:

如果是编写一个独立的Spark Streaming程序,而不是在spark-shell中运行,则需要通过如下方式创建StreamingContext对象:

7.4.1 文件流

1.在spark-shell中创建文件流

进入spark-shell创建文件流。请另外打开一个终端窗口,启动进入spark-shell

上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

                                                        

在“/usr/local/spark/mycode/streaming/logfile”目录下新建一个log.txt文件,就可以在监听窗口中显示词频统计结果

2. 采用独立应用程序方式创建文件流

用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:

 $ cd /usr/local/spark/mycode/streaming

$ /usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

7.4.2 套接字流

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理

                                                   

          

请在NetworkWordCount.scala文件中输入如下内容:

在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:

 

新打开一个窗口作为nc窗口,启动nc程序:

       

package org.apache.spark.examples.streaming

Import  java.io.{PrintWriter}

Import  java.net.ServerSocket

Import  scala.io.Source

在另外一个窗口启动监听程序:

启动成功后,你就会看到,屏幕上不断打印出词频统计信息 

7.4.3 RDD队列流

 

打包成功后,执行下面命令运行程序

 执行上面命令以后,程序就开始运行,就可以看到类似下面的结果:

                          

7.5.1 Kafka简介

在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换

                                   

                                                                                           图 Kafka作为数据交换枢纽

Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer 负责发布消息到Kafka broker

Consumer 消息消费者,向Kafka broker读取消息的客户端。

Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

                           

7.5.2 Kafka准备工作

说明:本安装文件为Kafka_2.11-0.10.2.0.tgz,前面的2.11就是该Kafka所支持的Scala版本号,后面的0.10.2.0是Kafka自身的版本号

打开一个终端,输入下面命令启动Zookeeper服务:

千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了

打开第二个终端,然后输入下面命令启动Kafka服务:

千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了 

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

7.5.3 Spark准备工作

下面用生产者(Producer)来产生一些数据,请在当前终端内继续输入下面命令:

上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

hello hadoop

hello spark 

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)

在spark-shell中执行下面import语句进行测试:

对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包,下载地址:

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0

把jar文件复制到Spark目录的jars目录下

继续把Kafka安装目录的libs目录下的所有jar文件复制到“/usr/local/spark/jars/kafka”目录下,请在终端中执行下面命令:

执行如下命令启动spark-shell:

启动成功后,再次执行如下命令:

7.5.4 编写Spark Streaming程序使用Kafka数据源

1.编写生产者(producer)程序

在KafkaWordProducer.scala中输入以下代码:

package org.apache.spark.examples.streaming

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka._

继续在当前目录下创建KafkaWordCount.scala代码文件,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

继续在当前目录下创建StreamingExamples.scala代码文件,,用于设置log4j:

进行打包编译:

打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

执行上面命令后,屏幕上会不断滚动出现新的单词,如下:

不要关闭这个终端窗口,就让它一直不断发送单词

请新打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

7.6.1 DStream无状态转换操作

map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream

flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项

filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项

repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度

reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream

count():统计源DStream中每个RDD的元素数量

union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素

countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数

reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来

join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新Dstream

cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组

transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作

7.6.2 DStream有状态转换操作

无状态转换操作实例: 之前“套接字流”部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计

1.滑动窗口转换操作

一些窗口转换操作的含义:

window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream

countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数

reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数

一些窗口转换操作的含义:

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)

countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率

窗口转换操作实例:

在上一节的“Apache Kafka作为DStream数据源”内容中,已经使用了窗口转换操作,在KafkaWordCount.scala代码中,可以找到下面这一行:

val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)

                    

                                                    图 reduceByKeyAndWindow操作过程示意图

2.updateStateByKey操作

需要在跨批次之间维护状态时,就必须使用updateStateByKey操作

词频统计实例:

对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果

新建一个NetworkWordCountStateful.scala代码文件,输入以下代码:

新建一个StreamingExamples.scala文件,用于设置log4j日志级别,代码如下:

输入以下命令启动这个程序:

执行上面命令后,就进入了监听状态(称为监听窗口) 

新打开一个窗口作为nc窗口,启动nc程序:

切换到刚才的监听窗口,会发现,已经输出了词频统计信息:

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

7.7.1 把DStream输出到文本文件中

7.7.2 把DStream写入到MySQL数据库中

请在NetworkWordCountStateful.scala代码文件中输入以下内容:

sbt打包编译后,使用如下命令运行程序:

 程序运行以后,屏幕上就会显示类似下面的程序运行信息:

打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:

运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:

这些词频结果被成功地输出到“/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”文件中

可以发现,在这个目录下,生成了很多文本文件,如下:

output.txt的命名看起来像一个文件,但是,实际上,spark会生成名称为output.txt的目录,而不是文件

启动MySQL数据库,并完成数据库和表的创建:

在此前已经创建好的“spark”数据库中创建一个名称为“wordcount”的表:

在NetworkWordCountStateful.scala文件中加入下面代码:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/547756
推荐阅读
相关标签
  

闽ICP备14008679号