Server log analysis is an ideal use case for Spark. Log files tend to be large, and Spark can store them at low cost while offering a simple and efficient way to analyze them. Log analysis is useful for improving business operations, building recommendation systems, and detecting fraud.
Each line of an Apache web server log looks similar to the following example.
127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839
Reading the sample line against the nine fields that the parser below extracts, the parts are, in order: the client host (127.0.0.1), the client identd and the user id (both "-" here, meaning the value is unavailable), the request timestamp with its timezone offset, the request line consisting of the method (GET), the endpoint (/images/launch-logo.gif) and the protocol (HTTP/1.0), the HTTP response code (200), and the size of the returned content in bytes (1839).
This lab introduces several new action functions; as used in the code below, they are reduce(), count(), min(), max(), take(), and takeOrdered().
The dataset used in this lab is a web server log from a NASA data center in Florida, USA, truncated to roughly one million records. The dataset is already stored on HDFS under the path "/data/12/3/".
First, load the required libraries, then define a dictionary mapping month names to numbers and a regular expression for a log line. Next, define the function parse_apache_time(), which converts the date string in a log line into a Python datetime object. Finally, define the function parseApacheLogLine(), which calls the regular-expression matching function search() to parse out the nine fields of a log line: if the line matches the regular expression, it returns a pair containing a Row object and 1; otherwise it returns a pair containing the original log line string and 0. A content size of "-" is treated as 0.
>>> import re
>>> import datetime
>>>
>>> from pyspark.sql import Row
>>>
>>> month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
...     'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}
>>>
>>> APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'
>>>
>>> def parse_apache_time(s):
...     return datetime.datetime(int(s[7:11]),
...                              month_map[s[3:6]],
...                              int(s[0:2]),
...                              int(s[12:14]),
...                              int(s[15:17]),
...                              int(s[18:20]))
...
>>> def parseApacheLogLine(logline):
...     match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
...     if match is None:
...         return (logline, 0)
...     size_field = match.group(9)
...     if size_field == '-':
...         size = long(0)
...     else:
...         size = long(match.group(9))
...     return (Row(
...         host          = match.group(1),
...         client_identd = match.group(2),
...         user_id       = match.group(3),
...         date_time     = parse_apache_time(match.group(4)),
...         method        = match.group(5),
...         endpoint      = match.group(6),
...         protocol      = match.group(7),
...         response_code = int(match.group(8)),
...         content_size  = size
...     ), 1)
...
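Before running the parser over the full dataset, it can be sanity-checked interactively on the sample line shown earlier (this check is an addition for illustration, not part of the lab's original code); given the regular expression and Row definition above, the expected result is:

>>> sample = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839'
>>> row, ok = parseApacheLogLine(sample)
>>> print ok, row.host, row.method, row.endpoint, row.response_code, row.content_size
1 127.0.0.1 GET /images/launch-logo.gif 200 1839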
The log file is already stored on HDFS at "/data/12/3/apache.access.log.PROJECT". The whole process of creating the RDD is wrapped in the function parseLogs(). It first calls sc.textFile(logFile) to turn each line of the file into an element of an RDD, then calls map(parseApacheLogLine) to apply the parsing function to every element (i.e., every log line), converting it into a pair containing a Row object, and caches the RDD in memory. Finally, it builds the RDDs of successfully and unsuccessfully parsed records and prints their counts. The final output shows that every log line was parsed successfully.
>>> import sys
>>> import os
>>>
>>> baseDir = os.path.join('/data')
>>> inputPath = os.path.join('12', '3', 'apache.access.log.PROJECT')
>>> logFile = os.path.join(baseDir, inputPath)
>>>
>>> def parseLogs():
...     parsed_logs = (sc
...                    .textFile(logFile)
...                    .map(parseApacheLogLine)
...                    .cache())
...     access_logs = (parsed_logs
...                    .filter(lambda s: s[1] == 1)
...                    .map(lambda s: s[0])
...                    .cache())
...     failed_logs = (parsed_logs
...                    .filter(lambda s: s[1] == 0)
...                    .map(lambda s: s[0]))
...     failed_logs_count = failed_logs.count()
...     if failed_logs_count > 0:
...         print 'Number of invalid logline: %d' % failed_logs.count()
...         for line in failed_logs.take(20):
...             print 'Invalid logline: %s' % line
...     print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
...     return parsed_logs, access_logs, failed_logs
...
>>> parsed_logs, access_logs, failed_logs = parseLogs()
Output:
Compute statistics on the content size returned by the web server, i.e., the content_size field: its average, minimum, and maximum. First call the transformation map() with an anonymous function that extracts only the content_size field to create a new RDD. Then call the actions reduce() and count() to obtain the total content size and the number of records; their ratio is the average content size. Finally, call the actions min() and max() to get the minimum and maximum content sizes.
>>> content_sizes = access_logs.map(lambda log: log.content_size).cache()
>>> print 'Content Size Avg: %i, Min: %i, Max: %s' % (
... content_sizes.reduce(lambda a, b : a + b) / content_sizes.count(),
... content_sizes.min(),
... content_sizes.max())
Output:
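As a side note (an alternative not used in the lab), the RDD action stats() computes these summary statistics in a single pass over the data, returning a StatCounter with the count, mean, minimum, maximum, and standard deviation, instead of scanning content_sizes several times. A minimal sketch:

>>> cs_stats = content_sizes.stats()
>>> print 'Content Size Avg: %f, Min: %i, Max: %i' % (
...     cs_stats.mean(), cs_stats.min(), cs_stats.max())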
Count how many times each response code occurs. As in the previous step, first call the transformation map() to extract the response_code field; the difference is that here each record is mapped to a pair of response_code and 1 so that occurrences can be counted afterwards. Then call the transformation reduceByKey() to count occurrences per response code, and cache() to keep the result in memory. Finally, call the action take() to collect the result into a list.
>>> responseCodeToCount = (access_logs
... .map(lambda log: (log.response_code, 1))
... .reduceByKey(lambda a, b : a + b)
... .cache())
>>> responseCodeToCountList = responseCodeToCount.take(100)
>>> print responseCodeToCountList
Output:
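A shorter equivalent (an alternative, not part of the lab code) is the action countByValue(), which returns the counts directly as a Python dictionary on the driver; this is fine here because there are only a handful of distinct response codes:

>>> responseCodeCounts = access_logs.map(lambda log: log.response_code).countByValue()
>>> print dict(responseCodeCounts)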
This step looks at client hosts that access the server frequently (more than 10 times). As in the previous step, first call the transformation map() to produce pairs of the host field and 1, then call the transformation reduceByKey() to count accesses per host. Next, call the transformation filter() to keep the hosts with more than 10 accesses, and finally call the action take() to collect a sample into a list.
>>> hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))
>>> hostSum = hostCountPairTuple.reduceByKey(lambda a, b : a + b)
>>> hostMoreThan10 = hostSum.filter(lambda s: s[1] > 10)
>>> hostsPick20 = (hostMoreThan10
... .map(lambda s: s[0])
... .take(20))
>>> print 'Any 20 hosts that have accessed more than 10 times: %s' % hostsPick20
Output:
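Note that take(20) only returns an arbitrary sample of 20 such hosts. To know how many hosts exceed 10 accesses in total, the filtered RDD can simply be counted (a small addition for illustration, not in the original lab):

>>> print 'Number of hosts with more than 10 accesses: %d' % hostMoreThan10.count()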
Find the most frequently requested resource identifiers. As in the previous step, first call the transformation map() to produce pairs of the endpoint field and 1, then call the transformation reduceByKey() to count requests per endpoint. Finally, call the action takeOrdered() to take the ten most requested endpoints.
>>> endpointCounts = (access_logs
... .map(lambda log: (log.endpoint, 1))
... .reduceByKey(lambda a, b : a + b))
>>> topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>>> print 'Top Ten Endpoints: %s' % topEndpoints
Output:
Find the ten endpoints (the endpoint field) with the most error responses, i.e., responses whose status code is not 200. First filter access_logs down to the records whose response_code is not 200, then count requests per endpoint as before; the final result is the top ten such endpoints and their request counts, sorted in descending order of count.
>>> errorEndpointCounts = (access_logs
...                        .filter(lambda log: log.response_code != 200)
...                        .map(lambda log: (log.endpoint, 1))
...                        .reduceByKey(lambda a, b : a + b))
>>> topErrorEndpoints = errorEndpointCounts.takeOrdered(10, lambda s: -1 * s[1])
>>> print topErrorEndpoints
Count the number of distinct host addresses (the host field).
>>> IndependentHostCounts = (access_logs
... .map(lambda log: (log.host, 1))
... .reduceByKey(lambda a, b : a + b)
... .count())
>>> print IndependentHostCounts
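Since only the number of distinct hosts is needed, a simpler equivalent (an alternative sketch, not the lab's original code) is to deduplicate the host field with distinct() and count the result:

>>> uniqueHostCount = access_logs.map(lambda log: log.host).distinct().count()
>>> print uniqueHostCount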