我需要得到avg并计算给定的数据帧,并且需要从每个公司的Cassandra表值获得先前存储的avg和count。
然后需要计算平均值和计数并持续回到Cassandra表中。
我怎样才能为每家公司做到这一点?
我有两个数据帧模式如下
ingested_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- min_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
cassandra_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
对于每个company_id,我需要存储“mean”和“count”并计算“new_mean”和“new_count”并存储回cassandra ...
即
new_mean = ( ingested_df.mean + cassandra_df.mean) / (ingested_df.count + cassandra_df.count)
new_count = (ingested_df.count + cassandra_df.count)
如何为每家公司做到这一点?
第二次 :
当我尝试下面加入以上提到的相同逻辑时
val resultDf = cassandra_df.join(ingested_df ,
( cassandra_df("company_id") === ingested_df ("company_id") )
( ingested_df ("min_dd") > cassandra_df("max_dd") )
, "left")
这是抛出错误,如下所示:org.apache.spark.sql.AnalysisException:引用'cassandra_df'不明确,可能是:company_id,company_id。;在org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)
这有什么不对?
请尝试以下方法:
import spark.implicits._
val ingested_df = Seq(("1", "10", "3")).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", "20", "10")).toDF("company_id", "max_dd", "mean", "count")
val preparedIngestedDf = ingested_df.select("company_id", "mean", "count")
val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
.withColumn("new_mean", (ingested_df("mean") + cassandra_df("mean")) / (ingested_df("count") + cassandra_df("count")))
.withColumn("new_count", ingested_df("count") + cassandra_df("count"))
.select(
col("company_id"),
col("max_dd"),
col("new_mean").as("mean"),
col("new_count").as("new_count")
)