我正在尝试使用 Ray Tune 对我的 PyTorch 模型进行超参数调优。当我尝试运行
tune.run
函数时,它会抛出错误 PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct
。调优代码主要基于 PyTorch 官方教程(here)。按照教程导入 from ray.air import Checkpoint
时遇到了问题,所以我将其改为 from ray.train import Checkpoint
。以下是完整的错误消息:
2023-12-30 20:12:57,961 WARNING tune_controller.py:743 -- Trial controller checkpointing failed: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:19, in TuneFunctionEncoder.default(self, obj)
18 try:
---> 19 return super(TuneFunctionEncoder, self).default(obj)
20 except Exception:
File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
161 """Implement this method in a subclass such that it returns
162 a serializable object for ``o``, or calls the base implementation
163 (to raise a ``TypeError``).
(...)
177
178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
180 f'is not JSON serializable')
TypeError: Object of type StorageContext is not JSON serializable
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
Cell In[97], line 16
14 # ...
15 data_dir = os.path.abspath(os. getcwd())
---> 16 result = tune.run(
17 train_ray,
18 resources_per_trial={"gpu": gpus_per_trial},
19 num_samples=10,
20 scheduler=scheduler,
21 keep_checkpoints_num=10
22 )
23 best_trial = result.get_best_trial("loss", "min", "last")
24 print(f"Best trial config: {best_trial.config}")
File /usr/local/lib/python3.10/dist-packages/ray/tune/tune.py:1002, in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, storage_path, storage_filesystem, search_alg, scheduler, checkpoint_config, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, chdir_to_trial_dir, local_dir, _remote, _remote_string_queue, _entrypoint)
991 pass
992 else:
993 logger.warning(
994 "Tune detects GPUs, but no trials are using GPUs. "
995 "To enable trials to use GPUs, wrap `train_func` with "
996 "`tune.with_resources(train_func, resources_per_trial={'gpu': 1})` "
997 "which allows Tune to expose 1 GPU to each trial. "
998 "For Ray AIR Trainers, you can specify GPU resources "
999 "through `ScalingConfig(use_gpu=True)`. "
1000 "You can also override "
1001 "`Trainable.default_resource_request` if using the "
-> 1002 "Trainable API."
1003 )
1005 experiment_interrupted_event = _setup_signal_catching()
1007 if progress_reporter and air_verbosity is not None:
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:744, in TuneController.step(self)
742 except Exception as e:
743 logger.warning(f"Trial controller checkpointing failed: {str(e)}")
--> 744 raise e
746 self._iteration += 1
748 with warn_if_slow("on_step_end"):
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:741, in TuneController.step(self)
739 # Maybe save experiment state
740 try:
--> 741 self.checkpoint()
742 except Exception as e:
743 logger.warning(f"Trial controller checkpointing failed: {str(e)}")
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:478, in TuneController.checkpoint(self, force, wait)
452 """Saves execution state to the local experiment path.
453
454 Overwrites the current session checkpoint, which starts when self
(...)
463
464 """
465 with warn_if_slow(
466 "experiment_checkpoint",
467 message="Checkpointing the experiment state took "
(...)
476 disable=self._checkpoint_manager.auto_checkpoint_enabled or force or wait,
477 ):
--> 478 self._checkpoint_manager.checkpoint(
479 save_fn=self.save_to_dir, force=force, wait=wait
480 )
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/experiment_state.py:224, in _ExperimentCheckpointManager.checkpoint(self, save_fn, force, wait)
218 # NOTE: This context manager is for Datasets captured in a trial config.
219 # This is the case when *tuning over datasets*.
220 # If the datasets have already been full executed, then serializing
221 # block refs means that this checkpoint is not usable in a new Ray cluster.
222 # This context will serialize the dataset execution plan instead, if available.
223 with out_of_band_serialize_dataset():
--> 224 save_fn()
226 # Sync to cloud
227 self.sync_up(force=force, wait=wait)
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:355, in TuneController.save_to_dir(self)
350 experiment_dir = self._storage.experiment_local_path
352 # Get state from trial executor and runner
353 runner_state = {
354 # Trials
--> 355 "trial_data": list(self._get_trial_checkpoints().values()),
356 # Experiment data
357 "runner_data": self.__getstate__(),
358 # Metadata
359 "stats": {
360 "start_time": self._start_time,
361 "timestamp": self._last_checkpoint_time,
362 },
363 }
365 tmp_file_name = os.path.join(
366 experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
367 )
369 with open(tmp_file_name, "w") as f:
File /usr/local/lib/python3.10/dist-packages/ray/tune/execution/tune_controller.py:803, in TuneController._get_trial_checkpoints(self)
801 def _get_trial_checkpoints(self) -> Dict[str, str]:
802 for trial in self._trials_to_cache:
--> 803 self._trial_metadata[trial.trial_id] = trial.get_json_state()
804 self._trials_to_cache.clear()
805 return self._trial_metadata
File /usr/local/lib/python3.10/dist-packages/ray/tune/experiment/trial.py:964, in Trial.get_json_state(self)
962 state = self.__getstate__()
963 state.pop("run_metadata", None)
--> 964 self._state_json = json.dumps(state, indent=2, cls=TuneFunctionEncoder)
966 runtime_metadata_json = self.run_metadata.get_json_state()
968 return self._state_json, runtime_metadata_json
File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
232 if cls is None:
233 cls = JSONEncoder
234 return cls(
235 skipkeys=skipkeys, ensure_ascii=ensure_ascii,
236 check_circular=check_circular, allow_nan=allow_nan, indent=indent,
237 separators=separators, default=default, sort_keys=sort_keys,
--> 238 **kw).encode(obj)
File /usr/lib/python3.10/json/encoder.py:201, in JSONEncoder.encode(self, o)
199 chunks = self.iterencode(o, _one_shot=True)
200 if not isinstance(chunks, (list, tuple)):
--> 201 chunks = list(chunks)
202 return ''.join(chunks)
File /usr/lib/python3.10/json/encoder.py:431, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
429 yield from _iterencode_list(o, _current_indent_level)
430 elif isinstance(o, dict):
--> 431 yield from _iterencode_dict(o, _current_indent_level)
432 else:
433 if markers is not None:
File /usr/lib/python3.10/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
File /usr/lib/python3.10/json/encoder.py:438, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
436 raise ValueError("Circular reference detected")
437 markers[markerid] = o
--> 438 o = _default(o)
439 yield from _iterencode(o, _current_indent_level)
440 if markers is not None:
File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:23, in TuneFunctionEncoder.default(self, obj)
21 if log_once(f"tune_func_encode:{str(obj)}"):
22 logger.debug("Unable to encode. Falling back to cloudpickle.")
---> 23 return self._to_cloudpickle(obj)
File /usr/local/lib/python3.10/dist-packages/ray/tune/utils/serialization.py:28, in TuneFunctionEncoder._to_cloudpickle(self, obj)
25 def _to_cloudpickle(self, obj):
26 return {
27 "_type": "CLOUDPICKLE_FALLBACK",
---> 28 "value": binary_to_hex(cloudpickle.dumps(obj)),
29 }
File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/cloudpickle_fast.py:88, in dumps(obj, protocol, buffer_callback)
86 with io.BytesIO() as file:
87 cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback)
---> 88 cp.dump(obj)
89 return file.getvalue()
File /usr/local/lib/python3.10/dist-packages/ray/cloudpickle/cloudpickle_fast.py:733, in CloudPickler.dump(self, obj)
731 def dump(self, obj):
732 try:
--> 733 return Pickler.dump(self, obj)
734 except RuntimeError as e:
735 if "recursion" in e.args[0]:
PicklingError: Can't pickle <cyfunction LocalFileSystem._reconstruct at 0x7f7d0c8ce4d0>: it's not the same object as pyarrow._fs.LocalFileSystem._reconstruct
这是我的模型:
class TransformerClassifier(nn.Module):
    """Sequence classifier: strided Conv1d stem, Transformer encoder, linear head.

    Args:
        num_features: Number of input channels fed to the first ``Conv1d``.
        num_classes: Number of output logits produced by the linear head.
        heads: Number of attention heads in every encoder layer. The encoder
            width is fixed at 256, so ``heads`` has to divide 256.
    """

    def __init__(self, num_features=24, num_classes=3, heads=8):
        super().__init__()

        # Two conv stages, each with kernel 10 / stride 8, so the time axis is
        # shortened by roughly a factor of 64 before it reaches the encoder.
        self.conv_backbone = nn.Sequential(
            nn.Conv1d(num_features, 128, 10, stride=8, bias=False),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(),
            nn.Conv1d(128, 256, 10, stride=8, bias=False),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(),
        )

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                256, nhead=heads, dim_feedforward=1024, batch_first=True
            ),
            num_layers=6,
        )

        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return class logits for a batch of sequences.

        Args:
            x: Tensor laid out as ``(batch, num_features, length)``, i.e. the
                layout ``nn.Conv1d`` expects. It is cast to float first, so
                integer inputs are accepted.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        x = x.float()

        # Convolutional feature extractor: (batch, 256, reduced_length).
        x = self.conv_backbone(x)

        # The encoder is batch_first, so move channels to the last axis:
        # (batch, reduced_length, 256).
        x = torch.transpose(x, 1, 2)
        x = self.transformer(x)

        # Average over the sequence axis, then classify.
        x = x.mean(1)
        x = self.classifier(x)
        return x
这是我用来对模型进行调优的代码:
import os
import pickle
import tempfile
from functools import partial

import numpy as np
import ray.air
import ray.train
import ray.util
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from ray import tune
from ray.air import session
from ray.train import Checkpoint
from ray.tune.schedulers import ASHAScheduler
from torch.utils.data import random_split
def train_ray(config):
    """Ray Tune trainable: train one ``TransformerClassifier`` trial.

    Loads the pickled train/val/test splits, builds the data loaders and the
    class weights, optionally resumes from the trial's latest checkpoint, and
    then calls ``train_optuna`` once per epoch, reporting metrics and a
    checkpoint to Ray after every epoch.

    Args:
        config: Trial hyperparameters. Required keys: ``"batch_size"``,
            ``"lr"`` and ``"heads"``. Optional key ``"data_dir"``: directory
            holding the ``*.pickle`` files; when absent the files are opened
            relative to the current working directory.
    """
    use_weighting = True
    num_epochs = 10
    batch_size = int(config["batch_size"])
    lr = config["lr"]
    # NOTE(review): tune.run has a ``chdir_to_trial_dir`` option, so the
    # working directory inside a trial may not be the launch directory.
    # Passing an absolute "data_dir" through the config avoids relying on it.
    data_dir = config.get("data_dir", "")

    def _load_pickle(file_name):
        # Context manager so the handle is closed even if unpickling fails.
        # pickle can execute arbitrary code on load; only use trusted files.
        with open(os.path.join(data_dir, file_name), "rb") as handle:
            return pickle.load(handle)

    train_label = _load_pickle('labelTrain.pickle')
    train_d = _load_pickle('train_d.pickle')
    val_label = _load_pickle('labelVal.pickle')
    print(np.unique(val_label))
    val_d = _load_pickle('val_d.pickle')
    test_label = _load_pickle('labelTest.pickle')
    test_d = _load_pickle('test_d.pickle')

    # TensorDataset requires tensors, so the pickles are assumed to hold
    # torch tensors -- TODO confirm against the code that wrote them.
    dataset_train = torch.utils.data.TensorDataset(train_d, train_label)
    dataset_test = torch.utils.data.TensorDataset(test_d, test_label)
    dataset_val = torch.utils.data.TensorDataset(val_d, val_label)
    train_dataloader = torch.utils.data.DataLoader(dataset_train, batch_size)
    val_dataloader = torch.utils.data.DataLoader(dataset_val, batch_size)
    test_dataloader = torch.utils.data.DataLoader(dataset_test, batch_size)

    # Loss weights: inverse class frequency, softened with a 0.25 exponent
    # and rescaled so that the weights average to 1.
    weights = None
    if use_weighting:
        class_counts = torch.bincount(train_label.long())
        weights = torch.reciprocal(class_counts).float() ** 0.25
        weights /= weights.mean()

    model = TransformerClassifier(
        num_features=24, num_classes=3, heads=config["heads"]
    )

    # Resume from the trial's latest checkpoint, if Ray provides one.
    # ray.train.Checkpoint is directory based, so the state is stored as a
    # file inside the checkpoint directory rather than as a dict.
    start_epoch = 0
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            checkpoint_state = torch.load(
                os.path.join(checkpoint_dir, "checkpoint.pt"),
                map_location="cpu",
            )
        # The checkpoint is written after its epoch finished, so continue
        # with the following epoch instead of repeating the saved one.
        start_epoch = checkpoint_state["epoch"] + 1
        # Same key that is used when saving below.
        model.load_state_dict(checkpoint_state["net_state_dict"])
        # The optimizer only exists as a return value of train_optuna, so
        # there is no optimizer object here to load the saved state into.

    device = "cuda" if torch.cuda.is_available() else "cpu"

    for epoch in range(start_epoch, num_epochs):
        _train_f1, train_acc, f1, loss, optimizer = train_optuna(
            model,
            epoch,
            train_dataloader,
            val_dataloader,
            test_dataloader,
            lr_decay=0.98,
            lr=lr,
            device=device,
            weights=weights,
        )

        checkpoint_data = {
            "epoch": epoch,
            "net_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
        }
        # Report while the temporary directory still exists so Ray can
        # persist its contents before it is removed.
        with tempfile.TemporaryDirectory() as checkpoint_dir:
            torch.save(
                checkpoint_data, os.path.join(checkpoint_dir, "checkpoint.pt")
            )
            ray.train.report(
                {"loss": loss, "accuracy": train_acc, "f1_score": f1},
                checkpoint=ray.train.Checkpoint.from_directory(checkpoint_dir),
            )

    print("Finished Training")
# Absolute path of the directory the notebook was started from. It is placed
# in the trial config below as config["data_dir"], because tune.run has a
# ``chdir_to_trial_dir`` option and a trial's working directory may differ.
data_dir = os.path.abspath(os.getcwd())

# Hyperparameter search space; Tune samples one value per key for each trial.
config = {
    "heads": tune.choice([8, 16]),
    "lr": tune.loguniform(1e-6, 1e-3),
    "batch_size": tune.choice([16, 32, 8]),
    "data_dir": data_dir,
}

# ASHA stops under-performing trials early, judged on the reported "loss".
scheduler = ASHAScheduler(
    metric="loss",
    mode="min",
    max_t=20,
    grace_period=1,
    reduction_factor=2,
)

gpus_per_trial = 1

result = tune.run(
    train_ray,
    # Without this argument the search space above is never used and the
    # trainable is called with an empty config.
    config=config,
    resources_per_trial={"gpu": gpus_per_trial},
    num_samples=10,
    scheduler=scheduler,
    keep_checkpoints_num=8,
)
根据 PyTorch 教程,我应该使用
checkpoint_at_end=True
。然而,这样做同样会报错。我是在 Docker 容器里的 Jupyter Notebook 中运行这段代码的,该 Docker 容器挂载了一个卷。
老实说,我看不懂这个错误消息。非常感谢任何形式的帮助。
我认为关键问题在于错误消息中提到的:StorageContext 无法被 JSON 序列化?