使用 Ray 进行(超参数)调优时引发 pickle 序列化错误(PicklingError)

问题描述 投票:0回答:1

我正在尝试使用 Ray 对我的 PyTorch 模型进行超参数调优。当我尝试运行

tune.run
函数时,它会抛出错误
PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct
。调优代码主要基于 PyTorch 官方教程(见此处)。我在按照教程导入
from ray.air import Checkpoint
时遇到了问题,所以我将其切换为
from ray.train import Checkpoint

这是完整的错误消息:

2023-12-30 20:12:57,961 WARNING tune_controller.py:743 -- Trial controller checkpointing failed: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:19, in TuneFunctionEncoder.default(self, obj)
     18 try:
---> 19     return super(TuneFunctionEncoder, self).default(obj)
     20 except Exception:

File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
    161 """Implement this method in a subclass such that it returns
    162 a serializable object for ``o``, or calls the base implementation
    163 (to raise a ``TypeError``).
   (...)
    177 
    178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
    180                 f'is not JSON serializable')

TypeError: Object of type StorageContext is not JSON serializable

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Cell In[97], line 16
     14 # ...
     15 data_dir = os.path.abspath(os. getcwd())
---> 16 result = tune.run(
     17     train_ray,
     18     resources_per_trial={"gpu": gpus_per_trial},
     19     num_samples=10,
     20     scheduler=scheduler,
     21     keep_checkpoints_num=10
     22     )
     23 best_trial = result.get_best_trial("loss", "min", "last")
     24 print(f"Best trial config: {best_trial.config}")

File /usr/local/lib/python3.10/dist-packages/ray/tune/tune.py:1002, in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, storage_path, storage_filesystem, search_alg, scheduler, checkpoint_config, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, chdir_to_trial_dir, local_dir, _remote, _remote_string_queue, _entrypoint)
    991         pass
    992     else:
    993         logger.warning(
    994             "Tune detects GPUs, but no trials are using GPUs. "
    995             "To enable trials to use GPUs, wrap `train_func` with "
    996             "`tune.with_resources(train_func, resources_per_trial={'gpu': 1})` "
    997             "which allows Tune to expose 1 GPU to each trial. "
    998             "For Ray AIR Trainers, you can specify GPU resources "
    999             "through `ScalingConfig(use_gpu=True)`. "
   1000             "You can also override "
   1001             "`Trainable.default_resource_request` if using the "
-> 1002             "Trainable API."
   1003         )
   1005 experiment_interrupted_event = _setup_signal_catching()
   1007 if progress_reporter and air_verbosity is not None:

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:744, in TuneController.step(self)
    742 except Exception as e:
    743     logger.warning(f"Trial controller checkpointing failed: {str(e)}")
--> 744     raise e
    746 self._iteration += 1
    748 with warn_if_slow("on_step_end"):

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:741, in TuneController.step(self)
    739 # Maybe save experiment state
    740 try:
--> 741     self.checkpoint()
    742 except Exception as e:
    743     logger.warning(f"Trial controller checkpointing failed: {str(e)}")

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:478, in TuneController.checkpoint(self, force, wait)
    452 """Saves execution state to the local experiment path.
    453 
    454 Overwrites the current session checkpoint, which starts when self
   (...)
    463 
    464 """
    465 with warn_if_slow(
    466     "experiment_checkpoint",
    467     message="Checkpointing the experiment state took "
   (...)
    476     disable=self._checkpoint_manager.auto_checkpoint_enabled or force or wait,
    477 ):
--> 478     self._checkpoint_manager.checkpoint(
    479         save_fn=self.save_to_dir, force=force, wait=wait
    480     )

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/experiment_state.py:224, in _ExperimentCheckpointManager.checkpoint(self, save_fn, force, wait)
    218 # NOTE: This context manager is for Datasets captured in a trial config.
    219 # This is the case when *tuning over datasets*.
    220 # If the datasets have already been full executed, then serializing
    221 # block refs means that this checkpoint is not usable in a new Ray cluster.
    222 # This context will serialize the dataset execution plan instead, if available.
    223 with out_of_band_serialize_dataset():
--> 224     save_fn()
    226 # Sync to cloud
    227 self.sync_up(force=force, wait=wait)

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:355, in TuneController.save_to_dir(self)
    350 experiment_dir = self._storage.experiment_local_path
    352 # Get state from trial executor and runner
    353 runner_state = {
    354     # Trials
--> 355     "trial_data": list(self._get_trial_checkpoints().values()),
    356     # Experiment data
    357     "runner_data": self.__getstate__(),
    358     # Metadata
    359     "stats": {
    360         "start_time": self._start_time,
    361         "timestamp": self._last_checkpoint_time,
    362     },
    363 }
    365 tmp_file_name = os.path.join(
    366     experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
    367 )
    369 with open(tmp_file_name, "w") as f:

File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:803, in TuneController._get_trial_checkpoints(self)
    801 def _get_trial_checkpoints(self) -> Dict[str, str]:
    802     for trial in self._trials_to_cache:
--> 803         self._trial_metadata[trial.trial_id] = trial.get_json_state()
    804     self._trials_to_cache.clear()
    805     return self._trial_metadata

File /usr/local/lib/python3.10/dist-packages/ray/tune/experiment/trial.py:964, in Trial.get_json_state(self)
    962     state = self.__getstate__()
    963     state.pop("run_metadata", None)
--> 964     self._state_json = json.dumps(state, indent=2, cls=TuneFunctionEncoder)
    966 runtime_metadata_json = self.run_metadata.get_json_state()
    968 return self._state_json, runtime_metadata_json

File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File /usr/lib/python3.10/json/encoder.py:201, in JSONEncoder.encode(self, o)
    199 chunks = self.iterencode(o, _one_shot=True)
    200 if not isinstance(chunks, (list, tuple)):
--> 201     chunks = list(chunks)
    202 return ''.join(chunks)

File /usr/lib/python3.10/json/encoder.py:431, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    429     yield from _iterencode_list(o, _current_indent_level)
    430 elif isinstance(o, dict):
--> 431     yield from _iterencode_dict(o, _current_indent_level)
    432 else:
    433     if markers is not None:

File /usr/lib/python3.10/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

File /usr/lib/python3.10/json/encoder.py:438, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    436         raise ValueError("Circular reference detected")
    437     markers[markerid] = o
--> 438 o = _default(o)
    439 yield from _iterencode(o, _current_indent_level)
    440 if markers is not None:

File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:23, in TuneFunctionEncoder.default(self, obj)
     21 if log_once(f"tune_func_encode:{str(obj)}"):
     22     logger.debug("Unable to encode. Falling back to cloudpickle.")
---> 23 return self._to_cloudpickle(obj)

File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:28, in TuneFunctionEncoder._to_cloudpickle(self, obj)
     25 def _to_cloudpickle(self, obj):
     26     return {
     27         "_type": "CLOUDPICKLE_FALLBACK",
---> 28         "value": binary_to_hex(cloudpickle.dumps(obj)),
     29     }

File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/cloudpickle_fast.py:88, in dumps(obj, protocol, buffer_callback)
     86 with io.BytesIO() as file:
     87     cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback)
---> 88     cp.dump(obj)
     89     return file.getvalue()

File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/cloudpickle_fast.py:733, in CloudPickler.dump(self, obj)
    731 def dump(self, obj):
    732     try:
--> 733         return Pickler.dump(self, obj)
    734     except RuntimeError as e:
    735         if "recursion" in e.args[0]:

PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct

这是我的模型:

class TransformerClassifier(nn.Module):
    """Sequence classifier: strided Conv1d stem, Transformer encoder, linear head.

    Args:
        num_features: Number of input channels fed to the first ``Conv1d``.
        num_classes: Size of the output logits vector.
        heads: Number of attention heads in every encoder layer.
    """

    def __init__(self, num_features=24, num_classes=3, heads=8):
        super().__init__()

        # Two conv stages (kernel 10, stride 8), each followed by batch norm
        # and LeakyReLU; the second stage emits 256 channels.
        conv_stages = [
            nn.Conv1d(num_features, 128, 10, stride=8, bias=False),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(),
            nn.Conv1d(128, 256, 10, stride=8, bias=False),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(),
        ]
        self.conv_backbone = nn.Sequential(*conv_stages)

        # Six identical encoder layers operating on 256-dim tokens.
        encoder_layer = nn.TransformerEncoderLayer(
            256,
            nhead=heads,
            dim_feedforward=1024,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return class logits for ``x``.

        ``x`` is cast to float, passed through the conv stem, transposed so
        the channel axis becomes the last axis (the encoder is built with
        ``batch_first=True``), encoded, averaged over axis 1 and projected
        to ``num_classes`` logits.
        """
        features = self.conv_backbone(x.float())

        # (batch, channels, steps) -> (batch, steps, channels) for the encoder.
        tokens = self.transformer(features.transpose(1, 2))

        # Mean-pool over the step axis before the linear head.
        pooled = tokens.mean(dim=1)
        return self.classifier(pooled)

这是我调整模型的代码:

from functools import partial
import os
import pickle
import tempfile

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import random_split
from ray import tune
import ray.util
import ray.air
from ray.train import Checkpoint
from ray.air import session
from ray.tune.schedulers import ASHAScheduler


def _load_pickle(path):
    """Return the object unpickled from ``path``; the file is closed on exit."""
    # NOTE: unpickling runs arbitrary code; only load files you trust.
    with open(path, "rb") as handle:
        return pickle.load(handle)


def train_ray(config, data_dir=None):
    """Train one Tune trial and report loss/accuracy/F1 plus a checkpoint per epoch.

    Args:
        config: Trial configuration. Reads ``config["batch_size"]``,
            ``config["heads"]`` and ``config["lr"]``.
        data_dir: Optional directory holding the six ``*.pickle`` data files.
            When ``None`` the files are opened relative to the current
            working directory. NOTE(review): ``tune.run`` exposes a
            ``chdir_to_trial_dir`` option, so relative paths may not resolve
            inside a trial; pass an absolute ``data_dir`` in that case.

    Raises:
        KeyError: If a required key is missing from ``config``.
        OSError: If one of the data files cannot be opened.
    """
    use_weighting = True
    max_epochs = 10
    checkpoint_file = "checkpoint.pt"

    batch_size = int(config["batch_size"])

    def _data_path(name):
        return name if data_dir is None else os.path.join(data_dir, name)

    # Each file is opened in a context manager so no handle is leaked.
    train_label = _load_pickle(_data_path("labelTrain.pickle"))
    train_d = _load_pickle(_data_path("train_d.pickle"))

    val_label = _load_pickle(_data_path("labelVal.pickle"))
    print(np.unique(val_label))
    val_d = _load_pickle(_data_path("val_d.pickle"))

    test_label = _load_pickle(_data_path("labelTest.pickle"))
    test_d = _load_pickle(_data_path("test_d.pickle"))

    dataset_train = torch.utils.data.TensorDataset(train_d, train_label)
    dataset_test = torch.utils.data.TensorDataset(test_d, test_label)
    dataset_val = torch.utils.data.TensorDataset(val_d, val_label)

    train_dataloader = torch.utils.data.DataLoader(dataset_train, batch_size)
    val_dataloader = torch.utils.data.DataLoader(dataset_val, batch_size)
    test_dataloader = torch.utils.data.DataLoader(dataset_test, batch_size)

    # Loss weights: (1 / class count) ** 0.25, rescaled to have mean 1.
    weights = None
    if use_weighting:
        class_counts = torch.bincount(train_label.long())
        weights = torch.reciprocal(class_counts).float() ** 0.25
        weights /= weights.mean()

    model = TransformerClassifier(num_features=24, num_classes=3, heads=config["heads"])

    # ``ray.train.Checkpoint`` is directory-based (no ``to_dict``/``from_dict``),
    # so the state is stored as a single file inside the checkpoint directory.
    start_epoch = 0
    checkpoint = session.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            checkpoint_state = torch.load(os.path.join(checkpoint_dir, checkpoint_file))
        # The stored epoch has already been completed; continue with the next one.
        start_epoch = checkpoint_state["epoch"] + 1
        # Key must match the one written below ("net_state_dict").
        model.load_state_dict(checkpoint_state["net_state_dict"])
        # The optimizer state is saved in the checkpoint but not restored here:
        # the only optimizer available to this function is the one returned by
        # train_optuna, which does not exist yet at this point.

    lr = config["lr"]

    for epoch in range(start_epoch, max_epochs):
        _, train_acc, f1, loss, optimizer = train_optuna(
            model,
            epoch,
            train_dataloader,
            val_dataloader,
            test_dataloader,
            lr_decay=0.98,
            lr=lr,
            device="cuda",
            weights=weights,
        )

        checkpoint_data = {
            "epoch": epoch,
            "net_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
        }
        # Report while the temporary directory still exists so the checkpoint
        # files can be persisted before the directory is removed.
        with tempfile.TemporaryDirectory() as checkpoint_dir:
            torch.save(checkpoint_data, os.path.join(checkpoint_dir, checkpoint_file))
            session.report(
                {"loss": loss, "accuracy": train_acc, "f1_score": f1},
                checkpoint=Checkpoint.from_directory(checkpoint_dir),
            )
    print("Finished Training")

# Search space: Tune samples one value per key for every trial.
config = {
    "heads": tune.choice([8, 16]),
    "lr": tune.loguniform(1e-6, 1e-3),
    "batch_size": tune.choice([16, 32, 8]),
}
# ASHA scheduler minimising the reported "loss" metric.
scheduler = ASHAScheduler(
    metric="loss",
    mode="min",
    max_t=20,
    grace_period=1,
    reduction_factor=2,
)
gpus_per_trial = 1
data_dir = os.path.abspath(os.getcwd())
result = tune.run(
    train_ray,
    # The search space has to be passed explicitly; without it the trainable
    # receives no "heads"/"lr"/"batch_size" entries in its config.
    config=config,
    resources_per_trial={"gpu": gpus_per_trial},
    num_samples=10,
    scheduler=scheduler,
    keep_checkpoints_num=8,
)

根据 Pytorch 教程,我应该使用

checkpoint_at_end=True
。然而,这同样会报错。我是在 Docker 容器内的 Jupyter Notebook 中运行代码的,该容器挂载了一个卷。

老实说,我看不懂这条错误消息。如能提供任何帮助,我将不胜感激。

pytorch pickle ray ray-tune
1个回答
0
投票

我认为关键问题在于错误消息中提到的:StorageContext 无法被 JSON 序列化?

© www.soinside.com 2019 - 2024. All rights reserved.