当前位置:   article > 正文

Python 记录日志到日志服务器:[Docker] 通过 rsyslog 记录日志并转发 nginx 日志到 Python 程序

python 记录日志到日志服务器

import asyncio

import json

import time

import database_init

class LogAnalyser:
    """Extract request fields from forwarded access-log lines.

    Each line is expected to contain a JSON object starting at its first
    ``{``; anything before that brace (presumably the syslog header) is
    discarded.
    """

    def process(self, str_input):
        """Parse one raw log line into the parameters of the INSERT query.

        Args:
            str_input: The raw line as ``bytes``. It is decoded as UTF-8 and
                undecodable bytes are dropped.

        Returns:
            A ``(http_type, path, created_at, request_time)`` tuple when the
            line parses and its ``path`` contains the module-level ``PREFIX``;
            ``None`` otherwise. Parsing errors are printed, never raised.
        """
        text = str_input.decode("utf-8", errors="ignore")

        try:
            # The JSON payload is everything from the first "{" onward; a line
            # without a brace raises IndexError and is reported below.
            str_splits = text.split("{", 1)
            data = json.loads("{" + str_splits[1])

            created_at = data["time"]
            # The request line looks like "GET /path HTTP/1.1": the text before
            # the first " /" is the HTTP method.
            http_type = data["request"].split(" /", 1)[0]
            path = data["path"]
            request_time = data["request_time"]

            # NOTE(review): the original indentation was lost; the return is
            # assumed to belong to this prefix check, so lines for other paths
            # fall through to ``return None``.
            if PREFIX in path:
                # The order is relevant for INSERT query params
                return http_type, path, created_at, request_time
        except Exception as e:
            # Best-effort boundary: one malformed line must not stop the
            # server loop, so report it and skip the line.
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)

        return None

async def handle_echo(reader, writer):
    """Read log lines from one client connection until it reaches EOF.

    Used as the ``client_connected_cb`` of ``asyncio.start_server``. Written
    as a native coroutine because ``@asyncio.coroutine`` / ``yield from``
    generator coroutines were removed in Python 3.11.

    Args:
        reader: ``asyncio.StreamReader`` for the connection.
        writer: ``asyncio.StreamWriter`` for the connection; closed when the
            handler exits.
    """
    log_filter = LogAnalyser()
    try:
        while True:
            line = await reader.readline()
            if not line:
                # readline() returns empty bytes once the peer has closed.
                break

            params = log_filter.process(line)
            if params:
                # TODO: act on the parsed record here, for example insert it
                # into the database: execute_sql(params=params)
                pass
    finally:
        # Release the transport even if reading or processing raised.
        writer.close()

if __name__ == '__main__':
    # Build the database helper once so CURSOR and CONN come from the same
    # object instead of four separate DBConnect() instances.
    # NOTE(review): assumes DBConnect() needs constructing only once - confirm
    # against database_init.
    db = database_init.DBConnect()
    CURSOR = db.CURSOR
    CONN = db.CONN
    # Read as a module-level global by LogAnalyser.process.
    PREFIX = db.CONFIG["TEST_SWAGGER"]["PREFIX"]
    db.create_table()

    # Create the loop explicitly: get_event_loop() without a running loop is
    # deprecated, and the ``loop=`` argument of start_server was removed in
    # Python 3.10.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # host=None listens on all interfaces, port 6000.
        coro = asyncio.start_server(handle_echo, None, 6000)
        server = loop.run_until_complete(coro)

        # Serve requests until Ctrl+C is pressed
        print('Serving on {}'.format(server.sockets[0].getsockname()))
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass

        # Close the server
        print("Closing the server.")
        server.close()
        loop.run_until_complete(server.wait_closed())
    finally:
        # Always release the loop and database handles, even if startup failed.
        loop.close()
        CURSOR.close()
        CONN.close()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/815896
推荐阅读
相关标签
  

闽ICP备14008679号