1. 案例介绍
该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。
2. 案例分析
对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。
首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,停留时间,以及是否点赞,可以表示如下:
(page001.html, 1, 0.5, 1)
向量的第一项表示网页的 ID,第二项表示从进入网站到离开对该网页的点击次数,第三项表示停留时间,以分钟为单位,第四项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。
其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。
最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。
f(x,y,z)=0.8x+0.8y+z
那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得:
H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2
读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。
3. 生产行为数据消息
在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下:
网页 ID|点击次数|停留时间 (分钟)|是否点赞
并假设该网站只有 100 个网页。以下是该类的 Scala 实现源码。
UserBehaviorMsgProducer 类源码
- import scala.util.Random
- import java.util.Properties
- import kafka.producer.KeyedMessage
- import kafka.producer.ProducerConfig
- import kafka.producer.Producer
-
- class UserBehaviorMsgProducer(brokers: String, topic: String) extends Runnable {
- private val brokerList = brokers
- private val targetTopic = topic
- private val props = new Properties()
- props.put("metadata.broker.list", this.brokerList)
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("producer.type", "async")
- private val config = new ProducerConfig(this.props)
- private val producer = new Producer[String, String](this.config)
-
- private val PAGE_NUM = 100
- private val MAX_MSG_NUM = 3
- private val MAX_CLICK_TIME = 5
- private val MAX_STAY_TIME = 10
- //Like,1;Dislike -1;No Feeling 0
- private val LIKE_OR_NOT = Array[Int](1, 0, -1)
-
- def run(): Unit = {
- val rand = new Random()
- while (true) {
- //how many user behavior messages will be produced
- val msgNum = rand.nextInt(MAX_MSG_NUM) + 1
- try {
- //generate the message with format like page1|2|7.123|1
- for (i <- 0 to msgNum) {
- var msg = new StringBuilder()
- msg.append("page" + (rand.nextInt(PAGE_NUM) + 1))
- msg.append("|")
- msg.append(rand.nextInt(MAX_CLICK_TIME) + 1)
- msg.append("|")
- msg.append(rand.nextInt(MAX_CLICK_TIME) + rand.nextFloat())
- msg.append("|")
- msg.append(LIKE_OR_NOT(rand.nextInt(3)))
- println(msg.toString())
- //send the generated message to broker
- sendMessage(msg.toString())
- }
- println("%d user behavior messages produced.".format(msgNum+1))
- } catch {
- case e: Exception => println(e)
- }
- try {
- //sleep for 5 seconds after send a micro batch of message
- Thread.sleep(5000)
- } catch {
- case e: Exception => println(e)
- }
- }
- }
- def sendMessage(message: String) = {
- try {
- val data = new KeyedMessage[String, String](this.topic, message);
- producer.send(data);
- } catch {
- case e:Exception => println(e)
- }
- }
- }
- object UserBehaviorMsgProducerClient {
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("Usage:UserBehaviorMsgProducerClient 192.168.1.1:9092 user-behavior-topic")
- System.exit(1)
- }
- //start the message producer thread
- new Thread(new UserBehaviorMsgProducer(args(0), args(1))).start()
- }
- }
4. 编写 Spark Streaming 程序消费消息
在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下:
构建 Spark 的 StreamingContext 实例,并且开启 checkpoint 功能。因为我们需要使用 updateStateByKey 原语去累计的更新网页话题的热度值。
利用 Spark 提供的 KafkaUtils.createStream 方法消费消息主题,这个方法会返回 ReceiverInputDStream 对象实例。
对于每一条消息,利用上文的公式计算网页话题的热度值。
定义一个匿名函数去把网页热度上一次的计算结果值和新计算的值相加,得到最新的热度值。
调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。
最后得到最新结果后,需要对结果进行排序,最后打印热度值最高的 10 个网页。
源代码如下。
WebPagePopularityValueCalculator 类源码
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.HashPartitioner
- import org.apache.spark.streaming.Duration
-
- object WebPagePopularityValueCalculator {
- private val checkpointDir = "popularity-data-checkpoint"
- private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"
-
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("Usage:WebPagePopularityValueCalculator zkserver1:2181,
- zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
- System.exit(1)
- }
- val Array(zkServers,processingInterval) = args
- val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
- val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
- //using updateStateByKey asks for enabling checkpoint
- ssc.checkpoint(checkpointDir)
- val kafkaStream = KafkaUtils.createStream(
- //Spark streaming context
- ssc,
- //zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,...
- zkServers,
- //kafka message consumer group ID
- msgConsumerGroup,
- //Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
- Map("user-behavior-topic" -> 3))
- val msgDataRDD = kafkaStream.map(_._2)
- //for debug use only
- //println("Coming data in this interval...")
- //msgDataRDD.print()
- // e.g page37|5|1.5119122|-1
- val popularityData = msgDataRDD.map { msgLine =>
- {
- val dataArr: Array[String] = msgLine.split("\\|")
- val pageID = dataArr(0)
- //calculate the popularity value
- val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
- (pageID, popValue)
- }
- }
- //sum the previous popularity value and current value
- val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
- iterator.flatMap(t => {
- val newValue:Double = t._2.sum
- val stateValue:Double = t._3.getOrElse(0);
- Some(newValue + stateValue)
- }.map(sumedValue => (t._1, sumedValue)))
- }
- val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
- val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
- new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
- //set the checkpoint interval to avoid too frequently data checkpoint which may
- //may significantly reduce operation throughput
- stateDstream.checkpoint(Duration(8*processingInterval.toInt*1000))
- //after calculation, we need to sort the result and only show the top 10 hot pages
- stateDstream.foreachRDD { rdd => {
- val sortedData = rdd.map{ case (k,v) => (v,k) }.sortByKey(false)
- val topKData = sortedData.take(10).map{ case (v,k) => (k,v) }
- topKData.foreach(x => {
- println(x)
- })
- }
- }
- ssc.start()
- ssc.awaitTermination()
- }
- }
WebPagePopularityValueCalculator 类启动命令
- bin/spark-submit \
- --jars $SPARK_HOME/lib/spark-streaming-kafka_2.10-1.3.1.jar, \
- $SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar, \
- $SPARK_HOME/lib/kafka_2.10-0.8.2.1.jar, \
- $SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \
- --class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator
- --master spark://<spark_master_ip>:7077 \
- --num-executors 4 \
- --driver-memory 4g \
- --executor-memory 2g \
- --executor-cores 2 \
- /home/fams/sparkexercise.jar \
- 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2
由于程序中我们要用到或者间接调用 Kafka 的 API,并且需要调用 Spark Streaming 集成 Kafka 的 API(KafkaUtils.createStream), 所以需要提前将启动命令中的 jar 包上传到 Spark 集群的每个机器上 (本例中我们将它们上传到 Spark 安装目录的 lib 目录下,即$SPARK_HOME/lib),并在启动命令中引用它们。