
Notes on Using Kafka with Spark Streaming (Spark 3.5.1)


I. Installing Kafka

1. Run the following commands to install Kafka:
  cd ~    # assumes the downloaded archive is in the home directory
  sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local
  cd /usr/local
  sudo mv kafka_2.12-2.6.0 kafka-2.6.0
  sudo chown -R qiangzi ./kafka-2.6.0

II. Starting Kafka

1. Kafka must be started first. Open a terminal and run the following commands to start the ZooKeeper service:
  cd /usr/local/kafka-2.6.0
  ./bin/zookeeper-server-start.sh config/zookeeper.properties

Note: the terminal is not stuck; the ZooKeeper server has started and is now serving requests. Do not close this terminal!

2. Open a second terminal and run the following commands to start the Kafka service:
  cd /usr/local/kafka-2.6.0
  ./bin/kafka-server-start.sh config/server.properties
  # With a trailing "&", Kafka runs in the background and keeps running even if this terminal is closed:
  ./bin/kafka-server-start.sh config/server.properties &

Note: again, do not mistake this for the terminal hanging; the Kafka server has started and is now serving requests.

III. Creating a Topic

1. Open a third terminal and run the following commands to create a topic named "wordsender":
  cd /usr/local/kafka-2.6.0
  ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2. Then run the following command to check whether the "wordsender" topic has been created successfully:
  ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3. Open yet another terminal (call it the "monitoring terminal") and run the following commands to watch the text that Kafka receives:
  cd /usr/local/kafka-2.6.0
  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

Note: keep all of these terminal windows open; they will be used again later.
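
Optionally, the same existence check can be done programmatically. The following is a minimal sketch (not part of the original walkthrough) that uses the AdminClient from kafka-clients 2.6.0 to confirm that the "wordsender" topic exists; the object name TopicCheck and the broker address localhost:9092 are my own assumptions.

  import java.util.Properties
  import org.apache.kafka.clients.admin.AdminClient

  object TopicCheck {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      val admin = AdminClient.create(props)
      // listTopics().names().get() blocks until the broker returns the set of topic names
      val topicNames = admin.listTopics().names().get()
      println(s"wordsender exists: ${topicNames.contains("wordsender")}")
      admin.close()
    }
  }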

IV. Preparing Spark

Advanced input sources such as Kafka and Flume depend on separate libraries (JAR files), so the corresponding JARs must be added to Spark. Visit the MVNREPOSITORY website (http://mvnrepository.com) and download spark-streaming-kafka-0-10_2.12-3.5.1.jar and spark-token-provider-kafka-0-10_2.12-3.5.1.jar, where 2.12 is the Scala version and 3.5.1 is the Spark version. Then copy these two files into Spark's jars directory (i.e. "/usr/local/spark-3.5.1/jars"). In addition, copy kafka-clients-2.6.0.jar from "/usr/local/kafka-2.6.0/libs" into the same jars directory.

  cd ~    # the downloaded .jar files are assumed to be in the home directory
  sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
  sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
  sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

Download page for spark-streaming-kafka-0-10_2.12-3.5.1.jar:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

Download page for spark-token-provider-kafka-0-10_2.12-3.5.1.jar:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

On each download page, click the "jar" link in the Files row to download the JAR file.
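
As an optional sanity check (not part of the original text), you can start spark-shell after copying the JARs and confirm that the Kafka classes are visible on the classpath; if the imports below succeed without errors, the JARs are in place. The spark-shell path follows the install location assumed in this guide.

  // Run these lines inside spark-shell ( /usr/local/spark-3.5.1/bin/spark-shell )
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.kafka.clients.consumer.ConsumerRecord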

V. Writing a Spark Streaming Program That Uses a Kafka Data Source

1. Writing the producer program
(1) Open a new terminal and run the following commands to create the code directory and code file:
  cd /usr/local/spark-3.5.1
  mkdir mycode
  cd ./mycode
  mkdir kafka
  mkdir -p kafka/src/main/scala
  vi kafka/src/main/scala/KafkaWordProducer.scala
(2) Create KafkaWordProducer.scala with the vi editor.

This program generates a stream of strings: it produces random sequences of integers, each integer treated as a word, which the KafkaWordCount program will later use for word-frequency counting. Enter the following code into KafkaWordProducer.scala:

  import java.util.HashMap
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka010._

  object KafkaWordProducer {
    def main(args: Array[String]) {
      if (args.length < 4) {
        System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
          "<messagesPerSec> <wordsPerMessage>")
        System.exit(1)
      }
      val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

      // Kafka producer connection properties
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)

      // Send some messages: each message is wordsPerMessage random digits separated by spaces
      while (true) {
        (1 to messagesPerSec.toInt).foreach { messageNum =>
          val str = (1 to wordsPerMessage.toInt)
            .map(x => scala.util.Random.nextInt(10).toString)
            .mkString(" ")
          print(str)
          println()
          val message = new ProducerRecord[String, String](topic, null, str)
          producer.send(message)
        }
        Thread.sleep(1000)
      }
    }
  }
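
Since the send loop above runs forever and is normally stopped with Ctrl+C, the producer is never closed explicitly. As an optional refinement (my own addition, not in the original code), a shutdown hook placed right after the producer is created would flush and close it cleanly:

  // Hypothetical addition inside main(), right after `val producer = ...`:
  sys.addShutdownHook {
    producer.flush()   // push any buffered records to the broker
    producer.close()   // release network resources
  }
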
2. Writing the consumer program

Create a file named KafkaWordCount.scala in the "/usr/local/spark-3.5.1/mycode/kafka/src/main/scala" directory. It counts the frequency of the words sent over by KafkaWordProducer. The commands and code are as follows:

  cd /usr/local/spark-3.5.1/mycode
  vi kafka/src/main/scala/KafkaWordCount.scala
  import org.apache.spark._
  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  object KafkaWordCount {
    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
      val sc = new SparkContext(sparkConf)
      sc.setLogLevel("ERROR")
      val ssc = new StreamingContext(sc, Seconds(10))
      // Set the checkpoint directory. To keep it on HDFS instead, use something like
      // ssc.checkpoint("/user/hadoop/checkpoint"), but Hadoop must be running in that case.
      ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint")
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (true: java.lang.Boolean)
      )
      val topics = Array("wordsender")
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      stream.foreachRDD(rdd => {
        val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val maped: RDD[(String, String)] = rdd.map(record => (record.key, record.value))
        val lines = maped.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val pair = words.map(x => (x, 1))
        val wordCounts = pair.reduceByKey(_ + _)
        wordCounts.foreach(println)
      })
      ssc.start
      ssc.awaitTermination
    }
  }
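
The code above captures offsetRange but never uses it. As an optional variant (a sketch, not part of the original program), the captured offset ranges can be committed back to Kafka after each batch via the CanCommitOffsets interface of spark-streaming-kafka-0-10; this only makes sense if kafkaParams sets "enable.auto.commit" -> (false: java.lang.Boolean):

  // Hypothetical replacement for the foreachRDD block above, with manual offset commits.
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... perform the same word-count processing as above ...
    // Commit this batch's offsets back to Kafka asynchronously.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
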
3. Under "file:///usr/local/spark-3.5.1/mycode/kafka/", create a "checkpoint" directory to hold the checkpoint (write-ahead log) data:
  cd ./kafka
  mkdir checkpoint
4. Still in the same source directory, create a StreamingExamples.scala code file, used to configure log4j:
  cd /usr/local/spark-3.5.1/mycode/
  vi kafka/src/main/scala/StreamingExamples.scala

  /* StreamingExamples.scala */
  package org.apache.spark.examples.streaming

  import org.apache.spark.internal.Logging
  import org.apache.log4j.{Level, Logger}

  /** Utility functions for Spark Streaming examples. */
  object StreamingExamples extends Logging {
    /** Set reasonable logging levels for streaming if the user has not configured log4j. */
    def setStreamingLogLevels() {
      val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
      if (!log4jInitialized) {
        // We first log something to initialize Spark's default logging, then we override the
        // logging level.
        logInfo("Setting log level to [WARN] for streaming example." +
          " To override add a custom log4j.properties to the classpath.")
        Logger.getRootLogger.setLevel(Level.WARN)
      }
    }
  }
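
Note that neither of the two programs above actually calls this helper. If you want to use it (an optional step, my own suggestion rather than part of the original text), import it and invoke it at the start of main(); because StreamingExamples lives in the org.apache.spark.examples.streaming package while KafkaWordCount has no package declaration, an explicit import is needed:

  // Hypothetical addition at the top of KafkaWordCount.scala:
  import org.apache.spark.examples.streaming.StreamingExamples
  // and as the first statement inside main():
  // StreamingExamples.setStreamingLogLevels()
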
5. Compiling and packaging the program

The "/usr/local/spark-3.5.1/mycode/kafka/src/main/scala" directory now contains three Scala files: KafkaWordProducer.scala, KafkaWordCount.scala, and StreamingExamples.scala.

Next, run the following commands to create a simple.sbt file:

  cd /usr/local/spark-3.5.1/mycode/kafka/
  vim simple.sbt

Enter the following content in simple.sbt:

  name := "Simple Project"
  version := "1.0"
  scalaVersion := "2.12.18"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
  libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

Then run the following commands to compile and package the project:

  cd /usr/local/spark-3.5.1/mycode/kafka/
  /usr/local/sbt-1.9.0/sbt/sbt package

When packaging succeeds, the build writes the JAR to ./target/scala-2.12/simple-project_2.12-1.0.jar.

6. Running the program

First, start Hadoop. This is required if KafkaWordCount.scala uses the ssc.checkpoint("/user/hadoop/checkpoint") form mentioned earlier, because the checkpoint data is then written to HDFS. Start Hadoop with the following commands:

  cd /usr/local/hadoop-2.10.1
  ./sbin/start-dfs.sh
or:
  start-dfs.sh
  start-yarn.sh

Once Hadoop is up, the word-frequency program built above can be tested.
Note that the ZooKeeper and Kafka services started earlier are still running, because their terminal windows were never closed. If you accidentally closed those windows, restart the ZooKeeper service and the Kafka service as described in the earlier sections.
Then open a new terminal and run the following commands to start the "KafkaWordProducer" program, which generates words (in the form of integers):

  cd /usr/local/spark-3.5.1/mycode/kafka/
  /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/simple-project_2.12-1.0.jar localhost:9092 wordsender 3 5

In the command above, "localhost:9092 wordsender 3 5" supplies the four input arguments of KafkaWordProducer. The first argument, "localhost:9092", is the address of the Kafka broker; the second, "wordsender", is the topic name. Because the topic name is hard-coded in KafkaWordCount.scala, the KafkaWordCount program can only consume the topic named "wordsender". The third argument, "3", means three messages are sent per second, and the fourth, "5", means each message contains five words (in fact, five integers).
After the command starts, new words keep scrolling across the screen, similar to the following:
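(Illustrative values only, since the digits are random; each line is one message of five digits.)

  3 7 0 9 5
  8 2 6 1 4
  0 9 3 3 7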

Do not close this terminal window; let it keep sending words. Then open another terminal and run the following commands to start the KafkaWordCount program, which performs the word-frequency counting:

  cd /usr/local/spark-3.5.1/mycode/kafka/
  /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-project_2.12-1.0.jar
Once this command is running, the word-frequency statistics start, and the screen prints output for each batch, similar to the following:
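(Illustrative values only; KafkaWordCount prints (word, count) pairs for each 10-second batch, so with 3 messages of 5 digits per second the count for each digit clusters around 15.)

  (4,16)
  (8,14)
  (0,15)
  (6,17)
  (2,13)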
