如何将 Dataframe 类型的函数参数传递给 SparkSQL 查询

问题描述 投票:0回答:1

我有一个包含在函数内的 Spark.sql 查询。我想向查询传递一个函数参数,该参数是一个数据帧,但出现一些错误。谁能看看我是否做错了什么?

功能:

1  def my_function(df_table: DataFrame) -> DataFrame:
2 
3    sql_query = f"""
4    SELECT DISTINCT dt.CountryId,
5    Cast(RIGHT(dt.RegionIdentifier, 2) as Integer) as RegionID
6    FROM {df_table} dt 
7    WHERE dt.CountryCode = 23
8    """
9
10   df = spark.sql(sql_query)
11   return df

这就是我在笔记本中的称呼:

df_table = spark.table('path_to_table/_location/')
my_function(df_table)

我收到的错误消息:

[PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601

如果我删除 LINE-6 上的

{df_table}
,并将表的名称硬编码,它就可以工作。有没有办法将表名作为 Dataframe 传递并作为参数传递?

当我打印 sql_query 时,它显示:

SELECT DISTINCT dt.CountryId,
Cast(RIGHT(dt.RegionIdentifier, 2) as Integer) as RegionID
FROM DataFrame[CountryId: bigint, RegionID: int, RegionIdentifier: string, TimeOf: timestamp] dt
WHERE dt.CountryCode = 23
python pyspark apache-spark-sql
1个回答
0
投票

您的 SQL 语句需要表的名称 - 您不能使用 DataFrame 对象来代替,它们不等效。 DataFrame 不一定有名称,因为它可能是您加载的 DataFrame 上的某些操作的结果。 如果您希望此函数始终接受表作为参数,则只需传递名称(字符串)即可。 如果您期望有时它可能会得到一个作为其他操作结果的 DataFrame,那么您无法将您的选择构建为 SQL 语句,但您可以通过应用 pyspark 函数来添加它,就像那样

 import pyspark.sql.functions as F

 df = df_table.where(F.col("CountryCode") == 23).\
      select(F.countDistinct(F.col("CountryId")), 
             F.right(F.col("RegionIdentifier"),2).\
                        cast("integer").alias("RegionID"))
© www.soinside.com 2019 - 2024. All rights reserved.