这是我的脚本,用于预测时间序列数据集最终日期的目标值。我正在尝试引入一个
GaussianProcessRegressor
模型,并使用 GridSearchCV
搜索最佳超参数。(请注意,为避免杂乱,此处省略了部分代码,包括大多数用到的常量。)
HelperFunctions.py:
# Shared 17-fold CV splitter used by every search below.
# NOTE(review): StratifiedKFold stratifies on *class* labels; with the
# continuous regression target implied by scoring="neg_mean_squared_error",
# sklearn will reject the split — plain KFold is the appropriate splitter.
skf = StratifiedKFold(n_splits=17, shuffle=True, random_state=4)
def randomized_search(
    model,
    distribution,
    X_train,
    X_validation,
    y_train,
    y_validation,
) -> None:
    """Sample 100 parameter settings from `distribution` with RandomizedSearchCV
    and print the best estimator, parameters, cross-validation score and
    held-out validation score.

    Scores are multiplied by -1 for display because sklearn maximizes
    "neg_mean_squared_error".
    """
    try:
        # NOTE(review): this local shadows the enclosing function's own name.
        randomized_search = RandomizedSearchCV(
            model,
            distribution,
            cv=skf,
            return_train_score=True,
            n_jobs=-1,
            scoring="neg_mean_squared_error",
            n_iter=100,
        )
        try:
            search = randomized_search.fit(X_train, y_train)
            print(
                "Best estimator:\n{} \
                \nBest parameters:\n{} \
                \nBest cross-validation score: {:.3f} \
                \nBest test score: {:.3f}\n\n".format(
                    search.best_estimator_,
                    search.best_params_,
                    -1 * search.best_score_,
                    -1 * search.score(X_validation, y_validation),
                )
            )
        except Exception:
            print("'randomized_search.fit' NOT successful!")
            print(traceback.format_exc())
            raise
        else:
            print("'randomized_search.fit' Successful!")
    except Exception:
        print("'randomized_search' NOT successful!")
        print(traceback.format_exc())
        raise
    else:
        print("'randomized_search' successful!")
def doRandomizedSearch(
    model,
    distribution,
    feat_train,
    feat_validation,
    tgt_train,
    tgt_validation,
):
    """Thin wrapper around randomized_search that logs success or failure."""
    try:
        randomized_search(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
    except Exception as e:
        print("'doRandomizedSearch' NOT successful!")
        # NOTE(review): `raise e` re-raises but appends this frame; a bare
        # `raise` would preserve the original traceback unchanged.
        raise e
    else:
        print("'doRandomizedSearch' Successful!")
def model_randomized_search(
    model_dist_pairs, feat_train, feat_validation, tgt_train, tgt_validation
):
    """Run the randomized search once per (estimator, distribution) pair."""
    for model, distribution in model_dist_pairs:
        doRandomizedSearch(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
class CustomOptimizers:
    """Bundles scipy optimizers for minimizing a GP's negated log-marginal
    likelihood.

    NOTE(review): sklearn's GaussianProcessRegressor invokes a callable
    optimizer as ``optimizer(obj_func, initial_theta, bounds)`` and expects
    ``(theta_opt, func_min)`` back; these wrappers instead take
    ``(theta, eval_gradient)`` and return a full scipy OptimizeResult, so
    they do not match sklearn's contract — confirm the intended call site.
    """
    def __init__(self, model, initial_theta, bounds):
        # `model` must expose log_marginal_likelihood (a GP estimator).
        self.model = model
        self.initial_theta = initial_theta
        self.bounds = bounds
    def obj_func(self, theta, eval_gradient):
        """Negated log-marginal likelihood (and gradient) so scipy minimizes."""
        if eval_gradient:
            ll, grad = self.model.log_marginal_likelihood(theta, True)
            return -ll, -grad
        else:
            return -self.model.log_marginal_likelihood(theta)
    def minimize_wrapper(self, theta, eval_gradient):
        # NOTE(review): args=(eval_gradient) is NOT a tuple — the parentheses
        # are plain grouping; args=(eval_gradient,) was probably intended.
        # The same applies to every wrapper below.
        return minimize(self.obj_func, theta, args=(eval_gradient), bounds=self.bounds)
    def least_squares_wrapper(self, theta, eval_gradient):
        return least_squares(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )
    def differential_evolution_wrapper(self, theta, eval_gradient):
        return differential_evolution(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )
    def basinhopping_wrapper(self, theta, eval_gradient):
        # NOTE(review): scipy.optimize.basinhopping has no `bounds` keyword;
        # bounds belong in minimizer_kwargs of the inner minimizer.
        return basinhopping(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )
    def dual_annealing_wrapper(self, theta, eval_gradient):
        return dual_annealing(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )
class GPRWithCustomOptimizer(GaussianProcessRegressor):
    """GaussianProcessRegressor using a CustomOptimizers method chosen by name.

    BUG(review): this is the source of the TypeError in the traceback.
    ``super().__init__(optimizer=self.optimizer_func)`` stores the resolved
    *callable* on ``self.optimizer``; sklearn's ``clone()`` later rebuilds the
    estimator via ``klass(**get_params())``, so ``optimizer`` arrives here as
    that callable and ``getattr(self.custom_optimizers, optimizer)`` fails
    because attribute names must be strings. sklearn requires ``__init__`` to
    store its arguments untouched (no transformation, no derived attributes).
    """
    def __init__(
        self,
        optimizer="minimize",
        initial_theta=None,
        bounds=None,
        random_state=None,
        normalize_y=True,
        n_restarts_optimizer=0,
        copy_X_train=True,
        **kwargs,
    ):
        self.initial_theta = initial_theta
        self.bounds = bounds
        # NOTE(review): the default optimizer="minimize" does not exist on
        # CustomOptimizers (its methods are *_wrapper), so constructing with
        # defaults raises AttributeError.
        self.custom_optimizers = CustomOptimizers(self, self.initial_theta, self.bounds)
        self.optimizer_func = getattr(self.custom_optimizers, optimizer)
        super().__init__(
            optimizer=self.optimizer_func,
            random_state=random_state,
            normalize_y=normalize_y,
            n_restarts_optimizer=n_restarts_optimizer,
            copy_X_train=copy_X_train,
            **kwargs,
        )
    def fit(self, X, y):
        # NOTE(review): returns None; sklearn convention is `return self`.
        super().fit(X, y)
def intermediate_models(kernel):
    """Build (estimator, distribution) pairs for the randomized search: one
    DecisionTreeRegressor plus one GPRWithCustomOptimizer per optimizer name.
    """
    # uniform/randint are scipy.stats distributions (RandomizedSearchCV samples
    # from them); the criterion list is sampled uniformly.
    dtr_dic = dict(
        ccp_alpha=uniform(loc=0.0, scale=10.0),
        max_features=randint(low=1, high=100),
        max_depth=randint(low=1, high=100),
        criterion=["squared_error", "friedman_mse", "absolute_error", "poisson"],
    )
    optimizer_names = [
        "minimize_wrapper",
        "least_squares_wrapper",
        "differential_evolution_wrapper",
        "basinhopping_wrapper",
        "dual_annealing_wrapper",
    ]
    model_dist_pairs = []
    for optimizer_name in optimizer_names:
        gpr = GPRWithCustomOptimizer(kernel=kernel, optimizer=optimizer_name)
        # NOTE(review): `optimizer=optimizer_names` lets the search sample
        # *every* name even though this estimator was built for
        # `optimizer_name`; a one-element list [optimizer_name] was probably
        # intended.
        gpr_dic = dict(
            optimizer=optimizer_names,
            n_restarts_optimizer=np.arange(0, 20 + 1),
            normalize_y=[False, True],
            copy_X_train=[True, False],
            random_state=np.arange(0, 10 + 1),
        )
        model_dist_pairs.append((gpr, gpr_dic))
    return [(DecisionTreeRegressor(), dtr_dic)] + model_dist_pairs
def cast2Float64(X_train, X_test, y_train, y_test):
    """Cast the four splits to float64 and replace NaN/±inf with finite values.

    Returns [X_train, X_test, y_train, y_test] (matching the argument order).
    """
    X_train_new = np.nan_to_num(X_train.astype(np.float64))
    y_train_new = np.nan_to_num(y_train.astype(np.float64))
    X_test_new = np.nan_to_num(X_test.astype(np.float64))
    y_test_new = np.nan_to_num(y_test.astype(np.float64))
    return [X_train_new, X_test_new, y_train_new, y_test_new]
utilities.py:
from HelperFunctions import (
np,
intermediate_models,
model_randomized_search,
cast2Float64,
)
def initializeKernel(median_distance, data_range):
    """Amplitude * Matern + white-noise kernel, all bounds fixed to [1e-3, 1e3].

    NOTE(review): median_distance and data_range are currently unused — they
    look intended to seed initial length scale / amplitude; confirm intent.
    """
    return ConstantKernel(constant_value_bounds=np.array([[1e-3, 1e3]])) * Matern(
        length_scale_bounds=np.array([[1e-3, 1e3]])
    ) + WhiteKernel(noise_level_bounds=np.array([[1e-3, 1e3]]))
####################################################################
def all_combined_product_cols(df):
    """Return a DataFrame holding the elementwise product of every non-empty
    combination of df's columns, each output column named by joining the
    member column names with "_".

    NOTE(review): the result has 2**n_cols - 1 columns — this grows
    exponentially with the number of input columns.
    """
    cols = list(df.columns)
    product_cols = []
    for length in range(1, len(cols) + 1):
        for combination in combinations(cols, r=length):
            combined_col = None
            for col in combination:
                if combined_col is None:
                    combined_col = df[col].copy()
                else:
                    combined_col *= df[col]
            combined_col.name = "_".join(combination)
            product_cols.append(combined_col)
    return pd.concat(product_cols, axis=1)
def ensureDataFrameHasName(y, dataframe_name):
    """Coerce y into a DataFrame named `dataframe_name` with a DatetimeIndex
    at frequency `cfreq` (a module-level constant omitted from this listing,
    like `weekmask_string`).

    NOTE(review): pd.DataFrame() accepts no `name`, `freq` or `weekmask`
    keyword arguments (those belong to pd.bdate_range / CustomBusinessDay),
    so the non-DataFrame branch raises TypeError.
    """
    if not isinstance(y, pd.DataFrame):
        y = pd.DataFrame(y, name=dataframe_name, freq="C", weekmask=weekmask_string)
    else:
        y.name = dataframe_name
    return y.set_axis(pd.to_datetime(y.index)).asfreq(cfreq)
##################################################################################
def model_comparison(original_df):
    """Prepare features/target from original_df, split train/validation, and
    run the randomized search over every (model, distribution) pair."""
    a = original_df["Result"].to_numpy()
    # Drop a constant "Result" column — it carries no information.
    if (a[0] == a).all():
        original_df = original_df.drop(columns=["Result"])
    # NOTE(review): placeholder — unpacking this *string literal* into four
    # names raises ValueError at runtime; the real code presumably calls
    # train_features_target(original_df).
    bet, train, features, target = """train_features_target(original_df)"""
    features_cols = [col for col in list(original_df.columns) if "Score" not in col]
    train.dropna(inplace=True)
    data = train.values
    n_features = len(features_cols)
    # Assumes the target column sits immediately after the feature columns.
    score_features = data[:, :n_features]
    score_target = data[:, n_features]
    feat_tgt_tuple = train_test_split(
        score_features,
        score_target,
        test_size=0.33,
        random_state=4,
    )
    feat_train, feat_validation, tgt_train, tgt_validation = feat_tgt_tuple
    # Summary statistics intended to seed the kernel (currently unused there).
    data_range = np.ptp(feat_train, axis=0)
    distances = pdist(feat_train, metric="euclidean")
    median_distance = np.median(distances)
    values = list(feat_tgt_tuple)
    kernel = initializeKernel(median_distance, data_range)
    model_dist_pairs = intermediate_models(kernel)
    model_randomized_search(model_dist_pairs, *cast2Float64(*values))
score.py:
from utilities import model_comparison
###############################################################################
def main():
    """Entry point: run the model comparison on the prepared dataframe."""
    # data_cls_with_result is basically some dataframe after processing.
    # NOTE(review): data_cls_with_result is not defined in the visible code —
    # presumably produced by the omitted preprocessing; confirm it is in scope.
    model_comparison(data_cls_with_result)
if __name__ == "__main__":
    main()
但是,我收到一条似乎无法修复的错误消息:
'randomized_search.fit' NOT successful!
Traceback (most recent call last):
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 38, in randomized_search
search = randomized_search.fit(X_train, y_train)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 1152, in wrapper
return fit_method(estimator, *args, **kwargs)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\model_selection\_search.py", line 812, in fit
base_estimator = clone(self.estimator)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 75, in clone
return estimator.__sklearn_clone__()
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 268, in __sklearn_clone__
return _clone_parametrized(self)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 110, in _clone_parametrized
new_object = klass(**new_object_params)
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 158, in __init__
self.optimizer_func = getattr(self.custom_optimizers, optimizer)
TypeError: getattr(): attribute name must be string
'randomized_search' NOT successful!
Traceback (most recent call last):
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 38, in randomized_search
search = randomized_search.fit(X_train, y_train)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 1152, in wrapper
return fit_method(estimator, *args, **kwargs)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\model_selection\_search.py", line 812, in fit
base_estimator = clone(self.estimator)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 75, in clone
return estimator.__sklearn_clone__()
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 268, in __sklearn_clone__
return _clone_parametrized(self)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 110, in _clone_parametrized
new_object = klass(**new_object_params)
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 158, in __init__
self.optimizer_func = getattr(self.custom_optimizers, optimizer)
TypeError: getattr(): attribute name must be string
'doRandomizedSearch' NOT successful!
Traceback (most recent call last):
File "c:/Users/username/Projects/Python/ScoreTest/score.py", line 483, in <module>
main()
File "c:/Users/username/Projects/Python/ScoreTest/score.py", line 479, in main
model_comparison(data_cls_with_result)
File "c:\Users\username\Projects\Python\ScoreTest\utilities.py", line 127, in model_comparison
model_randomized_search(model_dist_pairs, *cast2Float64(*values))
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 96, in model_randomized_search
doRandomizedSearch(
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 87, in doRandomizedSearch
raise e
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 77, in doRandomizedSearch
randomized_search(
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 38, in randomized_search
search = randomized_search.fit(X_train, y_train)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 1152, in wrapper
return fit_method(estimator, *args, **kwargs)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\model_selection\_search.py", line 812, in fit
base_estimator = clone(self.estimator)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 75, in clone
return estimator.__sklearn_clone__()
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 268, in __sklearn_clone__
return _clone_parametrized(self)
File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 110, in _clone_parametrized
new_object = klass(**new_object_params)
File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 158, in __init__
self.optimizer_func = getattr(self.custom_optimizers, optimizer)
TypeError: getattr(): attribute name must be string
在将代码重构为一个最小的可复现示例之后,我定位到了问题,并在下面的新版本中解决了它:
#HelperFunctions.py
# Shared 17-fold CV splitter; KFold (not StratifiedKFold) because the target
# is continuous.
kf = KFold(n_splits=17, shuffle=True, random_state=4)
def grid_search(
    model,
    distribution,
    X_train,
    X_validation,
    y_train,
    y_validation,
) -> None:
    """Exhaustively search `distribution` for `model` with GridSearchCV and
    print the best estimator, parameters, cross-validation score and held-out
    validation score.

    Scores are multiplied by -1 for display because sklearn maximizes
    "neg_mean_squared_error". Raises whatever the search raises, after logging.
    """
    try:
        # Named `searcher` (not `grid_search`) so the function does not shadow
        # its own name inside its body.
        searcher = GridSearchCV(
            model,
            distribution,
            cv=kf,
            return_train_score=True,
            n_jobs=-2,
            scoring="neg_mean_squared_error",
        )
        # Fit the model directly without using joblib
        search = searcher.fit(X_train, y_train)
        print(
            "Best estimator:\n{} \
            \nBest parameters:\n{} \
            \nBest cross-validation score: {:.3f} \
            \nBest test score: {:.3f}\n\n".format(
                search.best_estimator_,
                search.best_params_,
                -1 * search.best_score_,
                -1 * search.score(X_validation, y_validation),
            )
        )
        # Release the (potentially large) local references before returning.
        del X_train, X_validation, y_train, y_validation, search
        gc.collect()
    except Exception:
        print("'grid_search.fit' NOT successful!")
        print(traceback.format_exc())
        raise
def doGridSearch(
    model,
    distribution,
    feat_train,
    feat_validation,
    tgt_train,
    tgt_validation,
):
    """Run grid_search for one (model, distribution) pair, logging failures."""
    try:
        grid_search(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
    except Exception:
        print("'doGridSearch' NOT successful!")
        # Bare `raise` re-raises the active exception with its original
        # traceback intact (`raise e` would append an extra frame).
        raise
def model_grid_search(
    model_dist_pairs, feat_train, feat_validation, tgt_train, tgt_validation
):
    """Grid-search every (estimator, parameter-grid) pair in sequence."""
    for pair in model_dist_pairs:
        estimator, param_grid = pair
        doGridSearch(
            estimator,
            param_grid,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
class CustomOptimizers:
    """scipy-based optimizers matching sklearn's callable-optimizer contract.

    GaussianProcessRegressor invokes a custom optimizer as
    ``optimizer(obj_func, initial_theta, bounds)``, where
    ``obj_func(theta, eval_gradient)`` returns the objective value (or a
    (value, gradient) pair when eval_gradient is True), and expects
    ``(theta_opt, func_min)`` back.  The previous wrappers took no arguments
    and returned a full OptimizeResult, so sklearn's call raised TypeError;
    every ``*_wrapper`` below now follows the required signature.
    """

    def __init__(self, model, initial_theta, bounds):
        # Kept for backward compatibility/introspection; the wrappers use the
        # obj_func/initial_theta/bounds that sklearn hands them at call time.
        self.model = model
        self.initial_theta = initial_theta
        self.bounds = bounds

    def obj_func(self, theta, eval_gradient=False):
        """Negated log-marginal likelihood (and gradient), for minimization."""
        if eval_gradient:
            ll, grad = self.model.log_marginal_likelihood(theta, True)
            return -ll, -grad
        return -self.model.log_marginal_likelihood(theta)

    @staticmethod
    def _value_only(obj_func):
        # Adapt obj_func to a value-only callable for gradient-free methods.
        return lambda theta: obj_func(theta, False)

    def minimize_wrapper(self, obj_func, initial_theta, bounds):
        # Gradient-based local search; jac=True consumes the (value, grad) pair
        # returned by obj_func(theta, True).
        res = minimize(obj_func, initial_theta, args=(True,), jac=True, bounds=bounds)
        return res.x, res.fun

    def least_squares_wrapper(self, obj_func, initial_theta, bounds):
        # NOTE(review): least_squares minimizes the *squared* residual, which
        # only matches minimizing obj_func when the objective is non-negative.
        # least_squares wants bounds as two arrays (lower, upper), not pairs.
        lower, upper = np.asarray(bounds, dtype=float).T
        res = least_squares(self._value_only(obj_func), initial_theta, bounds=(lower, upper))
        return res.x, float(np.atleast_1d(res.fun)[0])

    def differential_evolution_wrapper(self, obj_func, initial_theta, bounds):
        # Global, gradient-free; initial_theta is unused by this method.
        res = differential_evolution(self._value_only(obj_func), bounds)
        return res.x, res.fun

    def basinhopping_wrapper(self, obj_func, initial_theta, bounds):
        # basinhopping has no `bounds` kwarg; constraints go to the inner
        # local minimizer via minimizer_kwargs.
        res = basinhopping(
            self._value_only(obj_func),
            initial_theta,
            minimizer_kwargs={"bounds": bounds},
        )
        return res.x, res.fun

    def dual_annealing_wrapper(self, obj_func, initial_theta, bounds):
        res = dual_annealing(self._value_only(obj_func), bounds=bounds, x0=initial_theta)
        return res.x, res.fun
class GPRWithCustomOptimizer(GaussianProcessRegressor):
    """GaussianProcessRegressor accepting a *string* optimizer name.

    sklearn's clone() re-creates an estimator via klass(**get_params()), so
    every constructor argument must be stored on self unmodified.  Resolving
    the name with getattr() in __init__ stored a bound method on
    self.optimizer, and the subsequent clone() inside
    RandomizedSearchCV/GridSearchCV passed that callable back into __init__,
    where getattr() raised ``TypeError: attribute name must be string``.
    The fix: keep the string as-is and resolve it lazily in fit().
    """

    def __init__(
        self,
        optimizer=None,
        initial_theta=None,
        bounds=None,
        random_state=None,
        normalize_y=True,
        n_restarts_optimizer=0,
        copy_X_train=True,
        **kwargs,
    ):
        self.initial_theta = initial_theta
        self.bounds = bounds
        # Pass the name straight through untouched; see class docstring.
        super().__init__(
            optimizer=optimizer,
            random_state=random_state,
            normalize_y=normalize_y,
            n_restarts_optimizer=n_restarts_optimizer,
            copy_X_train=copy_X_train,
            **kwargs,
        )

    def fit(self, X, y):
        """Fit the GP, temporarily swapping the optimizer name for its callable.

        Restoring the string afterwards keeps get_params()/clone() clean for
        the next candidate the search evaluates.
        """
        name = self.optimizer
        if isinstance(name, str) and name.endswith("_wrapper"):
            helpers = CustomOptimizers(self, self.initial_theta, self.bounds)
            self.optimizer = getattr(helpers, name)
            try:
                super().fit(X, y)
            finally:
                self.optimizer = name
        else:
            super().fit(X, y)
        # sklearn convention: fit returns the estimator.
        return self
def intermediate_models(kernel):
    """Assemble (estimator, param_grid) pairs: one DecisionTreeRegressor grid
    plus one GPRWithCustomOptimizer grid per custom optimizer name."""
    tree_grid = {
        "ccp_alpha": np.linspace(0.0, 10.0, num=100),
        "max_features": np.arange(1, 101),
        "max_depth": np.arange(1, 101),
        "criterion": ["squared_error", "friedman_mse", "absolute_error", "poisson"],
    }
    wrapper_names = [
        "minimize_wrapper",
        "least_squares_wrapper",
        "differential_evolution_wrapper",
        "basinhopping_wrapper",
        "dual_annealing_wrapper",
    ]
    # Decision tree first, then one GP per optimizer; each GP grid pins the
    # optimizer to the single name the estimator was built with.
    pairs = [(DecisionTreeRegressor(), tree_grid)]
    for name in wrapper_names:
        gp_grid = {
            "optimizer": [name],
            "n_restarts_optimizer": np.arange(0, 20 + 1),
            "normalize_y": [False, True],
            "copy_X_train": [True, False],
            "random_state": np.arange(0, 10 + 1),
        }
        pairs.append((GPRWithCustomOptimizer(kernel=kernel, optimizer=name), gp_grid))
    return pairs
def cast2Float64(X_train, X_test, y_train, y_test):
    """Cast the four splits to float64 and replace NaN/±inf with finite values.

    Returns [X_train, X_test, y_train, y_test] (matching the argument order).
    """
    def _clean(arr):
        # nan_to_num maps NaN -> 0.0 and ±inf -> the largest finite floats.
        return np.nan_to_num(arr.astype(np.float64))

    return [_clean(X_train), _clean(X_test), _clean(y_train), _clean(y_test)]
def initializeKernel(median_distance, data_range):
    """Build an amplitude * Matern + white-noise kernel with every
    hyperparameter bounded to [1e-3, 1e3].

    NOTE(review): median_distance and data_range are currently unused — they
    look intended to seed initial length scale / amplitude; confirm intent.
    """
    amplitude = ConstantKernel(constant_value_bounds=np.array([[1e-3, 1e3]]))
    smooth_term = Matern(length_scale_bounds=np.array([[1e-3, 1e3]]))
    noise_term = WhiteKernel(noise_level_bounds=np.array([[1e-3, 1e3]]))
    return amplitude * smooth_term + noise_term
def all_combined_product_cols(df):
    """Return a DataFrame holding the elementwise product of every non-empty
    combination of df's columns, each output column named by joining the
    member column names with "_"."""
    columns = list(df.columns)
    products = []
    for size in range(1, len(columns) + 1):
        for subset in combinations(columns, r=size):
            # Seed with the first member, then fold in the rest.
            prod = df[subset[0]].copy()
            for name in subset[1:]:
                prod = prod * df[name]
            prod.name = "_".join(subset)
            products.append(prod)
    return pd.concat(products, axis=1)
def ensureDataFrameHasName(y, dataframe_name):
    """Coerce y into a DataFrame named `dataframe_name` with a DatetimeIndex
    at the module-level frequency `cfreq`.

    Fix: pd.DataFrame() accepts no `name`, `freq` or `weekmask` keyword
    arguments (those belong to pd.bdate_range / CustomBusinessDay), so the old
    non-DataFrame branch raised TypeError.  Build the frame first, then attach
    the name; the trailing asfreq(cfreq) establishes the frequency in both
    branches.
    """
    if not isinstance(y, pd.DataFrame):
        y = pd.DataFrame(y)
    # `name` is not a reserved DataFrame attribute; setting it just attaches
    # metadata, which is what callers rely on.
    y.name = dataframe_name
    return y.set_axis(pd.to_datetime(y.index)).asfreq(cfreq)
##################################################################################
def model_comparison(original_df):
a = original_df["Result"].to_numpy()
if (a[0] == a).all():
original_df = original_df.drop(columns=["Result"])
bet, train, features, target = """train_features_target(original_df)"""
features_cols = [col for col in list(original_df.columns) if "Score" not in col]
train = train.dropna()
data = train.values
n_features = len(features_cols)
score_features = data[:, :n_features]
score_target = data[:, n_features]
feat_tgt_tuple = train_test_split(
score_features,
score_target,
test_size=0.33,
random_state=4,
)
feat_train, feat_validation, tgt_train, tgt_validation = feat_tgt_tuple
data_range = np.ptp(feat_train, axis=0)
distances = pdist(feat_train, metric="euclidean")
median_distance = np.median(distances)
values = list(feat_tgt_tuple)
kernel = initializeKernel(median_distance, data_range)
model_dist_pairs = intermediate_models(kernel)
model_grid_search(model_dist_pairs, *cast2Float64(*values))