How can I compute a cumulative average per company with Spark, based on results stored in Cassandra?


I need to compute the avg and count for a given DataFrame, and I also need to fetch the previously stored avg and count for each company from a Cassandra table.

Then I need to compute the new mean and count and persist them back to the Cassandra table.

How can I do this for each company?

I have two DataFrames with the following schemas:

ingested_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

cassandra_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

For each company_id, I need to take the stored "mean" and "count", compute "new_mean" and "new_count", and store them back to Cassandra:

    new_mean  = (ingested_df.mean  + cassandra_df.mean)  / (ingested_df.count + cassandra_df.count)
    new_count = (ingested_df.count + cassandra_df.count)

How can I do this for each company?
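
For context, here is a minimal sketch of how the previously stored per-company values could be read with the Spark Cassandra connector; the keyspace and table names below are placeholders rather than values from the original post, and the connector must be on the classpath with spark.cassandra.connection.host configured.

// Sketch: load the previously stored per-company stats from Cassandra.
// "ks" and "company_stats" are placeholder keyspace/table names.
val cassandra_df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "company_stats"))
  .load()
  .select("company_id", "max_dd", "mean", "count")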

Second issue:

When I try the same logic mentioned above with the following join:

 val resultDf = cassandra_df.join(ingested_df,
                            cassandra_df("company_id") === ingested_df("company_id") &&
                            ingested_df("min_dd") > cassandra_df("max_dd"),
                        "left")

it throws an error like the following:

    org.apache.spark.sql.AnalysisException: Reference 'company_id' is ambiguous, could be: company_id, company_id.;
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)

What is wrong here?
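
One common way to avoid this kind of ambiguity is to alias both DataFrames and qualify every column in the join condition; a rough sketch, assuming org.apache.spark.sql.functions.col is imported:

// Sketch: alias the DataFrames so each column reference is unambiguous.
val c = cassandra_df.as("c")
val i = ingested_df.as("i")
val resultDf = c.join(i,
    col("c.company_id") === col("i.company_id") &&
    col("i.min_dd") > col("c.max_dd"),
  "left")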

apache-spark apache-spark-sql datastax databricks
1 Answer

Try the following approach:

import spark.implicits._
import org.apache.spark.sql.functions.col

// Toy data standing in for the real DataFrames
val ingested_df = Seq(("1", "10", "3")).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", "20", "10")).toDF("company_id", "max_dd", "mean", "count")

val preparedIngestedDf = ingested_df.select("company_id", "mean", "count")

// Join on company_id, then combine the stored and newly ingested statistics
val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
  .withColumn("new_mean", (ingested_df("mean") + cassandra_df("mean")) / (ingested_df("count") + cassandra_df("count")))
  .withColumn("new_count", ingested_df("count") + cassandra_df("count"))
  .select(
    col("company_id"),
    col("max_dd"),
    col("new_mean").as("mean"),
    col("new_count").as("new_count")
  )
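
To persist the result back to Cassandra, a sketch using the connector's DataFrame writer could look like the following; the keyspace and table names are again placeholders, and the Spark Cassandra connector plus spark.cassandra.connection.host must be configured.

// Sketch: write the updated per-company stats back to Cassandra.
// "ks" and "company_stats" are placeholder keyspace/table names.
resultDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "company_stats"))
  .mode("append")
  .save()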