赞
踩
import asyncio
import json
import time
import database_init
class LogAnalyser:
def __init__(self):
pass
def process(self, str_input):
# print(str_input)
str_input = str_input.decode("utf-8", errors="ignore")
# Add your processing steps here
# ...
try:
# Extract created_at from the log string
str_splits = str_input.split("{", 1)
json_text = "{" + str_splits[1]
data = json.loads(json_text)
created_at = data["time"]
request_all = data["request"].split(" /", 1)
http_type = request_all[0]
path = data["path"]
request_time = data["request_time"]
if PREFIX in data["path"]:
path = data["path"]
return http_type, path, created_at,request_time # The order is relevant for INSERT query params
except Exception as e:
print("error in read_rsylog.py,Class LogAnalyser,function process")
print(e)
return None
@asyncio.coroutine
def handle_echo(reader, writer):
log_filter = LogAnalyser()
while True:
line = yield from reader.readline()
if not line:
break
params = log_filter.process(line)
if params:
# 进行一堆操作,例如进行数据库的插入
# execute_sql(params=params)
if __name__ == '__main__':
CURSOR = database_init.DBConnect().CURSOR
CONN = database_init.DBConnect().CONN
PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"]
database_init.DBConnect().create_table()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, None, 6000, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
print("Closing the server.")
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
CURSOR.close()
CONN.close()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。