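This dummy data represents a device with measurement cycles. One measurement cycle runs from one "Type" Init row to the next Init row.
What I want to find is, for example, the last error within each measurement cycle (the conditions will get more complex later).
I have already found a solution. What I really want to know is whether there is a simpler or more efficient way to compute it.
Example dataset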
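import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df_orig = spark.sparkContext.parallelize(Seq(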
("Init", 1, 17, "I"),
("TypeA", 2, 17, "W"),
("TypeA", 3, 17, "E"),
("TypeA", 4, 17, "W"),
("TypeA", 5, 17, "E"),
("TypeA", 6, 17, "W"),
("Init", 7, 12, "I"),
("TypeB", 8, 12, "W"),
("TypeB", 9, 12, "E"),
("TypeB", 10, 12, "W"),
("TypeB", 11, 12, "W"),
("TypeB", 12, 12, "E"),
("TypeB", 13, 12, "E")
)).toDF("Type", "rn", "X_ChannelC", "Error_Type")
The following code is my solution.
// Running window over all rows, ordered by rn, from the first row up to the current row
val fillWindow = Window.partitionBy().orderBy($"rn").rowsBetween(Window.unboundedPreceding, Window.currentRow)
// Flag every "Init" row, then take a running sum so each row carries the id of its measurement cycle
val df_with_window = df_orig.withColumn("window_flag", when($"Type".contains("Init"), 1).otherwise(null))
  .withColumn("window_filled", sum($"window_flag").over(fillWindow))
// Window spanning one whole measurement cycle
val window = Window.partitionBy("window_filled").orderBy($"rn").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// Calculate the last error entry: keep rn only on "E" rows, take the last non-null value per cycle, mark the matching row
val df_new = df_with_window.withColumn("is_relevant", when($"Error_Type".contains("E"), $"rn").otherwise(null))
  .withColumn("last", last($"is_relevant", true).over(window))
  .withColumn("pass", when($"last" === $"is_relevant", "This one").otherwise(null))
df_new.show()
Result:
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Type| rn|X_ChannelC|Error_Type|window_flag|window_filled|is_relevant|last| pass|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Init| 1| 17| I| 1| 1| null| 5| null|
|TypeA| 2| 17| W| null| 1| null| 5| null|
|TypeA| 3| 17| E| null| 1| 3| 5| null|
|TypeA| 4| 17| W| null| 1| null| 5| null|
|TypeA| 5| 17| E| null| 1| 5| 5|This one|
|TypeA| 6| 17| W| null| 1| null| 5| null|
| Init| 7| 12| I| 1| 2| null| 13| null|
|TypeB| 8| 12| W| null| 2| null| 13| null|
|TypeB| 9| 12| E| null| 2| 9| 13| null|
|TypeB| 10| 12| W| null| 2| null| 13| null|
|TypeB| 11| 12| W| null| 2| null| 13| null|
|TypeB| 12| 12| E| null| 2| 12| 13| null|
|TypeB| 13| 12| E| null| 2| 13| 13|This one|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
I'm not sure whether this is more efficient (it still uses two window functions, but it is a bit shorter):
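val df_new = df_orig
  // Running count of "Init" rows gives each row its measurement id
  .withColumn("measurement", sum(when($"Type"==="Init",1)).over(Window.orderBy($"rn")))
  // A row passes if its rn equals the largest rn among the "E" rows of its measurement
  .withColumn("pass", $"rn"===max(when($"Error_Type"==="E",$"rn")).over(Window.partitionBy($"measurement")))
// show() returns Unit, so call it separately to keep df_new a DataFrame
df_new.show()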
+-----+---+----------+----------+-----------+-----+
| Type| rn|X_ChannelC|Error_Type|measurement| pass|
+-----+---+----------+----------+-----------+-----+
| Init| 1| 17| I| 1|false|
|TypeA| 2| 17| W| 1|false|
|TypeA| 3| 17| E| 1|false|
|TypeA| 4| 17| W| 1|false|
|TypeA| 5| 17| E| 1| true|
|TypeA| 6| 17| W| 1|false|
| Init| 7| 12| I| 2|false|
|TypeB| 8| 12| W| 2|false|
|TypeB| 9| 12| E| 2|false|
|TypeB| 10| 12| W| 2|false|
|TypeB| 11| 12| W| 2|false|
|TypeB| 12| 12| E| 2|false|
|TypeB| 13| 12| E| 2| true|
+-----+---+----------+----------+-----------+-----+
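To check the logic without a Spark session, the same two steps (a running count of "Init" rows, then the largest rn among the "E" rows of each measurement) can be sketched with plain Scala collections. This is only an illustration under assumptions: the `Reading` case class and the `LastErrorSketch` object are hypothetical names, not part of the code above, and the data is small enough to fit in memory.

```scala
object LastErrorSketch {
  // Hypothetical row type mirroring the columns of df_orig.
  final case class Reading(tpe: String, rn: Int, channel: Int, errorType: String)

  val rows: Seq[Reading] = Seq(
    Reading("Init", 1, 17, "I"), Reading("TypeA", 2, 17, "W"), Reading("TypeA", 3, 17, "E"),
    Reading("TypeA", 4, 17, "W"), Reading("TypeA", 5, 17, "E"), Reading("TypeA", 6, 17, "W"),
    Reading("Init", 7, 12, "I"), Reading("TypeB", 8, 12, "W"), Reading("TypeB", 9, 12, "E"),
    Reading("TypeB", 10, 12, "W"), Reading("TypeB", 11, 12, "W"), Reading("TypeB", 12, 12, "E"),
    Reading("TypeB", 13, 12, "E")
  ).sortBy(_.rn)

  // Step 1: running count of "Init" rows, the analogue of the cumulative sum window.
  val measurementIds: Seq[Int] =
    rows.scanLeft(0)((acc, r) => if (r.tpe == "Init") acc + 1 else acc).tail

  // Step 2: largest rn among the "E" rows of each measurement, the analogue of the partitioned max.
  val lastErrorRn: Map[Int, Int] =
    rows.zip(measurementIds)
      .collect { case (r, m) if r.errorType == "E" => (m, r.rn) }
      .groupBy(_._1)
      .map { case (m, pairs) => m -> pairs.map(_._2).max }

  // (rn, measurement, pass): pass is true only on the last error row of a measurement.
  val flagged: Seq[(Int, Int, Boolean)] =
    rows.zip(measurementIds).map { case (r, m) => (r.rn, m, lastErrorRn.get(m).contains(r.rn)) }

  def main(args: Array[String]): Unit = flagged.foreach(println)
}
```

For the example data, only the rows with rn 5 and rn 13 end up flagged, matching the `pass` column in the table above.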