在一个特定的 DataFrame 上,我有一个 SQL 查询,我想使用它两次,一次生成每日结果,一次获取每月结果。 我不能只汇总每日信息,因为我有非附加指标,例如平均值、不同计数等。
SOURCE_BASIC_METRICS = """
select category,
{date_field} as dt,
count(distinct id) as unique_ids
from mytable
where 1=1
group by category,
{date_field}
""";
mytable 有一个名为 event_date 和 event_month 的字段。 我想执行两次查询,就像这样。
dfBasicMetrics = spark.sql(SOURCE_BASIC_METRICS, date_field = "event_date");
dfBasicMetrics\
.write\
.parquet(DESTINATION_BASIC_METRICS + "/daily", mode = 'overwrite');
dfBasicMetrics = spark.sql(SOURCE_BASIC_METRICS, date_field = "event_month");
dfBasicMetrics\
.write\
.parquet(DESTINATION_BASIC_METRICS + "/monthly", mode = 'overwrite');
我是参数化查询的新手,我已经能够传递值,但还没有弄清楚如何传递实际的列名称。 我无法让参数标记方法起作用,就像这样......
spark.sql("select :date_field, etc etc", args={"date_field": "event_date"})
花括号中的命名参数与我所得到的最接近。 希望使用任何一种方法都能获得任何帮助,以便能够使用实际的列名称作为变量。
谢谢!
要动态参数化 Pyspark SQL 查询中的列名称,您可以使用字符串格式将列名称直接注入到查询字符串中,然后再将其传递给spark.sql()
解决方案
SOURCE_BASIC_METRICS = """
SELECT category,
{date_field} AS dt,
COUNT(DISTINCT id) AS unique_ids
FROM mytable
GROUP BY category, {date_field}
"""
dfDailyMetrics = spark.sql(SOURCE_BASIC_METRICS.format(date_field="event_date"))
dfDailyMetrics.write.parquet(DESTINATION_BASIC_METRICS + "/daily", mode='overwrite')
dfMonthlyMetrics = spark.sql(SOURCE_BASIC_METRICS.format(date_field="event_month"))
dfMonthlyMetrics.write.parquet(DESTINATION_BASIC_METRICS + "/monthly", mode='overwrite')