如何在PySpark中使用自定义函数在同一ML管道中传递bucketizer?

问题描述 投票:1回答:1

这是示例代码。目标是删除列标题中的特殊字符,并使用包含“bag”的标题名称对任何列进行bucketize。

data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)
df.show()



+-----------+-----------+-----------+----------+----------+
|ball_column|keep_column|hall_column|bag_this_1|bag_this_2|
+-----------+-----------+-----------+----------+----------+
|          0|          7|         14|        21|        21|
|          1|          8|         15|        31|        31|
|          2|          9|         16|        41|        41|
|          3|         10|         17|        51|        51|
+-----------+-----------+-----------+----------+----------+

第一个类编辑列名:它从列标题中删除任何特殊字符,并仅返回标题中的字母和数字。

class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing
        self.new = new

    def _transform(self, df: DataFrame) -> DataFrame:

        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)

        return df.select(*self.new)


## Capture 'bigInt' columns, and drop the rest
bigint_list = [
    name for name, types in df.dtypes if types == 'bigint' or types == 'double'
]
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]

reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)

model = Pipeline(stages=[reformattedColumns]).fit(df).transform(df)

下一部分,列出列的列表。它选择包含单词bag的标题并将值包装起来。

spike_cols = [col for col in model.columns if "bag" in col]

bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
]

model_1 = Pipeline(stages=bagging).fit(model).transform(model)
model_1.show()

如何在单个管道中添加两个函数(reformattedColumnsbagging),而不是创建2个单独的管道来完成任务? ?

python-3.x apache-spark pyspark
1个回答
3
投票

你需要改变几件小事。

由于您不适合第一个管道并对其进行转换,因此您无法使用以下内容:

spike_cols = [col for col in model.columns if "bag" in col]
                             ------------- <- This

而是使用edited_columns来引用这些列:

spike_cols = [col for col in edited_columns if "bag" in col]

其次,您只需将阶段合并到一个列表中:

stages_ = [reformattedColumns] + bagging

Pipeline(stages=stages_).fit(df).transform(df).show()
// +--------+--------+----------+----------+----------+----------------+----------------+
// |bagthis1|bagthis2|ballcolumn|hallcolumn|keepcolumn|bagthis1bucketed|bagthis2bucketed|
// +--------+--------+----------+----------+----------+----------------+----------------+
// |      21|      21|         0|        14|         7|             1.0|             1.0|
// |      31|      31|         1|        15|         8|             1.0|             1.0|
// |      41|      41|         2|        16|         9|             1.0|             1.0|
// |      51|      51|         3|        17|        10|             1.0|             1.0|
// +--------+--------+----------+----------+----------+----------------+----------------+

整个代码:

import pandas as pd
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession, DataFrame

data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
})

df = spark.createDataFrame(data)

df.show()

class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing
        self.new = new

    def _transform(self, df: DataFrame) -> DataFrame:
        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)

        return df.select(*self.new)

## Capture 'bigInt' columns, and drop the rest
bigint_list = [name for name, types in df.dtypes if types == 'bigint' or types == 'double']
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
spike_cols = [col for col in edited_columns if "bag" in col]

reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)

bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
    ]

stages_ = [reformattedColumns] + bagging

Pipeline(stages=stages_).fit(df).transform(df).show()
© www.soinside.com 2019 - 2024. All rights reserved.