我正在尝试将以下 sql 转换为 spark scala api。
select beg_dt, col1, col2, count(distinct col3)
from tbl
group by 1 ,2,3
union all
select beg_dt, col1, "xyx" as col2 ,count(distinct col3)
from tbl
group by 1,2
union all
select beg_dt, "abc" as col1, "xyx" as col2 , count(distinct col3)
from tbl
group by 1
基本上在每个维度级别聚合。
尝试构建一个列表并按如下方式对其进行迭代,但我无法在每次传递中添加静态列。
val dimCols: List[List[String]] = List(
List("col1", "col2" ),
List("col1", "'xyx' as col2"),
List("'abc' as col1","'xyx' as col2" )
)
val df = for (dimCol <- dimCols) yield { val x = myDF.groupBy( ( $"beg_dt" +: dimCol ).map(col): _* ).
agg(
countDistinct($"col4").as("count"),
)
x
}
关于如何干净地完成这项工作的任何指示?
为客户开发代码时,我尝试使用Spark SQL,因为大家都知道。只需将文件作为数据框读入并转换为临时视图。
https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html
让我们相信您的数据位于已安装数据湖的青铜目录中,它被命名为“your_data.parquet”。
//
// 1 - read file and convert to spark session view
//
// read into df
path = "/mnt/lake/bronze/your_data.parquet"
val df1 = spark.read.format("parquet").load(path)
// convert to view
df1.createOrReplaceTempView("your_tbl")
下一步是执行您的 Spark SQL。结果作为数据框返回。
//
// 2 - execute sql and get df
//
// make sql string
val stmt = """
select beg_dt, col1, col2, count(distinct col3)
from your_tbl
group by 1 ,2,3
union all
select beg_dt, col1, "xyx" as col2 ,count(distinct col3)
from your_tbl
group by 1,2
union all
select beg_dt, "abc" as col1, "xyx" as col2 , count(distinct col3)
from your_tbl
group by 1
"""
// execute sql
val df2 = spark.sql(stmt)
// show results
display(df2)
简而言之,将文件转换为临时视图(表)并使用SQL。我再简单不过了!
这段代码可以转换成一个函数。重要的是返回结果的结构必须相同!
// My list of queries
val queries_lst = List(
"""
select variety, max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
from iris
where variety = 'Versicolor'
group by variety
""",
"""
select variety, max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
from iris
where variety = 'Virginica'
group by variety
""",
"""
select variety, max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
from iris
where variety = 'Setosa'
group by variety
""",
)
// execute quries
val results_ary = queries_lst.map(spark.sql)
// empty data frame
var union_df = results(0)
// for each result, union to empty data frame
results_ary.slice(1, results.size).foreach((dataframe)=>{
union_df = union_df.union(dataframe)
})
// Show the union of results
union_df.show()
我将使用数据科学中著名的 iris 数据集。它已作为 Hive 表加载到数据块中。
我想按所有三个品种汇总并合并结果。当然,这可以通过删除 where 子句轻松完成。但是那样我就不会有 3 个单独的查询了。
map 函数准备要执行的语句。 slice 和 for each 允许您附加每次执行的结果。 .show() 语句强制执行整个代码。
简而言之,我生成的数据框包含三个独立查询的结果。
这里是 Spark Scala API 等价于你的 SQL 查询:
import org.apache.spark.sql.functions._
val resultDF = tbl.groupBy(col("beg_dt"), col("col1"), col("col2"))
.agg(countDistinct(col("col3")).alias("distinct_count"))
.select(col("beg_dt"), col("col1"), col("col2"), col("distinct_count"))
.union(
tbl.groupBy(col("beg_dt"), col("col1"))
.agg(countDistinct(col("col3")).alias("distinct_count"))
.select(col("beg_dt"), col("col1"), lit("xyx").as("col2"), col("distinct_count"))
)
.union(
tbl.groupBy(col("beg_dt"))
.agg(countDistinct(col("col3")).alias("distinct_count"))
.select(col("beg_dt"), lit("abc").as("col1"), lit("xyx").as("col2"), col("distinct_count"))
)
请注意,您需要将
tbl
替换为代码中实际 DataFrame 的名称。
你能尝试构建一个汇总来聚合不同级别吗?然后你需要从汇总中填充空值。
它可能看起来像这样:
import org.apache.spark.sql.functions.countDistinct
val resultDF = inputDF.rollup($"beg_dt", $"col_1", $"col_2").
agg(countDistinct($"col_3")).
na.fill(Map(
"col_1" -> "abc",
"col_2" -> "xyz"
))
抱歉格式化,我还在努力弄清楚所以
的相关文档