使用 pyspark 将数据提取到独立文件中以解决:Spark 缓冲区持有者大小限制问题

问题描述 投票:0回答:1

问题

我遇到了与 这个相同的问题:Spark 缓冲区持有者大小限制问题

我的代码是这样的:

# Calculate the statistics
stats = df.groupBy("EventType").agg(
    size(collect_set("Parameters")).alias("ParameterLength"),
    collect_list("Parameters").alias("Parameters"),
    (count("*") / df.count() * 100).alias("Frequency"),
)

期待

解决方案1

我已经知道大多数

Parameters
都很短,只有
EventType
A
并且
B
有大量参数,我在想有没有办法跳过
EventType
==“A”或“B” " 首先,输出一个汇总表,然后将
EventType
== "A" 或 "B" 输出到独立文件或其他文件。

我想要的是这样的:

EventType   ParameterLength Frequency   Parameters
TimeChanged 0               0.799471    []
Alarm       40              71.643145   [ele1,ele2,...ele40]
...
A           9999999             12.12       None--> file generated
B           30031               5.21        None--> file generated

解决方案2

获取“A”和“B”等溢出列的前 100 个参数。或者当 element > 100 时,进行切片,否则进行正常操作。

解决方案3

Parameters
数组包含许多{“Key”:XXX,“Value”:YYY}对,有没有办法通过将它们更改为spark中的(XXX,YYY)或
StructType
这样的元组来减少大小?

dataframe apache-spark pyspark apache-spark-sql bigdata
1个回答
0
投票

我使用了解决方案2,这是我的代码:

stats = df.groupBy("EventType").agg(
    size(collect_set("Parameters")).alias("ParameterLength"),
    when(
        size(collect_set("Parameters")) > 1000,
        slice(collect_set("Parameters"), 1, 1000).alias("Parameters"),
    )
    .otherwise(collect_set("Parameters"))
    .alias("Parameters"),
    (count("*") / df.count() * 100).alias("Frequency"),
)
© www.soinside.com 2019 - 2024. All rights reserved.