Here is some sample code. The goal is to remove the special characters from the column headers, and to bucketize every column whose header contains "bag".
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)
df.show()
+-----------+-----------+-----------+----------+----------+
|ball_column|keep_column|hall_column|bag_this_1|bag_this_2|
+-----------+-----------+-----------+----------+----------+
|          0|          7|         14|        21|        21|
|          1|          8|         15|        31|        31|
|          2|          9|         16|        41|        41|
|          3|         10|         17|        51|        51|
+-----------+-----------+-----------+----------+----------+
The first class edits the column names: it strips any special characters from a column header and keeps only the letters and digits.
class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing  # original column names
        self.new = new            # cleaned column names

    def _transform(self, df: DataFrame) -> DataFrame:
        # Rename each column to its cleaned name, then keep only those columns.
        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)
        return df.select(*self.new)
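As a quick sanity check, the transformer can also be applied on its own, outside any pipeline; transform comes straight from the Transformer base class (a minimal sketch using the sample df above):

renamer = EditColumnNameWithReplacement(
    existing=['bag_this_1', 'bag_this_2'],
    new=['bagthis1', 'bagthis2'])
renamer.transform(df).columns
# -> ['bagthis1', 'bagthis2']  (only the columns listed in `new` survive the select)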
## Keep only the 'bigint' (and 'double') columns, and drop the rest
bigint_list = [
    name for name, types in df.dtypes if types == 'bigint' or types == 'double'
]
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)
model = Pipeline(stages=[reformattedColumns]).fit(df).transform(df)
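At this point model carries only the cleaned column names. A quick check (with the sample data above):

print(model.columns)
# e.g. ['ballcolumn', 'keepcolumn', 'hallcolumn', 'bagthis1', 'bagthis2']
# (column order can vary with the pandas version used to build the frame)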
The next part builds the list of columns to bucketize: it picks out the headers that contain the word bag and buckets their values.
spike_cols = [col for col in model.columns if "bag" in col]
bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
]
model_1 = Pipeline(stages=bagging).fit(model).transform(model)
model_1.show()
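For reference, with splits=[-inf, 10, 100, inf] the Bucketizer maps each value to the index of the interval it falls into: 0.0 for (-inf, 10), 1.0 for [10, 100), and 2.0 for [100, inf). Since every bag value in the sample data lies between 21 and 51, all of the bucketed columns come out as 1.0.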
How can I add the two steps (reformattedColumns and bagging) to a single pipeline, instead of creating two separate pipelines to get the job done?
You need to change a couple of small things.
Since you are no longer fitting and transforming the first pipeline on its own, you cannot use the following:
spike_cols = [col for col in model.columns if "bag" in col]
                             ------------- <- This
Instead, use edited_columns to refer to those columns:
spike_cols = [col for col in edited_columns if "bag" in col]
Second, you just need to combine the stages into a single list:
stages_ = [reformattedColumns] + bagging
Pipeline(stages=stages_).fit(df).transform(df).show()
# +--------+--------+----------+----------+----------+----------------+----------------+
# |bagthis1|bagthis2|ballcolumn|hallcolumn|keepcolumn|bagthis1bucketed|bagthis2bucketed|
# +--------+--------+----------+----------+----------+----------------+----------------+
# |      21|      21|         0|        14|         7|             1.0|             1.0|
# |      31|      31|         1|        15|         8|             1.0|             1.0|
# |      41|      41|         2|        16|         9|             1.0|             1.0|
# |      51|      51|         3|        17|        10|             1.0|             1.0|
# +--------+--------+----------+----------+----------+----------------+----------------+
The whole code:
import pandas as pd
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession, DataFrame

# The original snippet used `spark` without creating it; build the session here.
spark = SparkSession.builder.getOrCreate()
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)
df.show()
class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing  # original column names
        self.new = new            # cleaned column names

    def _transform(self, df: DataFrame) -> DataFrame:
        # Rename each column to its cleaned name, then keep only those columns.
        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)
        return df.select(*self.new)
## Keep only the 'bigint' (and 'double') columns, and drop the rest
bigint_list = [name for name, types in df.dtypes if types == 'bigint' or types == 'double']
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
spike_cols = [col for col in edited_columns if "bag" in col]
reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)
bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
]
stages_ = [reformattedColumns] + bagging
Pipeline(stages=stages_).fit(df).transform(df).show()
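As a side note: on Spark 3.0 and later, a single Bucketizer can handle several columns at once through the inputCols/outputCols and splitsArray parameters, which collapses the list of single-column bucketizers into one stage. A minimal sketch of that variant (same splits reused for every bag column):

splits = [-float("inf"), 10, 100, float("inf")]
multi_bucketizer = Bucketizer(
    splitsArray=[splits] * len(spike_cols),  # one splits list per input column
    inputCols=spike_cols,
    outputCols=[c + "bucketed" for c in spike_cols])
Pipeline(stages=[reformattedColumns, multi_bucketizer]).fit(df).transform(df).show()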