我用
process = 1
和 process=4
进行了批量实验,它给了我结果,但我现在很困惑,因为我认为结果是有序的。如果它们没有组织起来,我将无法将它们与地面真相进行映射
例如,我们说我的
data_length=5
和我的batch=3
。因此,如果我得到 [[1,2,3], [4,5]]
的结果 process=1
那么我期望在使用 process = 4
时,当我压平结果时应该得到相同的结果。
它们出了故障。我做错了什么?
注意:我在将数据传递到进程时使用了
zip(text,label)
以获得正确的映射,但这不是问题
以下是代码:
def seed_everything(seed=13):
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
set_seed(seed)
torch.backends.cudnn.deterministic = True
seed_everything(seed = 13)
def test():
accelerator = Accelerator()
accelerator.wait_for_everyone()
seed_everything(seed = 13)
model = load_model(model = "my_model_path"
lora = "./my_lora_checkpoint/checkpoint-8200",
device = {"": accelerator.process_index},
num_labels = NUM_LABELS,
merge_unload = False)
with accelerator.split_between_processes(zipped_text_label) as prompts:
res = {"pred_probs": [], "pred_labels": []}
BATCH_SIZE = 10
BATCHES = [prompts[i:i + BATCH_SIZE] for i in range(0, len(prompts), BATCH_SIZE)]
print(len(BATCHES[0]))
pred_probs = []
pred_labels = []
for batch in tqdm(BATCHES):
text_batch = [i[0] for i in batch]
score_batch = [i[1] for i in batch]
with torch.no_grad():
inputs = tokenizer(text_batch,truncation= True, max_length=MAX_LENGTH, padding="max_length", return_tensors = "pt").to(model.device)
logits = model(**inputs).logits.cpu().to(torch.float32)
probs = torch.softmax(logits, dim = 1).numpy()
res["pred_probs"].append(probs.tolist())
res["pred_labels"].append(probs.argmax(axis = 1).tolist())
res = [res]
result = gather_object(res)
if accelerator.is_main_process:
print(result)
notebook_launcher(test, num_processes=1)
当使用多个 GPU 和 Hugging Face 的 Accelerate 等框架时,数据处理可以并行进行,如果不仔细管理,可能会导致结果集合无序。在您的情况下,当使用 num_processes=1 时,您的结果会按自然顺序收集,但使用 num_processes=4 时,结果可以根据各个进程完成计算的方式以非顺序方式返回。
使用accelerator.split_ Between_processes时,数据将被分割到可用的进程中。例如,有 4 个进程(假设总数据长度为 5),每个进程可能会收到要处理的不同数据子集。
由于每个进程独立并发运行,因此它们返回的结果确实可以以非确定性顺序返回。因此,如果一个进程首先完成其批次的处理,它将在可能稍后启动的另一个进程之前返回结果。
您最初的期望是多个进程的结果将保持其处理顺序相同,但这并不能保证。因此,依赖多个进程的返回顺序可能会导致与真实情况不匹配。
为了确保您可以将预测与原始事实相匹配,请考虑以下方法:
拆分数据时,您可以将原始索引包含在输入数据中。例如,如果您使用 zipped_text_label,则可以在每个元组中包含索引:
indexed_data = [(i, txt, lbl) for i, (txt, lbl) in enumerate(zipped_text_label)]
预测后,根据索引对结果进行排序,然后再组合它们。
修改 pred_probs 和 pred_labels 以也包含索引。以下是如何调整代码的示例:
for batch in tqdm(BATCHES):
# Just an example; modified to include index
indexed_batch = [(index, text, score) for index, (text, score) in zip(range(len(batch)), batch)]
text_batch = [item[1] for item in indexed_batch]
...
for index, prob_array in zip(range(batch_size), probs.tolist()):
res["pred_probs"].append((indexed_batch[index][0], prob_array))
res["pred_labels"].append((indexed_batch[index][0], prob_array.argmax(axis=1).tolist()))
# After finishing processing, sort results based on original index
res["pred_probs"].sort(key=lambda x: x[0])
res["pred_labels"].sort(key=lambda x: x[0])
从多个进程收集结果时,请确保还按索引收集所有内容或确保结果根据原始顺序对齐。
收集结果后,请确保根据您使用的指数重新构建原始订单。这样,您就可以将您的预测直接映射回真实情况。
实现示例: 这是收集和组织预测的简化方法:
pred_probs = []
pred_labels = []
for batch in tqdm(BATCHES):
indexed_batch = [(i, b) for i, b in enumerate(batch)]
text_batch = [b[0] for b in indexed_batch]
with torch.no_grad():
inputs = tokenizer(text_batch, ...)
logits = model(**inputs).logits.cpu()
probs = torch.softmax(logits, dim=1).numpy()
for idx, prob_array in zip(range(len(probs)), probs):
pred_probs.append((indexed_batch[idx][0], prob_array.tolist()))
pred_labels.append((indexed_batch[idx][0], prob_array.argmax(axis=1).tolist()))
# Sort results by their original index
pred_probs.sort(key=lambda x: x[0])
pred_labels.sort(key=lambda x: x[0])
# Now you can extract just the probabilities or labels in the correct order
ordered_pred_probs = [prob[1] for prob in pred_probs]
ordered_pred_labels = [label[1] for label in pred_labels]
总之,为了确保使用 Accelerate 时多 GPU 推理的输出顺序的一致性,在处理管道中包含索引将帮助您维护预测与原始真实情况之间的映射。这样,即使使用并发处理,您也可以按照所需的顺序重建结果。