赞
踩
spark.streaming.receiver.maxRate
来限制 Receiver 的数据接收速率,来解决生产和消费速率不对等造成的内存溢出等问题,但当数据生产和数据消费的能力都高于 maxRate 时会造成资源利用率下降等问题spark.streaming.backpressure.enabled
来配置启用 backpressure 机制,默认值为 false,即不启用需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
object SparkStreamingWC { def main(args: Array[String]): Unit = { // 1.创建 SparkStreaming 环境对象 val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") /* 创建 StreamingContext 对象需要传递两个参数 1.SparkConf:配置对象 2.Duration:批处理的周期,即数据采集周期,单位为毫秒,内置有 Seconds/Minute 等对象 */ val ssc = new StreamingContext(conf, Seconds(3)) // 2.逻辑处理 val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words = line.flatMap(_.split(" ")) val wordAsOne = words.map((_, 1)) val wordCount: DStream[(String, Int)] = wordAsOne.reduceByKey(_ + _) wordCount.print() // 3.运行采集器并等待关闭 /* 采集器是一个长期运行的任务,所以不能关闭 ssc,也不能让 main 方法执行完毕 */ ssc.start() ssc.awaitTermination() } }
nc -lp 9999
命令(Linux 下为 nc -lk 999
)Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。