当前位置:   article > 正文

Spark实时(五):InputSource数据源案例演示

Spark实时(五):InputSource数据源案例演示

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

  1. package com.lanson.structuredStreaming.source
  2. import org.apache.spark.sql.streaming.StreamingQuery
  3. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  4. /**
  5. * Structured Streaming监控目录 text格式数据
  6. */
  7. object SSReadTextData {
  8. def main(args: Array[String]): Unit = {
  9. //1.创建对象
  10. val spark: SparkSession = SparkSession.builder().master("local")
  11. .appName("SSReadTextData")
  12. .config("spark.sql.shuffle.partitions", 1)
  13. .getOrCreate()
  14. import spark.implicits._
  15. spark.sparkContext.setLogLevel("Error")
  16. //2.监控目录
  17. val ds: Dataset[String] = spark.readStream.textFile("./data/")
  18. val result: DataFrame = ds.map(line => {
  19. val arr: Array[String] = line.split("-")
  20. (arr(0).toInt, arr(1), arr(2).toInt)
  21. }).toDF("id", "name", "age")
  22. val query: StreamingQuery = result.writeStream
  23. .format("console")
  24. .start()
  25. query.awaitTermination()
  26. }
  27. }

 结果:

Java代码如下:

  1. package com.lanson.structuredStreaming.source;
  2. import java.util.concurrent.TimeoutException;
  3. import org.apache.spark.api.java.function.MapFunction;
  4. import org.apache.spark.sql.Dataset;
  5. import org.apache.spark.sql.Encoders;
  6. import org.apache.spark.sql.Row;
  7. import org.apache.spark.sql.SparkSession;
  8. import org.apache.spark.sql.streaming.StreamingQueryException;
  9. import scala.Tuple3;
  10. public class SSReadTextData01 {
  11. public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  12. //1.创建对象
  13. SparkSession spark = SparkSession.builder().master("local")
  14. .appName("SSReadSocketData01")
  15. .config("spark.sql.shuffle.partitions", 1)
  16. .getOrCreate();
  17. spark.sparkContext().setLogLevel("Error");
  18. Dataset<String> ds = spark.readStream().textFile("./data/");
  19. Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
  20. @Override
  21. public Tuple3<Integer, String, Integer> call(String line) throws Exception {
  22. String[] arr = line.split("-");
  23. return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );
  24. }
  25. }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));
  26. Dataset<Row> result = ds2.toDF("id", "name", "age");
  27. result.writeStream()
  28. .format("console")
  29. .start()
  30. .awaitTermination();
  31. }
  32. }

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

  1. 1-zhangsan-18
  2. 2-lisi-19
  3. 3-ww-20

2、读取csv文件

Scala代码如下:

  1. package com.lanson.structuredStreaming.source
  2. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  3. import org.apache.spark.sql.streaming.StreamingQuery
  4. import org.apache.spark.sql.types.StructType
  5. /**
  6. * Structured Streaming 读取CSV数据
  7. */
  8. object SSReadCsvData {
  9. def main(args: Array[String]): Unit = {
  10. //1.创建对象
  11. val spark: SparkSession = SparkSession.builder().master("local")
  12. .appName("SSReadCsvData")
  13. .config("spark.sql.shuffle.partitions", 1)
  14. .getOrCreate()
  15. import spark.implicits._
  16. spark.sparkContext.setLogLevel("Error")
  17. //2.创建CSV数据schema
  18. val userSchema: StructType = new StructType().add("id", "integer")
  19. .add("name", "string")
  20. .add("gender", "string")
  21. .add("age", "integer")
  22. val result: DataFrame = spark.readStream
  23. .option("sep", ",")
  24. .schema(userSchema)
  25. .csv("./data/")
  26. val query: StreamingQuery = result.writeStream
  27. .format("console")
  28. .start()
  29. query.awaitTermination()
  30. }
  31. }

结果:

Java代码如下

  1. package com.lanson.structuredStreaming.source;
  2. import java.util.concurrent.TimeoutException;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.streaming.StreamingQueryException;
  7. import org.apache.spark.sql.types.StructType;
  8. /**
  9. * Structured Streaming 读取CSV数据
  10. */
  11. public class SSReadCsvData01 {
  12. public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  13. //1.创建对象
  14. SparkSession spark = SparkSession.builder().master("local")
  15. .appName("SSReadCsvData")
  16. .config("spark.sql.shuffle.partitions", 1)
  17. .getOrCreate();
  18. spark.sparkContext().setLogLevel("Error");
  19. StructType userSchema = new StructType()
  20. .add("id", "integer")
  21. .add("name", "string")
  22. .add("gender", "string")
  23. .add("age", "integer");
  24. Dataset<Row> result = spark.readStream()
  25. .option("sep", ",")
  26. .schema(userSchema)
  27. .csv("./data/");
  28. result.writeStream()
  29. .format("console")
  30. .start()
  31. .awaitTermination();
  32. }
  33. }

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

  1. 1,zhangsan,一班,100
  2. 2,lisi,二班,200
  3. 3,wangwu,一班,300
  4. 4,maliu,二班,100
  5. 5,tianqi,三班,100
  6. 6,gaoba,三班,50
  7. 7,zs2,四班,50

3、读取json文件

Scala代码如下:

  1. package com.lanson.structuredStreaming.source
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. import org.apache.spark.sql.streaming.StreamingQuery
  4. import org.apache.spark.sql.types.StructType
  5. /**
  6. * Structured Streaming 监控Json格式数据
  7. */
  8. object SSReadJsonData {
  9. def main(args: Array[String]): Unit = {
  10. //1.创建对象
  11. val spark: SparkSession = SparkSession.builder().master("local")
  12. .appName("SSReadCsvData")
  13. .config("spark.sql.shuffle.partitions", 1)
  14. .getOrCreate()
  15. import spark.implicits._
  16. spark.sparkContext.setLogLevel("Error")
  17. //2.创建 json 数据schema
  18. val userSchema: StructType = new StructType().add("id", "integer")
  19. .add("name", "string")
  20. .add("age", "integer")
  21. val result: DataFrame = spark.readStream
  22. .schema(userSchema)
  23. .json("./data/")
  24. val query: StreamingQuery = result.writeStream
  25. .format("console")
  26. .start()
  27. query.awaitTermination()
  28. }
  29. }

结果:

Java代码如下

  1. package com.lanson.structuredStreaming.source;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SparkSession;
  5. import org.apache.spark.sql.streaming.StreamingQuery;
  6. import org.apache.spark.sql.streaming.StreamingQueryException;
  7. import org.apache.spark.sql.types.StructType;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * Structured Streaming实时监控目录中json文件作为数据流
  11. */
  12. public class SSReadJsonData01 {
  13. public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  14. //1.创建对象
  15. SparkSession spark = SparkSession.builder().appName("File Source test")
  16. .master("local")
  17. .getOrCreate();
  18. //2.设置日志
  19. spark.sparkContext().setLogLevel("Error");
  20. //3.设置Schema
  21. StructType userSchema = new StructType().add("id", "integer")
  22. .add("name", "string")
  23. .add("age", "integer");
  24. //4.指定监控目录读取数据json数据
  25. Dataset<Row> ds = spark.readStream()
  26. .option("sep", ",")
  27. .schema(userSchema)
  28. .json("./data/");
  29. //5.打印数据到控制台
  30. StreamingQuery query =ds.writeStream()
  31. .format("console")
  32. .start();
  33. query.awaitTermination();
  34. }
  35. }

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

  1. {"id":1,"name":"zs","age":18}
  2. {"id":2,"name":"ls","age":19}
  3. {"id":3,"name":"ww","age":20}
  4. {"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

  1. package com.lanson.structuredStreaming.source
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. /**
  4. * SSRateSource
  5. */
  6. object SSRateSource {
  7. def main(args: Array[String]): Unit = {
  8. //1.创建对象
  9. val spark: SparkSession = SparkSession.builder().master("local")
  10. .appName("rate test")
  11. // .config("spark.sql.shuffle.partitions", 1)
  12. .getOrCreate()
  13. val result: DataFrame = spark.readStream
  14. .format("rate")
  15. // 配置每秒生成多少行数据,默认1行
  16. .option("rowsPerSecond", "10")
  17. .option("numPartitions", 5)
  18. .load()
  19. result.writeStream
  20. .format("console")
  21. .option("numRows","100")
  22. .option("truncate","false")
  23. .start()
  24. .awaitTermination()
  25. }
  26. }

结果:

Java代码如下:

  1. package com.lanson.structuredStreaming.source;
  2. import java.util.concurrent.TimeoutException;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.streaming.StreamingQueryException;
  7. public class ssratesource01 {
  8. public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  9. //1.创建对象
  10. SparkSession spark = SparkSession.builder().master("local")
  11. .appName("rate test")
  12. .getOrCreate();
  13. spark.sparkContext().setLogLevel("Error");
  14. Dataset<Row> result = spark.readStream()
  15. .format("rate")
  16. // 配置每秒生成多少行数据,默认1行
  17. .option("rowsPerSecond", "10")
  18. .option("numPartitions", 5)
  19. .load();
  20. result.writeStream()
  21. .format("console")
  22. .option("numRows","100")
  23. .option("truncate","false")
  24. .start()
  25. .awaitTermination();
  26. }
  27. }

结果: 

 


推荐阅读
相关标签