使用 Spark Dataframe API 对列中的特定字符进行计数

问题描述 投票:0回答:4

我在 Spark 数据框中有一列包含位

df
。这些列是字符串格式:

10001010000000100000000000000000
10001010000000100000000100000000

是否有一种简单有效的方法来创建新列

"no_of_ones"
使用数据框计算列的频率?使用 RDD,我可以
map(lambda x:x.count('1'))
(PySpark)。
另外,我如何检索包含这些位置的列表?

dataframe apache-spark pyspark apache-spark-sql count
4个回答
13
投票

我能想到的一种方法是删除所有零,然后计算字段的长度。

df.show
+--------------------+
|          bytestring|
+--------------------+
|10001010000000100...|
|10001010000000100...|
+--------------------+


df.withColumn("no_of_ones" , length(regexp_replace($"bytestring", "0", "")) ).show
+--------------------+----------+
|          bytestring|no_of_ones|
+--------------------+----------+
|10001010000000100...|         4|
|10001010000000100...|         5|
+--------------------+----------+

3
投票

一般来说,当你在 (py)spark SQL 的预定义函数中找不到你需要的东西时,你可以编写一个用户定义函数 (UDF) 来执行你想要的任何操作(参见 UDF)。

请注意,在您的情况下,编码良好的 udf 可能比 scala 或 java 中的正则表达式解决方案更快,因为您不需要实例化新字符串并编译正则表达式(for 循环即可)。然而,在 pyspark 中它可能会慢得多,因为在执行器上执行 python 代码总是会严重损害性能。


3
投票

由于它是二进制(0/1),所以上面的答案将起作用。感谢奥利的回答。

但以防万一您需要查找具有 . 的字符串中字符/数字/符号的出现次数。

例如:

Find '~' in a string "ABCDE~FGH~IJAK~123~$$$" 

寻求以下解决方案。

df.withColumn("no_of_ones" , length($"bytestring") - length(regexp_replace($"bytestring", "~", "")) ).show

2
投票

对于 count,请使用

regexp_count
:

  • 星火3.5+
    F.regexp_count('bytestring', F.lit(r'1'))
    
  • 星火3.4+
    F.expr(r"regexp_count(col_name, '1')")
    
    使用
    expr
    时,转义规范。使用
    \\
    的字符,例如
    F.expr(r"regexp_count(col_name, '\\+')")

count的完整示例:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [("10001010000000100000000000000000",),
     ("10001010000000100000000100000000",)],
    ["bytestring"])

df.withColumn('no_of_ones', F.expr(r"regexp_count(bytestring, '1')")).show()
# +--------------------+----------+
# |          bytestring|no_of_ones|
# +--------------------+----------+
# |10001010000000100...|         4|
# |10001010000000100...|         5|
# +--------------------+----------+

对于位置,在你的情况下,我可以建议一种更复杂的方法,涉及高阶函数

transform
filter

seq = "sequence(1, length(bytestring))"
located = F.expr(f"transform({seq}, x -> locate('1', bytestring, x))")
cleaned = F.filter(F.array_distinct(located), lambda x: x != 0)

df.withColumn('pos_of_ones', cleaned).show(truncate=0)
# +--------------------------------+-----------------+
# |bytestring                      |pos_of_ones      |
# +--------------------------------+-----------------+
# |10001010000000100000000000000000|[1, 5, 7, 15]    |
# |10001010000000100000000100000000|[1, 5, 7, 15, 24]|
# +--------------------------------+-----------------+
© www.soinside.com 2019 - 2024. All rights reserved.