我遇到了一个问题:使用 Python 中的并行处理调整固定图像的大小后,固定图像的保存顺序是乱序的。移动图像保存正确,但固定图像的顺序被打乱了。
这就是我正在做的:
我加载成对的固定图像和移动图像,并使用 ThreadPoolExecutor 进行并行处理。 我将图像大小调整为目标形状 (1024x1024) 并将它们保存在指定的输出目录中。 问题在于,处理后,固定图像没有按预期的时间顺序保存,而运动图像则如此。
代码:
def process_image_pair(fixed_file, moving_file, target_shape):
    """Load, resize and cache one fixed/moving image pair.

    Relies on module-level globals: ``output_dir``, ``fixed_image_path``,
    ``moving_image_path`` plus the helpers ``debug_print``,
    ``resize_fixed_image`` and ``resize_moving_image`` -- TODO confirm
    they are defined before this is called.

    Parameters
    ----------
    fixed_file, moving_file : str
        Bare file names (joined onto the image directories below).
    target_shape : tuple
        Target size; only ``target_shape[0]`` is passed to the resize
        helpers (assumed square output -- verify against the helpers).

    Returns
    -------
    tuple
        ``(fixed_image, moving_image)`` -- either freshly processed
        arrays or tensors reloaded from the on-disk cache.
    """
    fixed_image_name = os.path.splitext(fixed_file)[0]
    moving_image_name = os.path.splitext(moving_file)[0]
    fixed_output_path = os.path.join(output_dir, f"{fixed_image_name}_processed.pt")
    moving_output_path = os.path.join(output_dir, f"{moving_image_name}_processed.pt")
    # Fast path: both cache files exist -- try to reuse them.
    if os.path.exists(fixed_output_path) and os.path.exists(moving_output_path):
        debug_print(f"Processed pair: {fixed_output_path} and {moving_output_path} already exist. Skipping processing.")
        try:
            fixed_image = torch.load(fixed_output_path)
            moving_image = torch.load(moving_output_path)
            return fixed_image, moving_image
        # BUG FIX: a truncated/corrupted torch file can raise more than
        # EOFError (RuntimeError, pickle.UnpicklingError are typical);
        # catch them all so corrupted caches are rebuilt, not crashed on.
        except (EOFError, RuntimeError, pickle.UnpicklingError):
            debug_print(f"Error loading processed images. The files may be corrupted.")
            os.remove(fixed_output_path)
            os.remove(moving_output_path)
            debug_print(f"Deleted corrupted files: {fixed_output_path} and {moving_output_path}")
    # BUG FIX: print once here instead of also before the cache check, so
    # "Processing pair" is no longer logged for pairs that are skipped.
    print(f"Processing pair: {fixed_file} and {moving_file}")
    full_fixed_path = os.path.join(fixed_image_path, fixed_file)
    debug_print(f"Loading fixed image: {full_fixed_path}")
    fixed_image = tiff.imread(full_fixed_path)
    # Channel-first (C, H, W) TIFFs with 1 or 3 channels -> channel-last.
    if fixed_image.ndim == 3 and fixed_image.shape[0] in {1, 3}:
        fixed_image = np.transpose(fixed_image, (1, 2, 0))
    fixed_image = img_as_ubyte(fixed_image)
    fixed_image = TF.rgb_to_grayscale(Image.fromarray(fixed_image))
    fixed_image = resize_fixed_image(np.array(fixed_image), target_shape[0])
    debug_print(f"Fixed image resized: {fixed_file}")
    full_moving_path = os.path.join(moving_image_path, moving_file)
    debug_print(f"Loading moving image: {full_moving_path}")
    moving_image = imread(full_moving_path, is_ome=True, level=0, aszarr=False)
    if moving_image.ndim == 4:  # If 4D, determine batch and channel dimensions
        # Move the first non-singleton axis to the front as the batch axis.
        # (The original also computed an unused `channel_dim`; removed.)
        non_singleton_dims = [dim for dim in range(moving_image.ndim) if moving_image.shape[dim] > 1]
        batch_dim = non_singleton_dims[0]
        moving_image = np.moveaxis(moving_image, batch_dim, 0)
    elif moving_image.ndim == 3:
        moving_image = np.transpose(moving_image, (1, 2, 0))
    moving_image = img_as_ubyte(moving_image)
    moving_image = resize_moving_image(moving_image, target_shape[0])
    # k=3 rotates 270 deg CCW -- presumably to match the fixed image
    # orientation; TODO confirm against the acquisition setup.
    moving_image = np.rot90(moving_image, k=3)
    debug_print(f"Moving image resized: {moving_file}")
    # Ensure the cache directories exist before saving.
    os.makedirs(os.path.dirname(fixed_output_path), exist_ok=True)
    os.makedirs(os.path.dirname(moving_output_path), exist_ok=True)
    torch.save(fixed_image, fixed_output_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)
    torch.save(moving_image, moving_output_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)
    print(f"Saved processed pair: {fixed_file} and {moving_file}")
    return fixed_image, moving_image
# Wrapper function to include target_shape
def process_image_pair_wrapper(args):
    """Adapter for ``executor.map``: unpack one (fixed, moving, shape) tuple."""
    fixed_file, moving_file, target_shape = args
    return process_image_pair(fixed_file, moving_file, target_shape)
def process_fixed_image(fixed_image, index, output_dir):
    """Return the cached resized fixed image for ``index``, resizing and
    caching it (at 1024) on a cache miss."""
    cached = load_resized_image(output_dir, index, "fixed")
    if cached is not None:
        return cached
    resized = resize_fixed_image(fixed_image, 1024)
    save_resized_image(output_dir, resized, index, "fixed")
    return resized
def process_moving_image(moving_image, index, output_dir):
    """Return the cached resized moving image for ``index``, resizing and
    caching it (at 1024) on a cache miss."""
    cached = load_resized_image(output_dir, index, "moving")
    if cached is not None:
        return cached
    resized = resize_moving_image(moving_image, 1024)
    save_resized_image(output_dir, resized, index, "moving")
    return resized
# Check if dataset exists and load it; otherwise build and cache it.
dataset_path = os.path.join(output_dir, "processed_dataset.pt")
if os.path.exists(dataset_path):
    print("Loading existing dataset...")
    fixed_images, moving_images = torch.load(dataset_path)
else:
    fixed_images = []
    moving_images = []
    # Process images in parallel
    target_shape = (1024, 1024)
    pair_args = [
        (fixed_file, moving_file, target_shape)
        for fixed_file, moving_file in zip(fixed_image_files, moving_image_files)
    ]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_image_pair_wrapper, pair_args))
    # Unpack the results and ensure uniqueness
    for fixed_image, moving_image in results:
        fixed_images.append(fixed_image)
        moving_images.append(moving_image)
    # Combine into a single dataset and save
    torch.save((fixed_images, moving_images), dataset_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)

# Resize every pair (per-index cached) and collect the results.
all_fixed_images_resized = []
all_moving_images_resized = []
for index in range(len(fixed_images)):
    print(f"Processing pair {index+1}/{len(fixed_images)}")
    all_fixed_images_resized.append(process_fixed_image(fixed_images[index], index, output_dir_vxm))
    all_moving_images_resized.append(process_moving_image(moving_images[index], index, output_dir_vxm))
# Combine into a single dataset
fixed_images_np = np.stack(all_fixed_images_resized)
moving_images_np = np.stack(all_moving_images_resized)
torch.save((fixed_images_np, moving_images_np), os.path.join(output_dir_vxm, "resized_dataset.pt"), pickle_protocol=5)
问:
“(...)在使用 Python 中的并行处理调整它们的大小后,保存得乱序。(为什么?)”
因为你期望以与纯 [SERIAL] 顺序处理相同的顺序收到结果,而这里使用的是 [CONCURRENT] 处理,并不是 [PARALLEL] 处理。早期的编程与计算机科学课程应当讲过:组织工作流程的 [SERIAL]、[CONCURRENT] 和 [PARALLEL] 三种方式既不相同也不相似——每一种都有非常不同的属性,对资源使用、资源映射、工作调度以及相互协调(有些可以有,有些则不能有)都有非常不同的要求。
# Process images in parallel .................... NO IT IS NOT [PARALLEL]
# .................... IT TAKES
# .................... AWFULLY LOT EFFORTS
# .................... TO FORCE [SERIAL]-Python
# .................... ACT in [PARALLEL] on macro-level
# .................... as add-on costs are IMMENSE
# WHY?
# READ this : https://stackoverflow.com/revisions/18374629/3
# 1st
#
# NOTE(review): snippet quoted from the question, reformatted by the
# answerer.  Per the Python concurrent.futures docs, Executor.map()
# yields results in the ORDER OF THE INPUT ITERABLE regardless of
# completion order, so `results` is already input-ordered here -- any
# out-of-order symptom must come from the per-thread side effects
# (file saves finishing at different times), not from this call.
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_image_pair_wrapper,
                                [(fixed_file, moving_file, target_shape)
                                 for fixed_file, moving_file
                                 in zip(fixed_image_files, moving_image_files)]))
使用
concurrent.futures.ThreadPoolExecutor()
只会创建一个 [CONCURRENT] 的 Python 线程池。截至 2024 年 8 月,这些线程在宏观层面上仍被 Python 全局解释器锁(GIL)重新 [SERIAL] 化——GIL 的作用是防止 Python 解释器陷入与并发相关的冲突而崩溃,但这种 GIL 步进既不能恢复性能,更不能保证互不协调的线程池按顺序交付结果。
(这绝对不是并行处理)
如果您让结果列表变得“可排序”(例如让每个结果携带其原始索引),就可以把乱序到达的结果重新排回原始顺序。作为一个简单的修复,这至少能保住您迄今为止在这段代码上投入的精力。