如何在 PySpark 列中搜索值序列

问题描述 投票:0回答:1

我有一个带有“时间”列和“值”列的数据框。示例:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()

v = [1,4,1,12,4,2,1,2,5,6,2,2,4,4,7,3,3,4,7,3]
t = range(0, len(v))

data = list(zip(t, v))
df = spark.createDataFrame(data, ["time", "values"])

df.show()
+----+------+
|time|values|
+----+------+
|   0|     1|
|   1|     4|
|   2|     1|
|   3|    12|
|   4|     4|
|   5|     2|
|   6|     1|
|   7|     2|
|   8|     5|
|   9|     6|
|  10|     2|
|  11|     2|
|  12|     4|
|  13|     4|
|  14|     7|
|  15|     3|
|  16|     3|
|  17|     4|
|  18|     7|
|  19|     3|
+----+------+

我有一系列值

seq = [4,7,3,3]

我需要在我的“值”列中找到where我可以找到这个值序列

seq
,即我的序列开始时的“时间”。在我的示例中,它将在“时间”==13

请考虑到我使用的数据帧超过7000万行,并且我拥有的序列大约有3000个元素,因此该问题需要计算资源效率

我尝试了暴力解决方法,例如通过搜索“值”中的第一个元素,生成一个带有滞后的新列,这使我能够在确认滞后第一项的行中搜索第二项,并且重复该操作,每次确认新值和先前值。不幸的是我的资源不够,spark 给出了 OOM (Out Of Merory) 错误,因此需要一种更有效的方法

我也尝试过砍数据,但我认为这个问题的一个好的答案比手动砍数据有趣得多

apache-spark pyspark search sequence
1个回答
0
投票

您可以使用带有collect_list函数的窗口来生成包含4个连续行的列表的列。那么答案如下。

from pyspark.sql.functions import collect_list, col, lit
from pyspark.sql.window import Window

seq = [4,7,3,3]

windowSpec = Window.orderBy("time").rangeBetween(0, len(seq) - 1)

v = [1,4,1,12,4,2,1,2,5,6,2,2,4,4,7,3,3,4,7,3]
t = range(0, len(v))

data = list(zip(t, v))
df = spark.createDataFrame(data, ["time", "values"]) \
    .withColumn("values_list", collect_list("values").over(windowSpec))

df = df.filter(col("values_list") == lit(seq))

df.show()

这会给你答案:

+----+------+------------+
|time|values| values_list|
+----+------+------------+
|  13|     4|[4, 7, 3, 3]|
+----+------+------------+
© www.soinside.com 2019 - 2024. All rights reserved.