
Spark Real-Time (3): Structured Streaming Getting Started Example


Table of Contents

Structured Streaming Getting Started Example

1. Scala Code

2. Java Code

3. Notes on the Code


Structured Streaming Getting Started Example

We will use Structured Streaming to monitor socket data and compute a WordCount. Spark 3.4.3 is used here; first, add the following dependencies to the Maven pom file:

    <!-- The following settings fix the "lambda expressions are not supported in -source 1.5" error when packaging under JDK 1.8 -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>3.4.3</spark.version>
    </properties>
    <dependencies>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL on Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Kafka 0.10+ source for Structured Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Required for producing data to Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- Scala libraries -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

1. Scala Code

    package com.lanson.structuredStreaming

    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
      * Structured Streaming: read Socket data in real time
      */
    object SSReadSocketData {
      def main(args: Array[String]): Unit = {
        // 1. Create the SparkSession
        val spark: SparkSession = SparkSession.builder()
          .master("local")
          .appName("StructuredSocketWordCount")
          // The default is 200 shuffle partitions; the source data volume is small, so lower the parallelism
          .config("spark.sql.shuffle.partitions", 1)
          .getOrCreate()

        import spark.implicits._
        spark.sparkContext.setLogLevel("Error")

        // 2. Read each line from the socket; the resulting DataFrame has a single column named "value"
        val lines: DataFrame = spark.readStream
          .format("socket")
          .option("host", "node3")
          .option("port", 9999)
          .load()

        // 3. Split each line into words; convert to Dataset[String] via as[String] first
        val words: Dataset[String] = lines.as[String].flatMap(line => line.split(" "))

        // 4. Group by word and count; a "count" column is added automatically
        val wordCounts: DataFrame = words.groupBy("value").count()

        // 5. Start the streaming query and print results to the console
        val query: StreamingQuery = wordCounts.writeStream
          // Set the output mode to complete
          .outputMode("complete")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }


2. Java Code

    package com.lanson.structuredStreaming;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.concurrent.TimeoutException;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;

    public class SSReadSocketData01 {
        public static void main(String[] args) throws StreamingQueryException, TimeoutException {
            // 1. Create the SparkSession and lower the shuffle parallelism
            SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

            spark.sparkContext().setLogLevel("Error");

            // 2. Read each line from the socket; the DataFrame has a single column named "value"
            Dataset<Row> lines = spark.readStream().format("socket")
                .option("host", "node3")
                .option("port", 9999)
                .load();

            // 3. Split each line into words
            Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                }, Encoders.STRING());

            // 4. Group by word and count
            Dataset<Row> wordCounts = words.groupBy("value").count();

            // 5. Start the streaming query and print results to the console
            StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

            query.awaitTermination();
        }
    }


Once the code above is written, run "nc -lk 9999" on the node3 node to start a socket server, then start the program and type the following data into the socket:

    First input:  a b c
    Second input: d a c
    Third input:  a b c

The console prints the following results:

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |    c|    1|
    |    b|    1|
    |    a|    1|
    +-----+-----+

    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |    d|    1|
    |    c|    2|
    |    b|    1|
    |    a|    2|
    +-----+-----+

    -------------------------------------------
    Batch: 3
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |    d|    1|
    |    c|    3|
    |    b|    2|
    |    a|    3|
    +-----+-----+

3. Notes on the Code

  • Spark SQL uses 200 shuffle partitions by default; because the data volume here is small, the parallelism can be lowered via the "spark.sql.shuffle.partitions" parameter.
  • The data read by Structured Streaming arrives as a DataFrame with a single column named "value" by default.
  • That DataFrame must be converted to a Dataset via as[String] (Encoders.STRING() in Java) before it can be processed as lines of text.
  • The OutputMode used when writing results has three options: Complete Mode, Append Mode, and Update Mode; see the sketch after this list.
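
As a rough illustration of the last point, the sketch below reuses the wordCounts DataFrame from the Scala example above and changes only the output mode passed to writeStream; everything else is unchanged from the original code.

    // A minimal sketch, assuming the same wordCounts aggregation as in the Scala example above.

    // Complete mode (used above): every trigger rewrites the entire result table,
    // i.e. the counts of all words seen so far.

    // Update mode: every trigger writes only the rows whose counts changed in that batch.
    val updateQuery: StreamingQuery = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .start()
    updateQuery.awaitTermination()

    // Append mode: only emits rows that will never change again; for an aggregation such as
    // this WordCount it requires an event-time watermark, so it does not apply to this example.

With Update mode, the second input "d a c" would print only the rows for d, a, and c rather than the full result table.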
