I have a Delta Lake table with a time column and a count (int) column. The DataFrame rows need to be merged so that the resulting DataFrame has rows grouped into 2-day intervals. The 2-day difference should always be measured from the timestamp of the first row that the other rows are merged into. Once a timestamp is more than 172800 seconds (2 days) past that reference, it becomes the new start-time reference for the next group, and so on. The counts should be summed into the grouped (merged) row.
For example, the original DataFrame has the following timestamps:
**time, count**
2019-02-18 11:03:55, 500
2019-02-18 11:06:18, 30
2019-02-18 11:07:58, 20
2019-02-18 11:07:58, 12
2019-02-18 11:08:38, 8
2019-02-18 11:10:29, 2
2019-02-20 11:09:12, 25
2019-02-20 11:10:10, 10
2019-04-02 10:10:10, 1
2019-04-05 10:10:10, 2
2019-04-09 10:10:09, 4
2019-04-11 10:10:30, 6
2019-04-13 10:10:10, 3
2019-04-16 10:10:10, 5
2019-04-19 10:10:10, 7
2019-04-21 10:10:10, 8
The expected result is:
**time => count_sum**
2019-02-18 11:03:55 => (500 + 30 + 20 + 12 + 8 + 2) = 572
2019-02-20 11:09:12 => (25 + 10) = 35
2019-04-02 10:10:10 => 1
2019-04-05 10:10:10 => 2
2019-04-09 10:10:09 => 4
2019-04-11 10:10:30 => (6+3) = 9
2019-04-16 10:10:10 => 5
2019-04-19 10:10:10 => (7+8) = 15
Any ideas on how to solve this?
In Scala:
Instead of pure DataFrame operations, you can collect the rows and iterate over them while keeping track of the current group. Apply your time-difference logic row by row, repeating the same process for each group, and convert the result back to a DataFrame at the end.
import org.apache.spark.sql.functions._
val data = Seq(
("2019-02-18 11:03:55", 500),
("2019-02-18 11:06:18", 30),
("2019-02-18 11:07:58", 20),
("2019-02-18 11:07:58", 12),
("2019-02-18 11:08:38", 8),
("2019-02-18 11:10:29", 2),
("2019-02-20 11:09:12", 25),
("2019-02-20 11:10:10", 10),
("2019-04-02 10:10:10", 1),
("2019-04-05 10:10:10", 2),
("2019-04-09 10:10:09", 4),
("2019-04-11 10:10:30", 6),
("2019-04-13 10:10:10", 3),
("2019-04-16 10:10:10", 5),
("2019-04-19 10:10:10", 7),
("2019-04-21 10:10:10", 8)
)
import spark.implicits._

val df = data.toDF("time", "count")

// Parse the string column into a proper timestamp and sort chronologically,
// since the grouping logic depends on processing rows in time order.
val dfWithTimestamp = df.withColumn("time", unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
val sortedDF = dfWithTimestamp.orderBy("time")

// The first row's timestamp seeds the first group; its count is added in the
// loop below because its time difference from the start is zero.
var currentGroupStartTime = sortedDF.first().getTimestamp(0)
var currentCountSum = 0
var groupResults = Seq.empty[(String, Int)]

// Collect to the driver and walk the rows in order, merging every row that
// falls within 172800000 ms (2 days) of the current group's start time.
sortedDF.collect().foreach { row =>
  val rowTime = row.getTimestamp(0)
  val rowCount = row.getInt(1)
  if (rowTime.getTime - currentGroupStartTime.getTime <= 172800000) {
    currentCountSum += rowCount
  } else {
    // Close the current group and start a new one anchored at this row.
    groupResults :+= (currentGroupStartTime.toString, currentCountSum)
    currentGroupStartTime = rowTime
    currentCountSum = rowCount
  }
}
// Emit the last open group.
groupResults :+= (currentGroupStartTime.toString, currentCountSum)

val resultDF = spark.createDataFrame(groupResults).toDF("time", "count_sum")
resultDF.show(false)
+---------------------+---------+
|time                 |count_sum|
+---------------------+---------+
|2019-02-18 11:03:55.0|572      |
|2019-02-20 11:09:12.0|35       |
|2019-04-02 10:10:10.0|1        |
|2019-04-05 10:10:10.0|2        |
|2019-04-09 10:10:09.0|4        |
|2019-04-11 10:10:30.0|9        |
|2019-04-16 10:10:10.0|5        |
|2019-04-19 10:10:10.0|15       |
+---------------------+---------+
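Since the question mentions a Delta Lake table, here is a minimal sketch of how you might wire the same logic to an actual table instead of the hard-coded Seq. The paths /delta/events and /delta/events_2day are placeholders for your own table locations, and the column names are assumed to match the question.

// Read the source Delta table (replace the path with your table's location).
val sourceDF = spark.read.format("delta").load("/delta/events")
  .select(col("time").cast("timestamp"), col("count").cast("int"))

// ... apply the same collect-and-iterate grouping as above to produce resultDF ...

// Write the aggregated result back out as a Delta table.
resultDF.write.format("delta").mode("overwrite").save("/delta/events_2day")

Keep in mind that collect() pulls every row to the driver, so this approach is only practical when the table comfortably fits in driver memory.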