Since Spark 2.0, DataFrames and Datasets can represent not only static, bounded data but also unbounded streaming data. In Structured Streaming we use a SparkSession to create a Dataset or DataFrame from a streaming source, and then process the stream with the same operators we would apply to batch data.
Structured Streaming currently supports the File Source, Socket Source, Rate Source, and Kafka Source. The Kafka integration will be covered in a later article; here we demonstrate the other three sources.
Structured Streaming can read files written into a directory as a data stream. Files are processed in order of their modification time and must be placed into the monitored directory atomically. The supported formats are text, csv, json, orc, and parquet.
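Beyond the defaults used in the examples below, the file source also accepts options such as maxFilesPerTrigger and latestFirst. A minimal sketch, with a hypothetical class name and illustrative option values and schema:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FileSourceOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("FileSourceOptionsSketch")
      .getOrCreate()

    // Stream json files from a monitored directory, limiting how many new
    // files are picked up per micro-batch and preferring the newest ones.
    val stream: DataFrame = spark.readStream
      .option("maxFilesPerTrigger", 1)   // at most one new file per trigger
      .option("latestFirst", "true")     // process newer files first
      .schema("id INT, name STRING, age INT")
      .json("./data/")

    stream.writeStream.format("console").start().awaitTermination()
  }
}
```

maxFilesPerTrigger is handy when a backlog of files already exists in the directory and you want to cap the size of each micro-batch.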
The Scala code for reading text data is as follows:
```scala
package com.lanson.structuredStreaming.source

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming: monitor a directory of text-format data
 */
object SSReadTextData {
  def main(args: Array[String]): Unit = {

    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadTextData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2. Monitor the directory
    val ds: Dataset[String] = spark.readStream.textFile("./data/")

    //3. Parse each "id-name-age" line into columns
    val result: DataFrame = ds.map(line => {
      val arr: Array[String] = line.split("-")
      (arr(0).toInt, arr(1), arr(2).toInt)
    }).toDF("id", "name", "age")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```
Result:
The Java code is as follows:
```java
package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;

public class SSReadTextData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1. Create the SparkSession
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadTextData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        //2. Monitor the directory
        Dataset<String> ds = spark.readStream().textFile("./data/");

        //3. Parse each "id-name-age" line into a tuple
        Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
            @Override
            public Tuple3<Integer, String, Integer> call(String line) throws Exception {
                String[] arr = line.split("-");
                return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));
            }
        }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));

        Dataset<Row> result = ds2.toDF("id", "name", "age");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}
```
Result:
After the code above is running, keep writing files with the following content into the monitored directory "./data", and the corresponding streaming output appears on the console. Be sure to copy the files into the directory atomically; one way to do this is sketched after the sample data. The file content is as follows:
```
1-zhangsan-18
2-lisi-19
3-ww-20
```
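A minimal sketch of one way to do the atomic drop, assuming a local filesystem that supports atomic renames; the class name and file paths are illustrative:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicFileDrop {
  def main(args: Array[String]): Unit = {
    // Write the file somewhere outside the monitored directory first...
    val tmp = Paths.get("./tmp/users.txt")
    Files.createDirectories(tmp.getParent)
    Files.write(tmp, "1-zhangsan-18\n2-lisi-19\n3-ww-20\n".getBytes(StandardCharsets.UTF_8))

    // ...then move it into the monitored directory in one atomic operation,
    // so the stream never observes a half-written file. ATOMIC_MOVE fails if
    // the two paths are on different filesystems, so keep them on the same one.
    val target = Paths.get("./data/users.txt")
    Files.createDirectories(target.getParent)
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
  }
}
```

Writing the file elsewhere first and then renaming it means the stream only ever sees complete files.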
The Scala code for reading CSV data is as follows:
```scala
package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
 * Structured Streaming: read CSV data
 */
object SSReadCsvData {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadCsvData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2. Define the schema of the CSV data
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("gender", "string")
      .add("age", "integer")

    //3. Monitor the directory for CSV files
    val result: DataFrame = spark.readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```
Result:
The Java code is as follows:
```java
package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

/**
 * Structured Streaming: read CSV data
 */
public class SSReadCsvData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1. Create the SparkSession
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadCsvData")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        //2. Define the schema of the CSV data
        StructType userSchema = new StructType()
                .add("id", "integer")
                .add("name", "string")
                .add("gender", "string")
                .add("age", "integer");

        //3. Monitor the directory for CSV files
        Dataset<Row> result = spark.readStream()
                .option("sep", ",")
                .schema(userSchema)
                .csv("./data/");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}
```
Result:
After running the code above, atomically write CSV files with the following content into the monitored directory, and the monitored data appears on the console in real time. The file content is as follows:
```
1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50
```
The Scala code for reading json data is as follows:
```scala
package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
 * Structured Streaming: monitor json-format data
 */
object SSReadJsonData {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadJsonData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2. Define the schema of the json data
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("age", "integer")

    //3. Monitor the directory for json files
    val result: DataFrame = spark.readStream
      .schema(userSchema)
      .json("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```
Result:
The Java code is as follows:
```java
package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

/**
 * Structured Streaming: monitor json files in a directory as a data stream
 */
public class SSReadJsonData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1. Create the SparkSession
        SparkSession spark = SparkSession.builder().appName("File Source test")
                .master("local")
                .getOrCreate();

        //2. Set the log level
        spark.sparkContext().setLogLevel("Error");

        //3. Define the schema
        StructType userSchema = new StructType().add("id", "integer")
                .add("name", "string")
                .add("age", "integer");

        //4. Read json data from the monitored directory
        Dataset<Row> ds = spark.readStream()
                .schema(userSchema)
                .json("./data/");

        //5. Print the data to the console
        StreamingQuery query = ds.writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}
```
Result:
After starting the code above, atomically write json files with the following content into the monitored directory "./data", and the monitored data appears on the console in real time. The json file content is as follows:
```
{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}
```
Note: when monitoring json data, the fields declared in the schema need to match the attributes in the json records. Otherwise, when the data is mapped into a table, any field that exists in the schema but has no matching attribute in the json will be null.
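A small batch-mode sketch of this behavior; the extra "score" field and the class name are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object JsonSchemaMismatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local")
      .appName("JsonSchemaMismatch")
      .getOrCreate()
    import spark.implicits._

    // "score" is declared in the schema but missing from the json record,
    // while "age" exists in the json but is not declared in the schema.
    val schema = new StructType()
      .add("id", "integer")
      .add("name", "string")
      .add("score", "integer")

    val jsonLines = Seq("""{"id":1,"name":"zs","age":18}""").toDS()
    spark.read.schema(schema).json(jsonLines).show()
    // Resulting row: id = 1, name = "zs", score = null
    // ("age" is dropped because it is not part of the declared schema)
  }
}
```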
The Socket source requires specifying the corresponding host and port. Reading from a Socket is mostly used for testing, so it is not demonstrated in detail again here; a minimal sketch is included below for quick reference.
You can refer to this example:
Spark实时(三):Structured Streaming入门案例-CSDN博客
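A minimal Socket Source sketch looks roughly like this; the class name is hypothetical, the host and port are placeholders, and a test server such as `nc -lk 9999` is assumed to be running:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SSReadSocketSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadSocketSketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("Error")

    // Each line received on the socket becomes one row with a single "value" column.
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    lines.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```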
The Rate Source generates data at a specified number of rows per second. Each output row contains a timestamp and a value, where timestamp is a Timestamp column recording when the row was generated and value is a Long counter starting from 0. The Rate Source is mostly used for testing.
The Scala code is as follows:
```scala
package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * SSRateSource
 */
object SSRateSource {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("rate test")
      // .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val result: DataFrame = spark.readStream
      .format("rate")
      // How many rows to generate per second, default is 1
      .option("rowsPerSecond", "10")
      .option("numPartitions", 5)
      .load()

    result.writeStream
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
```
Result:
The Java code is as follows:
```java
package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class ssratesource01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1. Create the SparkSession
        SparkSession spark = SparkSession.builder().master("local")
                .appName("rate test")
                .getOrCreate();
        spark.sparkContext().setLogLevel("Error");

        Dataset<Row> result = spark.readStream()
                .format("rate")
                // How many rows to generate per second, default is 1
                .option("rowsPerSecond", "10")
                .option("numPartitions", 5)
                .load();

        result.writeStream()
                .format("console")
                .option("numRows", "100")
                .option("truncate", "false")
                .start()
                .awaitTermination();
    }
}
```
Result: