我有一个带有“时间”列和“值”列的数据框。示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
v = [1,4,1,12,4,2,1,2,5,6,2,2,4,4,7,3,3,4,7,3]
t = range(0, len(v))
data = list(zip(t, v))
df = spark.createDataFrame(data, ["time", "values"])
df.show()
+----+------+
|time|values|
+----+------+
| 0| 1|
| 1| 4|
| 2| 1|
| 3| 12|
| 4| 4|
| 5| 2|
| 6| 1|
| 7| 2|
| 8| 5|
| 9| 6|
| 10| 2|
| 11| 2|
| 12| 4|
| 13| 4|
| 14| 7|
| 15| 3|
| 16| 3|
| 17| 4|
| 18| 7|
| 19| 3|
+----+------+
我有一系列值
seq = [4,7,3,3]
我需要在我的“值”列中找到where我可以找到这个值序列
seq
,即我的序列开始时的“时间”。在我的示例中,它将在“时间”==13
请考虑到我使用的数据帧超过7000万行,并且我拥有的序列大约有3000个元素,因此该问题需要计算资源效率
我尝试了暴力解决方法,例如通过搜索“值”中的第一个元素,生成一个带有滞后的新列,这使我能够在确认滞后第一项的行中搜索第二项,并且重复该操作,每次确认新值和先前值。不幸的是我的资源不够,spark 给出了 OOM (Out Of Merory) 错误,因此需要一种更有效的方法
我也尝试过砍数据,但我认为这个问题的一个好的答案比手动砍数据有趣得多
您可以使用带有collect_list函数的窗口来生成包含4个连续行的列表的列。那么答案如下。
from pyspark.sql.functions import collect_list, col, lit
from pyspark.sql.window import Window
seq = [4,7,3,3]
windowSpec = Window.orderBy("time").rangeBetween(0, len(seq) - 1)
v = [1,4,1,12,4,2,1,2,5,6,2,2,4,4,7,3,3,4,7,3]
t = range(0, len(v))
data = list(zip(t, v))
df = spark.createDataFrame(data, ["time", "values"]) \
.withColumn("values_list", collect_list("values").over(windowSpec))
df = df.filter(col("values_list") == lit(seq))
df.show()
这会给你答案:
+----+------+------------+
|time|values| values_list|
+----+------+------------+
| 13| 4|[4, 7, 3, 3]|
+----+------+------------+