我正在尝试使用 Weights & Biases (W&B) 运行超参数扫描(sweep),并希望利用多进程(multiprocessing)尽可能并行化我的实验。我想确保每组超参数只被评估一次,并且可以同时运行多个实验而不重复任何超参数组合。
这是我的训练和评估脚本的简化版本:
import random
import numpy as np
import wandb
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Tuple, Dict
def train_one_epoch(epoch: int, lr: float, bs: int) -> Tuple[float, float]:
    """Produce fake training metrics for a single epoch.

    Nothing is actually trained: both metrics are a function of ``epoch``
    plus a uniform random jitter. ``lr`` and ``bs`` are accepted but not used
    by the computation.

    Returns:
        A ``(accuracy, loss)`` pair.
    """
    # The accuracy jitter is drawn first and the loss jitter second; keeping
    # that order keeps results reproducible for a given ``random`` seed.
    accuracy_jitter = random.random() / 10
    accuracy = 0.25 + ((epoch / 30) + accuracy_jitter)

    loss_jitter = random.random() / 5
    loss_value = 0.2 + (1 - ((epoch - 1) / 10 + loss_jitter))

    return accuracy, loss_value
def evaluate_one_epoch(epoch: int) -> Tuple[float, float]:
    """Produce fake validation metrics for a single epoch.

    Both metrics depend only on ``epoch`` and a uniform random jitter; no
    model or data is involved.

    Returns:
        A ``(accuracy, loss)`` pair.
    """
    # Accuracy jitter is drawn before loss jitter (matters under a fixed seed).
    accuracy_jitter = random.random() / 10
    accuracy = 0.1 + ((epoch / 20) + accuracy_jitter)

    loss_jitter = random.random() / 6
    loss_value = 0.25 + (1 - ((epoch - 1) / 10 + loss_jitter))

    return accuracy, loss_value
# Default hyperparameters for the simulated run: learning rate ("lr"),
# batch size ("bs") and number of epochs ("epochs").
config: Dict[str, float] = dict(lr=0.0001, bs=16, epochs=5)
def run_epoch(epoch: int, lr: float, bs: int) -> Tuple[int, float, float, float, float]:
    """Simulate one full epoch: a training pass followed by a validation pass.

    Returns:
        ``(epoch, train_acc, train_loss, val_acc, val_loss)``; the epoch
        number is echoed back so the caller can tell results apart.
    """
    # Training runs before evaluation, so the training metrics consume the
    # first random draws.
    training_metrics = train_one_epoch(epoch, lr, bs)
    validation_metrics = evaluate_one_epoch(epoch)
    return (epoch, *training_metrics, *validation_metrics)
def main() -> None:
    """Run every simulated epoch in a process pool and log the metrics to W&B.

    Reads ``lr``, ``bs`` and ``epochs`` from the module-level ``config``,
    starts a single W&B run, submits one ``run_epoch`` task per epoch to a
    ``ProcessPoolExecutor`` and logs/prints each result.

    Epochs are numbered from 1 to ``epochs`` inclusive. Results are consumed
    in submission order, so metrics reach ``wandb.log`` in increasing epoch
    order while the tasks themselves still execute concurrently.
    """
    lr = config["lr"]
    bs = config["bs"]
    epochs = int(config["epochs"])

    # Initialize Weights and Biases
    wandb.init(project="my_project", config=config)
    try:
        with ProcessPoolExecutor() as executor:
            # range(1, epochs + 1): the upper bound is exclusive, so "+ 1" is
            # required for the last configured epoch to run. Plain ``range``
            # also yields built-in ints rather than NumPy integers.
            futures = [
                executor.submit(run_epoch, epoch, lr, bs)
                for epoch in range(1, epochs + 1)
            ]
            # Walk the futures in submission order (not completion order) so
            # each wandb.log call corresponds to the next epoch.
            for future in futures:
                epoch, train_acc, train_loss, val_acc, val_loss = future.result()
                wandb.log({
                    "epoch": epoch,
                    "train_acc": train_acc,
                    "train_loss": train_loss,
                    "val_acc": val_acc,
                    "val_loss": val_loss
                })
                print(f"epoch: {epoch}")
                print(f"training accuracy: {train_acc}, training loss: {train_loss}")
                print(f"validation accuracy: {val_acc}, validation loss: {val_loss}")
    finally:
        # Close the run even if a worker raised, so it is not left open.
        wandb.finish()
# Script entry point. The guard keeps main() from running again if this file
# is imported rather than executed (e.g. by a process start method that
# re-imports the main module).
if __name__ == "__main__":
    main()
1. `count` 在 W&B 多进程中的作用:如何正确配置 `count` 参数(如果存在)或 W&B 中的任何其他相关参数,以便在不重复超参数的前提下最大化并行度?
2. 是否应该使用 `ProcessPoolExecutor` 或任何其他方法来最大化并行运行的数量?
这两个问题我都感兴趣。
任何见解、代码示例或对 W&B 文档的引用将不胜感激!
也欢迎补充说明如何通过 CLI/Bash 命令实现同样的效果!
参考资料:
使用 Weights & Biases (W&B/wandb) 进行超参数 (hp) 优化时,您可以使用 sweep(扫描)来系统地探索超参数的不同组合,以找到性能最佳的一组。
W&B 中的 sweep 允许您定义一组要搜索的超参数。当您通过 CLI 或 Python 创建 sweep 时,会生成一个待搜索的超参数集合(例如网格)。据我所知,这通常会在您的 wandb 帐户/服务器上创建一个包含所有可能超参数组合的 sweep;之后当您运行代理(agent)时,代理会从中取出一组超参数进行试验,并把结果记录到 wandb。
据我所知,如果您使用网格搜索,并且 count 高于组合总数,sweep 会在运行完所有组合后停止;如果 count 较低,则只运行指定次数的试验。
据我所知,只要代理仍在运行,并且 sweep 中还有尚未尝试的超参数,代理就会不断从您 wandb 帐户对应的 sweep 服务器获取它们。您可以在 wandb 帐户中查看正在运行的代理(并终止或暂停它们)。
我先给出一个不使用多进程的示例,然后再把它改成多进程(mp)版本:
import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb
# Synthetic scaling law:
#   L(c) = exp(e) + exp(a) * c[:, 0]^(-alpha) + exp(b) * c[:, 1]^(-beta)
def scaling_law(c, e, a, b, alpha, beta):
    """Evaluate the scaling law for every row of ``c``.

    Args:
        c: 2-D array; column 0 and column 1 are the two inputs of the law.
        e, a, b: log-space coefficients (each is passed through ``np.exp``).
        alpha, beta: exponents applied (negated) to column 0 and column 1.

    Returns:
        1-D array with one value per row of ``c``.
    """
    first_input, second_input = c[:, 0], c[:, 1]
    constant_term = np.exp(e)
    first_power_term = np.exp(a) * first_input ** (-alpha)
    second_power_term = np.exp(b) * second_input ** (-beta)
    return constant_term + first_power_term + second_power_term
# Generate synthetic data
np.random.seed(0)  # pins NumPy's global RNG state
C = np.array(
    [
        [7e9, 2e12],
        [13e9, 2e12],
        [34e9, 2e12],
        [70e9, 2e12],
    ]
)  # [m, Din] = [m, 2]

# Ground-truth parameters; e/a/b are stored in log space.
e_true = np.log(1.8172)
a_true = np.log(482.01)
b_true = np.log(2085.43)
alpha_true = 0.3478
beta_true = 0.3658

# Targets as a column, [m, K]; repeating once along axis 1 keeps K = 1.
L_target = np.repeat(
    scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true).reshape(-1, 1),
    1,
    axis=1,
)  # [m , K]
# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    """Return the summed Huber loss between log-predictions and log-targets.

    The log of the predicted value
    ``exp(e) + exp(a) * c[:, 0]**(-alpha) + exp(b) * c[:, 1]**(-beta)``
    is computed directly in log space with ``logsumexp``. The prediction is
    never materialized in linear space, which avoids overflow in the power
    terms and skips work whose result was unused.

    Args:
        theta_sl: sequence ``(e, a, b, alpha, beta)``.
        c: 2-D array of inputs with at least two columns, shape ``[m, >=2]``.
        l_target: positive targets, broadcastable against an ``[m, 1]`` column.
        delta: Huber threshold passed to ``scipy.special.huber``.

    Returns:
        Scalar: sum of the element-wise Huber losses.
    """
    e, a, b, alpha, beta = theta_sl
    log_l_target = np.log(l_target)
    # Log of each of the three additive terms, as [m, 1] columns.
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    # Reduce over the list axis (axis 0): log of the sum of the three terms.
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()
# Training function to run each trial
def train():
    """Run one trial: fit the scaling-law parameters and log them to W&B.

    Starts a W&B run, reads the starting point ``(e, a, b, alpha, beta)`` from
    ``wandb.config`` (presumably filled in by the sweep agent that calls this
    function), minimizes ``aggregate_huber_loss`` with BFGS on the
    module-level ``C`` / ``L_target`` data, and logs the fitted parameters
    together with the final loss.
    """
    wandb.init()
    cfg = wandb.config
    param_names = ("e", "a", "b", "alpha", "beta")
    start_point = [getattr(cfg, name) for name in param_names]

    # Perform the optimization
    fit = opt.minimize(aggregate_huber_loss, start_point, args=(C, L_target), method='BFGS')
    fitted = fit.x

    # One metric per fitted parameter, followed by the loss at the optimum.
    metrics = dict(zip(param_names, fitted))
    metrics["loss"] = aggregate_huber_loss(fitted, C, L_target)
    wandb.log(metrics)
# Sweep configuration for grid search
# Candidate values per hyperparameter: 3 values each, 5 hyperparameters,
# i.e. 3**5 = 243 grid points.
_GRID_VALUES = {
    "e": [-1, 0, 1],
    "a": [0, 5, 10],
    "b": [0, 5, 10],
    "alpha": [0, 1, 2],
    "beta": [0, 1, 2],
}
sweep_config = {
    "method": "grid",
    "metric": {"name": "loss", "goal": "minimize"},
    "parameters": {
        name: {"values": list(values)} for name, values in _GRID_VALUES.items()
    },
}
# Initialize the sweep
sweep_id = wandb.sweep(sweep_config, project="scaling-law-optimization")
# Print the sweep URL and ID
# No wandb.init() has been called at this point, so wandb.run can be None;
# dereferencing it unconditionally would raise AttributeError before the agent
# starts. Only build the URL from the run when one exists.
if wandb.run is not None:
    print(f"Sweep URL: https://wandb.ai/{wandb.run.entity}/{wandb.run.project}/sweeps/{sweep_id}")
else:
    print("Sweep URL: unavailable without an active run (check the output of wandb.sweep())")
print(f"Sweep ID: {sweep_id}")
# Run the sweep
# G = 3 values per hyperparameter and 5 hyperparameters -> G^5 = 243 grid points.
# wandb.agent(sweep_id, function=train, count=10)  # only tries 10 out of G^5 sweeps
wandb.agent(sweep_id, function=train)  # tries all G^5 hps! Sweeps them all!
我的理解是,只要代理在运行,它就会不断从服务器上的 sweep 获取超参数,直到该 sweep(保存在您的 wandb 站点/帐户上)中的超参数全部用完。有些 sweep(例如随机搜索和贝叶斯搜索)可以永远运行下去!所以 count 在这里很重要(否则就需要手动终止代理)。
我认为关键在于认识到:当您创建 sweep(无论是在 Python 还是 CLI 中)之后,负责获取超参数/试验(trial)并执行它们的进程是代理。因此我建议并行运行多个代理,例如:
import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb
from multiprocessing import Process, cpu_count
# Define the synthetic scaling law function
def scaling_law(c, e, a, b, alpha, beta):
    """Return ``exp(e) + exp(a) * c0**(-alpha) + exp(b) * c1**(-beta)`` per row.

    ``c0`` and ``c1`` are columns 0 and 1 of the 2-D array ``c``; the result
    is a 1-D array with one entry per row of ``c``.
    """
    power_terms = (
        np.exp(a) * c[:, 0] ** (-alpha),
        np.exp(b) * c[:, 1] ** (-beta),
    )
    offset = np.exp(e)
    return offset + power_terms[0] + power_terms[1]
# Generate synthetic data
np.random.seed(0)  # pins NumPy's global RNG state
# Four rows that differ only in column 0; column 1 is 2e12 throughout.
C = np.array([[first, 2e12] for first in (7e9, 13e9, 34e9, 70e9)])  # [m, Din] = [m, 2]
# Ground-truth parameters (e/a/b in log space, then the two exponents).
e_true, a_true, b_true, alpha_true, beta_true = (
    np.log(1.8172),
    np.log(482.01),
    np.log(2085.43),
    0.3478,
    0.3658,
)
_clean_losses = scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true)
L_target = _clean_losses.reshape(-1, 1)  # [m, K]
L_target = np.repeat(L_target, 1, axis=1)  # [m , K]; one repeat, so K stays 1
# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    """Sum the Huber loss of ``log(prediction) - log(target)`` over all entries.

    ``log(prediction)`` is obtained with ``logsumexp`` over the logs of the
    three additive terms of the scaling law, so the prediction is never
    formed in linear space. That removes a computation whose result was
    unused and which could overflow for extreme exponents.

    Args:
        theta_sl: sequence ``(e, a, b, alpha, beta)``.
        c: 2-D array of inputs with at least two columns, shape ``[m, >=2]``.
        l_target: positive targets, broadcastable against an ``[m, 1]`` column.
        delta: Huber threshold passed to ``scipy.special.huber``.

    Returns:
        Scalar: sum of the element-wise Huber losses.
    """
    e, a, b, alpha, beta = theta_sl
    log_l_target = np.log(l_target)
    # Logs of the three additive terms, each shaped [m, 1].
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    # axis=0 reduces across the three stacked terms.
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()
# Training function to run each trial
def train():
    """Execute a single trial and report its outcome to W&B.

    Opens a W&B run, takes the initial guess for ``(e, a, b, alpha, beta)``
    from ``wandb.config`` (presumably supplied by the sweep agent), runs a
    BFGS minimization of ``aggregate_huber_loss`` against the module-level
    ``C`` / ``L_target``, then logs the fitted values and the resulting loss.
    """
    wandb.init()
    hp = wandb.config
    x0 = [hp.e, hp.a, hp.b, hp.alpha, hp.beta]

    # Perform the optimization
    solution = opt.minimize(aggregate_huber_loss, x0, args=(C, L_target), method='BFGS').x
    e_fit, a_fit, b_fit, alpha_fit, beta_fit = solution
    final_loss = aggregate_huber_loss(solution, C, L_target)

    wandb.log({
        "e": e_fit,
        "a": a_fit,
        "b": b_fit,
        "alpha": alpha_fit,
        "beta": beta_fit,
        "loss": final_loss,
    })
# Sweep configuration for grid search
# Three candidate values for each of the five hyperparameters (3**5 = 243
# combinations); the sweep minimizes the logged "loss" metric.
sweep_config = dict(
    method="grid",
    metric=dict(name="loss", goal="minimize"),
    parameters=dict(
        e=dict(values=[-1, 0, 1]),
        a=dict(values=[0, 5, 10]),
        b=dict(values=[0, 5, 10]),
        alpha=dict(values=[0, 1, 2]),
        beta=dict(values=[0, 1, 2]),
    ),
)
# Project that owns the sweep; shared by sweep creation and by every agent.
SWEEP_PROJECT = "scaling-law-optimization"


# Function to run an agent
def run_agent(agent_sweep_id=None, count=None):
    """Run one W&B sweep agent in the current process.

    Args:
        agent_sweep_id: ID of the sweep to pull trials from. When omitted,
            the module-level ``sweep_id`` is used, which only exists in a
            process that created the sweep or inherited it.
        count: maximum number of trials this agent runs. ``None`` keeps
            fetching hyperparameters until the sweep has none left (all G^5
            grid points, shared among the agents attached to the sweep).
    """
    target_sweep = sweep_id if agent_sweep_id is None else agent_sweep_id
    # wandb.agent(target_sweep, function=train, count=10)  # runs subset 10 <= G^5 sweeps
    wandb.agent(target_sweep, function=train, project=SWEEP_PROJECT, count=count)


# Number of agents to run in parallel
num_agents = min(cpu_count(), 72)  # Adjust this number based on your system

if __name__ == "__main__":
    # Initialize the sweep exactly once, in the parent process only. Under
    # start methods that re-import this module in each child (spawn /
    # forkserver), a module-level wandb.sweep() call would create a separate
    # sweep per child, and every agent would then work through its own copy
    # of the grid, repeating the hyperparameters.
    sweep_id = wandb.sweep(sweep_config, project=SWEEP_PROJECT)

    # Print the sweep URL and ID
    # No run is active in the parent, so wandb.run can be None; guard the
    # dereference instead of crashing with AttributeError.
    if wandb.run is not None:
        print(f"Sweep URL: https://wandb.ai/{wandb.run.entity}/{wandb.run.project}/sweeps/{sweep_id}")
    else:
        print("Sweep URL: unavailable without an active run (check the output of wandb.sweep())")
    print(f"Sweep ID: {sweep_id}")

    # Every agent is handed the same sweep ID explicitly, so they all draw
    # trials from the one shared sweep regardless of the start method.
    processes = []
    for _ in range(num_agents):
        p = Process(target=run_agent, args=(sweep_id,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print('Done!\a')