当前位置:   article > 正文

基于Hadoop的云计算与大数据处理(Spark Streaming WordCount)

基于Hadoop的云计算与大数据处理(Spark Streaming WordCount)
实验目的

1.了解Spark Streaming的框架结构

2.准确理解Spark Streaming的实现原理

3.熟练掌握Spark Streaming进行WordCount的实验流程

实验原理

Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。

1.Spark Streaming的优势

(1)能运行在100个以上的结点上,并达到秒级延迟;

(2)使用基于内存的Spark作为执行引擎,具有高效和容错的特性;

(3)能集成Spark的批处理和交互查询;

(4)为实现复杂的算法提供了与批处理类似的简单接口。

2.基于Spark on Yarn的Spark Streaming总体架构如下图所示:

Spark on Yarn启动后,由Spark AppMaster把Receiver作为一个Task提交给某一个Spark Executor;Receive启动后输入数据,生成数据块,然后通知Spark AppMaster;Spark AppMaster会根据数据块生成相应的Job,并把Job的Task提交给空闲Spark Executor 执行。图中粗箭头显示被处理的数据流,输入数据流可以是磁盘、网络和HDFS等,输出可以是HDFS,数据库等。

3.Spark Streaming的基本原理

将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,其基本原理如下图所示。

首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。

4.Spark Streaming内部实现原理

使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD(Resilient Distributed Datasets弹性分布式数据集)提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。

实验环境

Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.5

spark-1.6.0-bin-hadoop2.6

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

实验内容

下图为项目的流程图,通过nc命令,向9999端口持续发送消息,并使用spark streaming对从9999端口发来的数据进行统计,将统计的结果输出到console界面上。

实验步骤

1.使用jps查看HDFS以及Spark是否已经启动,若未启动,则切换对应目录下,启动Hadoop及Spark。

view plain copy

  1. jps  
  2. cd /apps/hadoop/sbin  
  3. ./start-dfs.sh  

view plain copy

  1. cd /apps/spark/sbin  
  2. ./start-all.sh  

2.使用nc命令,向9999端口,发送数据。

view plain copy

  1. nc -lk 9999  

nc 为netcat,一般多用于在局域网之间传输文件。在执行nc -lk 9999后,界面会进入持续等待输入内容状态。我们可以随便输入一些文字,来作为输入,发给9999端口。

3.打开一个新的连接窗口,并切换目录到/apps/spark目录下,调用spark的example中,自带的wordcount程序。

view plain copy

  1. cd /apps/spark  
  2. bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999  

这里,可以调用example中,使用java程序编写的wordcount。在这里localhost和9999为spark steaming接收数据的主机和端口。在此作为参数,传递给wordcount程序。

可以看到wordcount程序,一直等待接受数据。

4.在执行nc -lk 9999的界面中,输入一些文本,并按回车。

view plain copy

  1. hello spark streaming hello hadoop  

再次切换到执行wordcount程序的终端界面,可以看到spark streaming对输入的数据,进行统计,得到单词个数如下:

此程序可以表明,Spark Streaming数据处理流程。

在两个终端界面中,输入CTRL+C,可以终止程序的运行。

5.使用Spark Streaming的Scala API编写wordcount程序,以实现对单词个数的统计。

首先在Linux本地,新建/data/spark7目录,用于存放所需文件。

view plain copy

  1. mkdir -p /data/spark7  

切换目录到/data/spark7下,使用wget命令,下载项目所需jar包spark-assembly-1.6.0-hadoop2.6.0.jar。

view plain copy

  1. cd /data/spark7  
  2. wget http://172.16.103.12:60000/allfiles/spark7/spark-assembly-1.6.0-hadoop2.6.0.jar  

创建一个Scala项目,命名为spark7。

在spark7项目下创建包,包名为my.sparkstreaming。

在my.sparkstreaming包下创建Scala Object,名为NetworkWordCount。

6.右键单击项目,新建一个目录,名为spark7lib,用于存放项目所需的jar包

将/data/spark7目录下的spark-assembly-1.6.0-hadoop2.6.0.jar包,拷贝到eclipse中spark7项目的spark7lib目录下

选中spark7lib目录下所有jar包,单击右键,选择Build Path→Add to Build Path。

7.编写sparkstreaming的wordcount代码。

view plain copy

  1. if (args.length < 2) {  
  2.    System.err.println("Usage: NetworkWordCount <hostname> <port>")  
  3.  System.exit(1)  
  4.  }  

首先传递两个参数,第一个参数为发送数据的地址,第二个参数为发送数据的端口号。

view plain copy

  1. val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")  

创建一个sparkconf对象。

view plain copy

  1. val ssc = new StreamingContext(sparkConf, Seconds(1) )  

设置监听数据的时间窗口为1分钟。

view plain copy

  1. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)  

Spark接受数据,并对数据进行存储。

view plain copy

  1. val words = lines.flatMap(_.split(" "))  
  2.     val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )  
  3.     wordCounts.print();  

对接收到数据,放置在一行,并对数据以空格分隔。进行map和reduce操作。得到每个单词出现的个数,并进行打印输出。

view plain copy

  1. ssc.start();  
  2. ssc.awaitTermination();  

最后这两行代码,是开始执行,前面所定义的Spark Streaming的任务。

完整代码如下:

view plain copy

  1. package my.sparkstreaming  
  2. import org.apache.spark.SparkConf  
  3. import org.apache.spark.streaming.StreamingContext  
  4. import org.apache.spark.streaming.Seconds  
  5. import org.apache.spark.storage.StorageLevel  
  6. object NetworkWordCount {  
  7.   def main(args: Array[String]) {  
  8.     if (args.length < 2) {  
  9.       System.err.println("Usage: NetworkWordCount <hostname> <port>")  
  10.     System.exit(1)  
  11.     }  
  12.   
  13.     val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")  
  14.     val ssc = new StreamingContext(sparkConf, Seconds(1) )  
  15.     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)  
  16.     val words = lines.flatMap(_.split(" "))  
  17.     val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )  
  18.     wordCounts.print();  
  19.     ssc.start();  
  20.     ssc.awaitTermination();  
  21.   
  22.     }  
  23.     }  

8.切换到Linux本地,执行nc命令,发送数据。

view plain copy

  1. nc -lk 9999  

使用Spark Streaming处理发来的数据。

在NetworkWordCount.scala类上,单击右键选择Run As=>Run Configurations=>Arguments

在Program arguments后面的文本框中,输入执行nc命令发送数据的ip和端口。

view plain copy

  1. localhost  9999  

然后点击Main,进入下面界面,查看项目名和主类名是否与程序的项目名和主类名对应,

若不对应,则在Project下面的文本框,输入本程序的项目名为

view plain copy

  1. spark7  

在Main class下面的文本框中,输入本程序的包名.类名

view plain copy

  1. my.sparkstreaming.NetworkWordCount  

点击Run 执行。

9.在nc窗口,输入数据"hello world hello hadoop"。

view plain copy

  1. hello word hello hadoop  

可以看到程序的console界面,输出为:

实验结论及心得

1. 了解了Spark Streaming的框架结构,包括Spark Streaming的总体架构和内部实现原理。

2. 对Spark Streaming的实现原理有了准确的理解,了解了将输入数据流以时间片为单位拆分并使用RDD操作处理的基本原理。

3. 熟练掌握了使用Spark Streaming进行WordCount的实验流程。

4. Spark Streaming是一种强大的实时计算框架,通过扩展Spark的能力,可以快速处理大规模流式数据。

5. Spark Streaming的优势在于可以运行在大规模集群上并实现秒级延迟,同时结合了Spark的高效和容错特性。

6. 理解Spark Streaming的框架结构和实现原理对于正确使用和优化Spark Streaming应用程序非常重要。

7. 在实验中,学会了使用Spark Streaming进行WordCount实验的流程,这是Spark Streaming的入门应用,也是理解Spark Streaming基本原理的一种方式。

通过这次实验,对Spark Streaming有了更深入的了解,并且掌握了相关操作的基本流程。这将有助于在实际应用中更好地使用Spark Streaming进行实时计算和数据处理任务。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/514163
推荐阅读
相关标签
  

闽ICP备14008679号