http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#overview
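The overview in that guide describes the model behind the API: the input stream is treated as an unbounded, continuously growing input table, and a streaming query over it produces a result table that is updated at every trigger interval.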
Reading from the socket source and doing WordCount; the three output modes are discussed in the inline comments:

```scala
package cn.hanjiaxiaozhi.structedstream

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author hanjiaxiaozhi
 * Date 2020/7/26 16:35
 * Desc Demo: read socket data with Structured Streaming and do WordCount
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Prepare the Structured Streaming execution environment
    // SparkContext?     -- RDD
    // SparkSession?     -- DataFrame/DataSet
    // StreamingContext? -- DStream
    // SparkSession is the entry point here, because Structured Streaming's programming API is still DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read data from node01:9999
    val df: DataFrame = spark.readStream // stream processing with the DataFrame/DataSet API
      .option("host", "node01") // host
      .option("port", 9999)     // port
      .format("socket")         // socket source
      .load()                   // start loading data

    // 3. WordCount
    import spark.implicits._
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split(" ")).groupBy("value").count().sort($"count".desc)

    // 4. Output the result
    // result.show() // wrong: show() is batch output, but this is a streaming job --
    // "Queries with streaming sources must be executed with writeStream.start();;"
    result.writeStream
      .format("console") // write to the console
      .outputMode("complete") // emit the entire result table on every trigger; requires an aggregation
      //.outputMode("append") // default mode: emit only newly added rows; simple queries only --
      //                      // "Append output mode not supported when there are streaming aggregations"
      //.outputMode("update") // emit only updated rows; no sorting allowed --
      //                      // "Sorting is not supported on streaming DataFrames/Datasets"
      .trigger(Trigger.ProcessingTime(0)) // trigger interval; "If `interval` is 0, the query will run as fast as possible."
      .start()            // start the query
      .awaitTermination() // block until the query terminates

    // Observation: the WordCount works, results are automatically aggregated with history,
    // and the API is simpler to use than Spark Streaming's.
  }
}
```
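The update-mode restriction noted above is easy to verify: drop the `sort()` and the same aggregation runs fine in `update` mode. A minimal sketch of that variant (the object name `WordCountUpdate` is hypothetical; the same node01:9999 socket source is assumed):

```scala
package cn.hanjiaxiaozhi.structedstream

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

/** Sketch: WordCount without sort(), so "update" output mode is accepted. */
object WordCountUpdate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("wc-update").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val lines = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()
      .as[String]

    lines.flatMap(_.split(" "))
      .groupBy("value")
      .count() // aggregation is fine in update mode as long as nothing is sorted
      .writeStream
      .format("console")
      .outputMode("update") // only rows whose counts changed in this trigger are printed
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
```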
The same query can also write to an in-memory table (the memory sink), which can then be queried with SQL while the stream runs:

```scala
package cn.hanjiaxiaozhi.structedstream

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author hanjiaxiaozhi
 * Date 2020/7/26 16:35
 * Desc Demo: read socket data with Structured Streaming and do WordCount
 */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    // 1. Prepare the Structured Streaming execution environment (same as above)
    val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read data from node01:9999
    val df: DataFrame = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()

    // 3. WordCount
    import spark.implicits._
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split(" ")).groupBy("value").count().sort($"count".desc)

    // 4. Output the result to an in-memory table so it can be queried with SQL afterwards
    val query: StreamingQuery = result.writeStream
      .format("memory")      // memory sink
      .queryName("t_memory") // name for the in-memory table
      .outputMode("complete")
      //.trigger(Trigger.ProcessingTime(0)) // optional; defaults to 0
      .start()
      //.awaitTermination() // don't block here, or the queries below would never run!

    // Keep querying the in-memory table
    while (true) {
      Thread.sleep(5000) // query the table every 5s to inspect its contents during testing
      spark.sql("select * from t_memory").show()
    }

    query.awaitTermination() // wait for the query to stop (unreachable given the infinite loop above)
  }
}
```
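Two testing notes. All of these programs need a process listening on node01:9999 before they start; the usual way to feed them during testing (an assumption, since the post does not show it) is netcat: run `nc -lk 9999` on node01 and type space-separated words. Also, the memory sink collects the entire result table into the driver's memory, so Spark's documentation recommends it only for debugging on small data volumes.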