Tricky PySpark transformation to merge rows based on timestamp duration


I have a Delta Lake table with a time column and a count (int) column. The DataFrame rows need to be merged so that the resulting DataFrame has rows grouped into 2-day intervals. The 2-day difference should always be measured from the timestamp of the first row that the other rows are merged into. As soon as a timestamp is more than 172,800 seconds past that reference, it should become the new start-time reference for the next group, and so on. The counts should be summed into the grouped (merged) row.

For example, the original DataFrame has the following timestamps:

**time, count**
2019-02-18 11:03:55, 500
2019-02-18 11:06:18, 30
2019-02-18 11:07:58, 20
2019-02-18 11:07:58, 12
2019-02-18 11:08:38, 8
2019-02-18 11:10:29, 2
2019-02-20 11:09:12, 25
2019-02-20 11:10:10, 10
2019-04-02 10:10:10, 1
2019-04-05 10:10:10, 2
2019-04-09 10:10:09, 4
2019-04-11 10:10:30, 6
2019-04-13 10:10:10, 3
2019-04-16 10:10:10, 5
2019-04-19 10:10:10, 7
2019-04-21 10:10:10, 8

The expected result is:

**time => count_sum**
2019-02-18 11:03:55 => (500 + 30 + 20 + 12 + 8 + 2) = 572
2019-02-20 11:09:12 => (25 + 10) = 35
2019-04-02 10:10:10 => 1
2019-04-05 10:10:10 => 2
2019-04-09 10:10:09 => 4
2019-04-11 10:10:30 => (6+3) = 9
2019-04-16 10:10:10 => 5
2019-04-19 10:10:10 => (7+8) = 15
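The boundary handling is the tricky part: 2019-04-11 10:10:30 is 172,821 seconds after 2019-04-09 10:10:09, so it starts a new group, while 2019-04-13 10:10:10 is only 172,780 seconds after 2019-04-11 10:10:30 and therefore merges into it. A quick, purely illustrative check of those two gaps in plain Python (not part of the Spark job):

from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S"

def gap_seconds(a, b):
    # seconds elapsed between two of the sample timestamps
    return (datetime.strptime(b, fmt) - datetime.strptime(a, fmt)).total_seconds()

print(gap_seconds("2019-04-09 10:10:09", "2019-04-11 10:10:30"))  # 172821.0 > 172800 -> new group
print(gap_seconds("2019-04-11 10:10:30", "2019-04-13 10:10:10"))  # 172780.0 <= 172800 -> merge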

Any ideas how to solve this?

dataframe apache-spark pyspark delta-lake
1 Answer

In Scala:

Instead of pure DataFrame operations, you can collect the sorted rows and iterate over them, maintaining the current group as you go. Apply your time-difference logic and repeat the same process for each row; afterwards you can convert the collected results back into a DataFrame.

import org.apache.spark.sql.functions._

val data = Seq(
  ("2019-02-18 11:03:55", 500),
  ("2019-02-18 11:06:18", 30),
  ("2019-02-18 11:07:58", 20),
  ("2019-02-18 11:07:58", 12),
  ("2019-02-18 11:08:38", 8),
  ("2019-02-18 11:10:29", 2),
  ("2019-02-20 11:09:12", 25),
  ("2019-02-20 11:10:10", 10),
  ("2019-04-02 10:10:10", 1),
  ("2019-04-05 10:10:10", 2),
  ("2019-04-09 10:10:09", 4),
  ("2019-04-11 10:10:30", 6),
  ("2019-04-13 10:10:10", 3),
  ("2019-04-16 10:10:10", 5),
  ("2019-04-19 10:10:10", 7),
  ("2019-04-21 10:10:10", 8)
)

import spark.implicits._
val df = data.toDF("time", "count")

// Parse the string column into a proper timestamp type
val dfWithTimestamp = df.withColumn("time", unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))

// The grouping is sequential, so rows must be processed in time order
val sortedDF = dfWithTimestamp.orderBy("time")


// Reference timestamp of the current group, its running count sum, and the finished groups
var currentGroupStartTime = sortedDF.first().getTimestamp(0)
var currentCountSum = 0
var groupResults = Seq.empty[(String, Int)]

// Collect to the driver and walk the rows in order
sortedDF.collect().foreach { row =>
  val rowTime = row.getTimestamp(0)
  val rowCount = row.getInt(1)

  if (rowTime.getTime - currentGroupStartTime.getTime <= 172800000L) {
    // Still within 2 days (172,800,000 ms) of the group's start: accumulate
    currentCountSum += rowCount
  } else {
    // Past the 2-day window: close the current group and start a new one
    groupResults :+= ((currentGroupStartTime.toString, currentCountSum))
    currentGroupStartTime = rowTime
    currentCountSum = rowCount
  }
}

// Emit the last open group
groupResults :+= ((currentGroupStartTime.toString, currentCountSum))

val resultDF = spark.createDataFrame(groupResults).toDF("time", "count_sum")

resultDF.show(false)

+---------------------+---------+
|time                 |count_sum|
+---------------------+---------+
|2019-02-18 11:03:55.0|572      |
|2019-02-20 11:09:12.0|35       |
|2019-04-02 10:10:10.0|1        |
|2019-04-05 10:10:10.0|2        |
|2019-04-09 10:10:09.0|4        |
|2019-04-11 10:10:30.0|9        |
|2019-04-16 10:10:10.0|5        |
|2019-04-19 10:10:10.0|15       |
+---------------------+---------+
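Since the question asks for PySpark, a rough PySpark sketch of the same driver-side approach is given below (an untested translation of the Scala code above; `spark` is assumed to be an existing SparkSession, and like the Scala version it assumes the sorted rows fit comfortably on the driver, because collect() pulls all of them there):

from pyspark.sql import functions as F

data = [
    ("2019-02-18 11:03:55", 500), ("2019-02-18 11:06:18", 30),
    ("2019-02-18 11:07:58", 20), ("2019-02-18 11:07:58", 12),
    ("2019-02-18 11:08:38", 8), ("2019-02-18 11:10:29", 2),
    ("2019-02-20 11:09:12", 25), ("2019-02-20 11:10:10", 10),
    ("2019-04-02 10:10:10", 1), ("2019-04-05 10:10:10", 2),
    ("2019-04-09 10:10:09", 4), ("2019-04-11 10:10:30", 6),
    ("2019-04-13 10:10:10", 3), ("2019-04-16 10:10:10", 5),
    ("2019-04-19 10:10:10", 7), ("2019-04-21 10:10:10", 8),
]

df = (spark.createDataFrame(data, ["time", "count"])
           .withColumn("time", F.to_timestamp("time", "yyyy-MM-dd HH:mm:ss")))

# Collect the rows in time order and walk them sequentially on the driver
rows = df.orderBy("time").collect()

groups = []
start_time = rows[0]["time"]   # reference timestamp of the current group
count_sum = 0

for row in rows:
    if (row["time"] - start_time).total_seconds() <= 172800:
        # Still within 2 days of the group's start: accumulate
        count_sum += row["count"]
    else:
        # Past the 2-day window: close the group and start a new one
        groups.append((start_time, count_sum))
        start_time = row["time"]
        count_sum = row["count"]

groups.append((start_time, count_sum))  # emit the last open group

result = spark.createDataFrame(groups, ["time", "count_sum"])
result.show(truncate=False)

This mirrors the Scala loop one-to-one. For very large tables an approach that avoids collecting to the driver would be preferable, but the sequential dependency between groups makes that harder to express with window or aggregation functions alone.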
