Pyspark 迭代 1M 列的有效方法

问题描述 投票:0回答:1

我有一个 pyspark 数据框,如下所示:

+--------+-------------+---------+---------+---------+
|    code|    updatedAt|S0x223433|S1yd33333|S4r256467|
+--------+-------------+---------+---------+---------+
|AAAAAAAA|1713292448319|     3243|     null|     3422|
|BBBBBBBB|1689430451041|     3455|     2345|     7654|

我想在上面的数据框中添加一个新列,对于给定的行,它只会采用那些不为空的 s 前缀列。结果数据框如下所示:

+--------+-------------+---------+---------+---------+---------------------
|    code|    updatedAt|S0x223433|S1yd33333|S4r256467|new_col             |
+--------+-------------+---------+---------+---------+---------------------
|AAAAAAAA|1713292448319|     3243|     null|     3422|[S0x223433,S4r256467]
|BBBBBBBB|1689430451041|     null|     2345|     7654|[S1yd33333,S4r256467]

我的 Spark 数据框有近 1M 列,我希望它的处理方式如下:

  1. 检查列名是否与正则表达式匹配

  2. 然后检查它是否有非空值,然后将其添加到列表中

  3. 返回给定行的所有此类列的列表

(新列中添加的列表稍后将展开以进行标准化)

当前添加新列的逻辑需要永远运行(因为它是在 1M 列上运行的循环)

non_null_s_columns= array([when(col(c).isNotNull(), lit(c)) for c in SPrefixedcolumns])

向社区提问:

有没有有效的方法来执行此操作?我对 Spark 比较陌生,所以可能会错过一个明显的方法(如果有的话)。我正在寻找用于粘合动态框架或 Spark 数据框架的指针,我可以在不迭代超过 1M 列的情况下执行此操作。蒂亚!

我尝试删除空列,但我的数据集很稀疏,所以这没有帮助。我正在编写一个 UDF,我可以向其中传递数据帧行并填充新列,但到目前为止还没有运气。

dataframe pyspark aws-glue
1个回答
0
投票

鉴于:

+--------+-------------+---------+---------+---------+
|    code|    updatedAt|S0x223433|S1yd33333|S4r256467|
+--------+-------------+---------+---------+---------+
|AAAAAAAA|1713292448319|     3243|     null|     3422|
|BBBBBBBB|1689430451041|     null|     2345|     7654|

正在做:

  • melt
    由独特的行d列和
    SPrefixedcolumns
  • dropna
    保留填充的列名称
  • groupBy
    这些列和
    collect_list
    SPrefixedcolumns
    名称
  • join
    这回到原来的表
import pyspark.sql.functions as F

unique_cols = ["code", "updatedAt"]  # Ensure these make unique rows.
SPrefixedcolumns = [col for col in df.columns if col.startswith("S")]

df = (
    df.join(
        df.melt(
            ids=unique_cols,
            values=SPrefixedcolumns,
            variableColumnName="variable",
            valueColumnName="value",
          )
          .dropna()
          .groupBy(unique_cols).agg(F.collect_list("variable").alias("new_col")),
        unique_cols,
        "left_outer",
    )
)
df.show()

输出:

+--------+-------------+---------+---------+---------+--------------------+
|    code|    updatedAt|S0x223433|S1yd33333|S4r256467|             new_col|
+--------+-------------+---------+---------+---------+--------------------+
|AAAAAAAA|1713292448319|   3243.0|      NaN|     3422|[S4r256467, S0x22...|
|BBBBBBBB|1689430451041|      NaN|   2345.0|     7654|[S4r256467, S1yd3...|
+--------+-------------+---------+---------+---------+--------------------+
© www.soinside.com 2019 - 2024. All rights reserved.