Spark Scala 聚合

问题描述 投票:0回答:4

我正在尝试将以下 sql 转换为 spark scala api。

select beg_dt, col1, col2, count(distinct col3)
from tbl
group by 1 ,2,3
union all 
select beg_dt, col1, "xyx" as col2 ,count(distinct col3)
from tbl
group by 1,2
union all
select beg_dt, "abc" as col1, "xyx" as col2 , count(distinct col3)
from tbl
group by 1

基本上在每个维度级别聚合。

尝试构建一个列表并按如下方式对其进行迭代,但我无法在每次传递中添加静态列。

val dimCols: List[List[String]] =  List(
  List("col1", "col2" ), 
  List("col1", "'xyx' as col2"), 
  List("'abc' as col1","'xyx' as col2"  )
) 

val df = for (dimCol <- dimCols) yield { val x = myDF.groupBy( ( $"beg_dt" +: dimCol ).map(col): _* ).
                                                         agg(
                                                             countDistinct($"col4").as("count"),
                            
                               )
                        x       
              }

关于如何干净地完成这项工作的任何指示?

dataframe scala apache-spark aggregate
4个回答
0
投票

为客户开发代码时,我尝试使用Spark SQL,因为大家都知道。只需将文件作为数据框读入并转换为临时视图。

https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html

让我们相信您的数据位于已安装数据湖的青铜目录中,它被命名为“your_data.parquet”。

//
//  1 - read file and convert to spark session view
//

// read into df
path = "/mnt/lake/bronze/your_data.parquet"
val df1 = spark.read.format("parquet").load(path)

// convert to view
df1.createOrReplaceTempView("your_tbl")

下一步是执行您的 Spark SQL。结果作为数据框返回。

//
//  2 - execute sql and get df
//

// make sql string
val stmt = """
  select beg_dt, col1, col2, count(distinct col3)
  from your_tbl
  group by 1 ,2,3

  union all 

  select beg_dt, col1, "xyx" as col2 ,count(distinct col3)
  from your_tbl
  group by 1,2

  union all

  select beg_dt, "abc" as col1, "xyx" as col2 , count(distinct col3)
  from your_tbl
  group by 1
"""

// execute sql
val df2 = spark.sql(stmt)

// show results
display(df2)

简而言之,将文件转换为临时视图(表)并使用SQL。我再简单不过了!


0
投票

这段代码可以转换成一个函数。重要的是返回结果的结构必须相同!

// My list of queries
val queries_lst = List(
 
 """
 select variety,  max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
 from iris 
 where variety = 'Versicolor'
 group by variety
 """,

 """
 select variety,  max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
 from iris 
 where variety = 'Virginica'
 group by variety
 """,

 """
 select variety,  max(`sepal.length`) as max_length, max(`sepal.width`) as max_width
 from iris 
 where variety = 'Setosa'
 group by variety
 """,
  
)

// execute quries
val results_ary = queries_lst.map(spark.sql)

// empty data frame
var union_df = results(0)

// for each result, union to empty data frame
results_ary.slice(1, results.size).foreach((dataframe)=>{
  union_df = union_df.union(dataframe)
})

// Show the union of results
union_df.show()

我将使用数据科学中著名的 iris 数据集。它已作为 Hive 表加载到数据块中。

我想按所有三个品种汇总并合并结果。当然,这可以通过删除 where 子句轻松完成。但是那样我就不会有 3 个单独的查询了。

map 函数准备要执行的语句。 slice 和 for each 允许您附加每次执行的结果。 .show() 语句强制执行整个代码。

简而言之,我生成的数据框包含三个独立查询的结果。


0
投票

这里是 Spark Scala API 等价于你的 SQL 查询:

import org.apache.spark.sql.functions._

val resultDF = tbl.groupBy(col("beg_dt"), col("col1"), col("col2"))
  .agg(countDistinct(col("col3")).alias("distinct_count"))
  .select(col("beg_dt"), col("col1"), col("col2"), col("distinct_count"))

  .union(
    tbl.groupBy(col("beg_dt"), col("col1"))
      .agg(countDistinct(col("col3")).alias("distinct_count"))
      .select(col("beg_dt"), col("col1"), lit("xyx").as("col2"), col("distinct_count"))
  )

  .union(
    tbl.groupBy(col("beg_dt"))
      .agg(countDistinct(col("col3")).alias("distinct_count"))
      .select(col("beg_dt"), lit("abc").as("col1"), lit("xyx").as("col2"), col("distinct_count"))
  )

请注意,您需要将

tbl
替换为代码中实际 DataFrame 的名称。


-1
投票

你能尝试构建一个汇总来聚合不同级别吗?然后你需要从汇总中填充空值。

它可能看起来像这样:

import org.apache.spark.sql.functions.countDistinct

val resultDF = inputDF.rollup($"beg_dt", $"col_1", $"col_2"). 
                       agg(countDistinct($"col_3")).
                       na.fill(Map(
                          "col_1" -> "abc", 
                          "col_2" -> "xyz"
                       ))

抱歉格式化,我还在努力弄清楚所以

rollup的相关文档 fillna

的相关文档
© www.soinside.com 2019 - 2024. All rights reserved.