当前位置:   article > 正文

关于multiprocessing的Queue效率问题_multiprocessing queue 延时

multiprocessing queue 延时

今天大半天都在折腾着一个问题,就是从kafka消费消息后,后面的业务处理一直处理不过来,总是延后几个小时。为了解决这个问题,不断去调试代码,查找到相对耗时的位置,最终定位是Qeueue的问题。先上一段简化版的代码。

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. from cachetools import TTLCache
  4. import os
  5. import sys
  6. import subprocess
  7. from multiprocessing import Process,Queue
  8. from datetime import datetime, timedelta
  9. import config
  10. from kafka import KafkaConsumer
  11. from datetime import datetime, timedelta
  12. import time
  13. import socket
  14. import struct
  15. import logging
  16. from logging.handlers import TimedRotatingFileHandler
  17. def init_log():
  18. # 建立一个 TimedRotatingFileHandler 对象
  19. time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
  20. # 设定 time_handler 文件扩展名的保存格式
  21. # time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
  22. # 设定 WARNING 以上的内容输出到文件里面
  23. time_handler.setLevel(logging.WARNING)
  24. # 设定 time_handler 的输出参数
  25. formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
  26. time_handler.setFormatter(formatter)
  27. # 创建一个 logger
  28. logger = logging.getLogger()
  29. # 将 time_handler 添加到 logger 中
  30. logger.addHandler(time_handler)
  31. return logger
  32. logging = init_log()
  33. def get_log_from_kafka(q, appid, app_boostrap_server):
  34. while True:
  35. try:
  36. topic = 'ysec_anticheat_log_' + appid
  37. consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS,auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
  38. for msg in consumer:
  39. q.put(msg.value) # 将读取得到的数据,写入队列中
  40. except Exception as e:
  41. logging.error(str(e))
  42. # 进行kafka数据的处理
  43. def deal_log_task(q):
  44. while True:
  45. ip = ""
  46. region = ""
  47. code = 0
  48. msg_list = []
  49. try:
  50. first_time = time.time()
  51. print(q.qsize())
  52. end_time = time.time()
  53. print(end_time-first_time)
  54. except Exception as e:
  55. logging.error(e)
  56. continue
  57. if __name__ == '__main__':
  58. q = Queue(200000000)
  59. task_list = []
  60. appid = 'act_present'
  61. p = Process(target = get_log_from_kafka, args = (q, appid, config.log_boostrap_server,))
  62. p.start()
  63. task_list.append(p)
  64. for i in range(5):
  65. task = Process(target = deal_log_task, args = (q,))
  66. task.start()
  67. task_list.append(task)
  68. for p in task_list:
  69. p.join()

因为此业务每分钟维持在40万左右,而队列中的数据越来越多,最终占用了系统90%的内存了。我将接收进程接收消息的时间打印出来,没想到竟然需要60毫秒到200毫秒之间,接收一条消息竟然需要那么长时间,肯定有问题,直接去问google,发现Qeueue就个安全队列,那么想到put和get肯定都会加锁,这样频繁的加锁和解锁,那么时间都浪费在加解锁上了。

那我们是需要解决这加解锁的问题,那我们是不是可以减少在队列的交互次数呢?于是想到下面的一种方法来解决。直接上代码。

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. from cachetools import TTLCache
  4. import os
  5. import sys
  6. import subprocess
  7. from multiprocessing import Process,Queue
  8. from datetime import datetime, timedelta
  9. import config
  10. from kafka import KafkaConsumer
  11. from datetime import datetime, timedelta
  12. import time
  13. import socket
  14. import struct
  15. import logging
  16. from logging.handlers import TimedRotatingFileHandler
  17. def init_log():
  18. # 建立一个 TimedRotatingFileHandler 对象
  19. time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
  20. # 设定 time_handler 文件扩展名的保存格式
  21. # time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
  22. # 设定 WARNING 以上的内容输出到文件里面
  23. time_handler.setLevel(logging.WARNING)
  24. # 设定 time_handler 的输出参数
  25. formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
  26. time_handler.setFormatter(formatter)
  27. # 创建一个 logger
  28. logger = logging.getLogger()
  29. # 将 time_handler 添加到 logger 中
  30. logger.addHandler(time_handler)
  31. return logger
  32. logging = init_log()
  33. def get_log_from_kafka(q, appid, app_boostrap_server, max_num):
  34. while True:
  35. try:
  36. topic = 'ysec_anticheat_log_' + appid
  37. consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS,auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
  38. msg_list = []
  39. for msg in consumer:
  40. msg_list.append(msg.value)
  41. if len(msg_list) == max_num:
  42. q.put(msg_list) # 将读取得到的数据,写入队列中
  43. msg_list = []
  44. except Exception as e:
  45. logging.error(str(e))
  46. # 进行kafka数据的处理
  47. def deal_log_task(q):
  48. while True:
  49. ip = ""
  50. region = ""
  51. code = 0
  52. msg_list = []
  53. try:
  54. first_time = time.time()
  55. msg_list = q.get() # 从队列获取信息
  56. print(q.qsize())
  57. end_time = time.time()
  58. print(end_time-first_time)
  59. except Exception as e:
  60. logging.error(e)
  61. continue
  62. for msg in msg_list:
  63. print(msg)
  64. if __name__ == '__main__':
  65. q = Queue(2000000)
  66. task_list = []
  67. appid = 'act_present'
  68. p = Process(target = get_log_from_kafka, args = (q, appid, config.log_boostrap_server,1000,))
  69. p.start()
  70. task_list.append(p)
  71. for i in range(5):
  72. task = Process(target = deal_log_task, args = (q,))
  73. task.start()
  74. task_list.append(task)
  75. for p in task_list:
  76. p.join()

改版后的代码性能大大提高,原因是减少消息在队列中交互的数量,增加的是每条消息的大小而已,问题解决。在使用multiprocessing的Queue时需要注意,在数据交互频率较大时,不建议使用这种方式,非常影响性能。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号