We will use Structured Streaming to read socket data and compute a word count. The Spark version used here is 3.4.3. First, add the following dependencies to the Maven pom file:
<!-- The configuration below fixes the "lambda expressions are not supported in -source 1.5" error when packaging under JDK 1.8 -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
</properties>

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL on Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Kafka 0.10+ source for Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Required for producing data to Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

    <!-- Scala libraries -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.12</version>
    </dependency>
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0</version>
    </dependency>

</dependencies>
The Scala implementation of the socket WordCount is as follows:

package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming reads Socket data in real time
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {

    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //The default is 200 shuffle partitions; since the source data volume is small, fewer partitions are enough
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2. Read each line from the socket; the resulting DataFrame has a single default column named "value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3. Split each line into words; convert to a Dataset via as[String] first
    val words: Dataset[String] = lines.as[String].flatMap(line => line.split(" "))

    //4. Group by word and count; a "count" column is added automatically
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5. Start the streaming query and print the results to the console
    val query: StreamingQuery = wordCounts.writeStream
      //Set the output mode to complete
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()

  }

}
The same WordCount can also be written with the Java API:

package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {

  public static void main(String[] args) throws StreamingQueryException, TimeoutException {
    //1. Create the SparkSession
    SparkSession spark = SparkSession.builder().master("local")
        .appName("SSReadSocketData01")
        .config("spark.sql.shuffle.partitions", 1)
        .getOrCreate();

    spark.sparkContext().setLogLevel("Error");

    //2. Read each line from the socket into a DataFrame with a single "value" column
    Dataset<Row> lines = spark.readStream().format("socket")
        .option("host", "node3")
        .option("port", 9999)
        .load();

    //3. Split each line into words
    Dataset<String> words = lines.as(Encoders.STRING())
        .flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String line) throws Exception {
            return Arrays.asList(line.split(" ")).iterator();
          }
        }, Encoders.STRING());

    //4. Group by word and count
    Dataset<Row> wordCounts = words.groupBy("value").count();

    //5. Start the streaming query in complete output mode and print to the console
    StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .start();

    query.awaitTermination();
  }
}
Once the code above is written, run "nc -lk 9999" on node3 to start a socket server, then start the program and type the following data into the socket:
- First input: a b c
- Second input: d a c
- Third input: a b c
The console prints the following results:
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+
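
The cumulative counts above are the result of the complete output mode, which rewrites the entire result table on every batch. For comparison, the same aggregation can also run in update mode, where each batch emits only the rows whose counts changed. The following is a minimal sketch of that variant, not part of the original example; it reuses the same node3 host and port 9999 as above.

package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Variant of the WordCount above using the "update" output mode:
 * each micro-batch prints only the words whose counts changed.
 */
object SSReadSocketDataUpdateMode {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCountUpdate")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")

    //Same socket source as the example above
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
    val wordCounts: DataFrame = words.groupBy("value").count()

    //"update" mode writes only the changed rows each batch,
    //instead of the full result table as in "complete" mode
    val query: StreamingQuery = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

With the same three inputs, this variant would print only the rows for d, c, and a after the second input, since the count for b did not change in that batch.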