赞
踩
我们知道对于一个网站的用户访问流量是不间断的,基于网站的访问日志,即WEB LOG的分析是经典的流式实时计算应用场景。比如百度统计,它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析,比如安全分析,用来识别 CC 攻击、 SQL 注入分析、脱库等。这里我们简单实现一个类似于百度分析的系统。
这里的课程来自于【实验楼课程】:流式实时日志分析系统——《Spark 最佳实践》。课程内容基于图灵教育的《Spark最佳实践》第六章制作。如需系统的学习本书,请参考书籍内容《Spark 最佳实践》。
这里的Spark实践内容,是参考自上述实验楼内容进行,以分析学习为目的。并通过自己本机的实验环境,重新分析运行该系统的实现方法。如本机无Spark等运行环境,可以直接到上述实验楼课程中进行在线实验。
服务器访问日志分析方法
百度统计(tongji.baidu.com)是百度推出的一款免费的专业网站流量分析工具,能够告诉用户访客是如何找到并浏览用户的网站的,以及在网站上浏览了哪些页面。这些信息可以帮助用户改善访客在其网站上的使用体验,不断提升网站的投资回报率。
百度统计提供了几十种图形化报告,包括:趋势分析、来源分析、页面分析、访客分析、定制分析等多种统计分析服务。
这里我们参考百度统计的功能,基于 Spark Streaming 简单实现一个分析系统,使之包括以下分析功能。
日志实时采集方案
Web log 一般在 HTTP 服务器收集,比如 Nginx access 日志文件。一个典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:
当然,还可以进一步优化,比如 CGI 程序直接发日志消息到 Kafka ,节省了写访问日志的磁盘开销。这里主要专注 Spark Streaming 的应用,所以我们不做详细论述。
流式分析系统实现
这里简单模拟数据收集的发送的环节,使用python脚本模拟生成Nginx访问日志,并通过脚本的方式自动那个上传至HDFS,然后移动至指定的目录。Sprak Streaming程序监控HDFS目录,自动处理新的文件。
如下内容为系统实现的分析内容,具体的实验操作步骤见本文的后半部分,如果需要直接进行实验,可以之间查看操作步骤部分内容。
生成Nginx日志的python代码如下,创建文件夹并将保存为文件sample_web_log.py:
cd /home/yitian |
mkdir shiyanlou |
vim sample_web_log.py |
sample_web_log.py内容为:
#!/usr/bin/env python |
# -*- coding: utf-8 -*- |
import random |
import time |
class WebLogGeneration( object ): |
# 类属性,由所有类的对象共享 |
site_url_base = "http://www.xxx.com/" |
# 基本构造函数 |
def __init__( self ): |
# 前面7条是IE,所以大概浏览器类型70%为IE ,接入类型上,20%为移动设备,分别是7和8条,5% 为空 |
self .user_agent_dist = { 0.0 : "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" , |
0.1 : "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" , |
0.2 : "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)" , |
0.3 : "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)" , |
0.4 : "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko" , |
0.5 : "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0" , |
0.6 : "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)" , |
0.7 : "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" , |
0.8 : "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" , |
0.9 : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36" , |
1 : " " ,} |
self .ip_slice_list = [ 10 , 29 , 30 , 46 , 55 , 63 , 72 , 87 , 98 , 132 , 156 , 124 , 167 , 143 , 187 , 168 , 190 , 201 , 202 , 214 , 215 , 222 ] |
self .url_path_list = [ "login.php" , "view.php" , "list.php" , "upload.php" , "admin/login.php" , "edit.php" , "index.html" ] |
self .http_refer = [ "http://www.baidu.com/s?wd={query}" , "http://www.google.cn/search?q={query}" , "http://www.sogou.com/web?query={query}" , "http://one.cn.yahoo.com/s?p={query}" , "http://cn.bing.com/search?q={query}" ] |
self .search_keyword = [ "spark" , "hadoop" , "hive" , "spark mlib" , "spark sql" ] |
def sample_ip( self ): |
slice = random.sample( self .ip_slice_list, 4 ) #从ip_slice_list中随机获取4个元素,作为一个片断返回 |
return "." .join([ str (item) for item in slice ]) # todo |
def sample_url( self ): |
return random.sample( self .url_path_list, 1 )[ 0 ] |
def sample_user_agent( self ): |
dist_uppon = random.uniform( 0 , 1 ) |
return self .user_agent_dist[ float ( '%0.1f' % dist_uppon)] |
# 主要搜索引擎referrer参数 |
def sample_refer( self ): |
if random.uniform( 0 , 1 ) > 0.2 : # 只有20% 流量有refer |
return "-" |
refer_str = random.sample( self .http_refer, 1 ) |
query_str = random.sample( self .search_keyword, 1 ) |
return refer_str[ 0 ]. format (query = query_str[ 0 ]) |
def sample_one_log( self ,count = 3 ): |
time_str = time.strftime( "%Y-%m-%d %H:%M:%S" ,time.localtime()) |
while count > 1 : |
query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"" . format (ip = self .sample_ip(),local_time = time_str,url = self .sample_url(),refer = self .sample_refer(),user_agent = self .sample_user_agent()) |
print query_log |
count = count - 1 |
if __name__ = = "__main__" : |
web_log_gene = WebLogGeneration() |
#while True: |
# time.sleep(random.uniform(0, 3)) |
web_log_gene.sample_one_log(random.uniform( 10 , 100 )) |
该项目文件下载地址:http://pan.baidu.com/s/1c2EQMHU
如下是一条日志文件的示例,为一行的形式,各个字段用空格分隔,字符串类型的值用双引号包围:
46.202.124.63 - - [2015-11-26 09:54:27] "GET /view.php HTTP/1.1" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-" |
然后需要一个简单的脚本来调用上面的python脚本运行,以生成模拟日志,上传至HDFS(在后面的操作步骤的示例中,没有将生成的日志文件上传至HDFS中,而是保持至本地文件夹内),然后移动到指定的目标目录:
#!/bin/bash |
# HDFS命令 |
HDFS= "/usr/local/myhadoop/hadoop-2.6.0/bin/hadoop fs" |
# Streaming程序监听的目录,注意跟后面Streaming程序的配置要保持一致 |
streaming_dir=”/spark/streaming” |
# 清空旧数据 |
$HDFS - rm "${streaming_dir}" '/tmp/*' > /dev/null 2>&1 |
$HDFS - rm "${streaming_dir}" '/*' > /dev/null 2>&1 |
# 一直运行 |
while [ 1 ]; do |
./sample_web_log.py > test .log |
# 给日志文件加上时间戳,避免重名 |
tmplog= "access.`date +'%s'`.log" |
# 先放在临时目录,再move至Streaming程序监控的目录下,确保原子性 |
# 临时目录用的是监控目录的子目录,因为子目录不会被监控 |
$HDFS -put test .log ${streaming_dir}/tmp/$tmplog |
$HDFS - mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/ |
echo "`date +" %F %T "` put $tmplog to HDFS succeed" |
sleep 1 |
done |
Spark Streaming程序代码如下所示(Scala实现),可以在Spark运行环境配置完成后的bin/spark-shell交互式命令环境下运行。如果要以Spark程序的方式运行,可以按照下面代码注解中的说明,调整一下SitreamingContext的创建方式即可。(Spark运行环境配置可参见另外的文章:[Spark] Linux(CentOS)下搭建Spark运行环境)
启动bin/spark-shell时,为了避免因DEBUG日志信息太多而影响观察输出,可以将Debug日志重定向至文件,屏幕上只显示主要输出,方法是:./bing/spark-shell 2>spark-shell-debug.log(还要一种方式,可以更改spark使用的默认日志配置文件log4j.properties中的日志显示级别,具体见:)
// 导入类 |
import org.apache.spark.SparkConf |
import org.apache.spark.streaming.{Seconds, StreamingContext} |
// 设计计算的周期,单位秒 |
val batch = 10 |
/* |
* 这是bin/spark-shell交互式模式下创建StreamingContext的方法 |
* 非交互式请使用下面的方法来创建 |
*/ |
val ssc = new StreamingContext(sc, Seconds(batch)) |
/* |
// 非交互式下创建StreamingContext的方法 |
val conf = new SparkConf().setAppName("NginxAnay") |
val ssc = new StreamingContext(conf, Seconds(batch)) |
*/ |
/* |
* 创建输入DStream,是文本文件目录类型 |
* 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming |
*/ |
val lines = ssc.textFileStream("hdfs:///spark/streaming") |
/* |
* 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果 |
*/ |
// 1. 总PV |
lines.count().print() |
// 2. 各IP的PV,按PV倒序 |
// 空格分隔的第一个字段就是IP |
lines.map(line => {(line.split( " " )( 0 ), 1 )}).reduceByKey(_ + _).transform(rdd => { |
rdd.map(ip_pv => (ip_pv._2, ip_pv._1)). |
sortByKey( false ). |
map(ip_pv => (ip_pv._2, ip_pv._1)) |
}).print() |
// 3. 搜索引擎PV |
val refer = lines.map(_.split( "\"" )( 3 )) |
// 先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算 |
// 输出(host, query_keys) |
val searchEnginInfo = refer.map(r => { |
|
val f = r.split( '/' ) |
val searchEngines = Map( |
"www.google.cn" -> "q" , |
"www.yahoo.com" -> "p" , |
"cn.bing.com" -> "q" , |
"www.baidu.com" -> "wd" , |
"www.sogou.com" -> "query" |
) |
if (f.length > 2 ) { |
val host = f( 2 ) |
if (searchEngines.contains(host)) { |
val query = r.split( '?' )( 1 ) |
if (query.length > 0 ) { |
val arr_search_q = query.split( '&' ).filter(_.indexOf(searchEngines(host)+ "=" ) == 0 ) |
if (arr_search_q.length > 0 ) |
(host, arr_search_q( 0 ).split( '=' )( 1 )) |
else |
(host, "" ) |
} else { |
(host, "" ) |
} |
} else |
( "" , "" ) |
} else |
( "" , "" ) |
}) |
// 输出搜索引擎PV |
searchEnginInfo.filter(_._1.length > 0 ).map(p => {(p._1, 1 )}).reduceByKey(_ + _).print() |
// 4. 关键词PV |
searchEnginInfo.filter(_._2.length > 0 ).map(p => {(p._2, 1 )}).reduceByKey(_ + _).print() |
// 5. 终端类型PV |
lines.map(_.split( "\"" )( 5 )).map(agent => { |
val types = Seq( "iPhone" , "Android" ) |
var r = "Default" |
for (t <- types) { |
if (agent.indexOf(t) != - 1 ) |
r = t |
} |
(r, 1 ) |
}).reduceByKey(_ + _).print() |
// 6. 各页面PV |
lines.map(line => {(line.split( "\"" )( 1 ).split( " " )( 1 ), 1 )}).reduceByKey(_ + _).print() |
// 启动计算,等待执行结束(出错或Ctrl-C退出) |
ssc.start() |
ssc.awaitTermination() |
按照之后的操作步骤所示的内容,打开两个linux命令行界面,一个调用上面的bash脚本模拟提交日志,一个在交互式环境下运行上面Spark Streaming程序代码命令。可以在显示界面中看到如下输出信息,比如某个批次下的输出为(依次对应Streaming文件中的6个计算项),这里对输出结果进行说明:
1. 网站总PV:
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
44374 |
2. 访问网站的各来源IP的PV,按PV倒序输出:
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
(72.63.87.30,30) |
(63.72.46.55,30) |
(98.30.63.10,29) |
(72.55.63.46,29) |
(63.29.10.30,29) |
(29.30.63.46,29) |
(55.10.98.87,27) |
(46.29.98.30,27) |
(72.46.63.30,27) |
(87.29.55.10,26) |
3. 网站搜索引擎来源PV
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
(cn.bing.com,1745) |
(www.baidu.com,1773) |
(www.google.cn,1793) |
(www.sogou.com,1845) |
4. 关键词PV
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
(spark,1426) |
(hadoop,1455) |
(spark sql,1429) |
(spark mlib,1426) |
(hive,1420) |
5. 终端类型PV
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
(Android,4281) |
(Default,35745) |
(iPhone,4348) |
6. 各页面PV
------------------------------------------- |
Time: 1448533850000 ms |
------------------------------------------- |
(/edit.php,6435) |
(/admin/login.php,6271) |
(/login.php,6320) |
(/upload.php,6278) |
(/list.php,6411) |
(/index.html,6309) |
(/view.php,6350) |
更好的数据处理方法
查看数据最直观的做法是用图形来展示,常见的做法是将Spark对日志的处理结果写入外部DB,然后通过一些图形化的报表或图形展示系统展示出来。比如对于终端类型,就可以使用饼状图展示出来。
多周期统计
除了常规的每个固定周期进行一次统计,我们还可以对连续多个周期的数据进行统计。以统计总 PV 为例,上面的示例是每 10 秒统计一次,可能还需要每分钟统计一次,相当于 6 个 10 秒的周期。我们可以利用窗口方法实现,不同的代码如下:
// 窗口方法必须配置checkpint,可以这样配置: |
ssc.checkpoint("hdfs:///spark/checkpoint") |
// 这是常规每10秒一个周期的PV统计 |
lines.count().print() |
// 这是每分钟(连续多个周期)一次的PV统计 |
lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print() |
使用相同的方式运行程序之后,我们首先会看到连续6次10秒周期的PV统计输出:
------------------------------------------- |
Time: 1448535090000 ms |
------------------------------------------- |
1101 |
------------------------------------------- |
Time: 1448535100000 ms |
------------------------------------------- |
816 |
------------------------------------------- |
Time: 1448535110000 ms |
------------------------------------------- |
892 |
------------------------------------------- |
Time: 1448535120000 ms |
------------------------------------------- |
708 |
------------------------------------------- |
Time: 1448535130000 ms |
------------------------------------------- |
881 |
------------------------------------------- |
Time: 1448535140000 ms |
------------------------------------------- |
872 |
在这之后,有一个1分钟周期的PV统计输出,它的值刚好是上面6次计算结果的总和:
Time: 1448535140000 ms |
------------------------------------------- |
5270 |
1. 准备日志生成代码
将上述用于生成模拟日志的代码保存至sample_web_log.py文件中:
cd /home/yitian |
mkdir shiyanlou |
vim sample_web_log.py |
文件内容,就如同上面sample_web_log.py文件内容。保存并退出。
修改代码执行权限
运行如下命令,为日志生成文件添加执行权限:
chmod +x sample_web_log.py |
晚上上述步骤,后面就可以使用bash命令调用该文件代码,生成日志文件了。
2. 启动Spark Shell
接下来,进入Spark安装目录,启动Spark Shell:
spark-shell 2>spark-shell-debug.log |
运行成功后,界面如下:
[root@localhost conf]# spark-shell 2>spark-shell-debug.log |
Welcome to |
____ __ |
/ __/__ ___ _____/ /__ |
_\ \/ _ \/ _ `/ __/ '_/ |
/___/ .__/\_,_/_/ /_/\_\ version 1.6.2 |
/_/ |
Using Scala version 2.10.5 (Java HotSpot(TM) Client VM, Java 1.8.0_144) |
Type in expressions to have them evaluated. |
Type :help for more information. |
Spark context available as sc. |
SQL context available as sqlContext. |
scala> |
重要说明:成功启动Spark Shell后,不要关闭运行Spark Shell的终端,之后的其他命令操作需要在新打开的终端中执行。
3. 创建日志保存目录
在稍后的步骤中,会将Python脚本生成的日志保存到本地的文件中。因此需要首先为保存的日志文件创建一个空目录。在 /home/yitian/shiyanlou
目录下新建 streaming
目录,并增设 tmp
临时文件夹。
[yitian@localhost shiyanlou]$ mkdir /home/yitian/shiyanlou/streaming |
[yitian@localhost shiyanlou]$ mkdir /home/yitian/shiyanlou/streaming/tmp |
4. 使用BASH脚本生成日志
这里没有使用上面提到的日志生成脚本的调用代码(日志生成后上传至HDFS),而是将日志生成之后保存在本地,因此在/home/yitian/shiyanlou目录下,创建如下bash脚本运行文件genLog.sh:
#!/bin/bash |
while [ 1 ]; do |
./sample_web_log.py > test .log |
tmplog= "access.`date +'%s'`.log" |
cp test .log streaming/tmp/$tmplog |
mv streaming/tmp/$tmplog streaming/ |
echo "`date +" %F %T "` generating $tmplog succeed" |
sleep 1 |
done |
编辑完成后,保存退出vim编辑器,同时需要修改该脚本文件的执行权限:
chmod +x genLog.sh |
5. 在Sprak Shell中运行Spark Streaming命令进行日志分析
在Spark Shell交互命令界面中,分段输入如下代码,监控日志输出目录并自动调用日志文件进行日志分析。
首先是引用相关的包。由于我们使用的是 Spark Shell(即以交互式模式进行编程),在它启动的过程中就已经创建了 SparkContext 对象 sc
,因此我们可以直接使用 sc
对象。
导入 Streaming 的相关类:
scala> import org.apache.spark.streaming.{Seconds, StreamingContext} |
import org.apache.spark.streaming.{Seconds, StreamingContext} |
设置计算的周期为10秒:
scala> val batch = 10 |
batch: Int = 10 |
在Spark Shell中创建StreamingContext对象:
scala> val ssc = new StreamingContext(sc, Seconds(batch)) |
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1485e82 |
创建输入DStream,是文本文件目录类型。这里使用本地文件系统中的目录,即之前创建的/home/yitian/shiyanlou/streaming
目录:
scala> val lines = ssc.textFileStream("file:///home/yitian/shiyanlou/streaming") |
lines: org.apache.spark.streaming.dstream.DStream[String] = <a href="mailto:org.apache.spark.streaming.dstream.MappedDStream@a43556">org.apache.spark.streaming.dstream.MappedDStream@a43556 |
</a> |
接下来就可以统计各项网站访问的指标:
scala> lines.count().print() |
scala> lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => { |
| rdd.map(ip_pv => (ip_pv._2, ip_pv._1)). |
| sortByKey(false). |
| map(ip_pv => (ip_pv._2, ip_pv._1)) |
| }).print() |
scala> val searchEnginInfo = refer.map(r => { |
| |
| val f = r.split('/') |
| |
| val searchEngines = Map( |
| "www.google.cn" -> "q", |
| "www.yahoo.com" -> "p", |
| "cn.bing.com" -> "q", |
| "www.baidu.com" -> "wd", |
| "www.sogou.com" -> "query" |
| ) |
| |
| if (f.length > 2) { |
| val host = f(2) |
| |
| if (searchEngines.contains(host)) { |
| val query = r.split('?')(1) |
| if (query.length > 0) { |
| val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0) |
| if (arr_search_q.length > 0) |
| (host, arr_search_q(0).split('=')(1)) |
| else |
| (host, "") |
| } else { |
| (host, "") |
| } |
| } else |
| ("", "") |
| } else |
| ("", "") |
| |
| }) |
searchEnginInfo: org.apache.spark.streaming.dstream.DStream[(String, String)] = <a href="mailto:org.apache.spark.streaming.dstream.MappedDStream@16d5c70">org.apache.spark.streaming.dstream.MappedDStream@16d5c70 |
</a> |
scala> searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print() |
scala> searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print() |
scala> lines.map(_.split("\"")(5)).map(agent => { |
| val types = Seq("iPhone", "Android") |
| var r = "Default" |
| for (t <- types) { |
| if (agent.indexOf(t) != -1) |
| r = t |
| } |
| (r, 1) |
| }).reduceByKey(_ + _).print() |
scala> lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print() |
各项统计指标设置好之后,就可以启动计算,等待执行结束:
scala> ssc.start() |
scala> ssc.awaitTermination() |
如果需要结束计算过程,可以按下 Ctrl + C
键。
6. 开始生成日志并查看结果
先不要关闭运行着 Spark Streaming 的终端,回到之前创建 genLog.sh
文件的终端里(或者新打开一个),运行 genLog.sh 脚本。
[root@localhost shiyanlou] # ./genLog.sh |
2017-09-01 02:40:21 generating access.1504258821.log succeed |
2017-09-01 02:40:22 generating access.1504258822.log succeed |
2017-09-01 02:40:23 generating access.1504258823.log succeed |
2017-09-01 02:40:24 generating access.1504258824.log succeed |
2017-09-01 02:40:25 generating access.1504258825.log succeed |
2017-09-01 02:40:26 generating access.1504258826.log succeed |
在 Spark Streaming 的终端内,就可以看到输出的分析结果了:
Time: 1504258860000 ms |
------------------------------------------- |
(143.222.168.29,1) |
(55.168.87.156,1) |
(187.215.132.63,1) |
(190.132.167.29,1) |
(46.72.10.143,1) |
(143.215.55.46,1) |
(29.201.124.63,1) |
(201.202.156.187,1) |
(87.156.72.46,1) |
(124.46.143.168,1) |
... |
------------------------------------------- |
Time: 1504258860000 ms |
------------------------------------------- |
(cn.bing.com,28) |
(www.google.cn,15) |
(www.sogou.com,13) |
(www.baidu.com,24) |
------------------------------------------- |
Time: 1504258860000 ms |
------------------------------------------- |
(spark,15) |
(hive,12) |
(hadoop,13) |
(spark mlib,13) |
(spark sql,27) |
------------------------------------------- |
Time: 1504258860000 ms |
------------------------------------------- |
(iPhone,45) |
(Default,372) |
(Android,41) |
------------------------------------------- |
Time: 1504258860000 ms |
------------------------------------------- |
(/admin/login.php,67) |
(/upload.php,63) |
(/index.html,74) |
(/view.php,62) |
(/login.php,70) |
(/edit.php,76) |
(/list.php,46) |
------------------------------------------- |
Time: 1504258870000 ms |
------------------------------------------- |
471 |
观察完毕,请通过 Ctrl + C
关闭日志生成的进程和 Spark Streaming 的进程。实验完成!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。