我正在尝试使用 Ray 来调整我的 PyTorch 模型。当我尝试运行 tune.run 函数时,它会抛出错误 PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct。调优的代码主要基于 PyTorch 的教程(原帖此处为链接)。我在按教程导入 from ray.air import Checkpoint 时遇到了问题,所以我将其切换为 from ray.train import Checkpoint。这是完整的错误消息: 2023-12-30 20:12:57,961 WARNING -- Trial controller checkpointing failed: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct --------------------------------------------------------------------------- TypeError Traceback (most recent call last) File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/, in TuneFunctionEncoder.default(self, obj) 18 try: ---> 19 return super(TuneFunctionEncoder, self).default(obj) 20 except Exception: File /usr/lib/python3.10/json/, in JSONEncoder.default(self, o) 161 """Implement this method in a subclass such that it returns 162 a serializable object for ``o``, or calls the base implementation 163 (to raise a ``TypeError``). (...) 177 178 """ --> 179 raise TypeError(f'Object of type {o.__class__.__name__} ' 180 f'is not JSON serializable') TypeError: Object of type StorageContext is not JSON serializable During handling of the above exception, another exception occurred: PicklingError Traceback (most recent call last) Cell In[97], line 16 14 # ... 15 data_dir = os.path.abspath(os. 
getcwd()) ---> 16 result = 17 train_ray, 18 resources_per_trial={"gpu": gpus_per_trial}, 19 num_samples=10, 20 scheduler=scheduler, 21 keep_checkpoints_num=10 22 ) 23 best_trial = result.get_best_trial("loss", "min", "last") 24 print(f"Best trial config: {best_trial.config}") File /usr/local/lib/python3.10/dist-packages/ray/tune/, in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, storage_path, storage_filesystem, search_alg, scheduler, checkpoint_config, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, chdir_to_trial_dir, local_dir, _remote, _remote_string_queue, _entrypoint) 991 pass 992 else: 993 logger.warning( 994 "Tune detects GPUs, but no trials are using GPUs. " 995 "To enable trials to use GPUs, wrap `train_func` with " 996 "`tune.with_resources(train_func, resources_per_trial={'gpu': 1})` " 997 "which allows Tune to expose 1 GPU to each trial. " 998 "For Ray AIR Trainers, you can specify GPU resources " 999 "through `ScalingConfig(use_gpu=True)`. " 1000 "You can also override " 1001 "`Trainable.default_resource_request` if using the " -> 1002 "Trainable API." 
1003 ) 1005 experiment_interrupted_event = _setup_signal_catching() 1007 if progress_reporter and air_verbosity is not None: File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in TuneController.step(self) 742 except Exception as e: 743 logger.warning(f"Trial controller checkpointing failed: {str(e)}") --> 744 raise e 746 self._iteration += 1 748 with warn_if_slow("on_step_end"): File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in TuneController.step(self) 739 # Maybe save experiment state 740 try: --> 741 self.checkpoint() 742 except Exception as e: 743 logger.warning(f"Trial controller checkpointing failed: {str(e)}") File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in TuneController.checkpoint(self, force, wait) 452 """Saves execution state to the local experiment path. 453 454 Overwrites the current session checkpoint, which starts when self (...) 463 464 """ 465 with warn_if_slow( 466 "experiment_checkpoint", 467 message="Checkpointing the experiment state took " (...) 476 disable=self._checkpoint_manager.auto_checkpoint_enabled or force or wait, 477 ): --> 478 self._checkpoint_manager.checkpoint( 479 save_fn=self.save_to_dir, force=force, wait=wait 480 ) File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in _ExperimentCheckpointManager.checkpoint(self, save_fn, force, wait) 218 # NOTE: This context manager is for Datasets captured in a trial config. 219 # This is the case when *tuning over datasets*. 220 # If the datasets have already been full executed, then serializing 221 # block refs means that this checkpoint is not usable in a new Ray cluster. 222 # This context will serialize the dataset execution plan instead, if available. 
223 with out_of_band_serialize_dataset(): --> 224 save_fn() 226 # Sync to cloud 227 self.sync_up(force=force, wait=wait) File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in TuneController.save_to_dir(self) 350 experiment_dir = self._storage.experiment_local_path 352 # Get state from trial executor and runner 353 runner_state = { 354 # Trials --> 355 "trial_data": list(self._get_trial_checkpoints().values()), 356 # Experiment data 357 "runner_data": self.__getstate__(), 358 # Metadata 359 "stats": { 360 "start_time": self._start_time, 361 "timestamp": self._last_checkpoint_time, 362 }, 363 } 365 tmp_file_name = os.path.join( 366 experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}" 367 ) 369 with open(tmp_file_name, "w") as f: File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/, in TuneController._get_trial_checkpoints(self) 801 def _get_trial_checkpoints(self) -> Dict[str, str]: 802 for trial in self._trials_to_cache: --> 803 self._trial_metadata[trial.trial_id] = trial.get_json_state() 804 self._trials_to_cache.clear() 805 return self._trial_metadata File /usr/local/lib/python3.10/dist-packages/ray/tune/experiment/, in Trial.get_json_state(self) 962 state = self.__getstate__() 963 state.pop("run_metadata", None) --> 964 self._state_json = json.dumps(state, indent=2, cls=TuneFunctionEncoder) 966 runtime_metadata_json = self.run_metadata.get_json_state() 968 return self._state_json, runtime_metadata_json File /usr/lib/python3.10/json/, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw) 232 if cls is None: 233 cls = JSONEncoder 234 return cls( 235 skipkeys=skipkeys, ensure_ascii=ensure_ascii, 236 check_circular=check_circular, allow_nan=allow_nan, indent=indent, 237 separators=separators, default=default, sort_keys=sort_keys, --> 238 **kw).encode(obj) File /usr/lib/python3.10/json/, in JSONEncoder.encode(self, o) 199 chunks = self.iterencode(o, _one_shot=True) 200 if not 
isinstance(chunks, (list, tuple)): --> 201 chunks = list(chunks) 202 return ''.join(chunks) File /usr/lib/python3.10/json/, in _make_iterencode.<locals>._iterencode(o, _current_indent_level) 429 yield from _iterencode_list(o, _current_indent_level) 430 elif isinstance(o, dict): --> 431 yield from _iterencode_dict(o, _current_indent_level) 432 else: 433 if markers is not None: File /usr/lib/python3.10/json/, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level) 403 else: 404 chunks = _iterencode(value, _current_indent_level) --> 405 yield from chunks 406 if newline_indent is not None: 407 _current_indent_level -= 1 File /usr/lib/python3.10/json/, in _make_iterencode.<locals>._iterencode(o, _current_indent_level) 436 raise ValueError("Circular reference detected") 437 markers[markerid] = o --> 438 o = _default(o) 439 yield from _iterencode(o, _current_indent_level) 440 if markers is not None: File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/, in TuneFunctionEncoder.default(self, obj) 21 if log_once(f"tune_func_encode:{str(obj)}"): 22 logger.debug("Unable to encode. 
Falling back to cloudpickle.") ---> 23 return self._to_cloudpickle(obj) File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/, in TuneFunctionEncoder._to_cloudpickle(self, obj) 25 def _to_cloudpickle(self, obj): 26 return { 27 "_type": "CLOUDPICKLE_FALLBACK", ---> 28 "value": binary_to_hex(cloudpickle.dumps(obj)), 29 } File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/, in dumps(obj, protocol, buffer_callback) 86 with io.BytesIO() as file: 87 cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback) ---> 88 cp.dump(obj) 89 return file.getvalue() File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/, in CloudPickler.dump(self, obj) 731 def dump(self, obj): 732 try: --> 733 return Pickler.dump(self, obj) 734 except RuntimeError as e: 735 if "recursion" in e.args[0]: PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruc 这是我的模型: class TransformerClassifier(nn.Module): def __init__(self, num_features=24, num_classes=3, heads=8): super().__init__() self.conv_backbone = nn.Sequential( nn.Conv1d(num_features, 128, 10, stride=8, bias=False), nn.BatchNorm1d(128), nn.LeakyReLU(), nn.Conv1d(128, 256, 10, stride=8, bias=False), nn.BatchNorm1d(256), nn.LeakyReLU(), ) self.transformer = nn.TransformerEncoder( nn.TransformerEncoderLayer(256, nhead=heads, dim_feedforward=1024, batch_first=True), num_layers=6) self.classifier = nn.Linear(256, num_classes) def forward(self, x): x = x.float() # convolutional feature extractor x = self.conv_backbone(x) # transformer encoder x = torch.transpose(x, 1, 2) x = self.transformer(x) # linear classifier x = x.mean(1) x = self.classifier(x) return x 这是我调整模型的代码: from functools import partial import os import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from import random_split from ray import tune import ray.util import ray.air from ray.train import 
Checkpoint from ray.air import session from ray.tune.schedulers import ASHAScheduler def train_ray(config): use_weighting = True best_f1s = [] class_f1s = [] all_accs = [] batch_size = int(config["batch_size"]) tr_label = open('labelTrain.pickle', 'rb') train_label = pickle.load(tr_label) tr_pickle= open('train_d.pickle', "rb") train_d = pickle.load(tr_pickle) vl_label = open('labelVal.pickle', 'rb') val_label = pickle.load(vl_label) print(np.unique(val_label)) vl_pickle= open('val_d.pickle', "rb") val_d = pickle.load(vl_pickle) t_label = open('labelTest.pickle', 'rb') test_label = pickle.load(t_label) t_pickle= open('test_d.pickle', "rb") test_d = pickle.load(t_pickle) dataset_train, train_label) dataset_test =, test_label) dataset_val =, val_label) train_dataloader =, batch_size) val_dataloader =, batch_size) test_dataloader =, batch_size) train_labels = train_label # Calculate Weights for Loss weights = None if use_weighting: x = torch.reciprocal(torch.bincount(torch.tensor(train_labels.long()))).float()**0.25 x/=x.mean() weights = x # Create Model model = TransformerClassifier(num_features=24, num_classes=3, heads=config["heads"]) checkpoint = session.get_checkpoint() if checkpoint: checkpoint_state = checkpoint.to_dict() start_epoch = checkpoint_state["epoch"] model.load_state_dict(checkpoint_state["model_state_dict"]) optimizer.load_state_dict(checkpoint_state["optimizer_state_dict"]) else: start_epoch = 0 epoch =int(config["epoch"]) lr=config["lr"] for epoch in range(start_epoch,10): train_f1, train_acc, f1, loss, optimizer = train_optuna(model, epoch, train_dataloader, val_dataloader,test_dataloader, lr_decay=0.98, lr=lr,device="cuda",weights=weights) checkpoint_data = { "epoch": epoch, "net_state_dict": model.state_dict(), "optimizer_state_dict": optimizer.state_dict(), } checkpoint = Checkpoint.from_dict(checkpoint_data) {"loss": loss, "accuracy": train_acc, "f1_score": f1}, checkpoint=checkpoint, ) print("Finished Training") config = { 
"heads":tune.choice([8,16]), "lr": tune.loguniform(1e-6, 1e-3), "batch_size": tune.choice([16, 32, 8]) } scheduler = ASHAScheduler( metric="loss", mode="min", max_t=20, grace_period=1, reduction_factor=2, ) gpus_per_trial = 1 data_dir = os.path.abspath(os. getcwd()) result = train_ray, resources_per_trial={"gpu": gpus_per_trial}, num_samples=10, scheduler=scheduler, keep_checkpoints_num=8 ) 根据 PyTorch 教程,我应该使用 checkpoint_at_end=True。然而,这样做同样会报错。我是在 Docker 容器内的 Jupyter notebook 中运行这段代码的,该 Docker 容器挂载了一个卷。老实说,我看不懂这条错误消息。如能得到任何形式的帮助,我将不胜感激。我认为关键问题在于错误消息中所说的 StorageContext 不可 JSON 序列化?
