How do I implement multiprocessing with Weights & Biases (wandb) sweeps for maximum parallelization, and in particular how does the count variable work in this setup?


I'm trying to run a hyperparameter sweep with Weights & Biases (W&B) and want to use multiprocessing to parallelize my experiments as much as possible. I want to make sure each set of hyperparameters is evaluated only once, and that I can run multiple experiments at the same time without duplicating hyperparameters.

Here is a simplified version of my training and evaluation script:

import random
import numpy as np
import wandb
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Tuple, Dict

def train_one_epoch(epoch: int, lr: float, bs: int) -> Tuple[float, float]:
    """
    Simulate training for one epoch.
    """
    acc = 0.25 + ((epoch / 30) + (random.random() / 10))
    loss = 0.2 + (1 - ((epoch - 1) / 10 + random.random() / 5))
    return acc, loss

def evaluate_one_epoch(epoch: int) -> Tuple[float, float]:
    """
    Simulate evaluation for one epoch.
    """
    acc = 0.1 + ((epoch / 20) + (random.random() / 10))
    loss = 0.25 + (1 - ((epoch - 1) / 10 + random.random() / 6))
    return acc, loss

config: Dict[str, float] = {"lr": 0.0001, "bs": 16, "epochs": 5}

def run_epoch(epoch: int, lr: float, bs: int) -> Tuple[int, float, float, float, float]:
    """
    Run training and evaluation for one epoch.
    """
    train_acc, train_loss = train_one_epoch(epoch, lr, bs)
    val_acc, val_loss = evaluate_one_epoch(epoch)
    return epoch, train_acc, train_loss, val_acc, val_loss

def main() -> None:
    """
    Main function to run the training and evaluation in parallel using multiprocessing.
    """
    lr = config["lr"]
    bs = config["bs"]
    epochs = config["epochs"]

    # Initialize Weights and Biases
    wandb.init(project="my_project", config=config)

    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(run_epoch, epoch, lr, bs) for epoch in np.arange(1, epochs + 1)]  # epochs 1..epochs inclusive
        for future in as_completed(futures):
            epoch, train_acc, train_loss, val_acc, val_loss = future.result()
            wandb.log({
                "epoch": epoch,
                "train_acc": train_acc,
                "train_loss": train_loss,
                "val_acc": val_acc,
                "val_loss": val_loss
            })
            print(f"epoch: {epoch}")
            print(f"training accuracy: {train_acc}, training loss: {train_loss}")
            print(f"validation accuracy: {val_acc}, validation loss: {val_loss}")

if __name__ == "__main__":
    main()

Questions:

  1. The role of count in W&B multiprocessing: How do I correctly configure the count parameter (if it exists), or any other relevant W&B setting, to maximize parallelization without duplicating hyperparameters?
  2. Ensuring unique hyperparameters: How can I make sure each process picks a unique set of hyperparameters when running the sweep? Should I handle this in my Python code, or is there a W&B feature that manages it?
  3. Best practices for parallelization: Are there best practices for implementing parallelization with W&B so that resources are used as efficiently as possible? For example, is there a preferred way to structure the ProcessPoolExecutor, or any other approach that maximizes parallel runs?

I'm interested in both:

  1. Parallelism using only the CPU's multiple cores
  2. Parallelism across multiple GPUs

Any insights, code examples, or references to the W&B documentation would be greatly appreciated!


Extensions and comments on how to do this via the CLI/Bash are also welcome!


1 Answer

When doing hyperparameter (hp) optimization with Weights & Biases (W&B/wandb), you can use sweeps to systematically explore different combinations of hyperparameters and find the best-performing set.

What is a sweep?

A sweep in W&B lets you define a set of hyperparameters to search over. When you create a sweep (from the CLI or from Python), this builds the set (e.g., a grid) of hyperparameter combinations to search. This (usually, afaik) creates a sweep with all the possible hyperparameter combinations on your wandb account/server; later, when you run an agent, it fetches one, tries that hp configuration, and logs the results to wandb.

Understanding count and agents

  • Agents: These are the workers that run the trials/hp attempts/runs; basically, they try each hp configuration. Each agent pulls a set of hyperparameters (hps) from the W&B/wandb server, runs your (usually training) script with those hps (in our example below, a search for the best Chinchilla scaling law fit), logs the results, and repeats.
  • Count: This is the number of trials/runs an agent will execute. If you set count to 100, the agent will run 100 trials, each with a different combination of hyperparameters.

Afaik, if you use grid search and count is higher than the total number of combinations, the sweep stops once all combinations have been run. If count is lower, only the specified number of trials is run.
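
As a minimal sketch (assuming a sweep_id and a train function like the ones defined in the full examples below), count is simply an argument to wandb.agent:

import wandb

# sweep_id and train are assumed to come from a script like the examples below.
# Either bound the agent's work:
wandb.agent(sweep_id, function=train, count=5)   # this agent stops after 5 trials
# or let it keep pulling hps until the sweep is exhausted:
# wandb.agent(sweep_id, function=train)
# CLI equivalent of the bounded call: wandb agent --count 5 <entity>/<project>/<sweep_id>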

As far as I know, as long as an agent is running and the sweep still has hps to try, it keeps fetching them from the sweep on your wandb account/server. You can see the running agents in your wandb account (and kill them, pause them, etc.).

I'll give an example without multiprocessing first, and then turn it into a multiprocessing (mp) version:

1 Example without multiprocessing

import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb

# Define the synthetic scaling law function
def scaling_law(c, e, a, b, alpha, beta):
    return np.exp(e) + np.exp(a) * c[:, 0] ** (-alpha) + np.exp(b) * c[:, 1] ** (-beta)

# Generate synthetic data
np.random.seed(0)
C = np.array([[7e9, 2e12], [13e9, 2e12], [34e9, 2e12], [70e9, 2e12]])  # [m, Din] = [m, 2]
e_true, a_true, b_true = np.log(1.8172), np.log(482.01), np.log(2085.43)
alpha_true, beta_true = 0.3478, 0.3658
L_target = scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true).reshape(-1, 1)  # [m, K]
L_target = np.repeat(L_target, 1, axis=1)  # [m , K]

# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    e, a, b, alpha, beta = theta_sl
    E, A, B = np.exp(e), np.exp(a), np.exp(b)
    l_pred = E + A * c[:, 0] ** (-alpha) + B * c[:, 1] ** (-beta)
    log_l_target = np.log(l_target)
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()

# Training function to run each trial
def train():
    wandb.init()
    config = wandb.config

    initial_params = [config.e, config.a, config.b, config.alpha, config.beta]

    # Perform the optimization
    result = opt.minimize(aggregate_huber_loss, initial_params, args=(C, L_target), method='BFGS')
    optimized_params = result.x

    e_opt, a_opt, b_opt, alpha_opt, beta_opt = optimized_params
    loss = aggregate_huber_loss(optimized_params, C, L_target)

    wandb.log({
        "e": e_opt,
        "a": a_opt,
        "b": b_opt,
        "alpha": alpha_opt,
        "beta": beta_opt,
        "loss": loss
    })

# Sweep configuration for grid search
sweep_config = {
    "method": "grid",
    "metric": {
        "name": "loss",
        "goal": "minimize"
    },
    "parameters": {
        "e": {
            "values": [-1, 0, 1]
        },
        "a": {
            "values": [0, 5, 10]
        },
        "b": {
            "values": [0, 5, 10]
        },
        "alpha": {
            "values": [0, 1, 2]
        },
        "beta": {
            "values": [0, 1, 2]
        }
    }
}

# Initialize the sweep
sweep_id = wandb.sweep(sweep_config, project="scaling-law-optimization")

# Print the sweep ID (wandb.sweep already prints the sweep URL; wandb.run is not
# set yet at this point, so it cannot be used to build the URL here)
print(f"Sweep ID: {sweep_id}")

# Run the sweep
# wandb.agent(sweep_id, function=train, count=10)  # only tries 10 of the G^5 hp combinations
wandb.agent(sweep_id, function=train)  # tries all G^5 hps! Sweeps them all!

My understanding is that as long as an agent is running, it keeps fetching hps from the sweep on the server until the sweep (on your wandb site/account) is exhausted. Some sweeps (e.g., random and Bayesian sweeps) can run forever! So count really matters there (or you kill the agent manually).
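
For instance, here is a minimal sketch (reusing the sweep_config, train, and project name from the example above) that switches to random search, where bounding the agent with count is essential:

# Same sweep definition as above, but sampled randomly instead of exhaustively
random_sweep_config = {**sweep_config, "method": "random"}
random_sweep_id = wandb.sweep(random_sweep_config, project="scaling-law-optimization")

# Without count this agent would keep sampling hp combinations indefinitely
wandb.agent(random_sweep_id, function=train, count=20)  # stops after 20 trials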

2 Example with multiprocessing

I think the main idea is to realize that when you create a sweep (in Python or from the CLI), the process that fetches hps/trials to try is the agent. So I suggest parallelizing the agents, e.g.:

import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb
from multiprocessing import Process, cpu_count

# Define the synthetic scaling law function
def scaling_law(c, e, a, b, alpha, beta):
    return np.exp(e) + np.exp(a) * c[:, 0] ** (-alpha) + np.exp(b) * c[:, 1] ** (-beta)

# Generate synthetic data
np.random.seed(0)
C = np.array([[7e9, 2e12], [13e9, 2e12], [34e9, 2e12], [70e9, 2e12]])  # [m, Din] = [m, 2]
e_true, a_true, b_true = np.log(1.8172), np.log(482.01), np.log(2085.43)
alpha_true, beta_true = 0.3478, 0.3658
L_target = scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true).reshape(-1, 1)  # [m, K]
L_target = np.repeat(L_target, 1, axis=1)  # [m , K]

# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    e, a, b, alpha, beta = theta_sl
    E, A, B = np.exp(e), np.exp(a), np.exp(b)
    l_pred = E + A * c[:, 0] ** (-alpha) + B * c[:, 1] ** (-beta)
    log_l_target = np.log(l_target)
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()

# Training function to run each trial
def train():
    wandb.init()
    config = wandb.config

    initial_params = [config.e, config.a, config.b, config.alpha, config.beta]

    # Perform the optimization
    result = opt.minimize(aggregate_huber_loss, initial_params, args=(C, L_target), method='BFGS')
    optimized_params = result.x

    e_opt, a_opt, b_opt, alpha_opt, beta_opt = optimized_params
    loss = aggregate_huber_loss(optimized_params, C, L_target)

    wandb.log({
        "e": e_opt,
        "a": a_opt,
        "b": b_opt,
        "alpha": alpha_opt,
        "beta": beta_opt,
        "loss": loss
    })

# Sweep configuration for grid search
sweep_config = {
    "method": "grid",
    "metric": {
        "name": "loss",
        "goal": "minimize"
    },
    "parameters": {
        "e": {
            "values": [-1, 0, 1]
        },
        "a": {
            "values": [0, 5, 10]
        },
        "b": {
            "values": [0, 5, 10]
        },
        "alpha": {
            "values": [0, 1, 2]
        },
        "beta": {
            "values": [0, 1, 2]
        }
    }
}

# Initialize the sweep
sweep_id = wandb.sweep(sweep_config, project="scaling-law-optimization")

# Print the sweep ID (wandb.sweep already prints the sweep URL; wandb.run is not
# set yet at this point, so it cannot be used to build the URL here)
print(f"Sweep ID: {sweep_id}")

# Function to run an agent
def run_agent():
    # wandb.agent(sweep_id, function=train, count=10)  # runs only a subset: 10 of the G^5 hp combinations
    wandb.agent(sweep_id, function=train)  # keeps fetching hps until all hps in sweep are done. All G^5

# Number of agents to run in parallel
num_agents = min(cpu_count(), 72)  # Adjust this number based on your system

if __name__ == "__main__":
    processes = []
    for _ in range(num_agents):
        p = Process(target=run_agent)
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    print('Done!\a')
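
For the multi-GPU case, one common pattern (a sketch, not a W&B-specific feature; the GPU count below is a placeholder) is to start one agent process per GPU and pin each process to its device via the CUDA_VISIBLE_DEVICES environment variable before the training framework initializes CUDA in that process, e.g. as a variant of run_agent above:

import os
from multiprocessing import Process

def run_agent_on_gpu(gpu_idx: int) -> None:
    # Pin this agent (and every run it launches) to a single GPU.
    # This must happen before torch/tf/jax touches CUDA in this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_idx)
    wandb.agent(sweep_id, function=train)  # sweep_id and train as defined above

if __name__ == "__main__":
    num_gpus = 4  # placeholder: set this to the number of GPUs on your machine
    gpu_processes = [Process(target=run_agent_on_gpu, args=(i,)) for i in range(num_gpus)]
    for p in gpu_processes:
        p.start()
    for p in gpu_processes:
        p.join()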

3 Example using the CLI/bash (TODO)
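
A rough sketch of the CLI route (the filenames and placeholders here are hypothetical): put the sweep configuration in a YAML file and the training code in a script that calls wandb.init() and reads wandb.config (like train above), then create the sweep once and launch as many agents as you want, each in its own shell/process:

sweep.yaml (hypothetical filename):

program: train.py
method: grid
metric:
  name: loss
  goal: minimize
parameters:
  e:
    values: [-1, 0, 1]
  # ... remaining parameters exactly as in the Python sweep_config above

Shell commands:

# Create the sweep (prints the sweep ID and its URL)
wandb sweep sweep.yaml

# Launch as many agents as you like, each in its own shell/process
wandb agent <entity>/<project>/<sweep_id>             # keeps pulling hps until the sweep is exhausted
wandb agent --count 10 <entity>/<project>/<sweep_id>  # stops after 10 trials

# One agent per GPU (background jobs)
CUDA_VISIBLE_DEVICES=0 wandb agent <entity>/<project>/<sweep_id> &
CUDA_VISIBLE_DEVICES=1 wandb agent <entity>/<project>/<sweep_id> &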
