在 Pyspark SQL 查询中使用列名作为参数

问题描述 投票:0回答:1

在一个特定的 DataFrame 上,我有一个 SQL 查询,我想使用它两次,一次生成每日结果,一次获取每月结果。 我不能只汇总每日信息,因为我有非附加指标,例如平均值、不同计数等。

SOURCE_BASIC_METRICS = """
select category,
    {date_field} as dt,
    count(distinct id) as unique_ids
from mytable
where 1=1
group by category,
    {date_field}
""";

mytable 有一个名为 event_date 和 event_month 的字段。 我想执行两次查询,就像这样。

dfBasicMetrics = spark.sql(SOURCE_BASIC_METRICS, date_field = "event_date");
dfBasicMetrics\
    .write\
    .parquet(DESTINATION_BASIC_METRICS + "/daily", mode = 'overwrite');

dfBasicMetrics = spark.sql(SOURCE_BASIC_METRICS, date_field = "event_month");
dfBasicMetrics\
    .write\
    .parquet(DESTINATION_BASIC_METRICS + "/monthly", mode = 'overwrite');

我是参数化查询的新手,我已经能够传递值,但还没有弄清楚如何传递实际的列名称。 我无法让参数标记方法起作用,就像这样......

spark.sql("select :date_field, etc etc", args={"date_field": "event_date"})

花括号中的命名参数与我所得到的最接近。 希望使用任何一种方法都能获得任何帮助,以便能够使用实际的列名称作为变量。

谢谢!

python pyspark apache-spark-sql
1个回答
0
投票

要动态参数化 Pyspark SQL 查询中的列名称,您可以使用字符串格式将列名称直接注入到查询字符串中,然后再将其传递给spark.sql()

解决方案

SOURCE_BASIC_METRICS = """
SELECT category,
       {date_field} AS dt,
       COUNT(DISTINCT id) AS unique_ids
FROM mytable
GROUP BY category, {date_field}
"""

dfDailyMetrics = spark.sql(SOURCE_BASIC_METRICS.format(date_field="event_date"))
dfDailyMetrics.write.parquet(DESTINATION_BASIC_METRICS + "/daily", mode='overwrite')

dfMonthlyMetrics = spark.sql(SOURCE_BASIC_METRICS.format(date_field="event_month"))
dfMonthlyMetrics.write.parquet(DESTINATION_BASIC_METRICS + "/monthly", mode='overwrite')
© www.soinside.com 2019 - 2024. All rights reserved.