赞
踩
小屌丝:鱼哥,你上一篇《Python3,掌握这几种并行处理,轻轻松松提升for循环速度》写的很赞, 我还想继续深入并行计算。
小鱼:那你的意思,哪一篇写的不深呗?
小屌丝:我可没有那个意思。
小鱼:我也没说你是哪个意思。
小屌丝:我就是想着你能不能再讲一讲,关于如何实现CPU并行计算。
小鱼:我屮艸芔茻…你这是有啥心思?
小屌丝:我… 我没有。
小鱼:如实说来,或许,我还能讲一讲。
小屌丝:当真?
小鱼:当真…
小屌丝:就是,我要在公司分享一些技术,就涉及到 如何实现CPU并行计算方面的姿势 . 知识。
小鱼:这样啊, 那你直接说就好了。 我又不是特别喜欢黑桃A。
小屌丝:… 整,整,整吧。
这里,我们主要以Python中的额多进程模式进行讲解。
multiprocessing
模块。multiprocessing
模块可以创建和管理多个进程,每个进程都有自己独立的内存空间和执行环境。代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ import multiprocessing def worker(num): """子进程的任务函数""" print(f'Worker {num} started') # 执行一些任务 print(f'Worker {num} finished') if __name__ == '__main__': # 创建多个子进程 processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) processes.append(p) p.start() # 等待所有子进程结束 for p in processes: p.join() print('All workers finished')
多进程模式下,将任务分配给多个进程并行执行,从而利用多核CPU的优势。
这可以说作为一名码农,必备的知识点。
这里,我们同样使用multiprocessing 来实现一个并发执行任务的示例。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ import multiprocessing def task(name): print(f"Running task {name}") if __name__ == "__main__": # 创建进程池,最大进程数为4 pool = multiprocessing.Pool(processes=4) # 提交任务到进程池 for i in range(10): pool.apply_async(task, args=(i,)) # 关闭进程池,不再接受新的任务 pool.close() # 等待所有任务完成 pool.join() print("All tasks completed")
解析:
apply_async
方法提交了10个任务到进程池中;对于大量重复的任务,
使用进程池来维护一定数量的进程,每个进程执行一个任务后返回结果,然后再由进程池分配下一个任务。
这样的好处就是:避免频繁地创建和销毁进程,从而提高效率。
我们使用 multiprocessing模块的Pool类来实现进程池。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ import multiprocessing def worker(num): print('Worker', num) if __name__ == '__main__': # 创建一个进程池,最大进程数为3 pool = multiprocessing.Pool(processes=3) # 使用进程池执行任务 for i in range(5): pool.apply_async(worker, (i,)) # 关闭进程池,不再接受新的任务 pool.close() # 等待所有任务完成 pool.join()
解析:
在多进程模式下,不同的进程之间需要进行通信,可以利用消息队列来实现进程间通信。
我们使用Queue模块来实现消息队列。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ from queue import Queue import time # 创建一个消息队列 message_queue = Queue() # 生产者函数,向消息队列中添加消息 def producer(): for i in range(5): message = f"Message {i+1}" message_queue.put(message) print(f"Produced: {message}") time.sleep(1) # 消费者函数,从消息队列中获取消息并处理 def consumer(): while True: message = message_queue.get() print(f"Consumed: {message}") time.sleep(2) message_queue.task_done() # 创建并启动生产者和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) producer_thread.start() consumer_thread.start() # 等待生产者和消费者线程结束 producer_thread.join() consumer_thread.join()
解析:
message_queue
;producer
负责向消息队列中添加消息;consumer
负责从消息队列中获取消息并进行处理。对于需要多个进程共享的数据,可以使用共享内存来避免数据拷贝和进程间通信的开销。
我们使用multiprocessing模块的Value和Array类来实现共享内存
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ from multiprocessing import Process, Value, Array # 定义一个共享变量 shared_value = Value('i', 0) # 定义一个共享数组 shared_array = Array('d', [0.0, 1.0, 2.0, 3.0, 4.0]) # 定义一个函数,用于修改共享变量和数组的值 def modify_shared_data(value, array): value.value = 10 for i in range(len(array)): array[i] = i * 2 # 创建一个子进程,传入共享变量和数组 p = Process(target=modify_shared_data, args=(shared_value, shared_array)) p.start() p.join() # 打印共享变量和数组的值 print("Shared value:", shared_value.value) print("Shared array:", shared_array[:])
解析:
对于I/O密集型任务,可以使用异步IO来提高效率。
我们使用asyncio模块来实现异步IO。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-07-01 # @Author : Carl_DJ import asyncio async def fetch_data(url): print(f"正在请求URL:{url}") await asyncio.sleep(2) # 模拟网络请求延迟 print(f"请求URL:{url}完成") return f"从{url}获取的数据" async def main(): urls = [ "https://www.example.com", "https://www.google.com", "https://www.baidu.com" ] tasks = [fetch_data(url) for url in urls] results = await asyncio.gather(*tasks) print(results) if __name__ == "__main__": asyncio.run(main())
解析:
fetch_data
函数,模拟了一个网络请求,并使用asyncio.sleep
来模拟请求的延迟;main
函数中,创建了多个fetch_data
的协程任务,并使用asyncio.gather
来并发执行这些任务;asyncio.run
来运行main
函数。看到这里,今天的分享差不多就到这里了。
今天主要针对在Python中, 使用多进程模式来实现CPU的并行计算。如:
在实际的项目中,很多地方都会用到并行计算, 这不仅提高的代码执行效率, 也提高了用户的满意度。
我是小鱼:
关注我,带你学习更多更专业更前言的Python技术。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。