如果状态列大于阈值,如何获取 Spark scala 数据框中的第一行

问题描述 投票:0回答:1

我有一个数据框,如下所示。我每小时从 HBase 表加载一次原始数据。如果状态等于 1 连续超过 10 分钟,那么我需要每小时批量取第一行......类似地,我也需要对其他 ID 执行此操作。

身份证 时间戳 状态
1 2021-01-01 10:00:00 1
1 2021-01-01 10:01:06 1
1 2021-01-01 10:02:18 1
1 2021-01-01 10:03:24 1
1 2021-01-01 10:04:30 1
1 2021-01-01 10:05:36 1
1 2021-01-01 11:06:00 1
1 2021-01-01 11:07:06 1
1 2021-01-01 11:08:12 1
1 2021-01-01 11:09:24 1
1 2021-01-01 11:10:30 1
1 2021-01-01 11:11:36 1


身份证 时间戳 状态
1 2021-01-01 10:00:00 1



身份证 时间戳 状态
1 2021-01-01 10:00:00 1
1 2021-01-01 10:01:06 1
1 2021-01-01 10:02:18 1
1 2021-01-01 10:03:24 1
1 2021-01-01 10:04:30 1
1 2021-01-01 10:05:36 2
1 2021-01-01 11:06:00 3
1 2021-01-01 11:07:06 1
1 2021-01-01 11:08:12 1
1 2021-01-01 11:09:24 1
1 2021-01-01 11:10:30 1
1 2021-01-01 11:11:36 1
dataframe scala apache-spark apache-spark-sql

我认为这看起来像孤岛和间隙 SQL 问题(并且很确定有更干净、更惯用的解决方案):

//build dataframe
val data = Seq((1,"2021-01-01T10:00:00",1),(1,"2021-01-01T10:01:06",1),(1,"2021-01-01T10:02:18",1),(1,"2021-01-01T10:03:24",1),(1,"2021-01-01T10:04:30",1),(1,"2021-01-01T10:05:36",2),(1,"2021-01-01T11:06:00",3),(1,"2021-01-01T11:07:06",1),(1,"2021-01-01T11:08:12",1),(1,"2021-01-01T11:09:24",1),(1,"2021-01-01T11:10:30",1),(1,"2021-01-01T11:11:36",1))

import org.apache.spark.sql.types._

val schema = StructType(List(StructField("id", IntegerType, true), StructField("timestamp", TimestampType, true), StructField("status", IntegerType, true)))

val df = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

| id|          timestamp|status|
|  1|2021-01-01T10:00:00|     1|
|  1|2021-01-01T10:01:06|     1|
|  1|2021-01-01T10:02:18|     1|
|  1|2021-01-01T10:03:24|     1|
|  1|2021-01-01T10:04:30|     1|
|  1|2021-01-01T10:05:36|     2|
|  1|2021-01-01T11:06:00|     3|
|  1|2021-01-01T11:07:06|     1|
|  1|2021-01-01T11:08:12|     1|
|  1|2021-01-01T11:09:24|     1|
|  1|2021-01-01T11:10:30|     1|
|  1|2021-01-01T11:11:36|     1|

 |-- id: integer (nullable = false)
 |-- timestamp: string (nullable = true)
 |-- status: integer (nullable = false)
 // identify contigous regions by status then get the duration of that status
 df.withColumn("id_sequence", row_number().over(Window.partitionBy(col("id")).orderBy(col("timestamp").asc))).
 withColumn("id_and_status", row_number().over(Window.partitionBy(col("id"), col("status")).orderBy(col("timestamp").asc))).
 withColumn("group", col("id_sequence") - col("id_and_status")).
 withColumn("group_max", max(col("timestamp")).over(Window.partitionBy(col("id"),col("group"))).cast(TimestampType)).
 withColumn("group_min", min(col("timestamp")).over(Window.partitionBy(col("id"),col("group"))).cast(TimestampType)).
 withColumn("difference_in_mins", (unix_timestamp(col("group_max")) - unix_timestamp(col("group_min"))) / 60).
 orderBy(col("id"), col("timestamp").asc).
| id|          timestamp|status|id_sequence|id_and_status|group|          group_max|          group_min|difference_in_mins|
|  1|2021-01-01T10:00:00|     1|          1|            1|    0|2021-01-01 10:04:30|2021-01-01 10:00:00|               4.5|
|  1|2021-01-01T10:01:06|     1|          2|            2|    0|2021-01-01 10:04:30|2021-01-01 10:00:00|               4.5|
|  1|2021-01-01T10:02:18|     1|          3|            3|    0|2021-01-01 10:04:30|2021-01-01 10:00:00|               4.5|
|  1|2021-01-01T10:03:24|     1|          4|            4|    0|2021-01-01 10:04:30|2021-01-01 10:00:00|               4.5|
|  1|2021-01-01T10:04:30|     1|          5|            5|    0|2021-01-01 10:04:30|2021-01-01 10:00:00|               4.5|
|  1|2021-01-01T10:05:36|     2|          6|            1|    5|2021-01-01 10:05:36|2021-01-01 10:05:36|               0.0|
|  1|2021-01-01T11:06:00|     3|          7|            1|    6|2021-01-01 11:06:00|2021-01-01 11:06:00|               0.0|
|  1|2021-01-01T11:07:06|     1|          8|            6|    2|2021-01-01 11:11:36|2021-01-01 11:07:06|               4.5|
|  1|2021-01-01T11:08:12|     1|          9|            7|    2|2021-01-01 11:11:36|2021-01-01 11:07:06|               4.5|
|  1|2021-01-01T11:09:24|     1|         10|            8|    2|2021-01-01 11:11:36|2021-01-01 11:07:06|               4.5|
|  1|2021-01-01T11:10:30|     1|         11|            9|    2|2021-01-01 11:11:36|2021-01-01 11:07:06|               4.5|
|  1|2021-01-01T11:11:36|     1|         12|           10|    2|2021-01-01 11:11:36|2021-01-01 11:07:06|               4.5|

我没有再进一步,但我认为过滤“difference_in_mins”中大于 10 分钟的值然后获取第一条记录相对简单。


© www.soinside.com 2019 - 2024. All rights reserved.