Data streams
Data streams are everywhere
Stream processing
Benefits of stream processing
Where stream processing is used
Stream processing frameworks
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setMaster("local[2]").setAppName("kgc streaming demo")
val ssc = new StreamingContext(conf, Seconds(8))
// In spark-shell this fails with:
// org.apache.spark.SparkException: Only one SparkContext may be running in this JVM
// Fixes:
// Option 1: run sc.stop before creating ssc, to stop the SparkContext that spark-shell starts on its own
// Option 2: build ssc on top of the existing sc: val ssc = new StreamingContext(sc, Seconds(8))
1. Only one StreamingContext can be active in a JVM at the same time.
2. Once a StreamingContext has been stopped, it cannot be restarted.
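To avoid tripping over both restrictions in spark-shell or in jobs that may be resubmitted, StreamingContext.getActiveOrCreate can be used: it returns the context that is already active in this JVM, or builds a new one from the supplied function. A minimal sketch reusing the same settings as the example above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Returns the StreamingContext that is already active in this JVM,
// or creates a new one with the supplied function if none is running
val ssc = StreamingContext.getActiveOrCreate(() => {
  val conf = new SparkConf().setMaster("local[2]").setAppName("kgc streaming demo")
  new StreamingContext(conf, Seconds(8))
})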
WordCount example:
Start the Kafka broker:
kafka-server-start.sh /opt/soft/kafka211/config/server.properties
Maven dependencies (pom.xml):
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
log4j.properties:
log4j.rootLogger=ERROR,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%-20c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object MyReadKafkaHandler {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("MyKafka")
    val sc = new SparkContext(conf)
    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    // Kafka consumer parameters
    val kafkaParam = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.253.150:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mykafka1",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Create a Direct stream subscribed to the "mydemo" topic
    val streams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("mydemo"), kafkaParam))

    // Simple per-batch word count, printed to the console
    streams.map(_.value).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    // Start the streaming job and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
object MyReadKafkaHandler {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mytest").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    // Stateful DStreams need somewhere to record their state, so a checkpoint directory is required
    ssc.checkpoint("d:/mykafka-logs")

    // Kafka consumer parameters
    val kafkaParam = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "myKafka2",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Create a Direct stream subscribed to the "mydemo" topic
    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("mydemo"), kafkaParam))

    // Stateful DStream, version (1): accumulate counts with an explicit loop
    val value = stream.map(_.value()).flatMap(_.split(" ")).map((_, 1))
    value.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      var newVal = state.getOrElse(0)
      for (elem <- values) {
        newVal += elem
      }
      Option(newVal)
    }).print()

    // Stateful DStream, version (2): the same logic written more compactly
    stream.map(_.value()).flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey((values: Seq[Int], stateValue: Option[Int]) => Option(stateValue.getOrElse(0) + values.sum))
      .print()
    // The two versions are equivalent; only the style differs

    // Start the streaming job and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
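As a side note, mapWithState (available since Spark 1.6) is an alternative to updateStateByKey that only invokes the update function for keys that actually appear in the current batch, which scales better when the key space is large. A minimal sketch of the same running word count, assuming the same stream, imports and checkpoint directory as the program above:

import org.apache.spark.streaming.{State, StateSpec}

// Update function: merge this batch's count for the word into the running total.
// Unlike updateStateByKey, it is only called for words seen in the current batch.
val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
  val total = state.getOption.getOrElse(0) + one.getOrElse(0)
  state.update(total)
  (word, total)
})

stream.map(_.value()).flatMap(_.split(" ")).map((_, 1))
  .mapWithState(spec)
  .print()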
Command to produce test input:
kafka-console-producer.sh --broker-list 192.168.253.150:9092 --topic mydemo
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper around a KafkaProducer; the producer itself is created
// lazily on each executor instead of being serialized from the driver
class KafkaSinks[K, V](fc: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer = fc()

  def send(topic: String, key: K, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, key, value))
  }

  def send(topic: String, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, value))
  }
}

object KafkaSinks {
  import scala.collection.JavaConversions._

  def apply[K, V](conf: Map[String, String]): KafkaSinks[K, V] = {
    val func = () => {
      val prod = new KafkaProducer[K, V](conf)
      // Close the producer when the executor JVM shuts down
      sys.addShutdownHook {
        prod.close()
      }
      prod
    }
    new KafkaSinks[K, V](func)
  }
}
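The factory function matters because KafkaProducer itself is not serializable: only the small KafkaSinks wrapper travels to the executors, and each executor builds its own producer the first time the lazy producer field is touched. A minimal usage sketch without the broadcast singleton shown next; the broker address, output topic and the someStream placeholder are assumptions, not part of the original code:

// Hypothetical producer settings; replace the broker address with your own
val producerConf = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer"    -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer"  -> "org.apache.kafka.common.serialization.StringSerializer"
)

// Built once on the driver; the underlying KafkaProducer is created lazily on each executor
val sink = KafkaSinks[String, String](producerConf)

// someStream stands for any DStream[String], e.g. stream.map(_.value()) from the jobs above
someStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach(line => sink.send("output-topic", line))
  }
}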
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

// Lazily-initialized singleton that broadcasts one KafkaSinks wrapper to all executors
object MySingleBaseDAO {
  @volatile private var instance: Broadcast[KafkaSinks[String, String]] = null

  def getInstance() = {
    if (instance == null) {
      val sc = SparkSession.builder().appName("writeKafka").master("local[2]")
        .getOrCreate().sparkContext
      // Double-checked locking so the broadcast is created only once
      synchronized {
        if (instance == null) {
          val kafkaParams = Map[String, String](
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "jzy1:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
          )
          instance = sc.broadcast(KafkaSinks[String, String](kafkaParams))
        }
        instance
      }
    }
    instance
  }
}
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("kafkawork").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Streaming context with a 10-second batch interval
  val ssc = new StreamingContext(sc, Seconds(10))
  // Checkpoint directory for recording state
  ssc.checkpoint("f:/mykafka-logs")

  // Kafka consumer parameters
  val kafkaParam = Map(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "jzy1:9092",
    ConsumerConfig.GROUP_ID_CONFIG -> "mykafkawork",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "2000",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
  )

  // Create a Direct stream over the "user_friends2" topic; the work is distributed across executors
  val streams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Set("user_friends2"), kafkaParam))

  // Drop the header line, keep rows that actually contain friends,
  // and explode each "user,friend1 friend2 ..." line into (user, friend) pairs
  val r1: DStream[(String, String)] = streams.map(_.value())
    .filter(_ != "user,friends")
    .filter(_.split(",").size > 1)
    .flatMap(line => {
      val ids = line.split(",")
      ids(1).split(" ").map(word => (ids(0), word))
    })
  r1.print()

  // Write each (user, friend) pair back to the "kafkawork" topic
  // through the broadcast KafkaSinks producer
  r1.foreachRDD(rdd => {
    val producer = MySingleBaseDAO.getInstance().value
    rdd.foreach(record => {
      producer.send("kafkawork", record.toString())
    })
  })

  // Start the streaming job and block until it is terminated
  ssc.start()
  ssc.awaitTermination()
}