Fixed images saved out of order when using parallel processing in Python


I'm running into a problem where, after resizing fixed images using parallel processing in Python, the fixed images are saved out of order. The moving images are saved correctly, but the order of the fixed images gets scrambled.

This is what I'm doing:

I load pairs of fixed and moving images and process them in parallel with ThreadPoolExecutor. I resize the images to a target shape (1024x1024) and save them in a specified output directory. The problem is that after processing, the fixed images are not saved in the expected chronological order, while the moving images are.

Code:

# Note: debug_print, resize_fixed_image, resize_moving_image and the
# output_dir / fixed_image_path / moving_image_path globals are defined
# elsewhere in the script.
import os
import pickle
import concurrent.futures

import numpy as np
import torch
import torchvision.transforms.functional as TF
import tifffile as tiff
from tifffile import imread
from PIL import Image
from skimage import img_as_ubyte

def process_image_pair(fixed_file, moving_file, target_shape):
    fixed_image_name = os.path.splitext(fixed_file)[0]
    moving_image_name = os.path.splitext(moving_file)[0]

    fixed_output_path = os.path.join(output_dir, f"{fixed_image_name}_processed.pt")
    moving_output_path = os.path.join(output_dir, f"{moving_image_name}_processed.pt")

    if os.path.exists(fixed_output_path) and os.path.exists(moving_output_path):
        debug_print(f"Processed pair: {fixed_output_path} and {moving_output_path} already exist. Skipping processing.")
        try:
            fixed_image = torch.load(fixed_output_path)
            moving_image = torch.load(moving_output_path)
            return fixed_image, moving_image
        except (EOFError, pickle.UnpicklingError, RuntimeError):
            debug_print("Error loading processed images. The files may be corrupted.")
            os.remove(fixed_output_path)
            os.remove(moving_output_path)
            debug_print(f"Deleted corrupted files: {fixed_output_path} and {moving_output_path}")

    print(f"Processing pair: {fixed_file} and {moving_file}")

    full_fixed_path = os.path.join(fixed_image_path, fixed_file)
    debug_print(f"Loading fixed image: {full_fixed_path}")
    fixed_image = tiff.imread(full_fixed_path)
    if fixed_image.ndim == 3 and fixed_image.shape[0] in {1, 3}:
        fixed_image = np.transpose(fixed_image, (1, 2, 0))
        fixed_image = img_as_ubyte(fixed_image)
    fixed_image = TF.rgb_to_grayscale(Image.fromarray(fixed_image))
    fixed_image = resize_fixed_image(np.array(fixed_image), target_shape[0])
    debug_print(f"Fixed image resized: {fixed_file}")

    full_moving_path = os.path.join(moving_image_path, moving_file)
    debug_print(f"Loading moving image: {full_moving_path}")
    moving_image = imread(full_moving_path, is_ome=True, level=0, aszarr=False)

    if moving_image.ndim == 4:  # If 4D, determine batch and channel dimensions
        non_singleton_dims = [dim for dim in range(moving_image.ndim) if moving_image.shape[dim] > 1]
        batch_dim = non_singleton_dims[0]
        channel_dim = non_singleton_dims[1] if len(non_singleton_dims) > 1 else batch_dim        
        moving_image = np.moveaxis(moving_image, batch_dim, 0)
    elif moving_image.ndim == 3:  
        moving_image = np.transpose(moving_image, (1, 2, 0))
        moving_image = img_as_ubyte(moving_image)

    moving_image = resize_moving_image(moving_image, target_shape[0])
    moving_image = np.rot90(moving_image, k=3)

    debug_print(f"Moving image resized: {moving_file}")

    fixed_dir = os.path.dirname(fixed_output_path)
    moving_dir = os.path.dirname(moving_output_path)
    os.makedirs(fixed_dir, exist_ok=True)
    os.makedirs(moving_dir, exist_ok=True)

    torch.save(fixed_image, fixed_output_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)
    torch.save(moving_image, moving_output_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)

    print(f"Saved processed pair: {fixed_file} and {moving_file}")

    return fixed_image, moving_image

# Wrapper function to include target_shape
def process_image_pair_wrapper(args):
    return process_image_pair(*args)

def process_fixed_image(fixed_image, index, output_dir):
    fixed_image_resized = load_resized_image(output_dir, index, "fixed")
    if fixed_image_resized is None:
        fixed_image_resized = resize_fixed_image(fixed_image, 1024)
        save_resized_image(output_dir, fixed_image_resized, index, "fixed")
    return fixed_image_resized

def process_moving_image(moving_image, index, output_dir):
    moving_image_resized = load_resized_image(output_dir, index, "moving")
    if moving_image_resized is None:
        moving_image_resized = resize_moving_image(moving_image, 1024)
        save_resized_image(output_dir, moving_image_resized, index, "moving")
    return moving_image_resized
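`load_resized_image` and `save_resized_image` are not shown in the question; a minimal sketch of what they might look like, assuming each resized array is cached as a `.npy` file keyed by index and role ("fixed"/"moving") — the names and file layout here are guesses, not the asker's actual code:

```python
import os

import numpy as np

def save_resized_image(output_dir, image, index, role):
    # Cache one resized array per (index, role) pair, e.g. fixed_0003.npy
    os.makedirs(output_dir, exist_ok=True)
    np.save(os.path.join(output_dir, f"{role}_{index:04d}.npy"), image)

def load_resized_image(output_dir, index, role):
    # Return the cached array, or None when it has not been saved yet
    path = os.path.join(output_dir, f"{role}_{index:04d}.npy")
    return np.load(path) if os.path.exists(path) else None
```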

# Check if dataset exists and load it
dataset_path = os.path.join(output_dir, "processed_dataset.pt")
if os.path.exists(dataset_path):
    print("Loading existing dataset...")
    fixed_images, moving_images = torch.load(dataset_path)
else:
    fixed_images = []
    moving_images = []

# Process images in parallel
target_shape = (1024, 1024)
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_image_pair_wrapper, [(fixed_file, moving_file, target_shape) for fixed_file, moving_file in zip(fixed_image_files, moving_image_files)]))

# Unpack the results (note: if an existing dataset was loaded above, this
# appends the freshly processed images after the loaded ones)
for fixed_image, moving_image in results:
    fixed_images.append(fixed_image)
    moving_images.append(moving_image)

# Combine into a single dataset and save
torch.save((fixed_images, moving_images), dataset_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)

all_fixed_images_resized = []
all_moving_images_resized = []

for index in range(len(fixed_images)):
    print(f"Processing pair {index+1}/{len(fixed_images)}")
    fixed_image_resized = process_fixed_image(fixed_images[index], index, output_dir_vxm)
    moving_image_resized = process_moving_image(moving_images[index], index, output_dir_vxm)
    
    all_fixed_images_resized.append(fixed_image_resized)
    all_moving_images_resized.append(moving_image_resized)

# Combine into a single dataset
fixed_images_np = np.stack(all_fixed_images_resized)
moving_images_np = np.stack(all_moving_images_resized)

torch.save((fixed_images_np, moving_images_np), os.path.join(output_dir_vxm, "resized_dataset.pt"), pickle_protocol=5)
1 Answer

Q:
"(...) are saved out of order after resizing them using parallel processing in Python. (Why?)"

Because expecting to receive results in the same order (identical to pure-[SERIAL] sequential processing) from just-[CONCURRENT] processing, let alone from true-[PARALLEL] processing, is plainly wrong; it is almost an oxymoron in computer science.

Your educators should have told you in early programming and computer-science lectures that the [SERIAL], [CONCURRENT], and [PARALLEL] ways of organising a workflow are neither the same nor similar. Each has very different properties, and each places very different requirements on resource usage, resource mapping, work scheduling, and the mutual coordination the workflow can (or, on the contrary, cannot) have.

# Process images in parallel .................... NO IT IS NOT [PARALLEL]
#                            ....................    IT TAKES
#                            ....................    AWFULLY LOT EFFORTS
#                            ....................    TO FORCE [SERIAL]-Python
#                            ....................    ACT in [PARALLEL] on macro-level
#                            ....................    as add-on costs are IMMENSE
#                            WHY?
#                            READ this : https://stackoverflow.com/revisions/18374629/3
#                            1st
#
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list( executor.map( process_image_pair_wrapper,
                                  [ ( fixed_file,
                                      moving_file,
                                      target_shape
                                      )
                                  for fixed_file,
                                      moving_file
                                  in  zip(  fixed_image_files,
                                           moving_image_files
                                           )
                                      ]
                                  )
                    )

Using

concurrent.futures.ThreadPoolExecutor()
only creates a
[CONCURRENT]
pool of Python threads, which (still, as of August 2024) are pure-
[SERIAL]
on the macro level: the Python Global Interpreter Lock (GIL) re-[SERIAL]-izes them. This GIL stepping prevents the Python interpreter from crashing into concurrency-related collisions, but it does not restore, let alone guarantee, ordered delivery from an uncoordinated pool.

(and it is definitely not a case of [PARALLEL] processing)
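A minimal, runnable sketch of this point, with a hypothetical `work` function standing in for `process_image_pair`: side effects inside the worker (such as `print` and `torch.save`) happen in whatever order the pool threads happen to complete, while the return values collected by `executor.map` keep the submission order:

```python
import concurrent.futures
import random
import threading
import time

completion_order = []   # records the order the side effects actually happen in
lock = threading.Lock()

def work(i):
    # uneven runtimes stand in for differently sized image pairs
    time.sleep(random.uniform(0.0, 0.05))
    with lock:
        completion_order.append(i)  # side effect: completion order, nondeterministic
    return i                        # return value: re-collected in submission order

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(work, range(8)))

print(results)           # always [0, 1, 2, 3, 4, 5, 6, 7]
print(completion_order)  # some permutation of 0..7, frequently not sorted
```

This is why files written inside the workers can carry timestamps in a scrambled order even though the `results` list itself comes back ordered.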

If you make the result list "sortable", you can still reorder the unordered results back into the original sequence. As a cheap fix, this would at least salvage the effort you have invested in the code so far.
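One way to make the results "sortable", sketched with a hypothetical `process_pair` stand-in for the asker's `process_image_pair`: tag every submission with its index, collect the futures in completion order, then sort the tags back into the original sequence:

```python
import concurrent.futures

def process_pair(index, fixed_file, moving_file):
    # stand-in for the real per-pair work; returns the index alongside the
    # outputs so the results can be re-sorted afterwards
    return index, fixed_file.upper(), moving_file.upper()

fixed_files = ["a_fixed.tif", "b_fixed.tif", "c_fixed.tif"]
moving_files = ["a_mov.tif", "b_mov.tif", "c_mov.tif"]

with concurrent.futures.ThreadPoolExecutor() as ex:
    futures = [ex.submit(process_pair, i, f, m)
               for i, (f, m) in enumerate(zip(fixed_files, moving_files))]
    # results arrive here in completion order, not submission order
    tagged = [fut.result() for fut in concurrent.futures.as_completed(futures)]

# restore the original submission order regardless of completion order
tagged.sort(key=lambda t: t[0])
ordered = [(f, m) for _, f, m in tagged]
```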
