我正在使用Python 3.4.3和OpenCV 3.0.0来处理(应用各种过滤器)内存中非常大的图像(80,000 x 60,000),我想使用多个CPU内核来提高性能。经过一些阅读,我得出了两种可能的方法:1)使用python的multiprocessing
模块,让每个进程处理一大片大图像并在处理完成后加入结果(这可能应该在POSIX系统上执行?)2 )由于NumPy支持OpenMP,而OpenCV使用NumPy,我可以将多处理留给NumPy吗?
所以我的问题是:
哪一个会是更好的解决方案? (如果它们看起来不合理,那么可能的方法是什么?)
如果选项2是好的,我应该用OpenMP构建NumPy和OpenCV吗?我如何实际进行多处理? (我真的找不到有用的指示..)
我不知道你需要什么类型的过滤器,但如果它相当简单,你可以考虑libvips。它是一个图像处理系统,用于非常大的图像(大于您拥有的内存量)。它来自一系列由欧盟资助的科学艺术成像项目,因此重点关注图像捕获和比较所需的操作类型:卷积,等级,形态,算术,颜色分析,重采样,直方图等等。
这是fast (faster than OpenCV, on some benchmarks at least), needs little memory,还有a high-level Python binding。它适用于Linux,OS X和Windows。它会自动为您处理所有多处理。
在阅读了一些SO帖子之后,我想出了一种方法,可以在Python3中使用OpenCV
和multiprocessing
。我建议在linux上这样做,因为根据this post,只要内容没有改变,衍生进程就会与父进程共享内存。这是一个最小的例子:
import cv2
import multiprocessing as mp
import numpy as np
import psutil
img = cv2.imread('test.tiff', cv2.IMREAD_ANYDEPTH) # here I'm using a indexed 16-bit tiff as an example.
num_processes = 4
kernel_size = 11
tile_size = img.shape[0]/num_processes # Assuming img.shape[0] is divisible by 4 in this case
output = mp.Queue()
def mp_filter(x, output):
print(psutil.virtual_memory()) # monitor memory usage
output.put(x, cv2.GaussianBlur(img[img.shape[0]/num_processes*x:img.shape[0]/num_processes*(x+1), :],
(kernel_size, kernel_size), kernel_size/5))
# note that you actually have to process a slightly larger block and leave out the border.
if __name__ == 'main':
processes = [mp.Process(target=mp_filter, args=(x, output)) for x in range(num_processes)]
for p in processes:
p.start()
result = []
for ii in range(num_processes):
result.append(output.get(True))
for p in processes:
p.join()
而不是使用Queue
,另一种从流程中收集结果的方法是通过multiprocessing
模块创建共享数组。 (必须导入ctypes
)
result = mp.Array(ctypes.c_uint16, img.shape[0]*img.shape[1], lock = False)
然后,假设没有重叠,每个进程可以写入数组的不同部分。然而,创建一个大型mp.Array
的速度令人惊讶地缓慢。这实际上违背了加速操作的目的。因此,只有当与总计算时间相比,增加的时间不多时才使用它。这个数组可以通过以下方式变成numpy数组:
result_np = np.frombuffer(result, dtypye=ctypes.c_uint16)
这可以使用Ray干净地完成,import cv2
import numpy as np
import ray
num_tasks = 4
kernel_size = 11
@ray.remote
def mp_filter(image, i):
lower = image.shape[0] // num_tasks * i
upper = image.shape[0] // num_tasks * (i + 1)
return cv2.GaussianBlur(image[lower:upper, :],
(kernel_size, kernel_size), kernel_size // 5)
if __name__ == '__main__':
ray.init()
# Load the image and store it once in shared memory.
image = np.random.normal(size=(1000, 1000))
image_id = ray.put(image)
result_ids = [mp_filter.remote(image_id, i) for i in range(num_tasks)]
results = ray.get(result_ids)
是一个用于并行和分布式Python的库。 Ray有关“任务”的原因,而不是使用fork-join模型,这提供了一些额外的灵活性(例如,即使在分配工作进程之后,您在共享内存中放置值),相同的代码在多台机器上运行,您可以将任务组合在一起等
Plasma shared-memory object store
请注意,您可以在共享内存中存储多个numpy数组,如果您的Python对象包含numpy数组(如包含numpy数组的字典),您也可以受益。在引擎盖下,这使用Apache Arrow data layout和Ray documentation。
您可以在qazxswpoi中阅读更多内容。请注意,我是Ray开发人员之一。