赞
踩
Kafka作为一个实时的分布式消息队列,实时地生产和消费消息。在这里,我们可以利用Spark Streaming实时地读取Kafka中的数据,然后再进行相关计算。在Spark.[.3版本后, KafkaUtils里面提供 了两个创建DStream的方式,一种是KafkaUtils.createDstream方式,另-种为KafkaUtils.createDirectStream方式。 本节,我们针对DStream的这两种方式进行详细介绍。
KafkaUtils.createDstream方式(即基于Receiver的方式),主要是通过Zookeeper连接Kafka, receivers接收器从Kafka中获取数据,并且所有receivers获取到的数据都会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,具体处理流程如图所示。
在图中,当Driver处理Spark Executors中的job时,默认是会出现数据丢失的情况,此时,如果我们启用WAL日志将接收到数据同步地保存到分布式文件系统上(如HDFS) ,当数据由于某种原因丢失时,丢失的数据能够及时恢复。
接下来,通过一个具体的案例来演示如何使用KafkaUtils.createDstream实现词频统计,具体实现步骤如下:
我们需要在pom.xm文件中添加Spark Streaming整合Kafka的依赖。具体内容如下:
<!--引入sparkstreaming整合kafka的依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.0.2</version>
</dependency>
在spark_ chapter07项目的/src/main/scala/cn.itcast.dstream目录下, 创建一个名
为"SparkStreaming Kafka_ _createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。具体实现代码如文件所示。
文件7-7 SparkStreaming_Kafka_createDstream.scala
package cn.itcast.dstream import org.apache.spark.streaming.dstream.{ DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{ Seconds, StreamingContext} import org.apache.spark.{ SparkConf, SparkContext} import scala.collection.immutable object SparkStreaming_Kafka_createDstream { def main(args: Array[String]): Unit = { //1.创建sparkConf,并开启wal预写日志,保存数据源 val sparkConf: SparkConf =
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。