这感觉像是一个基本问题,但我在这里。我有两个有序列表列,我想过滤它们的值组合,然后计算作为聚合出现的次数。这是我到目前为止所拥有的:
val df=(Seq(
("id1", Array(0,1,2), Array(2,3,4)),
("id2", Array(0,1,2), Array(2,3,4)),
("id3", Array(0,1,2), Array(2,3,4))
)).toDF("id", "feature1", "feature2")
首先,我将数组压缩在一起以一起处理它们:
val dfz = df.withColumn("zipped", arrays_zip($"feature1", $"feature2"))
然后我有这个用于聚合:
dfz.groupBy("query").agg(sum(($"zipped").filter(x => x._1 > 0 && x._2 == 0).size))
我收到错误
error: value filter is not a member of org.apache.spark.sql.ColumnName
所以我认为我没有正确过滤。感谢所有帮助!
我也尝试编写一个 UDF,但这最终也成为一个看起来不太简单的地图/案例/计数,所以我认为可能有更好的方法。
一位同事帮助我指出
zip_with
是我正在寻找的功能!问题是这样解决的:
df.groupBy("id").agg(sum(size(zip_with(df("feature1"), df("feature2"), (x, y) => (x > 0 && y == 0)))))