当前位置:   article > 正文

毕业设计项目:基于Flume+Spark+Flask的分布式实时日志分析与入侵检测系统的设计与实现_基于分布式主机日志分析的入侵检测系统

基于分布式主机日志分析的入侵检测系统

项目源码:https://download.csdn.net/download/zy_dreamer/88038658

项目内容截图:

项目效果:

 

 架构图

 

 

 

简介

本项目是一个整合了web日志聚合、分发、实时分析、入侵检测、数据存储与可视化的日志分析解决方案。聚合采用Apache Flume,分发采用Apache Kafka,实时处理采用Spark Streaming,入侵检测采用Spark MLlib,数据存储使用HDFS与Redis,可视化采用Flask、SocketIO、Echarts、Bootstrap。

本文下述的使用方法均面向单机伪分布式环境,你可以根据需求进行配置上的调整以适应分布式部署。

本系统各模块由个人独立开发,期间参考了一些有价值的文献与资料。本系统还是个人的本科毕业设计。

获得的奖项:2019年全国大学生计算机设计大赛安徽省二等奖、2019年安徽省信息安全作品赛二等奖。

项目结构

  • flask:Flask Web后端

  • spark:日志分析与入侵检测的实现

  • flume:Flume配置文件

  • log_gen:模拟日志生成器

  • datasets:测试日志数据集

  • images:README的图片

依赖与版本

  • 编译与Web端需要用到的:

    • Java 8, Scala 2.11.12, Python 3.8 (包依赖见requirements), sbt 1.3.8

  • 计算环境中需要用到的:

    • Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8

使用说明

在开始之前,你需要修改源码或配置文件中的IP为你自己的地址。具体涉及到flume配置文件、Spark主程序、Flask Web后端。

编译Spark应用

在安装好Java8与Scala11的前提下,在spark目录下,初始化sbt

sbt

退出sbt shell并使用sbt-assembly对Spark项目进行编译打包:

sbt assembly

然后将生成的jar包重命名为logvision.jar

环境准备

你需要一个伪分布式环境(测试环境为CentOS 7),并完成了所有对应版本组件依赖的配置与运行。 使用flume目录下的standalone.conf启动一个Flume Agent。 将datasets文件夹中的learning-datasets提交如下路径:

/home/logv/learning-datasets

datasets文件夹中的access_log提交如下路径:

/home/logv/access_log

入侵检测模型训练与测试

提交jar包至Spark集群并执行入侵检测模型的生成与测试:

spark-submit --class learning logvision.jar

你将可以看到如下结果: 两个表格分别代表正常与异常数据集的入侵检测结果,下面四个表格可用于判断识别准确率。如图中所示250条正常测试数据被检测为250条正常,识别率100%;250条异常测试数据被检测为240条异常,10条正常,准确率96%。

启动可视化后端

flask目录下执行如下命令,下载依赖包:

pip3 install -r requirements.txt

启动Flask Web:

python3 app.py

启动实时日志生成器

log_gen中的实时日志生成器可根据传入参数(每次写入行数、写入间隔时间)将样本日志中的特定行块追加至目标日志中,以模拟实时日志的生成过程,供后续实时处理。

java log_gen [日志源] [目标文件] [每次追加的行数] [时间间隔(秒)]

提交至环境,编译并运行,每2秒将/home/logv/access_log文件中的5行追加至/home/logSrc中:

javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2

启动分析任务

提交jar包至Spark集群并执行实时分析任务:

spark-submit --class streaming logvision.jar

查看可视化结果

至此你已经完成了后端组件的配置,通过浏览器访问Web端主机的5000端口可以查看到实时日志分析的可视化结果

Flask-python依赖:

Flask_SocketIO==4.3.0
requests==2.23.0
kafka_python==2.0.1
redis==3.5.0
flask==1.1.2

部分代码演示:

  1. import ast
  2. import time
  3. from kafka import KafkaConsumer
  4. import redis
  5. import requests
  6. from threading import Lock, Thread
  7. from flask import Flask, render_template, session, request
  8. from flask_socketio import SocketIO, emit
  9. async_mode = None
  10. app = Flask(__name__)
  11. app.config['SECRET_KEY'] = 'secret!'
  12. socketio = SocketIO(app, async_mode=async_mode)
  13. thread = None
  14. thread_lock = Lock()
  15. # 配置项目
  16. time_interval = 1
  17. kafka_bootstrap_servers = "10.0.0.222:9092"
  18. redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)
  19. # 页面路由与对应页面的ws接口
  20. # 系统时间
  21. @socketio.on('connect', namespace='/sys_time')
  22. def sys_time():
  23. def loop():
  24. while True:
  25. socketio.sleep(time_interval)
  26. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  27. socketio.emit('sys_time',
  28. {'data': current_time},
  29. namespace='/sys_time')
  30. socketio.start_background_task(target=loop)
  31. # 欢迎页面
  32. @app.route('/')
  33. @app.route('/welcome')
  34. def welcome():
  35. return render_template('index.html', async_mode=socketio.async_mode)
  36. # 实时日志流
  37. @socketio.on('connect', namespace='/log_stream')
  38. def log_stream():
  39. def loop():
  40. socketio.sleep(time_interval)
  41. consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
  42. cache = ""
  43. for msg in consumer:
  44. cache += bytes.decode(msg.value) + "\n"
  45. if len(cache.split("\n")) == 25:
  46. socketio.emit('log_stream',
  47. {'data': cache},
  48. namespace='/log_stream')
  49. cache = ""
  50. socketio.start_background_task(target=loop)
  51. # 实时日志分析页面
  52. @app.route('/analysis')
  53. def analysis():
  54. return render_template('analysis.html', async_mode=socketio.async_mode)
  55. # 实时计数器
  56. @socketio.on('connect', namespace='/count_board')
  57. def count_board():
  58. def loop():
  59. while True:
  60. socketio.sleep(time_interval)
  61. redis_con = redis.Redis(connection_pool=redis_con_pool)
  62. res = redis_con.zrange("statcode", 0, 40, withscores=True)
  63. # 总请求数(日志行数)
  64. host_count = redis_con.zscore("line", "count")
  65. # 成功请求数(状态码属于normal的个数)
  66. normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
  67. success_count = 0
  68. for i in res:
  69. if i[0] in normal:
  70. success_count += int(i[1])
  71. # 其他请求数(其他状态码个数)
  72. other_count = 0
  73. for i in res:
  74. other_count += int(i[1])
  75. other_count -= success_count
  76. # 访客数(不同的IP个数)
  77. visitor_count = redis_con.zcard("host")
  78. # 资源数(不同的url个数)
  79. url_count = redis_con.zcard("url")
  80. # 流量大小(bytes的和,MB)
  81. traffic_sum = int(redis_con.zscore("traffic", "sum"))
  82. # 日志大小(MB)
  83. log_size = int(redis_con.zscore("size", "sum"))
  84. socketio.emit('count_board',
  85. {'host_count': host_count,
  86. 'success_count': success_count,
  87. 'other_count': other_count,
  88. 'visitor_count': visitor_count,
  89. 'url_count': url_count,
  90. 'traffic_sum': traffic_sum,
  91. 'log_size': log_size},
  92. namespace='/count_board')
  93. socketio.start_background_task(target=loop)
  94. # 实时热门位置
  95. @socketio.on('connect', namespace='/hot_geo')
  96. def hot_geo():
  97. def loop():
  98. while True:
  99. socketio.sleep(2)
  100. redis_con = redis.Redis(connection_pool=redis_con_pool)
  101. res = redis_con.zrevrange("host", 0, 50, withscores=True)
  102. data = []
  103. for i in res:
  104. # 调用接口获取地理坐标
  105. req = requests.get("http://api.map.baidu.com/location/ip",
  106. {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
  107. 'ip': i[0],
  108. 'coor': 'bd09ll'})
  109. body = eval(req.text)
  110. # 仅显示境内定位
  111. if body['status'] == 0:
  112. coor_x = body['content']['point']['x']
  113. coor_y = body['content']['point']['y']
  114. data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})
  115. socketio.emit('hot_geo',
  116. {'data': data},
  117. namespace='/hot_geo')
  118. socketio.start_background_task(target=loop)
  119. # 实时热门资源排名
  120. @socketio.on('connect', namespace='/hot_url')
  121. def hot_url():
  122. def loop():
  123. while True:
  124. socketio.sleep(time_interval)
  125. redis_con = redis.Redis(connection_pool=redis_con_pool)
  126. res = redis_con.zrevrange("url", 0, 9, withscores=True)
  127. data = []
  128. no = 1
  129. for i in res:
  130. data.append({"no": no, "url": i[0], "count": i[1]})
  131. no += 1
  132. socketio.emit('hot_url',
  133. {'data': data},
  134. namespace='/hot_url')
  135. socketio.start_background_task(target=loop)
  136. # 实时热门IP排名
  137. @socketio.on('connect', namespace='/hot_ip')
  138. def hot_ip():
  139. def loop():
  140. while True:
  141. socketio.sleep(time_interval)
  142. redis_con = redis.Redis(connection_pool=redis_con_pool)
  143. res = redis_con.zrevrange("host", 0, 13, withscores=True)
  144. data = []
  145. no = 1
  146. for i in res:
  147. # 调用接口获取地理坐标
  148. req = requests.get("http://api.map.baidu.com/location/ip",
  149. {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
  150. 'ip': i[0],
  151. 'coor': 'bd09ll'})
  152. body = eval(req.text)
  153. # 仅显示境内定位
  154. if body['status'] == 0:
  155. address = body['content']['address']
  156. data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
  157. no += 1
  158. socketio.emit('hot_ip',
  159. {'data': data},
  160. namespace='/hot_ip')
  161. socketio.start_background_task(target=loop)
  162. # 实时状态码比例
  163. @socketio.on('connect', namespace='/status_code_pie')
  164. def status_code_pie():
  165. def loop():
  166. while True:
  167. socketio.sleep(time_interval)
  168. redis_con = redis.Redis(connection_pool=redis_con_pool)
  169. res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
  170. data = []
  171. legend = []
  172. for i in res:
  173. if i[0] != 'foo':
  174. data.append({"value": i[1], "name": i[0]})
  175. legend.append(i[0])
  176. socketio.emit('status_code_pie',
  177. {'legend': legend, 'data': data},
  178. namespace='/status_code_pie')
  179. socketio.start_background_task(target=loop)
  180. # 实时请求方式比例
  181. @socketio.on('connect', namespace='/req_method_pie')
  182. def req_method_pie():
  183. def loop():
  184. while True:
  185. socketio.sleep(time_interval)
  186. redis_con = redis.Redis(connection_pool=redis_con_pool)
  187. res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
  188. data = []
  189. legend = []
  190. for i in res:
  191. if i[0] != 'foo':
  192. data.append({"value": i[1], "name": i[0]})
  193. legend.append(i[0])
  194. socketio.emit('req_method_pie',
  195. {'legend': legend, 'data': data},
  196. namespace='/req_method_pie')
  197. socketio.start_background_task(target=loop)
  198. # 实时请求计数(按时间顺序)
  199. @socketio.on('connect', namespace='/req_count_timeline')
  200. def req_count_timeline():
  201. def loop():
  202. while True:
  203. socketio.sleep(time_interval)
  204. redis_con = redis.Redis(connection_pool=redis_con_pool)
  205. res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
  206. data = []
  207. date = []
  208. # 按时间排序
  209. for i in sorted(res):
  210. datetime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i) / 1000))
  211. data.append(res[i])
  212. date.append(datetime)
  213. socketio.emit('req_count_timeline',
  214. {"data": data, "date": date},
  215. namespace='/req_count_timeline')
  216. socketio.start_background_task(target=loop)
  217. # IP请求数排序
  218. @socketio.on('connect', namespace='/ip_ranking')
  219. def timestamp_count_timeline():
  220. def loop():
  221. while True:
  222. socketio.sleep(time_interval)
  223. redis_con = redis.Redis(connection_pool=redis_con_pool)
  224. res = redis_con.zrevrange("host", 0, 50, withscores=True)
  225. ip = []
  226. count = []
  227. for i in res:
  228. ip.append(i[0])
  229. count.append(i[1])
  230. socketio.emit('ip_ranking',
  231. {"ip": ip, "count": count},
  232. namespace='/ip_ranking')
  233. socketio.start_background_task(target=loop)
  234. @app.route('/id')
  235. def id():
  236. return render_template("id.html", async_mode=socketio.async_mode)
  237. # 异常请求计数
  238. @socketio.on('connect', namespace='/bad_count')
  239. def bad_count():
  240. def loop():
  241. while True:
  242. socketio.sleep(time_interval)
  243. redis_con = redis.Redis(connection_pool=redis_con_pool)
  244. res = int(redis_con.zscore("bad", "bad"))
  245. socketio.emit('bad_count',
  246. {"data": res},
  247. namespace='/bad_count')
  248. socketio.start_background_task(target=loop)
  249. # 正常请求计数
  250. @socketio.on('connect', namespace='/good_count')
  251. def bad_count():
  252. def loop():
  253. while True:
  254. socketio.sleep(time_interval)
  255. redis_con = redis.Redis(connection_pool=redis_con_pool)
  256. res = int(redis_con.zscore("good", "good"))
  257. socketio.emit('good_count',
  258. {"data": res},
  259. namespace='/good_count')
  260. socketio.start_background_task(target=loop)
  261. # 正常请求地理标记
  262. @socketio.on('connect', namespace='/good_geo')
  263. def good_geo():
  264. def loop():
  265. while True:
  266. socketio.sleep(time_interval)
  267. consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
  268. data = []
  269. for msg in consumer:
  270. result = ast.literal_eval(bytes.decode(msg.value))
  271. for record in result:
  272. if record['host'] != "foo":
  273. # 调用接口获取地理坐标
  274. req = requests.get("http://api.map.baidu.com/location/ip",
  275. {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
  276. 'ip': record['host'],
  277. 'coor': 'bd09ll'})
  278. body = eval(req.text)
  279. # 仅显示境内定位
  280. if body['status'] == 0:
  281. coor_x = body['content']['point']['x']
  282. coor_y = body['content']['point']['y']
  283. datetime = time.strftime("%Y-%m-%d %H:%M:%S",
  284. time.localtime(int(record['timestamp']) / 1000))
  285. data.append({"name": record['host'], "value": [coor_x, coor_y,
  286. record['url'],
  287. datetime,
  288. record['req_method'],
  289. record['protocol'],
  290. record['status_code']]})
  291. socketio.emit('good_geo',
  292. {"data": data},
  293. namespace='/good_geo')
  294. socketio.start_background_task(target=loop)

说明:当前文章或代码如侵犯了您的权益,请私信作者删除!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/746670
推荐阅读
相关标签
  

闽ICP备14008679号