我有一个 PySpark 数据框
+-------+--------------+----+----+
|address| date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin|gre |
|1111111|20151122045501| Yin|gre |
|1111111|20151122045500| Yln|gra |
|1111112|20151122065832| Yun|ddd |
|1111113|20160101003221| Yan|fdf |
|1111111|20160703045231| Yin|gre |
|1111114|20150419134543| Yin|fdf |
|1111115|20151123174302| Yen|ddd |
|2111115| 20123192| Yen|gre |
+-------+--------------+----+----+
我想转换为与 pyspark.ml 一起使用。我可以使用 StringIndexer 将名称列转换为数字类别:
indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()
+-------+--------------+----+----------+----+
|address| date|name|name_index|food|
+-------+--------------+----+----------+----+
|1111111|20151122045510| Yin| 0.0|gre |
|1111111|20151122045501| Yin| 0.0|gre |
|1111111|20151122045500| Yln| 2.0|gra |
|1111112|20151122065832| Yun| 4.0|ddd |
|1111113|20160101003221| Yan| 3.0|fdf |
|1111111|20160703045231| Yin| 0.0|gre |
|1111114|20150419134543| Yin| 0.0|fdf |
|1111115|20151123174302| Yen| 1.0|ddd |
|2111115| 20123192| Yen| 1.0|gre |
+-------+--------------+----+----------+----+
如何使用 StringIndexer 转换多个列(例如,
name
和 food
,每个列都有自己的 StringIndexer
),然后使用 VectorAssembler 生成特征向量?或者我是否必须为每一列创建一个 StringIndexer
?
** 编辑 **:这不是一个骗局,因为我需要以编程方式对具有不同列名的多个数据框进行此操作。我无法使用
VectorIndexer
或 VectorAssembler
,因为这些列不是数字。
** 编辑2**:暂定解决方案是
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df).transform(df) for column in df.columns ]
我现在创建一个包含三个数据框的列表,每个数据框与原始数据框和转换后的列相同。现在我需要加入 then 来形成最终的数据框,但这非常低效。
我发现最好的方法是将多个
StringIndex
组合在一个列表上,并使用 Pipeline
来执行它们:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['date'])) ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)
df_r.show()
+-------+--------------+----+----+----------+----------+-------------+
|address| date|food|name|food_index|name_index|address_index|
+-------+--------------+----+----+----------+----------+-------------+
|1111111|20151122045510| gre| Yin| 0.0| 0.0| 0.0|
|1111111|20151122045501| gra| Yin| 2.0| 0.0| 0.0|
|1111111|20151122045500| gre| Yln| 0.0| 2.0| 0.0|
|1111112|20151122065832| gre| Yun| 0.0| 4.0| 3.0|
|1111113|20160101003221| gre| Yan| 0.0| 3.0| 1.0|
|1111111|20160703045231| gre| Yin| 0.0| 0.0| 0.0|
|1111114|20150419134543| gre| Yin| 0.0| 0.0| 5.0|
|1111115|20151123174302| ddd| Yen| 1.0| 1.0| 2.0|
|2111115| 20123192| ddd| Yen| 1.0| 1.0| 4.0|
+-------+--------------+----+----+----------+----------+-------------+
使用 PySpark 3.0+ 现在更容易,您可以使用
inputCols
和 outputCols
选项:
https://spark.apache.org/docs/latest/ml-features#stringindexer
class pyspark.ml.feature.StringIndexer(
inputCol=...,
outputCol=...,
inputCols=...,
outputCols=...,
handleInvalid='error',
stringOrderType='frequencyDesc'
)
我可以为您提供以下解决方案。最好使用管道对较大的数据集进行此类转换。它们还使您的代码更容易遵循和理解。如果需要,您可以向管道添加更多阶段。例如添加编码器。
#create a list of the columns that are string typed
categoricalColumns = [item[0] for item in df.dtypes if item[1].startswith('string') ]
#define a list of stages in your pipeline. The string indexer will be one stage
stages = []
#iterate through all categorical values
for categoricalCol in categoricalColumns:
#create a string indexer for those categorical values and assign a new name including the word 'Index'
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
#append the string Indexer to our list of stages
stages += [stringIndexer]
#Create the pipeline. Assign the satges list to the pipeline key word stages
pipeline = Pipeline(stages = stages)
#fit the pipeline to our dataframe
pipelineModel = pipeline.fit(df)
#transform the dataframe
df= pipelineModel.transform(df)
请看看我的参考资料
将 StringIndexer 应用于 PySpark Dataframe 中的多个列 对于火花2.4.7
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol="F1", outputCol="F1Index") , StringIndexer(inputCol="F5", outputCol="F5Index")]
pipeline = Pipeline(stages=indexers)
DF6 = pipeline.fit(DF5).transform(DF5)
DF6.show()