当前位置:   article > 正文

Python利用队列Queue实现多进程Process间通信_process 传递queue

process 传递queue

背景:

我在利用大模型进行推理的时候,一个机器上面有8张GPU卡,如何充分的利用这几张卡呢?最开始想到的是利用Python多线程方案,可代码实现之后发现虽然我在环境变量里面设置可以使用多张GPU卡,可程序依然只使用了第一张卡,于是就想利用多进程来实现

代码:

  1. from multiprocessing import Queue
  2. import time
  3. import multiprocessing as mp
  4. #这里是定义一个函数,将一个目录下面的文件添加到队列里面,该函数由主进程调用
  5. def put_data(queue, root_dir):
  6. message_dir = os.path.join(root_dir, "message");
  7. file_count = 0
  8. file_names =[]
  9. for item in os.listdir(message_dir):
  10. sub_path = os.path.join(message_dir, item)
  11. if os.path.isdir(sub_path):
  12. for subItem in os.listdir(sub_path):
  13. sub_item_path = os.path.join(sub_path, subItem)
  14. if os.path.isfile(sub_item_path):
  15. file_names.append(sub_item_path)
  16. file_count = file_count + 1
  17. #这里根据文件的创建时间进行排序
  18. file_names = sorted(file_names, key=lambda x: os.path.getctime(x))
  19. #取文件创建时间最新的一百个文件
  20. file_names = file_names[-100:]
  21. #文件添加到队列里
  22. for file_name in file_names:
  23. queue.put(file_name)
  24. return file_count
  25. #这里定义一个函数,这个函数是后面创建进程将会执行的方法
  26. #gpu_id:该进程将利用哪一块GPU,
  27. def worker(gpu_id, queue, process_num):
  28. print(f"此时正在执行的GPU num:{gpu_id} --- process_num:{process_num}")
  29. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  30. #通过设置该环境变量来确保GPU对该进程可见
  31. os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
  32. while True:
  33. try:
  34. #从队列里面获取文件名称
  35. file_name = queue.get();
  36. #通过重命名文件来解决队列里面可能存在的重复文件名问题,
  37. #解决主进程在添加文件时,添加了重复的文件
  38. new_filename = file_name + ".bak"
  39. try:
  40. os.rename(file_name, new_filename)
  41. print(f"此时正在执行的GPU num:{gpu_id} --- process_num:{process_num},文件已成功重命名为:{new_filename}")
  42. except:
  43. print(f"此时正在执行的GPU num:{gpu_id} --- process_num:{process_num},文件重命名失败:{new_filename}")
  44. except Exception as outExceiption:
  45. print('Error:', outExceiption)
  46. processes = []
  47. queue = Queue()
  48. for i in range(5, 9):
  49. for process_num in range(1):
  50. #创建进程,并传递对应的执行函数,和函数的参数
  51. process = mp.Process(target=worker, args=(str(i),queue, process_num))
  52. processes.append(process)
  53. #启动进程
  54. process.start()
  55. print(f"process count : {len(processes)}")
  56. root_dir = "/data/test"
  57. while True:
  58. if queue.qsize() < 20:
  59. file_count = put_data(queue, root_dir)
  60. print(f"main process find file size : {file_count}")
  61. if file_count == 0:
  62. time.sleep(5)
  63. else:
  64. time.sleep(5)

结论:

利用multiprocessing的Process可以非常方便的实现多进程架构,另外使用它的Queue也可以非常便捷的进行进程间通信,这样每个进程指定特定的GPU卡,就实现了GPU卡的利用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/860627
推荐阅读
相关标签
  

闽ICP备14008679号