当前位置:   article > 正文

Structured Streaming 案例初体验_实验七-structured streaming编程初级实践

实验七-structured streaming编程初级实践

Structured Streaming程序基本步骤

编写Structured Streaming程序的基本步骤是:
1.创建SparkSession实例;
2.创建DataFrame表示从数据源输入的每一行数据;
3.DataFrame转换,类似于RDD转换操作;
4.创建StreamingQuery开启流查询;
5.调用StreamingQuery.awaitTermination()方法,等待流查询结束。

Structured Streaming操作示例

这里还是用统计单词个数的示例,来展示如何进行Structured Streaming操作;

获取SparkSession

要操作Structured Streaming,首先要获取SparkSession实例,我们可以如下创建一个SparkSession;登录Linux系统,启动spark-shell。然后输入一下代码:

scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.SparkSession
val spark = SparkSession.
     | builder.
     | appName("StructuredNetworkWordCount").
     | getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d1bf7bf
 
scala> import spark.implicits._

首先引入两个包,接着实例化一个SparkSession对象;appName方法设置spark应用名称,getOrCreate方法表示如果已存在一个SparkSession,则直接用现成的SparkSession,否则创建一个SparkSession,其实spark就是已经存在的SparkSession,因为每次开启spark-shell时,就默认开启了一个SparkSession;千万别忘了引入spark.implicits._包,否则会报错。

创建DataFrame

创建好SparkSession后,即可用SparkSession.readStream方法创建DataFrame,在spark-shell输入一下代码:

scala> val lines = spark.readStream.
     | format("socket").
     | option("host","localhost").
     | option("port",9999).
     | load()
lines: org.apache.spark.sql.DataFrame = [value: string]

SparkSession.readStream方法返回一个DataStreamReader实例,接着DataStreamReader.format设置数据源为网络套接字,这里还可以设置从文件获取数据;然后调用两次DataStreamReader.option分别设置套接字的主机和端口;最后调用DataStreamReader.load方法,返回一个DataFrame实例;

此时,lines可以理解为之前Spark2.0入门:Structured Streaming简介提到的”unbound table”,实时到来的数据,即添加到这个”unbound table”;由输出可以看出,这个DataFrame每行包含一个字符串;

DataFrame转换

获取lines这个DataFrame之后,接下来就要处理这个DataFrame;类似于RDD转换操作,DataFrame也是通过转换成新的DataFrame来处理数据;因此由之前lines获取到每行数据之后,接下来要进行分隔并进行统计;

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
 
scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例,在官方的API文档上有如下声明

type DataFrame = Dataset[Row]

接着调用flatMap将一行实时数据按空格切割成单词集合;groupBy方法表示按”value”这个属性合并,count方法表示统计出每个单词出现的次数;此时的wordCounts又转换为DataFrame,且这个DataFrame有两个属性,value和count,即每次处理完一行实时数据时,都会输出单词和该单词出现的次数;

执行流查询

如果DataFrame转换操作定义结束,接下来即可开启流查询,在spark-shell输入如下:

scala> val query = wordCounts.writeStream.
     | outputMode("complete").
     | format("console").
     | start()
query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE]
 
scala> query.awaitTermination()

wordCounts.writeStream返回一个 DataStreamWriter实例,该实例定义了将实时流查询产生结果输出到外部存储的接口;outputMode设置了’complete’模式,即每次都输出全部结果数据;format定义输出媒介,这里为控制台;最后调用start方法开启流查询并返回一个StreamingQuery实例;

最后调用StreamingQuery.awaitTermination等待查询结束;

流查询结果:

在开启查询之前,需要先在另外一个终端上开启一个NetCat简单服务程序,用于向spark流查询产生数据;在终端输入如下即可:

nc -lk 9999

表示监听本地9999端口,这样在该终端上输入数据,即可传输给spark流查询;我们先输入简单字符串”hello spark”:

  1. hadoop@charles-Aspire-4741:/usr/local/spark$ nc -lk 9999
  2. hello spark

即可在之前开启spark流查询的端口看到如下结果:

  1. scala> query.awaitTermination()
  2. -------------------------------------------
  3. Batch: 0
  4. -------------------------------------------
  5. +-----+-----+
  6. |value|count|
  7. +-----+-----+
  8. |hello| 1|
  9. |spark| 1|
  10. +-----+-----+

我们就得到了每个单词以及次数;当我们在NetCat终端再输入一行字符串”hello apache”,得到如下结果:

  1. scala> query.awaitTermination()
  2. -------------------------------------------
  3. Batch: 0
  4. -------------------------------------------
  5. +-----+-----+
  6. |value|count|
  7. +-----+-----+
  8. |hello| 1|
  9. |spark| 1|
  10. +-----+-----+
  11. -------------------------------------------
  12. Batch: 1
  13. -------------------------------------------
  14. +------+-----+
  15. | value|count|
  16. +------+-----+
  17. | hello| 2|
  18. |apache| 1|
  19. | spark| 1|
  20. +------+-----+

我们可以看到,每处理一行数据,即输出一个结果;因此当数据源传来源源不断的实时数据时,Structured streaming可以按固定时间间隔读取若干行数据,并执行流查询,输出结果;因为这个示例程序用的是complete模式,因此每次都将结果全部输出。当然也可以设置成append(增量)模式,这样每次输出即为新增的结果行;

编写独立应用

之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStructuredStreaming.scala

这样就用vim打开一个scala文件,在该文件输入以下内容:

  1. import org.apache.spark.sql.functions._
  2. import org.apache.spark.sql.SparkSession
  3. object WordCountStructuredStreaming{
  4. def main(args: Array[String]){
  5. val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
  6. import spark.implicits._
  7. val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
  8. val words = lines.as[String].flatMap(_.split(" "))
  9. val wordCounts = words.groupBy("value").count()
  10. val query = wordCounts.writeStream.outputMode("complete").format("console").start()
  11. query.awaitTermination()
  12. }
  13. }

代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:

cd /usr/local/spark/mycode/streaming
vim simple.sbt # 注意,是simple.sbt,不是simple.txt

打开vim编辑器以后,输入以下内容:

  1. name := "Simple Project"
  2. version := "1.0"
  3. scalaVersion := "2.11.8"
  4. libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

然后就可以执行sbt打包编译了:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar 

执行上输出程序之后,就开启了监听状态,当我们在NetCat终端输入”hello world”之后,spark应用终端即可输出”hello”和”world”单词出现的次数,和spark-shell输出一致;

以上就是Structured Streaming操作网络流所有内容.

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号