赞
踩
- cd ~ //默认压缩包放在根目录
- sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local
- cd /usr/local
- sudo mv kafka_2.12-2.6.0 kafka-2.6.0
- sudo chown -R qiangzi ./kafka-2.6.0
- cd /usr/local/kafka-2.6.0
- ./bin/zookeeper-server-start.sh config/zookeeper.properties
注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!
- cd /usr/local/kafka-2.6.0
- ./bin/kafka-server-start.sh config/server.properties
-
- //加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
- bin/kafka-server-start.sh config/server.properties &
注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。
- cd /usr/local/kafka-2.6.0
- ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
./bin/kafka-topics.sh --list --zookeeper localhost:2181
- cd /usr/local/kafka-2.6.0
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender
注意,所有这些终端窗口都不要关闭,要继续留着后面使用。
Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。
- cd ~ .jar文件默认放在根目录
- sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
- sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
- sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。
- cd /usr/local/spark-3.5.1
- mkdir mycode
- cd ./mycode
- mkdir kafka
- mkdir -p kafka/src/main/scala
- vi kafka/src/main/scala/KafkaWordProducer.scala
它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:
- import java.util.HashMap
- import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.kafka010._
- object KafkaWordProducer {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
- "<messagesPerSec> <wordsPerMessage>")
- System.exit(1)
- }
- val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
- // Zookeeper connection properties
- val props = new HashMap[String, Object]()
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer")
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer")
- val producer = new KafkaProducer[String, String](props)
- // Send some messages
- while(true) {
- (1 to messagesPerSec.toInt).foreach { messageNum =>
- val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
- toString)
- .mkString(" ")
- print(str)
- println()
- val message = new ProducerRecord[String, String](topic, null, str)
- producer.send(message)
- }
- Thread.sleep(1000)
- }
- }
- }
在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:
- cd /usr/local/spark-3.5.1/mycode
- vi kafka/src/main/scala/KafkaWordCount.scala
- import org.apache.spark._
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.kafka010._
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.streaming.kafka010.KafkaUtils
- import org.apache.kafka.common.serialization.StringDeserializer
- import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
- import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
-
- object KafkaWordCount{
- def main(args:Array[String]){
- val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
- val sc = new SparkContext(sparkConf)
- sc.setLogLevel("ERROR")
- val ssc = new StreamingContext(sc,Seconds(10))
- ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
- val kafkaParams = Map[String, Object](
- "bootstrap.servers" -> "localhost:9092",
- "key.deserializer" -> classOf[StringDeserializer],
- "value.deserializer" -> classOf[StringDeserializer],
- "group.id" -> "use_a_separate_group_id_for_each_stream",
- "auto.offset.reset" -> "latest",
- "enable.auto.commit" -> (true: java.lang.Boolean)
- )
- val topics = Array("wordsender")
- val stream = KafkaUtils.createDirectStream[String, String](
- ssc,
- PreferConsistent,
- Subscribe[String, String](topics, kafkaParams)
- )
- stream.foreachRDD(rdd => {
- val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
- val lines = maped.map(_._2)
- val words = lines.flatMap(_.split(" "))
- val pair = words.map(x => (x,1))
- val wordCounts = pair.reduceByKey(_+_)
- wordCounts.foreach(println)
- })
- ssc.start
- ssc.awaitTermination
- }
- }
- cd ./kafka
- mkdir checkpoint
- cd /usr/local/spark-3.5.1/mycode/
- vi kafka/src/main/scala/StreamingExamples.scala
-
-
- /*StreamingExamples.scala*/
- package org.apache.spark.examples.streaming
- import org.apache.spark.internal.Logging
- import org.apache.log4j.{Level, Logger} /** Utility functions for Spark Streaming examples. */
- object StreamingExamples extends Logging {
- /** Set reasonable logging levels for streaming if the user has not configured log4j. */
- def setStreamingLogLevels() {
- val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
- if (!log4jInitialized) {
- // We first log something to initialize Spark's default logging, then we override the
- // logging level.
- logInfo("Setting log level to [WARN] for streaming example." +
- " To override add a custom log4j.properties to the classpath.")
- Logger.getRootLogger.setLevel(Level.WARN)
- } } }
现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:
然后,执行下面命令新建一个simple.sbt文件:
- cd /usr/local/spark-3.5.1/mycode/kafka/
- vim simple.sbt
在simple.sbt中输入以下代码:
- name := "Simple Project"
- version := "1.0"
- scalaVersion := "2.12.18"
- libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
- libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
- libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
- libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"
然后执行下面命令,进行编译打包:
- cd /usr/local/spark-3.5.1/mycode/kafka/
- /usr/local/sbt-1.9.0/sbt/sbt package
打包成功界面
首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:
- cd /usr/local/hadoop-2.10.1
- ./sbin/start-dfs.sh
- 或者
- start-dfs.sh
- start-yarn.sh
启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):
- cd /usr/local/spark-3.5.1/mycode/kafka/
- /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender 3 5
注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:
不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:
- cd /usr/local/spark-3.5.1/mycode/kafka/
- /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。