将数据帧列作为参数传递给pyspark中的函数

问题描述 投票:0回答:1

我是pyspark的新手,正在尝试探索一些新的实现方法。我试图将数据框中的派生列作为参数传递给查询并返回值的函数

def getValue(col):
   cfg = spark.sql("select value from config_table where key='"+str(col)+"'")
   value = cfg.collect()[0][0]
   return value

main():
   df_output = df.withColumn('derived',getValue(col('col_to_fetch_the_value')))

上面的代码会导致语法错误。 预先感谢

尝试下面的代码对我有用,但不能使用,因为列值可能会改变并且不能进行硬编码

def getValue(col):
   cfg = spark.sql("select value from config_table where key='"+str(col)+"'")
   value = cfg.collect()[0][0]
   return value

main():
   df_output = df.withColumn('derived',getValue('key'))
pyspark apache-spark-sql
1个回答
0
投票

这似乎是一个使用连接的机会。

左连接示例

如果您的输入

df
定义了查找键,您可以使用左连接将派生值应用到定义该查找键的行

config_df = spark.sql("SELECT key, value from config_table")

df_output = df.join(config_df, df.col_to_fetch_the_value=config_df.key "left").withColumnRenamed("value", "derived")

交叉连接示例

如果您的输入

df
未定义查找键,您可以使用如下所示的交叉联接将派生值应用于
df

中的所有行
input_key = "XX"
config_df = spark.sql("SELECT key, value from config_table").filter(F.col('key') == input_key)

df_output = df.crossjoin(config_df).withColumnRenamed("value", "derived")

getValue()函数

据我所知,到目前为止,还没有明确的案例表明使用

getValue
函数优于使用 join。 如果由于某种原因您必须使用
getValue
函数,则需要使用 PySpark UDF 函数将
getValue
函数传递到列定义中:https://spark.apache.org/docs /3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html

© www.soinside.com 2019 - 2024. All rights reserved.