赞
踩
共享内存是自python3.8开始引入的新功能。
multiprocessing.shared_memory
— 可跨进程直接访问的共享内存,是从python3.8 开始引入的功能。
该模块提供了一个 SharedMemory
类,用于分配和管理多核或对称多处理器(SMP)机器上进程间的共享内存。为了协助管理不同进程间的共享内存生命周期,multiprocessing.managers
模块也提供了一个 BaseManager
的子类: SharedMemoryManager
。
本模块中,共享内存是指 “System V 类型” 的共享内存块(虽然可能和它实现方式不完全一致)而不是 “分布式共享内存”。这种类型的的共享内存允许不同进程读写一片公共(或者共享)的易失性存储区域。一般来说,进程被限制只能访问属于自己进程空间的内存,但是共享内存允许跨进程共享数据,从而避免通过进程间发送消息的形式传递数据。相比通过磁盘、套接字或者其他要求序列化、反序列化和复制数据的共享形式,直接通过内存共享数据拥有更出色性能。
class multiprocessing.shared_memory.SharedMemory
(name=None, create=False, size=0)
创建一个新的共享内存块或者连接到一片已经存在的共享内存块。每个共享内存块都被指定了一个全局唯一的名称。通过这种方式,一个进程可以通过提供一个特定的名字创建一个共享内存区块,然后其他进程使用同样的名字连接到这个共享内存块。
作为一种跨进程共享数据的方式,共享内存块的寿命可能超过创建它的原始进程。一个共享内存块可能同时被多个进程使用,当一个进程不再需要访问这个共享内存块的时候,应该调用 close()
方法。当一个共享内存块不被任何进程使用的时候,应该调用 unlink()
方法以执行必要的清理。
name 是共享内存的唯一名称,字符串类型。如果创建一个新共享内存块的时候,名称指定为 None
(默认值),将会随机产生一个新名称。
create 指定创建一个新的共享内存块 (True
) 还是连接到已存在的共享内存块 (False
) 。
如果是新创建共享内存块则 size 用于指定块的大小为多少字节。由于某些平台是使用特定内存页大小为最小单位来分配的,最终得到的内存块大小可能大于或等于要求的大小。如果是连接到已经存在的共享内存块, size
参数会被忽略。
close
()
关闭实例对于共享内存的访问连接。所有实例确认自己不再需要使用共享内存的时候都应该调用 close()
,以保证必要的资源清理。调用 close()
并不会销毁共享内存区域。
unlink
()
请求销毁底层的共享内存块。 为了执行必要的资源清理,在所有使用这个共享内存块的进程中,unlink()
应该调用一次(且只能调用一次)。 发出此销毁请求后,共享内存块可能会、也可能不会立即销毁,且此行为在不同操作系统之间可能不同。 调用 unlink()
后再尝试访问其中的数据可能导致内存错误。 注意:最后一个关闭共享内存访问权限的进程可以以任意顺序调用 unlink()
和 close()
。
buf
共享内存块内容的 memoryview 。
name
共享内存块的唯一标识,只读属性。
size
共享内存块的字节大小,只读属性。
示例1:展示了 SharedMemory
的基础用法
from multiprocessing import shared_memory import array if __name__ == '__main__': shm_a = shared_memory.SharedMemory(create=True, size=40) print('type: ', type(shm_a)) print('name: ', shm_a.name) print('size: ', shm_a.size) buffer = shm_a.buf buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once print('buffer_a: ', [i for i in bytes(buffer[:4])]) buffer[4] = 100 # Modify single byte at a time print('buffer_a: ', [i for i in bytes(buffer[:5])]) shm_b = shared_memory.SharedMemory(shm_a.name) shm_b_array = array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array print('shm_b_array: ', shm_b_array) shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes print('buffer_a: ', bytes(shm_b.buf[:5])) # Access via shm_a shm_b.close() # Close each SharedMemory instance shm_a.close() shm_a.unlink() # Call unlink only once to release the shared memory
输出:
type: <class 'multiprocessing.shared_memory.SharedMemory'>
name: wnsm_97359521
size: 40
buffer_a: [22, 33, 44, 55]
buffer_a: [22, 33, 44, 55, 100]
shm_b_array: array('b', [22, 33, 44, 55, 100])
buffer_a: b'howdy'
示例2:展示了 SharedMemory
类结合NumPy
数组的读写示例
import multiprocessing import time from multiprocessing import shared_memory import numpy as np class ChildProcess(multiprocessing.Process): def __init__(self, shm_name=None, shape=None, dtype=None): super().__init__() self._shm_name = shm_name self._shape = shape self._dtype = dtype def run(self) -> None: existing_shm = shared_memory.SharedMemory(name=self._shm_name, create=False) # Attach to the existing shared memory block print(existing_shm.name) c = np.ndarray(shape=self._shape, dtype=self._dtype, buffer=existing_shm.buf) print('c0: ', c) c[-1] = 888 print('c1: ', c) del c existing_shm.close() if __name__ == '__main__': a = np.array([1, 1, 2, 3, 5, 6]) # Start with an existing NumPy array print('a: ', a) print('a.shape: ', a.shape) print('a type: ', type(a)) shm = shared_memory.SharedMemory(name=None, create=True, size=a.nbytes) b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) # Now create a NumPy array backed by shared memory print('b0: ', b) b[:] = a[:] # Copy the original data into shared memory print('b1: ', b) print('shm name: ', shm.name) child_process = ChildProcess(shm_name=shm.name, shape=b.shape, dtype=b.dtype) child_process.start() time.sleep(2) print('b2: ', b) shm.close() shm.unlink() # Free and release the shared memory block at the very end
输出:
a: [1 1 2 3 5 6]
a.shape: (6,)
a type: <class 'numpy.ndarray'>
b0: [0 0 0 0 0 0]
b1: [1 1 2 3 5 6]
shm name: wnsm_cb62400b
wnsm_cb62400b
c0: [1 1 2 3 5 6]
c1: [ 1 1 2 3 5 888]
b2: [ 1 1 2 3 5 888]
示例3:展示了 SharedMemory
类结合NumPy
图像的示例
from multiprocessing import shared_memory import numpy as np import cv2 if __name__ == '__main__': image = cv2.imread('1.jpg') print(type(image)) print(image.shape) image_shape = image.shape image_dtype = image.dtype shm = shared_memory.SharedMemory(name=None, create=True, size=2448*2048*3*1) # 图像像素2448x2048, RGB 3通道 shm_ndarray = np.ndarray(shape=image_shape, dtype=image_dtype, buffer=shm.buf) shm_ndarray[:] = image existing_shm = shared_memory.SharedMemory(name=shm.name, create=False) c = np.ndarray(shape=image_shape, dtype=image_dtype, buffer=existing_shm.buf) cv2.imwrite('2.jpg', c) shm.close() existing_shm.close() existing_shm.unlink()
Github项目地址: https://hub.nuaa.cf/soloist-v/SharedMemoryQueue/tree/main 使用其main分支。
(不要使用Gitee上的,Gitee上的没有更新,运行有问题)
示例:使用共享内存队列传输图像
示例连接,https://download.csdn.net/download/WonderThink/88792215
from shared_memory_queue.shared_memory_record import SharedMemoryRecorder from shared_memory_queue.kfifo_queue import Queue import multiprocessing import os import cv2 def producer(shm_queue: Queue): print('producer start.') for i in range(0, len(os.listdir('./img/'))): image_path = './img/' + str(i) + '.jpg' if os.path.exists(image_path): image_ndarray = cv2.imread(image_path) print(f'i: {i}, shape: {image_ndarray.shape}') shm_queue.put(item=image_ndarray, block=True, timeout=1) print('producer end.') def consumer(shm_queue: Queue, shape=None): print('consumer start.') i = 0 while True: try: data = shm_queue.get(block=True, timeout=2) except: break image = data.reshape(shape) new_image_dir = './img2/' if not os.path.exists(new_image_dir): os.makedirs(new_image_dir) cv2.imwrite(os.path.join(new_image_dir, str(i)+'.jpg'), image) i += 1 print('consumer end.') if __name__ == '__main__': image_shape = (2048, 2448, 3) # numpy shape: 行数、列数、RGB三通道数 shm_queue = Queue(buffer_size=image_shape[0] * image_shape[1] * image_shape[2] * 10) print(shm_queue.sm.get_sm_name()) producer_process = multiprocessing.Process(target=producer, args=(shm_queue,)) producer_process.start() consumer_process = multiprocessing.Process(target=consumer, args=(shm_queue, image_shape,)) consumer_process.start() producer_process.join() consumer_process.join() shm_queue.sm.close() # SharedMemoryRecorder.release_last_sm()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。