I spent most of today wrestling with a problem: after consuming messages from Kafka, the downstream processing could never keep up and always lagged several hours behind. To track it down I kept instrumenting the code and narrowing in on the slow spots, and finally traced it to multiprocessing.Queue. Here is a simplified version of the code.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from cachetools import TTLCache
import os
import sys
import subprocess
from multiprocessing import Process, Queue
from datetime import datetime, timedelta
import config
from kafka import KafkaConsumer
import time
import socket
import struct
import logging
from logging.handlers import TimedRotatingFileHandler

def init_log():
    # Create a TimedRotatingFileHandler that rotates every hour and keeps 3 backups
    time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
    # Suffix format for rotated file names
    # time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
    # Only write WARNING and above to the file
    time_handler.setLevel(logging.WARNING)
    # Output format for time_handler
    formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
    time_handler.setFormatter(formatter)
    # Create a logger
    logger = logging.getLogger()
    # Attach time_handler to the logger
    logger.addHandler(time_handler)
    return logger

logging = init_log()

def get_log_from_kafka(q, appid, app_boostrap_server):
    while True:
        try:
            topic = 'ysec_anticheat_log_' + appid
            consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS, auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
            for msg in consumer:
                q.put(msg.value)  # push each consumed message onto the queue
        except Exception as e:
            logging.error(str(e))

# Process the Kafka data
def deal_log_task(q):
    while True:
        ip = ""
        region = ""
        code = 0
        msg_list = []
        try:
            first_time = time.time()
            msg = q.get()  # fetch one message from the queue (this is the call being timed)
            print(q.qsize())
            end_time = time.time()
            print(end_time - first_time)
        except Exception as e:
            logging.error(e)
            continue


if __name__ == '__main__':

    q = Queue(200000000)
    task_list = []
    appid = 'act_present'
    p = Process(target=get_log_from_kafka, args=(q, appid, config.log_boostrap_server,))
    p.start()
    task_list.append(p)

    for i in range(5):
        task = Process(target=deal_log_task, args=(q,))
        task.start()
        task_list.append(task)

    for p in task_list:
        p.join()
This service handles roughly 400,000 messages per minute, so the queue kept growing and eventually ate up 90% of the system's memory. When I printed out how long the worker process took to receive a single message, it turned out to be 60 to 200 milliseconds per message. That is far too long for one receive, so something had to be wrong. A quick search on Google confirmed that multiprocessing.Queue is a process-safe queue, which means every put and get has to acquire and release a lock; at this message rate, most of the time was being burned on locking and unlocking.
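To make that per-operation overhead concrete, here is a minimal benchmark sketch (not from the original post; N and BATCH are arbitrary values chosen for illustration) that compares moving messages through a multiprocessing.Queue one at a time versus in batches of 1,000:

#!/usr/bin/env python
# Benchmark sketch: per-item vs batched put/get on multiprocessing.Queue.
import time
from multiprocessing import Process, Queue

N = 100000       # total messages to move (assumed for illustration)
BATCH = 1000     # batch size for the batched variant (assumed)

def consume_items(q):
    # one lock/pickle/pipe round trip per message
    for _ in range(N):
        q.get()

def consume_batches(q):
    # one round trip per BATCH messages
    for _ in range(N // BATCH):
        q.get()

def bench(consumer, produce):
    q = Queue()
    p = Process(target=consumer, args=(q,))
    p.start()
    start = time.time()
    produce(q)
    p.join()
    return time.time() - start

if __name__ == '__main__':
    t1 = bench(consume_items, lambda q: [q.put(i) for i in range(N)])
    t2 = bench(consume_batches,
               lambda q: [q.put(list(range(BATCH))) for _ in range(N // BATCH)])
    print('per-item: %.2fs  batched: %.2fs' % (t1, t2))

The batched variant should finish many times faster, because each put/get pays a roughly fixed cost for lock acquisition and the pipe round trip, and moving 1,000 messages per operation amortizes that cost 1,000-fold.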
So if the locking overhead is the problem, can we simply reduce the number of times we touch the queue? That leads to the following approach: accumulate messages and enqueue them in batches. Straight to the code.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from cachetools import TTLCache
import os
import sys
import subprocess
from multiprocessing import Process, Queue
from datetime import datetime, timedelta
import config
from kafka import KafkaConsumer
import time
import socket
import struct
import logging
from logging.handlers import TimedRotatingFileHandler

def init_log():
    # Create a TimedRotatingFileHandler that rotates every hour and keeps 3 backups
    time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
    # Suffix format for rotated file names
    # time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
    # Only write WARNING and above to the file
    time_handler.setLevel(logging.WARNING)
    # Output format for time_handler
    formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
    time_handler.setFormatter(formatter)
    # Create a logger
    logger = logging.getLogger()
    # Attach time_handler to the logger
    logger.addHandler(time_handler)
    return logger

logging = init_log()

def get_log_from_kafka(q, appid, app_boostrap_server, max_num):
    while True:
        try:
            topic = 'ysec_anticheat_log_' + appid
            consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS, auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
            msg_list = []
            for msg in consumer:
                msg_list.append(msg.value)
                if len(msg_list) == max_num:
                    q.put(msg_list)  # enqueue the whole batch with a single put
                    msg_list = []
        except Exception as e:
            logging.error(str(e))

# Process the Kafka data
def deal_log_task(q):
    while True:
        ip = ""
        region = ""
        code = 0
        msg_list = []
        try:
            first_time = time.time()
            msg_list = q.get()  # fetch a whole batch from the queue
            print(q.qsize())
            end_time = time.time()
            print(end_time - first_time)
        except Exception as e:
            logging.error(e)
            continue
        for msg in msg_list:
            print(msg)


if __name__ == '__main__':

    q = Queue(2000000)
    task_list = []
    appid = 'act_present'
    p = Process(target=get_log_from_kafka, args=(q, appid, config.log_boostrap_server, 1000,))
    p.start()
    task_list.append(p)

    for i in range(5):
        task = Process(target=deal_log_task, args=(q,))
        task.start()
        task_list.append(task)

    for p in task_list:
        p.join()
The revised code performs far better. The reason is that it cuts the number of queue operations; each item merely gets bigger. Problem solved. The lesson when using multiprocessing's Queue: if items are exchanged at a high rate, do not pass them one at a time, as it hurts performance badly.
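One caveat with the batched version as written: if traffic drops off, a partial batch can sit in msg_list indefinitely, because q.put() only fires once max_num messages have accumulated. A common refinement is to also flush on a time limit. Below is a rough sketch of that idea, meant as a drop-in replacement for get_log_from_kafka in the second listing (it reuses that module's imports); it swaps the blocking iterator for kafka-python's consumer.poll(), and flush_secs is a made-up parameter, not part of the original code:

def get_log_from_kafka(q, appid, app_boostrap_server, max_num, flush_secs=1.0):
    topic = 'ysec_anticheat_log_' + appid
    consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server,
                             group_id='anticheat_pic1')
    msg_list = []
    last_flush = time.time()
    while True:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]} and hands
        # back control even when no messages arrive within the timeout
        for records in consumer.poll(timeout_ms=500).values():
            for msg in records:
                msg_list.append(msg.value)
        # flush when the batch is full OR it has been sitting too long
        if msg_list and (len(msg_list) >= max_num
                         or time.time() - last_flush >= flush_secs):
            q.put(msg_list)
            msg_list = []
            last_flush = time.time()

This keeps the amortized locking benefit at high traffic while bounding the latency of a straggler batch at low traffic.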