赞
踩
编写Structured Streaming程序的基本步骤是:
1.创建SparkSession实例;
2.创建DataFrame表示从数据源输入的每一行数据;
3.DataFrame转换,类似于RDD转换操作;
4.创建StreamingQuery开启流查询;
5.调用StreamingQuery.awaitTermination()方法,等待流查询结束。
这里还是用统计单词个数的示例,来展示如何进行Structured Streaming操作;
获取SparkSession
要操作Structured Streaming,首先要获取SparkSession实例,我们可以如下创建一个SparkSession;登录Linux系统,启动spark-shell。然后输入一下代码:
scala> import org.apache.spark.sql.functions._ scala> import org.apache.spark.sql.SparkSession val spark = SparkSession. | builder. | appName("StructuredNetworkWordCount"). | getOrCreate() spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d1bf7bf scala> import spark.implicits._
首先引入两个包,接着实例化一个SparkSession对象;appName方法设置spark应用名称,getOrCreate方法表示如果已存在一个SparkSession,则直接用现成的SparkSession,否则创建一个SparkSession,其实spark就是已经存在的SparkSession,因为每次开启spark-shell时,就默认开启了一个SparkSession;千万别忘了引入spark.implicits._包,否则会报错。
创建DataFrame
创建好SparkSession后,即可用SparkSession.readStream方法创建DataFrame,在spark-shell输入一下代码:
scala> val lines = spark.readStream. | format("socket"). | option("host","localhost"). | option("port",9999). | load() lines: org.apache.spark.sql.DataFrame = [value: string]
SparkSession.readStream方法返回一个DataStreamReader实例,接着DataStreamReader.format设置数据源为网络套接字,这里还可以设置从文件获取数据;然后调用两次DataStreamReader.option分别设置套接字的主机和端口;最后调用DataStreamReader.load方法,返回一个DataFrame实例;
此时,lines可以理解为之前Spark2.0入门:Structured Streaming简介提到的”unbound table”,实时到来的数据,即添加到这个”unbound table”;由输出可以看出,这个DataFrame每行包含一个字符串;
DataFrame转换
获取lines这个DataFrame之后,接下来就要处理这个DataFrame;类似于RDD转换操作,DataFrame也是通过转换成新的DataFrame来处理数据;因此由之前lines获取到每行数据之后,接下来要进行分隔并进行统计;
scala> val words = lines.as[String].flatMap(_.split(" ")) words: org.apache.spark.sql.Dataset[String] = [value: string] scala> val wordCounts = words.groupBy("value").count() wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]
lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例,在官方的API文档上有如下声明
type DataFrame = Dataset[Row]
接着调用flatMap将一行实时数据按空格切割成单词集合;groupBy方法表示按”value”这个属性合并,count方法表示统计出每个单词出现的次数;此时的wordCounts又转换为DataFrame,且这个DataFrame有两个属性,value和count,即每次处理完一行实时数据时,都会输出单词和该单词出现的次数;
执行流查询
如果DataFrame转换操作定义结束,接下来即可开启流查询,在spark-shell输入如下:
scala> val query = wordCounts.writeStream. | outputMode("complete"). | format("console"). | start() query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE] scala> query.awaitTermination()
wordCounts.writeStream返回一个 DataStreamWriter实例,该实例定义了将实时流查询产生结果输出到外部存储的接口;outputMode设置了’complete’模式,即每次都输出全部结果数据;format定义输出媒介,这里为控制台;最后调用start方法开启流查询并返回一个StreamingQuery实例;
最后调用StreamingQuery.awaitTermination等待查询结束;
流查询结果:
在开启查询之前,需要先在另外一个终端上开启一个NetCat简单服务程序,用于向spark流查询产生数据;在终端输入如下即可:
nc -lk 9999
表示监听本地9999端口,这样在该终端上输入数据,即可传输给spark流查询;我们先输入简单字符串”hello spark”:
- hadoop@charles-Aspire-4741:/usr/local/spark$ nc -lk 9999
- hello spark
即可在之前开启spark流查询的端口看到如下结果:
- scala> query.awaitTermination()
- -------------------------------------------
- Batch: 0
- -------------------------------------------
- +-----+-----+
- |value|count|
- +-----+-----+
- |hello| 1|
- |spark| 1|
- +-----+-----+
我们就得到了每个单词以及次数;当我们在NetCat终端再输入一行字符串”hello apache”,得到如下结果:
- scala> query.awaitTermination()
- -------------------------------------------
- Batch: 0
- -------------------------------------------
- +-----+-----+
- |value|count|
- +-----+-----+
- |hello| 1|
- |spark| 1|
- +-----+-----+
-
- -------------------------------------------
- Batch: 1
- -------------------------------------------
- +------+-----+
- | value|count|
- +------+-----+
- | hello| 2|
- |apache| 1|
- | spark| 1|
- +------+-----+
我们可以看到,每处理一行数据,即输出一个结果;因此当数据源传来源源不断的实时数据时,Structured streaming可以按固定时间间隔读取若干行数据,并执行流查询,输出结果;因为这个示例程序用的是complete模式,因此每次都将结果全部输出。当然也可以设置成append(增量)模式,这样每次输出即为新增的结果行;
编写独立应用
之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:
cd /usr/local/spark/mycode mkdir streaming cd streaming mkdir -p src/main/scala cd src/main/scala vim TestStructuredStreaming.scala
这样就用vim打开一个scala文件,在该文件输入以下内容:
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.SparkSession
- object WordCountStructuredStreaming{
- def main(args: Array[String]){
- val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
- import spark.implicits._
- val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
- val words = lines.as[String].flatMap(_.split(" "))
- val wordCounts = words.groupBy("value").count()
- val query = wordCounts.writeStream.outputMode("complete").format("console").start()
- query.awaitTermination()
- }
- }
代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:
cd /usr/local/spark/mycode/streaming vim simple.sbt # 注意,是simple.sbt,不是simple.txt
打开vim编辑器以后,输入以下内容:
- name := "Simple Project"
- version := "1.0"
- scalaVersion := "2.11.8"
- libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
然后就可以执行sbt打包编译了:
cd /usr/local/spark/mycode/streaming /usr/local/sbt/sbt package
打包成功以后,就可以输入以下命令启动这个程序:
cd /usr/local/spark/mycode/streaming /usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar
执行上输出程序之后,就开启了监听状态,当我们在NetCat终端输入”hello world”之后,spark应用终端即可输出”hello”和”world”单词出现的次数,和spark-shell输出一致;
以上就是Structured Streaming操作网络流所有内容.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。