我是pyspark的新手,正在尝试探索一些新的实现方法。我试图将数据框中的派生列作为参数传递给查询并返回值的函数
def getValue(col):
cfg = spark.sql("select value from config_table where key='"+str(col)+"'")
value = cfg.collect()[0][0]
return value
main():
df_output = df.withColumn('derived',getValue(col('col_to_fetch_the_value')))
上面的代码会导致语法错误。 预先感谢
尝试下面的代码对我有用,但不能使用,因为列值可能会改变并且不能进行硬编码
def getValue(col):
cfg = spark.sql("select value from config_table where key='"+str(col)+"'")
value = cfg.collect()[0][0]
return value
main():
df_output = df.withColumn('derived',getValue('key'))
这似乎是一个使用连接的机会。
左连接示例
如果您的输入
df
定义了查找键,您可以使用左连接将派生值应用到定义该查找键的行
config_df = spark.sql("SELECT key, value from config_table")
df_output = df.join(config_df, df.col_to_fetch_the_value=config_df.key "left").withColumnRenamed("value", "derived")
交叉连接示例
如果您的输入
df
未定义查找键,您可以使用如下所示的交叉联接将派生值应用于 df
中的所有行
input_key = "XX"
config_df = spark.sql("SELECT key, value from config_table").filter(F.col('key') == input_key)
df_output = df.crossjoin(config_df).withColumnRenamed("value", "derived")
getValue()函数
据我所知,到目前为止,还没有明确的案例表明使用
getValue
函数优于使用 join。
如果由于某种原因您必须使用 getValue
函数,则需要使用 PySpark UDF 函数将 getValue
函数传递到列定义中:https://spark.apache.org/docs /3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html