我遇到了与 这个相同的问题:Spark 缓冲区持有者大小限制问题。
我的代码是这样的:
# Calculate the statistics
stats = df.groupBy("EventType").agg(
size(collect_set("Parameters")).alias("ParameterLength"),
collect_list("Parameters").alias("Parameters"),
(count("*") / df.count() * 100).alias("Frequency"),
)
我已经知道大多数
Parameters
都很短,只有EventType
是A
并且B
有大量参数,我在想有没有办法跳过EventType
==“A”或“B” " 首先,输出一个汇总表,然后将 EventType
== "A" 或 "B" 输出到独立文件或其他文件。
我想要的是这样的:
EventType ParameterLength Frequency Parameters
TimeChanged 0 0.799471 []
Alarm 40 71.643145 [ele1,ele2,...ele40]
...
A 9999999 12.12 None--> file generated
B 30031 5.21 None--> file generated
获取“A”和“B”等溢出列的前 100 个参数。或者当 element > 100 时,进行切片,否则进行正常操作。
Parameters
数组包含许多{“Key”:XXX,“Value”:YYY}对,有没有办法通过将它们更改为spark中的(XXX,YYY)或StructType
这样的元组来减少大小?
我使用了解决方案2,这是我的代码:
stats = df.groupBy("EventType").agg(
size(collect_set("Parameters")).alias("ParameterLength"),
when(
size(collect_set("Parameters")) > 1000,
slice(collect_set("Parameters"), 1, 1000).alias("Parameters"),
)
.otherwise(collect_set("Parameters"))
.alias("Parameters"),
(count("*") / df.count() * 100).alias("Frequency"),
)