How to fix "TypeError: getattr(): attribute name must be string" when GridSearching over multiple optimizers for a GaussianProcessRegressor?


This is my script for predicting the target on the final date of a time-series dataset. I am trying to incorporate a GaussianProcessRegressor model and find its best hyperparameters with GridSearchCV. (Note that some code, including most of the constants used, is not shown here to avoid clutter.)

HelperFunctions.py:

skf = StratifiedKFold(n_splits=17, shuffle=True, random_state=4)


def randomized_search(
    model,
    distribution,
    X_train,
    X_validation,
    y_train,
    y_validation,
) -> None:
    try:
        randomized_search = RandomizedSearchCV(
            model,
            distribution,
            cv=skf,
            return_train_score=True,
            n_jobs=-1,
            scoring="neg_mean_squared_error",
            n_iter=100,
        )

        try:
            search = randomized_search.fit(X_train, y_train)

            print(
                "Best estimator:\n{} \
                \nBest parameters:\n{} \
                \nBest cross-validation score: {:.3f} \
                \nBest test score: {:.3f}\n\n".format(
                    search.best_estimator_,
                    search.best_params_,
                    -1 * search.best_score_,
                    -1 * search.score(X_validation, y_validation),
                )
            )

        except Exception:
            print("'randomized_search.fit' NOT successful!")
            print(traceback.format_exc())
            raise
        else:
            print("'randomized_search.fit' Successful!")

    except Exception:
        print("'randomized_search' NOT successful!")
        print(traceback.format_exc())
        raise

    else:
        print("'randomized_search' successful!")


def doRandomizedSearch(
    model,
    distribution,
    feat_train,
    feat_validation,
    tgt_train,
    tgt_validation,
):
    try:
        randomized_search(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
    except Exception as e:
        print("'doRandomizedSearch' NOT successful!")
        raise e
    else:
        print("'doRandomizedSearch' Successful!")


def model_randomized_search(
    model_dist_pairs, feat_train, feat_validation, tgt_train, tgt_validation
):
    for model, distribution in model_dist_pairs:
        doRandomizedSearch(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )


class CustomOptimizers:
    def __init__(self, model, initial_theta, bounds):
        self.model = model
        self.initial_theta = initial_theta
        self.bounds = bounds

    def obj_func(self, theta, eval_gradient):
        if eval_gradient:
            ll, grad = self.model.log_marginal_likelihood(theta, True)
            return -ll, -grad
        else:
            return -self.model.log_marginal_likelihood(theta)

    def minimize_wrapper(self, theta, eval_gradient):
        return minimize(self.obj_func, theta, args=(eval_gradient), bounds=self.bounds)

    def least_squares_wrapper(self, theta, eval_gradient):
        return least_squares(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )

    def differential_evolution_wrapper(self, theta, eval_gradient):
        return differential_evolution(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )

    def basinhopping_wrapper(self, theta, eval_gradient):
        return basinhopping(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )

    def dual_annealing_wrapper(self, theta, eval_gradient):
        return dual_annealing(
            self.obj_func, theta, args=(eval_gradient), bounds=self.bounds
        )


class GPRWithCustomOptimizer(GaussianProcessRegressor):
    def __init__(
        self,
        optimizer="minimize",
        initial_theta=None,
        bounds=None,
        random_state=None,
        normalize_y=True,
        n_restarts_optimizer=0,
        copy_X_train=True,
        **kwargs,
    ):
        self.initial_theta = initial_theta
        self.bounds = bounds
        self.custom_optimizers = CustomOptimizers(self, self.initial_theta, self.bounds)
        self.optimizer_func = getattr(self.custom_optimizers, optimizer)
        super().__init__(
            optimizer=self.optimizer_func,
            random_state=random_state,
            normalize_y=normalize_y,
            n_restarts_optimizer=n_restarts_optimizer,
            copy_X_train=copy_X_train,
            **kwargs,
        )

    def fit(self, X, y):
        super().fit(X, y)


def intermediate_models(kernel):
    dtr_dic = dict(
        ccp_alpha=uniform(loc=0.0, scale=10.0),
        max_features=randint(low=1, high=100),
        max_depth=randint(low=1, high=100),
        criterion=["squared_error", "friedman_mse", "absolute_error", "poisson"],
    )
    optimizer_names = [
        "minimize_wrapper",
        "least_squares_wrapper",
        "differential_evolution_wrapper",
        "basinhopping_wrapper",
        "dual_annealing_wrapper",
    ]
    model_dist_pairs = []
    for optimizer_name in optimizer_names:
        gpr = GPRWithCustomOptimizer(kernel=kernel, optimizer=optimizer_name)

        gpr_dic = dict(
            optimizer=optimizer_names,
            n_restarts_optimizer=np.arange(0, 20 + 1),
            normalize_y=[False, True],
            copy_X_train=[True, False],
            random_state=np.arange(0, 10 + 1),
        )
        model_dist_pairs.append((gpr, gpr_dic))
    return [(DecisionTreeRegressor(), dtr_dic)] + model_dist_pairs


def cast2Float64(X_train, X_test, y_train, y_test):
    X_train_new = np.nan_to_num(X_train.astype(np.float64))
    y_train_new = np.nan_to_num(y_train.astype(np.float64))
    X_test_new = np.nan_to_num(X_test.astype(np.float64))
    y_test_new = np.nan_to_num(y_test.astype(np.float64))
    return [X_train_new, X_test_new, y_train_new, y_test_new]
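
As an aside, np.nan_to_num with default arguments maps NaN to 0.0 and ±inf to the largest finite float64, so cast2Float64 also silently zeroes missing values. A quick check:

import numpy as np

np.nan_to_num(np.array([np.nan, np.inf, 1.0], dtype=np.float64))
# -> array([0.00000000e+000, 1.79769313e+308, 1.00000000e+000])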

utilities.py:

from HelperFunctions import (
    np,
    intermediate_models,
    model_randomized_search,
    cast2Float64,
)

def initializeKernel(median_distance, data_range):
    # NOTE: median_distance and data_range are not used yet; all bounds are fixed.
    return ConstantKernel(constant_value_bounds=np.array([[1e-3, 1e3]])) * Matern(
        length_scale_bounds=np.array([[1e-3, 1e3]])
    ) + WhiteKernel(noise_level_bounds=np.array([[1e-3, 1e3]]))


####################################################################
def all_combined_product_cols(df):
    cols = list(df.columns)
    product_cols = []
    for length in range(1, len(cols) + 1):
        for combination in combinations(cols, r=length):
            combined_col = None
            for col in combination:
                if combined_col is None:
                    combined_col = df[col].copy()
                else:
                    combined_col *= df[col]
            combined_col.name = "_".join(combination)
            product_cols.append(combined_col)
    return pd.concat(product_cols, axis=1)
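
On a toy frame this produces one product column per non-empty subset of the columns (2**n - 1 in total), for example:

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
print(all_combined_product_cols(df))
#    a  b  a_b
# 0  1  3    3
# 1  2  4    8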


def ensureDataFrameHasName(y, dataframe_name):
    if not isinstance(y, pd.DataFrame):
        y = pd.DataFrame(y, name=dataframe_name, freq="C", weekmask=weekmask_string)
    else:
        y.name = dataframe_name
    return y.set_axis(pd.to_datetime(y.index)).asfreq(cfreq)

##################################################################################
def model_comparison(original_df):
    a = original_df["Result"].to_numpy()
    if (a[0] == a).all():
        original_df = original_df.drop(columns=["Result"])
    bet, train, features, target = """train_features_target(original_df)"""
    features_cols = [col for col in list(original_df.columns) if "Score" not in col]
    train.dropna(inplace=True)
    data = train.values
    n_features = len(features_cols)
    score_features = data[:, :n_features]
    score_target = data[:, n_features]
    feat_tgt_tuple = train_test_split(
        score_features,
        score_target,
        test_size=0.33,
        random_state=4,
    )
    feat_train, feat_validation, tgt_train, tgt_validation = feat_tgt_tuple
    data_range = np.ptp(feat_train, axis=0)
    distances = pdist(feat_train, metric="euclidean")
    median_distance = np.median(distances)
    values = list(feat_tgt_tuple)
    kernel = initializeKernel(median_distance, data_range)
    model_dist_pairs = intermediate_models(kernel)
    model_randomized_search(model_dist_pairs, *cast2Float64(*values))

score.py:

from utilities import model_comparison
###############################################################################
def main():
    # data_cls_with_result is basically some dataframe after processing.
    model_comparison(data_cls_with_result)


if __name__ == "__main__":
    main()

However, I get an error message that I cannot seem to fix:

'randomized_search.fit' NOT successful!
Traceback (most recent call last):
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 38, in randomized_search
    search = randomized_search.fit(X_train, y_train)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 1152, in wrapper
    return fit_method(estimator, *args, **kwargs)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\model_selection\_search.py", line 812, in fit
    base_estimator = clone(self.estimator)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 75, in clone
    return estimator.__sklearn_clone__()
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 268, in __sklearn_clone__
    return _clone_parametrized(self)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 110, in _clone_parametrized
    new_object = klass(**new_object_params)
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 158, in __init__   
    self.optimizer_func = getattr(self.custom_optimizers, optimizer)
TypeError: getattr(): attribute name must be string

'randomized_search' NOT successful!
Traceback (most recent call last):
  ... (identical to the traceback above) ...
TypeError: getattr(): attribute name must be string

'doRandomizedSearch' NOT successful!
Traceback (most recent call last):
  File "c:/Users/username/Projects/Python/ScoreTest/score.py", line 483, in <module>
    main()
  File "c:/Users/username/Projects/Python/ScoreTest/score.py", line 479, in main
    model_comparison(data_cls_with_result)
  File "c:\Users\username\Projects\Python\ScoreTest\utilities.py", line 127, in model_comparison 
    model_randomized_search(model_dist_pairs, *cast2Float64(*values))
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 96, in model_randomized_search
    doRandomizedSearch(
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 87, in doRandomizedSearch
    raise e
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 77, in doRandomizedSearch
    randomized_search(
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 38, in randomized_search
    search = randomized_search.fit(X_train, y_train)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 1152, in wrapper
    return fit_method(estimator, *args, **kwargs)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\model_selection\_search.py", line 812, in fit
    base_estimator = clone(self.estimator)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 75, in clone
    return estimator.__sklearn_clone__()
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 268, in __sklearn_clone__
    return _clone_parametrized(self)
  File "C:\Users\username\anaconda3\envs\ts-env\lib\site-packages\sklearn\base.py", line 110, in _clone_parametrized
    new_object = klass(**new_object_params)
  File "c:\Users\username\Projects\Python\ScoreTest\HelperFunctions.py", line 158, in __init__   
    self.optimizer_func = getattr(self.custom_optimizers, optimizer)
TypeError: getattr(): attribute name must be string
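
What the traceback is saying: RandomizedSearchCV clones the estimator before fitting, and clone() works by calling get_params() and then re-running klass(**params). Since super().__init__(optimizer=self.optimizer_func) stores the resolved callable in self.optimizer, the re-run __init__ receives that bound method instead of the original string, and getattr(self.custom_optimizers, optimizer) fails. A minimal sketch of the mechanism (a toy class, not the code above; scipy.optimize.minimize is only a stand-in attribute to resolve by name):

import scipy.optimize as so
from sklearn.base import clone
from sklearn.gaussian_process import GaussianProcessRegressor

class ToyGPR(GaussianProcessRegressor):
    def __init__(self, optimizer="minimize"):
        # Resolve the name to a callable and hand the callable to the
        # parent, which stores it as self.optimizer.
        super().__init__(optimizer=getattr(so, optimizer))

gpr = ToyGPR()  # fine: optimizer is still the string "minimize" here
clone(gpr)      # TypeError: getattr(): attribute name must be string --
                # get_params() returned the callable, and __init__ was
                # re-run with optimizer=<function minimize>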
1 Answer

After refactoring my code into a minimal working example, I was able to pin down the problem and resolve it with the new version below. The main changes: GridSearchCV with a plain KFold replaces RandomizedSearchCV with StratifiedKFold (stratification is meant for classification targets, not a continuous regression target), each GPR's grid pins optimizer=[optimizer_name] to a single name, and the optimizer wrappers now take no extra arguments and read initial_theta and bounds from the instance:

HelperFunctions.py:

kf = KFold(n_splits=17, shuffle=True, random_state=4)


def grid_search(
    model,
    distribution,
    X_train,
    X_validation,
    y_train,
    y_validation,
) -> None:
    try:
        grid_search = GridSearchCV(
            model,
            distribution,
            cv=kf,
            return_train_score=True,
            n_jobs=-2,
            scoring="neg_mean_squared_error",
        )

        # Fit the model directly without using joblib
        search = grid_search.fit(X_train, y_train)

        print(
            "Best estimator:\n{} \
            \nBest parameters:\n{} \
            \nBest cross-validation score: {:.3f} \
            \nBest test score: {:.3f}\n\n".format(
                search.best_estimator_,
                search.best_params_,
                -1 * search.best_score_,
                -1 * search.score(X_validation, y_validation),
            )
        )

        # Delete large variables and collect garbage
        del X_train, X_validation, y_train, y_validation, search
        gc.collect()

    except Exception:
        print("'grid_search.fit' NOT successful!")
        print(traceback.format_exc())
        raise


def doGridSearch(
    model,
    distribution,
    feat_train,
    feat_validation,
    tgt_train,
    tgt_validation,
):
    try:
        grid_search(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )
    except Exception as e:
        print("'doGridSearch' NOT successful!")
        raise e


def model_grid_search(
    model_dist_pairs, feat_train, feat_validation, tgt_train, tgt_validation
):
    for model, distribution in model_dist_pairs:
        doGridSearch(
            model,
            distribution,
            feat_train,
            feat_validation,
            tgt_train,
            tgt_validation,
        )


class CustomOptimizers:
    def __init__(self, model, initial_theta, bounds):
        self.model = model
        self.initial_theta = initial_theta
        self.bounds = bounds

    def obj_func(self, theta, eval_gradient):
        if eval_gradient:
            ll, grad = self.model.log_marginal_likelihood(theta, True)
            return -ll, -grad
        else:
            return -self.model.log_marginal_likelihood(theta)

    def minimize_wrapper(self):
        return minimize(self.obj_func, self.initial_theta, bounds=self.bounds)

    def least_squares_wrapper(self):
        return least_squares(self.obj_func, self.initial_theta, bounds=self.bounds)

    def differential_evolution_wrapper(self):
        return differential_evolution(self.obj_func, self.bounds)

    def basinhopping_wrapper(self):
        return basinhopping(self.obj_func, self.initial_theta, bounds=self.bounds)

    def dual_annealing_wrapper(self):
        return dual_annealing(self.obj_func, self.bounds)
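
For comparison, the form scikit-learn documents for a callable optimizer passed to GaussianProcessRegressor is optimizer(obj_func, initial_theta, bounds), returning (theta_opt, func_min); fit() calls the optimizer with those three arguments, so zero-argument wrappers like the ones above would not be callable by it. A minimal sketch using scipy's L-BFGS-B:

from scipy.optimize import minimize

def lbfgsb_optimizer(obj_func, initial_theta, bounds):
    # sklearn's obj_func(theta, eval_gradient=True) returns the negative
    # log-marginal-likelihood and its negative gradient, so jac=True lets
    # minimize consume both in one call.
    result = minimize(obj_func, initial_theta, method="L-BFGS-B",
                      jac=True, bounds=bounds)
    return result.x, result.fun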


class GPRWithCustomOptimizer(GaussianProcessRegressor):
    def __init__(
        self,
        optimizer=None,
        initial_theta=None,
        bounds=None,
        random_state=None,
        normalize_y=True,
        n_restarts_optimizer=0,
        copy_X_train=True,
        **kwargs,
    ):
        self.initial_theta = initial_theta
        self.bounds = bounds
        self.custom_optimizers = CustomOptimizers(None, self.initial_theta, self.bounds)
        self.optimizer_func = getattr(self.custom_optimizers, optimizer)

        super().__init__(
            optimizer=self.optimizer_func,
            random_state=random_state,
            normalize_y=normalize_y,
            n_restarts_optimizer=n_restarts_optimizer,
            copy_X_train=copy_X_train,
            **kwargs,
        )

    def fit(self, X, y):
        super().fit(X, y)
        # Delete large variables and collect garbage
        del X, y
        gc.collect()


def intermediate_models(kernel):
    dtr_dic = dict(
        ccp_alpha=np.linspace(0.0, 10.0, num=100),
        max_features=np.arange(1, 101),
        max_depth=np.arange(1, 101),
        criterion=["squared_error", "friedman_mse", "absolute_error", "poisson"],
    )
    optimizer_names = [
        "minimize_wrapper",
        "least_squares_wrapper",
        "differential_evolution_wrapper",
        "basinhopping_wrapper",
        "dual_annealing_wrapper",
    ]
    model_dist_pairs = []

    for optimizer_name in optimizer_names:
        gpr = GPRWithCustomOptimizer(kernel=kernel, optimizer=optimizer_name)

        gpr_dic = dict(
            optimizer=[optimizer_name],
            n_restarts_optimizer=np.arange(0, 20 + 1),
            normalize_y=[False, True],
            copy_X_train=[True, False],
            random_state=np.arange(0, 10 + 1),
        )
        model_dist_pairs.append((gpr, gpr_dic))
    return [(DecisionTreeRegressor(), dtr_dic)] + model_dist_pairs
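
For scale: each GPR grid above enumerates 1 × 21 × 2 × 2 × 11 = 924 candidates (optimizer × n_restarts_optimizer × normalize_y × copy_X_train × random_state), and with the 17-fold CV that is 924 × 17 = 15,708 fits per optimizer, so the n_restarts_optimizer and random_state ranges dominate the runtime.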


def cast2Float64(X_train, X_test, y_train, y_test):
    X_train_new = np.nan_to_num(X_train.astype(np.float64))
    y_train_new = np.nan_to_num(y_train.astype(np.float64))
    X_test_new = np.nan_to_num(X_test.astype(np.float64))
    y_test_new = np.nan_to_num(y_test.astype(np.float64))
    return [X_train_new, X_test_new, y_train_new, y_test_new]

utilities.py:

def initializeKernel(median_distance, data_range):
    return ConstantKernel(constant_value_bounds=np.array([[1e-3, 1e3]])) * Matern(
        length_scale_bounds=np.array([[1e-3, 1e3]])
    ) + WhiteKernel(noise_level_bounds=np.array([[1e-3, 1e3]]))


def all_combined_product_cols(df):
    cols = list(df.columns)
    product_cols = []
    for length in range(1, len(cols) + 1):
        for combination in combinations(cols, r=length):
            combined_col = None
            for col in combination:
                if combined_col is None:
                    combined_col = df[col].copy()
                else:
                    combined_col *= df[col]
            combined_col.name = "_".join(combination)
            product_cols.append(combined_col)
    return pd.concat(product_cols, axis=1)


def ensureDataFrameHasName(y, dataframe_name):
    if not isinstance(y, pd.DataFrame):
        y = pd.DataFrame(y, name=dataframe_name, freq="C", weekmask=weekmask_string)
    else:
        y.name = dataframe_name
    return y.set_axis(pd.to_datetime(y.index)).asfreq(cfreq)
##################################################################################
def model_comparison(original_df):
    a = original_df["Result"].to_numpy()
    if (a[0] == a).all():
        original_df = original_df.drop(columns=["Result"])
    bet, train, features, target = """train_features_target(original_df)"""
    features_cols = [col for col in list(original_df.columns) if "Score" not in col]
    train = train.dropna()
    data = train.values
    n_features = len(features_cols)
    score_features = data[:, :n_features]
    score_target = data[:, n_features]
    feat_tgt_tuple = train_test_split(
        score_features,
        score_target,
        test_size=0.33,
        random_state=4,
    )
    feat_train, feat_validation, tgt_train, tgt_validation = feat_tgt_tuple
    data_range = np.ptp(feat_train, axis=0)
    distances = pdist(feat_train, metric="euclidean")
    median_distance = np.median(distances)
    values = list(feat_tgt_tuple)
    kernel = initializeKernel(median_distance, data_range)
    model_dist_pairs = intermediate_models(kernel)
    model_grid_search(model_dist_pairs, *cast2Float64(*values))