当前位置:   article > 正文

websphere 发送日志 syslog_Structured Streaming编程练习-日志分析

统计root这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。

e29773e2a87a746699d12d696d5cf097.png
Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。新建一个终端,执行如下命令:$ tail -n+1 -f /var/log/syslog | nc -lk 9988“tail -n+1 -f /var/log/syslog”表示从第一行开始打印文件syslog的内容。“-f”表示如果文件有增加则持续输出最新的内容。然后,通过管道把文件内容发送到nc程序(nc程序可以进一步把数据发送给Spark)

对Syslog进行查询由Spark接收nc程序发送过来的日志信息,然后完成以下任务:

(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟 (2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。 (3)输出所有日志内容带error的日志。

ps:基于pyspark

新建python脚本test.py如下:

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from functools import partial
  4. from pyspark.sql import SparkSession
  5. from pyspark.sql.functions import *
  6. if __name__ == "__main__":
  7. spark = SparkSession.builder.appName("syslog_ouput").getOrCreate()
  8. spark.sparkContext.setLogLevel('WARN')
  9. lines = spark.readStream.format('socket').option("host","localhost").option("port","9988").load()
  10. # 日志格式如下,使用正则处理
  11. # Nov 24 13:17:01 spark CRON[18455]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)
  12. # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
  13. fields = partial(
  14. regexp_extract, str="value", pattern="^(w{3}s*d{1,2} d{2}:d{2}:d{2}) (.*?) (.*?)[*d*]*: (.*)$"
  15. )
  16. words = lines.select(
  17. to_timestamp(format_string('2020 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
  18. fields(idx=2).alias("hostname"),
  19. fields(idx=3).alias("tag"),
  20. fields(idx=4).alias("content"),
  21. )
  22. # (1). 统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
  23. windowedCounts1 = words.filter("tag = 'CRON'").withWatermark("timestamp", "1 minutes").groupBy(window('timestamp', "1 hour")).count().sort(asc('window'))
  24. # (2). 统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
  25. windowedCounts2 = words.withWatermark("timestamp", "1 minutes") .groupBy('tag', window('timestamp', "1 hour")).count().sort(asc('window'))
  26. #(3). 输出所有日志内容带error的日志
  27. #如果没有error信息,可以手动加入error日志信息,在终端输入下面语句即可
  28. #logger 'I am a test error log message.'
  29. windowedCounts3 = words.filter("content like '%error%'")
  30. # 开始运行查询并在控制台输出,问题3使用update输出,complete输出要有聚合函数
  31. query = windowedCounts1.writeStream.outputMode("complete").format("console").option('truncate', 'false').trigger(processingTime="3 seconds").start()
  32. #query = windowedCounts2.writeStream.outputMode("complete").format("console").option('truncate', 'false').trigger(processingTime="3 seconds").start()
  33. #query = windowedCounts3.writeStream.outputMode("update").format("console").option('truncate', 'false').trigger(processingTime="3 seconds").start()
  34. query.awaitTermination()

打开一个终端,输入下面语句:

  1. #向9988端口发送日志信息
  2. tail -n+1 -f /var/log/syslog | nc -lk 9988“tail -n+1 -f /var/log/syslog

再打开一个终端,运行Python脚本文件:

/usr/local/spark/bin/spark-submit test.py

对每小时CRON进程的信息计数后输出

88f37c19589e5458ad67f35bac3bbccd.png

学习交流,有任何问题还请随时评论指出交流。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号